summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFloatingCrowbar <[email protected]>2026-04-20 14:08:52 +0300
committerGitHub <[email protected]>2026-04-20 11:08:52 +0000
commit645a08772bf73a67338178386df0b4e68e87f129 (patch)
tree79605e997b7741e79def483c1e80ee892e5b5a2c
parent1e42c1d7ba1a4b119a9c35c7ce14ba9c62633dd5 (diff)
Bugfix (#38272)
-rw-r--r--ydb/core/tx/replication/controller/replication.h2
-rw-r--r--ydb/core/tx/replication/controller/target_transfer.cpp7
-rw-r--r--ydb/core/tx/replication/controller/target_transfer.h4
-rw-r--r--ydb/core/tx/replication/controller/target_with_stream.cpp7
-rw-r--r--ydb/core/tx/replication/controller/target_with_stream.h2
-rw-r--r--ydb/core/tx/replication/service/service.cpp12
6 files changed, 21 insertions, 13 deletions
diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h
index cd3bbe61a93..dc1a289135f 100644
--- a/ydb/core/tx/replication/controller/replication.h
+++ b/ydb/core/tx/replication/controller/replication.h
@@ -109,7 +109,7 @@ public:
virtual void UpdateLag(ui64 workerId, TDuration lag) = 0;
virtual const TMaybe<TDuration> GetLag() const = 0;
- virtual void UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& stats) = 0;
+ virtual bool UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& stats) = 0;
virtual void WorkerStatusChanged(ui64 workerId, ui64 status) = 0;
virtual const ITargetStats* GetStats() = 0;
diff --git a/ydb/core/tx/replication/controller/target_transfer.cpp b/ydb/core/tx/replication/controller/target_transfer.cpp
index 021afa72d26..142a871c2c3 100644
--- a/ydb/core/tx/replication/controller/target_transfer.cpp
+++ b/ydb/core/tx/replication/controller/target_transfer.cpp
@@ -344,8 +344,10 @@ void TTargetTransfer::WorkerStatusChanged(ui64 workerId, ui64 status) {
}
}
-void TTargetTransfer::UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) {
- TBase::UpdateStats(workerId, newStats);
+bool TTargetTransfer::UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) {
+ if (!TBase::UpdateStats(workerId, newStats)) {
+ return false;
+ }
EnsureCounters();
auto* statsPtr = dynamic_cast<TTransferStats*>(Stats.get());
@@ -376,6 +378,7 @@ void TTargetTransfer::UpdateStats(ui64 workerId, const NKikimrReplication::TWork
countersPtr->MinWorkerUptime->Set(0);
}
}
+ return true;
}
TTargetWithStreamStats* TTargetTransfer::GetStatsImpl() {
diff --git a/ydb/core/tx/replication/controller/target_transfer.h b/ydb/core/tx/replication/controller/target_transfer.h
index 541c4521ddc..29b46db9cee 100644
--- a/ydb/core/tx/replication/controller/target_transfer.h
+++ b/ydb/core/tx/replication/controller/target_transfer.h
@@ -21,7 +21,7 @@ public:
using TPtr = std::shared_ptr<TTransferConfig>;
TTransferConfig(const TString& srcPath, const TString& dstPath, const NKikimrReplication::TReplicationConfig& cfg);
-
+
const TString& GetTransformLambda() const;
const TString& GetRunAsUser() const;
const TString& GetDirectoryPath() const;
@@ -43,7 +43,7 @@ public:
TString GetStreamPath() const override;
void EnsureCounters();
- void UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& stats) override;
+ bool UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& stats) override;
void WorkerStatusChanged(ui64 workerId, ui64 status) override;
void RemoveWorker(ui64 id) override;
diff --git a/ydb/core/tx/replication/controller/target_with_stream.cpp b/ydb/core/tx/replication/controller/target_with_stream.cpp
index c0b2ffd0c95..d0c1bf61e54 100644
--- a/ydb/core/tx/replication/controller/target_with_stream.cpp
+++ b/ydb/core/tx/replication/controller/target_with_stream.cpp
@@ -261,18 +261,18 @@ void TTargetWithStream::WorkerStatusChanged(ui64, ui64) {
// nop
}
-void TTargetWithStream::UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) {
+bool TTargetWithStream::UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) {
auto* stats = GetStatsImpl();
auto* counters = GetCountersImpl();
if (!stats && !counters) {
- return;
+ return false;
}
if (!HasWorker(workerId)) {
if (stats) {
stats->RemoveWorker(workerId);
}
- return;
+ return false;
}
for (const auto& item : newStats.GetValues()) {
@@ -283,6 +283,7 @@ void TTargetWithStream::UpdateStats(ui64 workerId, const NKikimrReplication::TWo
counters->UpdateWithSingleStatsItem(workerId, item.GetKey(), item.GetValue());
}
}
+ return true;
}
const TReplication::ITargetStats* TTargetWithStream::GetStats() {
diff --git a/ydb/core/tx/replication/controller/target_with_stream.h b/ydb/core/tx/replication/controller/target_with_stream.h
index 1c52cd45fbe..c6c20c6b8a9 100644
--- a/ydb/core/tx/replication/controller/target_with_stream.h
+++ b/ydb/core/tx/replication/controller/target_with_stream.h
@@ -82,7 +82,7 @@ public:
void Shutdown(const TActorContext& ctx) override;
void WorkerStatusChanged(ui64 workerId, ui64 status) override;
- void UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) override;
+ bool UpdateStats(ui64 workerId, const NKikimrReplication::TWorkerStats& newStats) override;
const TReplication::ITargetStats* GetStats() override;
diff --git a/ydb/core/tx/replication/service/service.cpp b/ydb/core/tx/replication/service/service.cpp
index 50b603bbd88..7b9b99ce972 100644
--- a/ydb/core/tx/replication/service/service.cpp
+++ b/ydb/core/tx/replication/service/service.cpp
@@ -214,9 +214,13 @@ public:
TActorId RegisterWorker(IActorOps* ops, const TWorkerId& id, IActor* actor, ui32 poolId,
const NKikimrReplication::TReplicationLocationConfig& replicationLocation, TMetricsConfig::EMetricsLevel metricsLevel)
{
- auto res = Workers.emplace(id, TWorkerInfo{ops->Register(actor, TMailboxType::HTSwap, poolId),
- replicationLocation, metricsLevel, id.WorkerId(), AppData()->Counters});
- if (metricsLevel == TMetricsConfig::LEVEL_DETAILED) {
+ auto res = Workers.emplace(id, TWorkerInfo{
+ ops->Register(actor, TMailboxType::HTSwap, poolId),
+ replicationLocation, metricsLevel, id.WorkerId(), AppData()->Counters
+ });
+
+ const auto& worker = res.first->second;
+ if (metricsLevel == TMetricsConfig::LEVEL_DETAILED && !worker.Location.GetPath().empty()) {
PendingStatsValues[id];
}
@@ -226,7 +230,7 @@ public:
Y_ABORT_UNLESS(res.second);
- const auto actorId = res.first->second.ActorId;
+ const auto actorId = worker.ActorId;
ActorIdToWorkerId.emplace(actorId, id);
SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::STATUS_RUNNING);