aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-09-23 13:07:55 +0300
committereivanov89 <eivanov89@ydb.tech>2022-09-23 13:07:55 +0300
commitc1cf55f7913d9d5674d70112bd83329cf37d06ed (patch)
tree988783f20dab2276759385d5e4a64016bbab31bc
parentec4303976cef8f0b57309321bbedf913498e496c (diff)
downloadydb-c1cf55f7913d9d5674d70112bd83329cf37d06ed.tar.gz
move kqp upsert to separate file
-rw-r--r--ydb/core/tx/datashard/testload/CMakeLists.txt3
-rw-r--r--ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp317
-rw-r--r--ydb/core/tx/datashard/testload/defs.h12
-rw-r--r--ydb/core/tx/datashard/testload/kqp_upsert.cpp (renamed from ydb/core/tx/datashard/testload/test_load_upsert.cpp)312
4 files changed, 339 insertions, 305 deletions
diff --git a/ydb/core/tx/datashard/testload/CMakeLists.txt b/ydb/core/tx/datashard/testload/CMakeLists.txt
index 762516e8b4..0c2437aa1a 100644
--- a/ydb/core/tx/datashard/testload/CMakeLists.txt
+++ b/ydb/core/tx/datashard/testload/CMakeLists.txt
@@ -22,7 +22,8 @@ target_link_libraries(tx-datashard-testload PUBLIC
cpp-client-ydb_proto
)
target_sources(tx-datashard-testload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/testload/kqp_upsert.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/testload/test_load_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/testload/test_load_upsert.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp
)
diff --git a/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp b/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp
new file mode 100644
index 0000000000..031957c284
--- /dev/null
+++ b/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp
@@ -0,0 +1,317 @@
+#include "actors.h"
+
+#include <ydb/core/base/tablet.h>
+#include <ydb/core/base/tablet_pipe.h>
+
+#include <library/cpp/monlib/service/pages/templates.h>
+
+#include <util/datetime/cputimer.h>
+#include <util/random/random.h>
+
+#include <google/protobuf/text_format.h>
+
+// * Scheme is hardcoded and it is like default YCSB setup:
+// table name is "usertable", 1 utf8 "key" column, 10 utf8 "field0" - "field9" columns
+// * row is ~ 1 KB, keys are like user1000385178204227360
+
+namespace NKikimr::NDataShardLoad {
+
+TString GetKey(size_t n) {
+ // user1000385178204227360
+ return Sprintf("user%.19lu", n);
+}
+
+using TUploadRowsRequestPtr = std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>;
+
+namespace {
+
+enum class ERequestType {
+ UpsertBulk,
+ UpsertLocalMkql,
+};
+
+TUploadRequest GenerateBulkRowRequest(ui64 tableId, ui64 keyNum) {
+ TUploadRowsRequestPtr request(new TEvDataShard::TEvUploadRowsRequest());
+ auto& record = request->Record;
+ record.SetTableId(tableId);
+
+ auto& rowScheme = *record.MutableRowScheme();
+ for (size_t i = 2; i <= 11; ++i) {
+ rowScheme.AddValueColumnIds(i);
+ }
+ rowScheme.AddKeyColumnIds(1);
+
+ TVector<TCell> keys;
+ keys.reserve(1);
+ TString key = GetKey(keyNum);
+ keys.emplace_back(key.data(), key.size());
+
+ TVector<TCell> values;
+ values.reserve(10);
+ for (size_t i = 2; i <= 11; ++i) {
+ values.emplace_back(Value.data(), Value.size());
+ }
+
+ auto& row = *record.AddRows();
+ row.SetKeyColumns(TSerializedCellVec::Serialize(keys));
+ row.SetValueColumns(TSerializedCellVec::Serialize(values));
+
+ return TUploadRequest(request.release());
+}
+
+TUploadRequest GenerateMkqlRowRequest(ui64 /* tableId */, ui64 keyNum) {
+ static TString programWithoutKey;
+
+ if (!programWithoutKey) {
+ TString fields;
+ for (size_t i = 0; i < 10; ++i) {
+ fields += Sprintf("'('field%lu (Utf8 '%s))", i, Value.data());
+ }
+ TString rowUpd = "(let upd_ '(" + fields + "))";
+
+ programWithoutKey = rowUpd;
+
+ programWithoutKey += R"(
+ (let ret_ (AsList
+ (UpdateRow '__user__usertable row1_ upd_
+ )))
+ (return ret_)
+ ))";
+ }
+
+ TString key = GetKey(keyNum);
+
+ auto programText = Sprintf(R"((
+ (let row1_ '('('key (Utf8 '%s))))
+ )", key.data()) + programWithoutKey;
+
+ auto request = std::make_unique<TEvTablet::TEvLocalMKQL>();
+ request->Record.MutableProgram()->MutableProgram()->SetText(programText);
+
+ return TUploadRequest(request.release());
+}
+
+TRequestsVector GenerateRequests(ui64 tableId, ui64 n, ERequestType requestType) {
+ TRequestsVector requests;
+ requests.reserve(n);
+
+ for (size_t i = 0; i < n; ++i) {
+ auto keyNum = RandomNumber(Max<ui64>());
+ switch (requestType) {
+ case ERequestType::UpsertBulk:
+ requests.emplace_back(GenerateBulkRowRequest(tableId, keyNum));
+ break;
+ case ERequestType::UpsertLocalMkql:
+ requests.emplace_back(GenerateMkqlRowRequest(tableId, keyNum));
+ break;
+ default:
+ // should not happen, just for compiler
+ Y_FAIL("Unsupported request type");
+ }
+ }
+
+ return requests;
+}
+
+class TUpsertActor : public TActorBootstrapped<TUpsertActor> {
+ const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart Config;
+ const TActorId Parent;
+ const ui64 Tag;
+ const ERequestType RequestType;
+ TString ConfingString;
+
+ TActorId Pipe;
+
+ TRequestsVector Requests;
+ size_t CurrentRequest = 0;
+ size_t Inflight = 0;
+
+ TInstant StartTs;
+ TInstant EndTs;
+
+ size_t Errors = 0;
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::DS_LOAD_ACTOR;
+ }
+
+ TUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag, ERequestType requestType)
+ : Config(cmd)
+ , Parent(parent)
+ , Tag(tag)
+ , RequestType(requestType)
+ {
+ Y_UNUSED(counters);
+ google::protobuf::TextFormat::PrintToString(cmd, &ConfingString);
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ << " TUpsertActor Bootstrap called: " << ConfingString);
+
+ // note that we generate all requests at once to send at max speed, i.e.
+ // do not mess with protobufs, strings, etc when send data
+ Requests = GenerateRequests(Config.GetTableId(), Config.GetRowCount(), RequestType);
+
+ Become(&TUpsertActor::StateFunc);
+ Connect(ctx);
+ }
+
+private:
+ void Connect(const TActorContext &ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag << " TUpsertActor Connect called");
+ Pipe = Register(NTabletPipe::CreateClient(SelfId(), Config.GetTabletId()));
+ }
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) {
+ TEvTabletPipe::TEvClientConnected *msg = ev->Get();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ << " TUpsertActor Handle TEvClientConnected called, Status# " << msg->Status);
+
+ if (msg->Status != NKikimrProto::OK) {
+ TStringStream ss;
+ ss << "Failed to connect to " << Config.GetTabletId() << ", status: " << msg->Status;
+ StopWithError(ctx, ss.Str());
+ return;
+ }
+
+ StartTs = TInstant::Now();
+ SendRows(ctx);
+ }
+
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ << " TUpsertActor Handle TEvClientDestroyed called");
+ StopWithError(ctx, "broken pipe");
+ }
+
+ void SendRows(const TActorContext &ctx) {
+ while (Inflight < Config.GetInflight() && CurrentRequest < Requests.size()) {
+ const auto* request = Requests[CurrentRequest].get();
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ << "TUpsertActor# " << Tag << " send request# " << CurrentRequest << ": " << request->ToString());
+ NTabletPipe::SendData(ctx, Pipe, Requests[CurrentRequest].release());
+ ++CurrentRequest;
+ ++Inflight;
+ }
+ }
+
+ void OnRequestDone(const TActorContext& ctx) {
+ if (CurrentRequest < Requests.size()) {
+ SendRows(ctx);
+ } else if (Inflight == 0) {
+ EndTs = TInstant::Now();
+ auto delta = EndTs - StartTs;
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
+ response->Report = TEvDataShardLoad::TLoadReport();
+ response->Report->Duration = delta;
+ response->Report->OperationsOK = Requests.size() - Errors;
+ response->Report->OperationsError = Errors;
+ ctx.Send(Parent, response.release());
+
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ << " TUpsertActor finished in " << delta << ", errors=" << Errors);
+ Die(ctx);
+ }
+ }
+
+ void Handle(TEvDataShard::TEvUploadRowsResponse::TPtr ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ << " TUpsertActor received from " << ev->Sender << ": " << ev->Get()->Record);
+ --Inflight;
+
+ TEvDataShard::TEvUploadRowsResponse *msg = ev->Get();
+ if (msg->Record.GetStatus() != 0) {
+ ++Errors;
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ << " TUpsertActor TEvUploadRowsResponse: " << msg->ToString());
+ }
+
+ OnRequestDone(ctx);
+ }
+
+ void Handle(TEvTablet::TEvLocalMKQLResponse::TPtr ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ << " TUpsertActor received from " << ev->Sender << ": " << ev->Get()->Record);
+ --Inflight;
+
+ TEvTablet::TEvLocalMKQLResponse *msg = ev->Get();
+ if (msg->Record.GetStatus() != 0) {
+ ++Errors;
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ << " TUpsertActor TEvLocalMKQLResponse: " << msg->ToString());
+ }
+
+ OnRequestDone(ctx);
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr, const TActorContext& ctx) {
+ StopWithError(ctx, "delivery failed");
+ }
+
+ void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
+ TStringStream str;
+ HTML(str) {
+ str << "DS bulk upsert load actor# " << Tag << " started on " << StartTs
+ << " sent " << CurrentRequest << " out of " << Requests.size();
+ TInstant ts = EndTs ? EndTs : TInstant::Now();
+ auto delta = ts - StartTs;
+ auto throughput = Requests.size() / delta.Seconds();
+ str << " in " << delta << " (" << throughput << " op/s)"
+ << " errors=" << Errors;
+ }
+ ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId));
+ }
+
+ void HandlePoison(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet recieved PoisonPill, going to die");
+ NTabletPipe::CloseClient(SelfId(), Pipe);
+ Die(ctx);
+ }
+
+ void StopWithError(const TActorContext& ctx, const TString& reason) {
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason);
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason));
+ NTabletPipe::CloseClient(SelfId(), Pipe);
+ Die(ctx);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ CFunc(TEvents::TSystem::PoisonPill, HandlePoison);
+ HFunc(NMon::TEvHttpInfo, Handle)
+ HFunc(TEvents::TEvUndelivered, Handle);
+ HFunc(TEvDataShard::TEvUploadRowsResponse, Handle);
+ HFunc(TEvTablet::TEvLocalMKQLResponse, Handle);
+ HFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ )
+};
+
+} // anonymous
+
+NActors::IActor *CreateUpsertBulkActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
+ const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
+{
+ return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::UpsertBulk);
+}
+
+NActors::IActor *CreateLocalMkqlUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
+ const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
+{
+ return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::UpsertLocalMkql);
+}
+
+NActors::IActor *CreateProposeUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
+ const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
+{
+ Y_UNUSED(cmd);
+ Y_UNUSED(parent);
+ Y_UNUSED(counters);
+ Y_UNUSED(tag);
+ return nullptr; // not yet implemented
+}
+
+} // NKikimr::NDataShardLoad
diff --git a/ydb/core/tx/datashard/testload/defs.h b/ydb/core/tx/datashard/testload/defs.h
index 515e099bd8..56b6952963 100644
--- a/ydb/core/tx/datashard/testload/defs.h
+++ b/ydb/core/tx/datashard/testload/defs.h
@@ -1,6 +1,18 @@
#pragma once
+
#include <ydb/core/base/defs.h>
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/log.h>
#include <ydb/core/protos/services.pb.h>
+
+namespace NKikimr::NDataShardLoad {
+
+using TUploadRequest = std::unique_ptr<IEventBase>;
+using TRequestsVector = std::vector<TUploadRequest>;
+
+TString GetKey(size_t n);
+
+static const TString Value = TString(100, 'x');
+
+} // NKikimr::NDataShardLoad
diff --git a/ydb/core/tx/datashard/testload/test_load_upsert.cpp b/ydb/core/tx/datashard/testload/kqp_upsert.cpp
index a64b1e7592..0f24cf4a26 100644
--- a/ydb/core/tx/datashard/testload/test_load_upsert.cpp
+++ b/ydb/core/tx/datashard/testload/kqp_upsert.cpp
@@ -12,9 +12,6 @@
#include <library/cpp/monlib/service/pages/templates.h>
-#include <util/datetime/cputimer.h>
-#include <util/random/random.h>
-
#include <google/protobuf/text_format.h>
// * Scheme is hardcoded and it is like default YCSB setup:
@@ -23,25 +20,8 @@
namespace NKikimr::NDataShardLoad {
-using TUploadRowsRequestPtr = std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>;
-
-using TUploadRequest = std::unique_ptr<IEventBase>;
-using TRequestsVector = std::vector<TUploadRequest>;
-
namespace {
-enum class ERequestType {
- UpsertBulk,
- UpsertLocalMkql,
-};
-
-TString GetKey(size_t n) {
- // user1000385178204227360
- return Sprintf("user%.19lu", n);
-}
-
-const TString Value = TString(100, 'x');
-
void ConvertYdbParamsToMiniKQLParams(const NYdb::TParams& input, NKikimrMiniKQL::TParams& output) {
output.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
auto type = output.MutableType()->MutableStruct();
@@ -50,92 +30,16 @@ void ConvertYdbParamsToMiniKQLParams(const NYdb::TParams& input, NKikimrMiniKQL:
auto typeMember = type->AddMember();
auto valueItem = value->AddStruct();
typeMember->SetName(p.first);
- ConvertYdbTypeToMiniKQLType(NYdb::TProtoAccessor::GetProto(p.second.GetType()), *typeMember->MutableType());
- ConvertYdbValueToMiniKQLValue(NYdb::TProtoAccessor::GetProto(p.second.GetType()), NYdb::TProtoAccessor::GetProto(p.second), *valueItem);
- }
-}
-TUploadRequest GenerateBulkRowRequest(ui64 tableId, ui64 keyNum) {
- TUploadRowsRequestPtr request(new TEvDataShard::TEvUploadRowsRequest());
- auto& record = request->Record;
- record.SetTableId(tableId);
+ ConvertYdbTypeToMiniKQLType(
+ NYdb::TProtoAccessor::GetProto(p.second.GetType()),
+ *typeMember->MutableType());
- auto& rowScheme = *record.MutableRowScheme();
- for (size_t i = 2; i <= 11; ++i) {
- rowScheme.AddValueColumnIds(i);
+ ConvertYdbValueToMiniKQLValue(
+ NYdb::TProtoAccessor::GetProto(p.second.GetType()),
+ NYdb::TProtoAccessor::GetProto(p.second),
+ *valueItem);
}
- rowScheme.AddKeyColumnIds(1);
-
- TVector<TCell> keys;
- keys.reserve(1);
- TString key = GetKey(keyNum);
- keys.emplace_back(key.data(), key.size());
-
- TVector<TCell> values;
- values.reserve(10);
- for (size_t i = 2; i <= 11; ++i) {
- values.emplace_back(Value.data(), Value.size());
- }
-
- auto& row = *record.AddRows();
- row.SetKeyColumns(TSerializedCellVec::Serialize(keys));
- row.SetValueColumns(TSerializedCellVec::Serialize(values));
-
- return TUploadRequest(request.release());
-}
-
-TUploadRequest GenerateMkqlRowRequest(ui64 /* tableId */, ui64 keyNum) {
- static TString programWithoutKey;
-
- if (!programWithoutKey) {
- TString fields;
- for (size_t i = 0; i < 10; ++i) {
- fields += Sprintf("'('field%lu (Utf8 '%s))", i, Value.data());
- }
- TString rowUpd = "(let upd_ '(" + fields + "))";
-
- programWithoutKey = rowUpd;
-
- programWithoutKey += R"(
- (let ret_ (AsList
- (UpdateRow '__user__usertable row1_ upd_
- )))
- (return ret_)
- ))";
- }
-
- TString key = GetKey(keyNum);
-
- auto programText = Sprintf(R"((
- (let row1_ '('('key (Utf8 '%s))))
- )", key.data()) + programWithoutKey;
-
- auto request = std::make_unique<TEvTablet::TEvLocalMKQL>();
- request->Record.MutableProgram()->MutableProgram()->SetText(programText);
-
- return TUploadRequest(request.release());
-}
-
-TRequestsVector GenerateRequests(ui64 tableId, ui64 n, ERequestType requestType) {
- TRequestsVector requests;
- requests.reserve(n);
-
- for (size_t i = 0; i < n; ++i) {
- auto keyNum = RandomNumber(Max<ui64>());
- switch (requestType) {
- case ERequestType::UpsertBulk:
- requests.emplace_back(GenerateBulkRowRequest(tableId, keyNum));
- break;
- case ERequestType::UpsertLocalMkql:
- requests.emplace_back(GenerateMkqlRowRequest(tableId, keyNum));
- break;
- default:
- // should not happen, just for compiler
- Y_FAIL("Unsupported request type");
- }
- }
-
- return requests;
}
struct TQueryInfo {
@@ -190,7 +94,6 @@ TQueryInfo GenerateUpsert(size_t n) {
return TQueryInfo(str.Str(), std::move(params));
}
-} // anonymous
// it's a partial copy-paste from TUpsertActor: logic slightly differs, so that
// it seems better to have copy-paste rather if/else for different loads
@@ -523,196 +426,7 @@ private:
)
};
-
-
-class TUpsertActor : public TActorBootstrapped<TUpsertActor> {
- const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart Config;
- const TActorId Parent;
- const ui64 Tag;
- const ERequestType RequestType;
- TString ConfingString;
-
- TActorId Pipe;
-
- TRequestsVector Requests;
- size_t CurrentRequest = 0;
- size_t Inflight = 0;
-
- TInstant StartTs;
- TInstant EndTs;
-
- size_t Errors = 0;
-
-public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::DS_LOAD_ACTOR;
- }
-
- TUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd, const TActorId& parent,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag, ERequestType requestType)
- : Config(cmd)
- , Parent(parent)
- , Tag(tag)
- , RequestType(requestType)
- {
- Y_UNUSED(counters);
- google::protobuf::TextFormat::PrintToString(cmd, &ConfingString);
- }
-
- void Bootstrap(const TActorContext& ctx) {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
- << " TUpsertActor Bootstrap called: " << ConfingString);
-
- // note that we generate all requests at once to send at max speed, i.e.
- // do not mess with protobufs, strings, etc when send data
- Requests = GenerateRequests(Config.GetTableId(), Config.GetRowCount(), RequestType);
-
- Become(&TUpsertActor::StateFunc);
- Connect(ctx);
- }
-
-private:
- void Connect(const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag << " TUpsertActor Connect called");
- Pipe = Register(NTabletPipe::CreateClient(SelfId(), Config.GetTabletId()));
- }
-
- void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) {
- TEvTabletPipe::TEvClientConnected *msg = ev->Get();
-
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
- << " TUpsertActor Handle TEvClientConnected called, Status# " << msg->Status);
-
- if (msg->Status != NKikimrProto::OK) {
- TStringStream ss;
- ss << "Failed to connect to " << Config.GetTabletId() << ", status: " << msg->Status;
- StopWithError(ctx, ss.Str());
- return;
- }
-
- StartTs = TInstant::Now();
- SendRows(ctx);
- }
-
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
- << " TUpsertActor Handle TEvClientDestroyed called");
- StopWithError(ctx, "broken pipe");
- }
-
- void SendRows(const TActorContext &ctx) {
- while (Inflight < Config.GetInflight() && CurrentRequest < Requests.size()) {
- const auto* request = Requests[CurrentRequest].get();
- LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
- << "TUpsertActor# " << Tag << " send request# " << CurrentRequest << ": " << request->ToString());
- NTabletPipe::SendData(ctx, Pipe, Requests[CurrentRequest].release());
- ++CurrentRequest;
- ++Inflight;
- }
- }
-
- void OnRequestDone(const TActorContext& ctx) {
- if (CurrentRequest < Requests.size()) {
- SendRows(ctx);
- } else if (Inflight == 0) {
- EndTs = TInstant::Now();
- auto delta = EndTs - StartTs;
- auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
- response->Report = TEvDataShardLoad::TLoadReport();
- response->Report->Duration = delta;
- response->Report->OperationsOK = Requests.size() - Errors;
- response->Report->OperationsError = Errors;
- ctx.Send(Parent, response.release());
-
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
- << " TUpsertActor finished in " << delta << ", errors=" << Errors);
- Die(ctx);
- }
- }
-
- void Handle(TEvDataShard::TEvUploadRowsResponse::TPtr ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
- << " TUpsertActor received from " << ev->Sender << ": " << ev->Get()->Record);
- --Inflight;
-
- TEvDataShard::TEvUploadRowsResponse *msg = ev->Get();
- if (msg->Record.GetStatus() != 0) {
- ++Errors;
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
- << " TUpsertActor TEvUploadRowsResponse: " << msg->ToString());
- }
-
- OnRequestDone(ctx);
- }
-
- void Handle(TEvTablet::TEvLocalMKQLResponse::TPtr ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
- << " TUpsertActor received from " << ev->Sender << ": " << ev->Get()->Record);
- --Inflight;
-
- TEvTablet::TEvLocalMKQLResponse *msg = ev->Get();
- if (msg->Record.GetStatus() != 0) {
- ++Errors;
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
- << " TUpsertActor TEvLocalMKQLResponse: " << msg->ToString());
- }
-
- OnRequestDone(ctx);
- }
-
- void Handle(TEvents::TEvUndelivered::TPtr, const TActorContext& ctx) {
- StopWithError(ctx, "delivery failed");
- }
-
- void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
- TStringStream str;
- HTML(str) {
- str << "DS bulk upsert load actor# " << Tag << " started on " << StartTs
- << " sent " << CurrentRequest << " out of " << Requests.size();
- TInstant ts = EndTs ? EndTs : TInstant::Now();
- auto delta = ts - StartTs;
- auto throughput = Requests.size() / delta.Seconds();
- str << " in " << delta << " (" << throughput << " op/s)"
- << " errors=" << Errors;
- }
- ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId));
- }
-
- void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet recieved PoisonPill, going to die");
- NTabletPipe::CloseClient(SelfId(), Pipe);
- Die(ctx);
- }
-
- void StopWithError(const TActorContext& ctx, const TString& reason) {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason);
- ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason));
- NTabletPipe::CloseClient(SelfId(), Pipe);
- Die(ctx);
- }
-
- STRICT_STFUNC(StateFunc,
- CFunc(TEvents::TSystem::PoisonPill, HandlePoison);
- HFunc(NMon::TEvHttpInfo, Handle)
- HFunc(TEvents::TEvUndelivered, Handle);
- HFunc(TEvDataShard::TEvUploadRowsResponse, Handle);
- HFunc(TEvTablet::TEvLocalMKQLResponse, Handle);
- HFunc(TEvTabletPipe::TEvClientConnected, Handle);
- HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
- )
-};
-
-NActors::IActor *CreateUpsertBulkActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
- const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
-{
- return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::UpsertBulk);
-}
-
-NActors::IActor *CreateLocalMkqlUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
- const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
-{
- return new TUpsertActor(cmd, parent, std::move(counters), tag, ERequestType::UpsertLocalMkql);
-}
+} // anonymous
NActors::IActor *CreateKqpUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
@@ -720,14 +434,4 @@ NActors::IActor *CreateKqpUpsertActor(const NKikimrDataShardLoad::TEvTestLoadReq
return new TKqpUpsertActorMultiSession(cmd, parent, std::move(counters), tag);
}
-NActors::IActor *CreateProposeUpsertActor(const NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart& cmd,
- const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
-{
- Y_UNUSED(cmd);
- Y_UNUSED(parent);
- Y_UNUSED(counters);
- Y_UNUSED(tag);
- return nullptr; // not yet implemented
-}
-
} // NKikimr::NDataShardLoad