diff options
| author | kungurtsev <[email protected]> | 2025-05-14 17:37:39 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-05-14 17:37:39 +0200 |
| commit | 9320e5a6e1afa51e5731cef0af33ac482d8b80a2 (patch) | |
| tree | f645a2d5ae3354c47b1f13363f4e4bd6b4916fc8 | |
| parent | bfa2f89bec311259dfa09ad747fb2738a8dc61eb (diff) | |
Allow to send duplicates through TDedicatedPipePool (#18149)
| -rw-r--r-- | ydb/core/tx/schemeshard/dedicated_pipe_pool.h | 25 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp | 41 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp | 2 |
3 files changed, 38 insertions, 30 deletions
diff --git a/ydb/core/tx/schemeshard/dedicated_pipe_pool.h b/ydb/core/tx/schemeshard/dedicated_pipe_pool.h index 6e76ee4393a..ff848aeae12 100644 --- a/ydb/core/tx/schemeshard/dedicated_pipe_pool.h +++ b/ydb/core/tx/schemeshard/dedicated_pipe_pool.h @@ -15,18 +15,21 @@ class TDedicatedPipePool { TMap<TActorId, std::pair<TEntityId, TTabletId>> Owners; public: - void Create(const TEntityId& entityId, TTabletId dst, THolder<IEventBase> message, const TActorContext& ctx) { - Y_ABORT_UNLESS(!Pipes[entityId].contains(dst)); + void Send(const TEntityId& entityId, TTabletId dst, THolder<IEventBase> message, const TActorContext& ctx) { using namespace NTabletPipe; - const auto clientId = ctx.Register(CreateClient(ctx.SelfID, ui64(dst), TClientRetryPolicy { - .MinRetryTime = TDuration::MilliSeconds(100), - .MaxRetryTime = TDuration::Seconds(30), - })); + if (!Pipes[entityId].contains(dst)) { + const auto clientId = ctx.Register(CreateClient(ctx.SelfID, ui64(dst), TClientRetryPolicy { + .MinRetryTime = TDuration::MilliSeconds(100), + .MaxRetryTime = TDuration::Seconds(30), + })); - Pipes[entityId][dst] = clientId; - Owners[clientId] = std::make_pair(entityId, dst); + Pipes[entityId][dst] = clientId; + Owners[clientId] = std::make_pair(entityId, dst); + } + const auto clientId = Pipes[entityId][dst]; + Y_ABORT_UNLESS(Owners[clientId] == std::make_pair(entityId, dst)); SendData(ctx.SelfID, clientId, message.Release(), 0); } @@ -53,10 +56,10 @@ public: } } - ui64 CloseAll(const TEntityId& entityId, const TActorContext& ctx) { + void CloseAll(const TEntityId& entityId, const TActorContext& ctx) { auto entityIt = Pipes.find(entityId); if (entityIt == Pipes.end()) { - return 0; + return; } const auto& entityPipes = entityIt->second; @@ -70,7 +73,7 @@ public: Close(entityId, tabletId, ctx); } - return tablets.size(); + return; } void Shutdown(const TActorContext& ctx) { diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index f06974855a0..e8ae21558e9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -497,7 +497,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil private: TIndexBuildId BuildId; - TDeque<std::tuple<TTabletId, ui64, THolder<IEventBase>>> ToTabletSend; + TMap<TTabletId, THolder<IEventBase>> ToTabletSend; template <bool WithSnapshot = true, typename Record> TTabletId CommonFillRecord(Record& record, TShardIdx shardIdx, TIndexBuildInfo& buildInfo) { @@ -551,9 +551,9 @@ private: auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo); ev->Record.SetSeed(ui64(shardId)); - LOG_D("TTxBuildProgress: TEvSampleKRequest: " << ev->Record.ShortDebugString()); + LOG_N("TTxBuildProgress: TEvSampleKRequest: " << ev->Record.ShortDebugString()); - ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev)); + ToTabletSend.emplace(shardId, std::move(ev)); } void SendKMeansReshuffleRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) { @@ -595,9 +595,9 @@ private: r.ClearClusters(); return r.ShortDebugString(); }; - LOG_D("TTxBuildProgress: TEvReshuffleKMeansRequest: " << toDebugStr(ev->Record)); + LOG_N("TTxBuildProgress: TEvReshuffleKMeansRequest: " << toDebugStr(ev->Record)); - ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev)); + ToTabletSend.emplace(shardId, std::move(ev)); } void SendKMeansLocalRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) { @@ -644,9 +644,9 @@ private: auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo); ev->Record.SetSeed(ui64(shardId)); - LOG_D("TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record.ShortDebugString()); + LOG_N("TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record.ShortDebugString()); - ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev)); + ToTabletSend.emplace(shardId, std::move(ev)); } void SendPrefixKMeansRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) { @@ -685,9 +685,9 @@ private: auto shardId = CommonFillRecord<false>(ev->Record, shardIdx, buildInfo); ev->Record.SetSeed(ui64(shardId)); - LOG_D("TTxBuildProgress: TEvPrefixKMeansRequest: " << ev->Record.ShortDebugString()); + LOG_N("TTxBuildProgress: TEvPrefixKMeansRequest: " << ev->Record.ShortDebugString()); - ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev)); + ToTabletSend.emplace(shardId, std::move(ev)); } void SendBuildSecondaryIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) { @@ -736,9 +736,9 @@ private: auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo); - LOG_D("TTxBuildProgress: TEvBuildIndexCreateRequest: " << ev->Record.ShortDebugString()); + LOG_N("TTxBuildProgress: TEvBuildIndexCreateRequest: " << ev->Record.ShortDebugString()); - ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev)); + ToTabletSend.emplace(shardId, std::move(ev)); } void SendUploadSampleKRequest(TIndexBuildInfo& buildInfo) { @@ -757,7 +757,8 @@ private: buildInfo.DoneShards = {}; buildInfo.InProgressShards = {}; buildInfo.ToUploadShards = {}; - + + ToTabletSend.clear(); Self->IndexBuildPipes.CloseAll(BuildId, ctx); } @@ -791,6 +792,9 @@ private: } void AddAllShards(TIndexBuildInfo& buildInfo) { + ToTabletSend.clear(); + Self->IndexBuildPipes.CloseAll(BuildId, Self->ActorContext()); + for (const auto& [idx, status] : buildInfo.Shards) { AddShard(buildInfo, idx, status); } @@ -1102,7 +1106,8 @@ public: Y_ABORT_UNLESS(buildInfoPtr); auto& buildInfo = *buildInfoPtr->Get(); - LOG_I("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State << " " << buildInfo); + LOG_N("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State); + LOG_D("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State << " " << buildInfo); switch (buildInfo.State) { case TIndexBuildInfo::EState::Invalid: @@ -1390,8 +1395,8 @@ public: } void DoComplete(const TActorContext& ctx) override { - for (auto& x: ToTabletSend) { - Self->IndexBuildPipes.Create(BuildId, std::get<0>(x), std::move(std::get<2>(x)), ctx); + for (auto& [shardId, ev]: ToTabletSend) { + Self->IndexBuildPipes.Send(BuildId, shardId, std::move(ev), ctx); } ToTabletSend.clear(); } @@ -1466,7 +1471,7 @@ public: const auto& tabletId = PipeRetry.TabletId; const auto& shardIdx = Self->GetShardIdx(tabletId); - LOG_I("TTxReply : PipeRetry, id# " << buildId + LOG_N("TTxReply : PipeRetry, id# " << buildId << ", tabletId# " << tabletId << ", shardIdx# " << shardIdx); @@ -2416,8 +2421,8 @@ public: << ", BuildIndexId: " << buildInfo.Id << ", status: " << Ydb::StatusIds::StatusCode_Name(status) << ", error: " << buildInfo.Issue - << ", replyTo: " << buildInfo.CreateSender.ToString()); - LOG_D("Message:\n" << responseEv->Record.ShortDebugString()); + << ", replyTo: " << buildInfo.CreateSender.ToString() + << ", message: " << responseEv->Record.ShortDebugString()); Send(buildInfo.CreateSender, std::move(responseEv), 0, buildInfo.SenderCookie); } diff --git a/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp index c0476e4e3c1..3af675e29b9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp @@ -115,7 +115,7 @@ public: void Complete(const TActorContext& ctx) override { for (auto& [streamPathId, tabletId, ev] : ScanRequests) { - Self->CdcStreamScanPipes.Create(streamPathId, tabletId, std::move(ev), ctx); + Self->CdcStreamScanPipes.Send(streamPathId, tabletId, std::move(ev), ctx); } if (StreamToProgress) { |
