diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-06-01 10:59:51 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-06-01 10:59:51 +0300 |
commit | 2ba2c4c1e2f61ac4b37ba0bf16d71da1dbc93214 (patch) | |
tree | 2d02fe653ae52654d54e82d2fe5437612ed73636 | |
parent | 4dea553457efda88fef237809afb5b9a95da7d41 (diff) | |
download | ydb-2ba2c4c1e2f61ac4b37ba0bf16d71da1dbc93214.tar.gz |
support multiple modifications per transaction for idx test
feature(idx_test): support multiple operations per transaction
-rw-r--r-- | ydb/public/lib/idx_test/idx_test.h | 1 | ||||
-rw-r--r-- | ydb/public/lib/idx_test/idx_test_data_provider.cpp | 11 | ||||
-rw-r--r-- | ydb/public/lib/idx_test/idx_test_data_provider.h | 1 | ||||
-rw-r--r-- | ydb/public/lib/idx_test/idx_test_loader.cpp | 172 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_index_ut.cpp | 4 |
5 files changed, 114 insertions, 75 deletions
diff --git a/ydb/public/lib/idx_test/idx_test.h b/ydb/public/lib/idx_test/idx_test.h index 53fb82bca6c..4bee3797587 100644 --- a/ydb/public/lib/idx_test/idx_test.h +++ b/ydb/public/lib/idx_test/idx_test.h @@ -51,6 +51,7 @@ public: struct TRunSettings { size_t RunLimit; size_t Infly; + size_t OpsPerTx; }; virtual NJson::TJsonValue Run(const TString& tableName, ui32 loadCommands, const TRunSettings& settings) = 0; diff --git a/ydb/public/lib/idx_test/idx_test_data_provider.cpp b/ydb/public/lib/idx_test/idx_test_data_provider.cpp index 0006551f290..1f68dc3b3f4 100644 --- a/ydb/public/lib/idx_test/idx_test_data_provider.cpp +++ b/ydb/public/lib/idx_test/idx_test_data_provider.cpp @@ -203,16 +203,21 @@ NYdb::TParams CreateParamsAsItems(const TVector<TValue>& values, const TVector<T } NYdb::TParams CreateParamsAsList(const TVector<NYdb::TValue>& batch, const TString& paramName) { + NYdb::TParamsBuilder paramsBuilder; + AddParamsAsList(paramsBuilder, batch, paramName); + return paramsBuilder.Build(); +} + +void AddParamsAsList(NYdb::TParamsBuilder& paramsBuilder, const TVector<NYdb::TValue>& batch, const TString& paramName) { TValueBuilder builder; builder.BeginList(); - for (const NYdb::TValue& item: batch) { + for (const NYdb::TValue& item : batch) { builder.AddListItem(item); } builder.EndList(); - NYdb::TParamsBuilder paramsBuilder; + paramsBuilder.AddParam(paramName, builder.Build()); - return paramsBuilder.Build(); } class TDataProvider diff --git a/ydb/public/lib/idx_test/idx_test_data_provider.h b/ydb/public/lib/idx_test/idx_test_data_provider.h index e6a9bd4aabc..bf0b420ba2c 100644 --- a/ydb/public/lib/idx_test/idx_test_data_provider.h +++ b/ydb/public/lib/idx_test/idx_test_data_provider.h @@ -97,5 +97,6 @@ NYdb::TValue CreateValue(const NYdb::TColumn& column, const TRandomValueProvider NYdb::TValue CreateRow(const TVector<NYdb::TColumn>& columns, const TRandomValueProvider& rvp); NYdb::TParams CreateParamsAsItems(const TVector<NYdb::TValue>& values, const TVector<TString>& paramNames); NYdb::TParams CreateParamsAsList(const TVector<NYdb::TValue>& batch, const TString& paramName); +void AddParamsAsList(NYdb::TParamsBuilder& paramsBuilder, const TVector<NYdb::TValue>& batch, const TString& paramName); } diff --git a/ydb/public/lib/idx_test/idx_test_loader.cpp b/ydb/public/lib/idx_test/idx_test_loader.cpp index a0a13687f45..d32bbff9f29 100644 --- a/ydb/public/lib/idx_test/idx_test_loader.cpp +++ b/ydb/public/lib/idx_test/idx_test_loader.cpp @@ -700,12 +700,14 @@ public: TTableDescription tableDescription, const TString& tableName, TTableClient& client, - IWorkLoader::ELoadCommand stmt) + IWorkLoader::ELoadCommand stmt, + size_t opsPerTx) : TRandomValueProvider(TableDescriptionToShardsPower(tableDescription), KEYRANGELIMIT) , TableDescription_(tableDescription) , TableName_(tableName) , Client_(client) , Stmt_(stmt) + , OpsPerTx_(opsPerTx) { CreatePrograms(); } @@ -782,46 +784,62 @@ public: } TString sql; + TVector<TString> predicates; + predicates.reserve(OpsPerTx_); + TVector<TString> updates; + updates.reserve(OpsPerTx_); TVector<std::pair<TColumn, TString>> resultColumns; Y_VERIFY(Stmt_ == IWorkLoader::LC_DELETE || Stmt_ == IWorkLoader::LC_UPDATE); - size_t id = 0; - TString predicate; - for (const TString& str : pkTypeStrings) { - const TString paramName = Sprintf("$items_%zu", id); - sql += Sprintf("DECLARE %s AS %s;\n", paramName.c_str(), str.c_str()); - resultColumns.push_back({pkCol[id], paramName}); - predicate += Sprintf(" %s = %s ", pkCol[id].Name.c_str(), paramName.c_str()); - id++; - if (id != pkTypeStrings.size()) { - predicate += " AND "; - } - } - if (Stmt_ == IWorkLoader::LC_DELETE) { - sql += Sprintf("DELETE FROM `%s` WHERE ", TableName_.c_str()); - sql += predicate; - } else { - TString update = " SET "; - size_t dataId = 0; - for (const TString& str : valueTypeStrings) { - const TString paramName = Sprintf("$items_%zu", id); - sql += Sprintf("DECLARE %s AS %s;\n", paramName.c_str(), str.c_str()); - resultColumns.push_back({valueCol[dataId], paramName}); - update += Sprintf(" %s = %s ", valueCol[dataId].Name.c_str(), paramName.c_str()); - id++; - dataId++; - if (dataId != valueTypeStrings.size()) { - update += ", "; + for (size_t opIndex = 0, paramId = 0; opIndex < OpsPerTx_; ++opIndex) { + TString predicate; + + for (size_t pkId = 0; pkId < pkTypeStrings.size(); ++pkId) { + const TString paramName = Sprintf("$items_%zu", paramId++); + sql += Sprintf("DECLARE %s AS %s;\n", paramName.c_str(), pkTypeStrings[pkId].c_str()); + resultColumns.push_back({pkCol[pkId], paramName}); + predicate += Sprintf(" %s = %s ", pkCol[pkId].Name.c_str(), paramName.c_str()); + + if (pkId + 1 != pkTypeStrings.size()) { + predicate += " AND "; } } - sql += Sprintf("UPDATE `%s` ", TableName_.c_str()); - sql += update; - sql += " WHERE "; - sql += predicate; + predicates.emplace_back(std::move(predicate)); + + if (Stmt_ == IWorkLoader::LC_UPDATE) { + TString update = " SET "; + + for (size_t dataId = 0; dataId < valueTypeStrings.size(); ++dataId) { + const TString paramName = Sprintf("$items_%zu", paramId++); + sql += Sprintf("DECLARE %s AS %s;\n", paramName.c_str(), valueTypeStrings[dataId].c_str()); + resultColumns.push_back({valueCol[dataId], paramName}); + update += Sprintf(" %s = %s ", valueCol[dataId].Name.c_str(), paramName.c_str()); + + if (dataId + 1 != valueTypeStrings.size()) { + update += ", "; + } + } + + updates.emplace_back(std::move(update)); + } } + for (size_t opIndex = 0; opIndex < OpsPerTx_; ++opIndex) { + if (Stmt_ == IWorkLoader::LC_DELETE) { + sql += Sprintf("DELETE FROM `%s` WHERE ", TableName_.c_str()); + sql += predicates[opIndex]; + sql += ";\n"; + } else { + sql += Sprintf("UPDATE `%s` ", TableName_.c_str()); + sql += updates[opIndex]; + sql += " WHERE "; + sql += predicates[opIndex]; + sql += ";\n"; + } + } + return {SetPragmas(sql), resultColumns}; } @@ -849,6 +867,7 @@ private: TString TableName_; TTableClient Client_; const IWorkLoader::ELoadCommand Stmt_; + const size_t OpsPerTx_; // program, columns TVector<std::pair<TString, TVector<std::pair<TColumn, TString>>>> Programs_; }; @@ -861,12 +880,14 @@ public: TTableDescription tableDescription, const TString& tableName, TTableClient& client, - IWorkLoader::ELoadCommand stmt) + IWorkLoader::ELoadCommand stmt, + size_t opsPerTx) : TRandomValueProvider(TableDescriptionToShardsPower(tableDescription), KEYRANGELIMIT) , TableDescription_(tableDescription) , TableName_(tableName) , Client_(client) , Stmt_(stmt) + , OpsPerTx_(opsPerTx) { CreatePrograms(); } @@ -875,11 +896,16 @@ public: auto rnd = RandomNumber<ui32>(Programs_.size()); const auto& program = Programs_[rnd]; const auto& programText = program.first; - TVector<NYdb::TValue> batch; - batch.emplace_back(::NIdxTest::CreateRow(program.second, *this)); - const auto params = ::NIdxTest::CreateParamsAsList(batch, ParamName_); - return RunOperation(Client_, programText, params); + NYdb::TParamsBuilder params; + for (size_t opIndex = 0; opIndex < OpsPerTx_; ++opIndex) { + const TString paramName = Sprintf("$items_%zu", opIndex); + TVector<NYdb::TValue> batch; + batch.emplace_back(::NIdxTest::CreateRow(program.second, *this)); + ::NIdxTest::AddParamsAsList(params, batch, paramName); + } + + return RunOperation(Client_, programText, params.Build()); } IWorkLoader::ELoadCommand GetTaskId() const override { @@ -928,33 +954,39 @@ private: auto rowType = NIdxTest::CreateRow(resCol, *this); auto rowsTypeString = NYdb::FormatType(rowType.GetType()); - TString sql; - switch (Stmt_) { - case IWorkLoader::LC_UPSERT: - sql = "DECLARE %s AS List<%s>; UPSERT INTO `%s` SELECT * FROM AS_TABLE(%s);"; - break; - case IWorkLoader::LC_REPLACE: - sql = "DECLARE %s AS List<%s>; REPLACE INTO `%s` SELECT * FROM AS_TABLE(%s);"; - break; - case IWorkLoader::LC_INSERT: - sql = "DECLARE %s AS List<%s>; INSERT INTO `%s` SELECT * FROM AS_TABLE(%s);"; - break; - case IWorkLoader::LC_UPDATE_ON: - sql = "DECLARE %s AS List<%s>; UPDATE `%s` ON SELECT * FROM AS_TABLE(%s);"; - break; - case IWorkLoader::LC_DELETE_ON: - sql = "DECLARE %s AS List<%s>; DELETE FROM `%s` ON SELECT * FROM AS_TABLE(%s);"; - break; - default: - ythrow yexception() << "unsupported statement"; - break; + TString program; + for (size_t opIndex = 0; opIndex < OpsPerTx_; ++opIndex) { + const TString paramName = Sprintf("$items_%zu", opIndex); + program += Sprintf("DECLARE %s AS List<%s>;\n", paramName.c_str(), rowsTypeString.c_str()); } - auto program = Sprintf(sql.c_str(), - ParamName_.c_str(), - rowsTypeString.c_str(), - TableName_.c_str(), - ParamName_.c_str()); + for (size_t opIndex = 0; opIndex < OpsPerTx_; ++opIndex) { + TString sql; + const TString paramName = Sprintf("$items_%zu", opIndex); + + switch (Stmt_) { + case IWorkLoader::LC_UPSERT: + sql = "UPSERT INTO `%s` SELECT * FROM AS_TABLE(%s);\n"; + break; + case IWorkLoader::LC_REPLACE: + sql = "REPLACE INTO `%s` SELECT * FROM AS_TABLE(%s);\n"; + break; + case IWorkLoader::LC_INSERT: + sql = "INSERT INTO `%s` SELECT * FROM AS_TABLE(%s);\n"; + break; + case IWorkLoader::LC_UPDATE_ON: + sql = "UPDATE `%s` ON SELECT * FROM AS_TABLE(%s);\n"; + break; + case IWorkLoader::LC_DELETE_ON: + sql = "DELETE FROM `%s` ON SELECT * FROM AS_TABLE(%s);\n"; + break; + default: + ythrow yexception() << "unsupported statement"; + break; + } + + program += Sprintf(sql.c_str(), TableName_.c_str(), paramName.c_str()); + } return {SetPragmas(program), resCol}; } @@ -975,9 +1007,9 @@ private: TString TableName_; TTableClient Client_; const IWorkLoader::ELoadCommand Stmt_; + const size_t OpsPerTx_; // program, columns TVector<std::pair<TString, TVector<TColumn>>> Programs_; - const TString ParamName_ = "$items"; }; class TTimingTracker { @@ -1075,31 +1107,31 @@ public: switch (i & loadCommands) { case IWorkLoader::LC_UPSERT: tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamListTask>( - tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPSERT), 1000)); + tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPSERT, settings.OpsPerTx), 1000)); break; case IWorkLoader::LC_INSERT: tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamListTask>( - tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_INSERT), 1000)); + tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_INSERT, settings.OpsPerTx), 1000)); break; case IWorkLoader::LC_UPDATE: tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamItemsTask>( - tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPDATE), 1000)); + tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPDATE, settings.OpsPerTx), 1000)); break; case IWorkLoader::LC_UPDATE_ON: tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamListTask>( - tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPDATE_ON), 1000)); + tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPDATE_ON, settings.OpsPerTx), 1000)); break; case IWorkLoader::LC_REPLACE: tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamListTask>( - tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_REPLACE), 1000)); + tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_REPLACE, settings.OpsPerTx), 1000)); break; case IWorkLoader::LC_DELETE: tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamItemsTask>( - tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_DELETE), 1000)); + tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_DELETE, settings.OpsPerTx), 1000)); break; case IWorkLoader::LC_DELETE_ON: tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamListTask>( - tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_DELETE_ON), 1000)); + tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_DELETE_ON, settings.OpsPerTx), 1000)); break; case IWorkLoader::LC_SELECT: tasks.emplace_back(std::make_pair(std::make_unique<TSelectAndCompareTask>( diff --git a/ydb/services/ydb/ydb_index_ut.cpp b/ydb/services/ydb/ydb_index_ut.cpp index aa26d04bb6f..7692b3a838e 100644 --- a/ydb/services/ydb/ydb_index_ut.cpp +++ b/ydb/services/ydb/ydb_index_ut.cpp @@ -67,7 +67,7 @@ static void RunTest(ui32 shardsCount, ui32 rowsCount, ui32 indexCount, const TRu IWorkLoader::LC_UPDATE_ON | IWorkLoader::LC_DELETE_ON | IWorkLoader::LC_DELETE; - workLoader->Run("Root/Test", stms, IWorkLoader::TRunSettings{rowsCount, 5}); + workLoader->Run("Root/Test", stms, IWorkLoader::TRunSettings{rowsCount, 5, 1}); auto checker = CreateChecker(driver); checker->Run("Root/Test"); driver.Stop(true); @@ -153,7 +153,7 @@ Y_UNIT_TEST_SUITE(YdbIndexTable) { ui32 stms = IWorkLoader::LC_UPSERT | (withDataColumn ? IWorkLoader::LC_ALTER_ADD_INDEX_WITH_DATA_COLUMN : IWorkLoader::LC_ALTER_ADD_INDEX); - workLoader->Run("Root/Test", stms, IWorkLoader::TRunSettings{2000, 1}); + workLoader->Run("Root/Test", stms, IWorkLoader::TRunSettings{2000, 1, 1}); auto checker = CreateChecker(driver); checker->Run("Root/Test"); driver.Stop(true); |