aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-09-07 16:36:00 +0300
committereivanov89 <eivanov89@ydb.tech>2022-09-07 16:36:00 +0300
commit274921aab863e1cb7a037ba33d6e4613546bf31c (patch)
treeec993601b70e68ad0656b401b048a84b73c6abce
parent40a7e9a70ad69a2962b7120fd4fbcaf3731162b1 (diff)
downloadydb-274921aab863e1cb7a037ba33d6e4613546bf31c.tar.gz
rename Upsert to UpsertMkql, add tests for bulk and mkql upsert actors
-rw-r--r--ydb/core/protos/tx_datashard.proto2
-rw-r--r--ydb/core/tx/datashard/CMakeLists.txt1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_testload.cpp244
-rw-r--r--ydb/core/tx/datashard/testload/test_load_actor.cpp4
-rw-r--r--ydb/core/tx/datashard/testload/test_load_upsert.cpp6
-rw-r--r--ydb/core/tx/datashard/ut_testload/CMakeLists.darwin.txt56
-rw-r--r--ydb/core/tx/datashard/ut_testload/CMakeLists.linux.txt60
-rw-r--r--ydb/core/tx/datashard/ut_testload/CMakeLists.txt13
8 files changed, 380 insertions, 6 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 0c41df854e7..fc834143fec 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1809,7 +1809,7 @@ message TEvTestLoadRequest {
oneof Command {
TLoadStop LoadStop = 2;
TUpdateStart BulkUpsertStart = 3;
- TUpdateStart UpsertStart = 4;
+ TUpdateStart UpsertMkqlStart = 4;
}
}
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt
index 62157ed5b3e..e2e48709bf8 100644
--- a/ydb/core/tx/datashard/CMakeLists.txt
+++ b/ydb/core/tx/datashard/CMakeLists.txt
@@ -33,6 +33,7 @@ add_subdirectory(ut_replication)
add_subdirectory(ut_rs)
add_subdirectory(ut_snapshot)
add_subdirectory(ut_stats)
+add_subdirectory(ut_testload)
add_subdirectory(ut_upload_rows)
add_library(core-tx-datashard)
diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp
new file mode 100644
index 00000000000..0813db9ea8a
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp
@@ -0,0 +1,244 @@
+#include "datashard_ut_common.h"
+
+#include "testload/test_load_actor.h"
+
+namespace NKikimr {
+
+using namespace NKikimr::NDataShard;
+using namespace NSchemeShard;
+using namespace Tests;
+
+namespace {
+
+// We use YCSB defaule schema: table named 'usertable' with 'key' column
+// and 'field0' to 'field9' value columns. All fields are Utf8
+const TString TableName = "usertable";
+const TString FieldPrefix = "field";
+const size_t ValueColumnsCount = 10;
+
+void CreateTable(Tests::TServer::TPtr server,
+ TActorId sender,
+ const TString &root)
+{
+ TVector<TShardedTableOptions::TColumn> columns;
+ columns.reserve(ValueColumnsCount + 1);
+
+ columns.emplace_back("key", "Utf8", true, false);
+
+ for (size_t i = 0; i < ValueColumnsCount; ++i) {
+ TString fieldName = FieldPrefix + ToString(i);
+ columns.emplace_back(fieldName, "Utf8", false, false);
+ }
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns(columns);
+
+ CreateShardedTable(server, sender, root, TableName, opts);
+}
+
+TVector<TCell> ToCells(const std::vector<TString>& keys) {
+ TVector<TCell> cells;
+ for (auto& key: keys) {
+ cells.emplace_back(TCell(key.data(), key.size()));
+ }
+ return cells;
+}
+
+void AddRangeQuery(
+ TEvDataShard::TEvRead& request,
+ const std::vector<TString>& from,
+ bool fromInclusive,
+ const std::vector<TString>& to,
+ bool toInclusive)
+{
+ auto fromCells = ToCells(from);
+ auto toCells = ToCells(to);
+
+ // convertion is ugly, but for tests is OK
+ auto fromBuf = TSerializedCellVec::Serialize(fromCells);
+ auto toBuf = TSerializedCellVec::Serialize(toCells);
+
+ request.Ranges.emplace_back(fromBuf, toBuf, fromInclusive, toInclusive);
+}
+
+struct TTableInfo {
+ TString Name;
+
+ ui64 TabletId;
+ ui64 OwnerId;
+ NKikimrTxDataShard::TEvGetInfoResponse::TUserTable UserTable;
+
+ TActorId ClientId;
+};
+
+struct TTestHelper {
+ TTestHelper() {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+ init(serverSettings);
+ }
+
+ void init(const TServerSettings& serverSettings) {
+ Server = new TServer(serverSettings);
+
+ auto &runtime = *Server->GetRuntime();
+ Sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::DS_LOAD_TEST, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(Server, Sender);
+
+ Table.Name = TableName;
+ {
+ CreateTable(Server, Sender, "/Root");
+
+ auto shards = GetTableShards(Server, Sender, "/Root/" + TableName);
+ Table.TabletId = shards.at(0);
+
+ auto [tables, ownerId] = GetTables(Server, Table.TabletId);
+ Table.OwnerId = ownerId;
+ Table.UserTable = tables[TableName];
+ }
+
+ Table.ClientId = runtime.ConnectToPipe(Table.TabletId, Sender, 0, GetPipeConfigWithRetries());
+ }
+
+ std::unique_ptr<TEvDataShard::TEvRead> GetBaseReadRequest() {
+ std::unique_ptr<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
+ auto& record = request->Record;
+
+ record.SetReadId(ReadId++);
+ record.MutableTableId()->SetOwnerId(Table.OwnerId);
+ record.MutableTableId()->SetTableId(Table.UserTable.GetPathId());
+
+ const auto& description = Table.UserTable.GetDescription();
+ std::vector<ui32> keyColumns(
+ description.GetKeyColumnIds().begin(),
+ description.GetKeyColumnIds().end());
+
+ for (const auto& column: description.GetColumns()) {
+ record.AddColumns(column.GetId());
+ }
+
+ record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC);
+
+ return request;
+ }
+
+ std::unique_ptr<TEvDataShard::TEvReadResult> WaitReadResult(TDuration timeout = TDuration::Max()) {
+ auto &runtime = *Server->GetRuntime();
+ TAutoPtr<IEventHandle> handle;
+ runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(handle, timeout);
+ if (!handle) {
+ return nullptr;
+ }
+ auto event = handle->Release<TEvDataShard::TEvReadResult>();
+ return std::unique_ptr<TEvDataShard::TEvReadResult>(event.Release());
+ }
+
+ std::unique_ptr<TEvDataShard::TEvReadResult> SendRead(TEvDataShard::TEvRead* request)
+ {
+ auto &runtime = *Server->GetRuntime();
+ runtime.SendToPipe(
+ Table.TabletId,
+ Sender,
+ request,
+ 0,
+ GetPipeConfigWithRetries(),
+ Table.ClientId);
+
+ return WaitReadResult();
+ }
+
+ void TestLoad(std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request, size_t expectedRowCount) {
+ auto &runtime = *Server->GetRuntime();
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(new ::NMonitoring::TDynamicCounters());
+ auto testLoadActor = runtime.Register(CreateTestLoadActor(counters));
+
+ runtime.Send(new IEventHandle(testLoadActor, Sender, request.release()), 0, true);
+
+ TAutoPtr<IEventHandle> handle;
+ runtime.GrabEdgeEventRethrow<TEvDataShard::TEvTestLoadResponse>(handle);
+ UNIT_ASSERT(handle);
+ auto response = handle->Release<TEvDataShard::TEvTestLoadResponse>();
+ auto& responseRecord = response->Record;
+ UNIT_ASSERT_VALUES_EQUAL(responseRecord.GetStatus(), NMsgBusProxy::MSTATUS_OK);
+
+ // holds memory for TCell
+ TVector<TString> from = {TString("user")};
+ TVector<TString> to = {TString("zzz")};
+
+ // busywait
+ while (1) {
+ auto request = GetBaseReadRequest();
+ AddRangeQuery(
+ *request,
+ from,
+ true,
+ to,
+ true
+ );
+
+ auto readResult = SendRead(request.release());
+ UNIT_ASSERT(readResult);
+ if (readResult->GetRowsCount() == expectedRowCount)
+ break;
+
+ SimulateSleep(Server, TDuration::Seconds(1));
+ }
+ }
+
+public:
+ Tests::TServer::TPtr Server;
+ TActorId Sender;
+ TTableInfo Table;
+
+ ui64 ReadId = 1;
+};
+
+} // anonymous
+
+Y_UNIT_TEST_SUITE(UpsertLoad) {
+ Y_UNIT_TEST(ShouldWriteDataBulkUpsert) {
+ TTestHelper helper;
+
+ const ui64 expectedRowCount = 10;
+
+ std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request(new TEvDataShard::TEvTestLoadRequest());
+ auto& record = request->Record;
+ auto& command = *record.MutableBulkUpsertStart();
+
+ command.SetTag(1);
+ command.SetRowCount(expectedRowCount);
+ command.SetTabletId(helper.Table.TabletId);
+ command.SetTableId(helper.Table.UserTable.GetPathId());
+ command.SetInflight(3);
+
+ helper.TestLoad(std::move(request), expectedRowCount);
+ }
+
+ Y_UNIT_TEST(ShouldWriteDataBulkUpsertMkql) {
+ TTestHelper helper;
+
+ const ui64 expectedRowCount = 10;
+
+ std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request(new TEvDataShard::TEvTestLoadRequest());
+ auto& record = request->Record;
+ auto& command = *record.MutableUpsertMkqlStart();
+
+ command.SetTag(1);
+ command.SetRowCount(expectedRowCount);
+ command.SetTabletId(helper.Table.TabletId);
+ command.SetTableId(helper.Table.UserTable.GetPathId());
+ command.SetInflight(3);
+
+ helper.TestLoad(std::move(request), expectedRowCount);
+ }
+}
+
+} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/testload/test_load_actor.cpp b/ydb/core/tx/datashard/testload/test_load_actor.cpp
index bc1ec221412..c5c54496db0 100644
--- a/ydb/core/tx/datashard/testload/test_load_actor.cpp
+++ b/ydb/core/tx/datashard/testload/test_load_actor.cpp
@@ -112,8 +112,8 @@ public:
break;
}
- case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kUpsertStart: {
- const auto& cmd = record.GetUpsertStart();
+ case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kUpsertMkqlStart: {
+ const auto& cmd = record.GetUpsertMkqlStart();
const ui64 tag = GetOrGenerateTag(cmd);
if (LoadActors.count(tag) != 0) {
ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
diff --git a/ydb/core/tx/datashard/testload/test_load_upsert.cpp b/ydb/core/tx/datashard/testload/test_load_upsert.cpp
index 58f32872e20..fb0a2484f7b 100644
--- a/ydb/core/tx/datashard/testload/test_load_upsert.cpp
+++ b/ydb/core/tx/datashard/testload/test_load_upsert.cpp
@@ -22,7 +22,7 @@ namespace {
enum class ERequestType {
BulkUpsert,
- Upsert,
+ UpsertMkql,
};
TString GetKey(size_t n) {
@@ -109,7 +109,7 @@ TRequestsVector GenerateRequests(ui64 tableId, ui64 n, ERequestType requestType)
case ERequestType::BulkUpsert:
requests.emplace_back(GenerateBulkRowRequest(tableId, keyNum));
break;
- case ERequestType::Upsert:
+ case ERequestType::UpsertMkql:
requests.emplace_back(GenerateMkqlRowRequest(tableId, keyNum));
break;
}
@@ -307,7 +307,7 @@ NActors::IActor *CreateBulkUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequ
NActors::IActor *CreateUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd,
const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
{
- return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::Upsert);
+ return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::UpsertMkql);
}
} // NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/ut_testload/CMakeLists.darwin.txt b/ydb/core/tx/datashard/ut_testload/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..5e53efa7975
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_testload/CMakeLists.darwin.txt
@@ -0,0 +1,56 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-datashard-ut_testload)
+target_compile_options(ydb-core-tx-datashard-ut_testload PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-datashard-ut_testload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard
+)
+target_link_libraries(ydb-core-tx-datashard-ut_testload PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ core-tx-datashard
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ kqp-ut-common
+ ydb-core-testlib
+ ydb-core-tx
+ udf-service-exception_policy
+ public-lib-yson_value
+ cpp-client-ydb_result
+)
+target_link_options(ydb-core-tx-datashard-ut_testload PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-core-tx-datashard-ut_testload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_testload.cpp
+)
+add_test(
+ NAME
+ ydb-core-tx-datashard-ut_testload
+ COMMAND
+ ydb-core-tx-datashard-ut_testload
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-tx-datashard-ut_testload)
diff --git a/ydb/core/tx/datashard/ut_testload/CMakeLists.linux.txt b/ydb/core/tx/datashard/ut_testload/CMakeLists.linux.txt
new file mode 100644
index 00000000000..d42847ee463
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_testload/CMakeLists.linux.txt
@@ -0,0 +1,60 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-datashard-ut_testload)
+target_compile_options(ydb-core-tx-datashard-ut_testload PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-datashard-ut_testload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard
+)
+target_link_libraries(ydb-core-tx-datashard-ut_testload PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ core-tx-datashard
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ kqp-ut-common
+ ydb-core-testlib
+ ydb-core-tx
+ udf-service-exception_policy
+ public-lib-yson_value
+ cpp-client-ydb_result
+)
+target_link_options(ydb-core-tx-datashard-ut_testload PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-core-tx-datashard-ut_testload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_testload.cpp
+)
+add_test(
+ NAME
+ ydb-core-tx-datashard-ut_testload
+ COMMAND
+ ydb-core-tx-datashard-ut_testload
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-tx-datashard-ut_testload)
diff --git a/ydb/core/tx/datashard/ut_testload/CMakeLists.txt b/ydb/core/tx/datashard/ut_testload/CMakeLists.txt
new file mode 100644
index 00000000000..dbfe6fa2c47
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_testload/CMakeLists.txt
@@ -0,0 +1,13 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE)
+ include(CMakeLists.linux.txt)
+endif()