diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-09-07 16:36:00 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-09-07 16:36:00 +0300 |
commit | 274921aab863e1cb7a037ba33d6e4613546bf31c (patch) | |
tree | ec993601b70e68ad0656b401b048a84b73c6abce | |
parent | 40a7e9a70ad69a2962b7120fd4fbcaf3731162b1 (diff) | |
download | ydb-274921aab863e1cb7a037ba33d6e4613546bf31c.tar.gz |
rename Upsert to UpsertMkql, add tests for bulk and mkql upsert actors
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_testload.cpp | 244 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_upsert.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_testload/CMakeLists.darwin.txt | 56 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_testload/CMakeLists.linux.txt | 60 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_testload/CMakeLists.txt | 13 |
8 files changed, 380 insertions, 6 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 0c41df854e7..fc834143fec 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1809,7 +1809,7 @@ message TEvTestLoadRequest { oneof Command { TLoadStop LoadStop = 2; TUpdateStart BulkUpsertStart = 3; - TUpdateStart UpsertStart = 4; + TUpdateStart UpsertMkqlStart = 4; } } diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index 62157ed5b3e..e2e48709bf8 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -33,6 +33,7 @@ add_subdirectory(ut_replication) add_subdirectory(ut_rs) add_subdirectory(ut_snapshot) add_subdirectory(ut_stats) +add_subdirectory(ut_testload) add_subdirectory(ut_upload_rows) add_library(core-tx-datashard) diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp new file mode 100644 index 00000000000..0813db9ea8a --- /dev/null +++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp @@ -0,0 +1,244 @@ +#include "datashard_ut_common.h" + +#include "testload/test_load_actor.h" + +namespace NKikimr { + +using namespace NKikimr::NDataShard; +using namespace NSchemeShard; +using namespace Tests; + +namespace { + +// We use YCSB defaule schema: table named 'usertable' with 'key' column +// and 'field0' to 'field9' value columns. All fields are Utf8 +const TString TableName = "usertable"; +const TString FieldPrefix = "field"; +const size_t ValueColumnsCount = 10; + +void CreateTable(Tests::TServer::TPtr server, + TActorId sender, + const TString &root) +{ + TVector<TShardedTableOptions::TColumn> columns; + columns.reserve(ValueColumnsCount + 1); + + columns.emplace_back("key", "Utf8", true, false); + + for (size_t i = 0; i < ValueColumnsCount; ++i) { + TString fieldName = FieldPrefix + ToString(i); + columns.emplace_back(fieldName, "Utf8", false, false); + } + + auto opts = TShardedTableOptions() + .Shards(1) + .Columns(columns); + + CreateShardedTable(server, sender, root, TableName, opts); +} + +TVector<TCell> ToCells(const std::vector<TString>& keys) { + TVector<TCell> cells; + for (auto& key: keys) { + cells.emplace_back(TCell(key.data(), key.size())); + } + return cells; +} + +void AddRangeQuery( + TEvDataShard::TEvRead& request, + const std::vector<TString>& from, + bool fromInclusive, + const std::vector<TString>& to, + bool toInclusive) +{ + auto fromCells = ToCells(from); + auto toCells = ToCells(to); + + // convertion is ugly, but for tests is OK + auto fromBuf = TSerializedCellVec::Serialize(fromCells); + auto toBuf = TSerializedCellVec::Serialize(toCells); + + request.Ranges.emplace_back(fromBuf, toBuf, fromInclusive, toInclusive); +} + +struct TTableInfo { + TString Name; + + ui64 TabletId; + ui64 OwnerId; + NKikimrTxDataShard::TEvGetInfoResponse::TUserTable UserTable; + + TActorId ClientId; +}; + +struct TTestHelper { + TTestHelper() { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + init(serverSettings); + } + + void init(const TServerSettings& serverSettings) { + Server = new TServer(serverSettings); + + auto &runtime = *Server->GetRuntime(); + Sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::DS_LOAD_TEST, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(Server, Sender); + + Table.Name = TableName; + { + CreateTable(Server, Sender, "/Root"); + + auto shards = GetTableShards(Server, Sender, "/Root/" + TableName); + Table.TabletId = shards.at(0); + + auto [tables, ownerId] = GetTables(Server, Table.TabletId); + Table.OwnerId = ownerId; + Table.UserTable = tables[TableName]; + } + + Table.ClientId = runtime.ConnectToPipe(Table.TabletId, Sender, 0, GetPipeConfigWithRetries()); + } + + std::unique_ptr<TEvDataShard::TEvRead> GetBaseReadRequest() { + std::unique_ptr<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); + auto& record = request->Record; + + record.SetReadId(ReadId++); + record.MutableTableId()->SetOwnerId(Table.OwnerId); + record.MutableTableId()->SetTableId(Table.UserTable.GetPathId()); + + const auto& description = Table.UserTable.GetDescription(); + std::vector<ui32> keyColumns( + description.GetKeyColumnIds().begin(), + description.GetKeyColumnIds().end()); + + for (const auto& column: description.GetColumns()) { + record.AddColumns(column.GetId()); + } + + record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC); + + return request; + } + + std::unique_ptr<TEvDataShard::TEvReadResult> WaitReadResult(TDuration timeout = TDuration::Max()) { + auto &runtime = *Server->GetRuntime(); + TAutoPtr<IEventHandle> handle; + runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(handle, timeout); + if (!handle) { + return nullptr; + } + auto event = handle->Release<TEvDataShard::TEvReadResult>(); + return std::unique_ptr<TEvDataShard::TEvReadResult>(event.Release()); + } + + std::unique_ptr<TEvDataShard::TEvReadResult> SendRead(TEvDataShard::TEvRead* request) + { + auto &runtime = *Server->GetRuntime(); + runtime.SendToPipe( + Table.TabletId, + Sender, + request, + 0, + GetPipeConfigWithRetries(), + Table.ClientId); + + return WaitReadResult(); + } + + void TestLoad(std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request, size_t expectedRowCount) { + auto &runtime = *Server->GetRuntime(); + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(new ::NMonitoring::TDynamicCounters()); + auto testLoadActor = runtime.Register(CreateTestLoadActor(counters)); + + runtime.Send(new IEventHandle(testLoadActor, Sender, request.release()), 0, true); + + TAutoPtr<IEventHandle> handle; + runtime.GrabEdgeEventRethrow<TEvDataShard::TEvTestLoadResponse>(handle); + UNIT_ASSERT(handle); + auto response = handle->Release<TEvDataShard::TEvTestLoadResponse>(); + auto& responseRecord = response->Record; + UNIT_ASSERT_VALUES_EQUAL(responseRecord.GetStatus(), NMsgBusProxy::MSTATUS_OK); + + // holds memory for TCell + TVector<TString> from = {TString("user")}; + TVector<TString> to = {TString("zzz")}; + + // busywait + while (1) { + auto request = GetBaseReadRequest(); + AddRangeQuery( + *request, + from, + true, + to, + true + ); + + auto readResult = SendRead(request.release()); + UNIT_ASSERT(readResult); + if (readResult->GetRowsCount() == expectedRowCount) + break; + + SimulateSleep(Server, TDuration::Seconds(1)); + } + } + +public: + Tests::TServer::TPtr Server; + TActorId Sender; + TTableInfo Table; + + ui64 ReadId = 1; +}; + +} // anonymous + +Y_UNIT_TEST_SUITE(UpsertLoad) { + Y_UNIT_TEST(ShouldWriteDataBulkUpsert) { + TTestHelper helper; + + const ui64 expectedRowCount = 10; + + std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request(new TEvDataShard::TEvTestLoadRequest()); + auto& record = request->Record; + auto& command = *record.MutableBulkUpsertStart(); + + command.SetTag(1); + command.SetRowCount(expectedRowCount); + command.SetTabletId(helper.Table.TabletId); + command.SetTableId(helper.Table.UserTable.GetPathId()); + command.SetInflight(3); + + helper.TestLoad(std::move(request), expectedRowCount); + } + + Y_UNIT_TEST(ShouldWriteDataBulkUpsertMkql) { + TTestHelper helper; + + const ui64 expectedRowCount = 10; + + std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request(new TEvDataShard::TEvTestLoadRequest()); + auto& record = request->Record; + auto& command = *record.MutableUpsertMkqlStart(); + + command.SetTag(1); + command.SetRowCount(expectedRowCount); + command.SetTabletId(helper.Table.TabletId); + command.SetTableId(helper.Table.UserTable.GetPathId()); + command.SetInflight(3); + + helper.TestLoad(std::move(request), expectedRowCount); + } +} + +} // namespace NKikimr diff --git a/ydb/core/tx/datashard/testload/test_load_actor.cpp b/ydb/core/tx/datashard/testload/test_load_actor.cpp index bc1ec221412..c5c54496db0 100644 --- a/ydb/core/tx/datashard/testload/test_load_actor.cpp +++ b/ydb/core/tx/datashard/testload/test_load_actor.cpp @@ -112,8 +112,8 @@ public: break; } - case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kUpsertStart: { - const auto& cmd = record.GetUpsertStart(); + case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kUpsertMkqlStart: { + const auto& cmd = record.GetUpsertMkqlStart(); const ui64 tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); diff --git a/ydb/core/tx/datashard/testload/test_load_upsert.cpp b/ydb/core/tx/datashard/testload/test_load_upsert.cpp index 58f32872e20..fb0a2484f7b 100644 --- a/ydb/core/tx/datashard/testload/test_load_upsert.cpp +++ b/ydb/core/tx/datashard/testload/test_load_upsert.cpp @@ -22,7 +22,7 @@ namespace { enum class ERequestType { BulkUpsert, - Upsert, + UpsertMkql, }; TString GetKey(size_t n) { @@ -109,7 +109,7 @@ TRequestsVector GenerateRequests(ui64 tableId, ui64 n, ERequestType requestType) case ERequestType::BulkUpsert: requests.emplace_back(GenerateBulkRowRequest(tableId, keyNum)); break; - case ERequestType::Upsert: + case ERequestType::UpsertMkql: requests.emplace_back(GenerateMkqlRowRequest(tableId, keyNum)); break; } @@ -307,7 +307,7 @@ NActors::IActor *CreateBulkUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequ NActors::IActor *CreateUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) { - return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::Upsert); + return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::UpsertMkql); } } // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/ut_testload/CMakeLists.darwin.txt b/ydb/core/tx/datashard/ut_testload/CMakeLists.darwin.txt new file mode 100644 index 00000000000..5e53efa7975 --- /dev/null +++ b/ydb/core/tx/datashard/ut_testload/CMakeLists.darwin.txt @@ -0,0 +1,56 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-datashard-ut_testload) +target_compile_options(ydb-core-tx-datashard-ut_testload PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-datashard-ut_testload PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard +) +target_link_libraries(ydb-core-tx-datashard-ut_testload PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-datashard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + kqp-ut-common + ydb-core-testlib + ydb-core-tx + udf-service-exception_policy + public-lib-yson_value + cpp-client-ydb_result +) +target_link_options(ydb-core-tx-datashard-ut_testload PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-tx-datashard-ut_testload PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_testload.cpp +) +add_test( + NAME + ydb-core-tx-datashard-ut_testload + COMMAND + ydb-core-tx-datashard-ut_testload + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-tx-datashard-ut_testload) diff --git a/ydb/core/tx/datashard/ut_testload/CMakeLists.linux.txt b/ydb/core/tx/datashard/ut_testload/CMakeLists.linux.txt new file mode 100644 index 00000000000..d42847ee463 --- /dev/null +++ b/ydb/core/tx/datashard/ut_testload/CMakeLists.linux.txt @@ -0,0 +1,60 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-datashard-ut_testload) +target_compile_options(ydb-core-tx-datashard-ut_testload PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-datashard-ut_testload PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard +) +target_link_libraries(ydb-core-tx-datashard-ut_testload PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-datashard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + kqp-ut-common + ydb-core-testlib + ydb-core-tx + udf-service-exception_policy + public-lib-yson_value + cpp-client-ydb_result +) +target_link_options(ydb-core-tx-datashard-ut_testload PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-datashard-ut_testload PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_testload.cpp +) +add_test( + NAME + ydb-core-tx-datashard-ut_testload + COMMAND + ydb-core-tx-datashard-ut_testload + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-tx-datashard-ut_testload) diff --git a/ydb/core/tx/datashard/ut_testload/CMakeLists.txt b/ydb/core/tx/datashard/ut_testload/CMakeLists.txt new file mode 100644 index 00000000000..dbfe6fa2c47 --- /dev/null +++ b/ydb/core/tx/datashard/ut_testload/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE) + include(CMakeLists.linux.txt) +endif() |