aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungasc <kungasc@yandex-team.com>2023-08-24 18:53:18 +0300
committerkungasc <kungasc@yandex-team.com>2023-08-24 19:12:58 +0300
commit036e55c4182c5cc43a0b7e6af296d52cc5aa1d95 (patch)
treee1c7a1dc0f55a4ee493eb49bd86e9d28ff239a97
parent3f437a5ee5b80f2ed451775761ddd143e017c067 (diff)
downloadydb-036e55c4182c5cc43a0b7e6af296d52cc5aa1d95.tar.gz
KIKIMR-19119 mixed kv workload
-rw-r--r--ydb/library/workload/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/workload/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/workload/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/workload/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/workload/kv_workload.cpp318
-rw-r--r--ydb/library/workload/kv_workload.h54
-rw-r--r--ydb/library/workload/workload_query_generator.h5
-rw-r--r--ydb/library/workload/ya.make1
-rw-r--r--ydb/public/lib/ydb_cli/commands/kv_workload.cpp54
-rw-r--r--ydb/public/lib/ydb_cli/commands/kv_workload.h20
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload.cpp21
11 files changed, 429 insertions, 48 deletions
diff --git a/ydb/library/workload/CMakeLists.darwin-x86_64.txt b/ydb/library/workload/CMakeLists.darwin-x86_64.txt
index 948585439c..69416c94e7 100644
--- a/ydb/library/workload/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/workload/CMakeLists.darwin-x86_64.txt
@@ -12,6 +12,7 @@ target_link_libraries(ydb-library-workload PUBLIC
contrib-libs-cxxsupp
yutil
api-protos
+ cpp-client-ydb_table
)
target_sources(ydb-library-workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp
diff --git a/ydb/library/workload/CMakeLists.linux-aarch64.txt b/ydb/library/workload/CMakeLists.linux-aarch64.txt
index ca62ddeed5..d5dba36558 100644
--- a/ydb/library/workload/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/workload/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ target_link_libraries(ydb-library-workload PUBLIC
contrib-libs-cxxsupp
yutil
api-protos
+ cpp-client-ydb_table
)
target_sources(ydb-library-workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp
diff --git a/ydb/library/workload/CMakeLists.linux-x86_64.txt b/ydb/library/workload/CMakeLists.linux-x86_64.txt
index ca62ddeed5..d5dba36558 100644
--- a/ydb/library/workload/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/workload/CMakeLists.linux-x86_64.txt
@@ -13,6 +13,7 @@ target_link_libraries(ydb-library-workload PUBLIC
contrib-libs-cxxsupp
yutil
api-protos
+ cpp-client-ydb_table
)
target_sources(ydb-library-workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp
diff --git a/ydb/library/workload/CMakeLists.windows-x86_64.txt b/ydb/library/workload/CMakeLists.windows-x86_64.txt
index 948585439c..69416c94e7 100644
--- a/ydb/library/workload/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/workload/CMakeLists.windows-x86_64.txt
@@ -12,6 +12,7 @@ target_link_libraries(ydb-library-workload PUBLIC
contrib-libs-cxxsupp
yutil
api-protos
+ cpp-client-ydb_table
)
target_sources(ydb-library-workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp
diff --git a/ydb/library/workload/kv_workload.cpp b/ydb/library/workload/kv_workload.cpp
index 23ca57117e..5588105535 100644
--- a/ydb/library/workload/kv_workload.cpp
+++ b/ydb/library/workload/kv_workload.cpp
@@ -12,6 +12,7 @@
#include <thread>
#include <random>
#include <sstream>
+#include <chrono>
template <>
void Out<NYdbWorkload::KvWorkloadConstants>(IOutputStream& out, NYdbWorkload::KvWorkloadConstants constant)
@@ -21,11 +22,124 @@ void Out<NYdbWorkload::KvWorkloadConstants>(IOutputStream& out, NYdbWorkload::Kv
namespace NYdbWorkload {
+using TRow = TKvWorkloadGenerator::TRow;
+
+// Note: there is no mechanism to update row values for now so all keys should be different
+struct TRowsVerifyer {
+ TRowsVerifyer(size_t capacity = 10000)
+ : Capacity(capacity)
+ {
+ Rows.reserve(Capacity);
+ }
+
+ void TryInsert(TRow row) {
+ std::unique_lock<std::mutex> lock(Mutex, std::try_to_lock);
+ if (!lock.owns_lock()) {
+ return;
+ }
+
+ if (Rows.size() < Capacity) {
+ Rows.push_back(row);
+ return;
+ }
+
+ Rows[RandomNumber<size_t>(Rows.size())] = row;
+ }
+
+ std::optional<TRow> GetRandom() {
+ std::unique_lock<std::mutex> lock(Mutex, std::try_to_lock);
+ if (!lock.owns_lock()) {
+ return { };
+ }
+
+ if (!Rows.size()) {
+ return { };
+ }
+
+ size_t index = RandomNumber<size_t>(Rows.size());
+ return Rows[index];
+ }
+
+private:
+ const size_t Capacity;
+ std::mutex Mutex;
+ TVector<TRow> Rows;
+};
+
+TRowsVerifyer RowsVerifyer;
+
+void Fail() {
+ // Note: sleep helps to detect more fails
+ std::this_thread::sleep_for(std::chrono::seconds(3));
+ Y_FAIL();
+}
+
+void AddResultSet(const NYdb::TResultSet& resultSet, TVector<TRow>& rows) {
+ NYdb::TResultSetParser parser(resultSet);
+ while (parser.TryNextRow()) {
+ TRow row;
+
+ for (size_t col = 0; col < parser.ColumnsCount(); col++) {
+ auto& valueParser = parser.ColumnParser(col);
+ if (valueParser.GetPrimitiveType() == NYdb::EPrimitiveType::Uint64) {
+ row.Ints.push_back(valueParser.GetUint64());
+ } else {
+ row.Strings.push_back(valueParser.GetString());
+ }
+ }
+
+ rows.push_back(std::move(row));
+ }
+}
+
+void VerifyRows(const TRow& checkRow, const TVector<TRow>& readRows, TString message) {
+ if (readRows.empty()) {
+ Cerr << "Expected to have " << checkRow.ToString()
+ << " but got empty "
+ << message
+ << Endl;
+
+ Fail();
+ }
+
+ if (readRows.size() > 1) {
+ Cerr << "Expected to have " << checkRow.ToString()
+ << " but got " << readRows.size() << " rows "
+ << message
+ << Endl;
+
+ for (auto r : readRows) {
+ Cerr << r.ToString() << Endl;
+ }
+
+ Fail();
+ }
+
+ if (readRows[0] != checkRow) {
+ Cerr << "Expected to have " << checkRow.ToString()
+ << " but got " << readRows[0].ToString() << " "
+ << message
+ << Endl;
+
+ Fail();
+ } else {
+ // Cerr << "OK " << checkRow.ToString() << " " << message << Endl;
+ }
+}
+
+
TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params)
- : DbPath(params->DbPath)
- , Params(*params)
+ : Params(*params)
, BigString(NKikimr::GenDataForLZ4(Params.StringLen))
{
+ if (Params.MixedChangePartitionsSize) {
+ MixedNextChangePartitionsSize = Now();
+ } else {
+ MixedNextChangePartitionsSize = TInstant::Max();
+ }
+
+ Y_VERIFY(Params.IntColumnsCnt <= Params.ColumnsCnt);
+ Y_VERIFY(Params.KeyColumnsCnt <= Params.ColumnsCnt);
}
TKvWorkloadParams* TKvWorkloadGenerator::GetParams() {
@@ -33,18 +147,16 @@ TKvWorkloadParams* TKvWorkloadGenerator::GetParams() {
}
std::string TKvWorkloadGenerator::GetDDLQueries() const {
- std::string partsNum = std::to_string(Params.MinPartitions);
-
std::stringstream ss;
ss << "--!syntax_v1\n";
- ss << "CREATE TABLE `" << DbPath << "/kv_test`(";
+ ss << "CREATE TABLE `" << Params.DbPath << "/" << Params.TableName << "`(";
for (size_t i = 0; i < Params.ColumnsCnt; ++i) {
if (i < Params.IntColumnsCnt) {
- ss << "c" << i << " Uint64, ";
+ ss << "c" << i << " Uint64 NOT NULL, ";
} else {
- ss << "c" << i << " String, ";
+ ss << "c" << i << " String NOT NULL, ";
}
}
@@ -60,8 +172,10 @@ std::string TKvWorkloadGenerator::GetDDLQueries() const {
if (Params.PartitionsByLoad) {
ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, ";
}
- ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << partsNum << ", "
- << "UNIFORM_PARTITIONS = " << partsNum << ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)";
+ ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ", ";
+ ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", ";
+ ss << "UNIFORM_PARTITIONS = " << Params.MinPartitions << ", ";
+ ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ")";
return ss.str();
}
@@ -69,20 +183,22 @@ std::string TKvWorkloadGenerator::GetDDLQueries() const {
TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) {
switch (static_cast<EType>(type)) {
case EType::UpsertRandom:
- return UpsertRandom();
+ return Upsert(GenerateRandomRows());
case EType::InsertRandom:
- return InsertRandom();
+ return Insert(GenerateRandomRows());
case EType::SelectRandom:
- return SelectRandom();
+ return Select(GenerateRandomRows());
case EType::ReadRowsRandom:
- return ReadRowsRandom();
+ return ReadRows(GenerateRandomRows());
+ case EType::Mixed:
+ return Mixed();
default:
return TQueryInfoList();
}
}
-TQueryInfoList TKvWorkloadGenerator::AddOperation(TString operation) {
+TQueryInfoList TKvWorkloadGenerator::WriteRows(TString operation, TVector<TRow>&& rows) {
std::stringstream ss;
NYdb::TParamsBuilder paramsBuilder;
@@ -98,16 +214,14 @@ TQueryInfoList TKvWorkloadGenerator::AddOperation(TString operation) {
ss << "DECLARE " << cname << " AS String;\n";
}
if (col < Params.IntColumnsCnt) {
- ui64 val = col < Params.KeyColumnsCnt ? RandomNumber<ui64>(Params.MaxFirstKey) : RandomNumber<ui64>();
- paramsBuilder.AddParam(cname).Uint64(val).Build();
+ paramsBuilder.AddParam(cname).Uint64(rows[row].Ints[col]).Build();
} else {
- TString val = col < Params.KeyColumnsCnt ? TString(std::format("{:x}", RandomNumber<ui64>())) : BigString;
- paramsBuilder.AddParam(cname).String(val).Build();
+ paramsBuilder.AddParam(cname).String(rows[row].Strings[col - Params.IntColumnsCnt]).Build();
}
}
}
- ss << operation << " INTO `kv_test` (";
+ ss << operation << " INTO `" << Params.TableName << "` (";
for (size_t col = 0; col < Params.ColumnsCnt; ++col) {
ss << "c" << col;
@@ -138,15 +252,15 @@ TQueryInfoList TKvWorkloadGenerator::AddOperation(TString operation) {
return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params)));
}
-TQueryInfoList TKvWorkloadGenerator::UpsertRandom() {
- return AddOperation("UPSERT");
+TQueryInfoList TKvWorkloadGenerator::Upsert(TVector<TRow>&& rows) {
+ return WriteRows("UPSERT", std::move(rows));
}
-TQueryInfoList TKvWorkloadGenerator::InsertRandom() {
- return AddOperation("INSERT");
+TQueryInfoList TKvWorkloadGenerator::Insert(TVector<TRow>&& rows) {
+ return WriteRows("INSERT", std::move(rows));
}
-TQueryInfoList TKvWorkloadGenerator::SelectRandom() {
+TQueryInfoList TKvWorkloadGenerator::Select(TVector<TRow>&& rows) {
std::stringstream ss;
NYdb::TParamsBuilder paramsBuilder;
@@ -158,10 +272,10 @@ TQueryInfoList TKvWorkloadGenerator::SelectRandom() {
TString paramName = "$r" + std::to_string(row) + "_" + std::to_string(col);
if (col < Params.IntColumnsCnt) {
ss << "DECLARE " << paramName << " AS Uint64;\n";
- paramsBuilder.AddParam(paramName).Uint64(RandomNumber<ui64>(Params.MaxFirstKey)).Build();
+ paramsBuilder.AddParam(paramName).Uint64(rows[row].Ints[col]).Build();
} else {
ss << "DECLARE " << paramName << " AS String;\n";
- paramsBuilder.AddParam(paramName).String(std::format("{:x}", RandomNumber<ui64>())).Build();
+ paramsBuilder.AddParam(paramName).String(rows[row].Strings[col - Params.IntColumnsCnt]).Build();
}
}
}
@@ -175,7 +289,7 @@ TQueryInfoList TKvWorkloadGenerator::SelectRandom() {
ss << " ";
}
- ss << "FROM `kv_test` WHERE ";
+ ss << "FROM `" << Params.TableName << "` WHERE ";
for (size_t row = 0; row < Params.RowsCnt; ++row) {
for (size_t col = 0; col < Params.KeyColumnsCnt; ++col) {
TString paramName = "$r" + std::to_string(row) + "_" + std::to_string(col);
@@ -194,16 +308,16 @@ TQueryInfoList TKvWorkloadGenerator::SelectRandom() {
return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params)));
}
-TQueryInfoList TKvWorkloadGenerator::ReadRowsRandom() {
+TQueryInfoList TKvWorkloadGenerator::ReadRows(TVector<TRow>&& rows) {
NYdb::TValueBuilder keys;
keys.BeginList();
- for (size_t i = 0; i < Params.RowsCnt; ++i) {
+ for (size_t row = 0; row < Params.RowsCnt; ++row) {
keys.AddListItem().BeginStruct();
for (size_t col = 0; col < Params.KeyColumnsCnt; ++col) {
if (col < Params.IntColumnsCnt) {
- keys.AddMember("c" + std::to_string(col)).Uint64(RandomNumber<ui64>(Params.MaxFirstKey));
+ keys.AddMember("c" + std::to_string(col)).Uint64(rows[row].Ints[col]);
} else {
- keys.AddMember("c" + std::to_string(col)).String(std::format("{:x}", RandomNumber<ui64>()));
+ keys.AddMember("c" + std::to_string(col)).String(rows[row].Strings[col - Params.IntColumnsCnt]);
}
}
keys.EndStruct();
@@ -211,16 +325,118 @@ TQueryInfoList TKvWorkloadGenerator::ReadRowsRandom() {
keys.EndList();
TQueryInfo info;
- info.TablePath = DbPath + "/kv_test";
+ info.TablePath = Params.DbPath + "/" + Params.TableName;
info.UseReadRows = true;
info.KeyToRead = keys.Build();
return TQueryInfoList(1, std::move(info));
}
+TQueryInfoList TKvWorkloadGenerator::Mixed() {
+ static thread_local TRow lastRow;
+
+ if (Params.MixedChangePartitionsSize && RandomNumber<ui32>(1000) == 0) {
+ TInstant nextChange = MixedNextChangePartitionsSize;
+ if (nextChange <= Now() && MixedNextChangePartitionsSize.compare_exchange_strong(nextChange, nextChange + TDuration::Seconds(RandomNumber<ui32>(60*20)))) {
+ auto alterTable = NYdb::NTable::TAlterTableSettings()
+ .AlterPartitioningSettings(NYdb::NTable::TPartitioningSettingsBuilder()
+ .SetPartitionSizeMb(RandomNumber<ui32>(1000) + 10).Build());
+
+ TQueryInfo info;
+ info.TablePath = Params.DbPath + "/" + Params.TableName;
+ info.AlterTable = alterTable;
+ return TQueryInfoList(1, std::move(info));
+ }
+ }
+
+ if (RandomNumber<ui32>(2) == 0) { // write
+ TVector<TRow> rows = GenerateRandomRows(true);
+ lastRow = rows[0];
+
+ auto upsertQuery = Upsert(std::move(rows));
+
+ upsertQuery.front().DataQueryResultCallback = [](NYdb::NTable::TDataQueryResult queryResult) {
+ if (queryResult.IsSuccess()) {
+ // Note: helps to keep old values too
+ if (RandomNumber<ui32>(1000) == 0) {
+ RowsVerifyer.TryInsert(lastRow);
+ }
+ }
+ };
+
+ return upsertQuery;
+ } else { // read
+ auto checkRow = RowsVerifyer.GetRandom();
+ if (checkRow) {
+ lastRow = checkRow.value();
+ } else {
+ lastRow.Ints.clear();
+ lastRow.Strings.clear();
+ }
+
+ TVector<TRow> rows = checkRow ? TVector<TRow>{checkRow.value()} : GenerateRandomRows(true);
+
+ bool doReadRows = false;
+ bool doSelect = false;
+ if (!Params.MixedDoReadRows || !Params.MixedDoSelect) {
+ if (Params.MixedDoReadRows) doReadRows = true;
+ if (Params.MixedDoSelect) doSelect = true;
+ } else {
+ if (RandomNumber<ui32>(2) == 0)
+ doReadRows = true;
+ else
+ doSelect = true;
+ }
+ Y_VERIFY(doReadRows ^ doSelect);
+
+ if (doSelect) {
+ auto selectQuery = Select(std::move(rows));
+
+ if (checkRow) {
+ selectQuery.front().DataQueryResultCallback = [](NYdb::NTable::TDataQueryResult queryResult) {
+ if (queryResult.IsSuccess()) {
+ TVector<TRow> readRows;
+ for (auto& resultSet : queryResult.GetResultSets()) {
+ AddResultSet(resultSet, readRows);
+ }
+ TString queryStatus = "Status: " + std::to_string(int(queryResult.GetStatus())) + " Method: SELECT" + " Issues: " + queryResult.GetIssues().ToOneLineString() + " Meta: ";
+ for (auto m : queryResult.GetResponseMetadata()) {
+ queryStatus += " " + m.first + "=" + m.second;
+ }
+ VerifyRows(lastRow, std::move(readRows), queryStatus);
+ }
+ };
+ }
+
+ return selectQuery;
+ }
+ if (doReadRows) {
+ auto readRowsQuery = ReadRows(std::move(rows));
+
+ if (checkRow) {
+ readRowsQuery.front().ReadRowsResultCallback = [](NYdb::NTable::TReadRowsResult queryResult) {
+ if (queryResult.IsSuccess()) {
+ TVector<TRow> readRows;
+ AddResultSet(queryResult.GetResultSet(), readRows);
+ TString queryStatus = "Status: " + std::to_string(int(queryResult.GetStatus())) + " Method: ReadRows" + " Issues: " + queryResult.GetIssues().ToOneLineString() + " Meta: ";
+ for (auto m : queryResult.GetResponseMetadata()) {
+ queryStatus += " " + m.first + "=" + m.second;
+ }
+ VerifyRows(lastRow, std::move(readRows), queryStatus);
+ }
+ };
+ }
+
+ return readRowsQuery;
+ }
+ }
+
+ return TQueryInfoList();
+}
+
TQueryInfoList TKvWorkloadGenerator::GetInitialData() {
TQueryInfoList res;
for (size_t i = 0; i < Params.InitRowCount; ++i) {
- auto queryInfos = UpsertRandom();
+ auto queryInfos = Upsert(GenerateRandomRows());
res.insert(res.end(), queryInfos.begin(), queryInfos.end());
}
@@ -228,11 +444,41 @@ TQueryInfoList TKvWorkloadGenerator::GetInitialData() {
}
std::string TKvWorkloadGenerator::GetCleanDDLQueries() const {
- std::string query = R"(
- DROP TABLE `kv_test`;
- )";
+ std::string query = "DROP TABLE `" + Params.TableName + "`;";
return query;
}
+TVector<TRow> TKvWorkloadGenerator::GenerateRandomRows(bool randomValues) {
+ TVector<TRow> result(Params.RowsCnt);
+
+ for (size_t row = 0; row < Params.RowsCnt; ++row) {
+ result[row].Ints.resize(Params.IntColumnsCnt);
+ result[row].Strings.resize(Params.ColumnsCnt - Params.IntColumnsCnt);
+
+ for (size_t col = 0; col < Params.ColumnsCnt; ++col) {
+ if (col < Params.IntColumnsCnt) {
+ ui64 val = col < Params.KeyColumnsCnt ? RandomNumber<ui64>(Params.MaxFirstKey) : RandomNumber<ui64>();
+ result[row].Ints[col] = val;
+ } else {
+ TString val;
+ if (col < Params.KeyColumnsCnt) {
+ val = TString(std::format("{:x}", RandomNumber<ui64>(Params.MaxFirstKey)));
+ } else if (randomValues) {
+ val = TString(Params.StringLen, '_');
+ for (size_t i = 0; i < Params.StringLen; i++) {
+ val[i] = (char)('a' + RandomNumber<u_char>(26));
+ }
+ } else {
+ val = BigString;
+ }
+ result[row].Strings[col - Params.IntColumnsCnt] = val;
+ }
+ }
+ }
+
+ return result;
+}
+
+
} \ No newline at end of file
diff --git a/ydb/library/workload/kv_workload.h b/ydb/library/workload/kv_workload.h
index 96a95e95a3..5ea7ca4fe4 100644
--- a/ydb/library/workload/kv_workload.h
+++ b/ydb/library/workload/kv_workload.h
@@ -4,11 +4,14 @@
#include <cctype>
#include <random>
+#include <sstream>
namespace NYdbWorkload {
enum KvWorkloadConstants : ui64 {
MIN_PARTITIONS = 40,
+ MAX_PARTITIONS = 1000,
+ PARTITION_SIZE_MB = 2000,
INIT_ROW_COUNT = 1000,
MAX_FIRST_KEY = Max<ui64>(),
STRING_LEN = 8,
@@ -16,11 +19,17 @@ enum KvWorkloadConstants : ui64 {
INT_COLUMNS_CNT = 1,
KEY_COLUMNS_CNT = 1,
ROWS_CNT = 1,
- PARTITIONS_BY_LOAD = true
+ PARTITIONS_BY_LOAD = true,
+
+ MIXED_CHANGE_PARTITIONS_SIZE = false,
+ MIXED_DO_READ_ROWS = true,
+ MIXED_DO_SELECT = true,
};
struct TKvWorkloadParams : public TWorkloadParams {
ui64 MinPartitions = KvWorkloadConstants::MIN_PARTITIONS;
+ const ui64 MaxPartitions = KvWorkloadConstants::MAX_PARTITIONS;
+ ui64 PartitionSizeMb = KvWorkloadConstants::PARTITION_SIZE_MB;
ui64 InitRowCount = KvWorkloadConstants::INIT_ROW_COUNT;
ui64 MaxFirstKey = KvWorkloadConstants::MAX_FIRST_KEY;
ui64 StringLen = KvWorkloadConstants::STRING_LEN;
@@ -29,10 +38,37 @@ struct TKvWorkloadParams : public TWorkloadParams {
ui64 KeyColumnsCnt = KvWorkloadConstants::KEY_COLUMNS_CNT;
ui64 RowsCnt = KvWorkloadConstants::ROWS_CNT;
bool PartitionsByLoad = KvWorkloadConstants::PARTITIONS_BY_LOAD;
+
+ ui64 MixedChangePartitionsSize = KvWorkloadConstants::MIXED_CHANGE_PARTITIONS_SIZE;
+ ui64 MixedDoReadRows = KvWorkloadConstants::MIXED_DO_READ_ROWS;
+ ui64 MixedDoSelect = KvWorkloadConstants::MIXED_DO_SELECT;
+
+ const std::string TableName = "kv_test";
};
class TKvWorkloadGenerator : public IWorkloadQueryGenerator {
public:
+ struct TRow {
+ TVector<ui64> Ints;
+ TVector<TString> Strings;
+
+ TString ToString() const {
+ std::stringstream ss;
+ ss << "( ";
+ for (auto i : Ints) {
+ ss << i << " ";
+ }
+ for (auto s : Strings) {
+ ss << s << " ";
+ }
+ ss << ")";
+ return ss.str();
+ }
+
+ bool operator == (const TRow &other) const {
+ return Ints == other.Ints && Strings == other.Strings;
+ }
+ };
static TKvWorkloadGenerator* New(const TKvWorkloadParams* params) {
return new TKvWorkloadGenerator(params);
@@ -55,23 +91,27 @@ public:
InsertRandom,
SelectRandom,
ReadRowsRandom,
+ Mixed,
MaxType
};
private:
- TQueryInfoList UpsertRandom();
- TQueryInfoList InsertRandom();
- TQueryInfoList SelectRandom();
- TQueryInfoList ReadRowsRandom();
+ TQueryInfoList Upsert(TVector<TRow>&& rows);
+ TQueryInfoList Insert(TVector<TRow>&& rows);
+ TQueryInfoList WriteRows(TString operation, TVector<TRow>&& rows);
+ TQueryInfoList Select(TVector<TRow>&& rows);
+ TQueryInfoList ReadRows(TVector<TRow>&& rows);
+ TQueryInfoList Mixed();
TKvWorkloadGenerator(const TKvWorkloadParams* params);
TQueryInfo FillKvData() const;
- TQueryInfoList AddOperation(TString operation);
+ TVector<TRow> GenerateRandomRows(bool randomValues = false);
- std::string DbPath;
TKvWorkloadParams Params;
TString BigString;
+
+ std::atomic<TInstant> MixedNextChangePartitionsSize;
};
} // namespace NYdbWorkload
diff --git a/ydb/library/workload/workload_query_generator.h b/ydb/library/workload/workload_query_generator.h
index 1f79bb4e2f..19815e95b1 100644
--- a/ydb/library/workload/workload_query_generator.h
+++ b/ydb/library/workload/workload_query_generator.h
@@ -1,5 +1,6 @@
#pragma once
+#include "ydb/public/sdk/cpp/client/ydb_table/table.h"
#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
@@ -26,6 +27,10 @@ struct TQueryInfo {
bool UseReadRows = false;
TString TablePath;
std::optional<NYdb::TValue> KeyToRead;
+ std::optional<NYdb::NTable::TAlterTableSettings> AlterTable;
+
+ std::optional<std::function<void(NYdb::NTable::TReadRowsResult)>> ReadRowsResultCallback;
+ std::optional<std::function<void(NYdb::NTable::TDataQueryResult)>> DataQueryResultCallback;
};
using TQueryInfoList = std::list<TQueryInfo>;
diff --git a/ydb/library/workload/ya.make b/ydb/library/workload/ya.make
index c947999b82..1e593d3d97 100644
--- a/ydb/library/workload/ya.make
+++ b/ydb/library/workload/ya.make
@@ -8,6 +8,7 @@ SRCS(
PEERDIR(
ydb/public/api/protos
+ ydb/public/sdk/cpp/client/ydb_table
)
END()
diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp
index 1e04e17873..d1d999a116 100644
--- a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp
@@ -36,6 +36,8 @@ void TCommandKvInit::Config(TConfig& config) {
.DefaultValue(NYdbWorkload::KvWorkloadConstants::INIT_ROW_COUNT).StoreResult(&InitRowCount);
config.Opts->AddLongOption("min-partitions", "Minimum partitions for tables.")
.DefaultValue(NYdbWorkload::KvWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions);
+ config.Opts->AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).")
+ .DefaultValue(NYdbWorkload::KvWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSize);
config.Opts->AddLongOption("auto-partition", "Enable auto partitioning by load.")
.DefaultValue(NYdbWorkload::KvWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad);
config.Opts->AddLongOption("max-first-key", "Maximum value of a first primary key")
@@ -63,6 +65,7 @@ int TCommandKvInit::Run(TConfig& config) {
params.DbPath = config.Database;
params.InitRowCount = InitRowCount;
params.MinPartitions = MinPartitions;
+ params.PartitionSizeMb = PartitionSize;
params.PartitionsByLoad = PartitionsByLoad;
params.MaxFirstKey = MaxFirstKey;
params.StringLen = StringLen;
@@ -109,6 +112,7 @@ TCommandKvRun::TCommandKvRun()
AddCommand(std::make_unique<TCommandKvRunInsertRandom>());
AddCommand(std::make_unique<TCommandKvRunSelectRandom>());
AddCommand(std::make_unique<TCommandKvRunReadRowsRandom>());
+ AddCommand(std::make_unique<TCommandKvRunMixed>());
}
TCommandKvRunUpsertRandom::TCommandKvRunUpsertRandom()
@@ -281,4 +285,54 @@ int TCommandKvRunReadRowsRandom::Run(TConfig& config) {
return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::ReadRowsRandom));
}
+TCommandKvRunMixed::TCommandKvRunMixed()
+ : TWorkloadCommand("mixed", {}, "Writes and SELECT/ReadsRows rows randomly")
+{}
+
+void TCommandKvRunMixed::Config(TConfig& config) {
+ TWorkloadCommand::Config(config);
+ config.SetFreeArgsNum(0);
+
+ config.Opts->AddLongOption("max-first-key", "Maximum value of a first primary key")
+ .DefaultValue(NYdbWorkload::KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey);
+ config.Opts->AddLongOption("len", "String len")
+ .DefaultValue(NYdbWorkload::KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
+ config.Opts->AddLongOption("cols", "Number of columns to select for a single query")
+ .DefaultValue(NYdbWorkload::KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt);
+ config.Opts->AddLongOption("int-cols", "Number of int columns")
+ .DefaultValue(NYdbWorkload::KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt);
+ config.Opts->AddLongOption("key-cols", "Number of key columns")
+ .DefaultValue(NYdbWorkload::KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt);
+ config.Opts->AddLongOption("change-partitions-size", "Apply random changes of AUTO_PARTITIONING_PARTITION_SIZE_MB setting.")
+ .DefaultValue(NYdbWorkload::KvWorkloadConstants::MIXED_CHANGE_PARTITIONS_SIZE).StoreResult(&ChangePartitionsSize);
+ config.Opts->AddLongOption("do-select", "Do SELECT operations.")
+ .DefaultValue(NYdbWorkload::KvWorkloadConstants::MIXED_DO_SELECT).StoreResult(&DoSelect);
+ config.Opts->AddLongOption("do-read-rows", "Do ReadRows operations.")
+ .DefaultValue(NYdbWorkload::KvWorkloadConstants::MIXED_DO_READ_ROWS).StoreResult(&DoReadRows);
+}
+
+void TCommandKvRunMixed::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+int TCommandKvRunMixed::Run(TConfig& config) {
+ PrepareForRun(config);
+
+ NYdbWorkload::TKvWorkloadParams params;
+ params.DbPath = config.Database;
+ params.MaxFirstKey = MaxFirstKey;
+ params.StringLen = StringLen;
+ params.ColumnsCnt = ColumnsCnt;
+ params.IntColumnsCnt = IntColumnsCnt;
+ params.KeyColumnsCnt = KeyColumnsCnt;
+ params.MixedChangePartitionsSize = ChangePartitionsSize;
+ params.MixedDoReadRows = DoReadRows;
+ params.MixedDoSelect = DoSelect;
+
+ NYdbWorkload::TWorkloadFactory factory;
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
+
+ return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::Mixed));
+}
+
} // namespace NYdb::NConsoleClient
diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.h b/ydb/public/lib/ydb_cli/commands/kv_workload.h
index 81cce96564..c1eaa49f72 100644
--- a/ydb/public/lib/ydb_cli/commands/kv_workload.h
+++ b/ydb/public/lib/ydb_cli/commands/kv_workload.h
@@ -20,6 +20,7 @@ public:
private:
ui64 InitRowCount;
ui64 MinPartitions;
+ ui64 PartitionSize;
ui64 MaxFirstKey;
ui64 StringLen;
ui64 ColumnsCnt;
@@ -108,5 +109,24 @@ private:
};
+class TCommandKvRunMixed : public TWorkloadCommand {
+public:
+ TCommandKvRunMixed();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+
+private:
+ ui64 MaxFirstKey;
+ ui64 StringLen;
+ ui64 ColumnsCnt;
+ ui64 IntColumnsCnt;
+ ui64 KeyColumnsCnt;
+ bool ChangePartitionsSize;
+ bool DoReadRows;
+ bool DoSelect;
+
+};
+
}
}
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
index 4ba575ea5f..baf6454146 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
@@ -118,17 +118,28 @@ void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, co
NYdbWorkload::TQueryInfo queryInfo;
auto runQuery = [this, &queryInfo, &querySettings, &retryCount] (NYdb::NTable::TSession session) -> NYdb::TStatus {
++retryCount;
- TStatus result(EStatus::SUCCESS, NYql::TIssues());
- if (queryInfo.UseReadRows) {
- result = TableClient->ReadRows(queryInfo.TablePath, std::move(*queryInfo.KeyToRead))
+ if (queryInfo.AlterTable) {
+ auto result = TableClient->RetryOperationSync([&queryInfo](NTable::TSession session) {
+ return session.AlterTable(queryInfo.TablePath, queryInfo.AlterTable.value()).GetValueSync();
+ });
+ return result;
+ } else if (queryInfo.UseReadRows) {
+ auto result = TableClient->ReadRows(queryInfo.TablePath, std::move(*queryInfo.KeyToRead))
.GetValueSync();
+ if (queryInfo.ReadRowsResultCallback) {
+ queryInfo.ReadRowsResultCallback.value()(result);
+ }
+ return result;
} else {
- result = session.ExecuteDataQuery(queryInfo.Query.c_str(),
+ auto result = session.ExecuteDataQuery(queryInfo.Query.c_str(),
NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(),
queryInfo.Params, querySettings
).GetValueSync();
+ if (queryInfo.DataQueryResultCallback) {
+ queryInfo.DataQueryResultCallback.value()(result);
+ }
+ return result;
}
- return result;
};
while (Now() < StopTime) {