aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2025-03-13 12:17:47 +0300
committerGitHub <noreply@github.com>2025-03-13 12:17:47 +0300
commit116e2176523a5da37f291ec1cf14305fc298d3b4 (patch)
tree6a627932285a61c878aaae029b890c66a7f79e4d
parent36d187872ed8514b246995eb999cccc4948d2566 (diff)
downloadydb-116e2176523a5da37f291ec1cf14305fc298d3b4.tar.gz
Edit pq write actor logs (#15677)
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp23
1 files changed, 19 insertions, 4 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
index 5529ac46a4..f3d7ddd294 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
@@ -88,6 +88,10 @@ struct TEvPrivate {
struct TEvPqEventsReady : public TEventLocal<TEvPqEventsReady, EvPqEventsReady> {};
};
+TString MakeStringForLog(const NDqProto::TCheckpoint& checkpoint) {
+ return TStringBuilder() << "[Checkpoint " << checkpoint.GetGeneration() << "." << checkpoint.GetId() << "] ";
+}
+
} // namespace
class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqComputeActorAsyncOutput {
@@ -155,6 +159,7 @@ public:
, LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", TaskId: " << taskId << ", PQ sink. ")
, FreeSpace(freeSpace)
, PqGateway(pqGateway)
+ , TaskId(taskId)
{
EgressStats.Level = statsLevel;
}
@@ -215,11 +220,11 @@ public:
if (checkpoint) {
if (Buffer.empty() && WaitingAcks.empty()) {
- SINK_LOG_D("Send checkpoint state immediately");
+ SINK_LOG_D(MakeStringForLog(*checkpoint) << "Send checkpoint state immediately");
Callbacks->OnAsyncOutputStateSaved(BuildState(*checkpoint), OutputIndex, *checkpoint);
} else {
ui64 seqNo = NextSeqNo + Buffer.size() - 1;
- SINK_LOG_D("Defer sending the checkpoint, seqNo: " << seqNo);
+ SINK_LOG_D(MakeStringForLog(*checkpoint) << "Defer sending the checkpoint, seqNo: " << seqNo);
Metrics.InFlyCheckpoints->Inc();
DeferredCheckpoints.emplace(seqNo, *checkpoint);
}
@@ -275,10 +280,18 @@ private:
)
void Handle(TEvPrivate::TEvPqEventsReady::TPtr&) {
+ if (!Inited) {
+ Init();
+ Inited = true;
+ }
while (HandleNewPQEvents()) { }
SubscribeOnNextEvent();
}
+ void Init() {
+ LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", TaskId: " << TaskId << ", PQ sink. ";
+ }
+
// IActor & IDqComputeActorAsyncOutput
void PassAway() override { // Is called from Compute Actor
if (WriteSession) {
@@ -433,7 +446,7 @@ private:
if (!Self.DeferredCheckpoints.empty() && std::get<0>(Self.DeferredCheckpoints.front()) == it->SeqNo) {
Self.ConfirmedSeqNo = it->SeqNo;
const auto& checkpoint = std::get<1>(Self.DeferredCheckpoints.front());
- LOG_D(Self.LogPrefix << "Send a deferred checkpoint, seqNo: " << it->SeqNo);
+ LOG_D(Self.LogPrefix << MakeStringForLog(checkpoint) << "Send a deferred checkpoint, seqNo: " << it->SeqNo);
Self.Callbacks->OnAsyncOutputStateSaved(Self.BuildState(checkpoint), Self.OutputIndex, checkpoint);
Self.DeferredCheckpoints.pop();
Self.Metrics.InFlyCheckpoints->Dec();
@@ -479,7 +492,7 @@ private:
NYdb::TDriver Driver;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
IDqComputeActorAsyncOutput::ICallbacks* const Callbacks;
- const TString LogPrefix;
+ TString LogPrefix;
i64 FreeSpace = 0;
bool Finished = false;
@@ -495,6 +508,8 @@ private:
std::queue<TAckInfo> WaitingAcks; // Size of items which are waiting for acks (used to update free space)
std::queue<std::tuple<ui64, NDqProto::TCheckpoint>> DeferredCheckpoints;
IPqGateway::TPtr PqGateway;
+ ui64 TaskId;
+ bool Inited = false;
};
std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(