diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2025-03-13 12:17:47 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-13 12:17:47 +0300 |
commit | 116e2176523a5da37f291ec1cf14305fc298d3b4 (patch) | |
tree | 6a627932285a61c878aaae029b890c66a7f79e4d | |
parent | 36d187872ed8514b246995eb999cccc4948d2566 (diff) | |
download | ydb-116e2176523a5da37f291ec1cf14305fc298d3b4.tar.gz |
Edit pq write actor logs (#15677)
-rw-r--r-- | ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp | 23 |
1 files changed, 19 insertions, 4 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index 5529ac46a4..f3d7ddd294 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -88,6 +88,10 @@ struct TEvPrivate { struct TEvPqEventsReady : public TEventLocal<TEvPqEventsReady, EvPqEventsReady> {}; }; +TString MakeStringForLog(const NDqProto::TCheckpoint& checkpoint) { + return TStringBuilder() << "[Checkpoint " << checkpoint.GetGeneration() << "." << checkpoint.GetId() << "] "; +} + } // namespace class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqComputeActorAsyncOutput { @@ -155,6 +159,7 @@ public: , LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", TaskId: " << taskId << ", PQ sink. ") , FreeSpace(freeSpace) , PqGateway(pqGateway) + , TaskId(taskId) { EgressStats.Level = statsLevel; } @@ -215,11 +220,11 @@ public: if (checkpoint) { if (Buffer.empty() && WaitingAcks.empty()) { - SINK_LOG_D("Send checkpoint state immediately"); + SINK_LOG_D(MakeStringForLog(*checkpoint) << "Send checkpoint state immediately"); Callbacks->OnAsyncOutputStateSaved(BuildState(*checkpoint), OutputIndex, *checkpoint); } else { ui64 seqNo = NextSeqNo + Buffer.size() - 1; - SINK_LOG_D("Defer sending the checkpoint, seqNo: " << seqNo); + SINK_LOG_D(MakeStringForLog(*checkpoint) << "Defer sending the checkpoint, seqNo: " << seqNo); Metrics.InFlyCheckpoints->Inc(); DeferredCheckpoints.emplace(seqNo, *checkpoint); } @@ -275,10 +280,18 @@ private: ) void Handle(TEvPrivate::TEvPqEventsReady::TPtr&) { + if (!Inited) { + Init(); + Inited = true; + } while (HandleNewPQEvents()) { } SubscribeOnNextEvent(); } + void Init() { + LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", TaskId: " << TaskId << ", PQ sink. "; + } + // IActor & IDqComputeActorAsyncOutput void PassAway() override { // Is called from Compute Actor if (WriteSession) { @@ -433,7 +446,7 @@ private: if (!Self.DeferredCheckpoints.empty() && std::get<0>(Self.DeferredCheckpoints.front()) == it->SeqNo) { Self.ConfirmedSeqNo = it->SeqNo; const auto& checkpoint = std::get<1>(Self.DeferredCheckpoints.front()); - LOG_D(Self.LogPrefix << "Send a deferred checkpoint, seqNo: " << it->SeqNo); + LOG_D(Self.LogPrefix << MakeStringForLog(checkpoint) << "Send a deferred checkpoint, seqNo: " << it->SeqNo); Self.Callbacks->OnAsyncOutputStateSaved(Self.BuildState(checkpoint), Self.OutputIndex, checkpoint); Self.DeferredCheckpoints.pop(); Self.Metrics.InFlyCheckpoints->Dec(); @@ -479,7 +492,7 @@ private: NYdb::TDriver Driver; std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory; IDqComputeActorAsyncOutput::ICallbacks* const Callbacks; - const TString LogPrefix; + TString LogPrefix; i64 FreeSpace = 0; bool Finished = false; @@ -495,6 +508,8 @@ private: std::queue<TAckInfo> WaitingAcks; // Size of items which are waiting for acks (used to update free space) std::queue<std::tuple<ui64, NDqProto::TCheckpoint>> DeferredCheckpoints; IPqGateway::TPtr PqGateway; + ui64 TaskId; + bool Inited = false; }; std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor( |