diff options
9 files changed, 198 insertions, 88 deletions
diff --git a/ydb/library/workload/abstract/workload_query_generator.h b/ydb/library/workload/abstract/workload_query_generator.h index 4e27dbe47dc..7181cf30c15 100644 --- a/ydb/library/workload/abstract/workload_query_generator.h +++ b/ydb/library/workload/abstract/workload_query_generator.h @@ -102,7 +102,7 @@ public: virtual TDataPortions GenerateDataPortion() = 0; YDB_READONLY_DEF(std::string, Name); - YDB_READONLY(ui64, Size, 0); + YDB_READONLY_PROTECT(ui64, Size, 0); }; using TBulkDataGeneratorList = std::vector<std::shared_ptr<IBulkDataGenerator>>; diff --git a/ydb/library/workload/tpcds/data_generator.cpp b/ydb/library/workload/tpcds/data_generator.cpp index c0d0c472c98..b15589ca550 100644 --- a/ydb/library/workload/tpcds/data_generator.cpp +++ b/ydb/library/workload/tpcds/data_generator.cpp @@ -53,30 +53,6 @@ TBulkDataGeneratorList TTpcdsWorkloadDataInitializerGenerator::DoGetBulkInitialD return TBulkDataGeneratorList(gens.begin(), gens.end()); } -TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TPositions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState) { - static const TSet<ui32> allowedModules{1, 2, 4}; - TPositions result; - const auto* tdef = getTdefsByNumber(tableNum); - if (!tdef) { - return result; - } - split_work(tableNum, &result.FirstRow, &result.Count); - if (useState && owner.StateProcessor && owner.StateProcessor->GetState().contains(tdef->name)) { - result.Position = owner.StateProcessor->GetState().at(tdef->name).Position; - - //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c - while (result.Position && !allowedModules.contains((result.FirstRow + result.Position) % 6)) { - --result.Position; - } - } - //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c - while (result.FirstRow > 1 && !allowedModules.contains((result.FirstRow + result.Position) % 6)) { - --result.FirstRow; - ++result.Count; - } - return result; -} - TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum) : Owner(owner) , TableNum(tableNum) @@ -95,25 +71,27 @@ TStringBuilder& TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TCon } void TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::AppendPortions(TDataPortions& result) { - const auto name = getTdefsByNumber(TableNum)->name; + const auto* tdef = getTdefsByNumber(TableNum); + const auto name = tdef->name; const auto path = Owner.GetFullTableName(name); + auto* stateProcessor = (tdef->flags & FL_CHILD) ? nullptr : Owner.Owner.StateProcessor.Get(); if (Builder) { Builder->EndList(); result.push_back(MakeIntrusive<TDataPortionWithState>( - Owner.Owner.StateProcessor.Get(), + stateProcessor, path, name, Builder->Build(), - Start - 1, + Start - Owner.FirstRow, Count )); } else if (Csv) { result.push_back(MakeIntrusive<TDataPortionWithState>( - Owner.Owner.StateProcessor.Get(), + stateProcessor, path, name, TDataPortion::TCsv(std::move(Csv), TWorkloadGeneratorBase::PsvFormatString), - Start - 1, + Start - Owner.FirstRow, Count )); } @@ -124,10 +102,41 @@ TString TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GetFullTable } TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum) - : IBulkDataGenerator(getTdefsByNumber(tableNum)->name, CalcCountToGenerate(owner, tableNum, true).Count) + : IBulkDataGenerator(getTdefsByNumber(tableNum)->name, 0) , TableNum(tableNum) , Owner(owner) -{} +{ + static const TSet<ui32> allowedModules{1, 2, 4}; + const auto* tdef = getTdefsByNumber(TableNum); + if (!tdef) { + return; + } + ds_key_t rowsCount; + split_work(TableNum, &FirstRow, &rowsCount); + //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c + while (FirstRow > 1 && !allowedModules.contains(FirstRow % 6)) { + --FirstRow; + ++rowsCount; + } + if (owner.StateProcessor) { + if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), tdef->name)) { + StartPosition = state->Position; + + //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c + while (StartPosition && !allowedModules.contains((1 + StartPosition) % 6)) { + --StartPosition; + } + if (StartPosition) { + FirstPortion = MakeIntrusive<TDataPortion>( + GetFullTableName(tdef->name), + TDataPortion::TSkip(), + StartPosition + ); + } + } + } + Size = rowsCount; +} TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() { TDataPortions result; @@ -143,30 +152,24 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds } auto g = Guard(Lock); - auto positions = CalcCountToGenerate(Owner, TableNum, !Generated); if (!Generated) { - Generated = positions.Position; - result.push_back(MakeIntrusive<TDataPortion>( - GetFullTableName(tdef->name), - TDataPortion::TSkip(), - Generated - )); - if (const ui32 toSkip = positions.FirstRow + positions.Position - 1) { + Generated = StartPosition; + if (const auto toSkip = FirstRow + StartPosition - 1) { row_skip(TableNum, toSkip); if (tdef->flags & FL_PARENT) { row_skip(tdef->nParam, toSkip); } } - if (tdef->flags & FL_SMALL) { - resetCountCount(); - } + } + if (FirstPortion) { + result.emplace_back(std::move(FirstPortion)); } const auto count = GetSize() > Generated ? std::min(ui64(GetSize() - Generated), Owner.Params.BulkSize) : 0; if (!count) { return result; } ctxs.front().SetCount(count); - ctxs.front().SetStart(positions.FirstRow + Generated); + ctxs.front().SetStart(FirstRow + Generated); Generated += count; GenerateRows(ctxs, std::move(g)); for(auto& ctx: ctxs) { diff --git a/ydb/library/workload/tpcds/data_generator.h b/ydb/library/workload/tpcds/data_generator.h index 3774e7019b1..72d2341f717 100644 --- a/ydb/library/workload/tpcds/data_generator.h +++ b/ydb/library/workload/tpcds/data_generator.h @@ -66,12 +66,9 @@ public: private: TString GetFullTableName(const char* table) const; - struct TPositions { - ds_key_t FirstRow = 1; - ui64 Position = 0; - ds_key_t Count = 0; - }; - static TPositions CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState); + ds_key_t FirstRow = 1; + ui64 StartPosition = 0; + TDataPortionPtr FirstPortion; const TTpcdsWorkloadDataInitializerGenerator& Owner; }; }; diff --git a/ydb/library/workload/tpch/data_generator.cpp b/ydb/library/workload/tpch/data_generator.cpp index c393f1675b6..1fdbe46142b 100644 --- a/ydb/library/workload/tpch/data_generator.cpp +++ b/ydb/library/workload/tpch/data_generator.cpp @@ -45,22 +45,6 @@ TBulkDataGeneratorList TTpchWorkloadDataInitializerGenerator::DoGetBulkInitialDa return TBulkDataGeneratorList(gens.begin(), gens.end()); } - -ui64 TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum) { - if (tableNum == NONE) { - return 0; - } - if (tableNum >= NATION) { - return owner.GetProcessIndex() ? 0 : tdefs[tableNum].base; - } - ui64 rowCount = tdefs[tableNum].base * owner.GetScale(); - ui64 extraRows = 0; - if (owner.GetProcessIndex() + 1 >= owner.GetProcessCount()) { - extraRows = rowCount % owner.GetProcessCount(); - } - return rowCount / owner.GetProcessCount() + extraRows; -} - TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum, TGeneratorStateProcessor* state) : Owner(owner) , TableNum(tableNum) @@ -88,7 +72,7 @@ void TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::Append path, tdefs[TableNum].name, Builder->Build(), - Start - 1, + Start - Owner.FirstRow, Count )); } else if (Csv) { @@ -97,7 +81,7 @@ void TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::Append path, tdefs[TableNum].name, TDataPortion::TCsv(std::move(Csv), TWorkloadGeneratorBase::PsvFormatString), - Start - 1, + Start - Owner.FirstRow, Count )); } @@ -108,10 +92,35 @@ TString TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GetFullTableN } TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum) - : IBulkDataGenerator(tdefs[tableNum].name, CalcCountToGenerate(owner, tableNum)) + : IBulkDataGenerator(tdefs[tableNum].name, 0) , TableNum(tableNum) , Owner(owner) -{} +{ + if (TableNum == NONE) { + return; + } + if (tableNum >= NATION) { + Size = owner.GetProcessIndex() ? 0 : tdefs[tableNum].base; + } else { + DSS_HUGE extraRows = 0; + Size = set_state(TableNum, Owner.GetScale(), Owner.GetProcessCount(), Owner.GetProcessIndex() + 1, &extraRows); + FirstRow += Size * Owner.GetProcessIndex(); + if (Owner.GetProcessIndex() + 1 == Owner.GetProcessCount()) { + Size += extraRows; + } + } + if (!!Owner.StateProcessor) { + if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), GetName())) { + Generated = state->Position; + FirstPortion = MakeIntrusive<TDataPortion>( + GetFullTableName(tdefs[TableNum].name), + TDataPortion::TSkip(), + Generated + ); + GenSeed(TableNum, Generated); + } + } +} TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() { TDataPortions result; @@ -125,29 +134,15 @@ TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWo } auto g = Guard(Lock); - if (!Generated) { - if (Owner.GetProcessCount() > 1) { - DSS_HUGE e; - set_state(TableNum, Owner.GetScale(), Owner.GetProcessCount(), Owner.GetProcessIndex() + 1, &e); - } - if (!!Owner.StateProcessor) { - if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), GetName())) { - Generated = state->Position; - result.push_back(MakeIntrusive<TDataPortion>( - GetFullTableName(tdefs[TableNum].name), - TDataPortion::TSkip(), - Generated - )); - GenSeed(TableNum, Generated); - } - } + if (FirstPortion) { + result.emplace_back(std::move(FirstPortion)); } const auto count = GetSize() > Generated ? std::min(ui64(GetSize() - Generated), Owner.Params.BulkSize) : 0; if (!count) { return result; } ctxs.front().SetCount(count); - ctxs.front().SetStart((tdefs[TableNum].base * Owner.GetScale() / Owner.GetProcessCount()) * Owner.GetProcessIndex() + Generated + 1); + ctxs.front().SetStart(FirstRow + Generated); Generated += count; GenerateRows(ctxs, std::move(g)); for(auto& ctx: ctxs) { diff --git a/ydb/library/workload/tpch/data_generator.h b/ydb/library/workload/tpch/data_generator.h index b2838ab0aaf..f830fb66e8b 100644 --- a/ydb/library/workload/tpch/data_generator.h +++ b/ydb/library/workload/tpch/data_generator.h @@ -52,12 +52,13 @@ public: int TableNum; ui64 Generated = 0; + ui64 FirstRow = 1; TAdaptiveLock Lock; private: TString GetFullTableName(const char* table) const; - static ui64 CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum); const TTpchWorkloadDataInitializerGenerator& Owner; + TDataPortionPtr FirstPortion; }; }; diff --git a/ydb/tests/functional/tpc/canondata/result.json b/ydb/tests/functional/tpc/canondata/result.json index 8cd981075ce..4ae82132a23 100644 --- a/ydb/tests/functional/tpc/canondata/result.json +++ b/ydb/tests/functional/tpc/canondata/result.json @@ -8,6 +8,9 @@ "test_generator.TestTpcdsGenerator.test_s1_state": { "uri": "file://test_generator.TestTpcdsGenerator.test_s1_state/s1.hash" }, + "test_generator.TestTpcdsGenerator.test_s1_state_and_parts": { + "uri": "file://test_generator.TestTpcdsGenerator.test_s1_state_and_parts/s1.hash" + }, "test_generator.TestTpchGenerator.test_s1": { "uri": "file://test_generator.TestTpchGenerator.test_s1/s1.hash" }, @@ -17,6 +20,9 @@ "test_generator.TestTpchGenerator.test_s1_state": { "uri": "file://test_generator.TestTpchGenerator.test_s1_state/s1.hash" }, + "test_generator.TestTpchGenerator.test_s1_state_and_parts": { + "uri": "file://test_generator.TestTpchGenerator.test_s1_state_and_parts/s1.hash" + }, "test_init.TestClickbenchInit.test_s1_column": { "uri": "file://test_init.TestClickbenchInit.test_s1_column/s1_column" }, diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state_and_parts/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state_and_parts/s1.hash new file mode 100644 index 00000000000..d2f163cc789 --- /dev/null +++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state_and_parts/s1.hash @@ -0,0 +1,48 @@ +call_center count: 6 +call_center md5: 86db117a0bb48668acbe63c473e85d96 +catalog_page count: 11718 +catalog_page md5: e45ffa0691c04f86e0be055a91264cd6 +catalog_returns count: 144067 +catalog_returns md5: a2a74a6552e74a4a63dae4dbf65855db +catalog_sales count: 1441548 +catalog_sales md5: 07f03d83e8579e9a22b565a41835c090 +customer count: 100000 +customer md5: 1e03db62671f58e0950acfe2748b2009 +customer_address count: 50000 +customer_address md5: f5df0212260c1a9078fc175f2f74a1b8 +customer_demographics count: 1920800 +customer_demographics md5: 4f6182b865d1c183d50860387332c0b5 +date_dim count: 73049 +date_dim md5: f4ef03663ab568ddeb16309f493896c0 +household_demographics count: 7200 +household_demographics md5: b1c3ff23e00da09d7fc94ce5cd8abdd7 +income_band count: 20 +income_band md5: 5dbfb6a7379a3ccb81004d9abae0df5e +inventory count: 11745000 +inventory md5: b5cb61d5a9b1eb9acf7144623e63315a +item count: 18000 +item md5: 0caece8d586d854a0713886012ee6786 +promotion count: 300 +promotion md5: cf9b3443efc3a5d1c3c983e41779dcaf +reason count: 35 +reason md5: 89493ae8b5ab9f63f750c1bdadc57089 +ship_mode count: 20 +ship_mode md5: 25d7c1abd229862398b88818f81f72fc +store count: 12 +store md5: f342258aaec198b0ec4d6bb6e9f7991e +store_returns count: 287514 +store_returns md5: bc9f15ce6d773f0af978c22ee084802b +store_sales count: 2880404 +store_sales md5: 069e459494d5875a78b5eaeeef5340b0 +time_dim count: 86400 +time_dim md5: 88f81e9a8618f855f4ff20d98e6ec122 +warehouse count: 5 +warehouse md5: b54252167f4e5dbdb42e163d82ba8c3d +web_page count: 60 +web_page md5: db2cf0327ecff09ed59bc5d68ba8aacc +web_returns count: 71763 +web_returns md5: 305e332bb00d9590f62e05d5a52519d5 +web_sales count: 719384 +web_sales md5: ad5c63cc7f6d2830bc3592a5f35cce38 +web_site count: 30 +web_site md5: 707d556c664272f685ee8d7ddbc46f61
\ No newline at end of file diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state_and_parts/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state_and_parts/s1.hash new file mode 100644 index 00000000000..325e897ab84 --- /dev/null +++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state_and_parts/s1.hash @@ -0,0 +1,16 @@ +customer count: 150000 +customer md5: 1808efe529a289183e7ccf8aa1a2d8e9 +lineitem count: 6001215 +lineitem md5: caa6f4712334af71fd8c1efcdd37d7a7 +nation count: 25 +nation md5: 0e91944824fb13e44cda58882f0fedbe +orders count: 1500000 +orders md5: 01c5ca96aa3149c64427291ebbd792d4 +part count: 200000 +part md5: d67727d976d8c05b5d145840efaad449 +partsupp count: 800000 +partsupp md5: d62e99cf993c6de288a905ae2f95eced +region count: 5 +region md5: d1c494f597244c77001246888185e3e3 +supplier count: 10000 +supplier md5: 815d49d8e71c7993531b32113b2da5b5
\ No newline at end of file diff --git a/ydb/tests/functional/tpc/test_generator.py b/ydb/tests/functional/tpc/test_generator.py index 86e3c862994..92bad73a94c 100644 --- a/ydb/tests/functional/tpc/test_generator.py +++ b/ydb/tests/functional/tpc/test_generator.py @@ -146,6 +146,50 @@ class TpcGeneratorBase(object): ] return self.get_cannonical(paths=paths, execs=execs) + def test_s1_state_and_parts(self): + state = [self.tmp_path(f'state_{i}.json') for i in range(2)] + paths = [self.tmp_path(f's1.{i}') for i in range(4)] + execs = [ + self.execute_generator( + output_path=paths[0], + generator_args=['--state', state[0], '--proccess-count', 4, '--proccess-index', 0], + import_args=['-t', 1] + ), + self.execute_generator( + output_path=paths[1], + generator_args=['--state', state[1], '--proccess-count', 4, '--proccess-index', 2], + import_args=['-t', 1] + ) + ] + for e in execs: + e.wait(check_exit_code=True) + execs += [ + self.execute_generator( + output_path=paths[2], + generator_args=['--state', state[0], '--proccess-count', 2, '--proccess-index', 0], + ), + self.execute_generator( + output_path=paths[3], + generator_args=['--state', state[1], '--proccess-count', 2, '--proccess-index', 1], + ) + ] + for e in execs: + e.wait(check_exit_code=True) + counts = {} + for p in paths: + for table_name, _ in self.tables.items(): + fpath = os.path.join(p, table_name) + if os.path.exists(fpath): + with open(fpath, 'r') as f: + counts[table_name] = counts.get(table_name, 0) + len(f.readlines()) - 1 + for table_name, _ in self.tables.items(): + c = counts.get(table_name, 0) + e = self.tables[table_name][1] + if c - e > 5 * len(paths): # some lines can be rebuild on continue building + pytest.fail(f'Too many lines was generated for table `{table_name}`, fact: {c}, expected: {e}') + + return self.get_cannonical(paths=paths, execs=execs) + class TestTpchGenerator(TpcGeneratorBase): workload = 'tpch' |
