aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoryumkam <yumkam7@ydb.tech>2024-12-04 13:30:25 +0300
committerGitHub <noreply@github.com>2024-12-04 13:30:25 +0300
commit778313b4edf12d5786ac6c8cc1525a38238b175f (patch)
tree3c59009e20e476df7ebf33c89953a2edf55f76d7
parent42c202536b8cc47fd79dc8d1836e183942368fc7 (diff)
downloadydb-778313b4edf12d5786ac6c8cc1525a38238b175f.tar.gz
Fix LogPrefix initialization in Source/Sink/Transform/Channel helpers (#11810)
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h6
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h38
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp22
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h8
-rw-r--r--ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp2
5 files changed, 49 insertions, 27 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
index d04b373f99..0e7ad1bff7 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
@@ -15,7 +15,7 @@ constexpr ui32 IssuesBufferSize = 16;
struct TComputeActorAsyncInputHelper {
TString Type;
- const TString LogPrefix;
+ TString LogPrefix;
ui64 Index;
IDqComputeActorAsyncInput* AsyncInput = nullptr;
NActors::IActor* Actor = nullptr;
@@ -102,6 +102,10 @@ public:
}
return {};
}
+
+ void SetLogPrefix(const TString& logPrefix) {
+ LogPrefix = logPrefix;
+ }
};
//Used for inputs in Sync ComputeActor and for a base for input transform in both sync and async ComputeActors
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 57c4d78176..def8b094db 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -110,14 +110,7 @@ public:
void Bootstrap() {
try {
StartTime = TInstant::Now();
- {
- TStringBuilder prefixBuilder;
- prefixBuilder << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". ";
- if (RequestContext) {
- prefixBuilder << "Ctx: " << *RequestContext << ". ";
- }
- LogPrefix = prefixBuilder;
- }
+ InitializeLogPrefix(); // re-initialize with SelfId
CA_LOG_D("Start compute actor " << this->SelfId() << ", task: " << Task.GetId());
Channels = new TDqComputeActorChannels(this->SelfId(), TxId, Task, !RuntimeSettings.FailOnUndelivery,
@@ -176,7 +169,7 @@ protected:
, FunctionRegistry(functionRegistry)
, CheckpointingMode(GetTaskCheckpointingMode(Task))
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
- , WatermarksTracker(this->SelfId(), TxId, Task.GetId())
+ , WatermarksTracker(LogPrefix)
, TaskCounters(taskCounters)
, MetricsReporter(taskCounters)
, ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
@@ -233,6 +226,26 @@ protected:
}
}
+ void InitializeLogPrefix() {
+ TStringBuilder prefixBuilder;
+ prefixBuilder << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". ";
+ if (RequestContext) {
+ prefixBuilder << "Ctx: " << *RequestContext << ". ";
+ }
+ LogPrefix = std::move(prefixBuilder);
+
+ WatermarksTracker.SetLogPrefix(LogPrefix);
+ for (auto& [_, info]: InputTransformsMap) {
+ info.SetLogPrefix(LogPrefix);
+ }
+ for (auto& [_, info]: SourcesMap) {
+ info.SetLogPrefix(LogPrefix);
+ }
+ for (auto& [_, info]: InputChannelsMap) {
+ info.SetLogPrefix(LogPrefix);
+ }
+ }
+
void ReportEventElapsedTime() {
if (RuntimeSettings.CollectBasic()) {
ui64 elapsedMicros = NActors::TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000ull;
@@ -788,7 +801,7 @@ protected:
protected:
struct TInputChannelInfo {
- const TString LogPrefix;
+ TString LogPrefix;
ui64 ChannelId;
ui32 SrcStageId;
IDqInputChannel::TPtr Channel;
@@ -848,6 +861,10 @@ protected:
Channel->Resume();
}
}
+
+ void SetLogPrefix(const TString& logPrefix) {
+ LogPrefix = logPrefix;
+ }
};
//Design note:
@@ -1609,6 +1626,7 @@ protected:
RequestContext = MakeIntrusive<NYql::NDq::TRequestContext>(Task.GetRequestContext());
InitializeWatermarks();
+ InitializeLogPrefix(); // note: SelfId is not initialized here
}
private:
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp
index 64b03da726..98c5c9d8ab 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp
@@ -7,28 +7,24 @@
#include <algorithm>
#define LOG_T(s) \
- LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s)
+ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << "Watermarks. " << s)
#define LOG_D(s) \
- LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s)
+ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << "Watermarks. " << s)
#define LOG_I(s) \
- LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s)
+ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << "Watermarks. " << s)
#define LOG_W(s) \
- LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s)
+ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << "Watermarks. " << s)
#define LOG_E(s) \
- LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s)
+ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << "Watermarks. " << s)
namespace NYql::NDq {
using namespace NActors;
TDqComputeActorWatermarks::TDqComputeActorWatermarks(
- NActors::TActorIdentity selfId,
- const TTxId txId,
- ui64 taskId
+ const TString& logPrefix
)
- : SelfId(selfId)
- , TxId(txId)
- , TaskId(taskId) {
+ : LogPrefix(logPrefix) {
}
void TDqComputeActorWatermarks::RegisterAsyncInput(ui64 inputId) {
@@ -166,4 +162,8 @@ void TDqComputeActorWatermarks::PopPendingWatermark() {
PendingWatermark = Nothing();
}
+void TDqComputeActorWatermarks::SetLogPrefix(const TString& logPrefix) {
+ LogPrefix = logPrefix;
+}
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h
index 259be9565e..1ac5d5326c 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h
@@ -9,7 +9,7 @@ namespace NYql::NDq {
class TDqComputeActorWatermarks
{
public:
- TDqComputeActorWatermarks(NActors::TActorIdentity selfId, const TTxId graphId, ui64 taskId);
+ TDqComputeActorWatermarks(const TString& logPrefix);
void RegisterAsyncInput(ui64 inputId);
void RegisterInputChannel(ui64 inputId);
@@ -32,14 +32,14 @@ public:
TMaybe<TInstant> GetPendingWatermark() const;
void PopPendingWatermark();
+ void SetLogPrefix(const TString& logPrefix);
+
private:
void RecalcPendingWatermark();
bool MaybePopPendingWatermark();
private:
- const NActors::TActorIdentity SelfId;
- const TTxId TxId;
- ui64 TaskId;
+ TString LogPrefix;
std::unordered_map<ui64, TMaybe<TInstant>> AsyncInputsWatermarks;
std::unordered_map<ui64, TMaybe<TInstant>> InputChannelsWatermarks;
diff --git a/ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp b/ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp
index 74a6bc3421..99535398fc 100644
--- a/ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp
+++ b/ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp
@@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(TComputeActorAsyncInputHelperTest) {
TDummyAsyncInputHelper helper("MyPrefix", 13, NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED);
helper.AsyncInput = &input;
TDqComputeActorMetrics metrics{NMonitoring::TDynamicCounterPtr{}};
- TDqComputeActorWatermarks watermarks(NActors::TActorIdentity{NActors::TActorId{}}, TTxId{}, 7);
+ TDqComputeActorWatermarks watermarks("");
auto result = helper.PollAsyncInput(metrics, watermarks, 20);
UNIT_ASSERT(result && EResumeSource::CAPollAsync == *result);
}