diff options
author | udovichenko-r <udovichenko-r@yandex-team.ru> | 2022-05-26 01:18:20 +0300 |
---|---|---|
committer | udovichenko-r <udovichenko-r@yandex-team.ru> | 2022-05-26 01:18:20 +0300 |
commit | 363959d1c08721282997f4ab374a416fb1691ead (patch) | |
tree | 4cd5f411dd3cd9e20778e790fb5b8c0ca3422988 | |
parent | 13b2958e4986b143d751eba7671fc41ff4e49d31 (diff) | |
download | ydb-363959d1c08721282997f4ab374a416fb1691ead.tar.gz |
[yql] Cleanup dq logging
YQL-12393
ref:dc9772be7c1b3f3f6045fa5d6562a975f813856f
26 files changed, 238 insertions, 239 deletions
diff --git a/ydb/library/yql/providers/dq/actors/actor_helpers.h b/ydb/library/yql/providers/dq/actors/actor_helpers.h index 65be3a25d63..cf66de3ea40 100644 --- a/ydb/library/yql/providers/dq/actors/actor_helpers.h +++ b/ydb/library/yql/providers/dq/actors/actor_helpers.h @@ -144,7 +144,7 @@ private: } void OnSync(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) { - YQL_LOG(DEBUG) << "OnSync(): delayed messages " << DelayedEvents_.size(); + YQL_CLOG(DEBUG, ProviderDq) << "OnSync(): delayed messages " << DelayedEvents_.size(); SyncState_ = E_SYNC_RECEIVED; TBase::Become(InterruptedHandler_); SyncCallback_(ev); diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 7293afdbacd..aff34066bdc 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -81,8 +81,8 @@ private: HFunc(TEvQueryResponse, OnQueryResponse); // execution timeout cFunc(TEvents::TEvBootstrap::EventType, [this]() { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "Execution timeout"; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "Execution timeout"; auto issue = TIssue("Execution timeout"); issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); Issues.AddIssues({issue}); @@ -99,7 +99,7 @@ private: TStringInput inputStream1(Settings->WorkerFilter.Get().GetOrElse("")); ParseFromTextFormat(inputStream1, pragmaFilter); } catch (...) { - YQL_LOG(INFO) << "Cannot parse filter pragma " << CurrentExceptionMessage(); + YQL_CLOG(INFO, ProviderDq) << "Cannot parse filter pragma " << CurrentExceptionMessage(); } } return pragmaFilter; @@ -128,10 +128,10 @@ private: } void OnGraph(TEvGraphRequest::TPtr& ev, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); Y_VERIFY(!ControlId); Y_VERIFY(!ResultId); - YQL_LOG(DEBUG) << "TDqExecuter::OnGraph"; + YQL_CLOG(DEBUG, ProviderDq) << "TDqExecuter::OnGraph"; ControlId = NActors::ActorIdFromProto(ev->Get()->Record.GetControlId()); ResultId = NActors::ActorIdFromProto(ev->Get()->Record.GetResultId()); CheckPointCoordinatorId = NActors::ActorIdFromProto(ev->Get()->Record.GetCheckPointCoordinatorId()); @@ -141,7 +141,7 @@ private: AddChild(CheckPointCoordinatorId); int workerCount = ev->Get()->Record.GetRequest().GetTask().size(); - YQL_LOG(INFO) << (TStringBuilder() << "Trying to allocate " << workerCount << " workers"); + YQL_CLOG(INFO, ProviderDq) << (TStringBuilder() << "Trying to allocate " << workerCount << " workers"); THashMap<TString, Yql::DqsProto::TFile> files; TVector<NDqProto::TDqTask> tasks; @@ -238,9 +238,9 @@ private: void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode, bool retriable, bool needFallback = false) { - YQL_LOG(DEBUG) << __FUNCTION__ << ", retriable=" << retriable << ", needFallback=" << needFallback; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << ", retriable=" << retriable << ", needFallback=" << needFallback; if (Finished) { - YQL_LOG(WARN) << "Re-Finish IGNORED with Retriable=" << retriable << ", NeedFallback=" << needFallback; + YQL_CLOG(WARN, ProviderDq) << "Re-Finish IGNORED with Retriable=" << retriable << ", NeedFallback=" << needFallback; } else { FlushCounter("ExecutionTime"); TQueryResponse result; @@ -256,8 +256,8 @@ private: void OnFailure(TEvDqFailure::TPtr& ev, const NActors::TActorContext&) { if (!Finished) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; AddCounters(ev->Get()->Record); bool retriable = ev->Get()->Record.GetDeprecatedRetriable(); bool fallback = ev->Get()->Record.GetDeprecatedNeedFallback(); @@ -271,14 +271,14 @@ private: } void OnGraphFinished(TEvGraphFinished::TPtr&, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; if (!Finished) { try { TFailureInjector::Reach("dq_fail_on_finish", [] { throw yexception() << "dq_fail_on_finish"; }); Finish(NYql::NDqProto::StatusIds::SUCCESS, false); } catch (...) { - YQL_LOG(ERROR) << " FailureInjector " << CurrentExceptionMessage(); + YQL_CLOG(ERROR, ProviderDq) << " FailureInjector " << CurrentExceptionMessage(); Issues.AddIssue(TIssue("FailureInjection")); Finish(NYql::NDqProto::StatusIds::UNAVAILABLE, true); } @@ -288,8 +288,8 @@ private: // TBD: wait for PoisonTaken from CheckPointCoordinator before send TEvQueryResponse to PrinterId void OnQueryResponse(TEvQueryResponse::TPtr& ev, const TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; Send(PrinterId, ev->Release().Release()); PassAway(); } @@ -299,8 +299,8 @@ private: } void OnAllocateWorkersResponse(TEvAllocateWorkersResponse::TPtr& ev, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "TDqExecuter::TEvAllocateWorkersResponse"; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "TDqExecuter::TEvAllocateWorkersResponse"; AddCounters(ev->Get()->Record); FlushCounter("AllocateWorkers"); @@ -310,7 +310,7 @@ private: case TAllocateWorkersResponse::kWorkers: break; case TAllocateWorkersResponse::kError: { - YQL_LOG(ERROR) << "Error on allocate workers " + YQL_CLOG(ERROR, ProviderDq) << "Error on allocate workers " << ev->Get()->Record.GetError().GetMessage() << ":" << static_cast<int>(ev->Get()->Record.GetError().GetErrorCode()); Issues.AddIssue(TIssue(ev->Get()->Record.GetError().GetMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR)); @@ -324,7 +324,7 @@ private: auto& workerGroup = response.GetWorkers(); ResourceId = workerGroup.GetResourceId(); - YQL_LOG(DEBUG) << "Allocated resource " << ResourceId; + YQL_CLOG(DEBUG, ProviderDq) << "Allocated resource " << ResourceId; TVector<NActors::TActorId> workers; for (const auto& actorIdProto : workerGroup.GetWorkerActor()) { workers.emplace_back(NActors::ActorIdFromProto(actorIdProto)); @@ -341,10 +341,10 @@ private: WorkerInfo[workerInfo.GetNodeId()] = std::make_tuple(workerInfo, taskMeta.GetStageId()); - YQL_LOG(DEBUG) << "WorkerInfo: " << NDqs::NExecutionHelpers::PrettyPrintWorkerInfo(workerInfo, taskMeta.GetStageId()); - YQL_LOG(DEBUG) << "TaskInfo: " << i << "/" << tasks[i].GetId(); + YQL_CLOG(DEBUG, ProviderDq) << "WorkerInfo: " << NDqs::NExecutionHelpers::PrettyPrintWorkerInfo(workerInfo, taskMeta.GetStageId()); + YQL_CLOG(DEBUG, ProviderDq) << "TaskInfo: " << i << "/" << tasks[i].GetId(); for (const auto& file: taskMeta.GetFiles()) { - YQL_LOG(DEBUG) << " ObjectId: " << file.GetObjectId(); + YQL_CLOG(DEBUG, ProviderDq) << " ObjectId: " << file.GetObjectId(); } i++; @@ -353,7 +353,7 @@ private: AddCounter("UniqueWorkers", uniqueWorkers.size()); } - YQL_LOG(INFO) << workers.size() << " workers allocated"; + YQL_CLOG(INFO, ProviderDq) << workers.size() << " workers allocated"; YQL_ENSURE(workers.size() == tasks.size()); diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp index f318eefa4be..06fc24e650a 100644 --- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp +++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp @@ -46,12 +46,12 @@ private: }) void PassAway() override { - YQL_LOG_CTX_SCOPE(TraceID); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceID); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; try { FullResultWriter->Abort(); } catch (...) { - YQL_LOG(WARN) << "FullResultWriter->Abort(): " << CurrentExceptionMessage(); + YQL_CLOG(WARN, ProviderDq) << "FullResultWriter->Abort(): " << CurrentExceptionMessage(); } ResultBuilder.Reset(); FullResultWriter.Reset(); @@ -62,7 +62,7 @@ private: } void OnStatusRequest(TEvFullResultWriterStatusRequest::TPtr&, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceID); + YQL_LOG_CTX_ROOT_SCOPE(TraceID); NDqProto::TFullResultWriterStatusResponse response; response.SetBytesReceived(BytesReceived); if (ErrorMessage) { @@ -72,7 +72,7 @@ private: } void OnWriteRequest(TEvFullResultWriterWriteRequest::TPtr& ev, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceID); + YQL_LOG_CTX_ROOT_SCOPE(TraceID); auto& request = ev->Get()->Record; if (request.GetFinish()) { Finish(); @@ -82,7 +82,7 @@ private: } void Finish() { - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; try { TFailureInjector::Reach("full_result_fail_on_finish", [] { throw yexception() << "full_result_fail_on_finish"; }); FullResultWriter->Finish(); @@ -103,7 +103,7 @@ private: } void Continue(NDqProto::TFullResultWriterWriteRequest& request) { - YQL_LOG(DEBUG) << "Continue -- RowCount = " << FullResultWriter->GetRowCount(); + YQL_CLOG(DEBUG, ProviderDq) << "Continue -- RowCount = " << FullResultWriter->GetRowCount(); ui64 reqSize = request.ByteSizeLong(); WriteToFullResultTable(request); BytesReceived += reqSize; @@ -111,7 +111,7 @@ private: void WriteToFullResultTable(NDqProto::TFullResultWriterWriteRequest& request) { if (ErrorMessage) { - YQL_LOG(DEBUG) << "Failed to write previous chunk, aborting"; + YQL_CLOG(DEBUG, ProviderDq) << "Failed to write previous chunk, aborting"; return; } @@ -130,7 +130,7 @@ private: } if (ErrorMessage) { - YQL_LOG(DEBUG) << "An error occurred: " << *ErrorMessage; + YQL_CLOG(DEBUG, ProviderDq) << "An error occurred: " << *ErrorMessage; } } diff --git a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp index 3ff4fb16588..4ab306e9bf0 100644 --- a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp @@ -30,12 +30,12 @@ private: }) void DoPassAway() override { - YQL_LOG_CTX_SCOPE(TraceID); + YQL_LOG_CTX_ROOT_SCOPE(TraceID); } void OnEvent(NDqs::TEvGraphExecutionEvent::TPtr& ev, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceID); - YQL_LOG(DEBUG) << __FUNCTION__ << ' ' << NYql::NDqProto::EGraphExecutionEventType_Name(ev->Get()->Record.GetEventType()); + YQL_LOG_CTX_ROOT_SCOPE(TraceID); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << ' ' << NYql::NDqProto::EGraphExecutionEventType_Name(ev->Get()->Record.GetEventType()); try { switch (ev->Get()->Record.GetEventType()) { @@ -63,7 +63,7 @@ private: } } catch (...) { TString message = TStringBuilder() << "Error on TEvGraphExecutionEvent: " << CurrentExceptionMessage(); - YQL_LOG(ERROR) << message; + YQL_CLOG(ERROR, ProviderDq) << message; Reply(ev->Sender, message); } } @@ -74,7 +74,7 @@ private: } void OnStart(NActors::TActorId replyTo, const NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor& payload) { - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; const auto& secureParams = AsHashMap(payload.GetSecureParams().GetData()); const auto& graphParams = AsHashMap(payload.GetGraphParams().GetData()); @@ -109,7 +109,7 @@ private: } void OnFail(NActors::TActorId replyTo) { - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; for (const auto& preprocessor: TaskPreprocessors) { preprocessor->Finish(false); } @@ -117,7 +117,7 @@ private: } void OnSuccess(NActors::TActorId replyTo) { - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; for (const auto& preprocessor: TaskPreprocessors) { preprocessor->Finish(true); } @@ -125,7 +125,7 @@ private: } void OnFullResult(NActors::TActorId replyTo, const NDqProto::TGraphExecutionEvent::TFullResultDescriptor& payload) { - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; THolder<IDqFullResultWriter> writer; for (const auto& preprocessor: TaskPreprocessors) { writer = preprocessor->CreateFullResultWriter(); diff --git a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp index 4b968b9f240..c8c786f6a3d 100644 --- a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp +++ b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp @@ -101,7 +101,7 @@ private: { Y_UNUSED(ctx); - YQL_LOG(DEBUG) << "RequestActorIdsFromNodes " << ev->Sender.NodeId(); + YQL_CLOG(DEBUG, ProviderDq) << "RequestActorIdsFromNodes " << ev->Sender.NodeId(); auto& response = ev->Get()->Record; auto& nodes = response.GetNodes().GetWorker(); @@ -110,11 +110,11 @@ private: Y_VERIFY(static_cast<ui32>(nodes.size()) == RequestedCount); RequestedNodes.reserve(RequestedCount); - YQL_LOG(DEBUG) << "RequestActorIdsFromNodes " << ev->Sender.NodeId() << " " << ResourceId; + YQL_CLOG(DEBUG, ProviderDq) << "RequestActorIdsFromNodes " << ev->Sender.NodeId() << " " << ResourceId; auto now = TInstant::Now(); ui16 i = 1; for (const auto& node : nodes) { - YQL_LOG(DEBUG) << "RequestedNode: " << node.GetNodeId(); + YQL_CLOG(DEBUG, ProviderDq) << "RequestedNode: " << node.GetNodeId(); TString clusterName = node.GetClusterName(); NYql::NDqProto::TDqTask task; if (!Tasks.empty()) { @@ -133,10 +133,10 @@ private: void OnAllocateWorkersResponse(TEvAllocateWorkersResponse::TPtr& ev, const TActorContext& ctx) { - YQL_LOG_CTX_SCOPE(TraceId + SelfId().ToString()); + YQL_LOG_CTX_ROOT_SCOPE(TraceId, SelfId().ToString()); Y_UNUSED(ctx); - YQL_LOG(DEBUG) << "TEvAllocateWorkersResponse " << ev->Sender.NodeId(); + YQL_CLOG(DEBUG, ProviderDq) << "TEvAllocateWorkersResponse " << ev->Sender.NodeId(); QueryStat.AddCounters(ev->Get()->Record); @@ -145,7 +145,7 @@ private: } if (ev->Get()->Record.GetTResponseCase() == NDqProto::TAllocateWorkersResponse::kError) { - YQL_LOG(DEBUG) << "Forwarding bad state"; + YQL_CLOG(DEBUG, ProviderDq) << "Forwarding bad state"; ctx.Send(ev->Forward(SenderId)); FailState = true; return; @@ -215,18 +215,18 @@ private: void DoPassAway() override { - YQL_LOG_CTX_SCOPE(TraceId + SelfId().ToString()); + YQL_LOG_CTX_ROOT_SCOPE(TraceId, SelfId().ToString()); for (const auto& group : AllocatedWorkers) { for (const auto& actorIdProto : group.GetWorkerActor()) { auto actorNode = NActors::ActorIdFromProto(actorIdProto).NodeId(); - YQL_LOG(DEBUG) << "TEvFreeWorkersNotify " << group.GetResourceId(); + YQL_CLOG(DEBUG, ProviderDq) << "TEvFreeWorkersNotify " << group.GetResourceId(); auto request = MakeHolder<TEvFreeWorkersNotify>(group.GetResourceId()); request->Record.SetTraceId(TraceId); Send(MakeWorkerManagerActorID(actorNode), request.Release()); } } if (!LocalMode) { - YQL_LOG(DEBUG) << "TEvFreeWorkersNotify " << ResourceId << " (failed)"; + YQL_CLOG(DEBUG, ProviderDq) << "TEvFreeWorkersNotify " << ResourceId << " (failed)"; auto request = MakeHolder<TEvFreeWorkersNotify>(ResourceId); request->Record.SetTraceId(TraceId); for (const auto& failedWorker : FailedWorkers) { @@ -249,7 +249,7 @@ private: ActorIdToProto(ControlId, request->Record.MutableResultActorId()); *request->Record.AddTask() = node.Task; } - YQL_LOG(WARN) << "Send TEvAllocateWorkersRequest to " << NDqs::NExecutionHelpers::PrettyPrintWorkerInfo(node.WorkerInfo, 0); + YQL_CLOG(WARN, ProviderDq) << "Send TEvAllocateWorkersRequest to " << NDqs::NExecutionHelpers::PrettyPrintWorkerInfo(node.WorkerInfo, 0); if (backoff) { TActivationContext::Schedule(backoff, new IEventHandle( MakeWorkerManagerActorID(nodeId), SelfId(), request.Release(), @@ -265,13 +265,13 @@ private: } void Fail(const ui64 cookie, const TString& reason) { - YQL_LOG_CTX_SCOPE(TraceId + SelfId().ToString()); + YQL_LOG_CTX_ROOT_SCOPE(TraceId, SelfId().ToString()); if (FailState) { return; } auto maybeRequestInfo = RequestedNodes.find(cookie); if (!Answered && maybeRequestInfo != RequestedNodes.end() && maybeRequestInfo->second.Retry < NetworkRetries) { - YQL_LOG(WARN) << "Retry Allocate Request"; + YQL_CLOG(WARN, ProviderDq) << "Retry Allocate Request"; auto& requestInfo = RequestedNodes[cookie]; requestInfo.Retry ++; *RetryCounter += 1; @@ -293,7 +293,7 @@ private: delta); } TString message = "Disconnected from worker: `" + workerInfo + "', reason: " + reason; - YQL_LOG(ERROR) << message; + YQL_CLOG(ERROR, ProviderDq) << message; auto response = MakeHolder<TEvAllocateWorkersResponse>(message); QueryStat.FlushCounters(response->Record); Send(SenderId, response.Release()); diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h index fe2bc1831cf..192dc080da2 100644 --- a/ydb/library/yql/providers/dq/actors/result_actor_base.h +++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h @@ -56,8 +56,8 @@ namespace NYql::NDqs::NExecutionHelpers { , QueryResponse() , WaitingAckFromFRW(false) { ResultYsonWriter->OnBeginList(); - YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << SizeLimit; - YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << (RowsLimit.Defined() ? ToString(RowsLimit.GetRef()) : "nothing"); + YQL_CLOG(DEBUG, ProviderDq) << "_AllResultsBytesLimit = " << SizeLimit; + YQL_CLOG(DEBUG, ProviderDq) << "_RowsLimitPerWrite = " << (RowsLimit.Defined() ? ToString(RowsLimit.GetRef()) : "nothing"); } virtual void FinishFullResultWriter() { @@ -65,7 +65,7 @@ namespace NYql::NDqs::NExecutionHelpers { } void OnReceiveData(NYql::NDqProto::TData&& data, const TString& messageId = "", bool autoAck = false) { - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); if (data.GetRows() > 0 && !ResultBuilder) { Issues.AddIssue(TIssue("Non empty rows: >=" + ToString(data.GetRows())).SetCode(0, TSeverityIds::S_WARNING)); @@ -126,7 +126,7 @@ namespace NYql::NDqs::NExecutionHelpers { } void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) { - YQL_LOG(ERROR) << "OnError " << message; + YQL_CLOG(ERROR, ProviderDq) << "OnError " << message; auto issueCode = NCommon::NeedFallback(statusCode) ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR; @@ -138,7 +138,7 @@ namespace NYql::NDqs::NExecutionHelpers { } void Finish() { - YQL_LOG(DEBUG) << __FUNCTION__ << ", truncated=" << Truncated; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << ", truncated=" << Truncated; YQL_ENSURE(!FinishCalled); FinishCalled = true; @@ -162,8 +162,8 @@ namespace NYql::NDqs::NExecutionHelpers { cFunc(NActors::TEvents::TEvGone::EventType, OnFullResultWriterShutdown); cFunc(NActors::TEvents::TEvPoison::EventType, TBase::PassAway) default: - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "Unexpected event " << etype; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "Unexpected event " << etype; break; } } @@ -176,17 +176,17 @@ namespace NYql::NDqs::NExecutionHelpers { HFunc(TEvDqFailure, OnErrorInShutdownState); HFunc(TEvFullResultWriterAck, OnFullResultWriterAck); default: - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "Unexpected event " << etype; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "Unexpected event " << etype; break; } } private: void OnQueryResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); - YQL_LOG(DEBUG) << "Shutting down TResultAggregator"; + YQL_CLOG(DEBUG, ProviderDq) << "Shutting down TResultAggregator"; BlockingActors.clear(); if (FullResultWriterID) { @@ -194,7 +194,7 @@ namespace NYql::NDqs::NExecutionHelpers { FinishFullResultWriter(); } - YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors"; + YQL_CLOG(DEBUG, ProviderDq) << "Waiting for " << BlockingActors.size() << " blocking actors"; QueryResponse.Reset(ev->Release().Release()); TBase::Become(&TDerived::ShutdownHandler); @@ -202,15 +202,15 @@ namespace NYql::NDqs::NExecutionHelpers { } void OnFullResultWriterShutdown() { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "Got TEvGone"; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "Got TEvGone"; FullResultWriterID = {}; } void OnFullResultWriterResponse(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; if (ev->Get()->Record.IssuesSize() == 0) { DoFinish(); } else { @@ -219,7 +219,7 @@ namespace NYql::NDqs::NExecutionHelpers { } void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) { - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(TBase::SelfId()) + " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16) + "." + ToString(ev->Get()->SourceType & 0xFFFF); @@ -227,8 +227,8 @@ namespace NYql::NDqs::NExecutionHelpers { } void OnFullResultWriterAck(TEvFullResultWriterAck::TPtr& ev, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; Y_VERIFY(ev->Get()->Record.GetMessageId() == WriteQueue.front().MessageId); if (!WriteQueue.front().SentProcessedEvent) { // messages, received before limits exceeded, are already been reported TBase::Send(TBase::SelfId(), MakeHolder<TEvMessageProcessed>(WriteQueue.front().MessageId)); @@ -251,13 +251,13 @@ namespace NYql::NDqs::NExecutionHelpers { } void OnShutdownQueryResult(NActors::TEvents::TEvGone::TPtr& ev, const NActors::TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); auto iter = BlockingActors.find(ev->Sender); if (iter != BlockingActors.end()) { BlockingActors.erase(iter); } - YQL_LOG(DEBUG) << "Shutting down TResultAggregator, " << BlockingActors.size() << " blocking actors left"; + YQL_CLOG(DEBUG, ProviderDq) << "Shutting down TResultAggregator, " << BlockingActors.size() << " blocking actors left"; if (BlockingActors.empty()) { EndOnQueryResult(); @@ -269,7 +269,7 @@ namespace NYql::NDqs::NExecutionHelpers { } void FlushCurrent() { - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; YQL_ENSURE(!FullResultWriterID); YQL_ENSURE(FullResultTableEnabled); @@ -281,7 +281,7 @@ namespace NYql::NDqs::NExecutionHelpers { TBase::Send(GraphExecutionEventsId, new TEvGraphExecutionEvent(record)); TBase::template Synchronize<TEvGraphExecutionEvent>([this](TEvGraphExecutionEvent::TPtr& ev) { Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC); - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); if (auto msg = ev->Get()->Record.GetErrorMessage()) { OnError(NYql::NDqProto::StatusIds::UNSUPPORTED, msg); @@ -295,7 +295,7 @@ namespace NYql::NDqs::NExecutionHelpers { } void EndOnQueryResult() { - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; NDqProto::TQueryResponse result = QueryResponse->Record; YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty()); @@ -318,8 +318,8 @@ namespace NYql::NDqs::NExecutionHelpers { } void DoPassAway() override { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; } void TryWriteToFullResultTable() { @@ -331,8 +331,8 @@ namespace NYql::NDqs::NExecutionHelpers { } void UnsafeWriteToFullResultTable() { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; TBase::Send(FullResultWriterID, MakeHolder<TEvFullResultWriterWriteRequest>(std::move(WriteQueue.front().WriteRequest))); } diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp index cca0b79d9a8..ec49e909e58 100644 --- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp +++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp @@ -101,8 +101,8 @@ private: } void OnWakeup() { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; auto now = TInstant::Now(); if (PullRequestTimeout && now - PullRequestStartTime > PullRequestTimeout) { OnError(NYql::NDqProto::StatusIds::TIMEOUT, "Timeout " + ToString(SourceID.NodeId())); @@ -123,7 +123,7 @@ private: } void OnReadyState(TEvReadyState::TPtr& ev, const TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); AddCounters(ev->Get()->Record); SourceID = NActors::ActorIdFromProto(ev->Get()->Record.GetSourceId()); @@ -138,7 +138,7 @@ private: } void OnPullResult(TEvPullResult::TPtr&, const TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); PullRequestStartTime = TInstant::Now(); Send(SourceID, MakeHolder<TEvPullDataRequest>(MAX_RESULT_BATCH), IEventHandle::FlagTrackDelivery); } @@ -148,8 +148,7 @@ private: } void OnPullResponse(TEvPullDataResponse::TPtr& ev, const TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); if (FinishCalled) { // finalization has been begun, actor will not kill himself anymore, should ignore responses instead diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.cpp b/ydb/library/yql/providers/dq/actors/result_receiver.cpp index 7c743e7f606..f14c03fad50 100644 --- a/ydb/library/yql/providers/dq/actors/result_receiver.cpp +++ b/ydb/library/yql/providers/dq/actors/result_receiver.cpp @@ -65,8 +65,8 @@ public: private: void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev, const TActorContext&) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; bool finishRequested = ev->Get()->Record.GetChannelData().GetFinished(); if (!FinishCalled) { @@ -77,7 +77,7 @@ private: Y_ENSURE(inserted); } - YQL_LOG(DEBUG) << "Finished: " << finishRequested; + YQL_CLOG(DEBUG, ProviderDq) << "Finished: " << finishRequested; } void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) { @@ -85,8 +85,8 @@ private: } void OnMessageProcessed(TEvMessageProcessed::TPtr& ev) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; SendAck(ev->Get()->MessageId); } diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp index 379a5c0ce91..b0bcd9db67d 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.cpp +++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp @@ -65,10 +65,10 @@ public: { if (Settings) { if (Settings->_AllResultsBytesLimit.Get()) { - YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get(); + YQL_CLOG(DEBUG, ProviderDq) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get(); } if (Settings->_RowsLimitPerWrite.Get()) { - YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get(); + YQL_CLOG(DEBUG, ProviderDq) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get(); } } } @@ -103,10 +103,10 @@ private: } void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) { - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); auto statusCode = ev->Get()->Record.GetStatusCode(); TIssues issues = ev->Get()->GetIssues(); - YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << issues.ToOneLineString(); + YQL_CLOG(DEBUG, ProviderDq) << "AbortExecution from " << ev->Sender << ":" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << issues.ToOneLineString(); OnError(statusCode, issues); } @@ -114,8 +114,8 @@ private: TActorId computeActor = ev->Sender; auto& state = ev->Get()->Record; ui64 taskId = state.GetTaskId(); - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << SelfId() << " EvState TaskId: " << taskId << " State: " << state.GetState() @@ -123,7 +123,7 @@ private: << " StatusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(state.GetStatusCode()); if (state.HasStats() && state.GetStats().GetTasks().size()) { - YQL_LOG(DEBUG) << " " << SelfId() << " AddStats " << taskId; + YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " AddStats " << taskId; AddStats(state.GetStats()); if (ServiceCounters.Counters && !AggrPeriod) { ExportStats(TaskStat, taskId); @@ -144,7 +144,7 @@ private: break; } case NDqProto::COMPUTE_STATE_EXECUTING: { - YQL_LOG(DEBUG) << " " << SelfId() << " Executing TaskId: " << taskId; + YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " Executing TaskId: " << taskId; if (!FinishedTasks.contains(taskId)) { // may get late/reordered? message Executing[taskId] = Now(); @@ -152,7 +152,7 @@ private: break; } case NDqProto::COMPUTE_STATE_FINISHED: { - YQL_LOG(DEBUG) << " " << SelfId() << " Finish TaskId: " << taskId; + YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " Finish TaskId: " << taskId; Executing.erase(taskId); FinishedTasks.insert(taskId); break; @@ -171,7 +171,7 @@ private: for (auto& taskActors: Executing) { if (now > taskActors.second + PingPeriod) { PingCookie++; - YQL_LOG(DEBUG) << " Ping TaskId: " << taskActors.first << ", Compute ActorId: " << ActorIds[taskActors.first] << ", PingCookie: " << PingCookie; + YQL_CLOG(DEBUG, ProviderDq) << " Ping TaskId: " << taskActors.first << ", Compute ActorId: " << ActorIds[taskActors.first] << ", PingCookie: " << PingCookie; Send(ActorIds[taskActors.first], new NDq::TEvDqCompute::TEvStateRequest(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered, PingCookie); taskActors.second = now; } @@ -206,7 +206,7 @@ private: } void ExportStats(const TCounters& stat, ui64 taskId) { - YQL_LOG(DEBUG) << " " << SelfId() << " ExportStats " << (taskId ? ToString(taskId) : "Summary"); + YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " ExportStats " << (taskId ? ToString(taskId) : "Summary"); TString name; std::map<TString, TString> labels; static const TString SourceLabel = "Source"; @@ -411,7 +411,7 @@ private: } void OnReadyState(TEvReadyState::TPtr& ev) { - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); TaskStat.AddCounters(ev->Get()->Record); @@ -432,7 +432,7 @@ private: Stages.emplace(task.GetId(), taskMeta.GetStageId()); } - YQL_LOG(DEBUG) << "Ready State: X1=" << SelfId().RawX1() << ", X2=" << SelfId().RawX2(); + YQL_CLOG(DEBUG, ProviderDq) << "Ready State: X1=" << SelfId().RawX1() << ", X2=" << SelfId().RawX2(); MaybeUpdateChannels(); @@ -449,7 +449,7 @@ private: return; } - YQL_LOG(DEBUG) << "Update channels"; + YQL_CLOG(DEBUG, ProviderDq) << "Update channels"; for (const auto& [task, actorId] : Tasks) { auto ev = MakeHolder<NDq::TEvDqCompute::TEvChannelsInfo>(); @@ -465,7 +465,7 @@ private: } } - YQL_LOG(DEBUG) << task.GetId() << " " << ev->Record.ShortDebugString(); + YQL_CLOG(DEBUG, ProviderDq) << task.GetId() << " " << ev->Record.ShortDebugString(); Send(actorId, ev.Release()); } @@ -474,8 +474,8 @@ private: void OnResultFailure(TEvDqFailure::TPtr& ev) { if (Finished) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(WARN) << "TEvDqFailure IGNORED when Finished from " << ev->Sender; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(WARN, ProviderDq) << "TEvDqFailure IGNORED when Finished from " << ev->Sender; } else { FinalStat().FlushCounters(ev->Get()->Record); // histograms will NOT be reported Send(ExecuterId, ev->Release().Release()); @@ -484,11 +484,11 @@ private: } void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode); if (Finished) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(WARN) << "OnError IGNORED when Finished"; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(WARN, ProviderDq) << "OnError IGNORED when Finished"; } else { auto req = MakeHolder<TEvDqFailure>(statusCode, issues); FinalStat().FlushCounters(req->Record); diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index aaf3151d74f..fc44dd201dc 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -87,23 +87,23 @@ public: , RuntimeData(runtimeData) , TraceId(traceId) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "TDqWorker created "; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "TDqWorker created "; } ~TDqWorker() { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "TDqWorker destroyed "; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "TDqWorker destroyed "; } void DoPassAway() override { - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); for (const auto& inputs : InputMap) { Send(inputs.first, new NActors::TEvents::TEvPoison()); } - YQL_LOG(DEBUG) << "TDqWorker passed away "; + YQL_CLOG(DEBUG, ProviderDq) << "TDqWorker passed away "; if (Actor) { Actor->PassAway(); } @@ -200,7 +200,7 @@ private: void SendFailure(THolder<TEvDqFailure> ev) { if (!Executer) { // Posible Error on Undelivered before OnDqTask - YQL_LOG(ERROR) << "Error " << ev->Record.ShortUtf8DebugString(); + YQL_CLOG(ERROR, ProviderDq) << "Error " << ev->Record.ShortUtf8DebugString(); return; } Stat.FlushCounters(ev->Record); @@ -213,8 +213,8 @@ private: void OnDqTask(TEvDqTask::TPtr& ev, const NActors::TActorContext& ctx) { Y_UNUSED(ctx); - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "TDqWorker::OnDqTask"; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "TDqWorker::OnDqTask"; TFailureInjector::Reach("dq_task_failure", [] {::_exit(1); }); @@ -329,8 +329,8 @@ private: void OnPullRequest(TEvPullDataRequest::TPtr& ev, const NActors::TActorContext& ctx) { Y_UNUSED(ctx); - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(TRACE) << "TDqWorker::OnPullRequest " << ev->Sender; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(TRACE, ProviderDq) << "TDqWorker::OnPullRequest " << ev->Sender; if (!TaskRunnerActor || !TaskRunnerPrepared) { // waiting for initialization @@ -382,8 +382,8 @@ private: void OnPullResponse(TEvPullDataResponse::TPtr& ev, const NActors::TActorContext& ctx) { Y_UNUSED(ctx); - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(TRACE) << "TDqWorker::OnPullResponse"; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(TRACE, ProviderDq) << "TDqWorker::OnPullResponse"; Stat.AddCounters(ev->Get()->Record); @@ -531,7 +531,7 @@ private: auto hasFreeSpace = freeSpace == ev->Get()->InputChannelFreeSpace.end() || freeSpace->second > 0; if (hasFreeSpace) { - YQL_LOG(TRACE) << "Send TEvPullDataRequest to " << + YQL_CLOG(TRACE, ProviderDq) << "Send TEvPullDataRequest to " << channel.ActorID << " from " << SelfId(); Send(channel.ActorID, MakeHolder<TEvPullDataRequest>(INPUT_SIZE), IEventHandle::FlagTrackDelivery); @@ -615,18 +615,18 @@ private: for (const auto& [actorId, channel] : InputMap) { if (!channel.Finished) { if (channel.Requested) { - YQL_LOG(DEBUG) << "Input " << JobDebugInfo(actorId) << (now - channel.RequestTime) << " Requested? " << channel.Requested; + YQL_CLOG(DEBUG, ProviderDq) << "Input " << JobDebugInfo(actorId) << (now - channel.RequestTime) << " Requested? " << channel.Requested; if (RuntimeData) { RuntimeData->UpdateChannelInputDelay(now - channel.RequestTime); } } else { - YQL_LOG(DEBUG) << "Input " << JobDebugInfo(actorId) << (now - channel.ResponseTime) << " Requested? " << channel.Requested; + YQL_CLOG(DEBUG, ProviderDq) << "Input " << JobDebugInfo(actorId) << (now - channel.ResponseTime) << " Requested? " << channel.Requested; if (RuntimeData) { RuntimeData->UpdateChannelInputDelay(now - channel.ResponseTime); } } } else { - YQL_LOG(DEBUG) << "Input " << JobDebugInfo(actorId) << " Finished"; + YQL_CLOG(DEBUG, ProviderDq) << "Input " << JobDebugInfo(actorId) << " Finished"; if (RuntimeData) { RuntimeData->UpdateChannelInputDelay(TDuration::Seconds(0)); } @@ -635,12 +635,12 @@ private: for (const auto& [actorId, channel] : OutputMap) { if (!channel.Finished) { - YQL_LOG(DEBUG) << "Output " << JobDebugInfo(actorId) << (now - channel.RequestTime); + YQL_CLOG(DEBUG, ProviderDq) << "Output " << JobDebugInfo(actorId) << (now - channel.RequestTime); if (RuntimeData) { RuntimeData->UpdateChannelOutputDelay(now - channel.RequestTime); } } else { - YQL_LOG(DEBUG) << "Output " << JobDebugInfo(actorId) << " Finished"; + YQL_CLOG(DEBUG, ProviderDq) << "Output " << JobDebugInfo(actorId) << " Finished"; if (RuntimeData) { RuntimeData->UpdateChannelOutputDelay(TDuration::Seconds(0)); } diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp index b4bc07fbef1..455adef20bc 100644 --- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp +++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp @@ -21,6 +21,7 @@ #define PERFORM_RULE(func, ...) \ do { \ if (auto result = func(__VA_ARGS__); result.Raw() != node.Raw()) { \ + YQL_CLOG(DEBUG, ProviderDq) << #func; \ node = result; \ return node.Ptr(); \ } \ diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index f672c9ded64..03e69a9fef3 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -137,7 +137,7 @@ namespace NYql::NDqs { auto query = expr.Maybe<TDqQuery>(); const auto maxTasksPerOperation = settings->MaxTasksPerOperation.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerOperation); - YQL_LOG(DEBUG) << "Execution Plan " << NCommon::ExprToPrettyString(ExprContext, *DqExprRoot); + YQL_CLOG(DEBUG, ProviderDq) << "Execution Plan " << NCommon::ExprToPrettyString(ExprContext, *DqExprRoot); auto stages = GetStages(DqExprRoot); YQL_ENSURE(!stages.empty()); @@ -149,9 +149,9 @@ namespace NYql::NDqs { for (const auto& stage : stages) { const bool hasDqSource = HasDqSource(stage); if ((hasDqSource || HasReadWraps(stage.Program().Ptr())) && BuildReadStage(settings, stage, hasDqSource, canFallback)) { - YQL_LOG(DEBUG) << "Read stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr()); + YQL_CLOG(TRACE, ProviderDq) << "Read stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr()); } else { - YQL_LOG(DEBUG) << "Common stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr()); + YQL_CLOG(TRACE, ProviderDq) << "Common stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr()); NDq::CommonBuildTasks(TasksGraph, stage); } diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 229b248b03f..de3a6fd3878 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -147,7 +147,7 @@ public: TVector<NDqProto::TData> rows; { auto guard = runner->BindAllocator(State->Settings->MemoryLimit.Get().GetOrElse(0)); - YQL_LOG(DEBUG) << " NDq::ERunStatus " << runner->Run(); + YQL_CLOG(DEBUG, ProviderDq) << " NDq::ERunStatus " << runner->Run(); NDq::ERunStatus status; while ((status = runner->Run()) == NDq::ERunStatus::PendingOutput || status == NDq::ERunStatus::Finished) { @@ -270,7 +270,7 @@ private: std::tuple<TString, TString> GetPathAndObjectId(const TFilePathWithMd5& pathWithMd5) const { if (pathWithMd5.Md5.empty()) { - YQL_LOG(WARN) << "Empty md5 for " << pathWithMd5.Path; + YQL_CLOG(WARN, ProviderDq) << "Empty md5 for " << pathWithMd5.Path; } return GetPathAndObjectId(pathWithMd5.Path, pathWithMd5.Md5.empty() @@ -337,7 +337,7 @@ private: auto block = b.second; auto filePath = block->FrozenFile->GetPath().GetPath(); auto fullFileName = localRun ? filePath : TUserDataStorage::MakeRelativeName(b.first.Alias()); - YQL_LOG(DEBUG) << "Path resolve " << filePath << "|"<< fullFileName; + YQL_CLOG(DEBUG, ProviderDq) << "Path resolve " << filePath << "|"<< fullFileName; // validate switch (block->Type) { case EUserDataType::URL: @@ -421,14 +421,14 @@ private: const TString udfName(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef()); const auto moduleName = ModuleName(udfName); - YQL_LOG(DEBUG) << "Try to resolve " << moduleName; + YQL_CLOG(DEBUG, ProviderDq) << "Try to resolve " << moduleName; TMaybe<TFilePathWithMd5> udfPathWithMd5 = State->TypeCtx->UdfResolver->GetSystemModulePath(moduleName); YQL_ENSURE(udfPathWithMd5.Defined()); TString filePath, objectId; std::tie(filePath, objectId) = GetPathAndObjectId(*udfPathWithMd5); - YQL_LOG(DEBUG) << "File|Md5 " << filePath << "|" << objectId; + YQL_CLOG(DEBUG, ProviderDq) << "File|Md5 " << filePath << "|" << objectId; if (!filePath.StartsWith(NKikimr::NMiniKQL::StaticModulePrefix)) { auto f = IDqGateway::TFileResource(); @@ -465,7 +465,7 @@ private: i64 dataLimit = static_cast<i64>(4_GB); bool fallbackFlag = false; if (sizeSum > dataLimit) { - YQL_LOG(INFO) << "Too much data: " << sizeSum << " > " << dataLimit; + YQL_CLOG(WARN, ProviderDq) << "Too much data: " << sizeSum << " > " << dataLimit; fallbackFlag = true; } @@ -504,7 +504,7 @@ private: StartCounter("FreezeUsedFiles"); if (const auto filesRes = NCommon::FreezeUsedFiles(*input, files, *State->TypeCtx, ctx, [](const TString&){return true;}, crutches); filesRes.first.Level != TStatus::Ok) { if (filesRes.first.Level != TStatus::Error) { - YQL_LOG(DEBUG) << "Freezing files for " << input->Content() << " (UniqueId=" << input->UniqueId() << ")"; + YQL_CLOG(DEBUG, ProviderDq) << "Freezing files for " << input->Content() << " (UniqueId=" << input->UniqueId() << ")"; } return filesRes; } @@ -558,7 +558,7 @@ private: bool fallbackFlag = BuildUploadList(uploadList, localRun, explorer, typeEnv, files); if (fallbackFlag) { - YQL_LOG(DEBUG) << "Fallback: " << NCommon::ExprToPrettyString(ctx, *input); + YQL_CLOG(DEBUG, ProviderDq) << "Fallback: " << NCommon::ExprToPrettyString(ctx, *input); return Fallback(); } else { *lambda = SerializeRuntimeNode(root, typeEnv); @@ -621,7 +621,7 @@ private: } TStatusCallbackPair HandleResult(const TExprNode::TPtr& input, TExprContext& ctx) { - YQL_LOG(DEBUG) << "Executing " << input->Content() << " (UniqueId=" << input->UniqueId() << ")"; + YQL_CLOG(DEBUG, ProviderDq) << "Executing " << input->Content() << " (UniqueId=" << input->UniqueId() << ")"; if (State->ExternalUser) { return Fallback(); @@ -726,7 +726,7 @@ private: FlushStatisticsToState(); return WrapFutureCallback(future, [localRun, startTime, type, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { - YQL_LOG(DEBUG) << state->SessionId << " WrapFutureCallback"; + YQL_CLOG(DEBUG, ProviderDq) << state->SessionId << " WrapFutureCallback"; auto duration = TInstant::Now() - startTime; if (state->Metrics) { @@ -749,7 +749,7 @@ private: return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error); } - YQL_LOG(DEBUG) << "Fallback from gateway: " << NCommon::ExprToPrettyString(ctx, *input); + YQL_CLOG(DEBUG, ProviderDq) << "Fallback from gateway: " << NCommon::ExprToPrettyString(ctx, *input); TIssue warning(ctx.GetPosition(input->Pos()), "DQ cannot execute the query"); warning.Severity = TSeverityIds::S_INFO; ctx.IssueManager.RaiseIssue(warning); @@ -921,7 +921,7 @@ private: auto filesRes = NCommon::FreezeUsedFiles(*optimizedInput, files, *State->TypeCtx, ctx, [](const TString&){return true;}, crutches); if (filesRes.first.Level != TStatus::Ok) { if (filesRes.first.Level != TStatus::Error) { - YQL_LOG(DEBUG) << "Freezing files for " << input->Content() << " (UniqueId=" << input->UniqueId() << ")"; + YQL_CLOG(DEBUG, ProviderDq) << "Freezing files for " << input->Content() << " (UniqueId=" << input->UniqueId() << ")"; } return filesRes; } @@ -1067,8 +1067,6 @@ private: int level = 0; // TODO: remove copy-paste return WrapFutureCallback(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { - YQL_LOG(DEBUG) << state->SessionId << " WrapFutureCallback"; - auto duration = TInstant::Now() - startTime; if (state->Metrics) { state->Metrics->SetCounter("dq", "TotalExecutionTime", duration.MilliSeconds()); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp index 1a0053350a6..ee2aec976ab 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp @@ -61,7 +61,7 @@ public: try { return promise.GetFuture().GetValueSync(); } catch (...) { - YQL_LOG(INFO) << "DqControl IsReady Exception " << CurrentExceptionMessage(); + YQL_CLOG(INFO, ProviderDq) << "DqControl IsReady Exception " << CurrentExceptionMessage(); return false; } } @@ -110,11 +110,11 @@ public: } for (const auto& [path, objectId] : udfs){ - YQL_LOG(DEBUG) << "DQ control, adding file: " << path << " with objectId " << objectId; + YQL_CLOG(DEBUG, ProviderDq) << "DQ control, adding file: " << path << " with objectId " << objectId; TString newPath, newObjectId; std::tie(newPath, newObjectId) = GetPathAndObjectId(path, objectId, objectId); - YQL_LOG(DEBUG) << "DQ control, rewrite path/objectId: " << newPath << ", " << newObjectId; + YQL_CLOG(DEBUG, ProviderDq) << "DQ control, rewrite path/objectId: " << newPath << ", " << newObjectId; TFileResource r; r.SetLocalPath(newPath); r.SetObjectType(Yql::DqsProto::TFile::EUDF_FILE); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp index 2341b28ad44..86f8af26a13 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp @@ -52,7 +52,7 @@ public: template<typename RespType> void OnResponse(NThreading::TPromise<TResult> promise, TString sessionId, NGrpc::TGrpcStatus&& status, RespType&& resp, const THashMap<TString, TString>& modulesMapping, bool alwaysFallback = false) { - YQL_LOG_CTX_SCOPE(sessionId); + YQL_LOG_CTX_ROOT_SCOPE(sessionId); YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::callback"; { @@ -235,7 +235,7 @@ public: const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, bool discard) override { - YQL_LOG_CTX_SCOPE(sessionId); + YQL_LOG_CTX_ROOT_SCOPE(sessionId); auto tasks = plan.GetTasks(); Yql::DqsProto::ExecuteGraphRequest queryPB; @@ -281,7 +281,7 @@ public: RunningQueries.emplace(sessionId, std::make_pair(progressWriter, TString(""))); } - YQL_LOG(DEBUG) << "Send query of size " << queryPB.ByteSizeLong(); + YQL_CLOG(DEBUG, ProviderDq) << "Send query of size " << queryPB.ByteSizeLong(); return WithRetry<Yql::DqsProto::ExecuteGraphResponse>( sessionId, @@ -293,7 +293,7 @@ public: } NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override { - YQL_LOG_CTX_SCOPE(sessionId); + YQL_LOG_CTX_ROOT_SCOPE(sessionId); YQL_CLOG(INFO, ProviderDq) << "OpenSession"; Yql::DqsProto::OpenSessionRequest request; request.SetSession(sessionId); @@ -305,7 +305,7 @@ public: auto promise = NThreading::NewPromise<void>(); auto callback = [this, promise, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::OpenSessionResponse&& resp) mutable { Y_UNUSED(resp); - YQL_LOG_CTX_SCOPE(sessionId); + YQL_LOG_CTX_ROOT_SCOPE(sessionId); if (status.Ok()) { YQL_CLOG(INFO, ProviderDq) << "OpenSession OK"; SchedulePingSessionRequest(sessionId); @@ -394,7 +394,7 @@ public: Yql::DqsProto::PingSessionResponse&&) mutable { if (status.GRpcStatusCode == grpc::INVALID_ARGUMENT) { - YQL_LOG(INFO) << "Session closed " << sessionId; + YQL_CLOG(INFO, ProviderDq) << "Session closed " << sessionId; } else { SchedulePingSessionRequest(sessionId); } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp index 92db4290741..5521d96e15b 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp @@ -84,10 +84,10 @@ TDataProviderInitializer GetDqDataProviderInitializer( Y_UNUSED(timeProvider); if (dqGateway) { // nullptr in yqlrun auto t = TInstant::Now(); - YQL_LOG(DEBUG) << "OpenSession " << sessionId; + YQL_CLOG(DEBUG, ProviderDq) << "OpenSession " << sessionId; auto future = dqGateway->OpenSession(sessionId, username); future.Subscribe([sessionId, t] (const auto& ) { - YQL_LOG(DEBUG) << "OpenSession " << sessionId << " complete in " << (TInstant::Now()-t).MilliSeconds(); + YQL_CLOG(DEBUG, ProviderDq) << "OpenSession " << sessionId << " complete in " << (TInstant::Now()-t).MilliSeconds(); }); return future; } else { @@ -101,7 +101,7 @@ TDataProviderInitializer GetDqDataProviderInitializer( } if (dqGateway) { // nullptr in yqlrun - YQL_LOG(DEBUG) << "CloseSession " << sessionId; + YQL_CLOG(DEBUG, ProviderDq) << "CloseSession " << sessionId; dqGateway->CloseSession(sessionId); } }; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp index 058c99a9fcc..d8c8ab7b87b 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp @@ -90,7 +90,7 @@ public: } if (!good || (hasJoin && dataSize > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB))) { - YQL_LOG(DEBUG) << "good: " << good << " hasJoin: " << hasJoin << " dataSize: " << dataSize; + YQL_CLOG(DEBUG, ProviderDq) << "good: " << good << " hasJoin: " << hasJoin << " dataSize: " << dataSize; return TStatus::Ok; } } @@ -116,6 +116,7 @@ public: }, ctx, TOptimizeExprSettings{State_->TypeCtx}); if (input != output) { + YQL_CLOG(DEBUG, ProviderDq) << "DqsRecapture"; // TODO: Add before/after recapture transformers State_->TypeCtx->DqCaptured = true; // TODO: drop this after implementing DQS ConstraintTransformer @@ -129,7 +130,7 @@ public: private: void AddInfo(TExprContext& ctx, const TString& message) const { - YQL_LOG(DEBUG) << message; + YQL_CLOG(DEBUG, ProviderDq) << message; TIssue info("DQ cannot execute the query. Cause: " + message); info.Severity = TSeverityIds::S_INFO; ctx.IssueManager.RaiseIssue(info); diff --git a/ydb/library/yql/providers/dq/runtime/file_cache.cpp b/ydb/library/yql/providers/dq/runtime/file_cache.cpp index 18005eb97fe..4806574716c 100644 --- a/ydb/library/yql/providers/dq/runtime/file_cache.cpp +++ b/ydb/library/yql/providers/dq/runtime/file_cache.cpp @@ -48,7 +48,7 @@ void TFileCache::Scan() UsedSize += file.Size; - YQL_LOG(DEBUG) << file.Name << "|" << file.ObjectId; + YQL_CLOG(DEBUG, ProviderDq) << file.Name << "|" << file.ObjectId; allFiles.emplace_back(std::move(file)); } @@ -84,7 +84,7 @@ void TFileCache::Clean() { if (maybeFile != Files.end()) { auto path = GetDir(objectId) + "/" + maybeFile->second.Name; - YQL_LOG(DEBUG) << "Remove File " << path << " UsedSize " << ToString(UsedSize) << " FileSize " << ToString(maybeFile->second.Size); + YQL_CLOG(DEBUG, ProviderDq) << "Remove File " << path << " UsedSize " << ToString(UsedSize) << " FileSize " << ToString(maybeFile->second.Size); UsedSize -= maybeFile->second.Size; NFs::Remove(path); diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index cfbccb9a7a8..1d8fd7a28a7 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -746,7 +746,7 @@ public: settings.AllowGeneratorsInUnboxedValues = true; for (const auto& x: taskMeta.GetSecureParams()) { settings.SecureParams[x.first] = x.second; - YQL_LOG(DEBUG) << "SecureParam " << x.first << ":XXX"; + YQL_CLOG(DEBUG, ProviderDq) << "SecureParam " << x.first << ":XXX"; } settings.OptLLVM = DqConfiguration->OptLLVM.Get().GetOrElse(""); diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index b771c1df7f9..c6d535cfcc5 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -96,8 +96,8 @@ namespace NYql::NDqs { } void OnPoison() { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << __FUNCTION__ ; + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ ; ReplyError(grpc::UNAVAILABLE, "Unexpected error"); *ClientDisconnectedCounter += 1; } @@ -107,7 +107,7 @@ namespace NYql::NDqs { } void DoBootstrap(const NActors::TActorContext& ctx) { - YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); if (!CtxSubscribed) { auto selfId = ctx.SelfID; auto* actorSystem = ctx.ExecutorThread.ActorSystem; @@ -192,8 +192,8 @@ namespace NYql::NDqs { void OnReturnResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) { auto& result = ev->Get()->Record; Y_UNUSED(ctx); - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "TServiceProxyActor::OnReturnResult " << result.GetMetric().size(); + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnReturnResult " << result.GetMetric().size(); QueryStat.AddCounters(result); bool retriable; @@ -211,14 +211,14 @@ namespace NYql::NDqs { QueryStat.AddCounter(RetryName, TDuration::MilliSeconds(0)); NYql::TIssues issues; NYql::IssuesFromMessage(result.GetIssues(), issues); - YQL_LOG(WARN) << RetryName << " " << Retry << " Issues: " << issues.ToString(); + YQL_CLOG(WARN, ProviderDq) << RetryName << " " << Retry << " Issues: " << issues.ToString(); DoRetry(); } else { auto needFallback = NCommon::NeedFallback(statusCode); if (result.GetIssues().size() > 0) { NYql::TIssues issues; NYql::IssuesFromMessage(result.GetIssues(), issues); - YQL_LOG(WARN) << "Issues: " << issues.ToString(); + YQL_CLOG(WARN, ProviderDq) << "Issues: " << issues.ToString(); *ErrorCounter += 1; } if (needFallback) { @@ -290,7 +290,7 @@ namespace NYql::NDqs { } void DoRetry() override { - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this](const auto& ev) { if (ev->Get()->Record.GetErrorMessage()) { TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage()); @@ -326,7 +326,7 @@ namespace NYql::NDqs { THolder<Yql::DqsProto::ExecuteGraphRequest> ModifiedRequest; void DoPassAway() override { - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; Send(GraphExecutionEventsActorId, new TEvents::TEvPoison()); TServiceProxyActor::DoPassAway(); } @@ -346,7 +346,7 @@ namespace NYql::NDqs { } void Bootstrap() override { - YQL_LOG(DEBUG) << "TServiceProxyActor::OnExecuteGraph"; + YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnExecuteGraph"; SendEvent(NYql::NDqProto::EGraphExecutionEventType::START, SerializeGraphDescriptor(), [this](const TEvGraphExecutionEvent::TPtr& ev) { if (ev->Get()->Record.GetErrorMessage()) { @@ -386,7 +386,7 @@ namespace NYql::NDqs { } void FinishBootstrap(const NDqProto::TGraphExecutionEvent::TMap& params) { - YQL_LOG(DEBUG) << __FUNCTION__; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; MergeTaskMetas(params); auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime)); @@ -431,7 +431,7 @@ namespace NYql::NDqs { } Send(GraphExecutionEventsActorId, new TEvGraphExecutionEvent(record)); Synchronize<TEvGraphExecutionEvent>([callback, traceId = TraceId](TEvGraphExecutionEvent::TPtr& ev) { - YQL_LOG_CTX_SCOPE(traceId); + YQL_LOG_CTX_ROOT_SCOPE(traceId); Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC); callback(ev); }); @@ -499,7 +499,7 @@ namespace NYql::NDqs { TString message = TStringBuilder() << "Bad session: " << request->GetSession(); - YQL_LOG(DEBUG) << message; + YQL_CLOG(DEBUG, ProviderDq) << message; ctx->ReplyError(grpc::INVALID_ARGUMENT, message); return; } @@ -548,7 +548,7 @@ namespace NYql::NDqs { auto* request = dynamic_cast<const Yql::DqsProto::PingSessionRequest*>(ctx->GetRequest()); Y_VERIFY(!!request); - YQL_LOG(DEBUG) << "PingSession " << request->GetSession(); + YQL_CLOG(TRACE, ProviderDq) << "PingSession " << request->GetSession(); Yql::DqsProto::PingSessionResponse result; auto session = Sessions.GetSession(request->GetSession()); @@ -556,7 +556,7 @@ namespace NYql::NDqs { TString message = TStringBuilder() << "Bad session: " << request->GetSession(); - YQL_LOG(DEBUG) << message; + YQL_CLOG(DEBUG, ProviderDq) << message; ctx->ReplyError(grpc::INVALID_ARGUMENT, message); } else { ctx->Reply(&result, Ydb::StatusIds::SUCCESS); @@ -568,7 +568,7 @@ namespace NYql::NDqs { auto* request = dynamic_cast<const Yql::DqsProto::OpenSessionRequest*>(ctx->GetRequest()); Y_VERIFY(!!request); - YQL_LOG(DEBUG) << "OpenSession for " << request->GetSession() << " " << request->GetUsername(); + YQL_CLOG(DEBUG, ProviderDq) << "OpenSession for " << request->GetSession() << " " << request->GetUsername(); Yql::DqsProto::OpenSessionResponse result; if (Sessions.OpenSession(request->GetSession(), request->GetUsername())) { @@ -605,15 +605,15 @@ namespace NYql::NDqs { stat.Aggregate(s); } - YQL_LOG(DEBUG) << "SentEvents: " << stat.SentEvents; - YQL_LOG(DEBUG) << "ReceivedEvents: " << stat.ReceivedEvents; - YQL_LOG(DEBUG) << "NonDeliveredEvents: " << stat.NonDeliveredEvents; - YQL_LOG(DEBUG) << "EmptyMailboxActivation: " << stat.EmptyMailboxActivation; + YQL_CLOG(DEBUG, ProviderDq) << "SentEvents: " << stat.SentEvents; + YQL_CLOG(DEBUG, ProviderDq) << "ReceivedEvents: " << stat.ReceivedEvents; + YQL_CLOG(DEBUG, ProviderDq) << "NonDeliveredEvents: " << stat.NonDeliveredEvents; + YQL_CLOG(DEBUG, ProviderDq) << "EmptyMailboxActivation: " << stat.EmptyMailboxActivation; Sessions.PrintInfo(); for (ui32 i = 0; i < stat.ActorsAliveByActivity.size(); i=i+1) { if (stat.ActorsAliveByActivity[i]) { - YQL_LOG(DEBUG) << "ActorsAliveByActivity[" << i << "]=" << stat.ActorsAliveByActivity[i]; + YQL_CLOG(DEBUG, ProviderDq) << "ActorsAliveByActivity[" << i << "]=" << stat.ActorsAliveByActivity[i]; } } @@ -624,7 +624,7 @@ namespace NYql::NDqs { ctx->Reply(result, Ydb::StatusIds::SUCCESS); }, [ctx] () mutable { - YQL_LOG(DEBUG) << "ClusterStatus failed"; + YQL_CLOG(INFO, ProviderDq) << "ClusterStatus failed"; ctx->ReplyError(grpc::UNAVAILABLE, "Error"); }, TDuration::MilliSeconds(2000)); @@ -645,7 +645,7 @@ namespace NYql::NDqs { ctx->Reply(result, Ydb::StatusIds::SUCCESS); }, [ctx] () mutable { - YQL_LOG(DEBUG) << "OperationStopResponse failed"; + YQL_CLOG(INFO, ProviderDq) << "OperationStopResponse failed"; ctx->ReplyError(grpc::UNAVAILABLE, "Error"); }, TDuration::MilliSeconds(2000)); @@ -667,7 +667,7 @@ namespace NYql::NDqs { ctx->Reply(result, Ydb::StatusIds::SUCCESS); }, [ctx] () mutable { - YQL_LOG(DEBUG) << "QueryStatus failed"; + YQL_CLOG(INFO, ProviderDq) << "QueryStatus failed"; ctx->ReplyError(grpc::UNAVAILABLE, "Error"); }, TDuration::MilliSeconds(2000)); @@ -701,7 +701,7 @@ namespace NYql::NDqs { ctx->Reply(result, Ydb::StatusIds::SUCCESS); }, [ctx] () mutable { - YQL_LOG(DEBUG) << "RegisterNode failed"; + YQL_CLOG(INFO, ProviderDq) << "RegisterNode failed"; ctx->ReplyError(grpc::UNAVAILABLE, "Error"); }, TDuration::MilliSeconds(5000)); @@ -760,7 +760,7 @@ namespace NYql::NDqs { ctx->Reply(&result, Ydb::StatusIds::SUCCESS); }, [ctx] () mutable { - YQL_LOG(DEBUG) << "IsReadyForRevision failed"; + YQL_CLOG(INFO, ProviderDq) << "IsReadyForRevision failed"; ctx->ReplyError(grpc::UNAVAILABLE, "Error"); }, TDuration::MilliSeconds(2000)); @@ -783,7 +783,7 @@ namespace NYql::NDqs { ctx->Reply(&result, Ydb::StatusIds::SUCCESS); }, [ctx] () mutable { - YQL_LOG(DEBUG) << "Routes failed"; + YQL_CLOG(INFO, ProviderDq) << "Routes failed"; ctx->ReplyError(grpc::UNAVAILABLE, "Error"); }, TDuration::MilliSeconds(5000)); diff --git a/ydb/library/yql/providers/dq/service/grpc_session.cpp b/ydb/library/yql/providers/dq/service/grpc_session.cpp index 38e95f66681..2de87f9d46a 100644 --- a/ydb/library/yql/providers/dq/service/grpc_session.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_session.cpp @@ -85,7 +85,7 @@ void TSessionStorage::Clean(TInstant before) { it != SessionsByLastUpdate.end(); ) { if (it->LastUpdate < before) { - YQL_LOG(INFO) << "Drop session by timeout " << it->SessionId; + YQL_CLOG(INFO, ProviderDq) << "Drop session by timeout " << it->SessionId; Sessions.erase(it->SessionId); it = SessionsByLastUpdate.erase(it); } else { @@ -97,10 +97,10 @@ void TSessionStorage::Clean(TInstant before) { } void TSessionStorage::PrintInfo() const { - YQL_LOG(INFO) << "SessionsByLastUpdate: " << SessionsByLastUpdate.size(); - YQL_LOG(DEBUG) << "Sessions: " << Sessions.size(); + YQL_CLOG(INFO, ProviderDq) << "SessionsByLastUpdate: " << SessionsByLastUpdate.size(); + YQL_CLOG(DEBUG, ProviderDq) << "Sessions: " << Sessions.size(); ui64 currenSessionsCounter = *SessionsCounter; - YQL_LOG(DEBUG) << "SessionsCounter: " << currenSessionsCounter; + YQL_CLOG(DEBUG, ProviderDq) << "SessionsCounter: " << currenSessionsCounter; } } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp index bef368a961f..9a0baa3f8b3 100644 --- a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp +++ b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp @@ -36,7 +36,7 @@ namespace NYql::NDqs { if (message.find("ICP01 ready to work") != TString::npos) { return; } - YQL_LOG(DEBUG) << message; + YQL_CLOG(DEBUG, ProviderDq) << message; } void ReopenLog() override { } @@ -102,7 +102,7 @@ namespace NYql::NDqs { #define SET_VALUE(name) \ if (icSettings.Has ## name()) { \ schedulerConfig.name = icSettings.Get ## name (); \ - YQL_LOG(DEBUG) << "Scheduler IC " << #name << " set to " << schedulerConfig.name; \ + YQL_CLOG(DEBUG, ProviderDq) << "Scheduler IC " << #name << " set to " << schedulerConfig.name; \ } SET_VALUE(ResolutionMicroseconds); @@ -119,7 +119,7 @@ namespace NYql::NDqs { setup->Scheduler = CreateSchedulerThread(schedulerConfig); setup->MaxActivityType = maxActivityType; - YQL_LOG(DEBUG) << "Initializing local services"; + YQL_CLOG(DEBUG, ProviderDq) << "Initializing local services"; setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); if (IActor* schedulerActor = CreateSchedulerActor(schedulerConfig)) { TActorId schedulerActorId = MakeSchedulerActorId(); @@ -143,7 +143,7 @@ namespace NYql::NDqs { TIntrusivePtr<TTableNameserverSetup> nameserverTable = new TTableNameserverSetup(); THashSet<ui32> staticNodeId; - YQL_LOG(DEBUG) << "Initializing node table"; + YQL_CLOG(DEBUG, ProviderDq) << "Initializing node table"; nameserverTable->StaticNodeTable[nodeId] = std::make_pair(interconnectAddress, port); setup->LocalServices.emplace_back( @@ -164,13 +164,13 @@ namespace NYql::NDqs { #define SET_DURATION(name) \ { \ icCommon->Settings.name = TDuration::MilliSeconds(icSettings.Get ## name ## Ms()); \ - YQL_LOG(DEBUG) << "IC " << #name << " set to " << icCommon->Settings.name; \ + YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \ } #define SET_VALUE(name) \ { \ icCommon->Settings.name = icSettings.Get ## name(); \ - YQL_LOG(DEBUG) << "IC " << #name << " set to " << icCommon->Settings.name; \ + YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \ } SET_DURATION(Handshake); @@ -198,7 +198,7 @@ namespace NYql::NDqs { ui32 maxNodeId = static_cast<ui32>(ENodeIdLimits::MaxWorkerNodeId); - YQL_LOG(DEBUG) << "Initializing proxy actors"; + YQL_CLOG(DEBUG, ProviderDq) << "Initializing proxy actors"; setup->Interconnect.ProxyActors.resize(maxNodeId + 1); for (ui32 id = 1; id <= maxNodeId; ++id) { if (nodeId != id) { @@ -208,10 +208,10 @@ namespace NYql::NDqs { } // start listener - YQL_LOG(DEBUG) << "Start listener"; + YQL_CLOG(DEBUG, ProviderDq) << "Start listener"; { icCommon->TechnicalSelfHostName = interconnectAddress; - YQL_LOG(INFO) << "Start listener " << interconnectAddress << ":" << port << " socket: " << socket; + YQL_CLOG(INFO, ProviderDq) << "Start listener " << interconnectAddress << ":" << port << " socket: " << socket; IActor* listener; TMaybe<SOCKET> maybeSocket = socket < 0 ? Nothing() @@ -224,7 +224,7 @@ namespace NYql::NDqs { TActorSetupCmd(listener, TMailboxType::ReadAsFilled, 0)); } - YQL_LOG(DEBUG) << "Actor initialization complete"; + YQL_CLOG(DEBUG, ProviderDq) << "Actor initialization complete"; #ifdef _unix_ signal(SIGPIPE, SIG_IGN); diff --git a/ydb/library/yql/providers/dq/service/service_node.cpp b/ydb/library/yql/providers/dq/service/service_node.cpp index cda24ec6cf7..cdf05b6661f 100644 --- a/ydb/library/yql/providers/dq/service/service_node.cpp +++ b/ydb/library/yql/providers/dq/service/service_node.cpp @@ -30,7 +30,7 @@ namespace NYql { private: void Start() override { - YQL_LOG(DEBUG) << "Start GRPC Listener"; + YQL_CLOG(DEBUG, ProviderDq) << "Start GRPC Listener"; Poller.Start(); StartRead(); } @@ -40,7 +40,7 @@ namespace NYql { } void StartRead() { - YQL_LOG(TRACE) << "Read next GRPC event"; + YQL_CLOG(TRACE, ProviderDq) << "Read next GRPC event"; Poller.StartRead(Listener, [&](const TIntrusivePtr<TSharedDescriptor>& ss) { return Accept(ss); }); @@ -54,14 +54,14 @@ namespace NYql { NInterconnect::TAddress address; r = socket->Accept(address); if (r >= 0) { - YQL_LOG(TRACE) << "New GRPC connection"; + YQL_CLOG(TRACE, ProviderDq) << "New GRPC connection"; grpc::experimental::ExternalConnectionAcceptor::NewConnectionParameters params; SetNonBlock(r, true); params.listener_fd = -1; // static_cast<int>(Socket); params.fd = r; Acceptor->HandleNewConnection(¶ms); } else if (-r != EAGAIN && -r != EWOULDBLOCK) { - YQL_LOG(DEBUG) << "Unknown error code " + ToString(r); + YQL_CLOG(DEBUG, ProviderDq) << "Unknown error code " + ToString(r); } } @@ -122,7 +122,7 @@ namespace NYql { { } }; - YQL_LOG(INFO) << "Starting GRPC on " << Config.GrpcPort; + YQL_CLOG(INFO, ProviderDq) << "Starting GRPC on " << Config.GrpcPort; IExternalListener::TPtr listener = nullptr; if (Config.GrpcSocket >= 0) { diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index dd7b70fb165..7bdee232e6e 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -131,7 +131,7 @@ public: return true; #else int status; - YQL_LOG(DEBUG) << "Check Pid " << Pid; + YQL_CLOG(DEBUG, ProviderDq) << "Check Pid " << Pid; return waitpid(Pid, &status, WNOHANG) <= 0; #endif } @@ -256,7 +256,7 @@ struct TProcessHolder { auto it = Processes.begin(); while (it != Processes.end()) { if (!it->second->IsAlive()) { - YQL_LOG(DEBUG) << "Remove dead process"; + YQL_CLOG(DEBUG, ProviderDq) << "Remove dead process"; stopList.emplace_back(std::move(it->second)); it = Processes.erase(it); } else { @@ -320,11 +320,11 @@ public: ContainerName = portoSettings.ContainerNamePrefix + "/" + ContainerName; } - YQL_LOG(DEBUG) << "HardLink " << ExeName << "'" << WorkDir + "/" + name << "'"; + YQL_CLOG(DEBUG, ProviderDq) << "HardLink " << ExeName << "'" << WorkDir + "/" + name << "'"; if (NFs::HardLink(ExeName, WorkDir + "/" + name)) { ExeName = WorkDir + "/" + name; } else { - YQL_LOG(DEBUG) << "HardLink Failed " << ExeName << "'" << WorkDir + "/" + name << "'"; + YQL_CLOG(DEBUG, ProviderDq) << "HardLink Failed " << ExeName << "'" << WorkDir + "/" + name << "'"; } } @@ -354,13 +354,13 @@ private: cmd3.Run().Wait(); } } catch (...) { - YQL_LOG(DEBUG) << "Cannot set anon_limit: " << CurrentExceptionMessage(); + YQL_CLOG(DEBUG, ProviderDq) << "Cannot set anon_limit: " << CurrentExceptionMessage(); } try { TShellCommand cmd(PortoCtl, {"destroy", ContainerName}); cmd.Run().Wait(); } catch (...) { - YQL_LOG(DEBUG) << "Cannot destroy: " << CurrentExceptionMessage(); + YQL_CLOG(DEBUG, ProviderDq) << "Cannot destroy: " << CurrentExceptionMessage(); } TChildProcess::Kill(); } @@ -1226,9 +1226,9 @@ public: while ((size = input.Read(buf, sizeof(buf))) > 0) { auto str = TString(buf, size); Stderr += str; - YQL_LOG(DEBUG) << "stderr (" << StageId << " " << TraceId << " ) > `" << str << "'"; + YQL_CLOG(DEBUG, ProviderDq) << "stderr (" << StageId << " " << TraceId << " ) > `" << str << "'"; } - YQL_LOG(DEBUG) << "stderr (" << StageId << " " << TraceId << " ) finished"; + YQL_CLOG(DEBUG, ProviderDq) << "stderr (" << StageId << " " << TraceId << " ) finished"; } ui64 GetTaskId() const override { @@ -1740,7 +1740,7 @@ private: { auto localPath = MakeLocalPath(name.GetPath()); NFs::MakeDirectoryRecursive(base + "/" + localPath.Parent().GetPath(), NFs::FP_NONSECRET_FILE, false); - YQL_LOG(DEBUG) << "HardLink '" << path << "-> '" << (base + "/" + localPath.GetPath()) << "'"; + YQL_CLOG(DEBUG, ProviderDq) << "HardLink '" << path << "-> '" << (base + "/" + localPath.GetPath()) << "'"; YQL_ENSURE(NFs::HardLink(path, base + "/" + localPath.GetPath())); return localPath.GetPath(); } @@ -1764,7 +1764,7 @@ private: portoSettings.Enable = EnablePorto && conf->EnablePorto.Get().GetOrElse(TDqSettings::TDefault::EnablePorto); portoSettings.MemoryLimit = conf->_PortoMemoryLimit.Get().GetOrElse(TDqSettings::TDefault::PortoMemoryLimit); if (portoSettings.Enable) { - YQL_LOG(DEBUG) << "Porto enabled"; + YQL_CLOG(DEBUG, ProviderDq) << "Porto enabled"; } TString exePath; @@ -1804,9 +1804,9 @@ private: } else { command = MakeHolder<TChildProcess>(exePath, args, env, cacheDir + "/Slot-" + ToString(containerId)); } - YQL_LOG(DEBUG) << "Executing " << exePath; + YQL_CLOG(DEBUG, ProviderDq) << "Executing " << exePath; for (const auto& arg: args) { - YQL_LOG(DEBUG) << "Arg: " << arg; + YQL_CLOG(DEBUG, ProviderDq) << "Arg: " << arg; } command->Run(); return command; diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp index 634b25fb075..0a1caee54c4 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp @@ -174,7 +174,7 @@ namespace NYql::NDqs { } } for (const auto& k : toDrop) { - YQL_LOG(DEBUG) << "Remove resource " << k << " from worker " << GetGuidAsString(WorkerId); + YQL_CLOG(DEBUG, ProviderDq) << "Remove resource " << k << " from worker " << GetGuidAsString(WorkerId); Resources.erase(k); } *FileRemoveCounter += toDrop.size(); diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index 2538334a295..653db074676 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -120,7 +120,7 @@ private: void Deallocate(ui32 nodeId) { TVector<ui64> toDeallocate; - YQL_LOG(DEBUG) << "Deallocate " << nodeId; + YQL_CLOG(DEBUG, ProviderDq) << "Deallocate " << nodeId; for (const auto& [k, v] : AllocatedWorkers) { if (v.Sender.NodeId() == nodeId) { toDeallocate.push_back(k); @@ -135,7 +135,7 @@ private: void Deallocate(const NActors::TActorId& senderId) { TVector<ui64> toDeallocate; - YQL_LOG(DEBUG) << "Deallocate " << senderId; + YQL_CLOG(DEBUG, ProviderDq) << "Deallocate " << senderId; for (const auto& [k, v] : AllocatedWorkers) { if (v.Sender == senderId) { toDeallocate.push_back(k); @@ -149,7 +149,7 @@ private: void OnDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { - YQL_LOG(DEBUG) << "Disconnected " << ev->Get()->NodeId; + YQL_CLOG(DEBUG, ProviderDq) << "Disconnected " << ev->Get()->NodeId; Unsubscribe(ev->Get()->NodeId); Deallocate(ev->Get()->NodeId); } @@ -159,7 +159,7 @@ private: Y_VERIFY(ev->Get()->Reason == TEvents::TEvUndelivered::Disconnected || ev->Get()->Reason == TEvents::TEvUndelivered::ReasonActorUnknown); - YQL_LOG(DEBUG) << "Undelivered " << ev->Sender; + YQL_CLOG(DEBUG, ProviderDq) << "Undelivered " << ev->Sender; switch (ev->Get()->Reason) { case TEvents::TEvUndelivered::Disconnected: @@ -174,13 +174,13 @@ private: } void OnConfigureFailureInjector(TEvConfigureFailureInjectorRequest::TPtr& ev) { - YQL_LOG(DEBUG) << "TEvConfigureFailureInjectorRequest "; + YQL_CLOG(DEBUG, ProviderDq) << "TEvConfigureFailureInjectorRequest "; auto& request = ev->Get()->Record.GetRequest(); YQL_ENSURE(request.GetNodeId() == SelfId().NodeId(), "Wrong node id!"); TFailureInjector::Set(request.GetName(), request.GetSkip(), request.GetCountOfFails()); - YQL_LOG(DEBUG) << "Failure injector is configured " << request.GetName(); + YQL_CLOG(DEBUG, ProviderDq) << "Failure injector is configured " << request.GetName(); auto response = MakeHolder<TEvConfigureFailureInjectorResponse>(); auto* r = response->Record.MutableResponse(); @@ -207,8 +207,8 @@ private: return; } - YQL_LOG_CTX_SCOPE(ev->Get()->Record.GetTraceId()); - YQL_LOG(DEBUG) << "TLocalWorkerManager::TEvAllocateWorkersRequest " << resourceId; + YQL_LOG_CTX_ROOT_SCOPE(ev->Get()->Record.GetTraceId()); + YQL_CLOG(DEBUG, ProviderDq) << "TLocalWorkerManager::TEvAllocateWorkersRequest " << resourceId; TFailureInjector::Reach("allocate_workers_failure", [] { ::_exit(1); }); auto& allocationInfo = AllocatedWorkers[resourceId]; @@ -245,7 +245,7 @@ private: THolder<NActors::IActor> actor; if (createComputeActor) { - YQL_LOG(DEBUG) << "Create compute actor: " << computeActorType; + YQL_CLOG(DEBUG, ProviderDq) << "Create compute actor: " << computeActorType; actor.Reset( NYql::CreateComputeActor( Options, @@ -281,7 +281,7 @@ private: void OnFreeWorkers(TEvFreeWorkersNotify::TPtr& ev) { ui64 resourceId = ev->Get()->Record.GetResourceId(); - YQL_LOG(DEBUG) << "TEvFreeWorkersNotify " << resourceId; + YQL_CLOG(DEBUG, ProviderDq) << "TEvFreeWorkersNotify " << resourceId; FreeGroup(resourceId, ev->Sender); } @@ -291,7 +291,7 @@ private: } void FreeGroup(ui64 id, NActors::TActorId sender = NActors::TActorId()) { - YQL_LOG(DEBUG) << "Free Group " << id; + YQL_CLOG(DEBUG, ProviderDq) << "Free Group " << id; auto it = AllocatedWorkers.find(id); if (it != AllocatedWorkers.end()) { for (const auto& actorId : it->second.WorkerActors) { @@ -300,7 +300,7 @@ private: if (sender && it->second.Sender != sender) { Options.Counters.FreeGroupError->Inc(); - YQL_LOG(ERROR) << "Free Group " << id << " mismatched alloc-free senders: " << it->second.Sender << " and " << sender << " TxId: " << it->second.TxId; + YQL_CLOG(ERROR, ProviderDq) << "Free Group " << id << " mismatched alloc-free senders: " << it->second.Sender << " and " << sender << " TxId: " << it->second.TxId; } MemoryQuoter->Free(it->second.TxId, 0); @@ -318,7 +318,7 @@ private: } } for (const auto& id : todelete) { - YQL_LOG(DEBUG) << "Free on deadline: " << id; + YQL_CLOG(DEBUG, ProviderDq) << "Free on deadline: " << id; FreeGroup(id); } } |