summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/library/workload/abstract/workload_query_generator.h2
-rw-r--r--ydb/library/workload/tpcds/data_generator.cpp89
-rw-r--r--ydb/library/workload/tpcds/data_generator.h9
-rw-r--r--ydb/library/workload/tpch/data_generator.cpp69
-rw-r--r--ydb/library/workload/tpch/data_generator.h3
-rw-r--r--ydb/tests/functional/tpc/canondata/result.json6
-rw-r--r--ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state_and_parts/s1.hash48
-rw-r--r--ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state_and_parts/s1.hash16
-rw-r--r--ydb/tests/functional/tpc/test_generator.py44
9 files changed, 198 insertions, 88 deletions
diff --git a/ydb/library/workload/abstract/workload_query_generator.h b/ydb/library/workload/abstract/workload_query_generator.h
index 4e27dbe47dc..7181cf30c15 100644
--- a/ydb/library/workload/abstract/workload_query_generator.h
+++ b/ydb/library/workload/abstract/workload_query_generator.h
@@ -102,7 +102,7 @@ public:
virtual TDataPortions GenerateDataPortion() = 0;
YDB_READONLY_DEF(std::string, Name);
- YDB_READONLY(ui64, Size, 0);
+ YDB_READONLY_PROTECT(ui64, Size, 0);
};
using TBulkDataGeneratorList = std::vector<std::shared_ptr<IBulkDataGenerator>>;
diff --git a/ydb/library/workload/tpcds/data_generator.cpp b/ydb/library/workload/tpcds/data_generator.cpp
index c0d0c472c98..b15589ca550 100644
--- a/ydb/library/workload/tpcds/data_generator.cpp
+++ b/ydb/library/workload/tpcds/data_generator.cpp
@@ -53,30 +53,6 @@ TBulkDataGeneratorList TTpcdsWorkloadDataInitializerGenerator::DoGetBulkInitialD
return TBulkDataGeneratorList(gens.begin(), gens.end());
}
-TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TPositions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState) {
- static const TSet<ui32> allowedModules{1, 2, 4};
- TPositions result;
- const auto* tdef = getTdefsByNumber(tableNum);
- if (!tdef) {
- return result;
- }
- split_work(tableNum, &result.FirstRow, &result.Count);
- if (useState && owner.StateProcessor && owner.StateProcessor->GetState().contains(tdef->name)) {
- result.Position = owner.StateProcessor->GetState().at(tdef->name).Position;
-
- //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
- while (result.Position && !allowedModules.contains((result.FirstRow + result.Position) % 6)) {
- --result.Position;
- }
- }
- //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
- while (result.FirstRow > 1 && !allowedModules.contains((result.FirstRow + result.Position) % 6)) {
- --result.FirstRow;
- ++result.Count;
- }
- return result;
-}
-
TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum)
: Owner(owner)
, TableNum(tableNum)
@@ -95,25 +71,27 @@ TStringBuilder& TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TCon
}
void TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::AppendPortions(TDataPortions& result) {
- const auto name = getTdefsByNumber(TableNum)->name;
+ const auto* tdef = getTdefsByNumber(TableNum);
+ const auto name = tdef->name;
const auto path = Owner.GetFullTableName(name);
+ auto* stateProcessor = (tdef->flags & FL_CHILD) ? nullptr : Owner.Owner.StateProcessor.Get();
if (Builder) {
Builder->EndList();
result.push_back(MakeIntrusive<TDataPortionWithState>(
- Owner.Owner.StateProcessor.Get(),
+ stateProcessor,
path,
name,
Builder->Build(),
- Start - 1,
+ Start - Owner.FirstRow,
Count
));
} else if (Csv) {
result.push_back(MakeIntrusive<TDataPortionWithState>(
- Owner.Owner.StateProcessor.Get(),
+ stateProcessor,
path,
name,
TDataPortion::TCsv(std::move(Csv), TWorkloadGeneratorBase::PsvFormatString),
- Start - 1,
+ Start - Owner.FirstRow,
Count
));
}
@@ -124,10 +102,41 @@ TString TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GetFullTable
}
TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum)
- : IBulkDataGenerator(getTdefsByNumber(tableNum)->name, CalcCountToGenerate(owner, tableNum, true).Count)
+ : IBulkDataGenerator(getTdefsByNumber(tableNum)->name, 0)
, TableNum(tableNum)
, Owner(owner)
-{}
+{
+ static const TSet<ui32> allowedModules{1, 2, 4};
+ const auto* tdef = getTdefsByNumber(TableNum);
+ if (!tdef) {
+ return;
+ }
+ ds_key_t rowsCount;
+ split_work(TableNum, &FirstRow, &rowsCount);
+ //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
+ while (FirstRow > 1 && !allowedModules.contains(FirstRow % 6)) {
+ --FirstRow;
+ ++rowsCount;
+ }
+ if (owner.StateProcessor) {
+ if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), tdef->name)) {
+ StartPosition = state->Position;
+
+ //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
+ while (StartPosition && !allowedModules.contains((1 + StartPosition) % 6)) {
+ --StartPosition;
+ }
+ if (StartPosition) {
+ FirstPortion = MakeIntrusive<TDataPortion>(
+ GetFullTableName(tdef->name),
+ TDataPortion::TSkip(),
+ StartPosition
+ );
+ }
+ }
+ }
+ Size = rowsCount;
+}
TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() {
TDataPortions result;
@@ -143,30 +152,24 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds
}
auto g = Guard(Lock);
- auto positions = CalcCountToGenerate(Owner, TableNum, !Generated);
if (!Generated) {
- Generated = positions.Position;
- result.push_back(MakeIntrusive<TDataPortion>(
- GetFullTableName(tdef->name),
- TDataPortion::TSkip(),
- Generated
- ));
- if (const ui32 toSkip = positions.FirstRow + positions.Position - 1) {
+ Generated = StartPosition;
+ if (const auto toSkip = FirstRow + StartPosition - 1) {
row_skip(TableNum, toSkip);
if (tdef->flags & FL_PARENT) {
row_skip(tdef->nParam, toSkip);
}
}
- if (tdef->flags & FL_SMALL) {
- resetCountCount();
- }
+ }
+ if (FirstPortion) {
+ result.emplace_back(std::move(FirstPortion));
}
const auto count = GetSize() > Generated ? std::min(ui64(GetSize() - Generated), Owner.Params.BulkSize) : 0;
if (!count) {
return result;
}
ctxs.front().SetCount(count);
- ctxs.front().SetStart(positions.FirstRow + Generated);
+ ctxs.front().SetStart(FirstRow + Generated);
Generated += count;
GenerateRows(ctxs, std::move(g));
for(auto& ctx: ctxs) {
diff --git a/ydb/library/workload/tpcds/data_generator.h b/ydb/library/workload/tpcds/data_generator.h
index 3774e7019b1..72d2341f717 100644
--- a/ydb/library/workload/tpcds/data_generator.h
+++ b/ydb/library/workload/tpcds/data_generator.h
@@ -66,12 +66,9 @@ public:
private:
TString GetFullTableName(const char* table) const;
- struct TPositions {
- ds_key_t FirstRow = 1;
- ui64 Position = 0;
- ds_key_t Count = 0;
- };
- static TPositions CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState);
+ ds_key_t FirstRow = 1;
+ ui64 StartPosition = 0;
+ TDataPortionPtr FirstPortion;
const TTpcdsWorkloadDataInitializerGenerator& Owner;
};
};
diff --git a/ydb/library/workload/tpch/data_generator.cpp b/ydb/library/workload/tpch/data_generator.cpp
index c393f1675b6..1fdbe46142b 100644
--- a/ydb/library/workload/tpch/data_generator.cpp
+++ b/ydb/library/workload/tpch/data_generator.cpp
@@ -45,22 +45,6 @@ TBulkDataGeneratorList TTpchWorkloadDataInitializerGenerator::DoGetBulkInitialDa
return TBulkDataGeneratorList(gens.begin(), gens.end());
}
-
-ui64 TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum) {
- if (tableNum == NONE) {
- return 0;
- }
- if (tableNum >= NATION) {
- return owner.GetProcessIndex() ? 0 : tdefs[tableNum].base;
- }
- ui64 rowCount = tdefs[tableNum].base * owner.GetScale();
- ui64 extraRows = 0;
- if (owner.GetProcessIndex() + 1 >= owner.GetProcessCount()) {
- extraRows = rowCount % owner.GetProcessCount();
- }
- return rowCount / owner.GetProcessCount() + extraRows;
-}
-
TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum, TGeneratorStateProcessor* state)
: Owner(owner)
, TableNum(tableNum)
@@ -88,7 +72,7 @@ void TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::Append
path,
tdefs[TableNum].name,
Builder->Build(),
- Start - 1,
+ Start - Owner.FirstRow,
Count
));
} else if (Csv) {
@@ -97,7 +81,7 @@ void TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::Append
path,
tdefs[TableNum].name,
TDataPortion::TCsv(std::move(Csv), TWorkloadGeneratorBase::PsvFormatString),
- Start - 1,
+ Start - Owner.FirstRow,
Count
));
}
@@ -108,10 +92,35 @@ TString TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GetFullTableN
}
TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum)
- : IBulkDataGenerator(tdefs[tableNum].name, CalcCountToGenerate(owner, tableNum))
+ : IBulkDataGenerator(tdefs[tableNum].name, 0)
, TableNum(tableNum)
, Owner(owner)
-{}
+{
+ if (TableNum == NONE) {
+ return;
+ }
+ if (tableNum >= NATION) {
+ Size = owner.GetProcessIndex() ? 0 : tdefs[tableNum].base;
+ } else {
+ DSS_HUGE extraRows = 0;
+ Size = set_state(TableNum, Owner.GetScale(), Owner.GetProcessCount(), Owner.GetProcessIndex() + 1, &extraRows);
+ FirstRow += Size * Owner.GetProcessIndex();
+ if (Owner.GetProcessIndex() + 1 == Owner.GetProcessCount()) {
+ Size += extraRows;
+ }
+ }
+ if (!!Owner.StateProcessor) {
+ if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), GetName())) {
+ Generated = state->Position;
+ FirstPortion = MakeIntrusive<TDataPortion>(
+ GetFullTableName(tdefs[TableNum].name),
+ TDataPortion::TSkip(),
+ Generated
+ );
+ GenSeed(TableNum, Generated);
+ }
+ }
+}
TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() {
TDataPortions result;
@@ -125,29 +134,15 @@ TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWo
}
auto g = Guard(Lock);
- if (!Generated) {
- if (Owner.GetProcessCount() > 1) {
- DSS_HUGE e;
- set_state(TableNum, Owner.GetScale(), Owner.GetProcessCount(), Owner.GetProcessIndex() + 1, &e);
- }
- if (!!Owner.StateProcessor) {
- if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), GetName())) {
- Generated = state->Position;
- result.push_back(MakeIntrusive<TDataPortion>(
- GetFullTableName(tdefs[TableNum].name),
- TDataPortion::TSkip(),
- Generated
- ));
- GenSeed(TableNum, Generated);
- }
- }
+ if (FirstPortion) {
+ result.emplace_back(std::move(FirstPortion));
}
const auto count = GetSize() > Generated ? std::min(ui64(GetSize() - Generated), Owner.Params.BulkSize) : 0;
if (!count) {
return result;
}
ctxs.front().SetCount(count);
- ctxs.front().SetStart((tdefs[TableNum].base * Owner.GetScale() / Owner.GetProcessCount()) * Owner.GetProcessIndex() + Generated + 1);
+ ctxs.front().SetStart(FirstRow + Generated);
Generated += count;
GenerateRows(ctxs, std::move(g));
for(auto& ctx: ctxs) {
diff --git a/ydb/library/workload/tpch/data_generator.h b/ydb/library/workload/tpch/data_generator.h
index b2838ab0aaf..f830fb66e8b 100644
--- a/ydb/library/workload/tpch/data_generator.h
+++ b/ydb/library/workload/tpch/data_generator.h
@@ -52,12 +52,13 @@ public:
int TableNum;
ui64 Generated = 0;
+ ui64 FirstRow = 1;
TAdaptiveLock Lock;
private:
TString GetFullTableName(const char* table) const;
- static ui64 CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum);
const TTpchWorkloadDataInitializerGenerator& Owner;
+ TDataPortionPtr FirstPortion;
};
};
diff --git a/ydb/tests/functional/tpc/canondata/result.json b/ydb/tests/functional/tpc/canondata/result.json
index 8cd981075ce..4ae82132a23 100644
--- a/ydb/tests/functional/tpc/canondata/result.json
+++ b/ydb/tests/functional/tpc/canondata/result.json
@@ -8,6 +8,9 @@
"test_generator.TestTpcdsGenerator.test_s1_state": {
"uri": "file://test_generator.TestTpcdsGenerator.test_s1_state/s1.hash"
},
+ "test_generator.TestTpcdsGenerator.test_s1_state_and_parts": {
+ "uri": "file://test_generator.TestTpcdsGenerator.test_s1_state_and_parts/s1.hash"
+ },
"test_generator.TestTpchGenerator.test_s1": {
"uri": "file://test_generator.TestTpchGenerator.test_s1/s1.hash"
},
@@ -17,6 +20,9 @@
"test_generator.TestTpchGenerator.test_s1_state": {
"uri": "file://test_generator.TestTpchGenerator.test_s1_state/s1.hash"
},
+ "test_generator.TestTpchGenerator.test_s1_state_and_parts": {
+ "uri": "file://test_generator.TestTpchGenerator.test_s1_state_and_parts/s1.hash"
+ },
"test_init.TestClickbenchInit.test_s1_column": {
"uri": "file://test_init.TestClickbenchInit.test_s1_column/s1_column"
},
diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state_and_parts/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state_and_parts/s1.hash
new file mode 100644
index 00000000000..d2f163cc789
--- /dev/null
+++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state_and_parts/s1.hash
@@ -0,0 +1,48 @@
+call_center count: 6
+call_center md5: 86db117a0bb48668acbe63c473e85d96
+catalog_page count: 11718
+catalog_page md5: e45ffa0691c04f86e0be055a91264cd6
+catalog_returns count: 144067
+catalog_returns md5: a2a74a6552e74a4a63dae4dbf65855db
+catalog_sales count: 1441548
+catalog_sales md5: 07f03d83e8579e9a22b565a41835c090
+customer count: 100000
+customer md5: 1e03db62671f58e0950acfe2748b2009
+customer_address count: 50000
+customer_address md5: f5df0212260c1a9078fc175f2f74a1b8
+customer_demographics count: 1920800
+customer_demographics md5: 4f6182b865d1c183d50860387332c0b5
+date_dim count: 73049
+date_dim md5: f4ef03663ab568ddeb16309f493896c0
+household_demographics count: 7200
+household_demographics md5: b1c3ff23e00da09d7fc94ce5cd8abdd7
+income_band count: 20
+income_band md5: 5dbfb6a7379a3ccb81004d9abae0df5e
+inventory count: 11745000
+inventory md5: b5cb61d5a9b1eb9acf7144623e63315a
+item count: 18000
+item md5: 0caece8d586d854a0713886012ee6786
+promotion count: 300
+promotion md5: cf9b3443efc3a5d1c3c983e41779dcaf
+reason count: 35
+reason md5: 89493ae8b5ab9f63f750c1bdadc57089
+ship_mode count: 20
+ship_mode md5: 25d7c1abd229862398b88818f81f72fc
+store count: 12
+store md5: f342258aaec198b0ec4d6bb6e9f7991e
+store_returns count: 287514
+store_returns md5: bc9f15ce6d773f0af978c22ee084802b
+store_sales count: 2880404
+store_sales md5: 069e459494d5875a78b5eaeeef5340b0
+time_dim count: 86400
+time_dim md5: 88f81e9a8618f855f4ff20d98e6ec122
+warehouse count: 5
+warehouse md5: b54252167f4e5dbdb42e163d82ba8c3d
+web_page count: 60
+web_page md5: db2cf0327ecff09ed59bc5d68ba8aacc
+web_returns count: 71763
+web_returns md5: 305e332bb00d9590f62e05d5a52519d5
+web_sales count: 719384
+web_sales md5: ad5c63cc7f6d2830bc3592a5f35cce38
+web_site count: 30
+web_site md5: 707d556c664272f685ee8d7ddbc46f61 \ No newline at end of file
diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state_and_parts/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state_and_parts/s1.hash
new file mode 100644
index 00000000000..325e897ab84
--- /dev/null
+++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state_and_parts/s1.hash
@@ -0,0 +1,16 @@
+customer count: 150000
+customer md5: 1808efe529a289183e7ccf8aa1a2d8e9
+lineitem count: 6001215
+lineitem md5: caa6f4712334af71fd8c1efcdd37d7a7
+nation count: 25
+nation md5: 0e91944824fb13e44cda58882f0fedbe
+orders count: 1500000
+orders md5: 01c5ca96aa3149c64427291ebbd792d4
+part count: 200000
+part md5: d67727d976d8c05b5d145840efaad449
+partsupp count: 800000
+partsupp md5: d62e99cf993c6de288a905ae2f95eced
+region count: 5
+region md5: d1c494f597244c77001246888185e3e3
+supplier count: 10000
+supplier md5: 815d49d8e71c7993531b32113b2da5b5 \ No newline at end of file
diff --git a/ydb/tests/functional/tpc/test_generator.py b/ydb/tests/functional/tpc/test_generator.py
index 86e3c862994..92bad73a94c 100644
--- a/ydb/tests/functional/tpc/test_generator.py
+++ b/ydb/tests/functional/tpc/test_generator.py
@@ -146,6 +146,50 @@ class TpcGeneratorBase(object):
]
return self.get_cannonical(paths=paths, execs=execs)
+ def test_s1_state_and_parts(self):
+ state = [self.tmp_path(f'state_{i}.json') for i in range(2)]
+ paths = [self.tmp_path(f's1.{i}') for i in range(4)]
+ execs = [
+ self.execute_generator(
+ output_path=paths[0],
+ generator_args=['--state', state[0], '--proccess-count', 4, '--proccess-index', 0],
+ import_args=['-t', 1]
+ ),
+ self.execute_generator(
+ output_path=paths[1],
+ generator_args=['--state', state[1], '--proccess-count', 4, '--proccess-index', 2],
+ import_args=['-t', 1]
+ )
+ ]
+ for e in execs:
+ e.wait(check_exit_code=True)
+ execs += [
+ self.execute_generator(
+ output_path=paths[2],
+ generator_args=['--state', state[0], '--proccess-count', 2, '--proccess-index', 0],
+ ),
+ self.execute_generator(
+ output_path=paths[3],
+ generator_args=['--state', state[1], '--proccess-count', 2, '--proccess-index', 1],
+ )
+ ]
+ for e in execs:
+ e.wait(check_exit_code=True)
+ counts = {}
+ for p in paths:
+ for table_name, _ in self.tables.items():
+ fpath = os.path.join(p, table_name)
+ if os.path.exists(fpath):
+ with open(fpath, 'r') as f:
+ counts[table_name] = counts.get(table_name, 0) + len(f.readlines()) - 1
+ for table_name, _ in self.tables.items():
+ c = counts.get(table_name, 0)
+ e = self.tables[table_name][1]
+ if c - e > 5 * len(paths): # some lines can be rebuild on continue building
+ pytest.fail(f'Too many lines was generated for table `{table_name}`, fact: {c}, expected: {e}')
+
+ return self.get_cannonical(paths=paths, execs=execs)
+
class TestTpchGenerator(TpcGeneratorBase):
workload = 'tpch'