diff options
| author | FloatingCrowbar <[email protected]> | 2026-04-20 14:08:52 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-04-20 11:08:52 +0000 |
| commit | 645a08772bf73a67338178386df0b4e68e87f129 (patch) | |
| tree | 79605e997b7741e79def483c1e80ee892e5b5a2c | |
| parent | 1e42c1d7ba1a4b119a9c35c7ce14ba9c62633dd5 (diff) | |
Bugfix (#38272)
6 files changed, 21 insertions, 13 deletions
diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index cd3bbe61a93..dc1a289135f 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -109,7 +109,7 @@ public: virtual void UpdateLag(ui64 workerId, TDuration lag) = 0; virtual const TMaybe<TDuration> GetLag() const = 0; - virtual void UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& stats) = 0; + virtual bool UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& stats) = 0; virtual void WorkerStatusChanged(ui64 workerId, ui64 status) = 0; virtual const ITargetStats* GetStats() = 0; diff --git a/ydb/core/tx/replication/controller/target_transfer.cpp b/ydb/core/tx/replication/controller/target_transfer.cpp index 021afa72d26..142a871c2c3 100644 --- a/ydb/core/tx/replication/controller/target_transfer.cpp +++ b/ydb/core/tx/replication/controller/target_transfer.cpp @@ -344,8 +344,10 @@ void TTargetTransfer::WorkerStatusChanged(ui64 workerId, ui64 status) { } } -void TTargetTransfer::UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) { - TBase::UpdateStats(workerId, newStats); +bool TTargetTransfer::UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) { + if (!TBase::UpdateStats(workerId, newStats)) { + return false; + } EnsureCounters(); auto* statsPtr = dynamic_cast<TTransferStats*>(Stats.get()); @@ -376,6 +378,7 @@ void TTargetTransfer::UpdateStats(ui64 workerId, const NKikimrReplication::TWork countersPtr->MinWorkerUptime->Set(0); } } + return true; } TTargetWithStreamStats* TTargetTransfer::GetStatsImpl() { diff --git a/ydb/core/tx/replication/controller/target_transfer.h b/ydb/core/tx/replication/controller/target_transfer.h index 541c4521ddc..29b46db9cee 100644 --- a/ydb/core/tx/replication/controller/target_transfer.h +++ b/ydb/core/tx/replication/controller/target_transfer.h @@ -21,7 +21,7 @@ public: using TPtr = std::shared_ptr<TTransferConfig>; TTransferConfig(const TString& srcPath, const TString& dstPath, const NKikimrReplication::TReplicationConfig& cfg); - + const TString& GetTransformLambda() const; const TString& GetRunAsUser() const; const TString& GetDirectoryPath() const; @@ -43,7 +43,7 @@ public: TString GetStreamPath() const override; void EnsureCounters(); - void UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& stats) override; + bool UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& stats) override; void WorkerStatusChanged(ui64 workerId, ui64 status) override; void RemoveWorker(ui64 id) override; diff --git a/ydb/core/tx/replication/controller/target_with_stream.cpp b/ydb/core/tx/replication/controller/target_with_stream.cpp index c0b2ffd0c95..d0c1bf61e54 100644 --- a/ydb/core/tx/replication/controller/target_with_stream.cpp +++ b/ydb/core/tx/replication/controller/target_with_stream.cpp @@ -261,18 +261,18 @@ void TTargetWithStream::WorkerStatusChanged(ui64, ui64) { // nop } -void TTargetWithStream::UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) { +bool TTargetWithStream::UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) { auto* stats = GetStatsImpl(); auto* counters = GetCountersImpl(); if (!stats && !counters) { - return; + return false; } if (!HasWorker(workerId)) { if (stats) { stats->RemoveWorker(workerId); } - return; + return false; } for (const auto& item : newStats.GetValues()) { @@ -283,6 +283,7 @@ void TTargetWithStream::UpdateStats(ui64 workerId, const NKikimrReplication::TWo counters->UpdateWithSingleStatsItem(workerId, item.GetKey(), item.GetValue()); } } + return true; } const TReplication::ITargetStats* TTargetWithStream::GetStats() { diff --git a/ydb/core/tx/replication/controller/target_with_stream.h b/ydb/core/tx/replication/controller/target_with_stream.h index 1c52cd45fbe..c6c20c6b8a9 100644 --- a/ydb/core/tx/replication/controller/target_with_stream.h +++ b/ydb/core/tx/replication/controller/target_with_stream.h @@ -82,7 +82,7 @@ public: void Shutdown(const TActorContext& ctx) override; void WorkerStatusChanged(ui64 workerId, ui64 status) override; - void UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) override; + bool UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) override; const TReplication::ITargetStats* GetStats() override; diff --git a/ydb/core/tx/replication/service/service.cpp b/ydb/core/tx/replication/service/service.cpp index 50b603bbd88..7b9b99ce972 100644 --- a/ydb/core/tx/replication/service/service.cpp +++ b/ydb/core/tx/replication/service/service.cpp @@ -214,9 +214,13 @@ public: TActorId RegisterWorker(IActorOps* ops, const TWorkerId& id, IActor* actor, ui32 poolId, const NKikimrReplication::TReplicationLocationConfig& replicationLocation, TMetricsConfig::EMetricsLevel metricsLevel) { - auto res = Workers.emplace(id, TWorkerInfo{ops->Register(actor, TMailboxType::HTSwap, poolId), - replicationLocation, metricsLevel, id.WorkerId(), AppData()->Counters}); - if (metricsLevel == TMetricsConfig::LEVEL_DETAILED) { + auto res = Workers.emplace(id, TWorkerInfo{ + ops->Register(actor, TMailboxType::HTSwap, poolId), + replicationLocation, metricsLevel, id.WorkerId(), AppData()->Counters + }); + + const auto& worker = res.first->second; + if (metricsLevel == TMetricsConfig::LEVEL_DETAILED && !worker.Location.GetPath().empty()) { PendingStatsValues[id]; } @@ -226,7 +230,7 @@ public: Y_ABORT_UNLESS(res.second); - const auto actorId = res.first->second.ActorId; + const auto actorId = worker.ActorId; ActorIdToWorkerId.emplace(actorId, id); SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::STATUS_RUNNING); |
