diff options
author | kungasc <kungasc@yandex-team.com> | 2023-08-24 18:53:18 +0300 |
---|---|---|
committer | kungasc <kungasc@yandex-team.com> | 2023-08-24 19:12:58 +0300 |
commit | 036e55c4182c5cc43a0b7e6af296d52cc5aa1d95 (patch) | |
tree | e1c7a1dc0f55a4ee493eb49bd86e9d28ff239a97 | |
parent | 3f437a5ee5b80f2ed451775761ddd143e017c067 (diff) | |
download | ydb-036e55c4182c5cc43a0b7e6af296d52cc5aa1d95.tar.gz |
KIKIMR-19119 mixed kv workload
-rw-r--r-- | ydb/library/workload/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/library/workload/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/library/workload/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/library/workload/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/library/workload/kv_workload.cpp | 318 | ||||
-rw-r--r-- | ydb/library/workload/kv_workload.h | 54 | ||||
-rw-r--r-- | ydb/library/workload/workload_query_generator.h | 5 | ||||
-rw-r--r-- | ydb/library/workload/ya.make | 1 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/kv_workload.cpp | 54 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/kv_workload.h | 20 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload.cpp | 21 |
11 files changed, 429 insertions, 48 deletions
diff --git a/ydb/library/workload/CMakeLists.darwin-x86_64.txt b/ydb/library/workload/CMakeLists.darwin-x86_64.txt index 948585439c..69416c94e7 100644 --- a/ydb/library/workload/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/workload/CMakeLists.darwin-x86_64.txt @@ -12,6 +12,7 @@ target_link_libraries(ydb-library-workload PUBLIC contrib-libs-cxxsupp yutil api-protos + cpp-client-ydb_table ) target_sources(ydb-library-workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp diff --git a/ydb/library/workload/CMakeLists.linux-aarch64.txt b/ydb/library/workload/CMakeLists.linux-aarch64.txt index ca62ddeed5..d5dba36558 100644 --- a/ydb/library/workload/CMakeLists.linux-aarch64.txt +++ b/ydb/library/workload/CMakeLists.linux-aarch64.txt @@ -13,6 +13,7 @@ target_link_libraries(ydb-library-workload PUBLIC contrib-libs-cxxsupp yutil api-protos + cpp-client-ydb_table ) target_sources(ydb-library-workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp diff --git a/ydb/library/workload/CMakeLists.linux-x86_64.txt b/ydb/library/workload/CMakeLists.linux-x86_64.txt index ca62ddeed5..d5dba36558 100644 --- a/ydb/library/workload/CMakeLists.linux-x86_64.txt +++ b/ydb/library/workload/CMakeLists.linux-x86_64.txt @@ -13,6 +13,7 @@ target_link_libraries(ydb-library-workload PUBLIC contrib-libs-cxxsupp yutil api-protos + cpp-client-ydb_table ) target_sources(ydb-library-workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp diff --git a/ydb/library/workload/CMakeLists.windows-x86_64.txt b/ydb/library/workload/CMakeLists.windows-x86_64.txt index 948585439c..69416c94e7 100644 --- a/ydb/library/workload/CMakeLists.windows-x86_64.txt +++ b/ydb/library/workload/CMakeLists.windows-x86_64.txt @@ -12,6 +12,7 @@ target_link_libraries(ydb-library-workload PUBLIC contrib-libs-cxxsupp yutil api-protos + cpp-client-ydb_table ) target_sources(ydb-library-workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp diff --git a/ydb/library/workload/kv_workload.cpp b/ydb/library/workload/kv_workload.cpp index 23ca57117e..5588105535 100644 --- a/ydb/library/workload/kv_workload.cpp +++ b/ydb/library/workload/kv_workload.cpp @@ -12,6 +12,7 @@ #include <thread> #include <random> #include <sstream> +#include <chrono> template <> void Out<NYdbWorkload::KvWorkloadConstants>(IOutputStream& out, NYdbWorkload::KvWorkloadConstants constant) @@ -21,11 +22,124 @@ void Out<NYdbWorkload::KvWorkloadConstants>(IOutputStream& out, NYdbWorkload::Kv namespace NYdbWorkload { +using TRow = TKvWorkloadGenerator::TRow; + +// Note: there is no mechanism to update row values for now so all keys should be different +struct TRowsVerifyer { + TRowsVerifyer(size_t capacity = 10000) + : Capacity(capacity) + { + Rows.reserve(Capacity); + } + + void TryInsert(TRow row) { + std::unique_lock<std::mutex> lock(Mutex, std::try_to_lock); + if (!lock.owns_lock()) { + return; + } + + if (Rows.size() < Capacity) { + Rows.push_back(row); + return; + } + + Rows[RandomNumber<size_t>(Rows.size())] = row; + } + + std::optional<TRow> GetRandom() { + std::unique_lock<std::mutex> lock(Mutex, std::try_to_lock); + if (!lock.owns_lock()) { + return { }; + } + + if (!Rows.size()) { + return { }; + } + + size_t index = RandomNumber<size_t>(Rows.size()); + return Rows[index]; + } + +private: + const size_t Capacity; + std::mutex Mutex; + TVector<TRow> Rows; +}; + +TRowsVerifyer RowsVerifyer; + +void Fail() { + // Note: sleep helps to detect more fails + std::this_thread::sleep_for(std::chrono::seconds(3)); + Y_FAIL(); +} + +void AddResultSet(const NYdb::TResultSet& resultSet, TVector<TRow>& rows) { + NYdb::TResultSetParser parser(resultSet); + while (parser.TryNextRow()) { + TRow row; + + for (size_t col = 0; col < parser.ColumnsCount(); col++) { + auto& valueParser = parser.ColumnParser(col); + if (valueParser.GetPrimitiveType() == NYdb::EPrimitiveType::Uint64) { + row.Ints.push_back(valueParser.GetUint64()); + } else { + row.Strings.push_back(valueParser.GetString()); + } + } + + rows.push_back(std::move(row)); + } +} + +void VerifyRows(const TRow& checkRow, const TVector<TRow>& readRows, TString message) { + if (readRows.empty()) { + Cerr << "Expected to have " << checkRow.ToString() + << " but got empty " + << message + << Endl; + + Fail(); + } + + if (readRows.size() > 1) { + Cerr << "Expected to have " << checkRow.ToString() + << " but got " << readRows.size() << " rows " + << message + << Endl; + + for (auto r : readRows) { + Cerr << r.ToString() << Endl; + } + + Fail(); + } + + if (readRows[0] != checkRow) { + Cerr << "Expected to have " << checkRow.ToString() + << " but got " << readRows[0].ToString() << " " + << message + << Endl; + + Fail(); + } else { + // Cerr << "OK " << checkRow.ToString() << " " << message << Endl; + } +} + + TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params) - : DbPath(params->DbPath) - , Params(*params) + : Params(*params) , BigString(NKikimr::GenDataForLZ4(Params.StringLen)) { + if (Params.MixedChangePartitionsSize) { + MixedNextChangePartitionsSize = Now(); + } else { + MixedNextChangePartitionsSize = TInstant::Max(); + } + + Y_VERIFY(Params.IntColumnsCnt <= Params.ColumnsCnt); + Y_VERIFY(Params.KeyColumnsCnt <= Params.ColumnsCnt); } TKvWorkloadParams* TKvWorkloadGenerator::GetParams() { @@ -33,18 +147,16 @@ TKvWorkloadParams* TKvWorkloadGenerator::GetParams() { } std::string TKvWorkloadGenerator::GetDDLQueries() const { - std::string partsNum = std::to_string(Params.MinPartitions); - std::stringstream ss; ss << "--!syntax_v1\n"; - ss << "CREATE TABLE `" << DbPath << "/kv_test`("; + ss << "CREATE TABLE `" << Params.DbPath << "/" << Params.TableName << "`("; for (size_t i = 0; i < Params.ColumnsCnt; ++i) { if (i < Params.IntColumnsCnt) { - ss << "c" << i << " Uint64, "; + ss << "c" << i << " Uint64 NOT NULL, "; } else { - ss << "c" << i << " String, "; + ss << "c" << i << " String NOT NULL, "; } } @@ -60,8 +172,10 @@ std::string TKvWorkloadGenerator::GetDDLQueries() const { if (Params.PartitionsByLoad) { ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, "; } - ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << partsNum << ", " - << "UNIFORM_PARTITIONS = " << partsNum << ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)"; + ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ", "; + ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", "; + ss << "UNIFORM_PARTITIONS = " << Params.MinPartitions << ", "; + ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ")"; return ss.str(); } @@ -69,20 +183,22 @@ std::string TKvWorkloadGenerator::GetDDLQueries() const { TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) { switch (static_cast<EType>(type)) { case EType::UpsertRandom: - return UpsertRandom(); + return Upsert(GenerateRandomRows()); case EType::InsertRandom: - return InsertRandom(); + return Insert(GenerateRandomRows()); case EType::SelectRandom: - return SelectRandom(); + return Select(GenerateRandomRows()); case EType::ReadRowsRandom: - return ReadRowsRandom(); + return ReadRows(GenerateRandomRows()); + case EType::Mixed: + return Mixed(); default: return TQueryInfoList(); } } -TQueryInfoList TKvWorkloadGenerator::AddOperation(TString operation) { +TQueryInfoList TKvWorkloadGenerator::WriteRows(TString operation, TVector<TRow>&& rows) { std::stringstream ss; NYdb::TParamsBuilder paramsBuilder; @@ -98,16 +214,14 @@ TQueryInfoList TKvWorkloadGenerator::AddOperation(TString operation) { ss << "DECLARE " << cname << " AS String;\n"; } if (col < Params.IntColumnsCnt) { - ui64 val = col < Params.KeyColumnsCnt ? RandomNumber<ui64>(Params.MaxFirstKey) : RandomNumber<ui64>(); - paramsBuilder.AddParam(cname).Uint64(val).Build(); + paramsBuilder.AddParam(cname).Uint64(rows[row].Ints[col]).Build(); } else { - TString val = col < Params.KeyColumnsCnt ? TString(std::format("{:x}", RandomNumber<ui64>())) : BigString; - paramsBuilder.AddParam(cname).String(val).Build(); + paramsBuilder.AddParam(cname).String(rows[row].Strings[col - Params.IntColumnsCnt]).Build(); } } } - ss << operation << " INTO `kv_test` ("; + ss << operation << " INTO `" << Params.TableName << "` ("; for (size_t col = 0; col < Params.ColumnsCnt; ++col) { ss << "c" << col; @@ -138,15 +252,15 @@ TQueryInfoList TKvWorkloadGenerator::AddOperation(TString operation) { return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params))); } -TQueryInfoList TKvWorkloadGenerator::UpsertRandom() { - return AddOperation("UPSERT"); +TQueryInfoList TKvWorkloadGenerator::Upsert(TVector<TRow>&& rows) { + return WriteRows("UPSERT", std::move(rows)); } -TQueryInfoList TKvWorkloadGenerator::InsertRandom() { - return AddOperation("INSERT"); +TQueryInfoList TKvWorkloadGenerator::Insert(TVector<TRow>&& rows) { + return WriteRows("INSERT", std::move(rows)); } -TQueryInfoList TKvWorkloadGenerator::SelectRandom() { +TQueryInfoList TKvWorkloadGenerator::Select(TVector<TRow>&& rows) { std::stringstream ss; NYdb::TParamsBuilder paramsBuilder; @@ -158,10 +272,10 @@ TQueryInfoList TKvWorkloadGenerator::SelectRandom() { TString paramName = "$r" + std::to_string(row) + "_" + std::to_string(col); if (col < Params.IntColumnsCnt) { ss << "DECLARE " << paramName << " AS Uint64;\n"; - paramsBuilder.AddParam(paramName).Uint64(RandomNumber<ui64>(Params.MaxFirstKey)).Build(); + paramsBuilder.AddParam(paramName).Uint64(rows[row].Ints[col]).Build(); } else { ss << "DECLARE " << paramName << " AS String;\n"; - paramsBuilder.AddParam(paramName).String(std::format("{:x}", RandomNumber<ui64>())).Build(); + paramsBuilder.AddParam(paramName).String(rows[row].Strings[col - Params.IntColumnsCnt]).Build(); } } } @@ -175,7 +289,7 @@ TQueryInfoList TKvWorkloadGenerator::SelectRandom() { ss << " "; } - ss << "FROM `kv_test` WHERE "; + ss << "FROM `" << Params.TableName << "` WHERE "; for (size_t row = 0; row < Params.RowsCnt; ++row) { for (size_t col = 0; col < Params.KeyColumnsCnt; ++col) { TString paramName = "$r" + std::to_string(row) + "_" + std::to_string(col); @@ -194,16 +308,16 @@ TQueryInfoList TKvWorkloadGenerator::SelectRandom() { return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params))); } -TQueryInfoList TKvWorkloadGenerator::ReadRowsRandom() { +TQueryInfoList TKvWorkloadGenerator::ReadRows(TVector<TRow>&& rows) { NYdb::TValueBuilder keys; keys.BeginList(); - for (size_t i = 0; i < Params.RowsCnt; ++i) { + for (size_t row = 0; row < Params.RowsCnt; ++row) { keys.AddListItem().BeginStruct(); for (size_t col = 0; col < Params.KeyColumnsCnt; ++col) { if (col < Params.IntColumnsCnt) { - keys.AddMember("c" + std::to_string(col)).Uint64(RandomNumber<ui64>(Params.MaxFirstKey)); + keys.AddMember("c" + std::to_string(col)).Uint64(rows[row].Ints[col]); } else { - keys.AddMember("c" + std::to_string(col)).String(std::format("{:x}", RandomNumber<ui64>())); + keys.AddMember("c" + std::to_string(col)).String(rows[row].Strings[col - Params.IntColumnsCnt]); } } keys.EndStruct(); @@ -211,16 +325,118 @@ TQueryInfoList TKvWorkloadGenerator::ReadRowsRandom() { keys.EndList(); TQueryInfo info; - info.TablePath = DbPath + "/kv_test"; + info.TablePath = Params.DbPath + "/" + Params.TableName; info.UseReadRows = true; info.KeyToRead = keys.Build(); return TQueryInfoList(1, std::move(info)); } +TQueryInfoList TKvWorkloadGenerator::Mixed() { + static thread_local TRow lastRow; + + if (Params.MixedChangePartitionsSize && RandomNumber<ui32>(1000) == 0) { + TInstant nextChange = MixedNextChangePartitionsSize; + if (nextChange <= Now() && MixedNextChangePartitionsSize.compare_exchange_strong(nextChange, nextChange + TDuration::Seconds(RandomNumber<ui32>(60*20)))) { + auto alterTable = NYdb::NTable::TAlterTableSettings() + .AlterPartitioningSettings(NYdb::NTable::TPartitioningSettingsBuilder() + .SetPartitionSizeMb(RandomNumber<ui32>(1000) + 10).Build()); + + TQueryInfo info; + info.TablePath = Params.DbPath + "/" + Params.TableName; + info.AlterTable = alterTable; + return TQueryInfoList(1, std::move(info)); + } + } + + if (RandomNumber<ui32>(2) == 0) { // write + TVector<TRow> rows = GenerateRandomRows(true); + lastRow = rows[0]; + + auto upsertQuery = Upsert(std::move(rows)); + + upsertQuery.front().DataQueryResultCallback = [](NYdb::NTable::TDataQueryResult queryResult) { + if (queryResult.IsSuccess()) { + // Note: helps to keep old values too + if (RandomNumber<ui32>(1000) == 0) { + RowsVerifyer.TryInsert(lastRow); + } + } + }; + + return upsertQuery; + } else { // read + auto checkRow = RowsVerifyer.GetRandom(); + if (checkRow) { + lastRow = checkRow.value(); + } else { + lastRow.Ints.clear(); + lastRow.Strings.clear(); + } + + TVector<TRow> rows = checkRow ? TVector<TRow>{checkRow.value()} : GenerateRandomRows(true); + + bool doReadRows = false; + bool doSelect = false; + if (!Params.MixedDoReadRows || !Params.MixedDoSelect) { + if (Params.MixedDoReadRows) doReadRows = true; + if (Params.MixedDoSelect) doSelect = true; + } else { + if (RandomNumber<ui32>(2) == 0) + doReadRows = true; + else + doSelect = true; + } + Y_VERIFY(doReadRows ^ doSelect); + + if (doSelect) { + auto selectQuery = Select(std::move(rows)); + + if (checkRow) { + selectQuery.front().DataQueryResultCallback = [](NYdb::NTable::TDataQueryResult queryResult) { + if (queryResult.IsSuccess()) { + TVector<TRow> readRows; + for (auto& resultSet : queryResult.GetResultSets()) { + AddResultSet(resultSet, readRows); + } + TString queryStatus = "Status: " + std::to_string(int(queryResult.GetStatus())) + " Method: SELECT" + " Issues: " + queryResult.GetIssues().ToOneLineString() + " Meta: "; + for (auto m : queryResult.GetResponseMetadata()) { + queryStatus += " " + m.first + "=" + m.second; + } + VerifyRows(lastRow, std::move(readRows), queryStatus); + } + }; + } + + return selectQuery; + } + if (doReadRows) { + auto readRowsQuery = ReadRows(std::move(rows)); + + if (checkRow) { + readRowsQuery.front().ReadRowsResultCallback = [](NYdb::NTable::TReadRowsResult queryResult) { + if (queryResult.IsSuccess()) { + TVector<TRow> readRows; + AddResultSet(queryResult.GetResultSet(), readRows); + TString queryStatus = "Status: " + std::to_string(int(queryResult.GetStatus())) + " Method: ReadRows" + " Issues: " + queryResult.GetIssues().ToOneLineString() + " Meta: "; + for (auto m : queryResult.GetResponseMetadata()) { + queryStatus += " " + m.first + "=" + m.second; + } + VerifyRows(lastRow, std::move(readRows), queryStatus); + } + }; + } + + return readRowsQuery; + } + } + + return TQueryInfoList(); +} + TQueryInfoList TKvWorkloadGenerator::GetInitialData() { TQueryInfoList res; for (size_t i = 0; i < Params.InitRowCount; ++i) { - auto queryInfos = UpsertRandom(); + auto queryInfos = Upsert(GenerateRandomRows()); res.insert(res.end(), queryInfos.begin(), queryInfos.end()); } @@ -228,11 +444,41 @@ TQueryInfoList TKvWorkloadGenerator::GetInitialData() { } std::string TKvWorkloadGenerator::GetCleanDDLQueries() const { - std::string query = R"( - DROP TABLE `kv_test`; - )"; + std::string query = "DROP TABLE `" + Params.TableName + "`;"; return query; } +TVector<TRow> TKvWorkloadGenerator::GenerateRandomRows(bool randomValues) { + TVector<TRow> result(Params.RowsCnt); + + for (size_t row = 0; row < Params.RowsCnt; ++row) { + result[row].Ints.resize(Params.IntColumnsCnt); + result[row].Strings.resize(Params.ColumnsCnt - Params.IntColumnsCnt); + + for (size_t col = 0; col < Params.ColumnsCnt; ++col) { + if (col < Params.IntColumnsCnt) { + ui64 val = col < Params.KeyColumnsCnt ? RandomNumber<ui64>(Params.MaxFirstKey) : RandomNumber<ui64>(); + result[row].Ints[col] = val; + } else { + TString val; + if (col < Params.KeyColumnsCnt) { + val = TString(std::format("{:x}", RandomNumber<ui64>(Params.MaxFirstKey))); + } else if (randomValues) { + val = TString(Params.StringLen, '_'); + for (size_t i = 0; i < Params.StringLen; i++) { + val[i] = (char)('a' + RandomNumber<u_char>(26)); + } + } else { + val = BigString; + } + result[row].Strings[col - Params.IntColumnsCnt] = val; + } + } + } + + return result; +} + + }
\ No newline at end of file diff --git a/ydb/library/workload/kv_workload.h b/ydb/library/workload/kv_workload.h index 96a95e95a3..5ea7ca4fe4 100644 --- a/ydb/library/workload/kv_workload.h +++ b/ydb/library/workload/kv_workload.h @@ -4,11 +4,14 @@ #include <cctype> #include <random> +#include <sstream> namespace NYdbWorkload { enum KvWorkloadConstants : ui64 { MIN_PARTITIONS = 40, + MAX_PARTITIONS = 1000, + PARTITION_SIZE_MB = 2000, INIT_ROW_COUNT = 1000, MAX_FIRST_KEY = Max<ui64>(), STRING_LEN = 8, @@ -16,11 +19,17 @@ enum KvWorkloadConstants : ui64 { INT_COLUMNS_CNT = 1, KEY_COLUMNS_CNT = 1, ROWS_CNT = 1, - PARTITIONS_BY_LOAD = true + PARTITIONS_BY_LOAD = true, + + MIXED_CHANGE_PARTITIONS_SIZE = false, + MIXED_DO_READ_ROWS = true, + MIXED_DO_SELECT = true, }; struct TKvWorkloadParams : public TWorkloadParams { ui64 MinPartitions = KvWorkloadConstants::MIN_PARTITIONS; + const ui64 MaxPartitions = KvWorkloadConstants::MAX_PARTITIONS; + ui64 PartitionSizeMb = KvWorkloadConstants::PARTITION_SIZE_MB; ui64 InitRowCount = KvWorkloadConstants::INIT_ROW_COUNT; ui64 MaxFirstKey = KvWorkloadConstants::MAX_FIRST_KEY; ui64 StringLen = KvWorkloadConstants::STRING_LEN; @@ -29,10 +38,37 @@ struct TKvWorkloadParams : public TWorkloadParams { ui64 KeyColumnsCnt = KvWorkloadConstants::KEY_COLUMNS_CNT; ui64 RowsCnt = KvWorkloadConstants::ROWS_CNT; bool PartitionsByLoad = KvWorkloadConstants::PARTITIONS_BY_LOAD; + + ui64 MixedChangePartitionsSize = KvWorkloadConstants::MIXED_CHANGE_PARTITIONS_SIZE; + ui64 MixedDoReadRows = KvWorkloadConstants::MIXED_DO_READ_ROWS; + ui64 MixedDoSelect = KvWorkloadConstants::MIXED_DO_SELECT; + + const std::string TableName = "kv_test"; }; class TKvWorkloadGenerator : public IWorkloadQueryGenerator { public: + struct TRow { + TVector<ui64> Ints; + TVector<TString> Strings; + + TString ToString() const { + std::stringstream ss; + ss << "( "; + for (auto i : Ints) { + ss << i << " "; + } + for (auto s : Strings) { + ss << s << " "; + } + ss << ")"; + return ss.str(); + } + + bool operator == (const TRow &other) const { + return Ints == other.Ints && Strings == other.Strings; + } + }; static TKvWorkloadGenerator* New(const TKvWorkloadParams* params) { return new TKvWorkloadGenerator(params); @@ -55,23 +91,27 @@ public: InsertRandom, SelectRandom, ReadRowsRandom, + Mixed, MaxType }; private: - TQueryInfoList UpsertRandom(); - TQueryInfoList InsertRandom(); - TQueryInfoList SelectRandom(); - TQueryInfoList ReadRowsRandom(); + TQueryInfoList Upsert(TVector<TRow>&& rows); + TQueryInfoList Insert(TVector<TRow>&& rows); + TQueryInfoList WriteRows(TString operation, TVector<TRow>&& rows); + TQueryInfoList Select(TVector<TRow>&& rows); + TQueryInfoList ReadRows(TVector<TRow>&& rows); + TQueryInfoList Mixed(); TKvWorkloadGenerator(const TKvWorkloadParams* params); TQueryInfo FillKvData() const; - TQueryInfoList AddOperation(TString operation); + TVector<TRow> GenerateRandomRows(bool randomValues = false); - std::string DbPath; TKvWorkloadParams Params; TString BigString; + + std::atomic<TInstant> MixedNextChangePartitionsSize; }; } // namespace NYdbWorkload diff --git a/ydb/library/workload/workload_query_generator.h b/ydb/library/workload/workload_query_generator.h index 1f79bb4e2f..19815e95b1 100644 --- a/ydb/library/workload/workload_query_generator.h +++ b/ydb/library/workload/workload_query_generator.h @@ -1,5 +1,6 @@ #pragma once +#include "ydb/public/sdk/cpp/client/ydb_table/table.h" #include <ydb/public/sdk/cpp/client/ydb_params/params.h> #include <ydb/public/sdk/cpp/client/ydb_value/value.h> @@ -26,6 +27,10 @@ struct TQueryInfo { bool UseReadRows = false; TString TablePath; std::optional<NYdb::TValue> KeyToRead; + std::optional<NYdb::NTable::TAlterTableSettings> AlterTable; + + std::optional<std::function<void(NYdb::NTable::TReadRowsResult)>> ReadRowsResultCallback; + std::optional<std::function<void(NYdb::NTable::TDataQueryResult)>> DataQueryResultCallback; }; using TQueryInfoList = std::list<TQueryInfo>; diff --git a/ydb/library/workload/ya.make b/ydb/library/workload/ya.make index c947999b82..1e593d3d97 100644 --- a/ydb/library/workload/ya.make +++ b/ydb/library/workload/ya.make @@ -8,6 +8,7 @@ SRCS( PEERDIR( ydb/public/api/protos + ydb/public/sdk/cpp/client/ydb_table ) END() diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp index 1e04e17873..d1d999a116 100644 --- a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp @@ -36,6 +36,8 @@ void TCommandKvInit::Config(TConfig& config) { .DefaultValue(NYdbWorkload::KvWorkloadConstants::INIT_ROW_COUNT).StoreResult(&InitRowCount); config.Opts->AddLongOption("min-partitions", "Minimum partitions for tables.") .DefaultValue(NYdbWorkload::KvWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions); + config.Opts->AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).") + .DefaultValue(NYdbWorkload::KvWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSize); config.Opts->AddLongOption("auto-partition", "Enable auto partitioning by load.") .DefaultValue(NYdbWorkload::KvWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad); config.Opts->AddLongOption("max-first-key", "Maximum value of a first primary key") @@ -63,6 +65,7 @@ int TCommandKvInit::Run(TConfig& config) { params.DbPath = config.Database; params.InitRowCount = InitRowCount; params.MinPartitions = MinPartitions; + params.PartitionSizeMb = PartitionSize; params.PartitionsByLoad = PartitionsByLoad; params.MaxFirstKey = MaxFirstKey; params.StringLen = StringLen; @@ -109,6 +112,7 @@ TCommandKvRun::TCommandKvRun() AddCommand(std::make_unique<TCommandKvRunInsertRandom>()); AddCommand(std::make_unique<TCommandKvRunSelectRandom>()); AddCommand(std::make_unique<TCommandKvRunReadRowsRandom>()); + AddCommand(std::make_unique<TCommandKvRunMixed>()); } TCommandKvRunUpsertRandom::TCommandKvRunUpsertRandom() @@ -281,4 +285,54 @@ int TCommandKvRunReadRowsRandom::Run(TConfig& config) { return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::ReadRowsRandom)); } +TCommandKvRunMixed::TCommandKvRunMixed() + : TWorkloadCommand("mixed", {}, "Writes and SELECT/ReadsRows rows randomly") +{} + +void TCommandKvRunMixed::Config(TConfig& config) { + TWorkloadCommand::Config(config); + config.SetFreeArgsNum(0); + + config.Opts->AddLongOption("max-first-key", "Maximum value of a first primary key") + .DefaultValue(NYdbWorkload::KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey); + config.Opts->AddLongOption("len", "String len") + .DefaultValue(NYdbWorkload::KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen); + config.Opts->AddLongOption("cols", "Number of columns to select for a single query") + .DefaultValue(NYdbWorkload::KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); + config.Opts->AddLongOption("int-cols", "Number of int columns") + .DefaultValue(NYdbWorkload::KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); + config.Opts->AddLongOption("key-cols", "Number of key columns") + .DefaultValue(NYdbWorkload::KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); + config.Opts->AddLongOption("change-partitions-size", "Apply random changes of AUTO_PARTITIONING_PARTITION_SIZE_MB setting.") + .DefaultValue(NYdbWorkload::KvWorkloadConstants::MIXED_CHANGE_PARTITIONS_SIZE).StoreResult(&ChangePartitionsSize); + config.Opts->AddLongOption("do-select", "Do SELECT operations.") + .DefaultValue(NYdbWorkload::KvWorkloadConstants::MIXED_DO_SELECT).StoreResult(&DoSelect); + config.Opts->AddLongOption("do-read-rows", "Do ReadRows operations.") + .DefaultValue(NYdbWorkload::KvWorkloadConstants::MIXED_DO_READ_ROWS).StoreResult(&DoReadRows); +} + +void TCommandKvRunMixed::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandKvRunMixed::Run(TConfig& config) { + PrepareForRun(config); + + NYdbWorkload::TKvWorkloadParams params; + params.DbPath = config.Database; + params.MaxFirstKey = MaxFirstKey; + params.StringLen = StringLen; + params.ColumnsCnt = ColumnsCnt; + params.IntColumnsCnt = IntColumnsCnt; + params.KeyColumnsCnt = KeyColumnsCnt; + params.MixedChangePartitionsSize = ChangePartitionsSize; + params.MixedDoReadRows = DoReadRows; + params.MixedDoSelect = DoSelect; + + NYdbWorkload::TWorkloadFactory factory; + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); + + return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::Mixed)); +} + } // namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.h b/ydb/public/lib/ydb_cli/commands/kv_workload.h index 81cce96564..c1eaa49f72 100644 --- a/ydb/public/lib/ydb_cli/commands/kv_workload.h +++ b/ydb/public/lib/ydb_cli/commands/kv_workload.h @@ -20,6 +20,7 @@ public: private: ui64 InitRowCount; ui64 MinPartitions; + ui64 PartitionSize; ui64 MaxFirstKey; ui64 StringLen; ui64 ColumnsCnt; @@ -108,5 +109,24 @@ private: }; +class TCommandKvRunMixed : public TWorkloadCommand { +public: + TCommandKvRunMixed(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + +private: + ui64 MaxFirstKey; + ui64 StringLen; + ui64 ColumnsCnt; + ui64 IntColumnsCnt; + ui64 KeyColumnsCnt; + bool ChangePartitionsSize; + bool DoReadRows; + bool DoSelect; + +}; + } } diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index 4ba575ea5f..baf6454146 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -118,17 +118,28 @@ void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, co NYdbWorkload::TQueryInfo queryInfo; auto runQuery = [this, &queryInfo, &querySettings, &retryCount] (NYdb::NTable::TSession session) -> NYdb::TStatus { ++retryCount; - TStatus result(EStatus::SUCCESS, NYql::TIssues()); - if (queryInfo.UseReadRows) { - result = TableClient->ReadRows(queryInfo.TablePath, std::move(*queryInfo.KeyToRead)) + if (queryInfo.AlterTable) { + auto result = TableClient->RetryOperationSync([&queryInfo](NTable::TSession session) { + return session.AlterTable(queryInfo.TablePath, queryInfo.AlterTable.value()).GetValueSync(); + }); + return result; + } else if (queryInfo.UseReadRows) { + auto result = TableClient->ReadRows(queryInfo.TablePath, std::move(*queryInfo.KeyToRead)) .GetValueSync(); + if (queryInfo.ReadRowsResultCallback) { + queryInfo.ReadRowsResultCallback.value()(result); + } + return result; } else { - result = session.ExecuteDataQuery(queryInfo.Query.c_str(), + auto result = session.ExecuteDataQuery(queryInfo.Query.c_str(), NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(), queryInfo.Params, querySettings ).GetValueSync(); + if (queryInfo.DataQueryResultCallback) { + queryInfo.DataQueryResultCallback.value()(result); + } + return result; } - return result; }; while (Now() < StopTime) { |