diff options
author | Олег <150132506+iddqdex@users.noreply.github.com> | 2024-10-21 11:14:54 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-21 08:14:54 +0000 |
commit | 3fb6997b618982c100eb94a8e81441cd03688d0a (patch) | |
tree | fd99d18b2082fbe6b2d65502552a4597b9cdce36 | |
parent | b71da951207c6a8e9924af486d08f5f0f508b940 (diff) | |
download | ydb-3fb6997b618982c100eb94a8e81441cd03688d0a.tar.gz |
Add tests for tpc {h | ds} data generator (#10564)
15 files changed, 571 insertions, 48 deletions
diff --git a/ydb/library/workload/tpcds/data_generator.cpp b/ydb/library/workload/tpcds/data_generator.cpp index 9de91e9dcc..6b6aee1c73 100644 --- a/ydb/library/workload/tpcds/data_generator.cpp +++ b/ydb/library/workload/tpcds/data_generator.cpp @@ -54,19 +54,30 @@ TBulkDataGeneratorList TTpcdsWorkloadDataInitializerGenerator::DoGetBulkInitialD return TBulkDataGeneratorList(gens.begin(), gens.end()); } -ui64 TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState) { +TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TPositions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState) { + static const TSet<ui32> allowedModules{1, 2, 4}; + TPositions result; const auto* tdef = getTdefsByNumber(tableNum); if (!tdef) { - return 0; + return result; } - i64 position = 0; + split_work(tableNum, &result.FirstRow, &result.Count); if (useState && owner.StateProcessor && owner.StateProcessor->GetState().contains(tdef->name)) { - position = owner.StateProcessor->GetState().at(tdef->name).Position; + result.Position = owner.StateProcessor->GetState().at(tdef->name).Position; + result.Count -= std::min<i64>(result.Count, result.Position); + + //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c + while (result.Position && !allowedModules.contains((result.FirstRow + result.Position) % 6)) { + --result.Position; + ++result.Count; + } } - ds_key_t firstRow; - ds_key_t rowCount; - split_work(tableNum, &firstRow, &rowCount); - return rowCount > position ? (rowCount - position) : 0; + //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c + while (result.FirstRow > 1 && !allowedModules.contains((result.FirstRow + result.Position) % 6)) { + --result.FirstRow; + ++result.Count; + } + return result; } TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum) @@ -116,10 +127,10 @@ TString TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GetFullTable } TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum) - : IBulkDataGenerator(getTdefsByNumber(tableNum)->name, CalcCountToGenerate(owner, tableNum, true)) + : IBulkDataGenerator(getTdefsByNumber(tableNum)->name, CalcCountToGenerate(owner, tableNum, true).Count) , TableNum(tableNum) , Owner(owner) - , TableSize(CalcCountToGenerate(owner, tableNum, false)) + , TableSize(CalcCountToGenerate(owner, tableNum, false).Count) {} TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() { @@ -136,16 +147,10 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds } auto g = Guard(Lock); - ds_key_t firstRow; - ds_key_t rowCount; - split_work(TableNum, &firstRow, &rowCount); + auto positions = CalcCountToGenerate(Owner, TableNum, !Generated); if (!Generated) { - ui32 toSkip = firstRow - 1; - if (!!Owner.StateProcessor && Owner.StateProcessor->GetState().contains(GetName())) { - Generated = Owner.StateProcessor->GetState().at(TString(GetName())).Position; - toSkip += Generated; - } - if (toSkip) { + Generated = positions.Position; + if (const ui32 toSkip = positions.FirstRow + positions.Position - 1) { row_skip(TableNum, toSkip); if (tdef->flags & FL_PARENT) { row_skip(tdef->nParam, toSkip); @@ -160,7 +165,7 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds return result; } ctxs.front().SetCount(count); - ctxs.front().SetStart(firstRow + Generated); + ctxs.front().SetStart(positions.FirstRow + Generated); Generated += count; GenerateRows(ctxs, std::move(g)); for(auto& ctx: ctxs) { diff --git a/ydb/library/workload/tpcds/data_generator.h b/ydb/library/workload/tpcds/data_generator.h index 42095b25ec..4eed19725b 100644 --- a/ydb/library/workload/tpcds/data_generator.h +++ b/ydb/library/workload/tpcds/data_generator.h @@ -62,7 +62,12 @@ public: private: TString GetFullTableName(const char* table) const; - static ui64 CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState); + struct TPositions { + i64 FirstRow = 1; + ui64 Position = 0; + i64 Count = 0; + }; + static TPositions CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState); const TTpcdsWorkloadDataInitializerGenerator& Owner; ui64 TableSize; }; diff --git a/ydb/library/workload/tpch/driver.c b/ydb/library/workload/tpch/driver.c index aff43e2d83..64e84cc6ad 100644 --- a/ydb/library/workload/tpch/driver.c +++ b/ydb/library/workload/tpch/driver.c @@ -8,6 +8,7 @@ #define NO_FUNC (int (*) ()) NULL /* to clean up tdefs */ #define NO_LFUNC (long (*) ()) NULL /* to clean up tdefs */ +void advanceStream(int nStream, DSS_HUGE nCalls, int bUse64Bit); long sd_cust (int child, DSS_HUGE skip_count); long sd_line (int child, DSS_HUGE skip_count); long sd_order (int child, DSS_HUGE skip_count); @@ -16,6 +17,19 @@ long sd_psupp (int child, DSS_HUGE skip_count); long sd_supp (int child, DSS_HUGE skip_count); long sd_order_line (int child, DSS_HUGE skip_count); long sd_part_psupp (int child, DSS_HUGE skip_count); + +long sd_region (int child, DSS_HUGE skip_count) { + (void)child; + advanceStream(R_CMNT_SD, 2 * skip_count, 0); + return 0; +} + +long sd_nation (int child, DSS_HUGE skip_count) { + (void)child; + advanceStream(N_CMNT_SD, 2 * skip_count, 0); + return 0; +} + void ReadDistFromResource(const char* name, distribution* target); tdef tdefs[] = @@ -28,8 +42,8 @@ tdef tdefs[] = {"lineitem", "lineitem table", 150000 * ORDERS_PER_CUST, NO_FUNC, sd_line, NONE, 0}, //LINE 5 {"orders", "orders/lineitem tables", 150000 * ORDERS_PER_CUST, NO_FUNC, sd_order, LINE, 0}, //ORDER_LINE 6 {"part", "part/partsupplier tables", 200000, NO_FUNC, sd_part, PSUPP, 0}, //PART_PSUPP 7 - {"nation", "nation table", NATIONS_MAX, NO_FUNC, NO_LFUNC, NONE, 0}, //NATION 8 - {"region", "region table", NATIONS_MAX, NO_FUNC, NO_LFUNC, NONE, 0}, //REGION 9 + {"nation", "nation table", NATIONS_MAX, NO_FUNC, sd_nation, NONE, 0}, //NATION 8 + {"region", "region table", NATIONS_MAX, NO_FUNC, sd_region, NONE, 0}, //REGION 9 }; diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp index 225ad93ecc..f65f1a8d66 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp @@ -24,6 +24,8 @@ void TWorkloadCommandImport::Config(TConfig& config) { .DefaultValue(WorkloadParams.BulkSize).StoreResult(&WorkloadParams.BulkSize); config.Opts->AddLongOption("max-in-flight", "Maximum number if data portions that can be simultaneously in process.") .DefaultValue(UploadParams.MaxInFlight).StoreResult(&UploadParams.MaxInFlight); + config.Opts->AddLongOption('f', "file-output-path", "Path to a directory to save tables into as files instead of uploading it to db.") + .StoreResult(&UploadParams.FileOutputPath); } TWorkloadCommandImport::TUploadParams::TUploadParams() @@ -45,6 +47,11 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe auto dataGeneratorList = Initializer->GetBulkInitialData(); AtomicSet(ErrorsCount, 0); InFlightSemaphore = MakeHolder<TFastSemaphore>(UploadParams.MaxInFlight); + if (UploadParams.FileOutputPath.IsDefined()) { + Writer = MakeHolder<TFileWriter>(*this); + } else { + Writer = MakeHolder<TDbWriter>(*this); + } for (auto dataGen : dataGeneratorList) { TThreadPoolParams params; params.SetCatching(false); @@ -69,39 +76,88 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe return AtomicGet(ErrorsCount) ? EXIT_FAILURE : EXIT_SUCCESS; } -TAsyncStatus TWorkloadCommandImport::TUploadCommand::SendDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) const { - auto convertResult = [](const NTable::TAsyncBulkUpsertResult& result) { - return TStatus(result.GetValueSync()); - }; - if (auto* value = std::get_if<TValue>(&portion->MutableData())) { - return TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(convertResult); +class TWorkloadCommandImport::TUploadCommand::TDbWriter: public IWriter { +public: + using IWriter::IWriter; + + TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) override { + auto convertResult = [](const NTable::TAsyncBulkUpsertResult& result) { + return TStatus(result.GetValueSync()); + }; + if (auto* value = std::get_if<TValue>(&portion->MutableData())) { + return Owner.TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(convertResult); + } + NRetry::TRetryOperationSettings retrySettings; + retrySettings.RetryUndefined(true); + retrySettings.MaxRetries(30); + if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) { + return Owner.TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) { + NTable::TBulkUpsertSettings settings; + settings.FormatSettings(value->FormatString); + return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::CSV, value->Data, TString(), settings) + .Apply(convertResult); + }, retrySettings); + } + if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) { + return Owner.TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) { + return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::ApacheArrow, value->Data, value->Schema) + .Apply(convertResult); + }, retrySettings); + } + Y_FAIL_S("Invalid data portion"); + } +}; + +class TWorkloadCommandImport::TUploadCommand::TFileWriter: public IWriter { +public: + TFileWriter(const TWorkloadCommandImport::TUploadCommand& owner) + :IWriter(owner) + { + Owner.UploadParams.FileOutputPath.ForceDelete(); + Owner.UploadParams.FileOutputPath.MkDirs(); } - NRetry::TRetryOperationSettings retrySettings; - retrySettings.RetryUndefined(true); - retrySettings.MaxRetries(30); - if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) { - return TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) { - NTable::TBulkUpsertSettings settings; - settings.FormatSettings(value->FormatString); - return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::CSV, value->Data, TString(), settings) - .Apply(convertResult); - }, retrySettings); + + TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) override { + if (auto* value = std::get_if<TValue>(&portion->MutableData())) { + return NThreading::MakeErrorFuture<TStatus>(std::make_exception_ptr(yexception() << "Not implemented")); + } + if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) { + auto g = Guard(Lock); + auto [out, created] = GetOutput(portion->GetTable()); + TStringBuf toWrite(value->Data); + if (!created) { + TStringBuf firstLine; + toWrite.ReadLine(firstLine); + } + out->Write(toWrite); + return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues())); + } + if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) { + return NThreading::MakeErrorFuture<TStatus>(std::make_exception_ptr(yexception() << "Not implemented")); + } + Y_FAIL_S("Invalid data portion"); } - if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) { - return TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) { - return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::ApacheArrow, value->Data, value->Schema) - .Apply(convertResult); - }, retrySettings); + +private: + std::pair<TFileOutput*, bool> GetOutput(const TString& table) { + auto fname = TFsPath(table).Basename(); + if (auto* result = MapFindPtr(CsvOutputs, fname)) { + return std::make_pair(result->Get(), false); + } + auto result = MakeAtomicShared<TFileOutput>(Owner.UploadParams.FileOutputPath / fname); + CsvOutputs[fname] = result; + return std::make_pair(result.Get(), true); } - Y_FAIL_S("Invalid data portion"); -} + TMap<TString, TAtomicSharedPtr<TFileOutput>> CsvOutputs; + TAdaptiveLock Lock; +}; void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try { TAtomic counter = 0; for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) { for (const auto& data: portions) { AtomicIncrement(counter); - SendDataPortion(data).Apply( + Writer->WriteDataPortion(data).Apply( [data, this, &counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) { const auto& res = result.GetValueSync(); data->SetSendResult(res); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h index 8bf49facc6..6cd8ced007 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h @@ -13,6 +13,7 @@ private: TUploadParams(); ui32 Threads; ui32 MaxInFlight = 128; + TFsPath FileOutputPath; }; class TUploadCommand; @@ -26,9 +27,20 @@ public: virtual void Config(TConfig& config) override; private: + class IWriter { + public: + IWriter(const TWorkloadCommandImport::TUploadCommand& owner) + : Owner(owner) + {} + virtual ~IWriter() = default; + virtual TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) = 0; + protected: + const TWorkloadCommandImport::TUploadCommand& Owner; + }; + class TFileWriter; + class TDbWriter; NTable::TSession GetSession(); int DoRun(NYdbWorkload::IWorkloadQueryGenerator& workloadGen, TConfig& config) override; - TAsyncStatus SendDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) const; void ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept; const TUploadParams& UploadParams; @@ -37,6 +49,7 @@ private: TAdaptiveLock Lock; THolder<TFastSemaphore> InFlightSemaphore; TAtomic ErrorsCount; + THolder<IWriter> Writer; }; }
\ No newline at end of file diff --git a/ydb/tests/functional/tpc/canondata/result.json b/ydb/tests/functional/tpc/canondata/result.json new file mode 100644 index 0000000000..58d8118d24 --- /dev/null +++ b/ydb/tests/functional/tpc/canondata/result.json @@ -0,0 +1,20 @@ +{ + "test_generator.TestTpcdsGenerator.test_s1": { + "uri": "file://test_generator.TestTpcdsGenerator.test_s1/s1.hash" + }, + "test_generator.TestTpcdsGenerator.test_s1_parts": { + "uri": "file://test_generator.TestTpcdsGenerator.test_s1_parts/s1.hash" + }, + "test_generator.TestTpcdsGenerator.test_s1_state": { + "uri": "file://test_generator.TestTpcdsGenerator.test_s1_state/s1.hash" + }, + "test_generator.TestTpchGenerator.test_s1": { + "uri": "file://test_generator.TestTpchGenerator.test_s1/s1.hash" + }, + "test_generator.TestTpchGenerator.test_s1_parts": { + "uri": "file://test_generator.TestTpchGenerator.test_s1_parts/s1.hash" + }, + "test_generator.TestTpchGenerator.test_s1_state": { + "uri": "file://test_generator.TestTpchGenerator.test_s1_state/s1.hash" + } +} diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1/s1.hash new file mode 100644 index 0000000000..8c635baa01 --- /dev/null +++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1/s1.hash @@ -0,0 +1,48 @@ +call_center count: 6 +call_center md5: 86db117a0bb48668acbe63c473e85d96 +catalog_page count: 11718 +catalog_page md5: 0bf750caa038dee0f1f9618414f8add1 +catalog_returns count: 144067 +catalog_returns md5: 78c1bd181f07d7067be644a301a1e15a +catalog_sales count: 1441548 +catalog_sales md5: 47a7b34e4cd097c9b89457497550527c +customer count: 100000 +customer md5: 4f35263f5c2e15d6ab687f14d1acfee7 +customer_address count: 50000 +customer_address md5: edda298b082245c2d0ce0bcd97af1335 +customer_demographics count: 1920800 +customer_demographics md5: 4f6182b865d1c183d50860387332c0b5 +date_dim count: 73049 +date_dim md5: f4ef03663ab568ddeb16309f493896c0 +household_demographics count: 7200 +household_demographics md5: b1c3ff23e00da09d7fc94ce5cd8abdd7 +income_band count: 20 +income_band md5: 5dbfb6a7379a3ccb81004d9abae0df5e +inventory count: 11745000 +inventory md5: 24fe36237ddbad4d9be3136f9ec49299 +item count: 18000 +item md5: 364b883875279ed9ef3ab5dada368d7c +promotion count: 300 +promotion md5: 1660520863026204779c646c58cb8870 +reason count: 35 +reason md5: 89493ae8b5ab9f63f750c1bdadc57089 +ship_mode count: 20 +ship_mode md5: 25d7c1abd229862398b88818f81f72fc +store count: 12 +store md5: f342258aaec198b0ec4d6bb6e9f7991e +store_returns count: 287514 +store_returns md5: 038278c999f980849c84e99da9e213c2 +store_sales count: 2880404 +store_sales md5: 5c5d3f51c925f546248c7cf68d9055cd +time_dim count: 86400 +time_dim md5: 88f81e9a8618f855f4ff20d98e6ec122 +warehouse count: 5 +warehouse md5: 02268070dffd49682bc54c42580ac2ac +web_page count: 60 +web_page md5: 4a2551b4b2243b5030e5f23a605db603 +web_returns count: 71763 +web_returns md5: 0b4934c14bed8f3048deb6873cc37921 +web_sales count: 719384 +web_sales md5: b866c9b742560f2d78630853dd2b81c4 +web_site count: 30 +web_site md5: 707d556c664272f685ee8d7ddbc46f61
\ No newline at end of file diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_parts/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_parts/s1.hash new file mode 100644 index 0000000000..8c635baa01 --- /dev/null +++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_parts/s1.hash @@ -0,0 +1,48 @@ +call_center count: 6 +call_center md5: 86db117a0bb48668acbe63c473e85d96 +catalog_page count: 11718 +catalog_page md5: 0bf750caa038dee0f1f9618414f8add1 +catalog_returns count: 144067 +catalog_returns md5: 78c1bd181f07d7067be644a301a1e15a +catalog_sales count: 1441548 +catalog_sales md5: 47a7b34e4cd097c9b89457497550527c +customer count: 100000 +customer md5: 4f35263f5c2e15d6ab687f14d1acfee7 +customer_address count: 50000 +customer_address md5: edda298b082245c2d0ce0bcd97af1335 +customer_demographics count: 1920800 +customer_demographics md5: 4f6182b865d1c183d50860387332c0b5 +date_dim count: 73049 +date_dim md5: f4ef03663ab568ddeb16309f493896c0 +household_demographics count: 7200 +household_demographics md5: b1c3ff23e00da09d7fc94ce5cd8abdd7 +income_band count: 20 +income_band md5: 5dbfb6a7379a3ccb81004d9abae0df5e +inventory count: 11745000 +inventory md5: 24fe36237ddbad4d9be3136f9ec49299 +item count: 18000 +item md5: 364b883875279ed9ef3ab5dada368d7c +promotion count: 300 +promotion md5: 1660520863026204779c646c58cb8870 +reason count: 35 +reason md5: 89493ae8b5ab9f63f750c1bdadc57089 +ship_mode count: 20 +ship_mode md5: 25d7c1abd229862398b88818f81f72fc +store count: 12 +store md5: f342258aaec198b0ec4d6bb6e9f7991e +store_returns count: 287514 +store_returns md5: 038278c999f980849c84e99da9e213c2 +store_sales count: 2880404 +store_sales md5: 5c5d3f51c925f546248c7cf68d9055cd +time_dim count: 86400 +time_dim md5: 88f81e9a8618f855f4ff20d98e6ec122 +warehouse count: 5 +warehouse md5: 02268070dffd49682bc54c42580ac2ac +web_page count: 60 +web_page md5: 4a2551b4b2243b5030e5f23a605db603 +web_returns count: 71763 +web_returns md5: 0b4934c14bed8f3048deb6873cc37921 +web_sales count: 719384 +web_sales md5: b866c9b742560f2d78630853dd2b81c4 +web_site count: 30 +web_site md5: 707d556c664272f685ee8d7ddbc46f61
\ No newline at end of file diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state/s1.hash new file mode 100644 index 0000000000..8c635baa01 --- /dev/null +++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state/s1.hash @@ -0,0 +1,48 @@ +call_center count: 6 +call_center md5: 86db117a0bb48668acbe63c473e85d96 +catalog_page count: 11718 +catalog_page md5: 0bf750caa038dee0f1f9618414f8add1 +catalog_returns count: 144067 +catalog_returns md5: 78c1bd181f07d7067be644a301a1e15a +catalog_sales count: 1441548 +catalog_sales md5: 47a7b34e4cd097c9b89457497550527c +customer count: 100000 +customer md5: 4f35263f5c2e15d6ab687f14d1acfee7 +customer_address count: 50000 +customer_address md5: edda298b082245c2d0ce0bcd97af1335 +customer_demographics count: 1920800 +customer_demographics md5: 4f6182b865d1c183d50860387332c0b5 +date_dim count: 73049 +date_dim md5: f4ef03663ab568ddeb16309f493896c0 +household_demographics count: 7200 +household_demographics md5: b1c3ff23e00da09d7fc94ce5cd8abdd7 +income_band count: 20 +income_band md5: 5dbfb6a7379a3ccb81004d9abae0df5e +inventory count: 11745000 +inventory md5: 24fe36237ddbad4d9be3136f9ec49299 +item count: 18000 +item md5: 364b883875279ed9ef3ab5dada368d7c +promotion count: 300 +promotion md5: 1660520863026204779c646c58cb8870 +reason count: 35 +reason md5: 89493ae8b5ab9f63f750c1bdadc57089 +ship_mode count: 20 +ship_mode md5: 25d7c1abd229862398b88818f81f72fc +store count: 12 +store md5: f342258aaec198b0ec4d6bb6e9f7991e +store_returns count: 287514 +store_returns md5: 038278c999f980849c84e99da9e213c2 +store_sales count: 2880404 +store_sales md5: 5c5d3f51c925f546248c7cf68d9055cd +time_dim count: 86400 +time_dim md5: 88f81e9a8618f855f4ff20d98e6ec122 +warehouse count: 5 +warehouse md5: 02268070dffd49682bc54c42580ac2ac +web_page count: 60 +web_page md5: 4a2551b4b2243b5030e5f23a605db603 +web_returns count: 71763 +web_returns md5: 0b4934c14bed8f3048deb6873cc37921 +web_sales count: 719384 +web_sales md5: b866c9b742560f2d78630853dd2b81c4 +web_site count: 30 +web_site md5: 707d556c664272f685ee8d7ddbc46f61
\ No newline at end of file diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1/s1.hash new file mode 100644 index 0000000000..325e897ab8 --- /dev/null +++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1/s1.hash @@ -0,0 +1,16 @@ +customer count: 150000 +customer md5: 1808efe529a289183e7ccf8aa1a2d8e9 +lineitem count: 6001215 +lineitem md5: caa6f4712334af71fd8c1efcdd37d7a7 +nation count: 25 +nation md5: 0e91944824fb13e44cda58882f0fedbe +orders count: 1500000 +orders md5: 01c5ca96aa3149c64427291ebbd792d4 +part count: 200000 +part md5: d67727d976d8c05b5d145840efaad449 +partsupp count: 800000 +partsupp md5: d62e99cf993c6de288a905ae2f95eced +region count: 5 +region md5: d1c494f597244c77001246888185e3e3 +supplier count: 10000 +supplier md5: 815d49d8e71c7993531b32113b2da5b5
\ No newline at end of file diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_parts/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_parts/s1.hash new file mode 100644 index 0000000000..325e897ab8 --- /dev/null +++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_parts/s1.hash @@ -0,0 +1,16 @@ +customer count: 150000 +customer md5: 1808efe529a289183e7ccf8aa1a2d8e9 +lineitem count: 6001215 +lineitem md5: caa6f4712334af71fd8c1efcdd37d7a7 +nation count: 25 +nation md5: 0e91944824fb13e44cda58882f0fedbe +orders count: 1500000 +orders md5: 01c5ca96aa3149c64427291ebbd792d4 +part count: 200000 +part md5: d67727d976d8c05b5d145840efaad449 +partsupp count: 800000 +partsupp md5: d62e99cf993c6de288a905ae2f95eced +region count: 5 +region md5: d1c494f597244c77001246888185e3e3 +supplier count: 10000 +supplier md5: 815d49d8e71c7993531b32113b2da5b5
\ No newline at end of file diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state/s1.hash new file mode 100644 index 0000000000..325e897ab8 --- /dev/null +++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state/s1.hash @@ -0,0 +1,16 @@ +customer count: 150000 +customer md5: 1808efe529a289183e7ccf8aa1a2d8e9 +lineitem count: 6001215 +lineitem md5: caa6f4712334af71fd8c1efcdd37d7a7 +nation count: 25 +nation md5: 0e91944824fb13e44cda58882f0fedbe +orders count: 1500000 +orders md5: 01c5ca96aa3149c64427291ebbd792d4 +part count: 200000 +part md5: d67727d976d8c05b5d145840efaad449 +partsupp count: 800000 +partsupp md5: d62e99cf993c6de288a905ae2f95eced +region count: 5 +region md5: d1c494f597244c77001246888185e3e3 +supplier count: 10000 +supplier md5: 815d49d8e71c7993531b32113b2da5b5
\ No newline at end of file diff --git a/ydb/tests/functional/tpc/test_generator.py b/ydb/tests/functional/tpc/test_generator.py new file mode 100644 index 0000000000..232be40f34 --- /dev/null +++ b/ydb/tests/functional/tpc/test_generator.py @@ -0,0 +1,190 @@ +# -*- coding: utf-8 -*- + +from __future__ import annotations + +import os +import logging +import hashlib +import pytest +import json +import random + +from ydb.tests.library.common import yatest_common +from ydb.tests.oss.canonical import set_canondata_root +from threading import Thread + +logger = logging.getLogger(__name__) + + +def ydb_bin(): + if os.getenv('YDB_CLI_BINARY'): + return yatest_common.binary_path(os.getenv('YDB_CLI_BINARY')) + raise RuntimeError('YDB_CLI_BINARY enviroment variable is not specified') + + +class TpcGeneratorBase(object): + tables: dict[str, dict[int, int]] + workload: str + + @classmethod + def execute_generator(cls, output_path, scale=1, import_args=[], generator_args=[]): + return yatest_common.execute( + [ + ydb_bin(), + '--endpoint', 'grpc://localhost', + '--database', '/Root/db', + 'workload', cls.workload, '-p', f'/Root/db/{cls.workload}/s{scale}', + 'import', '-f', output_path, + ] + + [str(arg) for arg in import_args] + + ['generator', '--scale', str(scale)] + + [str(arg) for arg in generator_args], + wait=False, + ) + + @staticmethod + def canonical_result(output_result, tmp_path): + with open(tmp_path, 'w') as f: + f.write(output_result) + return yatest_common.canonical_file(tmp_path, local=True, universal_lines=True) + + @staticmethod + def calc_hashes(files: str | list[str]): + if not isinstance(files, list): + files = [files] + rows: set[str] = set() + for file_path in files: + if not os.path.exists(file_path): + continue + first_line = True + with open(file_path, 'r') as f: + for line in f: + if first_line: + first_line = False + else: + rows.add(line) + m = hashlib.md5() + for row in sorted(rows): + m.update(row.encode()) + return len(rows), m.hexdigest() + + @classmethod + def scale_hash(cls, paths: str | list[str]): + if not isinstance(paths, list): + paths = [paths] + tables = list(sorted(cls.tables.items())) + result = [''] * 2 * len(tables) + threads = [] + + def _calc_hash(result: list[str], index: int): + fname, _ = tables[index] + count, md5 = cls.calc_hashes([os.path.join(path, fname) for path in paths]) + result[index * 2] = f'{fname} count: {count}' + result[index * 2 + 1] = f'{fname} md5: {md5}' + + for index in range(len(tables)): + threads.append(Thread(target=_calc_hash, args=(result, index))) + threads[-1].start() + for t in threads: + t.join() + return '\n'.join(result) + + @classmethod + def setup_class(cls): + set_canondata_root(f'ydb/tests/functional/{cls.workload}/canondata') + + @pytest.fixture(autouse=True, scope='function') + def init_test(self, tmp_path): + self._tmp_path = tmp_path + + def tmp_path(self, *paths): + return os.path.join(self._tmp_path, *paths) + + def get_cannonical(self, paths, execs): + for exe in execs: + exe.wait(check_exit_code=True) + return self.canonical_result(self.scale_hash(paths), self.tmp_path('s1.hash')) + + def test_s1(self): + out_fpath = self.tmp_path('s1') + return self.get_cannonical( + paths=[out_fpath], + execs=[self.execute_generator(out_fpath)] + ) + + def test_s1_parts(self): + parts_count = 10 + paths = [] + execs = [] + for part_index in range(parts_count): + paths.append(self.tmp_path(f's1.{part_index}_{parts_count}')) + execs.append( + self.execute_generator( + output_path=paths[-1], + generator_args=['--proccess-count', parts_count, '--proccess-index', part_index] + ) + ) + return self.get_cannonical(paths=paths, execs=execs) + + def test_s1_state(self): + state_path = self.tmp_path('state.json') + with open(state_path, 'w') as f: + json.dump({ + 'sources': { + k: {'position': int(v[1] * random.uniform(0.25, 0.75))} + for k, v in self.tables.items() + } + }, f) + paths = [self.tmp_path(path) for path in ['s1.1', 's1.2']] + execs = [ + self.execute_generator(output_path=paths[0]), + self.execute_generator( + output_path=paths[1], + generator_args=['--state', state_path], + ), + ] + return self.get_cannonical(paths=paths, execs=execs) + + +class TestTpchGenerator(TpcGeneratorBase): + workload = 'tpch' + tables = { + 'customer': {1: 150000}, + 'lineitem': {1: 6001215}, + 'nation': {1: 25}, + 'orders': {1: 1500000}, + 'part': {1: 200000}, + 'partsupp': {1: 800000}, + 'region': {1: 5}, + 'supplier': {1: 10000} + } + + +class TestTpcdsGenerator(TpcGeneratorBase): + workload = 'tpcds' + tables = { + 'call_center': {1: 6}, + 'catalog_page': {1: 11718}, + 'catalog_returns': {1: 144067}, + 'catalog_sales': {1: 1441548}, + 'customer': {1: 100000}, + 'customer_address': {1: 50000}, + 'customer_demographics': {1: 1920800}, + 'date_dim': {1: 73049}, + 'household_demographics': {1: 7200}, + 'income_band': {1: 20}, + 'inventory': {1: 11745000}, + 'item': {1: 18000}, + 'promotion': {1: 300}, + 'reason': {1: 35}, + 'ship_mode': {1: 20}, + 'store': {1: 12}, + 'store_returns': {1: 287514}, + 'store_sales': {1: 2880404}, + 'time_dim': {1: 86400}, + 'warehouse': {1: 5}, + 'web_page': {1: 60}, + 'web_returns': {1: 71763}, + 'web_sales': {1: 719384}, + 'web_site': {1: 30} + } diff --git a/ydb/tests/functional/tpc/ya.make b/ydb/tests/functional/tpc/ya.make new file mode 100644 index 0000000000..8ba56dcd01 --- /dev/null +++ b/ydb/tests/functional/tpc/ya.make @@ -0,0 +1,27 @@ +IF (NOT SANITIZER_TYPE) + +PY3TEST() + +TEST_SRCS(test_generator.py) + +TIMEOUT(600) +SIZE(MEDIUM) + +ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") + +DEPENDS( + ydb/apps/ydb +) + +PEERDIR( + ydb/tests/oss/ydb_sdk_import + ydb/public/sdk/python + contrib/python/PyHamcrest + ydb/tests/library +) + +FORK_SUBTESTS() +FORK_TEST_FILES() +END() + +ENDIF() diff --git a/ydb/tests/functional/ya.make b/ydb/tests/functional/ya.make index 89170c3bcb..1d20092f92 100644 --- a/ydb/tests/functional/ya.make +++ b/ydb/tests/functional/ya.make @@ -29,6 +29,7 @@ RECURSE( serverless sqs suite_tests + tpc tenants ttl wardens |