diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-11-22 21:16:44 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-11-22 21:16:44 +0300 |
commit | caf8bbcb03bda3879cf4ce405b587b813878196e (patch) | |
tree | 5117cb2b9861764d4589248eb46a812b38c30350 | |
parent | 1ddb2b4f00e412e37019b2e00bf1b1a910fa4c11 (diff) | |
download | ydb-caf8bbcb03bda3879cf4ce405b587b813878196e.tar.gz |
keyFrom option for load actors (and no randomness in keys)
-rw-r--r-- | ydb/core/protos/datashard_load.proto | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_testload.cpp | 115 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp | 22 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/kqp_upsert.cpp | 4 |
4 files changed, 123 insertions, 21 deletions
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto index 2314bb768c7..3a0c1d28294 100644 --- a/ydb/core/protos/datashard_load.proto +++ b/ydb/core/protos/datashard_load.proto @@ -21,6 +21,9 @@ message TEvTestLoadRequest { message TUpdateStart { optional uint64 RowCount = 1; optional uint32 Inflight = 2; + + // Key is a string like sprintf("user%.19lu", keyNum) + optional uint64 KeyFrom = 3 [default = 0]; } message TReadStart { diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp index b2a86f3ba76..2f519142ae4 100644 --- a/ydb/core/tx/datashard/datashard_ut_testload.cpp +++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp @@ -14,12 +14,17 @@ using namespace Tests; namespace { -// We use YCSB defaule schema: table named 'usertable' with 'key' column +// We use YCSB defaule schema: table with 'key' column // and 'field0' to 'field9' value columns. All fields are Utf8 const TString DefaultTableName = "usertable"; const TString FieldPrefix = "field"; const size_t ValueColumnsCount = 10; +TString GetKey(size_t n) { + // user1000385178204227360 + return Sprintf("user%.19lu", n); +} + void CreateTable(Tests::TServer::TPtr server, TActorId sender, const TString &root, @@ -179,7 +184,9 @@ struct TTestHelper { return WaitReadResult(); } - void CheckKeysCount(size_t expectedRowCount) { + void CheckKeys(size_t keyFrom, size_t expectedRowCount) { + Y_UNUSED(keyFrom); + TVector<TString> from = {TString("user")}; TVector<TString> to = {TString("zzz")}; @@ -194,7 +201,18 @@ struct TTestHelper { auto readResult = SendRead(request.release()); UNIT_ASSERT(readResult); + + const auto& record = readResult->Record; + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS); UNIT_ASSERT_VALUES_EQUAL(readResult->GetRowsCount(), expectedRowCount); + + auto nrows = readResult->GetRowsCount(); + for (size_t i = 0; i < nrows; ++i) { + auto cells = readResult->GetCells(i); + const auto& keyCell = cells[0]; + TString key(keyCell.Data(), keyCell.Size()); + UNIT_ASSERT_VALUES_EQUAL(key, GetKey(i + keyFrom)); + } } std::unique_ptr<TEvDataShardLoad::TEvTestLoadFinished> RunTestLoad( @@ -232,13 +250,14 @@ struct TTestHelper { void RunUpsertTestLoad( std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> loadRequest, + size_t keyFrom, size_t expectedRowCount, bool forceResolve = false) { RunTestLoad(std::move(loadRequest)); if (!Settings.CreateTable || forceResolve) ResolveTable(); - CheckKeysCount(expectedRowCount); + CheckKeys(keyFrom, expectedRowCount); } public: @@ -268,8 +287,9 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { auto& target = *record.MutableTargetShard(); target.SetTabletId(helper.Table.TabletId); target.SetTableId(helper.Table.UserTable.GetPathId()); + target.SetTableName(DefaultTableName); - helper.RunUpsertTestLoad(std::move(request), expectedRowCount); + helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount); } Y_UNIT_TEST(ShouldWriteDataBulkUpsert2) { @@ -291,7 +311,30 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { target.SetTableId(helper.Table.UserTable.GetPathId()); target.SetTableName(settings.TableName); - helper.RunUpsertTestLoad(std::move(request), expectedRowCount); + helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount); + } + + Y_UNIT_TEST(ShouldWriteDataBulkUpsertKeyFrom) { + // check nondefault keyFrom + TTestHelper helper; + + const ui64 keyFrom = 12345; + const ui64 expectedRowCount = 10; + + std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest()); + auto& record = request->Record; + auto& command = *record.MutableUpsertBulkStart(); + + command.SetRowCount(expectedRowCount); + command.SetInflight(3); + command.SetKeyFrom(keyFrom); + + auto& target = *record.MutableTargetShard(); + target.SetTabletId(helper.Table.TabletId); + target.SetTableId(helper.Table.UserTable.GetPathId()); + target.SetTableName(DefaultTableName); + + helper.RunUpsertTestLoad(std::move(request), keyFrom, expectedRowCount); } Y_UNIT_TEST(ShouldWriteDataBulkUpsertLocalMkql) { @@ -311,7 +354,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { target.SetTableId(helper.Table.UserTable.GetPathId()); target.SetTableName(DefaultTableName); - helper.RunUpsertTestLoad(std::move(request), expectedRowCount); + helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount); } Y_UNIT_TEST(ShouldWriteDataBulkUpsertLocalMkql2) { @@ -333,7 +376,30 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { target.SetTableId(helper.Table.UserTable.GetPathId()); target.SetTableName(settings.TableName); - helper.RunUpsertTestLoad(std::move(request), expectedRowCount); + helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount); + } + + Y_UNIT_TEST(ShouldWriteDataBulkUpsertLocalMkqlKeyFrom) { + // check nondefault keyFrom + TTestHelper helper; + + const ui64 keyFrom = 12345; + const ui64 expectedRowCount = 10; + + std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest()); + auto& record = request->Record; + auto& command = *record.MutableUpsertLocalMkqlStart(); + + command.SetRowCount(expectedRowCount); + command.SetInflight(3); + command.SetKeyFrom(keyFrom); + + auto& target = *record.MutableTargetShard(); + target.SetTabletId(helper.Table.TabletId); + target.SetTableId(helper.Table.UserTable.GetPathId()); + target.SetTableName(DefaultTableName); + + helper.RunUpsertTestLoad(std::move(request), keyFrom, expectedRowCount); } Y_UNIT_TEST(ShouldWriteKqpUpsert) { @@ -354,7 +420,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { target.SetWorkingDir("/Root"); target.SetTableName("usertable"); - helper.RunUpsertTestLoad(std::move(request), expectedRowCount); + helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount); } Y_UNIT_TEST(ShouldWriteKqpUpsert2) { @@ -377,7 +443,30 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { target.SetWorkingDir("/Root"); target.SetTableName(settings.TableName); - helper.RunUpsertTestLoad(std::move(request), expectedRowCount); + helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount); + } + + Y_UNIT_TEST(ShouldWriteKqpUpsertKeyFrom) { + TTestHelper helper; + + const ui64 keyFrom = 12345; + const ui64 expectedRowCount = 20; + + std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest()); + auto& record = request->Record; + auto& command = *record.MutableUpsertKqpStart(); + + command.SetRowCount(expectedRowCount); + command.SetInflight(5); + command.SetKeyFrom(keyFrom); + + auto& target = *record.MutableTargetShard(); + target.SetTabletId(helper.Table.TabletId); + target.SetTableId(helper.Table.UserTable.GetPathId()); + target.SetWorkingDir("/Root"); + target.SetTableName("usertable"); + + helper.RunUpsertTestLoad(std::move(request), keyFrom, expectedRowCount); } Y_UNIT_TEST(ShouldCreateTable) { @@ -398,7 +487,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { setupTable.SetWorkingDir("/Root"); setupTable.SetTableName(settings.TableName); - helper.RunUpsertTestLoad(std::move(request), expectedRowCount); + helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount); } Y_UNIT_TEST(ShouldDropCreateTable) { @@ -418,7 +507,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { target.SetTabletId(helper.Table.TabletId); target.SetTableId(helper.Table.UserTable.GetPathId()); - helper.RunUpsertTestLoad(std::move(request), 100); + helper.RunUpsertTestLoad(std::move(request), 0, 100); } // because of drop we should see only these rows @@ -436,7 +525,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { setupTable.SetWorkingDir("/Root"); setupTable.SetTableName(settings.TableName); - helper.RunUpsertTestLoad(std::move(request), expectedRowCount, true); + helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount, true); } } // Y_UNIT_TEST_SUITE(UpsertLoad) @@ -469,7 +558,7 @@ Y_UNIT_TEST_SUITE(ReadLoad) { UNIT_ASSERT_VALUES_EQUAL(result->Report->OperationsOK, (4 * expectedRowCount)); // sanity check that there was data in table - helper.CheckKeysCount(expectedRowCount); + helper.CheckKeys(0, expectedRowCount); } } // Y_UNIT_TEST_SUITE(ReadLoad) diff --git a/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp b/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp index 83a0c026bd9..e0f23650ff8 100644 --- a/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp +++ b/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp @@ -91,18 +91,23 @@ TUploadRequest GenerateMkqlRowRequest(ui64 /* tableId */, ui64 keyNum, const TSt return TUploadRequest(request.release()); } -TRequestsVector GenerateRequests(ui64 tableId, ui64 n, ERequestType requestType, const TString& table) { +TRequestsVector GenerateRequests( + ui64 tableId, + ui64 keyFrom, + ui64 n, + ERequestType requestType, + const TString& table) +{ TRequestsVector requests; requests.reserve(n); - for (size_t i = 0; i < n; ++i) { - auto keyNum = RandomNumber(Max<ui64>()); + for (size_t i = keyFrom; i < keyFrom + n; ++i) { switch (requestType) { case ERequestType::UpsertBulk: - requests.emplace_back(GenerateBulkRowRequest(tableId, keyNum)); + requests.emplace_back(GenerateBulkRowRequest(tableId, i)); break; case ERequestType::UpsertLocalMkql: - requests.emplace_back(GenerateMkqlRowRequest(tableId, keyNum, table)); + requests.emplace_back(GenerateMkqlRowRequest(tableId, i, table)); break; default: // should not happen, just for compiler @@ -159,7 +164,12 @@ public: // note that we generate all requests at once to send at max speed, i.e. // do not mess with protobufs, strings, etc when send data - Requests = GenerateRequests(Target.GetTableId(), Config.GetRowCount(), RequestType, Target.GetTableName()); + Requests = GenerateRequests( + Target.GetTableId(), + Config.GetKeyFrom(), + Config.GetRowCount(), + RequestType, + Target.GetTableName()); Become(&TUpsertActor::StateFunc); Connect(ctx); diff --git a/ydb/core/tx/datashard/testload/kqp_upsert.cpp b/ydb/core/tx/datashard/testload/kqp_upsert.cpp index 1ab3f018219..041b65233f0 100644 --- a/ydb/core/tx/datashard/testload/kqp_upsert.cpp +++ b/ydb/core/tx/datashard/testload/kqp_upsert.cpp @@ -295,13 +295,13 @@ private: TVector<TRequestsVector> perActorRequests; perActorRequests.reserve(actorsCount); - size_t rowCount = 0; + size_t currentKey = Config.GetKeyFrom(); for (size_t i = 0; i < actorsCount; ++i) { TRequestsVector requests; requests.reserve(requestsPerActor); for (size_t i = 0; i < requestsPerActor; ++i) { - auto queryInfo = GenerateUpsert(rowCount++, Target.GetTableName()); + auto queryInfo = GenerateUpsert(currentKey++, Target.GetTableName()); auto request = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); request->Record.MutableRequest()->SetKeepSession(true); |