aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorОлег <150132506+iddqdex@users.noreply.github.com>2024-10-21 11:14:54 +0300
committerGitHub <noreply@github.com>2024-10-21 08:14:54 +0000
commit3fb6997b618982c100eb94a8e81441cd03688d0a (patch)
treefd99d18b2082fbe6b2d65502552a4597b9cdce36
parentb71da951207c6a8e9924af486d08f5f0f508b940 (diff)
downloadydb-3fb6997b618982c100eb94a8e81441cd03688d0a.tar.gz
Add tests for tpc {h | ds} data generator (#10564)
-rw-r--r--ydb/library/workload/tpcds/data_generator.cpp45
-rw-r--r--ydb/library/workload/tpcds/data_generator.h7
-rw-r--r--ydb/library/workload/tpch/driver.c18
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp104
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload_import.h15
-rw-r--r--ydb/tests/functional/tpc/canondata/result.json20
-rw-r--r--ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1/s1.hash48
-rw-r--r--ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_parts/s1.hash48
-rw-r--r--ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state/s1.hash48
-rw-r--r--ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1/s1.hash16
-rw-r--r--ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_parts/s1.hash16
-rw-r--r--ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state/s1.hash16
-rw-r--r--ydb/tests/functional/tpc/test_generator.py190
-rw-r--r--ydb/tests/functional/tpc/ya.make27
-rw-r--r--ydb/tests/functional/ya.make1
15 files changed, 571 insertions, 48 deletions
diff --git a/ydb/library/workload/tpcds/data_generator.cpp b/ydb/library/workload/tpcds/data_generator.cpp
index 9de91e9dcc..6b6aee1c73 100644
--- a/ydb/library/workload/tpcds/data_generator.cpp
+++ b/ydb/library/workload/tpcds/data_generator.cpp
@@ -54,19 +54,30 @@ TBulkDataGeneratorList TTpcdsWorkloadDataInitializerGenerator::DoGetBulkInitialD
return TBulkDataGeneratorList(gens.begin(), gens.end());
}
-ui64 TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState) {
+TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TPositions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState) {
+ static const TSet<ui32> allowedModules{1, 2, 4};
+ TPositions result;
const auto* tdef = getTdefsByNumber(tableNum);
if (!tdef) {
- return 0;
+ return result;
}
- i64 position = 0;
+ split_work(tableNum, &result.FirstRow, &result.Count);
if (useState && owner.StateProcessor && owner.StateProcessor->GetState().contains(tdef->name)) {
- position = owner.StateProcessor->GetState().at(tdef->name).Position;
+ result.Position = owner.StateProcessor->GetState().at(tdef->name).Position;
+ result.Count -= std::min<i64>(result.Count, result.Position);
+
+ //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
+ while (result.Position && !allowedModules.contains((result.FirstRow + result.Position) % 6)) {
+ --result.Position;
+ ++result.Count;
+ }
}
- ds_key_t firstRow;
- ds_key_t rowCount;
- split_work(tableNum, &firstRow, &rowCount);
- return rowCount > position ? (rowCount - position) : 0;
+ //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
+ while (result.FirstRow > 1 && !allowedModules.contains((result.FirstRow + result.Position) % 6)) {
+ --result.FirstRow;
+ ++result.Count;
+ }
+ return result;
}
TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum)
@@ -116,10 +127,10 @@ TString TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GetFullTable
}
TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum)
- : IBulkDataGenerator(getTdefsByNumber(tableNum)->name, CalcCountToGenerate(owner, tableNum, true))
+ : IBulkDataGenerator(getTdefsByNumber(tableNum)->name, CalcCountToGenerate(owner, tableNum, true).Count)
, TableNum(tableNum)
, Owner(owner)
- , TableSize(CalcCountToGenerate(owner, tableNum, false))
+ , TableSize(CalcCountToGenerate(owner, tableNum, false).Count)
{}
TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() {
@@ -136,16 +147,10 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds
}
auto g = Guard(Lock);
- ds_key_t firstRow;
- ds_key_t rowCount;
- split_work(TableNum, &firstRow, &rowCount);
+ auto positions = CalcCountToGenerate(Owner, TableNum, !Generated);
if (!Generated) {
- ui32 toSkip = firstRow - 1;
- if (!!Owner.StateProcessor && Owner.StateProcessor->GetState().contains(GetName())) {
- Generated = Owner.StateProcessor->GetState().at(TString(GetName())).Position;
- toSkip += Generated;
- }
- if (toSkip) {
+ Generated = positions.Position;
+ if (const ui32 toSkip = positions.FirstRow + positions.Position - 1) {
row_skip(TableNum, toSkip);
if (tdef->flags & FL_PARENT) {
row_skip(tdef->nParam, toSkip);
@@ -160,7 +165,7 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds
return result;
}
ctxs.front().SetCount(count);
- ctxs.front().SetStart(firstRow + Generated);
+ ctxs.front().SetStart(positions.FirstRow + Generated);
Generated += count;
GenerateRows(ctxs, std::move(g));
for(auto& ctx: ctxs) {
diff --git a/ydb/library/workload/tpcds/data_generator.h b/ydb/library/workload/tpcds/data_generator.h
index 42095b25ec..4eed19725b 100644
--- a/ydb/library/workload/tpcds/data_generator.h
+++ b/ydb/library/workload/tpcds/data_generator.h
@@ -62,7 +62,12 @@ public:
private:
TString GetFullTableName(const char* table) const;
- static ui64 CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState);
+ struct TPositions {
+ i64 FirstRow = 1;
+ ui64 Position = 0;
+ i64 Count = 0;
+ };
+ static TPositions CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState);
const TTpcdsWorkloadDataInitializerGenerator& Owner;
ui64 TableSize;
};
diff --git a/ydb/library/workload/tpch/driver.c b/ydb/library/workload/tpch/driver.c
index aff43e2d83..64e84cc6ad 100644
--- a/ydb/library/workload/tpch/driver.c
+++ b/ydb/library/workload/tpch/driver.c
@@ -8,6 +8,7 @@
#define NO_FUNC (int (*) ()) NULL /* to clean up tdefs */
#define NO_LFUNC (long (*) ()) NULL /* to clean up tdefs */
+void advanceStream(int nStream, DSS_HUGE nCalls, int bUse64Bit);
long sd_cust (int child, DSS_HUGE skip_count);
long sd_line (int child, DSS_HUGE skip_count);
long sd_order (int child, DSS_HUGE skip_count);
@@ -16,6 +17,19 @@ long sd_psupp (int child, DSS_HUGE skip_count);
long sd_supp (int child, DSS_HUGE skip_count);
long sd_order_line (int child, DSS_HUGE skip_count);
long sd_part_psupp (int child, DSS_HUGE skip_count);
+
+long sd_region (int child, DSS_HUGE skip_count) {
+ (void)child;
+ advanceStream(R_CMNT_SD, 2 * skip_count, 0);
+ return 0;
+}
+
+long sd_nation (int child, DSS_HUGE skip_count) {
+ (void)child;
+ advanceStream(N_CMNT_SD, 2 * skip_count, 0);
+ return 0;
+}
+
void ReadDistFromResource(const char* name, distribution* target);
tdef tdefs[] =
@@ -28,8 +42,8 @@ tdef tdefs[] =
{"lineitem", "lineitem table", 150000 * ORDERS_PER_CUST, NO_FUNC, sd_line, NONE, 0}, //LINE 5
{"orders", "orders/lineitem tables", 150000 * ORDERS_PER_CUST, NO_FUNC, sd_order, LINE, 0}, //ORDER_LINE 6
{"part", "part/partsupplier tables", 200000, NO_FUNC, sd_part, PSUPP, 0}, //PART_PSUPP 7
- {"nation", "nation table", NATIONS_MAX, NO_FUNC, NO_LFUNC, NONE, 0}, //NATION 8
- {"region", "region table", NATIONS_MAX, NO_FUNC, NO_LFUNC, NONE, 0}, //REGION 9
+ {"nation", "nation table", NATIONS_MAX, NO_FUNC, sd_nation, NONE, 0}, //NATION 8
+ {"region", "region table", NATIONS_MAX, NO_FUNC, sd_region, NONE, 0}, //REGION 9
};
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
index 225ad93ecc..f65f1a8d66 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
@@ -24,6 +24,8 @@ void TWorkloadCommandImport::Config(TConfig& config) {
.DefaultValue(WorkloadParams.BulkSize).StoreResult(&WorkloadParams.BulkSize);
config.Opts->AddLongOption("max-in-flight", "Maximum number if data portions that can be simultaneously in process.")
.DefaultValue(UploadParams.MaxInFlight).StoreResult(&UploadParams.MaxInFlight);
+ config.Opts->AddLongOption('f', "file-output-path", "Path to a directory to save tables into as files instead of uploading it to db.")
+ .StoreResult(&UploadParams.FileOutputPath);
}
TWorkloadCommandImport::TUploadParams::TUploadParams()
@@ -45,6 +47,11 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe
auto dataGeneratorList = Initializer->GetBulkInitialData();
AtomicSet(ErrorsCount, 0);
InFlightSemaphore = MakeHolder<TFastSemaphore>(UploadParams.MaxInFlight);
+ if (UploadParams.FileOutputPath.IsDefined()) {
+ Writer = MakeHolder<TFileWriter>(*this);
+ } else {
+ Writer = MakeHolder<TDbWriter>(*this);
+ }
for (auto dataGen : dataGeneratorList) {
TThreadPoolParams params;
params.SetCatching(false);
@@ -69,39 +76,88 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe
return AtomicGet(ErrorsCount) ? EXIT_FAILURE : EXIT_SUCCESS;
}
-TAsyncStatus TWorkloadCommandImport::TUploadCommand::SendDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) const {
- auto convertResult = [](const NTable::TAsyncBulkUpsertResult& result) {
- return TStatus(result.GetValueSync());
- };
- if (auto* value = std::get_if<TValue>(&portion->MutableData())) {
- return TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(convertResult);
+class TWorkloadCommandImport::TUploadCommand::TDbWriter: public IWriter {
+public:
+ using IWriter::IWriter;
+
+ TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) override {
+ auto convertResult = [](const NTable::TAsyncBulkUpsertResult& result) {
+ return TStatus(result.GetValueSync());
+ };
+ if (auto* value = std::get_if<TValue>(&portion->MutableData())) {
+ return Owner.TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(convertResult);
+ }
+ NRetry::TRetryOperationSettings retrySettings;
+ retrySettings.RetryUndefined(true);
+ retrySettings.MaxRetries(30);
+ if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) {
+ return Owner.TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
+ NTable::TBulkUpsertSettings settings;
+ settings.FormatSettings(value->FormatString);
+ return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::CSV, value->Data, TString(), settings)
+ .Apply(convertResult);
+ }, retrySettings);
+ }
+ if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) {
+ return Owner.TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
+ return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::ApacheArrow, value->Data, value->Schema)
+ .Apply(convertResult);
+ }, retrySettings);
+ }
+ Y_FAIL_S("Invalid data portion");
+ }
+};
+
+class TWorkloadCommandImport::TUploadCommand::TFileWriter: public IWriter {
+public:
+ TFileWriter(const TWorkloadCommandImport::TUploadCommand& owner)
+ :IWriter(owner)
+ {
+ Owner.UploadParams.FileOutputPath.ForceDelete();
+ Owner.UploadParams.FileOutputPath.MkDirs();
}
- NRetry::TRetryOperationSettings retrySettings;
- retrySettings.RetryUndefined(true);
- retrySettings.MaxRetries(30);
- if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) {
- return TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
- NTable::TBulkUpsertSettings settings;
- settings.FormatSettings(value->FormatString);
- return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::CSV, value->Data, TString(), settings)
- .Apply(convertResult);
- }, retrySettings);
+
+ TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) override {
+ if (auto* value = std::get_if<TValue>(&portion->MutableData())) {
+ return NThreading::MakeErrorFuture<TStatus>(std::make_exception_ptr(yexception() << "Not implemented"));
+ }
+ if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) {
+ auto g = Guard(Lock);
+ auto [out, created] = GetOutput(portion->GetTable());
+ TStringBuf toWrite(value->Data);
+ if (!created) {
+ TStringBuf firstLine;
+ toWrite.ReadLine(firstLine);
+ }
+ out->Write(toWrite);
+ return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues()));
+ }
+ if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) {
+ return NThreading::MakeErrorFuture<TStatus>(std::make_exception_ptr(yexception() << "Not implemented"));
+ }
+ Y_FAIL_S("Invalid data portion");
}
- if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) {
- return TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
- return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::ApacheArrow, value->Data, value->Schema)
- .Apply(convertResult);
- }, retrySettings);
+
+private:
+ std::pair<TFileOutput*, bool> GetOutput(const TString& table) {
+ auto fname = TFsPath(table).Basename();
+ if (auto* result = MapFindPtr(CsvOutputs, fname)) {
+ return std::make_pair(result->Get(), false);
+ }
+ auto result = MakeAtomicShared<TFileOutput>(Owner.UploadParams.FileOutputPath / fname);
+ CsvOutputs[fname] = result;
+ return std::make_pair(result.Get(), true);
}
- Y_FAIL_S("Invalid data portion");
-}
+ TMap<TString, TAtomicSharedPtr<TFileOutput>> CsvOutputs;
+ TAdaptiveLock Lock;
+};
void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try {
TAtomic counter = 0;
for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) {
for (const auto& data: portions) {
AtomicIncrement(counter);
- SendDataPortion(data).Apply(
+ Writer->WriteDataPortion(data).Apply(
[data, this, &counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
const auto& res = result.GetValueSync();
data->SetSendResult(res);
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h
index 8bf49facc6..6cd8ced007 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h
@@ -13,6 +13,7 @@ private:
TUploadParams();
ui32 Threads;
ui32 MaxInFlight = 128;
+ TFsPath FileOutputPath;
};
class TUploadCommand;
@@ -26,9 +27,20 @@ public:
virtual void Config(TConfig& config) override;
private:
+ class IWriter {
+ public:
+ IWriter(const TWorkloadCommandImport::TUploadCommand& owner)
+ : Owner(owner)
+ {}
+ virtual ~IWriter() = default;
+ virtual TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) = 0;
+ protected:
+ const TWorkloadCommandImport::TUploadCommand& Owner;
+ };
+ class TFileWriter;
+ class TDbWriter;
NTable::TSession GetSession();
int DoRun(NYdbWorkload::IWorkloadQueryGenerator& workloadGen, TConfig& config) override;
- TAsyncStatus SendDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) const;
void ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept;
const TUploadParams& UploadParams;
@@ -37,6 +49,7 @@ private:
TAdaptiveLock Lock;
THolder<TFastSemaphore> InFlightSemaphore;
TAtomic ErrorsCount;
+ THolder<IWriter> Writer;
};
} \ No newline at end of file
diff --git a/ydb/tests/functional/tpc/canondata/result.json b/ydb/tests/functional/tpc/canondata/result.json
new file mode 100644
index 0000000000..58d8118d24
--- /dev/null
+++ b/ydb/tests/functional/tpc/canondata/result.json
@@ -0,0 +1,20 @@
+{
+ "test_generator.TestTpcdsGenerator.test_s1": {
+ "uri": "file://test_generator.TestTpcdsGenerator.test_s1/s1.hash"
+ },
+ "test_generator.TestTpcdsGenerator.test_s1_parts": {
+ "uri": "file://test_generator.TestTpcdsGenerator.test_s1_parts/s1.hash"
+ },
+ "test_generator.TestTpcdsGenerator.test_s1_state": {
+ "uri": "file://test_generator.TestTpcdsGenerator.test_s1_state/s1.hash"
+ },
+ "test_generator.TestTpchGenerator.test_s1": {
+ "uri": "file://test_generator.TestTpchGenerator.test_s1/s1.hash"
+ },
+ "test_generator.TestTpchGenerator.test_s1_parts": {
+ "uri": "file://test_generator.TestTpchGenerator.test_s1_parts/s1.hash"
+ },
+ "test_generator.TestTpchGenerator.test_s1_state": {
+ "uri": "file://test_generator.TestTpchGenerator.test_s1_state/s1.hash"
+ }
+}
diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1/s1.hash
new file mode 100644
index 0000000000..8c635baa01
--- /dev/null
+++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1/s1.hash
@@ -0,0 +1,48 @@
+call_center count: 6
+call_center md5: 86db117a0bb48668acbe63c473e85d96
+catalog_page count: 11718
+catalog_page md5: 0bf750caa038dee0f1f9618414f8add1
+catalog_returns count: 144067
+catalog_returns md5: 78c1bd181f07d7067be644a301a1e15a
+catalog_sales count: 1441548
+catalog_sales md5: 47a7b34e4cd097c9b89457497550527c
+customer count: 100000
+customer md5: 4f35263f5c2e15d6ab687f14d1acfee7
+customer_address count: 50000
+customer_address md5: edda298b082245c2d0ce0bcd97af1335
+customer_demographics count: 1920800
+customer_demographics md5: 4f6182b865d1c183d50860387332c0b5
+date_dim count: 73049
+date_dim md5: f4ef03663ab568ddeb16309f493896c0
+household_demographics count: 7200
+household_demographics md5: b1c3ff23e00da09d7fc94ce5cd8abdd7
+income_band count: 20
+income_band md5: 5dbfb6a7379a3ccb81004d9abae0df5e
+inventory count: 11745000
+inventory md5: 24fe36237ddbad4d9be3136f9ec49299
+item count: 18000
+item md5: 364b883875279ed9ef3ab5dada368d7c
+promotion count: 300
+promotion md5: 1660520863026204779c646c58cb8870
+reason count: 35
+reason md5: 89493ae8b5ab9f63f750c1bdadc57089
+ship_mode count: 20
+ship_mode md5: 25d7c1abd229862398b88818f81f72fc
+store count: 12
+store md5: f342258aaec198b0ec4d6bb6e9f7991e
+store_returns count: 287514
+store_returns md5: 038278c999f980849c84e99da9e213c2
+store_sales count: 2880404
+store_sales md5: 5c5d3f51c925f546248c7cf68d9055cd
+time_dim count: 86400
+time_dim md5: 88f81e9a8618f855f4ff20d98e6ec122
+warehouse count: 5
+warehouse md5: 02268070dffd49682bc54c42580ac2ac
+web_page count: 60
+web_page md5: 4a2551b4b2243b5030e5f23a605db603
+web_returns count: 71763
+web_returns md5: 0b4934c14bed8f3048deb6873cc37921
+web_sales count: 719384
+web_sales md5: b866c9b742560f2d78630853dd2b81c4
+web_site count: 30
+web_site md5: 707d556c664272f685ee8d7ddbc46f61 \ No newline at end of file
diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_parts/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_parts/s1.hash
new file mode 100644
index 0000000000..8c635baa01
--- /dev/null
+++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_parts/s1.hash
@@ -0,0 +1,48 @@
+call_center count: 6
+call_center md5: 86db117a0bb48668acbe63c473e85d96
+catalog_page count: 11718
+catalog_page md5: 0bf750caa038dee0f1f9618414f8add1
+catalog_returns count: 144067
+catalog_returns md5: 78c1bd181f07d7067be644a301a1e15a
+catalog_sales count: 1441548
+catalog_sales md5: 47a7b34e4cd097c9b89457497550527c
+customer count: 100000
+customer md5: 4f35263f5c2e15d6ab687f14d1acfee7
+customer_address count: 50000
+customer_address md5: edda298b082245c2d0ce0bcd97af1335
+customer_demographics count: 1920800
+customer_demographics md5: 4f6182b865d1c183d50860387332c0b5
+date_dim count: 73049
+date_dim md5: f4ef03663ab568ddeb16309f493896c0
+household_demographics count: 7200
+household_demographics md5: b1c3ff23e00da09d7fc94ce5cd8abdd7
+income_band count: 20
+income_band md5: 5dbfb6a7379a3ccb81004d9abae0df5e
+inventory count: 11745000
+inventory md5: 24fe36237ddbad4d9be3136f9ec49299
+item count: 18000
+item md5: 364b883875279ed9ef3ab5dada368d7c
+promotion count: 300
+promotion md5: 1660520863026204779c646c58cb8870
+reason count: 35
+reason md5: 89493ae8b5ab9f63f750c1bdadc57089
+ship_mode count: 20
+ship_mode md5: 25d7c1abd229862398b88818f81f72fc
+store count: 12
+store md5: f342258aaec198b0ec4d6bb6e9f7991e
+store_returns count: 287514
+store_returns md5: 038278c999f980849c84e99da9e213c2
+store_sales count: 2880404
+store_sales md5: 5c5d3f51c925f546248c7cf68d9055cd
+time_dim count: 86400
+time_dim md5: 88f81e9a8618f855f4ff20d98e6ec122
+warehouse count: 5
+warehouse md5: 02268070dffd49682bc54c42580ac2ac
+web_page count: 60
+web_page md5: 4a2551b4b2243b5030e5f23a605db603
+web_returns count: 71763
+web_returns md5: 0b4934c14bed8f3048deb6873cc37921
+web_sales count: 719384
+web_sales md5: b866c9b742560f2d78630853dd2b81c4
+web_site count: 30
+web_site md5: 707d556c664272f685ee8d7ddbc46f61 \ No newline at end of file
diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state/s1.hash
new file mode 100644
index 0000000000..8c635baa01
--- /dev/null
+++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpcdsGenerator.test_s1_state/s1.hash
@@ -0,0 +1,48 @@
+call_center count: 6
+call_center md5: 86db117a0bb48668acbe63c473e85d96
+catalog_page count: 11718
+catalog_page md5: 0bf750caa038dee0f1f9618414f8add1
+catalog_returns count: 144067
+catalog_returns md5: 78c1bd181f07d7067be644a301a1e15a
+catalog_sales count: 1441548
+catalog_sales md5: 47a7b34e4cd097c9b89457497550527c
+customer count: 100000
+customer md5: 4f35263f5c2e15d6ab687f14d1acfee7
+customer_address count: 50000
+customer_address md5: edda298b082245c2d0ce0bcd97af1335
+customer_demographics count: 1920800
+customer_demographics md5: 4f6182b865d1c183d50860387332c0b5
+date_dim count: 73049
+date_dim md5: f4ef03663ab568ddeb16309f493896c0
+household_demographics count: 7200
+household_demographics md5: b1c3ff23e00da09d7fc94ce5cd8abdd7
+income_band count: 20
+income_band md5: 5dbfb6a7379a3ccb81004d9abae0df5e
+inventory count: 11745000
+inventory md5: 24fe36237ddbad4d9be3136f9ec49299
+item count: 18000
+item md5: 364b883875279ed9ef3ab5dada368d7c
+promotion count: 300
+promotion md5: 1660520863026204779c646c58cb8870
+reason count: 35
+reason md5: 89493ae8b5ab9f63f750c1bdadc57089
+ship_mode count: 20
+ship_mode md5: 25d7c1abd229862398b88818f81f72fc
+store count: 12
+store md5: f342258aaec198b0ec4d6bb6e9f7991e
+store_returns count: 287514
+store_returns md5: 038278c999f980849c84e99da9e213c2
+store_sales count: 2880404
+store_sales md5: 5c5d3f51c925f546248c7cf68d9055cd
+time_dim count: 86400
+time_dim md5: 88f81e9a8618f855f4ff20d98e6ec122
+warehouse count: 5
+warehouse md5: 02268070dffd49682bc54c42580ac2ac
+web_page count: 60
+web_page md5: 4a2551b4b2243b5030e5f23a605db603
+web_returns count: 71763
+web_returns md5: 0b4934c14bed8f3048deb6873cc37921
+web_sales count: 719384
+web_sales md5: b866c9b742560f2d78630853dd2b81c4
+web_site count: 30
+web_site md5: 707d556c664272f685ee8d7ddbc46f61 \ No newline at end of file
diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1/s1.hash
new file mode 100644
index 0000000000..325e897ab8
--- /dev/null
+++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1/s1.hash
@@ -0,0 +1,16 @@
+customer count: 150000
+customer md5: 1808efe529a289183e7ccf8aa1a2d8e9
+lineitem count: 6001215
+lineitem md5: caa6f4712334af71fd8c1efcdd37d7a7
+nation count: 25
+nation md5: 0e91944824fb13e44cda58882f0fedbe
+orders count: 1500000
+orders md5: 01c5ca96aa3149c64427291ebbd792d4
+part count: 200000
+part md5: d67727d976d8c05b5d145840efaad449
+partsupp count: 800000
+partsupp md5: d62e99cf993c6de288a905ae2f95eced
+region count: 5
+region md5: d1c494f597244c77001246888185e3e3
+supplier count: 10000
+supplier md5: 815d49d8e71c7993531b32113b2da5b5 \ No newline at end of file
diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_parts/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_parts/s1.hash
new file mode 100644
index 0000000000..325e897ab8
--- /dev/null
+++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_parts/s1.hash
@@ -0,0 +1,16 @@
+customer count: 150000
+customer md5: 1808efe529a289183e7ccf8aa1a2d8e9
+lineitem count: 6001215
+lineitem md5: caa6f4712334af71fd8c1efcdd37d7a7
+nation count: 25
+nation md5: 0e91944824fb13e44cda58882f0fedbe
+orders count: 1500000
+orders md5: 01c5ca96aa3149c64427291ebbd792d4
+part count: 200000
+part md5: d67727d976d8c05b5d145840efaad449
+partsupp count: 800000
+partsupp md5: d62e99cf993c6de288a905ae2f95eced
+region count: 5
+region md5: d1c494f597244c77001246888185e3e3
+supplier count: 10000
+supplier md5: 815d49d8e71c7993531b32113b2da5b5 \ No newline at end of file
diff --git a/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state/s1.hash b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state/s1.hash
new file mode 100644
index 0000000000..325e897ab8
--- /dev/null
+++ b/ydb/tests/functional/tpc/canondata/test_generator.TestTpchGenerator.test_s1_state/s1.hash
@@ -0,0 +1,16 @@
+customer count: 150000
+customer md5: 1808efe529a289183e7ccf8aa1a2d8e9
+lineitem count: 6001215
+lineitem md5: caa6f4712334af71fd8c1efcdd37d7a7
+nation count: 25
+nation md5: 0e91944824fb13e44cda58882f0fedbe
+orders count: 1500000
+orders md5: 01c5ca96aa3149c64427291ebbd792d4
+part count: 200000
+part md5: d67727d976d8c05b5d145840efaad449
+partsupp count: 800000
+partsupp md5: d62e99cf993c6de288a905ae2f95eced
+region count: 5
+region md5: d1c494f597244c77001246888185e3e3
+supplier count: 10000
+supplier md5: 815d49d8e71c7993531b32113b2da5b5 \ No newline at end of file
diff --git a/ydb/tests/functional/tpc/test_generator.py b/ydb/tests/functional/tpc/test_generator.py
new file mode 100644
index 0000000000..232be40f34
--- /dev/null
+++ b/ydb/tests/functional/tpc/test_generator.py
@@ -0,0 +1,190 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import annotations
+
+import os
+import logging
+import hashlib
+import pytest
+import json
+import random
+
+from ydb.tests.library.common import yatest_common
+from ydb.tests.oss.canonical import set_canondata_root
+from threading import Thread
+
+logger = logging.getLogger(__name__)
+
+
+def ydb_bin():
+ if os.getenv('YDB_CLI_BINARY'):
+ return yatest_common.binary_path(os.getenv('YDB_CLI_BINARY'))
+ raise RuntimeError('YDB_CLI_BINARY enviroment variable is not specified')
+
+
+class TpcGeneratorBase(object):
+ tables: dict[str, dict[int, int]]
+ workload: str
+
+ @classmethod
+ def execute_generator(cls, output_path, scale=1, import_args=[], generator_args=[]):
+ return yatest_common.execute(
+ [
+ ydb_bin(),
+ '--endpoint', 'grpc://localhost',
+ '--database', '/Root/db',
+ 'workload', cls.workload, '-p', f'/Root/db/{cls.workload}/s{scale}',
+ 'import', '-f', output_path,
+ ]
+ + [str(arg) for arg in import_args]
+ + ['generator', '--scale', str(scale)]
+ + [str(arg) for arg in generator_args],
+ wait=False,
+ )
+
+ @staticmethod
+ def canonical_result(output_result, tmp_path):
+ with open(tmp_path, 'w') as f:
+ f.write(output_result)
+ return yatest_common.canonical_file(tmp_path, local=True, universal_lines=True)
+
+ @staticmethod
+ def calc_hashes(files: str | list[str]):
+ if not isinstance(files, list):
+ files = [files]
+ rows: set[str] = set()
+ for file_path in files:
+ if not os.path.exists(file_path):
+ continue
+ first_line = True
+ with open(file_path, 'r') as f:
+ for line in f:
+ if first_line:
+ first_line = False
+ else:
+ rows.add(line)
+ m = hashlib.md5()
+ for row in sorted(rows):
+ m.update(row.encode())
+ return len(rows), m.hexdigest()
+
+ @classmethod
+ def scale_hash(cls, paths: str | list[str]):
+ if not isinstance(paths, list):
+ paths = [paths]
+ tables = list(sorted(cls.tables.items()))
+ result = [''] * 2 * len(tables)
+ threads = []
+
+ def _calc_hash(result: list[str], index: int):
+ fname, _ = tables[index]
+ count, md5 = cls.calc_hashes([os.path.join(path, fname) for path in paths])
+ result[index * 2] = f'{fname} count: {count}'
+ result[index * 2 + 1] = f'{fname} md5: {md5}'
+
+ for index in range(len(tables)):
+ threads.append(Thread(target=_calc_hash, args=(result, index)))
+ threads[-1].start()
+ for t in threads:
+ t.join()
+ return '\n'.join(result)
+
+ @classmethod
+ def setup_class(cls):
+ set_canondata_root(f'ydb/tests/functional/{cls.workload}/canondata')
+
+ @pytest.fixture(autouse=True, scope='function')
+ def init_test(self, tmp_path):
+ self._tmp_path = tmp_path
+
+ def tmp_path(self, *paths):
+ return os.path.join(self._tmp_path, *paths)
+
+ def get_cannonical(self, paths, execs):
+ for exe in execs:
+ exe.wait(check_exit_code=True)
+ return self.canonical_result(self.scale_hash(paths), self.tmp_path('s1.hash'))
+
+ def test_s1(self):
+ out_fpath = self.tmp_path('s1')
+ return self.get_cannonical(
+ paths=[out_fpath],
+ execs=[self.execute_generator(out_fpath)]
+ )
+
+ def test_s1_parts(self):
+ parts_count = 10
+ paths = []
+ execs = []
+ for part_index in range(parts_count):
+ paths.append(self.tmp_path(f's1.{part_index}_{parts_count}'))
+ execs.append(
+ self.execute_generator(
+ output_path=paths[-1],
+ generator_args=['--proccess-count', parts_count, '--proccess-index', part_index]
+ )
+ )
+ return self.get_cannonical(paths=paths, execs=execs)
+
+ def test_s1_state(self):
+ state_path = self.tmp_path('state.json')
+ with open(state_path, 'w') as f:
+ json.dump({
+ 'sources': {
+ k: {'position': int(v[1] * random.uniform(0.25, 0.75))}
+ for k, v in self.tables.items()
+ }
+ }, f)
+ paths = [self.tmp_path(path) for path in ['s1.1', 's1.2']]
+ execs = [
+ self.execute_generator(output_path=paths[0]),
+ self.execute_generator(
+ output_path=paths[1],
+ generator_args=['--state', state_path],
+ ),
+ ]
+ return self.get_cannonical(paths=paths, execs=execs)
+
+
+class TestTpchGenerator(TpcGeneratorBase):
+ workload = 'tpch'
+ tables = {
+ 'customer': {1: 150000},
+ 'lineitem': {1: 6001215},
+ 'nation': {1: 25},
+ 'orders': {1: 1500000},
+ 'part': {1: 200000},
+ 'partsupp': {1: 800000},
+ 'region': {1: 5},
+ 'supplier': {1: 10000}
+ }
+
+
+class TestTpcdsGenerator(TpcGeneratorBase):
+ workload = 'tpcds'
+ tables = {
+ 'call_center': {1: 6},
+ 'catalog_page': {1: 11718},
+ 'catalog_returns': {1: 144067},
+ 'catalog_sales': {1: 1441548},
+ 'customer': {1: 100000},
+ 'customer_address': {1: 50000},
+ 'customer_demographics': {1: 1920800},
+ 'date_dim': {1: 73049},
+ 'household_demographics': {1: 7200},
+ 'income_band': {1: 20},
+ 'inventory': {1: 11745000},
+ 'item': {1: 18000},
+ 'promotion': {1: 300},
+ 'reason': {1: 35},
+ 'ship_mode': {1: 20},
+ 'store': {1: 12},
+ 'store_returns': {1: 287514},
+ 'store_sales': {1: 2880404},
+ 'time_dim': {1: 86400},
+ 'warehouse': {1: 5},
+ 'web_page': {1: 60},
+ 'web_returns': {1: 71763},
+ 'web_sales': {1: 719384},
+ 'web_site': {1: 30}
+ }
diff --git a/ydb/tests/functional/tpc/ya.make b/ydb/tests/functional/tpc/ya.make
new file mode 100644
index 0000000000..8ba56dcd01
--- /dev/null
+++ b/ydb/tests/functional/tpc/ya.make
@@ -0,0 +1,27 @@
+IF (NOT SANITIZER_TYPE)
+
+PY3TEST()
+
+TEST_SRCS(test_generator.py)
+
+TIMEOUT(600)
+SIZE(MEDIUM)
+
+ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb")
+
+DEPENDS(
+ ydb/apps/ydb
+)
+
+PEERDIR(
+ ydb/tests/oss/ydb_sdk_import
+ ydb/public/sdk/python
+ contrib/python/PyHamcrest
+ ydb/tests/library
+)
+
+FORK_SUBTESTS()
+FORK_TEST_FILES()
+END()
+
+ENDIF()
diff --git a/ydb/tests/functional/ya.make b/ydb/tests/functional/ya.make
index 89170c3bcb..1d20092f92 100644
--- a/ydb/tests/functional/ya.make
+++ b/ydb/tests/functional/ya.make
@@ -29,6 +29,7 @@ RECURSE(
serverless
sqs
suite_tests
+ tpc
tenants
ttl
wardens