diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-09-16 13:58:57 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-09-16 13:58:57 +0300 |
commit | 5fbc1694e011960d1cf103b82034a0ff61f0d6ce (patch) | |
tree | 1545a7f42a6a88901befeca959c48a2b44a7ae78 | |
parent | 353bf4878abf72bf834becde2eca008a84ea1855 (diff) | |
download | ydb-5fbc1694e011960d1cf103b82034a0ff61f0d6ce.tar.gz |
notify load starter, when load finished. Tags cleanup
-rw-r--r-- | ydb/core/protos/datashard_load.proto | 34 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_testload.cpp | 71 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/actors.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_actor.cpp | 248 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_actor.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_upsert.cpp | 1 |
6 files changed, 196 insertions, 162 deletions
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto index 6ed5f767525..5700539266b 100644 --- a/ydb/core/protos/datashard_load.proto +++ b/ydb/core/protos/datashard_load.proto @@ -12,35 +12,36 @@ message TEvTestLoadRequest { } message TUpdateStart { - optional uint64 Tag = 1; - optional uint64 RowCount = 2; - optional uint64 TabletId = 3; - optional uint64 TableId = 4; - optional uint32 Inflight = 5; + optional uint64 RowCount = 1; + optional uint64 TabletId = 2; + optional uint64 TableId = 3; + optional uint32 Inflight = 4; // in some cases we need full DB path with table optional string Path = 6; } message TReadStart { - optional uint64 Tag = 1; - optional uint64 TabletId = 2; - optional uint64 TableId = 3; + optional uint64 TabletId = 1; + optional uint64 TableId = 2; // Specifies the format for result data in TEvReadResult - optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 4; + optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 3; } optional uint64 Cookie = 1; + // normally should be used by ut only + optional bool NotifyWhenFinished = 2; + oneof Command { - TLoadStop LoadStop = 2; - TUpdateStart UpsertBulkStart = 3; - TUpdateStart UpsertLocalMkqlStart = 4; - TUpdateStart UpsertKqpStart = 5; - TUpdateStart UpsertProposeStart = 6; + TLoadStop LoadStop = 20; + TUpdateStart UpsertBulkStart = 21; + TUpdateStart UpsertLocalMkqlStart = 22; + TUpdateStart UpsertKqpStart = 23; + TUpdateStart UpsertProposeStart = 24; - TReadStart ReadIteratorStart = 7; + TReadStart ReadIteratorStart = 25; } } @@ -48,4 +49,7 @@ message TEvTestLoadResponse { optional uint32 Status = 1; // EResponseStatus from ydb/core/client/base/msgbus.h optional string ErrorReason = 2; optional uint64 Cookie = 3; + + // if load acter has been started, then it is the assigned tag + optional uint64 Tag = 4; } diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp index 671e1df1474..a13da3e9595 100644 --- a/ydb/core/tx/datashard/datashard_ut_testload.cpp +++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp @@ -159,42 +159,54 @@ struct TTestHelper { return WaitReadResult(); } - void TestLoad(std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request, size_t expectedRowCount) { + void RunTestLoad(std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request) { + request->Record.SetNotifyWhenFinished(true); auto &runtime = *Server->GetRuntime(); TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(new ::NMonitoring::TDynamicCounters()); auto testLoadActor = runtime.Register(CreateTestLoadActor(counters)); runtime.Send(new IEventHandle(testLoadActor, Sender, request.release()), 0, true); - TAutoPtr<IEventHandle> handle; - runtime.GrabEdgeEventRethrow<TEvDataShardLoad::TEvTestLoadResponse>(handle); - UNIT_ASSERT(handle); - auto response = handle->Release<TEvDataShardLoad::TEvTestLoadResponse>(); - auto& responseRecord = response->Record; - UNIT_ASSERT_VALUES_EQUAL(responseRecord.GetStatus(), NMsgBusProxy::MSTATUS_OK); + { + // check load started + TAutoPtr<IEventHandle> handle; + runtime.GrabEdgeEventRethrow<TEvDataShardLoad::TEvTestLoadResponse>(handle); + UNIT_ASSERT(handle); + auto response = handle->Release<TEvDataShardLoad::TEvTestLoadResponse>(); + auto& responseRecord = response->Record; + UNIT_ASSERT_VALUES_EQUAL(responseRecord.GetStatus(), NMsgBusProxy::MSTATUS_OK); + } + + { + // wait until load finished + TAutoPtr<IEventHandle> handle; + runtime.GrabEdgeEventRethrow<TEvDataShardLoad::TEvTestLoadFinished>(handle); + UNIT_ASSERT(handle); + auto response = handle->Release<TEvDataShardLoad::TEvTestLoadFinished>(); + UNIT_ASSERT(response->Report); + UNIT_ASSERT(!response->ErrorReason); + } + } + + void RunUpsertTestLoad(std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> loadRequest, size_t expectedRowCount) { + RunTestLoad(std::move(loadRequest)); // holds memory for TCell TVector<TString> from = {TString("user")}; TVector<TString> to = {TString("zzz")}; - // busywait - while (1) { - auto request = GetBaseReadRequest(); - AddRangeQuery( - *request, - from, - true, - to, - true - ); - - auto readResult = SendRead(request.release()); - UNIT_ASSERT(readResult); - if (readResult->GetRowsCount() == expectedRowCount) - break; - - SimulateSleep(Server, TDuration::Seconds(1)); - } + auto request = GetBaseReadRequest(); + AddRangeQuery( + *request, + from, + true, + to, + true + ); + + auto readResult = SendRead(request.release()); + UNIT_ASSERT(readResult); + UNIT_ASSERT_VALUES_EQUAL(readResult->GetRowsCount(), expectedRowCount); } public: @@ -217,13 +229,12 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { auto& record = request->Record; auto& command = *record.MutableUpsertBulkStart(); - command.SetTag(1); command.SetRowCount(expectedRowCount); command.SetTabletId(helper.Table.TabletId); command.SetTableId(helper.Table.UserTable.GetPathId()); command.SetInflight(3); - helper.TestLoad(std::move(request), expectedRowCount); + helper.RunUpsertTestLoad(std::move(request), expectedRowCount); } Y_UNIT_TEST(ShouldWriteDataBulkUpsertLocalMkql) { @@ -235,13 +246,12 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { auto& record = request->Record; auto& command = *record.MutableUpsertLocalMkqlStart(); - command.SetTag(1); command.SetRowCount(expectedRowCount); command.SetTabletId(helper.Table.TabletId); command.SetTableId(helper.Table.UserTable.GetPathId()); command.SetInflight(3); - helper.TestLoad(std::move(request), expectedRowCount); + helper.RunUpsertTestLoad(std::move(request), expectedRowCount); } Y_UNIT_TEST(ShouldWriteKqpUpsert) { @@ -253,14 +263,13 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { auto& record = request->Record; auto& command = *record.MutableUpsertKqpStart(); - command.SetTag(1); command.SetRowCount(expectedRowCount); command.SetTabletId(helper.Table.TabletId); command.SetTableId(helper.Table.UserTable.GetPathId()); command.SetInflight(5); command.SetPath("/Root"); - helper.TestLoad(std::move(request), expectedRowCount); + helper.RunUpsertTestLoad(std::move(request), expectedRowCount); } } // Y_UNIT_TEST_SUITE(UpsertLoad) diff --git a/ydb/core/tx/datashard/testload/actors.h b/ydb/core/tx/datashard/testload/actors.h index 13dff18d8b5..6fe5d648914 100644 --- a/ydb/core/tx/datashard/testload/actors.h +++ b/ydb/core/tx/datashard/testload/actors.h @@ -32,6 +32,4 @@ class TLoadActorException : public yexception { } \ } while (false) -#define VERIFY_PARAM(NAME) VERIFY_PARAM2(cmd, NAME) - } // NKikimr::NDataShardLoad diff --git a/ydb/core/tx/datashard/testload/test_load_actor.cpp b/ydb/core/tx/datashard/testload/test_load_actor.cpp index 07194b91aa8..cec2b3674c9 100644 --- a/ydb/core/tx/datashard/testload/test_load_actor.cpp +++ b/ydb/core/tx/datashard/testload/test_load_actor.cpp @@ -18,6 +18,17 @@ class TLoadActor : public TActorBootstrapped<TLoadActor> { TString Data; // HTML response }; + struct TRunningActorInfo { + TActorId ActorId; + TActorId Parent; // if set we notify parent when actor finishes + + explicit TRunningActorInfo(const TActorId& actorId, const TActorId& parent = {}) + : ActorId(actorId) + , Parent(parent) + { + } + }; + // per-request info struct THttpInfoRequest { TActorId Origin; // who asked for status @@ -38,7 +49,8 @@ class TLoadActor : public TActorBootstrapped<TLoadActor> { TVector<TFinishedTestInfo> FinishedTests; // currently running load actors - TMap<ui64, TActorId> LoadActors; + TMap<ui64, TRunningActorInfo> LoadActors; + ui64 LastTag = 0; // tags start from 1 // next HTTP request identifier ui32 NextRequestId; @@ -63,11 +75,12 @@ public: } void Handle(TEvDataShardLoad::TEvTestLoadRequest::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; ui32 status = NMsgBusProxy::MSTATUS_OK; TString error; - const auto& record = ev->Get()->Record; + ui64 tag = 0; try { - ProcessCmd(record, ctx); + tag = ProcessCmd(ev, ctx); } catch (const TLoadActorException& ex) { LOG_ERROR_S(ctx, NKikimrServices::DS_LOAD_TEST, "Exception while creating load actor, what# " << ex.what()); @@ -82,116 +95,134 @@ public: if (record.HasCookie()) { response->Record.SetCookie(record.GetCookie()); } + if (tag) { + response->Record.SetTag(tag); + } ctx.Send(ev->Sender, response.release()); } - template<typename T> - ui64 GetOrGenerateTag(const T& cmd) { - if (cmd.HasTag()) { - return cmd.GetTag(); - } else { - if (LoadActors.empty()) { - return 1; - } else { - return LoadActors.rbegin()->first + 1; - } + ui64 GetTag() { + auto tag = ++LastTag; + + // just sanity check + if (LoadActors.contains(tag)) { + ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); } + + return tag; } - void ProcessCmd(const NKikimrDataShardLoad::TEvTestLoadRequest& record, const TActorContext& ctx) { + ui64 ProcessCmd(TEvDataShardLoad::TEvTestLoadRequest::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; switch (record.Command_case()) { - case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertBulkStart: { - const auto& cmd = record.GetUpsertBulkStart(); - const ui64 tag = GetOrGenerateTag(cmd); - if (LoadActors.count(tag) != 0) { - ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); - } - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new bulk upsert load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateUpsertBulkActor(cmd, ctx.SelfID, - GetServiceCounters(Counters, "load_actor"), tag))); - break; + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertBulkStart: { + const auto& cmd = record.GetUpsertBulkStart(); + const ui64 tag = GetTag(); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new bulk upsert load actor with tag# " << tag); + + auto* actor = CreateUpsertBulkActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag); + TRunningActorInfo actorInfo(ctx.Register(actor)); + if (record.GetNotifyWhenFinished()) { + actorInfo.Parent = ev->Sender; } + LoadActors.emplace(tag, std::move(actorInfo)); + return tag; + } - case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertLocalMkqlStart: { - const auto& cmd = record.GetUpsertLocalMkqlStart(); - const ui64 tag = GetOrGenerateTag(cmd); - if (LoadActors.count(tag) != 0) { - ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); - } - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new local mkql upsert load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateLocalMkqlUpsertActor(cmd, ctx.SelfID, - GetServiceCounters(Counters, "load_actor"), tag))); - break; + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertLocalMkqlStart: { + const auto& cmd = record.GetUpsertLocalMkqlStart(); + const ui64 tag = GetTag(); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new local mkql upsert load actor with tag# " << tag); + + auto* actor = CreateLocalMkqlUpsertActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag); + TRunningActorInfo actorInfo(ctx.Register(actor)); + if (record.GetNotifyWhenFinished()) { + actorInfo.Parent = ev->Sender; } + LoadActors.emplace(tag, std::move(actorInfo)); - case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertKqpStart: { - const auto& cmd = record.GetUpsertKqpStart(); - const ui64 tag = GetOrGenerateTag(cmd); - if (LoadActors.count(tag) != 0) { - ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); - } - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new kqp upsert load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateKqpUpsertActor(cmd, ctx.SelfID, - GetServiceCounters(Counters, "load_actor"), tag))); - break; + return tag; + } + + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertKqpStart: { + const auto& cmd = record.GetUpsertKqpStart(); + const ui64 tag = GetTag(); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new kqp upsert load actor with tag# " << tag); + + auto* actor = CreateKqpUpsertActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag); + TRunningActorInfo actorInfo(ctx.Register(actor)); + if (record.GetNotifyWhenFinished()) { + actorInfo.Parent = ev->Sender; } + LoadActors.emplace(tag, std::move(actorInfo)); - case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertProposeStart: { - const auto& cmd = record.GetUpsertProposeStart(); - const ui64 tag = GetOrGenerateTag(cmd); - if (LoadActors.count(tag) != 0) { - ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); - } - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new upsert load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateProposeUpsertActor(cmd, ctx.SelfID, - GetServiceCounters(Counters, "load_actor"), tag))); - break; + return tag; + } + + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertProposeStart: { + const auto& cmd = record.GetUpsertProposeStart(); + const ui64 tag = GetTag(); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new upsert load actor with tag# " << tag); + + auto* actor = CreateProposeUpsertActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag); + TRunningActorInfo actorInfo(ctx.Register(actor)); + if (record.GetNotifyWhenFinished()) { + actorInfo.Parent = ev->Sender; } + LoadActors.emplace(tag, std::move(actorInfo)); - case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kReadIteratorStart: { - const auto& cmd = record.GetReadIteratorStart(); - const ui64 tag = GetOrGenerateTag(cmd); - if (LoadActors.count(tag) != 0) { - ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); - } - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new read iterator load actor# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateReadIteratorActor(cmd, ctx.SelfID, - GetServiceCounters(Counters, "load_actor"), tag))); - break; + return tag; + } + + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kReadIteratorStart: { + const auto& cmd = record.GetReadIteratorStart(); + const ui64 tag = GetTag(); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new read iterator load actor# " << tag); + + auto* actor = CreateReadIteratorActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag); + TRunningActorInfo actorInfo(ctx.Register(actor)); + if (record.GetNotifyWhenFinished()) { + actorInfo.Parent = ev->Sender; } + LoadActors.emplace(tag, std::move(actorInfo)); - case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kLoadStop: { - const auto& cmd = record.GetLoadStop(); - if (cmd.HasRemoveAllTags() && cmd.GetRemoveAllTags()) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Delete all running load actors"); - for (auto& actorPair : LoadActors) { - ctx.Send(actorPair.second, new TEvents::TEvPoisonPill); - } - LoadActors.clear(); - } else { - VERIFY_PARAM(Tag); - const ui64 tag = cmd.GetTag(); - auto it = LoadActors.find(tag); - if (it == LoadActors.end()) { - ythrow TLoadActorException() - << Sprintf("load actor with Tag# %" PRIu64 " not found", tag); - } - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Delete running load actor with tag# " - << tag); - ctx.Send(it->second, new TEvents::TEvPoisonPill); - LoadActors.erase(it); + return tag; + } + + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kLoadStop: { + const auto& cmd = record.GetLoadStop(); + if (cmd.HasRemoveAllTags() && cmd.GetRemoveAllTags()) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Delete all running load actors"); + for (auto& actorPair : LoadActors) { + ctx.Send(actorPair.second.ActorId, new TEvents::TEvPoisonPill); + } + LoadActors.clear(); + } else { + if (!cmd.HasTag()) { + ythrow TLoadActorException() << "Either RemoveAllTags or Tag must present"; } - break; + const ui64 tag = cmd.GetTag(); + auto it = LoadActors.find(tag); + if (it == LoadActors.end()) { + ythrow TLoadActorException() + << Sprintf("load actor with Tag# %" PRIu64 " not found", tag); + } + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Delete running load actor# " << tag); + ctx.Send(it->second.ActorId, new TEvents::TEvPoisonPill); + LoadActors.erase(it); } - default: { - TString protoTxt; - google::protobuf::TextFormat::PrintToString(record, &protoTxt); - ythrow TLoadActorException() << (TStringBuilder() - << "TLoadActor::Handle(TEvDataShardLoad::TEvTestLoadRequest): unexpected command case: " - << ui32(record.Command_case()) - << " protoTxt# " << protoTxt.Quote()); - } + return 0; + } + + default: { + TString protoTxt; + google::protobuf::TextFormat::PrintToString(record, &protoTxt); + ythrow TLoadActorException() << (TStringBuilder() + << "TLoadActor::Handle(TEvDataShardLoad::TEvTestLoadRequest): unexpected command case: " + << ui32(record.Command_case()) + << " protoTxt# " << protoTxt.Quote()); + } } } @@ -200,6 +231,15 @@ public: auto it = LoadActors.find(msg->Tag); Y_VERIFY(it != LoadActors.end()); LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load actor with tag# " << msg->Tag << " finished"); + + if (it->second.Parent) { + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(); + response->Tag = ev->Get()->Tag; + response->ErrorReason = ev->Get()->ErrorReason; + response->Report = ev->Get()->Report; + ctx.Send(it->second.Parent, response.release()); + } + LoadActors.erase(it); FinishedTests.push_back({msg->Tag, msg->ErrorReason, TAppData::TimeProvider->Now(), msg->Report}); @@ -235,28 +275,10 @@ public: info.ErrorMessage.clear(); - const auto& params = ev->Get()->Request.GetParams(); - if (params.Has("protobuf")) { - NKikimrDataShardLoad::TEvTestLoadRequest record; - bool status = google::protobuf::TextFormat::ParseFromString(params.Get("protobuf"), &record); - if (status) { - try { - ProcessCmd(record, ctx); - } catch (const TLoadActorException& ex) { - info.ErrorMessage = ex.what(); - } - } else { - info.ErrorMessage = "bad protobuf"; - } - - GenerateHttpInfoRes(ctx, id, true); - return; - } - // send messages to subactors for (const auto& kv : LoadActors) { - ctx.Send(kv.second, new NMon::TEvHttpInfo(ev->Get()->Request, id)); - info.ActorMap[kv.second].Tag = kv.first; + ctx.Send(kv.second.ActorId, new NMon::TEvHttpInfo(ev->Get()->Request, id)); + info.ActorMap[kv.second.ActorId].Tag = kv.first; } // record number of responses pending diff --git a/ydb/core/tx/datashard/testload/test_load_actor.h b/ydb/core/tx/datashard/testload/test_load_actor.h index acb6d7cdeac..b4a0549c19f 100644 --- a/ydb/core/tx/datashard/testload/test_load_actor.h +++ b/ydb/core/tx/datashard/testload/test_load_actor.h @@ -54,6 +54,8 @@ struct TEvDataShardLoad { std::optional<TLoadReport> Report; TString ErrorReason; + TEvTestLoadFinished() = default; + TEvTestLoadFinished(ui64 tag, const TString& error = {}) : Tag(tag) , ErrorReason(error) diff --git a/ydb/core/tx/datashard/testload/test_load_upsert.cpp b/ydb/core/tx/datashard/testload/test_load_upsert.cpp index e13a219c647..cdc2b29b9c5 100644 --- a/ydb/core/tx/datashard/testload/test_load_upsert.cpp +++ b/ydb/core/tx/datashard/testload/test_load_upsert.cpp @@ -439,7 +439,6 @@ private: auto configCopy = Config; configCopy.SetInflight(1); // we have only 1 session configCopy.SetRowCount(requestsPerActor); - configCopy.SetTag(pseudoTag); auto* kqpActor = new TKqpUpsertActor( configCopy, |