aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-09-15 18:35:53 +0300
committereivanov89 <eivanov89@ydb.tech>2022-09-15 18:35:53 +0300
commit3895bf6da495927577224d3cb636e536f78478fa (patch)
treea1d4e3627638078286eec508d40e334db6571517
parent2553b9e00feb59b262b8db6d94aa6a2ea4f19119 (diff)
downloadydb-3895bf6da495927577224d3cb636e536f78478fa.tar.gz
proper event space and proto for load actors, skeleton for read iter load actor
-rw-r--r--ydb/core/base/events.h1
-rw-r--r--ydb/core/client/server/msgbus_server_datashard_load.cpp12
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp2
-rw-r--r--ydb/core/protos/CMakeLists.txt1
-rw-r--r--ydb/core/protos/datashard_load.proto51
-rw-r--r--ydb/core/protos/msgbus.proto4
-rw-r--r--ydb/core/protos/tx_datashard.proto34
-rw-r--r--ydb/core/tx/datashard/datashard.h16
-rw-r--r--ydb/core/tx/datashard/datashard_ut_testload.cpp16
-rw-r--r--ydb/core/tx/datashard/testload/CMakeLists.txt1
-rw-r--r--ydb/core/tx/datashard/testload/actors.h43
-rw-r--r--ydb/core/tx/datashard/testload/test_load_actor.cpp54
-rw-r--r--ydb/core/tx/datashard/testload/test_load_actor.h64
-rw-r--r--ydb/core/tx/datashard/testload/test_load_read_iterator.cpp90
-rw-r--r--ydb/core/tx/datashard/testload/test_load_upsert.cpp74
15 files changed, 299 insertions, 164 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index 2e68d4ec1da..b17cc32123a 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -147,6 +147,7 @@ struct TKikimrEvents : TEvents {
ES_REPLICATION_CONTROLLER,
ES_HTTP_PROXY,
ES_BLOB_DEPOT,
+ ES_DATASHARD_LOAD,
};
};
diff --git a/ydb/core/client/server/msgbus_server_datashard_load.cpp b/ydb/core/client/server/msgbus_server_datashard_load.cpp
index 323ba5df0c6..ac7b1cc32e4 100644
--- a/ydb/core/client/server/msgbus_server_datashard_load.cpp
+++ b/ydb/core/client/server/msgbus_server_datashard_load.cpp
@@ -1,13 +1,13 @@
#include "msgbus_servicereq.h"
#include <ydb/core/base/services/datashard_service_id.h>
-#include <ydb/core/tx/datashard/datashard.h>
+#include <ydb/core/tx/datashard/testload/test_load_actor.h>
namespace NKikimr::NMsgBusProxy {
class TDsTestLoadActorRequest : public TActorBootstrapped<TDsTestLoadActorRequest>, public TMessageBusSessionIdentHolder {
ui32 NodeId = 0;
- NKikimrTxDataShard::TEvTestLoadRequest Cmd;
+ NKikimrDataShardLoad::TEvTestLoadRequest Cmd;
NKikimrClient::TDsTestLoadResponse Response;
public:
@@ -22,15 +22,15 @@ public:
{}
void Bootstrap(const TActorContext& ctx) {
- auto msg = MakeHolder<TEvDataShard::TEvTestLoadRequest>();
+ auto msg = std::make_unique<TEvDataShardLoad::TEvTestLoadRequest>();
msg->Record = Cmd;
msg->Record.SetCookie(NodeId);
- ctx.Send(MakeDataShardLoadId(NodeId), msg.Release());
+ ctx.Send(MakeDataShardLoadId(NodeId), msg.release());
Become(&TDsTestLoadActorRequest::StateFunc);
}
- void Handle(TEvDataShard::TEvTestLoadResponse::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvDataShardLoad::TEvTestLoadResponse::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record;
ui32 nodeId = record.GetCookie();
@@ -51,7 +51,7 @@ public:
STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
- HFunc(TEvDataShard::TEvTestLoadResponse, Handle);
+ HFunc(TEvDataShardLoad::TEvTestLoadResponse, Handle);
}
}
};
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 21d0a6efc12..02d64868730 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -1840,7 +1840,7 @@ void TLoadInitializer::InitializeServices(NActors::TActorSystemSetup *setup, con
setup->LocalServices.emplace_back(MakeBlobStorageLoadID(NodeId), TActorSetupCmd(bsActor, TMailboxType::HTSwap, appData->UserPoolId));
// FIXME: correct service id
- IActor *dsActor = NDataShard::CreateTestLoadActor(appData->Counters);
+ IActor *dsActor = NDataShardLoad::CreateTestLoadActor(appData->Counters);
setup->LocalServices.emplace_back(MakeDataShardLoadId(NodeId), TActorSetupCmd(dsActor, TMailboxType::HTSwap, appData->UserPoolId));
}
diff --git a/ydb/core/protos/CMakeLists.txt b/ydb/core/protos/CMakeLists.txt
index a5b72ef56b7..4ab0d4b49e4 100644
--- a/ydb/core/protos/CMakeLists.txt
+++ b/ydb/core/protos/CMakeLists.txt
@@ -73,6 +73,7 @@ target_proto_messages(ydb-core-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_mediator.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/counters.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/database_basic_sausage_metainfo.proto
+ ${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto
new file mode 100644
index 00000000000..6ed5f767525
--- /dev/null
+++ b/ydb/core/protos/datashard_load.proto
@@ -0,0 +1,51 @@
+option cc_enable_arenas = true;
+
+import "ydb/core/protos/tx_datashard.proto";
+
+package NKikimrDataShardLoad;
+option java_package = "ru.yandex.kikimr.proto";
+
+message TEvTestLoadRequest {
+ message TLoadStop {
+ optional uint64 Tag = 1;
+ optional bool RemoveAllTags = 2;
+ }
+
+ message TUpdateStart {
+ optional uint64 Tag = 1;
+ optional uint64 RowCount = 2;
+ optional uint64 TabletId = 3;
+ optional uint64 TableId = 4;
+ optional uint32 Inflight = 5;
+
+ // in some cases we need full DB path with table
+ optional string Path = 6;
+ }
+
+ message TReadStart {
+ optional uint64 Tag = 1;
+ optional uint64 TabletId = 2;
+ optional uint64 TableId = 3;
+
+ // Specifies the format for result data in TEvReadResult
+ optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 4;
+ }
+
+ optional uint64 Cookie = 1;
+
+ oneof Command {
+ TLoadStop LoadStop = 2;
+ TUpdateStart UpsertBulkStart = 3;
+ TUpdateStart UpsertLocalMkqlStart = 4;
+ TUpdateStart UpsertKqpStart = 5;
+ TUpdateStart UpsertProposeStart = 6;
+
+ TReadStart ReadIteratorStart = 7;
+ }
+}
+
+message TEvTestLoadResponse {
+ optional uint32 Status = 1; // EResponseStatus from ydb/core/client/base/msgbus.h
+ optional string ErrorReason = 2;
+ optional uint64 Cookie = 3;
+}
diff --git a/ydb/core/protos/msgbus.proto b/ydb/core/protos/msgbus.proto
index fcf4b60b35a..02e7fa02bd6 100644
--- a/ydb/core/protos/msgbus.proto
+++ b/ydb/core/protos/msgbus.proto
@@ -3,9 +3,9 @@ import "library/cpp/actors/protos/interconnect.proto";
import "ydb/core/protos/console_base.proto";
import "ydb/core/protos/console_config.proto";
import "ydb/core/protos/console_tenant.proto";
+import "ydb/core/protos/datashard_load.proto";
import "ydb/core/protos/tablet_counters.proto";
import "ydb/core/protos/flat_scheme_op.proto";
-import "ydb/core/protos/tx_datashard.proto";
import "ydb/core/protos/tx_proxy.proto";
import "ydb/core/protos/tx_scheme.proto";
import "ydb/core/protos/node_whiteboard.proto";
@@ -464,7 +464,7 @@ message TSchemeOperationStatus {
message TDsTestLoadRequest {
optional uint32 NodeId = 1;
- optional NKikimrTxDataShard.TEvTestLoadRequest Event = 2;
+ optional NKikimrDataShardLoad.TEvTestLoadRequest Event = 2;
};
message TDsTestLoadResponse {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index d2dd9ae0898..6ad3b0942be 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1791,37 +1791,3 @@ message TEvReplicationSourceOffsetsCancel {
// Cancels a previously started read for (Sender, ReadId)
optional uint64 ReadId = 1;
}
-
-message TEvTestLoadRequest {
- message TLoadStop {
- optional uint64 Tag = 1;
- optional bool RemoveAllTags = 2;
- }
-
- message TUpdateStart {
- optional uint64 Tag = 1;
- optional uint64 RowCount = 2;
- optional uint64 TabletId = 3;
- optional uint64 TableId = 4;
- optional uint32 Inflight = 5;
-
- // in some cases we need full DB path with table
- optional string Path = 6;
- }
-
- optional uint64 Cookie = 1;
-
- oneof Command {
- TLoadStop LoadStop = 2;
- TUpdateStart BulkUpsertStart = 3;
- TUpdateStart UpsertLocalMkqlStart = 4;
- TUpdateStart UpsertKqpStart = 5;
- TUpdateStart UpsertStart = 6;
- }
-}
-
-message TEvTestLoadResponse {
- optional uint32 Status = 1; // EResponseStatus from ydb/core/client/base/msgbus.h
- optional string ErrorReason = 2;
- optional uint64 Cookie = 3;
-}
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 6b34a9ab7c3..7a7e362aea8 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -1573,22 +1573,6 @@ struct TEvDataShard {
}
};
- struct TEvTestLoadRequest
- : public TEventPB<TEvTestLoadRequest,
- NKikimrTxDataShard::TEvTestLoadRequest,
- EvTestLoadRequest>
- {
- TEvTestLoadRequest() = default;
- };
-
- struct TEvTestLoadResponse
- : public TEventPB<TEvTestLoadResponse,
- NKikimrTxDataShard::TEvTestLoadResponse,
- EvTestLoadResponse>
- {
- TEvTestLoadResponse() = default;
- };
-
};
IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info);
diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp
index 0b6792869f6..671e1df1474 100644
--- a/ydb/core/tx/datashard/datashard_ut_testload.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp
@@ -8,7 +8,7 @@
namespace NKikimr {
-using namespace NKikimr::NDataShard;
+using namespace NKikimr::NDataShardLoad;
using namespace NSchemeShard;
using namespace Tests;
@@ -159,7 +159,7 @@ struct TTestHelper {
return WaitReadResult();
}
- void TestLoad(std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request, size_t expectedRowCount) {
+ void TestLoad(std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request, size_t expectedRowCount) {
auto &runtime = *Server->GetRuntime();
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(new ::NMonitoring::TDynamicCounters());
auto testLoadActor = runtime.Register(CreateTestLoadActor(counters));
@@ -167,9 +167,9 @@ struct TTestHelper {
runtime.Send(new IEventHandle(testLoadActor, Sender, request.release()), 0, true);
TAutoPtr<IEventHandle> handle;
- runtime.GrabEdgeEventRethrow<TEvDataShard::TEvTestLoadResponse>(handle);
+ runtime.GrabEdgeEventRethrow<TEvDataShardLoad::TEvTestLoadResponse>(handle);
UNIT_ASSERT(handle);
- auto response = handle->Release<TEvDataShard::TEvTestLoadResponse>();
+ auto response = handle->Release<TEvDataShardLoad::TEvTestLoadResponse>();
auto& responseRecord = response->Record;
UNIT_ASSERT_VALUES_EQUAL(responseRecord.GetStatus(), NMsgBusProxy::MSTATUS_OK);
@@ -213,9 +213,9 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
const ui64 expectedRowCount = 10;
- std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request(new TEvDataShard::TEvTestLoadRequest());
+ std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest());
auto& record = request->Record;
- auto& command = *record.MutableBulkUpsertStart();
+ auto& command = *record.MutableUpsertBulkStart();
command.SetTag(1);
command.SetRowCount(expectedRowCount);
@@ -231,7 +231,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
const ui64 expectedRowCount = 10;
- std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request(new TEvDataShard::TEvTestLoadRequest());
+ std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest());
auto& record = request->Record;
auto& command = *record.MutableUpsertLocalMkqlStart();
@@ -249,7 +249,7 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
const ui64 expectedRowCount = 20;
- std::unique_ptr<TEvDataShard::TEvTestLoadRequest> request(new TEvDataShard::TEvTestLoadRequest());
+ std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest());
auto& record = request->Record;
auto& command = *record.MutableUpsertKqpStart();
diff --git a/ydb/core/tx/datashard/testload/CMakeLists.txt b/ydb/core/tx/datashard/testload/CMakeLists.txt
index fbb436a1391..762516e8b4e 100644
--- a/ydb/core/tx/datashard/testload/CMakeLists.txt
+++ b/ydb/core/tx/datashard/testload/CMakeLists.txt
@@ -24,4 +24,5 @@ target_link_libraries(tx-datashard-testload PUBLIC
target_sources(tx-datashard-testload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/testload/test_load_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/testload/test_load_upsert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp
)
diff --git a/ydb/core/tx/datashard/testload/actors.h b/ydb/core/tx/datashard/testload/actors.h
index f800c3d3602..13dff18d8b5 100644
--- a/ydb/core/tx/datashard/testload/actors.h
+++ b/ydb/core/tx/datashard/testload/actors.h
@@ -1,51 +1,28 @@
#pragma once
#include "defs.h"
+#include "test_load_actor.h"
#include <ydb/core/tx/datashard/datashard.h>
-namespace NKikimr::NDataShard {
+namespace NKikimr::NDataShardLoad {
-NActors::IActor *CreateBulkUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd,
+NActors::IActor *CreateUpsertBulkActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag);
-NActors::IActor *CreateLocalMkqlUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd,
+NActors::IActor *CreateLocalMkqlUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag);
-NActors::IActor *CreateKqpUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd,
+NActors::IActor *CreateKqpUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag);
-NActors::IActor *CreateUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd,
+NActors::IActor *CreateProposeUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag);
-class TLoadActorException : public yexception {
-};
-
-struct TLoadReport {
- TDuration Duration;
- ui64 OperationsOK = 0;
- ui64 OperationsError = 0;
-
- TString ToString() const {
- TStringStream ss;
- ss << "Load duration: " << Duration << ", OK=" << OperationsOK << ", Error=" << OperationsError;
- if (OperationsOK && Duration.Seconds()) {
- ui64 throughput = OperationsOK / Duration.Seconds();
- ss << ", throughput=" << throughput << " OK_ops/s";
- }
- return ss.Str();
- }
-};
-
-struct TEvTestLoadFinished : public TEventLocal<TEvTestLoadFinished, TEvDataShard::EvTestLoadFinished> {
- ui64 Tag;
- std::optional<TLoadReport> Report;
- TString ErrorReason;
+NActors::IActor *CreateReadIteratorActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TReadStart& cmd,
+ const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag);
- TEvTestLoadFinished(ui64 tag, const TString& error = {})
- : Tag(tag)
- , ErrorReason(error)
- {}
+class TLoadActorException : public yexception {
};
#define VERIFY_PARAM2(FIELD, NAME) \
@@ -57,4 +34,4 @@ struct TEvTestLoadFinished : public TEventLocal<TEvTestLoadFinished, TEvDataShar
#define VERIFY_PARAM(NAME) VERIFY_PARAM2(cmd, NAME)
-} // NKikimr::NDataShard
+} // NKikimr::NDataShardLoad
diff --git a/ydb/core/tx/datashard/testload/test_load_actor.cpp b/ydb/core/tx/datashard/testload/test_load_actor.cpp
index 205d161d64e..07194b91aa8 100644
--- a/ydb/core/tx/datashard/testload/test_load_actor.cpp
+++ b/ydb/core/tx/datashard/testload/test_load_actor.cpp
@@ -9,7 +9,7 @@
#include <google/protobuf/text_format.h>
#include <library/cpp/monlib/service/pages/templates.h>
-namespace NKikimr::NDataShard {
+namespace NKikimr::NDataShardLoad {
class TLoadActor : public TActorBootstrapped<TLoadActor> {
// per-actor HTTP info
@@ -31,7 +31,7 @@ class TLoadActor : public TActorBootstrapped<TLoadActor> {
ui64 Tag;
TString ErrorReason;
TInstant FinishTime;
- std::optional<TLoadReport> Report;
+ std::optional<TEvDataShardLoad::TLoadReport> Report;
};
// info about finished actors
@@ -62,7 +62,7 @@ public:
Become(&TLoadActor::StateFunc);
}
- void Handle(TEvDataShard::TEvTestLoadRequest::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvDataShardLoad::TEvTestLoadRequest::TPtr& ev, const TActorContext& ctx) {
ui32 status = NMsgBusProxy::MSTATUS_OK;
TString error;
const auto& record = ev->Get()->Record;
@@ -74,7 +74,7 @@ public:
status = NMsgBusProxy::MSTATUS_ERROR;
error = ex.what();
}
- auto response = std::make_unique<TEvDataShard::TEvTestLoadResponse>();
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadResponse>();
response->Record.SetStatus(status);
if (error) {
response->Record.SetErrorReason(error);
@@ -98,21 +98,21 @@ public:
}
}
- void ProcessCmd(const NKikimrTxDataShard::TEvTestLoadRequest& record, const TActorContext& ctx) {
+ void ProcessCmd(const NKikimrDataShardLoad::TEvTestLoadRequest& record, const TActorContext& ctx) {
switch (record.Command_case()) {
- case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kBulkUpsertStart: {
- const auto& cmd = record.GetBulkUpsertStart();
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertBulkStart: {
+ const auto& cmd = record.GetUpsertBulkStart();
const ui64 tag = GetOrGenerateTag(cmd);
if (LoadActors.count(tag) != 0) {
ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
}
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new bulk upsert load actor with tag# " << tag);
- LoadActors.emplace(tag, ctx.Register(CreateBulkUpsertActor(cmd, ctx.SelfID,
+ LoadActors.emplace(tag, ctx.Register(CreateUpsertBulkActor(cmd, ctx.SelfID,
GetServiceCounters(Counters, "load_actor"), tag)));
break;
}
- case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kUpsertLocalMkqlStart: {
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertLocalMkqlStart: {
const auto& cmd = record.GetUpsertLocalMkqlStart();
const ui64 tag = GetOrGenerateTag(cmd);
if (LoadActors.count(tag) != 0) {
@@ -124,7 +124,7 @@ public:
break;
}
- case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kUpsertKqpStart: {
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertKqpStart: {
const auto& cmd = record.GetUpsertKqpStart();
const ui64 tag = GetOrGenerateTag(cmd);
if (LoadActors.count(tag) != 0) {
@@ -136,19 +136,31 @@ public:
break;
}
- case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kUpsertStart: {
- const auto& cmd = record.GetUpsertStart();
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertProposeStart: {
+ const auto& cmd = record.GetUpsertProposeStart();
const ui64 tag = GetOrGenerateTag(cmd);
if (LoadActors.count(tag) != 0) {
ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
}
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new upsert load actor with tag# " << tag);
- LoadActors.emplace(tag, ctx.Register(CreateUpsertActor(cmd, ctx.SelfID,
+ LoadActors.emplace(tag, ctx.Register(CreateProposeUpsertActor(cmd, ctx.SelfID,
GetServiceCounters(Counters, "load_actor"), tag)));
break;
}
- case NKikimrTxDataShard::TEvTestLoadRequest::CommandCase::kLoadStop: {
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kReadIteratorStart: {
+ const auto& cmd = record.GetReadIteratorStart();
+ const ui64 tag = GetOrGenerateTag(cmd);
+ if (LoadActors.count(tag) != 0) {
+ ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
+ }
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new read iterator load actor# " << tag);
+ LoadActors.emplace(tag, ctx.Register(CreateReadIteratorActor(cmd, ctx.SelfID,
+ GetServiceCounters(Counters, "load_actor"), tag)));
+ break;
+ }
+
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kLoadStop: {
const auto& cmd = record.GetLoadStop();
if (cmd.HasRemoveAllTags() && cmd.GetRemoveAllTags()) {
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Delete all running load actors");
@@ -176,14 +188,14 @@ public:
TString protoTxt;
google::protobuf::TextFormat::PrintToString(record, &protoTxt);
ythrow TLoadActorException() << (TStringBuilder()
- << "TLoadActor::Handle(TEvDataShard::TEvTestLoadRequest): unexpected command case: "
+ << "TLoadActor::Handle(TEvDataShardLoad::TEvTestLoadRequest): unexpected command case: "
<< ui32(record.Command_case())
<< " protoTxt# " << protoTxt.Quote());
}
}
}
- void Handle(TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) {
const auto& msg = ev->Get();
auto it = LoadActors.find(msg->Tag);
Y_VERIFY(it != LoadActors.end());
@@ -225,7 +237,7 @@ public:
const auto& params = ev->Get()->Request.GetParams();
if (params.Has("protobuf")) {
- NKikimrTxDataShard::TEvTestLoadRequest record;
+ NKikimrDataShardLoad::TEvTestLoadRequest record;
bool status = google::protobuf::TextFormat::ParseFromString(params.Get("protobuf"), &record);
if (status) {
try {
@@ -283,7 +295,7 @@ public:
THttpInfoRequest& info = it->second;
#define PROFILE(NAME) \
- str << "<option value=\"" << ui32(NKikimrTxDataShard::TEvTestLoadRequest::NAME) << "\">" << #NAME << "</option>";
+ str << "<option value=\"" << ui32(NKikimrDataShardLoad::TEvTestLoadRequest::NAME) << "\">" << #NAME << "</option>";
#define PUT_HANDLE_CLASS(NAME) \
str << "<option value=\"" << ui32(NKikimrTxDataShard::NAME) << "\">" << #NAME << "</option>";
@@ -335,8 +347,8 @@ public:
}
STRICT_STFUNC(StateFunc,
- HFunc(TEvDataShard::TEvTestLoadRequest, Handle)
- HFunc(TEvTestLoadFinished, Handle)
+ HFunc(TEvDataShardLoad::TEvTestLoadRequest, Handle)
+ HFunc(TEvDataShardLoad::TEvTestLoadFinished, Handle)
HFunc(NMon::TEvHttpInfo, Handle)
HFunc(NMon::TEvHttpInfoRes, Handle)
)
@@ -346,4 +358,4 @@ NActors::IActor *CreateTestLoadActor(const TIntrusivePtr<::NMonitoring::TDynamic
return new TLoadActor(counters);
}
-} // NKikimr::NDataShard \ No newline at end of file
+} // NKikimr::NDataShardLoad \ No newline at end of file
diff --git a/ydb/core/tx/datashard/testload/test_load_actor.h b/ydb/core/tx/datashard/testload/test_load_actor.h
index fd82484c47e..acb6d7cdeac 100644
--- a/ydb/core/tx/datashard/testload/test_load_actor.h
+++ b/ydb/core/tx/datashard/testload/test_load_actor.h
@@ -2,8 +2,68 @@
#include "defs.h"
-namespace NKikimr::NDataShard {
+#include <ydb/core/base/events.h>
+#include <ydb/core/protos/datashard_load.pb.h>
+
+#include <library/cpp/actors/core/event_pb.h>
+
+namespace NKikimr {
+
+struct TEvDataShardLoad {
+ enum EEv {
+ EvTestLoadRequest = EventSpaceBegin(TKikimrEvents::ES_DATASHARD_LOAD),
+ EvTestLoadResponse,
+
+ EvTestLoadFinished,
+ };
+
+ struct TEvTestLoadRequest
+ : public TEventPB<TEvTestLoadRequest,
+ NKikimrDataShardLoad::TEvTestLoadRequest,
+ EvTestLoadRequest>
+ {
+ TEvTestLoadRequest() = default;
+ };
+
+ struct TEvTestLoadResponse
+ : public TEventPB<TEvTestLoadResponse,
+ NKikimrDataShardLoad::TEvTestLoadResponse,
+ EvTestLoadResponse>
+ {
+ TEvTestLoadResponse() = default;
+ };
+
+ struct TLoadReport {
+ TDuration Duration;
+ ui64 OperationsOK = 0;
+ ui64 OperationsError = 0;
+
+ TString ToString() const {
+ TStringStream ss;
+ ss << "Load duration: " << Duration << ", OK=" << OperationsOK << ", Error=" << OperationsError;
+ if (OperationsOK && Duration.Seconds()) {
+ ui64 throughput = OperationsOK / Duration.Seconds();
+ ss << ", throughput=" << throughput << " OK_ops/s";
+ }
+ return ss.Str();
+ }
+ };
+
+ struct TEvTestLoadFinished : public TEventLocal<TEvTestLoadFinished, EvTestLoadFinished> {
+ ui64 Tag;
+ std::optional<TLoadReport> Report;
+ TString ErrorReason;
+
+ TEvTestLoadFinished(ui64 tag, const TString& error = {})
+ : Tag(tag)
+ , ErrorReason(error)
+ {}
+ };
+};
+
+namespace NDataShardLoad {
NActors::IActor *CreateTestLoadActor(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters);
-} // NKikimr::NDataShard
+} // NDataShardLoad
+} // NKikimr
diff --git a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp
new file mode 100644
index 00000000000..8531888715d
--- /dev/null
+++ b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp
@@ -0,0 +1,90 @@
+#include "actors.h"
+
+#include <ydb/core/base/tablet.h>
+#include <ydb/core/base/tablet_pipe.h>
+
+#include <library/cpp/monlib/service/pages/templates.h>
+
+#include <util/datetime/cputimer.h>
+#include <util/random/random.h>
+
+#include <google/protobuf/text_format.h>
+
+// * Scheme is hardcoded and it is like default YCSB setup:
+// table name is "usertable", 1 utf8 "key" column, 10 utf8 "field0" - "field9" columns
+// * row is ~ 1 KB, keys are like user1000385178204227360
+
+namespace NKikimr::NDataShardLoad {
+
+namespace {
+
+class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadScenario> {
+ const NKikimrDataShardLoad::TEvTestLoadRequest::TReadStart Config;
+ const TActorId Parent;
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
+ const ui64 Tag;
+
+ TInstant StartTs;
+
+ TString ConfingString;
+
+public:
+ TReadIteratorLoadScenario(const NKikimrDataShardLoad::TEvTestLoadRequest::TReadStart& cmd, const TActorId& parent,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
+ : Config(cmd)
+ , Parent(parent)
+ , Counters(std::move(counters))
+ , Tag(tag)
+ {
+ google::protobuf::TextFormat::PrintToString(cmd, &ConfingString);
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
+ << " Bootstrap called: " << ConfingString);
+
+ Become(&TReadIteratorLoadScenario::StateFunc);
+ }
+
+private:
+ void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
+ TStringStream str;
+ HTML(str) {
+ str << "ReadIteratorLoadScenario# " << Tag << " started on " << StartTs;
+ }
+ ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId));
+ }
+
+ void HandlePoison(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
+ << " tablet recieved PoisonPill, going to die");
+ Stop(ctx);
+ }
+
+ void StopWithError(const TActorContext& ctx, const TString& reason) {
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag
+ << " stopped with error: " << reason);
+
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason));
+ Stop(ctx);
+ }
+
+ void Stop(const TActorContext& ctx) {
+ Die(ctx);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ CFunc(TEvents::TSystem::PoisonPill, HandlePoison)
+ HFunc(NMon::TEvHttpInfo, Handle)
+ )
+};
+
+} // anonymous
+
+NActors::IActor *CreateReadIteratorActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TReadStart& cmd,
+ const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
+{
+ return new TReadIteratorLoadScenario(cmd, parent, std::move(counters), tag);
+}
+
+} // NKikimr::NDataShardLoad
diff --git a/ydb/core/tx/datashard/testload/test_load_upsert.cpp b/ydb/core/tx/datashard/testload/test_load_upsert.cpp
index 6c244578dd8..e13a219c647 100644
--- a/ydb/core/tx/datashard/testload/test_load_upsert.cpp
+++ b/ydb/core/tx/datashard/testload/test_load_upsert.cpp
@@ -21,7 +21,7 @@
// table name is "usertable", 1 utf8 "key" column, 10 utf8 "field0" - "field9" columns
// * row is ~ 1 KB, keys are like user1000385178204227360
-namespace NKikimr::NDataShard {
+namespace NKikimr::NDataShardLoad {
using TUploadRowsRequestPtr = std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>;
@@ -31,10 +31,8 @@ using TRequestsVector = std::vector<TUploadRequest>;
namespace {
enum class ERequestType {
- BulkUpsert,
+ UpsertBulk,
UpsertLocalMkql,
- KqpUpsert,
- Upsert,
};
TString GetKey(size_t n) {
@@ -114,19 +112,12 @@ TUploadRequest GenerateMkqlRowRequest(ui64 /* tableId */, ui64 keyNum) {
(let row1_ '('('key (Utf8 '%s))))
)", key.data()) + programWithoutKey;
- auto request = std::unique_ptr<TEvTablet::TEvLocalMKQL>(new TEvTablet::TEvLocalMKQL);
+ auto request = std::make_unique<TEvTablet::TEvLocalMKQL>();
request->Record.MutableProgram()->MutableProgram()->SetText(programText);
return TUploadRequest(request.release());
}
-TUploadRequest GenerateRowRequest(ui64 tableId, ui64 keyNum) {
- Y_UNUSED(tableId);
- TString key = GetKey(keyNum);
-
- return nullptr;
-}
-
TRequestsVector GenerateRequests(ui64 tableId, ui64 n, ERequestType requestType) {
TRequestsVector requests;
requests.reserve(n);
@@ -134,15 +125,12 @@ TRequestsVector GenerateRequests(ui64 tableId, ui64 n, ERequestType requestType)
for (size_t i = 0; i < n; ++i) {
auto keyNum = RandomNumber(Max<ui64>());
switch (requestType) {
- case ERequestType::BulkUpsert:
+ case ERequestType::UpsertBulk:
requests.emplace_back(GenerateBulkRowRequest(tableId, keyNum));
break;
case ERequestType::UpsertLocalMkql:
requests.emplace_back(GenerateMkqlRowRequest(tableId, keyNum));
break;
- case ERequestType::Upsert:
- requests.emplace_back(GenerateRowRequest(tableId, keyNum));
- break;
default:
// should not happen, just for compiler
Y_FAIL("Unsupported request type");
@@ -209,7 +197,7 @@ TQueryInfo GenerateUpsert(size_t n) {
// it's a partial copy-paste from TUpsertActor: logic slightly differs, so that
// it seems better to have copy-paste rather if/else for different loads
class TKqpUpsertActor : public TActorBootstrapped<TKqpUpsertActor> {
- const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart Config;
+ const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart Config;
const TActorId Parent;
const ui64 Tag;
const TString Path;
@@ -227,7 +215,7 @@ class TKqpUpsertActor : public TActorBootstrapped<TKqpUpsertActor> {
size_t Errors = 0;
public:
- TKqpUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent,
+ TKqpUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag, TRequestsVector requests)
: Config(cmd)
, Parent(parent)
@@ -291,8 +279,8 @@ private:
} else if (Inflight == 0) {
EndTs = TInstant::Now();
auto delta = EndTs - StartTs;
- std::unique_ptr<TEvTestLoadFinished> response(new TEvTestLoadFinished(Tag));
- response->Report = TLoadReport();
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
+ response->Report = TEvDataShardLoad::TLoadReport();
response->Report->Duration = delta;
response->Report->OperationsOK = Requests.size() - Errors;
response->Report->OperationsError = Errors;
@@ -338,7 +326,7 @@ private:
void StopWithError(const TActorContext& ctx, const TString& reason) {
LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason);
- ctx.Send(Parent, new TEvTestLoadFinished(Tag, reason));
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason));
Die(ctx);
}
@@ -366,7 +354,7 @@ private:
// creates multiple TKqpUpsertActor for inflight > 1 and waits completion
class TKqpUpsertActorMultiSession : public TActorBootstrapped<TKqpUpsertActorMultiSession> {
- const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart Config;
+ const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart Config;
const TActorId Parent;
const ui64 Tag;
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
@@ -385,7 +373,7 @@ class TKqpUpsertActorMultiSession : public TActorBootstrapped<TKqpUpsertActorMul
size_t Errors = 0;
public:
- TKqpUpsertActorMultiSession(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent,
+ TKqpUpsertActorMultiSession(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
: Config(cmd)
, Parent(parent)
@@ -421,7 +409,7 @@ private:
for (size_t i = 0; i < requestsPerActor; ++i) {
auto queryInfo = GenerateUpsert(rowCount++);
- std::unique_ptr<NKqp::TEvKqp::TEvQueryRequest> request(new NKqp::TEvKqp::TEvQueryRequest());
+ auto request = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
request->Record.MutableRequest()->SetKeepSession(true);
request->Record.MutableRequest()->SetDatabase(Path);
@@ -466,7 +454,7 @@ private:
<< " started# " << actorsCount << " actors each with inflight# " << requestsPerActor);
}
- void Handle(const TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) {
+ void Handle(const TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) {
const auto* msg = ev->Get();
if (msg->ErrorReason || !msg->Report) {
TStringStream ss;
@@ -487,8 +475,8 @@ private:
if (Inflight == 0) {
EndTs = TInstant::Now();
auto delta = EndTs - StartTs;
- std::unique_ptr<TEvTestLoadFinished> response(new TEvTestLoadFinished(Tag));
- response->Report = TLoadReport();
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
+ response->Report = TEvDataShardLoad::TLoadReport();
response->Report->Duration = delta;
response->Report->OperationsOK = Oks;
response->Report->OperationsError = Errors;
@@ -519,7 +507,7 @@ private:
LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag
<< " stopped with error: " << reason);
- ctx.Send(Parent, new TEvTestLoadFinished(Tag, reason));
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason));
Stop(ctx);
}
@@ -534,14 +522,14 @@ private:
STRICT_STFUNC(StateFunc,
CFunc(TEvents::TSystem::PoisonPill, HandlePoison)
HFunc(NMon::TEvHttpInfo, Handle)
- HFunc(TEvTestLoadFinished, Handle);
+ HFunc(TEvDataShardLoad::TEvTestLoadFinished, Handle);
)
};
class TUpsertActor : public TActorBootstrapped<TUpsertActor> {
- const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart Config;
+ const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart Config;
const TActorId Parent;
const ui64 Tag;
const ERequestType RequestType;
@@ -563,7 +551,7 @@ public:
return NKikimrServices::TActivity::DS_LOAD_ACTOR;
}
- TUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent,
+ TUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag, ERequestType requestType)
: Config(cmd)
, Parent(parent)
@@ -632,8 +620,8 @@ private:
} else if (Inflight == 0) {
EndTs = TInstant::Now();
auto delta = EndTs - StartTs;
- std::unique_ptr<TEvTestLoadFinished> response(new TEvTestLoadFinished(Tag));
- response->Report = TLoadReport();
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
+ response->Report = TEvDataShardLoad::TLoadReport();
response->Report->Duration = delta;
response->Report->OperationsOK = Requests.size() - Errors;
response->Report->OperationsError = Errors;
@@ -701,7 +689,7 @@ private:
void StopWithError(const TActorContext& ctx, const TString& reason) {
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason);
- ctx.Send(Parent, new TEvTestLoadFinished(Tag, reason));
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason));
NTabletPipe::CloseClient(SelfId(), Pipe);
Die(ctx);
}
@@ -717,28 +705,32 @@ private:
)
};
-NActors::IActor *CreateBulkUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd,
+NActors::IActor *CreateUpsertBulkActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
{
- return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::BulkUpsert);
+ return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::UpsertBulk);
}
-NActors::IActor *CreateLocalMkqlUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd,
+NActors::IActor *CreateLocalMkqlUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
{
return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::UpsertLocalMkql);
}
-NActors::IActor *CreateKqpUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd,
+NActors::IActor *CreateKqpUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
{
return new TKqpUpsertActorMultiSession(cmd, parent, std::move(counters), tag);
}
-NActors::IActor *CreateUpsertActor(const NKikimrTxDataShard::TEvTestLoadRequest::TUpdateStart& cmd,
+NActors::IActor *CreateProposeUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
{
- return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::Upsert);
+ Y_UNUSED(cmd);
+ Y_UNUSED(parent);
+ Y_UNUSED(counters);
+ Y_UNUSED(tag);
+ return nullptr; // not yet implemented
}
-} // NKikimr::NDataShard
+} // NKikimr::NDataShardLoad