diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-09-15 18:35:53 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-09-15 18:35:53 +0300 |
commit | 3895bf6da495927577224d3cb636e536f78478fa (patch) | |
tree | a1d4e3627638078286eec508d40e334db6571517 | |
parent | 2553b9e00feb59b262b8db6d94aa6a2ea4f19119 (diff) | |
download | ydb-3895bf6da495927577224d3cb636e536f78478fa.tar.gz |
proper event space and proto for load actors, skeleton for read iter load actor
-rw-r--r-- | ydb/core/base/events.h | 1 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_datashard_load.cpp | 12 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 2 | ||||
-rw-r--r-- | ydb/core/protos/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/protos/datashard_load.proto | 51 | ||||
-rw-r--r-- | ydb/core/protos/msgbus.proto | 4 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 34 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.h | 16 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_testload.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/actors.h | 43 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_actor.cpp | 54 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_actor.h | 64 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_read_iterator.cpp | 90 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_upsert.cpp | 74 |
15 files changed, 299 insertions, 164 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 2e68d4ec1da..b17cc32123a 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -147,6 +147,7 @@ struct TKikimrEvents : TEvents { ES_REPLICATION_CONTROLLER, ES_HTTP_PROXY, ES_BLOB_DEPOT, + ES_DATASHARD_LOAD, }; }; diff --git a/ydb/core/client/server/msgbus_server_datashard_load.cpp b/ydb/core/client/server/msgbus_server_datashard_load.cpp index 323ba5df0c6..ac7b1cc32e4 100644 --- a/ydb/core/client/server/msgbus_server_datashard_load.cpp +++ b/ydb/core/client/server/msgbus_server_datashard_load.cpp @@ -1,13 +1,13 @@ #include "msgbus_servicereq.h" #include <ydb/core/base/services/datashard_service_id.h> -#include <ydb/core/tx/datashard/datashard.h> +#include <ydb/core/tx/datashard/testload/test_load_actor.h> namespace NKikimr::NMsgBusProxy { class TDsTestLoadActorRequest : public TActorBootstrapped<TDsTestLoadActorRequest>, public TMessageBusSessionIdentHolder { ui32 NodeId = 0; - NKikimrTxDataShard::TEvTestLoadRequest Cmd; + NKikimrDataShardLoad::TEvTestLoadRequest Cmd; NKikimrClient::TDsTestLoadResponse Response; public: @@ -22,15 +22,15 @@ public: {} void Bootstrap(const TActorContext& ctx) { - auto msg = MakeHolder<TEvDataShard::TEvTestLoadRequest>(); + auto msg = std::make_unique<TEvDataShardLoad::TEvTestLoadRequest>(); msg->Record = Cmd; msg->Record.SetCookie(NodeId); - ctx.Send(MakeDataShardLoadId(NodeId), msg.Release()); + ctx.Send(MakeDataShardLoadId(NodeId), msg.release()); Become(&TDsTestLoadActorRequest::StateFunc); } - void Handle(TEvDataShard::TEvTestLoadResponse::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvDataShardLoad::TEvTestLoadResponse::TPtr& ev, const TActorContext& ctx) { const auto& record = ev->Get()->Record; ui32 nodeId = record.GetCookie(); @@ -51,7 +51,7 @@ public: STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { - HFunc(TEvDataShard::TEvTestLoadResponse, Handle); + HFunc(TEvDataShardLoad::TEvTestLoadResponse, Handle); } } }; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 21d0a6efc12..02d64868730 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1840,7 +1840,7 @@ void TLoadInitializer::InitializeServices(NActors::TActorSystemSetup *setup, con setup->LocalServices.emplace_back(MakeBlobStorageLoadID(NodeId), TActorSetupCmd(bsActor, TMailboxType::HTSwap, appData->UserPoolId)); // FIXME: correct service id - IActor *dsActor = NDataShard::CreateTestLoadActor(appData->Counters); + IActor *dsActor = NDataShardLoad::CreateTestLoadActor(appData->Counters); setup->LocalServices.emplace_back(MakeDataShardLoadId(NodeId), TActorSetupCmd(dsActor, TMailboxType::HTSwap, appData->UserPoolId)); } diff --git a/ydb/core/protos/CMakeLists.txt b/ydb/core/protos/CMakeLists.txt index a5b72ef56b7..4ab0d4b49e4 100644 --- a/ydb/core/protos/CMakeLists.txt +++ b/ydb/core/protos/CMakeLists.txt @@ -73,6 +73,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_mediator.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/database_basic_sausage_metainfo.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto new file mode 100644 index 00000000000..6ed5f767525 --- /dev/null +++ b/ydb/core/protos/datashard_load.proto @@ -0,0 +1,51 @@ +option cc_enable_arenas = true; + +import "ydb/core/protos/tx_datashard.proto"; + +package NKikimrDataShardLoad; +option java_package = "ru.yandex.kikimr.proto"; + +message TEvTestLoadRequest { + message TLoadStop { + optional uint64 Tag = 1; + optional bool RemoveAllTags = 2; + } + + message TUpdateStart { + optional uint64 Tag = 1; + optional uint64 RowCount = 2; + optional uint64 TabletId = 3; + optional uint64 TableId = 4; + optional uint32 Inflight = 5; + + // in some cases we need full DB path with table + optional string Path = 6; + } + + message TReadStart { + optional uint64 Tag = 1; + optional uint64 TabletId = 2; + optional uint64 TableId = 3; + + // Specifies the format for result data in TEvReadResult + optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 4; + } + + optional uint64 Cookie = 1; + + oneof Command { + TLoadStop LoadStop = 2; + TUpdateStart UpsertBulkStart = 3; + TUpdateStart UpsertLocalMkqlStart = 4; + TUpdateStart UpsertKqpStart = 5; + TUpdateStart UpsertProposeStart = 6; + + TReadStart ReadIteratorStart = 7; + } +} + +message TEvTestLoadResponse { + optional uint32 Status = 1; // EResponseStatus from ydb/core/client/base/msgbus.h + optional string ErrorReason = 2; + optional uint64 Cookie = 3; +} diff --git a/ydb/core/protos/msgbus.proto b/ydb/core/protos/msgbus.proto index fcf4b60b35a..02e7fa02bd6 100644 --- a/ydb/core/protos/msgbus.proto +++ b/ydb/core/protos/msgbus.proto @@ -3,9 +3,9 @@ import "library/cpp/actors/protos/interconnect.proto"; import "ydb/core/protos/console_base.proto"; import "ydb/core/protos/console_config.proto"; import "ydb/core/protos/console_tenant.proto"; +import "ydb/core/protos/datashard_load.proto"; import "ydb/core/protos/tablet_counters.proto"; import "ydb/core/protos/flat_scheme_op.proto"; -import "ydb/core/protos/tx_datashard.proto"; import "ydb/core/protos/tx_proxy.proto"; import "ydb/core/protos/tx_scheme.proto"; import "ydb/core/protos/node_whiteboard.proto"; @@ -464,7 +464,7 @@ message TSchemeOperationStatus { message TDsTestLoadRequest { optional uint32 NodeId = 1; - optional NKikimrTxDataShard.TEvTestLoadRequest Event = 2; + optional NKikimrDataShardLoad.TEvTestLoadRequest Event = 2; }; message TDsTestLoadResponse { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index d2dd9ae0898..6ad3b0942be 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1791,37 +1791,3 @@ message TEvReplicationSourceOffsetsCancel { // Cancels a previously started read for (Sender, ReadId) optional uint64 ReadId = 1; } - -message TEvTestLoadRequest { - message TLoadStop { - optional uint64 Tag = 1; - optional bool RemoveAllTags = 2; - } - - message TUpdateStart { - optional uint64 Tag = 1; - optional uint64 RowCount = 2; - optional uint64 TabletId = 3; - optional uint64 TableId = 4; - optional uint32 Inflight = 5; - - // in some cases we need full DB path with table - optional string Path = 6; - } - - optional uint64 Cookie = 1; - - oneof Command { - TLoadStop LoadStop = 2; - TUpdateStart BulkUpsertStart = 3; - TUpdateStart UpsertLocalMkqlStart = 4; - TUpdateStart UpsertKqpStart = 5; - TUpdateStart UpsertStart = 6; - } -} - -message TEvTestLoadResponse { - optional uint32 Status = 1; // EResponseStatus from ydb/core/client/base/msgbus.h - optional string ErrorReason = 2; - optional uint64 Cookie = 3; -} diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 6b34a9ab7c3..7a7e362aea8 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -1573,22 +1573,6 @@ struct TEvDataShard { } }; - struct TEvTestLoadRequest - : public TEventPB<TEvTestLoadRequest, - NKikimrTxDataShard::TEvTestLoadRequest, - EvTestLoadRequest> - { - TEvTestLoadRequest() = default; - }; - - struct TEvTestLoadResponse - : public TEventPB<TEvTestLoadResponse, - NKikimrTxDataShard::TEvTestLoadResponse, - EvTestLoadResponse> - { - TEvTestLoadResponse() = default; - }; - }; IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info); diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp index 0b6792869f6..671e1df1474 100644 --- a/ydb/core/tx/datashard/datashard_ut_testload.cpp +++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp @@ -8,7 +8,7 @@ namespace NKikimr { -using namespace NKikimr::NDataShard; +using namespace NKikimr::NDataShardLoad; using namespace NSchemeShard; using namespace Tests; @@ -159,7 +159,7 @@ struct TTestHelper { return WaitReadResult(); } - void TestLoad(std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request, size_t expectedRowCount) { + void TestLoad(std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request, size_t expectedRowCount) { auto &runtime = *Server->GetRuntime(); TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(new ::NMonitoring::TDynamicCounters()); auto testLoadActor = runtime.Register(CreateTestLoadActor(counters)); @@ -167,9 +167,9 @@ struct TTestHelper { runtime.Send(new IEventHandle(testLoadActor, Sender, request.release()), 0, true); TAutoPtr<IEventHandle> handle; - runtime.GrabEdgeEventRethrow<TEvDataShard::TEvTestLoadResponse>(handle); + runtime.GrabEdgeEventRethrow<TEvDataShardLoad::TEvTestLoadResponse>(handle); UNIT_ASSERT(handle); - auto response = handle->Release<TEvDataShard::TEvTestLoadResponse>(); + auto response = handle->Release<TEvDataShardLoad::TEvTestLoadResponse>(); auto& responseRecord = response->Record; UNIT_ASSERT_VALUES_EQUAL(responseRecord.GetStatus(), NMsgBusProxy::MSTATUS_OK); @@ -213,9 +213,9 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { const ui64 expectedRowCount = 10; - std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request(new TEvDataShard::TEvTestLoadRequest()); + std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest()); auto& record = request->Record; - auto& command = *record.MutableBulkUpsertStart(); + auto& command = *record.MutableUpsertBulkStart(); command.SetTag(1); command.SetRowCount(expectedRowCount); @@ -231,7 +231,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { const ui64 expectedRowCount = 10; - std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request(new TEvDataShard::TEvTestLoadRequest()); + std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest()); auto& record = request->Record; auto& command = *record.MutableUpsertLocalMkqlStart(); @@ -249,7 +249,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { const ui64 expectedRowCount = 20; - std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request(new TEvDataShard::TEvTestLoadRequest()); + std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest()); auto& record = request->Record; auto& command = *record.MutableUpsertKqpStart(); diff --git a/ydb/core/tx/datashard/testload/CMakeLists.txt b/ydb/core/tx/datashard/testload/CMakeLists.txt index fbb436a1391..762516e8b4e 100644 --- a/ydb/core/tx/datashard/testload/CMakeLists.txt +++ b/ydb/core/tx/datashard/testload/CMakeLists.txt @@ -24,4 +24,5 @@ target_link_libraries(tx-datashard-testload PUBLIC target_sources(tx-datashard-testload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/testload/test_load_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/testload/test_load_upsert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp ) diff --git a/ydb/core/tx/datashard/testload/actors.h b/ydb/core/tx/datashard/testload/actors.h index f800c3d3602..13dff18d8b5 100644 --- a/ydb/core/tx/datashard/testload/actors.h +++ b/ydb/core/tx/datashard/testload/actors.h @@ -1,51 +1,28 @@ #pragma once #include "defs.h" +#include "test_load_actor.h" #include <ydb/core/tx/datashard/datashard.h> -namespace NKikimr::NDataShard { +namespace NKikimr::NDataShardLoad { -NActors::IActor *CreateBulkUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, +NActors::IActor *CreateUpsertBulkActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag); -NActors::IActor *CreateLocalMkqlUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, +NActors::IActor *CreateLocalMkqlUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag); -NActors::IActor *CreateKqpUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, +NActors::IActor *CreateKqpUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag); -NActors::IActor *CreateUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, +NActors::IActor *CreateProposeUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag); -class TLoadActorException : public yexception { -}; - -struct TLoadReport { - TDuration Duration; - ui64 OperationsOK = 0; - ui64 OperationsError = 0; - - TString ToString() const { - TStringStream ss; - ss << "Load duration: " << Duration << ", OK=" << OperationsOK << ", Error=" << OperationsError; - if (OperationsOK && Duration.Seconds()) { - ui64 throughput = OperationsOK / Duration.Seconds(); - ss << ", throughput=" << throughput << " OK_ops/s"; - } - return ss.Str(); - } -}; - -struct TEvTestLoadFinished : public TEventLocal<TEvTestLoadFinished, TEvDataShard::EvTestLoadFinished> { - ui64 Tag; - std::optional<TLoadReport> Report; - TString ErrorReason; +NActors::IActor *CreateReadIteratorActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TReadStart& cmd, + const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag); - TEvTestLoadFinished(ui64 tag, const TString& error = {}) - : Tag(tag) - , ErrorReason(error) - {} +class TLoadActorException : public yexception { }; #define VERIFY_PARAM2(FIELD, NAME) \ @@ -57,4 +34,4 @@ struct TEvTestLoadFinished : public TEventLocal<TEvTestLoadFinished, TEvDataShar #define VERIFY_PARAM(NAME) VERIFY_PARAM2(cmd, NAME) -} // NKikimr::NDataShard +} // NKikimr::NDataShardLoad diff --git a/ydb/core/tx/datashard/testload/test_load_actor.cpp b/ydb/core/tx/datashard/testload/test_load_actor.cpp index 205d161d64e..07194b91aa8 100644 --- a/ydb/core/tx/datashard/testload/test_load_actor.cpp +++ b/ydb/core/tx/datashard/testload/test_load_actor.cpp @@ -9,7 +9,7 @@ #include <google/protobuf/text_format.h> #include <library/cpp/monlib/service/pages/templates.h> -namespace NKikimr::NDataShard { +namespace NKikimr::NDataShardLoad { class TLoadActor : public TActorBootstrapped<TLoadActor> { // per-actor HTTP info @@ -31,7 +31,7 @@ class TLoadActor : public TActorBootstrapped<TLoadActor> { ui64 Tag; TString ErrorReason; TInstant FinishTime; - std::optional<TLoadReport> Report; + std::optional<TEvDataShardLoad::TLoadReport> Report; }; // info about finished actors @@ -62,7 +62,7 @@ public: Become(&TLoadActor::StateFunc); } - void Handle(TEvDataShard::TEvTestLoadRequest::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvDataShardLoad::TEvTestLoadRequest::TPtr& ev, const TActorContext& ctx) { ui32 status = NMsgBusProxy::MSTATUS_OK; TString error; const auto& record = ev->Get()->Record; @@ -74,7 +74,7 @@ public: status = NMsgBusProxy::MSTATUS_ERROR; error = ex.what(); } - auto response = std::make_unique<TEvDataShard::TEvTestLoadResponse>(); + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadResponse>(); response->Record.SetStatus(status); if (error) { response->Record.SetErrorReason(error); @@ -98,21 +98,21 @@ public: } } - void ProcessCmd(const NKikimrTxDataShard::TEvTestLoadRequest& record, const TActorContext& ctx) { + void ProcessCmd(const NKikimrDataShardLoad::TEvTestLoadRequest& record, const TActorContext& ctx) { switch (record.Command_case()) { - case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kBulkUpsertStart: { - const auto& cmd = record.GetBulkUpsertStart(); + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertBulkStart: { + const auto& cmd = record.GetUpsertBulkStart(); const ui64 tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); } LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new bulk upsert load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateBulkUpsertActor(cmd, ctx.SelfID, + LoadActors.emplace(tag, ctx.Register(CreateUpsertBulkActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag))); break; } - case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kUpsertLocalMkqlStart: { + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertLocalMkqlStart: { const auto& cmd = record.GetUpsertLocalMkqlStart(); const ui64 tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { @@ -124,7 +124,7 @@ public: break; } - case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kUpsertKqpStart: { + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertKqpStart: { const auto& cmd = record.GetUpsertKqpStart(); const ui64 tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { @@ -136,19 +136,31 @@ public: break; } - case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kUpsertStart: { - const auto& cmd = record.GetUpsertStart(); + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertProposeStart: { + const auto& cmd = record.GetUpsertProposeStart(); const ui64 tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); } LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new upsert load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateUpsertActor(cmd, ctx.SelfID, + LoadActors.emplace(tag, ctx.Register(CreateProposeUpsertActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag))); break; } - case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kLoadStop: { + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kReadIteratorStart: { + const auto& cmd = record.GetReadIteratorStart(); + const ui64 tag = GetOrGenerateTag(cmd); + if (LoadActors.count(tag) != 0) { + ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); + } + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new read iterator load actor# " << tag); + LoadActors.emplace(tag, ctx.Register(CreateReadIteratorActor(cmd, ctx.SelfID, + GetServiceCounters(Counters, "load_actor"), tag))); + break; + } + + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kLoadStop: { const auto& cmd = record.GetLoadStop(); if (cmd.HasRemoveAllTags() && cmd.GetRemoveAllTags()) { LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Delete all running load actors"); @@ -176,14 +188,14 @@ public: TString protoTxt; google::protobuf::TextFormat::PrintToString(record, &protoTxt); ythrow TLoadActorException() << (TStringBuilder() - << "TLoadActor::Handle(TEvDataShard::TEvTestLoadRequest): unexpected command case: " + << "TLoadActor::Handle(TEvDataShardLoad::TEvTestLoadRequest): unexpected command case: " << ui32(record.Command_case()) << " protoTxt# " << protoTxt.Quote()); } } } - void Handle(TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { const auto& msg = ev->Get(); auto it = LoadActors.find(msg->Tag); Y_VERIFY(it != LoadActors.end()); @@ -225,7 +237,7 @@ public: const auto& params = ev->Get()->Request.GetParams(); if (params.Has("protobuf")) { - NKikimrTxDataShard::TEvTestLoadRequest record; + NKikimrDataShardLoad::TEvTestLoadRequest record; bool status = google::protobuf::TextFormat::ParseFromString(params.Get("protobuf"), &record); if (status) { try { @@ -283,7 +295,7 @@ public: THttpInfoRequest& info = it->second; #define PROFILE(NAME) \ - str << "<option value=\"" << ui32(NKikimrTxDataShard::TEvTestLoadRequest::NAME) << "\">" << #NAME << "</option>"; + str << "<option value=\"" << ui32(NKikimrDataShardLoad::TEvTestLoadRequest::NAME) << "\">" << #NAME << "</option>"; #define PUT_HANDLE_CLASS(NAME) \ str << "<option value=\"" << ui32(NKikimrTxDataShard::NAME) << "\">" << #NAME << "</option>"; @@ -335,8 +347,8 @@ public: } STRICT_STFUNC(StateFunc, - HFunc(TEvDataShard::TEvTestLoadRequest, Handle) - HFunc(TEvTestLoadFinished, Handle) + HFunc(TEvDataShardLoad::TEvTestLoadRequest, Handle) + HFunc(TEvDataShardLoad::TEvTestLoadFinished, Handle) HFunc(NMon::TEvHttpInfo, Handle) HFunc(NMon::TEvHttpInfoRes, Handle) ) @@ -346,4 +358,4 @@ NActors::IActor *CreateTestLoadActor(const TIntrusivePtr<::NMonitoring::TDynamic return new TLoadActor(counters); } -} // NKikimr::NDataShard
\ No newline at end of file +} // NKikimr::NDataShardLoad
\ No newline at end of file diff --git a/ydb/core/tx/datashard/testload/test_load_actor.h b/ydb/core/tx/datashard/testload/test_load_actor.h index fd82484c47e..acb6d7cdeac 100644 --- a/ydb/core/tx/datashard/testload/test_load_actor.h +++ b/ydb/core/tx/datashard/testload/test_load_actor.h @@ -2,8 +2,68 @@ #include "defs.h" -namespace NKikimr::NDataShard { +#include <ydb/core/base/events.h> +#include <ydb/core/protos/datashard_load.pb.h> + +#include <library/cpp/actors/core/event_pb.h> + +namespace NKikimr { + +struct TEvDataShardLoad { + enum EEv { + EvTestLoadRequest = EventSpaceBegin(TKikimrEvents::ES_DATASHARD_LOAD), + EvTestLoadResponse, + + EvTestLoadFinished, + }; + + struct TEvTestLoadRequest + : public TEventPB<TEvTestLoadRequest, + NKikimrDataShardLoad::TEvTestLoadRequest, + EvTestLoadRequest> + { + TEvTestLoadRequest() = default; + }; + + struct TEvTestLoadResponse + : public TEventPB<TEvTestLoadResponse, + NKikimrDataShardLoad::TEvTestLoadResponse, + EvTestLoadResponse> + { + TEvTestLoadResponse() = default; + }; + + struct TLoadReport { + TDuration Duration; + ui64 OperationsOK = 0; + ui64 OperationsError = 0; + + TString ToString() const { + TStringStream ss; + ss << "Load duration: " << Duration << ", OK=" << OperationsOK << ", Error=" << OperationsError; + if (OperationsOK && Duration.Seconds()) { + ui64 throughput = OperationsOK / Duration.Seconds(); + ss << ", throughput=" << throughput << " OK_ops/s"; + } + return ss.Str(); + } + }; + + struct TEvTestLoadFinished : public TEventLocal<TEvTestLoadFinished, EvTestLoadFinished> { + ui64 Tag; + std::optional<TLoadReport> Report; + TString ErrorReason; + + TEvTestLoadFinished(ui64 tag, const TString& error = {}) + : Tag(tag) + , ErrorReason(error) + {} + }; +}; + +namespace NDataShardLoad { NActors::IActor *CreateTestLoadActor(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters); -} // NKikimr::NDataShard +} // NDataShardLoad +} // NKikimr diff --git a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp new file mode 100644 index 00000000000..8531888715d --- /dev/null +++ b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp @@ -0,0 +1,90 @@ +#include "actors.h" + +#include <ydb/core/base/tablet.h> +#include <ydb/core/base/tablet_pipe.h> + +#include <library/cpp/monlib/service/pages/templates.h> + +#include <util/datetime/cputimer.h> +#include <util/random/random.h> + +#include <google/protobuf/text_format.h> + +// * Scheme is hardcoded and it is like default YCSB setup: +// table name is "usertable", 1 utf8 "key" column, 10 utf8 "field0" - "field9" columns +// * row is ~ 1 KB, keys are like user1000385178204227360 + +namespace NKikimr::NDataShardLoad { + +namespace { + +class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadScenario> { + const NKikimrDataShardLoad::TEvTestLoadRequest::TReadStart Config; + const TActorId Parent; + TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; + const ui64 Tag; + + TInstant StartTs; + + TString ConfingString; + +public: + TReadIteratorLoadScenario(const NKikimrDataShardLoad::TEvTestLoadRequest::TReadStart& cmd, const TActorId& parent, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) + : Config(cmd) + , Parent(parent) + , Counters(std::move(counters)) + , Tag(tag) + { + google::protobuf::TextFormat::PrintToString(cmd, &ConfingString); + } + + void Bootstrap(const TActorContext& ctx) { + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + << " Bootstrap called: " << ConfingString); + + Become(&TReadIteratorLoadScenario::StateFunc); + } + +private: + void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) { + TStringStream str; + HTML(str) { + str << "ReadIteratorLoadScenario# " << Tag << " started on " << StartTs; + } + ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId)); + } + + void HandlePoison(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + << " tablet recieved PoisonPill, going to die"); + Stop(ctx); + } + + void StopWithError(const TActorContext& ctx, const TString& reason) { + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag + << " stopped with error: " << reason); + + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason)); + Stop(ctx); + } + + void Stop(const TActorContext& ctx) { + Die(ctx); + } + + STRICT_STFUNC(StateFunc, + CFunc(TEvents::TSystem::PoisonPill, HandlePoison) + HFunc(NMon::TEvHttpInfo, Handle) + ) +}; + +} // anonymous + +NActors::IActor *CreateReadIteratorActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TReadStart& cmd, + const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) +{ + return new TReadIteratorLoadScenario(cmd, parent, std::move(counters), tag); +} + +} // NKikimr::NDataShardLoad diff --git a/ydb/core/tx/datashard/testload/test_load_upsert.cpp b/ydb/core/tx/datashard/testload/test_load_upsert.cpp index 6c244578dd8..e13a219c647 100644 --- a/ydb/core/tx/datashard/testload/test_load_upsert.cpp +++ b/ydb/core/tx/datashard/testload/test_load_upsert.cpp @@ -21,7 +21,7 @@ // table name is "usertable", 1 utf8 "key" column, 10 utf8 "field0" - "field9" columns // * row is ~ 1 KB, keys are like user1000385178204227360 -namespace NKikimr::NDataShard { +namespace NKikimr::NDataShardLoad { using TUploadRowsRequestPtr = std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>; @@ -31,10 +31,8 @@ using TRequestsVector = std::vector<TUploadRequest>; namespace { enum class ERequestType { - BulkUpsert, + UpsertBulk, UpsertLocalMkql, - KqpUpsert, - Upsert, }; TString GetKey(size_t n) { @@ -114,19 +112,12 @@ TUploadRequest GenerateMkqlRowRequest(ui64 /* tableId */, ui64 keyNum) { (let row1_ '('('key (Utf8 '%s)))) )", key.data()) + programWithoutKey; - auto request = std::unique_ptr<TEvTablet::TEvLocalMKQL>(new TEvTablet::TEvLocalMKQL); + auto request = std::make_unique<TEvTablet::TEvLocalMKQL>(); request->Record.MutableProgram()->MutableProgram()->SetText(programText); return TUploadRequest(request.release()); } -TUploadRequest GenerateRowRequest(ui64 tableId, ui64 keyNum) { - Y_UNUSED(tableId); - TString key = GetKey(keyNum); - - return nullptr; -} - TRequestsVector GenerateRequests(ui64 tableId, ui64 n, ERequestType requestType) { TRequestsVector requests; requests.reserve(n); @@ -134,15 +125,12 @@ TRequestsVector GenerateRequests(ui64 tableId, ui64 n, ERequestType requestType) for (size_t i = 0; i < n; ++i) { auto keyNum = RandomNumber(Max<ui64>()); switch (requestType) { - case ERequestType::BulkUpsert: + case ERequestType::UpsertBulk: requests.emplace_back(GenerateBulkRowRequest(tableId, keyNum)); break; case ERequestType::UpsertLocalMkql: requests.emplace_back(GenerateMkqlRowRequest(tableId, keyNum)); break; - case ERequestType::Upsert: - requests.emplace_back(GenerateRowRequest(tableId, keyNum)); - break; default: // should not happen, just for compiler Y_FAIL("Unsupported request type"); @@ -209,7 +197,7 @@ TQueryInfo GenerateUpsert(size_t n) { // it's a partial copy-paste from TUpsertActor: logic slightly differs, so that // it seems better to have copy-paste rather if/else for different loads class TKqpUpsertActor : public TActorBootstrapped<TKqpUpsertActor> { - const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart Config; + const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart Config; const TActorId Parent; const ui64 Tag; const TString Path; @@ -227,7 +215,7 @@ class TKqpUpsertActor : public TActorBootstrapped<TKqpUpsertActor> { size_t Errors = 0; public: - TKqpUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent, + TKqpUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag, TRequestsVector requests) : Config(cmd) , Parent(parent) @@ -291,8 +279,8 @@ private: } else if (Inflight == 0) { EndTs = TInstant::Now(); auto delta = EndTs - StartTs; - std::unique_ptr<TEvTestLoadFinished> response(new TEvTestLoadFinished(Tag)); - response->Report = TLoadReport(); + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); + response->Report = TEvDataShardLoad::TLoadReport(); response->Report->Duration = delta; response->Report->OperationsOK = Requests.size() - Errors; response->Report->OperationsError = Errors; @@ -338,7 +326,7 @@ private: void StopWithError(const TActorContext& ctx, const TString& reason) { LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason); - ctx.Send(Parent, new TEvTestLoadFinished(Tag, reason)); + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason)); Die(ctx); } @@ -366,7 +354,7 @@ private: // creates multiple TKqpUpsertActor for inflight > 1 and waits completion class TKqpUpsertActorMultiSession : public TActorBootstrapped<TKqpUpsertActorMultiSession> { - const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart Config; + const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart Config; const TActorId Parent; const ui64 Tag; TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; @@ -385,7 +373,7 @@ class TKqpUpsertActorMultiSession : public TActorBootstrapped<TKqpUpsertActorMul size_t Errors = 0; public: - TKqpUpsertActorMultiSession(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent, + TKqpUpsertActorMultiSession(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) : Config(cmd) , Parent(parent) @@ -421,7 +409,7 @@ private: for (size_t i = 0; i < requestsPerActor; ++i) { auto queryInfo = GenerateUpsert(rowCount++); - std::unique_ptr<NKqp::TEvKqp::TEvQueryRequest> request(new NKqp::TEvKqp::TEvQueryRequest()); + auto request = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); request->Record.MutableRequest()->SetKeepSession(true); request->Record.MutableRequest()->SetDatabase(Path); @@ -466,7 +454,7 @@ private: << " started# " << actorsCount << " actors each with inflight# " << requestsPerActor); } - void Handle(const TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { + void Handle(const TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { const auto* msg = ev->Get(); if (msg->ErrorReason || !msg->Report) { TStringStream ss; @@ -487,8 +475,8 @@ private: if (Inflight == 0) { EndTs = TInstant::Now(); auto delta = EndTs - StartTs; - std::unique_ptr<TEvTestLoadFinished> response(new TEvTestLoadFinished(Tag)); - response->Report = TLoadReport(); + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); + response->Report = TEvDataShardLoad::TLoadReport(); response->Report->Duration = delta; response->Report->OperationsOK = Oks; response->Report->OperationsError = Errors; @@ -519,7 +507,7 @@ private: LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag << " stopped with error: " << reason); - ctx.Send(Parent, new TEvTestLoadFinished(Tag, reason)); + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason)); Stop(ctx); } @@ -534,14 +522,14 @@ private: STRICT_STFUNC(StateFunc, CFunc(TEvents::TSystem::PoisonPill, HandlePoison) HFunc(NMon::TEvHttpInfo, Handle) - HFunc(TEvTestLoadFinished, Handle); + HFunc(TEvDataShardLoad::TEvTestLoadFinished, Handle); ) }; class TUpsertActor : public TActorBootstrapped<TUpsertActor> { - const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart Config; + const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart Config; const TActorId Parent; const ui64 Tag; const ERequestType RequestType; @@ -563,7 +551,7 @@ public: return NKikimrServices::TActivity::DS_LOAD_ACTOR; } - TUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent, + TUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag, ERequestType requestType) : Config(cmd) , Parent(parent) @@ -632,8 +620,8 @@ private: } else if (Inflight == 0) { EndTs = TInstant::Now(); auto delta = EndTs - StartTs; - std::unique_ptr<TEvTestLoadFinished> response(new TEvTestLoadFinished(Tag)); - response->Report = TLoadReport(); + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); + response->Report = TEvDataShardLoad::TLoadReport(); response->Report->Duration = delta; response->Report->OperationsOK = Requests.size() - Errors; response->Report->OperationsError = Errors; @@ -701,7 +689,7 @@ private: void StopWithError(const TActorContext& ctx, const TString& reason) { LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason); - ctx.Send(Parent, new TEvTestLoadFinished(Tag, reason)); + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason)); NTabletPipe::CloseClient(SelfId(), Pipe); Die(ctx); } @@ -717,28 +705,32 @@ private: ) }; -NActors::IActor *CreateBulkUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, +NActors::IActor *CreateUpsertBulkActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) { - return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::BulkUpsert); + return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::UpsertBulk); } -NActors::IActor *CreateLocalMkqlUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, +NActors::IActor *CreateLocalMkqlUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) { return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::UpsertLocalMkql); } -NActors::IActor *CreateKqpUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, +NActors::IActor *CreateKqpUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) { return new TKqpUpsertActorMultiSession(cmd, parent, std::move(counters), tag); } -NActors::IActor *CreateUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, +NActors::IActor *CreateProposeUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) { - return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::Upsert); + Y_UNUSED(cmd); + Y_UNUSED(parent); + Y_UNUSED(counters); + Y_UNUSED(tag); + return nullptr; // not yet implemented } -} // NKikimr::NDataShard +} // NKikimr::NDataShardLoad |