aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-11-22 21:16:44 +0300
committereivanov89 <eivanov89@ydb.tech>2022-11-22 21:16:44 +0300
commitcaf8bbcb03bda3879cf4ce405b587b813878196e (patch)
tree5117cb2b9861764d4589248eb46a812b38c30350
parent1ddb2b4f00e412e37019b2e00bf1b1a910fa4c11 (diff)
downloadydb-caf8bbcb03bda3879cf4ce405b587b813878196e.tar.gz
keyFrom option for load actors (and no randomness in keys)
-rw-r--r--ydb/core/protos/datashard_load.proto3
-rw-r--r--ydb/core/tx/datashard/datashard_ut_testload.cpp115
-rw-r--r--ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp22
-rw-r--r--ydb/core/tx/datashard/testload/kqp_upsert.cpp4
4 files changed, 123 insertions, 21 deletions
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto
index 2314bb768c7..3a0c1d28294 100644
--- a/ydb/core/protos/datashard_load.proto
+++ b/ydb/core/protos/datashard_load.proto
@@ -21,6 +21,9 @@ message TEvTestLoadRequest {
message TUpdateStart {
optional uint64 RowCount = 1;
optional uint32 Inflight = 2;
+
+ // Key is a string like sprintf("user%.19lu", keyNum)
+ optional uint64 KeyFrom = 3 [default = 0];
}
message TReadStart {
diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp
index b2a86f3ba76..2f519142ae4 100644
--- a/ydb/core/tx/datashard/datashard_ut_testload.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp
@@ -14,12 +14,17 @@ using namespace Tests;
namespace {
-// We use YCSB defaule schema: table named 'usertable' with 'key' column
+// We use YCSB defaule schema: table with 'key' column
// and 'field0' to 'field9' value columns. All fields are Utf8
const TString DefaultTableName = "usertable";
const TString FieldPrefix = "field";
const size_t ValueColumnsCount = 10;
+TString GetKey(size_t n) {
+ // user1000385178204227360
+ return Sprintf("user%.19lu", n);
+}
+
void CreateTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
@@ -179,7 +184,9 @@ struct TTestHelper {
return WaitReadResult();
}
- void CheckKeysCount(size_t expectedRowCount) {
+ void CheckKeys(size_t keyFrom, size_t expectedRowCount) {
+ Y_UNUSED(keyFrom);
+
TVector<TString> from = {TString("user")};
TVector<TString> to = {TString("zzz")};
@@ -194,7 +201,18 @@ struct TTestHelper {
auto readResult = SendRead(request.release());
UNIT_ASSERT(readResult);
+
+ const auto& record = readResult->Record;
+ UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
UNIT_ASSERT_VALUES_EQUAL(readResult->GetRowsCount(), expectedRowCount);
+
+ auto nrows = readResult->GetRowsCount();
+ for (size_t i = 0; i < nrows; ++i) {
+ auto cells = readResult->GetCells(i);
+ const auto& keyCell = cells[0];
+ TString key(keyCell.Data(), keyCell.Size());
+ UNIT_ASSERT_VALUES_EQUAL(key, GetKey(i + keyFrom));
+ }
}
std::unique_ptr<TEvDataShardLoad::TEvTestLoadFinished> RunTestLoad(
@@ -232,13 +250,14 @@ struct TTestHelper {
void RunUpsertTestLoad(
std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> loadRequest,
+ size_t keyFrom,
size_t expectedRowCount,
bool forceResolve = false)
{
RunTestLoad(std::move(loadRequest));
if (!Settings.CreateTable || forceResolve)
ResolveTable();
- CheckKeysCount(expectedRowCount);
+ CheckKeys(keyFrom, expectedRowCount);
}
public:
@@ -268,8 +287,9 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
auto& target = *record.MutableTargetShard();
target.SetTabletId(helper.Table.TabletId);
target.SetTableId(helper.Table.UserTable.GetPathId());
+ target.SetTableName(DefaultTableName);
- helper.RunUpsertTestLoad(std::move(request), expectedRowCount);
+ helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount);
}
Y_UNIT_TEST(ShouldWriteDataBulkUpsert2) {
@@ -291,7 +311,30 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
target.SetTableId(helper.Table.UserTable.GetPathId());
target.SetTableName(settings.TableName);
- helper.RunUpsertTestLoad(std::move(request), expectedRowCount);
+ helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount);
+ }
+
+ Y_UNIT_TEST(ShouldWriteDataBulkUpsertKeyFrom) {
+ // check nondefault keyFrom
+ TTestHelper helper;
+
+ const ui64 keyFrom = 12345;
+ const ui64 expectedRowCount = 10;
+
+ std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest());
+ auto& record = request->Record;
+ auto& command = *record.MutableUpsertBulkStart();
+
+ command.SetRowCount(expectedRowCount);
+ command.SetInflight(3);
+ command.SetKeyFrom(keyFrom);
+
+ auto& target = *record.MutableTargetShard();
+ target.SetTabletId(helper.Table.TabletId);
+ target.SetTableId(helper.Table.UserTable.GetPathId());
+ target.SetTableName(DefaultTableName);
+
+ helper.RunUpsertTestLoad(std::move(request), keyFrom, expectedRowCount);
}
Y_UNIT_TEST(ShouldWriteDataBulkUpsertLocalMkql) {
@@ -311,7 +354,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
target.SetTableId(helper.Table.UserTable.GetPathId());
target.SetTableName(DefaultTableName);
- helper.RunUpsertTestLoad(std::move(request), expectedRowCount);
+ helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount);
}
Y_UNIT_TEST(ShouldWriteDataBulkUpsertLocalMkql2) {
@@ -333,7 +376,30 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
target.SetTableId(helper.Table.UserTable.GetPathId());
target.SetTableName(settings.TableName);
- helper.RunUpsertTestLoad(std::move(request), expectedRowCount);
+ helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount);
+ }
+
+ Y_UNIT_TEST(ShouldWriteDataBulkUpsertLocalMkqlKeyFrom) {
+ // check nondefault keyFrom
+ TTestHelper helper;
+
+ const ui64 keyFrom = 12345;
+ const ui64 expectedRowCount = 10;
+
+ std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest());
+ auto& record = request->Record;
+ auto& command = *record.MutableUpsertLocalMkqlStart();
+
+ command.SetRowCount(expectedRowCount);
+ command.SetInflight(3);
+ command.SetKeyFrom(keyFrom);
+
+ auto& target = *record.MutableTargetShard();
+ target.SetTabletId(helper.Table.TabletId);
+ target.SetTableId(helper.Table.UserTable.GetPathId());
+ target.SetTableName(DefaultTableName);
+
+ helper.RunUpsertTestLoad(std::move(request), keyFrom, expectedRowCount);
}
Y_UNIT_TEST(ShouldWriteKqpUpsert) {
@@ -354,7 +420,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
target.SetWorkingDir("/Root");
target.SetTableName("usertable");
- helper.RunUpsertTestLoad(std::move(request), expectedRowCount);
+ helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount);
}
Y_UNIT_TEST(ShouldWriteKqpUpsert2) {
@@ -377,7 +443,30 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
target.SetWorkingDir("/Root");
target.SetTableName(settings.TableName);
- helper.RunUpsertTestLoad(std::move(request), expectedRowCount);
+ helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount);
+ }
+
+ Y_UNIT_TEST(ShouldWriteKqpUpsertKeyFrom) {
+ TTestHelper helper;
+
+ const ui64 keyFrom = 12345;
+ const ui64 expectedRowCount = 20;
+
+ std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest());
+ auto& record = request->Record;
+ auto& command = *record.MutableUpsertKqpStart();
+
+ command.SetRowCount(expectedRowCount);
+ command.SetInflight(5);
+ command.SetKeyFrom(keyFrom);
+
+ auto& target = *record.MutableTargetShard();
+ target.SetTabletId(helper.Table.TabletId);
+ target.SetTableId(helper.Table.UserTable.GetPathId());
+ target.SetWorkingDir("/Root");
+ target.SetTableName("usertable");
+
+ helper.RunUpsertTestLoad(std::move(request), keyFrom, expectedRowCount);
}
Y_UNIT_TEST(ShouldCreateTable) {
@@ -398,7 +487,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
setupTable.SetWorkingDir("/Root");
setupTable.SetTableName(settings.TableName);
- helper.RunUpsertTestLoad(std::move(request), expectedRowCount);
+ helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount);
}
Y_UNIT_TEST(ShouldDropCreateTable) {
@@ -418,7 +507,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
target.SetTabletId(helper.Table.TabletId);
target.SetTableId(helper.Table.UserTable.GetPathId());
- helper.RunUpsertTestLoad(std::move(request), 100);
+ helper.RunUpsertTestLoad(std::move(request), 0, 100);
}
// because of drop we should see only these rows
@@ -436,7 +525,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
setupTable.SetWorkingDir("/Root");
setupTable.SetTableName(settings.TableName);
- helper.RunUpsertTestLoad(std::move(request), expectedRowCount, true);
+ helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount, true);
}
} // Y_UNIT_TEST_SUITE(UpsertLoad)
@@ -469,7 +558,7 @@ Y_UNIT_TEST_SUITE(ReadLoad) {
UNIT_ASSERT_VALUES_EQUAL(result->Report->OperationsOK, (4 * expectedRowCount));
// sanity check that there was data in table
- helper.CheckKeysCount(expectedRowCount);
+ helper.CheckKeys(0, expectedRowCount);
}
} // Y_UNIT_TEST_SUITE(ReadLoad)
diff --git a/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp b/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp
index 83a0c026bd9..e0f23650ff8 100644
--- a/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp
+++ b/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp
@@ -91,18 +91,23 @@ TUploadRequest GenerateMkqlRowRequest(ui64 /* tableId */, ui64 keyNum, const TSt
return TUploadRequest(request.release());
}
-TRequestsVector GenerateRequests(ui64 tableId, ui64 n, ERequestType requestType, const TString& table) {
+TRequestsVector GenerateRequests(
+ ui64 tableId,
+ ui64 keyFrom,
+ ui64 n,
+ ERequestType requestType,
+ const TString& table)
+{
TRequestsVector requests;
requests.reserve(n);
- for (size_t i = 0; i < n; ++i) {
- auto keyNum = RandomNumber(Max<ui64>());
+ for (size_t i = keyFrom; i < keyFrom + n; ++i) {
switch (requestType) {
case ERequestType::UpsertBulk:
- requests.emplace_back(GenerateBulkRowRequest(tableId, keyNum));
+ requests.emplace_back(GenerateBulkRowRequest(tableId, i));
break;
case ERequestType::UpsertLocalMkql:
- requests.emplace_back(GenerateMkqlRowRequest(tableId, keyNum, table));
+ requests.emplace_back(GenerateMkqlRowRequest(tableId, i, table));
break;
default:
// should not happen, just for compiler
@@ -159,7 +164,12 @@ public:
// note that we generate all requests at once to send at max speed, i.e.
// do not mess with protobufs, strings, etc when send data
- Requests = GenerateRequests(Target.GetTableId(), Config.GetRowCount(), RequestType, Target.GetTableName());
+ Requests = GenerateRequests(
+ Target.GetTableId(),
+ Config.GetKeyFrom(),
+ Config.GetRowCount(),
+ RequestType,
+ Target.GetTableName());
Become(&TUpsertActor::StateFunc);
Connect(ctx);
diff --git a/ydb/core/tx/datashard/testload/kqp_upsert.cpp b/ydb/core/tx/datashard/testload/kqp_upsert.cpp
index 1ab3f018219..041b65233f0 100644
--- a/ydb/core/tx/datashard/testload/kqp_upsert.cpp
+++ b/ydb/core/tx/datashard/testload/kqp_upsert.cpp
@@ -295,13 +295,13 @@ private:
TVector<TRequestsVector> perActorRequests;
perActorRequests.reserve(actorsCount);
- size_t rowCount = 0;
+ size_t currentKey = Config.GetKeyFrom();
for (size_t i = 0; i < actorsCount; ++i) {
TRequestsVector requests;
requests.reserve(requestsPerActor);
for (size_t i = 0; i < requestsPerActor; ++i) {
- auto queryInfo = GenerateUpsert(rowCount++, Target.GetTableName());
+ auto queryInfo = GenerateUpsert(currentKey++, Target.GetTableName());
auto request = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
request->Record.MutableRequest()->SetKeepSession(true);