summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungurtsev <[email protected]>2025-05-14 17:37:39 +0200
committerGitHub <[email protected]>2025-05-14 17:37:39 +0200
commit9320e5a6e1afa51e5731cef0af33ac482d8b80a2 (patch)
treef645a2d5ae3354c47b1f13363f4e4bd6b4916fc8
parentbfa2f89bec311259dfa09ad747fb2738a8dc61eb (diff)
Allow to send duplicates through TDedicatedPipePool (#18149)
-rw-r--r--ydb/core/tx/schemeshard/dedicated_pipe_pool.h25
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp41
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp2
3 files changed, 38 insertions, 30 deletions
diff --git a/ydb/core/tx/schemeshard/dedicated_pipe_pool.h b/ydb/core/tx/schemeshard/dedicated_pipe_pool.h
index 6e76ee4393a..ff848aeae12 100644
--- a/ydb/core/tx/schemeshard/dedicated_pipe_pool.h
+++ b/ydb/core/tx/schemeshard/dedicated_pipe_pool.h
@@ -15,18 +15,21 @@ class TDedicatedPipePool {
TMap<TActorId, std::pair<TEntityId, TTabletId>> Owners;
public:
- void Create(const TEntityId& entityId, TTabletId dst, THolder<IEventBase> message, const TActorContext& ctx) {
- Y_ABORT_UNLESS(!Pipes[entityId].contains(dst));
+ void Send(const TEntityId& entityId, TTabletId dst, THolder<IEventBase> message, const TActorContext& ctx) {
using namespace NTabletPipe;
- const auto clientId = ctx.Register(CreateClient(ctx.SelfID, ui64(dst), TClientRetryPolicy {
- .MinRetryTime = TDuration::MilliSeconds(100),
- .MaxRetryTime = TDuration::Seconds(30),
- }));
+ if (!Pipes[entityId].contains(dst)) {
+ const auto clientId = ctx.Register(CreateClient(ctx.SelfID, ui64(dst), TClientRetryPolicy {
+ .MinRetryTime = TDuration::MilliSeconds(100),
+ .MaxRetryTime = TDuration::Seconds(30),
+ }));
- Pipes[entityId][dst] = clientId;
- Owners[clientId] = std::make_pair(entityId, dst);
+ Pipes[entityId][dst] = clientId;
+ Owners[clientId] = std::make_pair(entityId, dst);
+ }
+ const auto clientId = Pipes[entityId][dst];
+ Y_ABORT_UNLESS(Owners[clientId] == std::make_pair(entityId, dst));
SendData(ctx.SelfID, clientId, message.Release(), 0);
}
@@ -53,10 +56,10 @@ public:
}
}
- ui64 CloseAll(const TEntityId& entityId, const TActorContext& ctx) {
+ void CloseAll(const TEntityId& entityId, const TActorContext& ctx) {
auto entityIt = Pipes.find(entityId);
if (entityIt == Pipes.end()) {
- return 0;
+ return;
}
const auto& entityPipes = entityIt->second;
@@ -70,7 +73,7 @@ public:
Close(entityId, tabletId, ctx);
}
- return tablets.size();
+ return;
}
void Shutdown(const TActorContext& ctx) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
index f06974855a0..e8ae21558e9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
@@ -497,7 +497,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
private:
TIndexBuildId BuildId;
- TDeque<std::tuple<TTabletId, ui64, THolder<IEventBase>>> ToTabletSend;
+ TMap<TTabletId, THolder<IEventBase>> ToTabletSend;
template <bool WithSnapshot = true, typename Record>
TTabletId CommonFillRecord(Record& record, TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -551,9 +551,9 @@ private:
auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo);
ev->Record.SetSeed(ui64(shardId));
- LOG_D("TTxBuildProgress: TEvSampleKRequest: " << ev->Record.ShortDebugString());
+ LOG_N("TTxBuildProgress: TEvSampleKRequest: " << ev->Record.ShortDebugString());
- ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
+ ToTabletSend.emplace(shardId, std::move(ev));
}
void SendKMeansReshuffleRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -595,9 +595,9 @@ private:
r.ClearClusters();
return r.ShortDebugString();
};
- LOG_D("TTxBuildProgress: TEvReshuffleKMeansRequest: " << toDebugStr(ev->Record));
+ LOG_N("TTxBuildProgress: TEvReshuffleKMeansRequest: " << toDebugStr(ev->Record));
- ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
+ ToTabletSend.emplace(shardId, std::move(ev));
}
void SendKMeansLocalRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -644,9 +644,9 @@ private:
auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo);
ev->Record.SetSeed(ui64(shardId));
- LOG_D("TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record.ShortDebugString());
+ LOG_N("TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record.ShortDebugString());
- ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
+ ToTabletSend.emplace(shardId, std::move(ev));
}
void SendPrefixKMeansRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -685,9 +685,9 @@ private:
auto shardId = CommonFillRecord<false>(ev->Record, shardIdx, buildInfo);
ev->Record.SetSeed(ui64(shardId));
- LOG_D("TTxBuildProgress: TEvPrefixKMeansRequest: " << ev->Record.ShortDebugString());
+ LOG_N("TTxBuildProgress: TEvPrefixKMeansRequest: " << ev->Record.ShortDebugString());
- ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
+ ToTabletSend.emplace(shardId, std::move(ev));
}
void SendBuildSecondaryIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -736,9 +736,9 @@ private:
auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo);
- LOG_D("TTxBuildProgress: TEvBuildIndexCreateRequest: " << ev->Record.ShortDebugString());
+ LOG_N("TTxBuildProgress: TEvBuildIndexCreateRequest: " << ev->Record.ShortDebugString());
- ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
+ ToTabletSend.emplace(shardId, std::move(ev));
}
void SendUploadSampleKRequest(TIndexBuildInfo& buildInfo) {
@@ -757,7 +757,8 @@ private:
buildInfo.DoneShards = {};
buildInfo.InProgressShards = {};
buildInfo.ToUploadShards = {};
-
+
+ ToTabletSend.clear();
Self->IndexBuildPipes.CloseAll(BuildId, ctx);
}
@@ -791,6 +792,9 @@ private:
}
void AddAllShards(TIndexBuildInfo& buildInfo) {
+ ToTabletSend.clear();
+ Self->IndexBuildPipes.CloseAll(BuildId, Self->ActorContext());
+
for (const auto& [idx, status] : buildInfo.Shards) {
AddShard(buildInfo, idx, status);
}
@@ -1102,7 +1106,8 @@ public:
Y_ABORT_UNLESS(buildInfoPtr);
auto& buildInfo = *buildInfoPtr->Get();
- LOG_I("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State << " " << buildInfo);
+ LOG_N("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State);
+ LOG_D("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State << " " << buildInfo);
switch (buildInfo.State) {
case TIndexBuildInfo::EState::Invalid:
@@ -1390,8 +1395,8 @@ public:
}
void DoComplete(const TActorContext& ctx) override {
- for (auto& x: ToTabletSend) {
- Self->IndexBuildPipes.Create(BuildId, std::get<0>(x), std::move(std::get<2>(x)), ctx);
+ for (auto& [shardId, ev]: ToTabletSend) {
+ Self->IndexBuildPipes.Send(BuildId, shardId, std::move(ev), ctx);
}
ToTabletSend.clear();
}
@@ -1466,7 +1471,7 @@ public:
const auto& tabletId = PipeRetry.TabletId;
const auto& shardIdx = Self->GetShardIdx(tabletId);
- LOG_I("TTxReply : PipeRetry, id# " << buildId
+ LOG_N("TTxReply : PipeRetry, id# " << buildId
<< ", tabletId# " << tabletId
<< ", shardIdx# " << shardIdx);
@@ -2416,8 +2421,8 @@ public:
<< ", BuildIndexId: " << buildInfo.Id
<< ", status: " << Ydb::StatusIds::StatusCode_Name(status)
<< ", error: " << buildInfo.Issue
- << ", replyTo: " << buildInfo.CreateSender.ToString());
- LOG_D("Message:\n" << responseEv->Record.ShortDebugString());
+ << ", replyTo: " << buildInfo.CreateSender.ToString()
+ << ", message: " << responseEv->Record.ShortDebugString());
Send(buildInfo.CreateSender, std::move(responseEv), 0, buildInfo.SenderCookie);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
index c0476e4e3c1..3af675e29b9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
@@ -115,7 +115,7 @@ public:
void Complete(const TActorContext& ctx) override {
for (auto& [streamPathId, tabletId, ev] : ScanRequests) {
- Self->CdcStreamScanPipes.Create(streamPathId, tabletId, std::move(ev), ctx);
+ Self->CdcStreamScanPipes.Send(streamPathId, tabletId, std::move(ev), ctx);
}
if (StreamToProgress) {