diff options
author | yumkam <yumkam7@ydb.tech> | 2024-12-04 13:30:25 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-04 13:30:25 +0300 |
commit | 778313b4edf12d5786ac6c8cc1525a38238b175f (patch) | |
tree | 3c59009e20e476df7ebf33c89953a2edf55f76d7 | |
parent | 42c202536b8cc47fd79dc8d1836e183942368fc7 (diff) | |
download | ydb-778313b4edf12d5786ac6c8cc1525a38238b175f.tar.gz |
Fix LogPrefix initialization in Source/Sink/Transform/Channel helpers (#11810)
5 files changed, 49 insertions, 27 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h index d04b373f99..0e7ad1bff7 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h @@ -15,7 +15,7 @@ constexpr ui32 IssuesBufferSize = 16; struct TComputeActorAsyncInputHelper { TString Type; - const TString LogPrefix; + TString LogPrefix; ui64 Index; IDqComputeActorAsyncInput* AsyncInput = nullptr; NActors::IActor* Actor = nullptr; @@ -102,6 +102,10 @@ public: } return {}; } + + void SetLogPrefix(const TString& logPrefix) { + LogPrefix = logPrefix; + } }; //Used for inputs in Sync ComputeActor and for a base for input transform in both sync and async ComputeActors diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 57c4d78176..def8b094db 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -110,14 +110,7 @@ public: void Bootstrap() { try { StartTime = TInstant::Now(); - { - TStringBuilder prefixBuilder; - prefixBuilder << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". "; - if (RequestContext) { - prefixBuilder << "Ctx: " << *RequestContext << ". "; - } - LogPrefix = prefixBuilder; - } + InitializeLogPrefix(); // re-initialize with SelfId CA_LOG_D("Start compute actor " << this->SelfId() << ", task: " << Task.GetId()); Channels = new TDqComputeActorChannels(this->SelfId(), TxId, Task, !RuntimeSettings.FailOnUndelivery, @@ -176,7 +169,7 @@ protected: , FunctionRegistry(functionRegistry) , CheckpointingMode(GetTaskCheckpointingMode(Task)) , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) - , WatermarksTracker(this->SelfId(), TxId, Task.GetId()) + , WatermarksTracker(LogPrefix) , TaskCounters(taskCounters) , MetricsReporter(taskCounters) , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor") @@ -233,6 +226,26 @@ protected: } } + void InitializeLogPrefix() { + TStringBuilder prefixBuilder; + prefixBuilder << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". "; + if (RequestContext) { + prefixBuilder << "Ctx: " << *RequestContext << ". "; + } + LogPrefix = std::move(prefixBuilder); + + WatermarksTracker.SetLogPrefix(LogPrefix); + for (auto& [_, info]: InputTransformsMap) { + info.SetLogPrefix(LogPrefix); + } + for (auto& [_, info]: SourcesMap) { + info.SetLogPrefix(LogPrefix); + } + for (auto& [_, info]: InputChannelsMap) { + info.SetLogPrefix(LogPrefix); + } + } + void ReportEventElapsedTime() { if (RuntimeSettings.CollectBasic()) { ui64 elapsedMicros = NActors::TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000ull; @@ -788,7 +801,7 @@ protected: protected: struct TInputChannelInfo { - const TString LogPrefix; + TString LogPrefix; ui64 ChannelId; ui32 SrcStageId; IDqInputChannel::TPtr Channel; @@ -848,6 +861,10 @@ protected: Channel->Resume(); } } + + void SetLogPrefix(const TString& logPrefix) { + LogPrefix = logPrefix; + } }; //Design note: @@ -1609,6 +1626,7 @@ protected: RequestContext = MakeIntrusive<NYql::NDq::TRequestContext>(Task.GetRequestContext()); InitializeWatermarks(); + InitializeLogPrefix(); // note: SelfId is not initialized here } private: diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp index 64b03da726..98c5c9d8ab 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp @@ -7,28 +7,24 @@ #include <algorithm> #define LOG_T(s) \ - LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s) + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << "Watermarks. " << s) #define LOG_D(s) \ - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s) + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << "Watermarks. " << s) #define LOG_I(s) \ - LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s) + LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << "Watermarks. " << s) #define LOG_W(s) \ - LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s) + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << "Watermarks. " << s) #define LOG_E(s) \ - LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s) + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << "Watermarks. " << s) namespace NYql::NDq { using namespace NActors; TDqComputeActorWatermarks::TDqComputeActorWatermarks( - NActors::TActorIdentity selfId, - const TTxId txId, - ui64 taskId + const TString& logPrefix ) - : SelfId(selfId) - , TxId(txId) - , TaskId(taskId) { + : LogPrefix(logPrefix) { } void TDqComputeActorWatermarks::RegisterAsyncInput(ui64 inputId) { @@ -166,4 +162,8 @@ void TDqComputeActorWatermarks::PopPendingWatermark() { PendingWatermark = Nothing(); } +void TDqComputeActorWatermarks::SetLogPrefix(const TString& logPrefix) { + LogPrefix = logPrefix; +} + } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h index 259be9565e..1ac5d5326c 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h @@ -9,7 +9,7 @@ namespace NYql::NDq { class TDqComputeActorWatermarks { public: - TDqComputeActorWatermarks(NActors::TActorIdentity selfId, const TTxId graphId, ui64 taskId); + TDqComputeActorWatermarks(const TString& logPrefix); void RegisterAsyncInput(ui64 inputId); void RegisterInputChannel(ui64 inputId); @@ -32,14 +32,14 @@ public: TMaybe<TInstant> GetPendingWatermark() const; void PopPendingWatermark(); + void SetLogPrefix(const TString& logPrefix); + private: void RecalcPendingWatermark(); bool MaybePopPendingWatermark(); private: - const NActors::TActorIdentity SelfId; - const TTxId TxId; - ui64 TaskId; + TString LogPrefix; std::unordered_map<ui64, TMaybe<TInstant>> AsyncInputsWatermarks; std::unordered_map<ui64, TMaybe<TInstant>> InputChannelsWatermarks; diff --git a/ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp b/ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp index 74a6bc3421..99535398fc 100644 --- a/ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp +++ b/ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp @@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(TComputeActorAsyncInputHelperTest) { TDummyAsyncInputHelper helper("MyPrefix", 13, NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED); helper.AsyncInput = &input; TDqComputeActorMetrics metrics{NMonitoring::TDynamicCounterPtr{}}; - TDqComputeActorWatermarks watermarks(NActors::TActorIdentity{NActors::TActorId{}}, TTxId{}, 7); + TDqComputeActorWatermarks watermarks(""); auto result = helper.PollAsyncInput(metrics, watermarks, 20); UNIT_ASSERT(result && EResumeSource::CAPollAsync == *result); } |