aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2023-06-01 10:59:51 +0300
committerulya-sidorina <yulia@ydb.tech>2023-06-01 10:59:51 +0300
commit2ba2c4c1e2f61ac4b37ba0bf16d71da1dbc93214 (patch)
tree2d02fe653ae52654d54e82d2fe5437612ed73636
parent4dea553457efda88fef237809afb5b9a95da7d41 (diff)
downloadydb-2ba2c4c1e2f61ac4b37ba0bf16d71da1dbc93214.tar.gz
support multiple modifications per transaction for idx test
feature(idx_test): support multiple operations per transaction
-rw-r--r--ydb/public/lib/idx_test/idx_test.h1
-rw-r--r--ydb/public/lib/idx_test/idx_test_data_provider.cpp11
-rw-r--r--ydb/public/lib/idx_test/idx_test_data_provider.h1
-rw-r--r--ydb/public/lib/idx_test/idx_test_loader.cpp172
-rw-r--r--ydb/services/ydb/ydb_index_ut.cpp4
5 files changed, 114 insertions, 75 deletions
diff --git a/ydb/public/lib/idx_test/idx_test.h b/ydb/public/lib/idx_test/idx_test.h
index 53fb82bca6c..4bee3797587 100644
--- a/ydb/public/lib/idx_test/idx_test.h
+++ b/ydb/public/lib/idx_test/idx_test.h
@@ -51,6 +51,7 @@ public:
struct TRunSettings {
size_t RunLimit;
size_t Infly;
+ size_t OpsPerTx;
};
virtual NJson::TJsonValue Run(const TString& tableName, ui32 loadCommands, const TRunSettings& settings) = 0;
diff --git a/ydb/public/lib/idx_test/idx_test_data_provider.cpp b/ydb/public/lib/idx_test/idx_test_data_provider.cpp
index 0006551f290..1f68dc3b3f4 100644
--- a/ydb/public/lib/idx_test/idx_test_data_provider.cpp
+++ b/ydb/public/lib/idx_test/idx_test_data_provider.cpp
@@ -203,16 +203,21 @@ NYdb::TParams CreateParamsAsItems(const TVector<TValue>& values, const TVector<T
}
NYdb::TParams CreateParamsAsList(const TVector<NYdb::TValue>& batch, const TString& paramName) {
+ NYdb::TParamsBuilder paramsBuilder;
+ AddParamsAsList(paramsBuilder, batch, paramName);
+ return paramsBuilder.Build();
+}
+
+void AddParamsAsList(NYdb::TParamsBuilder& paramsBuilder, const TVector<NYdb::TValue>& batch, const TString& paramName) {
TValueBuilder builder;
builder.BeginList();
- for (const NYdb::TValue& item: batch) {
+ for (const NYdb::TValue& item : batch) {
builder.AddListItem(item);
}
builder.EndList();
- NYdb::TParamsBuilder paramsBuilder;
+
paramsBuilder.AddParam(paramName, builder.Build());
- return paramsBuilder.Build();
}
class TDataProvider
diff --git a/ydb/public/lib/idx_test/idx_test_data_provider.h b/ydb/public/lib/idx_test/idx_test_data_provider.h
index e6a9bd4aabc..bf0b420ba2c 100644
--- a/ydb/public/lib/idx_test/idx_test_data_provider.h
+++ b/ydb/public/lib/idx_test/idx_test_data_provider.h
@@ -97,5 +97,6 @@ NYdb::TValue CreateValue(const NYdb::TColumn& column, const TRandomValueProvider
NYdb::TValue CreateRow(const TVector<NYdb::TColumn>& columns, const TRandomValueProvider& rvp);
NYdb::TParams CreateParamsAsItems(const TVector<NYdb::TValue>& values, const TVector<TString>& paramNames);
NYdb::TParams CreateParamsAsList(const TVector<NYdb::TValue>& batch, const TString& paramName);
+void AddParamsAsList(NYdb::TParamsBuilder& paramsBuilder, const TVector<NYdb::TValue>& batch, const TString& paramName);
}
diff --git a/ydb/public/lib/idx_test/idx_test_loader.cpp b/ydb/public/lib/idx_test/idx_test_loader.cpp
index a0a13687f45..d32bbff9f29 100644
--- a/ydb/public/lib/idx_test/idx_test_loader.cpp
+++ b/ydb/public/lib/idx_test/idx_test_loader.cpp
@@ -700,12 +700,14 @@ public:
TTableDescription tableDescription,
const TString& tableName,
TTableClient& client,
- IWorkLoader::ELoadCommand stmt)
+ IWorkLoader::ELoadCommand stmt,
+ size_t opsPerTx)
: TRandomValueProvider(TableDescriptionToShardsPower(tableDescription), KEYRANGELIMIT)
, TableDescription_(tableDescription)
, TableName_(tableName)
, Client_(client)
, Stmt_(stmt)
+ , OpsPerTx_(opsPerTx)
{
CreatePrograms();
}
@@ -782,46 +784,62 @@ public:
}
TString sql;
+ TVector<TString> predicates;
+ predicates.reserve(OpsPerTx_);
+ TVector<TString> updates;
+ updates.reserve(OpsPerTx_);
TVector<std::pair<TColumn, TString>> resultColumns;
Y_VERIFY(Stmt_ == IWorkLoader::LC_DELETE || Stmt_ == IWorkLoader::LC_UPDATE);
- size_t id = 0;
- TString predicate;
- for (const TString& str : pkTypeStrings) {
- const TString paramName = Sprintf("$items_%zu", id);
- sql += Sprintf("DECLARE %s AS %s;\n", paramName.c_str(), str.c_str());
- resultColumns.push_back({pkCol[id], paramName});
- predicate += Sprintf(" %s = %s ", pkCol[id].Name.c_str(), paramName.c_str());
- id++;
- if (id != pkTypeStrings.size()) {
- predicate += " AND ";
- }
- }
- if (Stmt_ == IWorkLoader::LC_DELETE) {
- sql += Sprintf("DELETE FROM `%s` WHERE ", TableName_.c_str());
- sql += predicate;
- } else {
- TString update = " SET ";
- size_t dataId = 0;
- for (const TString& str : valueTypeStrings) {
- const TString paramName = Sprintf("$items_%zu", id);
- sql += Sprintf("DECLARE %s AS %s;\n", paramName.c_str(), str.c_str());
- resultColumns.push_back({valueCol[dataId], paramName});
- update += Sprintf(" %s = %s ", valueCol[dataId].Name.c_str(), paramName.c_str());
- id++;
- dataId++;
- if (dataId != valueTypeStrings.size()) {
- update += ", ";
+ for (size_t opIndex = 0, paramId = 0; opIndex < OpsPerTx_; ++opIndex) {
+ TString predicate;
+
+ for (size_t pkId = 0; pkId < pkTypeStrings.size(); ++pkId) {
+ const TString paramName = Sprintf("$items_%zu", paramId++);
+ sql += Sprintf("DECLARE %s AS %s;\n", paramName.c_str(), pkTypeStrings[pkId].c_str());
+ resultColumns.push_back({pkCol[pkId], paramName});
+ predicate += Sprintf(" %s = %s ", pkCol[pkId].Name.c_str(), paramName.c_str());
+
+ if (pkId + 1 != pkTypeStrings.size()) {
+ predicate += " AND ";
}
}
- sql += Sprintf("UPDATE `%s` ", TableName_.c_str());
- sql += update;
- sql += " WHERE ";
- sql += predicate;
+ predicates.emplace_back(std::move(predicate));
+
+ if (Stmt_ == IWorkLoader::LC_UPDATE) {
+ TString update = " SET ";
+
+ for (size_t dataId = 0; dataId < valueTypeStrings.size(); ++dataId) {
+ const TString paramName = Sprintf("$items_%zu", paramId++);
+ sql += Sprintf("DECLARE %s AS %s;\n", paramName.c_str(), valueTypeStrings[dataId].c_str());
+ resultColumns.push_back({valueCol[dataId], paramName});
+ update += Sprintf(" %s = %s ", valueCol[dataId].Name.c_str(), paramName.c_str());
+
+ if (dataId + 1 != valueTypeStrings.size()) {
+ update += ", ";
+ }
+ }
+
+ updates.emplace_back(std::move(update));
+ }
}
+ for (size_t opIndex = 0; opIndex < OpsPerTx_; ++opIndex) {
+ if (Stmt_ == IWorkLoader::LC_DELETE) {
+ sql += Sprintf("DELETE FROM `%s` WHERE ", TableName_.c_str());
+ sql += predicates[opIndex];
+ sql += ";\n";
+ } else {
+ sql += Sprintf("UPDATE `%s` ", TableName_.c_str());
+ sql += updates[opIndex];
+ sql += " WHERE ";
+ sql += predicates[opIndex];
+ sql += ";\n";
+ }
+ }
+
return {SetPragmas(sql), resultColumns};
}
@@ -849,6 +867,7 @@ private:
TString TableName_;
TTableClient Client_;
const IWorkLoader::ELoadCommand Stmt_;
+ const size_t OpsPerTx_;
// program, columns
TVector<std::pair<TString, TVector<std::pair<TColumn, TString>>>> Programs_;
};
@@ -861,12 +880,14 @@ public:
TTableDescription tableDescription,
const TString& tableName,
TTableClient& client,
- IWorkLoader::ELoadCommand stmt)
+ IWorkLoader::ELoadCommand stmt,
+ size_t opsPerTx)
: TRandomValueProvider(TableDescriptionToShardsPower(tableDescription), KEYRANGELIMIT)
, TableDescription_(tableDescription)
, TableName_(tableName)
, Client_(client)
, Stmt_(stmt)
+ , OpsPerTx_(opsPerTx)
{
CreatePrograms();
}
@@ -875,11 +896,16 @@ public:
auto rnd = RandomNumber<ui32>(Programs_.size());
const auto& program = Programs_[rnd];
const auto& programText = program.first;
- TVector<NYdb::TValue> batch;
- batch.emplace_back(::NIdxTest::CreateRow(program.second, *this));
- const auto params = ::NIdxTest::CreateParamsAsList(batch, ParamName_);
- return RunOperation(Client_, programText, params);
+ NYdb::TParamsBuilder params;
+ for (size_t opIndex = 0; opIndex < OpsPerTx_; ++opIndex) {
+ const TString paramName = Sprintf("$items_%zu", opIndex);
+ TVector<NYdb::TValue> batch;
+ batch.emplace_back(::NIdxTest::CreateRow(program.second, *this));
+ ::NIdxTest::AddParamsAsList(params, batch, paramName);
+ }
+
+ return RunOperation(Client_, programText, params.Build());
}
IWorkLoader::ELoadCommand GetTaskId() const override {
@@ -928,33 +954,39 @@ private:
auto rowType = NIdxTest::CreateRow(resCol, *this);
auto rowsTypeString = NYdb::FormatType(rowType.GetType());
- TString sql;
- switch (Stmt_) {
- case IWorkLoader::LC_UPSERT:
- sql = "DECLARE %s AS List<%s>; UPSERT INTO `%s` SELECT * FROM AS_TABLE(%s);";
- break;
- case IWorkLoader::LC_REPLACE:
- sql = "DECLARE %s AS List<%s>; REPLACE INTO `%s` SELECT * FROM AS_TABLE(%s);";
- break;
- case IWorkLoader::LC_INSERT:
- sql = "DECLARE %s AS List<%s>; INSERT INTO `%s` SELECT * FROM AS_TABLE(%s);";
- break;
- case IWorkLoader::LC_UPDATE_ON:
- sql = "DECLARE %s AS List<%s>; UPDATE `%s` ON SELECT * FROM AS_TABLE(%s);";
- break;
- case IWorkLoader::LC_DELETE_ON:
- sql = "DECLARE %s AS List<%s>; DELETE FROM `%s` ON SELECT * FROM AS_TABLE(%s);";
- break;
- default:
- ythrow yexception() << "unsupported statement";
- break;
+ TString program;
+ for (size_t opIndex = 0; opIndex < OpsPerTx_; ++opIndex) {
+ const TString paramName = Sprintf("$items_%zu", opIndex);
+ program += Sprintf("DECLARE %s AS List<%s>;\n", paramName.c_str(), rowsTypeString.c_str());
}
- auto program = Sprintf(sql.c_str(),
- ParamName_.c_str(),
- rowsTypeString.c_str(),
- TableName_.c_str(),
- ParamName_.c_str());
+ for (size_t opIndex = 0; opIndex < OpsPerTx_; ++opIndex) {
+ TString sql;
+ const TString paramName = Sprintf("$items_%zu", opIndex);
+
+ switch (Stmt_) {
+ case IWorkLoader::LC_UPSERT:
+ sql = "UPSERT INTO `%s` SELECT * FROM AS_TABLE(%s);\n";
+ break;
+ case IWorkLoader::LC_REPLACE:
+ sql = "REPLACE INTO `%s` SELECT * FROM AS_TABLE(%s);\n";
+ break;
+ case IWorkLoader::LC_INSERT:
+ sql = "INSERT INTO `%s` SELECT * FROM AS_TABLE(%s);\n";
+ break;
+ case IWorkLoader::LC_UPDATE_ON:
+ sql = "UPDATE `%s` ON SELECT * FROM AS_TABLE(%s);\n";
+ break;
+ case IWorkLoader::LC_DELETE_ON:
+ sql = "DELETE FROM `%s` ON SELECT * FROM AS_TABLE(%s);\n";
+ break;
+ default:
+ ythrow yexception() << "unsupported statement";
+ break;
+ }
+
+ program += Sprintf(sql.c_str(), TableName_.c_str(), paramName.c_str());
+ }
return {SetPragmas(program), resCol};
}
@@ -975,9 +1007,9 @@ private:
TString TableName_;
TTableClient Client_;
const IWorkLoader::ELoadCommand Stmt_;
+ const size_t OpsPerTx_;
// program, columns
TVector<std::pair<TString, TVector<TColumn>>> Programs_;
- const TString ParamName_ = "$items";
};
class TTimingTracker {
@@ -1075,31 +1107,31 @@ public:
switch (i & loadCommands) {
case IWorkLoader::LC_UPSERT:
tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamListTask>(
- tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPSERT), 1000));
+ tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPSERT, settings.OpsPerTx), 1000));
break;
case IWorkLoader::LC_INSERT:
tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamListTask>(
- tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_INSERT), 1000));
+ tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_INSERT, settings.OpsPerTx), 1000));
break;
case IWorkLoader::LC_UPDATE:
tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamItemsTask>(
- tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPDATE), 1000));
+ tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPDATE, settings.OpsPerTx), 1000));
break;
case IWorkLoader::LC_UPDATE_ON:
tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamListTask>(
- tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPDATE_ON), 1000));
+ tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_UPDATE_ON, settings.OpsPerTx), 1000));
break;
case IWorkLoader::LC_REPLACE:
tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamListTask>(
- tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_REPLACE), 1000));
+ tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_REPLACE, settings.OpsPerTx), 1000));
break;
case IWorkLoader::LC_DELETE:
tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamItemsTask>(
- tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_DELETE), 1000));
+ tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_DELETE, settings.OpsPerTx), 1000));
break;
case IWorkLoader::LC_DELETE_ON:
tasks.emplace_back(std::make_pair(std::make_unique<TUpdateViaParamListTask>(
- tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_DELETE_ON), 1000));
+ tableDescription.GetRef(), tableName, Client_, IWorkLoader::LC_DELETE_ON, settings.OpsPerTx), 1000));
break;
case IWorkLoader::LC_SELECT:
tasks.emplace_back(std::make_pair(std::make_unique<TSelectAndCompareTask>(
diff --git a/ydb/services/ydb/ydb_index_ut.cpp b/ydb/services/ydb/ydb_index_ut.cpp
index aa26d04bb6f..7692b3a838e 100644
--- a/ydb/services/ydb/ydb_index_ut.cpp
+++ b/ydb/services/ydb/ydb_index_ut.cpp
@@ -67,7 +67,7 @@ static void RunTest(ui32 shardsCount, ui32 rowsCount, ui32 indexCount, const TRu
IWorkLoader::LC_UPDATE_ON |
IWorkLoader::LC_DELETE_ON |
IWorkLoader::LC_DELETE;
- workLoader->Run("Root/Test", stms, IWorkLoader::TRunSettings{rowsCount, 5});
+ workLoader->Run("Root/Test", stms, IWorkLoader::TRunSettings{rowsCount, 5, 1});
auto checker = CreateChecker(driver);
checker->Run("Root/Test");
driver.Stop(true);
@@ -153,7 +153,7 @@ Y_UNIT_TEST_SUITE(YdbIndexTable) {
ui32 stms =
IWorkLoader::LC_UPSERT |
(withDataColumn ? IWorkLoader::LC_ALTER_ADD_INDEX_WITH_DATA_COLUMN : IWorkLoader::LC_ALTER_ADD_INDEX);
- workLoader->Run("Root/Test", stms, IWorkLoader::TRunSettings{2000, 1});
+ workLoader->Run("Root/Test", stms, IWorkLoader::TRunSettings{2000, 1, 1});
auto checker = CreateChecker(driver);
checker->Run("Root/Test");
driver.Stop(true);