aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <udovichenko-r@yandex-team.ru>2022-05-26 01:18:20 +0300
committerudovichenko-r <udovichenko-r@yandex-team.ru>2022-05-26 01:18:20 +0300
commit363959d1c08721282997f4ab374a416fb1691ead (patch)
tree4cd5f411dd3cd9e20778e790fb5b8c0ca3422988
parent13b2958e4986b143d751eba7671fc41ff4e49d31 (diff)
downloadydb-363959d1c08721282997f4ab374a416fb1691ead.tar.gz
[yql] Cleanup dq logging
YQL-12393 ref:dc9772be7c1b3f3f6045fa5d6562a975f813856f
-rw-r--r--ydb/library/yql/providers/dq/actors/actor_helpers.h2
-rw-r--r--ydb/library/yql/providers/dq/actors/executer_actor.cpp46
-rw-r--r--ydb/library/yql/providers/dq/actors/full_result_writer.cpp18
-rw-r--r--ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp16
-rw-r--r--ydb/library/yql/providers/dq/actors/resource_allocator.cpp26
-rw-r--r--ydb/library/yql/providers/dq/actors/result_actor_base.h56
-rw-r--r--ydb/library/yql/providers/dq/actors/result_aggregator.cpp11
-rw-r--r--ydb/library/yql/providers/dq/actors/result_receiver.cpp10
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller.cpp42
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp38
-rw-r--r--ydb/library/yql/providers/dq/opt/dqs_opt.cpp1
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp6
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp26
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_control.cpp6
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp12
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp6
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp5
-rw-r--r--ydb/library/yql/providers/dq/runtime/file_cache.cpp4
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp2
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.cpp54
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_session.cpp8
-rw-r--r--ydb/library/yql/providers/dq/service/interconnect_helpers.cpp20
-rw-r--r--ydb/library/yql/providers/dq/service/service_node.cpp10
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp24
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp2
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp26
26 files changed, 238 insertions, 239 deletions
diff --git a/ydb/library/yql/providers/dq/actors/actor_helpers.h b/ydb/library/yql/providers/dq/actors/actor_helpers.h
index 65be3a25d63..cf66de3ea40 100644
--- a/ydb/library/yql/providers/dq/actors/actor_helpers.h
+++ b/ydb/library/yql/providers/dq/actors/actor_helpers.h
@@ -144,7 +144,7 @@ private:
}
void OnSync(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) {
- YQL_LOG(DEBUG) << "OnSync(): delayed messages " << DelayedEvents_.size();
+ YQL_CLOG(DEBUG, ProviderDq) << "OnSync(): delayed messages " << DelayedEvents_.size();
SyncState_ = E_SYNC_RECEIVED;
TBase::Become(InterruptedHandler_);
SyncCallback_(ev);
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index 7293afdbacd..aff34066bdc 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -81,8 +81,8 @@ private:
HFunc(TEvQueryResponse, OnQueryResponse);
// execution timeout
cFunc(TEvents::TEvBootstrap::EventType, [this]() {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "Execution timeout";
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "Execution timeout";
auto issue = TIssue("Execution timeout");
issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
Issues.AddIssues({issue});
@@ -99,7 +99,7 @@ private:
TStringInput inputStream1(Settings->WorkerFilter.Get().GetOrElse(""));
ParseFromTextFormat(inputStream1, pragmaFilter);
} catch (...) {
- YQL_LOG(INFO) << "Cannot parse filter pragma " << CurrentExceptionMessage();
+ YQL_CLOG(INFO, ProviderDq) << "Cannot parse filter pragma " << CurrentExceptionMessage();
}
}
return pragmaFilter;
@@ -128,10 +128,10 @@ private:
}
void OnGraph(TEvGraphRequest::TPtr& ev, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
Y_VERIFY(!ControlId);
Y_VERIFY(!ResultId);
- YQL_LOG(DEBUG) << "TDqExecuter::OnGraph";
+ YQL_CLOG(DEBUG, ProviderDq) << "TDqExecuter::OnGraph";
ControlId = NActors::ActorIdFromProto(ev->Get()->Record.GetControlId());
ResultId = NActors::ActorIdFromProto(ev->Get()->Record.GetResultId());
CheckPointCoordinatorId = NActors::ActorIdFromProto(ev->Get()->Record.GetCheckPointCoordinatorId());
@@ -141,7 +141,7 @@ private:
AddChild(CheckPointCoordinatorId);
int workerCount = ev->Get()->Record.GetRequest().GetTask().size();
- YQL_LOG(INFO) << (TStringBuilder() << "Trying to allocate " << workerCount << " workers");
+ YQL_CLOG(INFO, ProviderDq) << (TStringBuilder() << "Trying to allocate " << workerCount << " workers");
THashMap<TString, Yql::DqsProto::TFile> files;
TVector<NDqProto::TDqTask> tasks;
@@ -238,9 +238,9 @@ private:
void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode, bool retriable, bool needFallback = false)
{
- YQL_LOG(DEBUG) << __FUNCTION__ << ", retriable=" << retriable << ", needFallback=" << needFallback;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << ", retriable=" << retriable << ", needFallback=" << needFallback;
if (Finished) {
- YQL_LOG(WARN) << "Re-Finish IGNORED with Retriable=" << retriable << ", NeedFallback=" << needFallback;
+ YQL_CLOG(WARN, ProviderDq) << "Re-Finish IGNORED with Retriable=" << retriable << ", NeedFallback=" << needFallback;
} else {
FlushCounter("ExecutionTime");
TQueryResponse result;
@@ -256,8 +256,8 @@ private:
void OnFailure(TEvDqFailure::TPtr& ev, const NActors::TActorContext&) {
if (!Finished) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
AddCounters(ev->Get()->Record);
bool retriable = ev->Get()->Record.GetDeprecatedRetriable();
bool fallback = ev->Get()->Record.GetDeprecatedNeedFallback();
@@ -271,14 +271,14 @@ private:
}
void OnGraphFinished(TEvGraphFinished::TPtr&, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
if (!Finished) {
try {
TFailureInjector::Reach("dq_fail_on_finish", [] { throw yexception() << "dq_fail_on_finish"; });
Finish(NYql::NDqProto::StatusIds::SUCCESS, false);
} catch (...) {
- YQL_LOG(ERROR) << " FailureInjector " << CurrentExceptionMessage();
+ YQL_CLOG(ERROR, ProviderDq) << " FailureInjector " << CurrentExceptionMessage();
Issues.AddIssue(TIssue("FailureInjection"));
Finish(NYql::NDqProto::StatusIds::UNAVAILABLE, true);
}
@@ -288,8 +288,8 @@ private:
// TBD: wait for PoisonTaken from CheckPointCoordinator before send TEvQueryResponse to PrinterId
void OnQueryResponse(TEvQueryResponse::TPtr& ev, const TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
Send(PrinterId, ev->Release().Release());
PassAway();
}
@@ -299,8 +299,8 @@ private:
}
void OnAllocateWorkersResponse(TEvAllocateWorkersResponse::TPtr& ev, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "TDqExecuter::TEvAllocateWorkersResponse";
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "TDqExecuter::TEvAllocateWorkersResponse";
AddCounters(ev->Get()->Record);
FlushCounter("AllocateWorkers");
@@ -310,7 +310,7 @@ private:
case TAllocateWorkersResponse::kWorkers:
break;
case TAllocateWorkersResponse::kError: {
- YQL_LOG(ERROR) << "Error on allocate workers "
+ YQL_CLOG(ERROR, ProviderDq) << "Error on allocate workers "
<< ev->Get()->Record.GetError().GetMessage() << ":"
<< static_cast<int>(ev->Get()->Record.GetError().GetErrorCode());
Issues.AddIssue(TIssue(ev->Get()->Record.GetError().GetMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR));
@@ -324,7 +324,7 @@ private:
auto& workerGroup = response.GetWorkers();
ResourceId = workerGroup.GetResourceId();
- YQL_LOG(DEBUG) << "Allocated resource " << ResourceId;
+ YQL_CLOG(DEBUG, ProviderDq) << "Allocated resource " << ResourceId;
TVector<NActors::TActorId> workers;
for (const auto& actorIdProto : workerGroup.GetWorkerActor()) {
workers.emplace_back(NActors::ActorIdFromProto(actorIdProto));
@@ -341,10 +341,10 @@ private:
WorkerInfo[workerInfo.GetNodeId()] = std::make_tuple(workerInfo, taskMeta.GetStageId());
- YQL_LOG(DEBUG) << "WorkerInfo: " << NDqs::NExecutionHelpers::PrettyPrintWorkerInfo(workerInfo, taskMeta.GetStageId());
- YQL_LOG(DEBUG) << "TaskInfo: " << i << "/" << tasks[i].GetId();
+ YQL_CLOG(DEBUG, ProviderDq) << "WorkerInfo: " << NDqs::NExecutionHelpers::PrettyPrintWorkerInfo(workerInfo, taskMeta.GetStageId());
+ YQL_CLOG(DEBUG, ProviderDq) << "TaskInfo: " << i << "/" << tasks[i].GetId();
for (const auto& file: taskMeta.GetFiles()) {
- YQL_LOG(DEBUG) << " ObjectId: " << file.GetObjectId();
+ YQL_CLOG(DEBUG, ProviderDq) << " ObjectId: " << file.GetObjectId();
}
i++;
@@ -353,7 +353,7 @@ private:
AddCounter("UniqueWorkers", uniqueWorkers.size());
}
- YQL_LOG(INFO) << workers.size() << " workers allocated";
+ YQL_CLOG(INFO, ProviderDq) << workers.size() << " workers allocated";
YQL_ENSURE(workers.size() == tasks.size());
diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
index f318eefa4be..06fc24e650a 100644
--- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
+++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
@@ -46,12 +46,12 @@ private:
})
void PassAway() override {
- YQL_LOG_CTX_SCOPE(TraceID);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceID);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
try {
FullResultWriter->Abort();
} catch (...) {
- YQL_LOG(WARN) << "FullResultWriter->Abort(): " << CurrentExceptionMessage();
+ YQL_CLOG(WARN, ProviderDq) << "FullResultWriter->Abort(): " << CurrentExceptionMessage();
}
ResultBuilder.Reset();
FullResultWriter.Reset();
@@ -62,7 +62,7 @@ private:
}
void OnStatusRequest(TEvFullResultWriterStatusRequest::TPtr&, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceID);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceID);
NDqProto::TFullResultWriterStatusResponse response;
response.SetBytesReceived(BytesReceived);
if (ErrorMessage) {
@@ -72,7 +72,7 @@ private:
}
void OnWriteRequest(TEvFullResultWriterWriteRequest::TPtr& ev, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceID);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceID);
auto& request = ev->Get()->Record;
if (request.GetFinish()) {
Finish();
@@ -82,7 +82,7 @@ private:
}
void Finish() {
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
try {
TFailureInjector::Reach("full_result_fail_on_finish", [] { throw yexception() << "full_result_fail_on_finish"; });
FullResultWriter->Finish();
@@ -103,7 +103,7 @@ private:
}
void Continue(NDqProto::TFullResultWriterWriteRequest& request) {
- YQL_LOG(DEBUG) << "Continue -- RowCount = " << FullResultWriter->GetRowCount();
+ YQL_CLOG(DEBUG, ProviderDq) << "Continue -- RowCount = " << FullResultWriter->GetRowCount();
ui64 reqSize = request.ByteSizeLong();
WriteToFullResultTable(request);
BytesReceived += reqSize;
@@ -111,7 +111,7 @@ private:
void WriteToFullResultTable(NDqProto::TFullResultWriterWriteRequest& request) {
if (ErrorMessage) {
- YQL_LOG(DEBUG) << "Failed to write previous chunk, aborting";
+ YQL_CLOG(DEBUG, ProviderDq) << "Failed to write previous chunk, aborting";
return;
}
@@ -130,7 +130,7 @@ private:
}
if (ErrorMessage) {
- YQL_LOG(DEBUG) << "An error occurred: " << *ErrorMessage;
+ YQL_CLOG(DEBUG, ProviderDq) << "An error occurred: " << *ErrorMessage;
}
}
diff --git a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp
index 3ff4fb16588..4ab306e9bf0 100644
--- a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp
@@ -30,12 +30,12 @@ private:
})
void DoPassAway() override {
- YQL_LOG_CTX_SCOPE(TraceID);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceID);
}
void OnEvent(NDqs::TEvGraphExecutionEvent::TPtr& ev, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceID);
- YQL_LOG(DEBUG) << __FUNCTION__ << ' ' << NYql::NDqProto::EGraphExecutionEventType_Name(ev->Get()->Record.GetEventType());
+ YQL_LOG_CTX_ROOT_SCOPE(TraceID);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << ' ' << NYql::NDqProto::EGraphExecutionEventType_Name(ev->Get()->Record.GetEventType());
try {
switch (ev->Get()->Record.GetEventType()) {
@@ -63,7 +63,7 @@ private:
}
} catch (...) {
TString message = TStringBuilder() << "Error on TEvGraphExecutionEvent: " << CurrentExceptionMessage();
- YQL_LOG(ERROR) << message;
+ YQL_CLOG(ERROR, ProviderDq) << message;
Reply(ev->Sender, message);
}
}
@@ -74,7 +74,7 @@ private:
}
void OnStart(NActors::TActorId replyTo, const NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor& payload) {
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
const auto& secureParams = AsHashMap(payload.GetSecureParams().GetData());
const auto& graphParams = AsHashMap(payload.GetGraphParams().GetData());
@@ -109,7 +109,7 @@ private:
}
void OnFail(NActors::TActorId replyTo) {
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
for (const auto& preprocessor: TaskPreprocessors) {
preprocessor->Finish(false);
}
@@ -117,7 +117,7 @@ private:
}
void OnSuccess(NActors::TActorId replyTo) {
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
for (const auto& preprocessor: TaskPreprocessors) {
preprocessor->Finish(true);
}
@@ -125,7 +125,7 @@ private:
}
void OnFullResult(NActors::TActorId replyTo, const NDqProto::TGraphExecutionEvent::TFullResultDescriptor& payload) {
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
THolder<IDqFullResultWriter> writer;
for (const auto& preprocessor: TaskPreprocessors) {
writer = preprocessor->CreateFullResultWriter();
diff --git a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp
index 4b968b9f240..c8c786f6a3d 100644
--- a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp
+++ b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp
@@ -101,7 +101,7 @@ private:
{
Y_UNUSED(ctx);
- YQL_LOG(DEBUG) << "RequestActorIdsFromNodes " << ev->Sender.NodeId();
+ YQL_CLOG(DEBUG, ProviderDq) << "RequestActorIdsFromNodes " << ev->Sender.NodeId();
auto& response = ev->Get()->Record;
auto& nodes = response.GetNodes().GetWorker();
@@ -110,11 +110,11 @@ private:
Y_VERIFY(static_cast<ui32>(nodes.size()) == RequestedCount);
RequestedNodes.reserve(RequestedCount);
- YQL_LOG(DEBUG) << "RequestActorIdsFromNodes " << ev->Sender.NodeId() << " " << ResourceId;
+ YQL_CLOG(DEBUG, ProviderDq) << "RequestActorIdsFromNodes " << ev->Sender.NodeId() << " " << ResourceId;
auto now = TInstant::Now();
ui16 i = 1;
for (const auto& node : nodes) {
- YQL_LOG(DEBUG) << "RequestedNode: " << node.GetNodeId();
+ YQL_CLOG(DEBUG, ProviderDq) << "RequestedNode: " << node.GetNodeId();
TString clusterName = node.GetClusterName();
NYql::NDqProto::TDqTask task;
if (!Tasks.empty()) {
@@ -133,10 +133,10 @@ private:
void OnAllocateWorkersResponse(TEvAllocateWorkersResponse::TPtr& ev, const TActorContext& ctx)
{
- YQL_LOG_CTX_SCOPE(TraceId + SelfId().ToString());
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId, SelfId().ToString());
Y_UNUSED(ctx);
- YQL_LOG(DEBUG) << "TEvAllocateWorkersResponse " << ev->Sender.NodeId();
+ YQL_CLOG(DEBUG, ProviderDq) << "TEvAllocateWorkersResponse " << ev->Sender.NodeId();
QueryStat.AddCounters(ev->Get()->Record);
@@ -145,7 +145,7 @@ private:
}
if (ev->Get()->Record.GetTResponseCase() == NDqProto::TAllocateWorkersResponse::kError) {
- YQL_LOG(DEBUG) << "Forwarding bad state";
+ YQL_CLOG(DEBUG, ProviderDq) << "Forwarding bad state";
ctx.Send(ev->Forward(SenderId));
FailState = true;
return;
@@ -215,18 +215,18 @@ private:
void DoPassAway() override
{
- YQL_LOG_CTX_SCOPE(TraceId + SelfId().ToString());
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId, SelfId().ToString());
for (const auto& group : AllocatedWorkers) {
for (const auto& actorIdProto : group.GetWorkerActor()) {
auto actorNode = NActors::ActorIdFromProto(actorIdProto).NodeId();
- YQL_LOG(DEBUG) << "TEvFreeWorkersNotify " << group.GetResourceId();
+ YQL_CLOG(DEBUG, ProviderDq) << "TEvFreeWorkersNotify " << group.GetResourceId();
auto request = MakeHolder<TEvFreeWorkersNotify>(group.GetResourceId());
request->Record.SetTraceId(TraceId);
Send(MakeWorkerManagerActorID(actorNode), request.Release());
}
}
if (!LocalMode) {
- YQL_LOG(DEBUG) << "TEvFreeWorkersNotify " << ResourceId << " (failed)";
+ YQL_CLOG(DEBUG, ProviderDq) << "TEvFreeWorkersNotify " << ResourceId << " (failed)";
auto request = MakeHolder<TEvFreeWorkersNotify>(ResourceId);
request->Record.SetTraceId(TraceId);
for (const auto& failedWorker : FailedWorkers) {
@@ -249,7 +249,7 @@ private:
ActorIdToProto(ControlId, request->Record.MutableResultActorId());
*request->Record.AddTask() = node.Task;
}
- YQL_LOG(WARN) << "Send TEvAllocateWorkersRequest to " << NDqs::NExecutionHelpers::PrettyPrintWorkerInfo(node.WorkerInfo, 0);
+ YQL_CLOG(WARN, ProviderDq) << "Send TEvAllocateWorkersRequest to " << NDqs::NExecutionHelpers::PrettyPrintWorkerInfo(node.WorkerInfo, 0);
if (backoff) {
TActivationContext::Schedule(backoff, new IEventHandle(
MakeWorkerManagerActorID(nodeId), SelfId(), request.Release(),
@@ -265,13 +265,13 @@ private:
}
void Fail(const ui64 cookie, const TString& reason) {
- YQL_LOG_CTX_SCOPE(TraceId + SelfId().ToString());
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId, SelfId().ToString());
if (FailState) {
return;
}
auto maybeRequestInfo = RequestedNodes.find(cookie);
if (!Answered && maybeRequestInfo != RequestedNodes.end() && maybeRequestInfo->second.Retry < NetworkRetries) {
- YQL_LOG(WARN) << "Retry Allocate Request";
+ YQL_CLOG(WARN, ProviderDq) << "Retry Allocate Request";
auto& requestInfo = RequestedNodes[cookie];
requestInfo.Retry ++;
*RetryCounter += 1;
@@ -293,7 +293,7 @@ private:
delta);
}
TString message = "Disconnected from worker: `" + workerInfo + "', reason: " + reason;
- YQL_LOG(ERROR) << message;
+ YQL_CLOG(ERROR, ProviderDq) << message;
auto response = MakeHolder<TEvAllocateWorkersResponse>(message);
QueryStat.FlushCounters(response->Record);
Send(SenderId, response.Release());
diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h
index fe2bc1831cf..192dc080da2 100644
--- a/ydb/library/yql/providers/dq/actors/result_actor_base.h
+++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h
@@ -56,8 +56,8 @@ namespace NYql::NDqs::NExecutionHelpers {
, QueryResponse()
, WaitingAckFromFRW(false) {
ResultYsonWriter->OnBeginList();
- YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << SizeLimit;
- YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << (RowsLimit.Defined() ? ToString(RowsLimit.GetRef()) : "nothing");
+ YQL_CLOG(DEBUG, ProviderDq) << "_AllResultsBytesLimit = " << SizeLimit;
+ YQL_CLOG(DEBUG, ProviderDq) << "_RowsLimitPerWrite = " << (RowsLimit.Defined() ? ToString(RowsLimit.GetRef()) : "nothing");
}
virtual void FinishFullResultWriter() {
@@ -65,7 +65,7 @@ namespace NYql::NDqs::NExecutionHelpers {
}
void OnReceiveData(NYql::NDqProto::TData&& data, const TString& messageId = "", bool autoAck = false) {
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
if (data.GetRows() > 0 && !ResultBuilder) {
Issues.AddIssue(TIssue("Non empty rows: >=" + ToString(data.GetRows())).SetCode(0, TSeverityIds::S_WARNING));
@@ -126,7 +126,7 @@ namespace NYql::NDqs::NExecutionHelpers {
}
void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) {
- YQL_LOG(ERROR) << "OnError " << message;
+ YQL_CLOG(ERROR, ProviderDq) << "OnError " << message;
auto issueCode = NCommon::NeedFallback(statusCode)
? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR
: TIssuesIds::DQ_GATEWAY_ERROR;
@@ -138,7 +138,7 @@ namespace NYql::NDqs::NExecutionHelpers {
}
void Finish() {
- YQL_LOG(DEBUG) << __FUNCTION__ << ", truncated=" << Truncated;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << ", truncated=" << Truncated;
YQL_ENSURE(!FinishCalled);
FinishCalled = true;
@@ -162,8 +162,8 @@ namespace NYql::NDqs::NExecutionHelpers {
cFunc(NActors::TEvents::TEvGone::EventType, OnFullResultWriterShutdown);
cFunc(NActors::TEvents::TEvPoison::EventType, TBase::PassAway)
default:
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "Unexpected event " << etype;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "Unexpected event " << etype;
break;
}
}
@@ -176,17 +176,17 @@ namespace NYql::NDqs::NExecutionHelpers {
HFunc(TEvDqFailure, OnErrorInShutdownState);
HFunc(TEvFullResultWriterAck, OnFullResultWriterAck);
default:
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "Unexpected event " << etype;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "Unexpected event " << etype;
break;
}
}
private:
void OnQueryResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty());
- YQL_LOG(DEBUG) << "Shutting down TResultAggregator";
+ YQL_CLOG(DEBUG, ProviderDq) << "Shutting down TResultAggregator";
BlockingActors.clear();
if (FullResultWriterID) {
@@ -194,7 +194,7 @@ namespace NYql::NDqs::NExecutionHelpers {
FinishFullResultWriter();
}
- YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors";
+ YQL_CLOG(DEBUG, ProviderDq) << "Waiting for " << BlockingActors.size() << " blocking actors";
QueryResponse.Reset(ev->Release().Release());
TBase::Become(&TDerived::ShutdownHandler);
@@ -202,15 +202,15 @@ namespace NYql::NDqs::NExecutionHelpers {
}
void OnFullResultWriterShutdown() {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "Got TEvGone";
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "Got TEvGone";
FullResultWriterID = {};
}
void OnFullResultWriterResponse(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
if (ev->Get()->Record.IssuesSize() == 0) {
DoFinish();
} else {
@@ -219,7 +219,7 @@ namespace NYql::NDqs::NExecutionHelpers {
}
void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(TBase::SelfId())
+ " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16)
+ "." + ToString(ev->Get()->SourceType & 0xFFFF);
@@ -227,8 +227,8 @@ namespace NYql::NDqs::NExecutionHelpers {
}
void OnFullResultWriterAck(TEvFullResultWriterAck::TPtr& ev, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
Y_VERIFY(ev->Get()->Record.GetMessageId() == WriteQueue.front().MessageId);
if (!WriteQueue.front().SentProcessedEvent) { // messages, received before limits exceeded, are already been reported
TBase::Send(TBase::SelfId(), MakeHolder<TEvMessageProcessed>(WriteQueue.front().MessageId));
@@ -251,13 +251,13 @@ namespace NYql::NDqs::NExecutionHelpers {
}
void OnShutdownQueryResult(NActors::TEvents::TEvGone::TPtr& ev, const NActors::TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
auto iter = BlockingActors.find(ev->Sender);
if (iter != BlockingActors.end()) {
BlockingActors.erase(iter);
}
- YQL_LOG(DEBUG) << "Shutting down TResultAggregator, " << BlockingActors.size() << " blocking actors left";
+ YQL_CLOG(DEBUG, ProviderDq) << "Shutting down TResultAggregator, " << BlockingActors.size() << " blocking actors left";
if (BlockingActors.empty()) {
EndOnQueryResult();
@@ -269,7 +269,7 @@ namespace NYql::NDqs::NExecutionHelpers {
}
void FlushCurrent() {
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
YQL_ENSURE(!FullResultWriterID);
YQL_ENSURE(FullResultTableEnabled);
@@ -281,7 +281,7 @@ namespace NYql::NDqs::NExecutionHelpers {
TBase::Send(GraphExecutionEventsId, new TEvGraphExecutionEvent(record));
TBase::template Synchronize<TEvGraphExecutionEvent>([this](TEvGraphExecutionEvent::TPtr& ev) {
Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC);
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
if (auto msg = ev->Get()->Record.GetErrorMessage()) {
OnError(NYql::NDqProto::StatusIds::UNSUPPORTED, msg);
@@ -295,7 +295,7 @@ namespace NYql::NDqs::NExecutionHelpers {
}
void EndOnQueryResult() {
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
NDqProto::TQueryResponse result = QueryResponse->Record;
YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty());
@@ -318,8 +318,8 @@ namespace NYql::NDqs::NExecutionHelpers {
}
void DoPassAway() override {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
}
void TryWriteToFullResultTable() {
@@ -331,8 +331,8 @@ namespace NYql::NDqs::NExecutionHelpers {
}
void UnsafeWriteToFullResultTable() {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
TBase::Send(FullResultWriterID, MakeHolder<TEvFullResultWriterWriteRequest>(std::move(WriteQueue.front().WriteRequest)));
}
diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
index cca0b79d9a8..ec49e909e58 100644
--- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
+++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
@@ -101,8 +101,8 @@ private:
}
void OnWakeup() {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
auto now = TInstant::Now();
if (PullRequestTimeout && now - PullRequestStartTime > PullRequestTimeout) {
OnError(NYql::NDqProto::StatusIds::TIMEOUT, "Timeout " + ToString(SourceID.NodeId()));
@@ -123,7 +123,7 @@ private:
}
void OnReadyState(TEvReadyState::TPtr& ev, const TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
AddCounters(ev->Get()->Record);
SourceID = NActors::ActorIdFromProto(ev->Get()->Record.GetSourceId());
@@ -138,7 +138,7 @@ private:
}
void OnPullResult(TEvPullResult::TPtr&, const TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
PullRequestStartTime = TInstant::Now();
Send(SourceID, MakeHolder<TEvPullDataRequest>(MAX_RESULT_BATCH), IEventHandle::FlagTrackDelivery);
}
@@ -148,8 +148,7 @@ private:
}
void OnPullResponse(TEvPullDataResponse::TPtr& ev, const TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
if (FinishCalled) {
// finalization has been begun, actor will not kill himself anymore, should ignore responses instead
diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.cpp b/ydb/library/yql/providers/dq/actors/result_receiver.cpp
index 7c743e7f606..f14c03fad50 100644
--- a/ydb/library/yql/providers/dq/actors/result_receiver.cpp
+++ b/ydb/library/yql/providers/dq/actors/result_receiver.cpp
@@ -65,8 +65,8 @@ public:
private:
void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev, const TActorContext&) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
bool finishRequested = ev->Get()->Record.GetChannelData().GetFinished();
if (!FinishCalled) {
@@ -77,7 +77,7 @@ private:
Y_ENSURE(inserted);
}
- YQL_LOG(DEBUG) << "Finished: " << finishRequested;
+ YQL_CLOG(DEBUG, ProviderDq) << "Finished: " << finishRequested;
}
void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) {
@@ -85,8 +85,8 @@ private:
}
void OnMessageProcessed(TEvMessageProcessed::TPtr& ev) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
SendAck(ev->Get()->MessageId);
}
diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp
index 379a5c0ce91..b0bcd9db67d 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller.cpp
+++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp
@@ -65,10 +65,10 @@ public:
{
if (Settings) {
if (Settings->_AllResultsBytesLimit.Get()) {
- YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get();
+ YQL_CLOG(DEBUG, ProviderDq) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get();
}
if (Settings->_RowsLimitPerWrite.Get()) {
- YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get();
+ YQL_CLOG(DEBUG, ProviderDq) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get();
}
}
}
@@ -103,10 +103,10 @@ private:
}
void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) {
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
auto statusCode = ev->Get()->Record.GetStatusCode();
TIssues issues = ev->Get()->GetIssues();
- YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << issues.ToOneLineString();
+ YQL_CLOG(DEBUG, ProviderDq) << "AbortExecution from " << ev->Sender << ":" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << issues.ToOneLineString();
OnError(statusCode, issues);
}
@@ -114,8 +114,8 @@ private:
TActorId computeActor = ev->Sender;
auto& state = ev->Get()->Record;
ui64 taskId = state.GetTaskId();
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG)
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq)
<< SelfId()
<< " EvState TaskId: " << taskId
<< " State: " << state.GetState()
@@ -123,7 +123,7 @@ private:
<< " StatusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(state.GetStatusCode());
if (state.HasStats() && state.GetStats().GetTasks().size()) {
- YQL_LOG(DEBUG) << " " << SelfId() << " AddStats " << taskId;
+ YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " AddStats " << taskId;
AddStats(state.GetStats());
if (ServiceCounters.Counters && !AggrPeriod) {
ExportStats(TaskStat, taskId);
@@ -144,7 +144,7 @@ private:
break;
}
case NDqProto::COMPUTE_STATE_EXECUTING: {
- YQL_LOG(DEBUG) << " " << SelfId() << " Executing TaskId: " << taskId;
+ YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " Executing TaskId: " << taskId;
if (!FinishedTasks.contains(taskId)) {
// may get late/reordered? message
Executing[taskId] = Now();
@@ -152,7 +152,7 @@ private:
break;
}
case NDqProto::COMPUTE_STATE_FINISHED: {
- YQL_LOG(DEBUG) << " " << SelfId() << " Finish TaskId: " << taskId;
+ YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " Finish TaskId: " << taskId;
Executing.erase(taskId);
FinishedTasks.insert(taskId);
break;
@@ -171,7 +171,7 @@ private:
for (auto& taskActors: Executing) {
if (now > taskActors.second + PingPeriod) {
PingCookie++;
- YQL_LOG(DEBUG) << " Ping TaskId: " << taskActors.first << ", Compute ActorId: " << ActorIds[taskActors.first] << ", PingCookie: " << PingCookie;
+ YQL_CLOG(DEBUG, ProviderDq) << " Ping TaskId: " << taskActors.first << ", Compute ActorId: " << ActorIds[taskActors.first] << ", PingCookie: " << PingCookie;
Send(ActorIds[taskActors.first], new NDq::TEvDqCompute::TEvStateRequest(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered, PingCookie);
taskActors.second = now;
}
@@ -206,7 +206,7 @@ private:
}
void ExportStats(const TCounters& stat, ui64 taskId) {
- YQL_LOG(DEBUG) << " " << SelfId() << " ExportStats " << (taskId ? ToString(taskId) : "Summary");
+ YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " ExportStats " << (taskId ? ToString(taskId) : "Summary");
TString name;
std::map<TString, TString> labels;
static const TString SourceLabel = "Source";
@@ -411,7 +411,7 @@ private:
}
void OnReadyState(TEvReadyState::TPtr& ev) {
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
TaskStat.AddCounters(ev->Get()->Record);
@@ -432,7 +432,7 @@ private:
Stages.emplace(task.GetId(), taskMeta.GetStageId());
}
- YQL_LOG(DEBUG) << "Ready State: X1=" << SelfId().RawX1() << ", X2=" << SelfId().RawX2();
+ YQL_CLOG(DEBUG, ProviderDq) << "Ready State: X1=" << SelfId().RawX1() << ", X2=" << SelfId().RawX2();
MaybeUpdateChannels();
@@ -449,7 +449,7 @@ private:
return;
}
- YQL_LOG(DEBUG) << "Update channels";
+ YQL_CLOG(DEBUG, ProviderDq) << "Update channels";
for (const auto& [task, actorId] : Tasks) {
auto ev = MakeHolder<NDq::TEvDqCompute::TEvChannelsInfo>();
@@ -465,7 +465,7 @@ private:
}
}
- YQL_LOG(DEBUG) << task.GetId() << " " << ev->Record.ShortDebugString();
+ YQL_CLOG(DEBUG, ProviderDq) << task.GetId() << " " << ev->Record.ShortDebugString();
Send(actorId, ev.Release());
}
@@ -474,8 +474,8 @@ private:
void OnResultFailure(TEvDqFailure::TPtr& ev) {
if (Finished) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(WARN) << "TEvDqFailure IGNORED when Finished from " << ev->Sender;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(WARN, ProviderDq) << "TEvDqFailure IGNORED when Finished from " << ev->Sender;
} else {
FinalStat().FlushCounters(ev->Get()->Record); // histograms will NOT be reported
Send(ExecuterId, ev->Release().Release());
@@ -484,11 +484,11 @@ private:
}
void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode);
if (Finished) {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(WARN) << "OnError IGNORED when Finished";
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(WARN, ProviderDq) << "OnError IGNORED when Finished";
} else {
auto req = MakeHolder<TEvDqFailure>(statusCode, issues);
FinalStat().FlushCounters(req->Record);
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index aaf3151d74f..fc44dd201dc 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -87,23 +87,23 @@ public:
, RuntimeData(runtimeData)
, TraceId(traceId)
{
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "TDqWorker created ";
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "TDqWorker created ";
}
~TDqWorker()
{
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "TDqWorker destroyed ";
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "TDqWorker destroyed ";
}
void DoPassAway() override {
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
for (const auto& inputs : InputMap) {
Send(inputs.first, new NActors::TEvents::TEvPoison());
}
- YQL_LOG(DEBUG) << "TDqWorker passed away ";
+ YQL_CLOG(DEBUG, ProviderDq) << "TDqWorker passed away ";
if (Actor) {
Actor->PassAway();
}
@@ -200,7 +200,7 @@ private:
void SendFailure(THolder<TEvDqFailure> ev) {
if (!Executer) {
// Posible Error on Undelivered before OnDqTask
- YQL_LOG(ERROR) << "Error " << ev->Record.ShortUtf8DebugString();
+ YQL_CLOG(ERROR, ProviderDq) << "Error " << ev->Record.ShortUtf8DebugString();
return;
}
Stat.FlushCounters(ev->Record);
@@ -213,8 +213,8 @@ private:
void OnDqTask(TEvDqTask::TPtr& ev, const NActors::TActorContext& ctx) {
Y_UNUSED(ctx);
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "TDqWorker::OnDqTask";
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "TDqWorker::OnDqTask";
TFailureInjector::Reach("dq_task_failure", [] {::_exit(1); });
@@ -329,8 +329,8 @@ private:
void OnPullRequest(TEvPullDataRequest::TPtr& ev, const NActors::TActorContext& ctx) {
Y_UNUSED(ctx);
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(TRACE) << "TDqWorker::OnPullRequest " << ev->Sender;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(TRACE, ProviderDq) << "TDqWorker::OnPullRequest " << ev->Sender;
if (!TaskRunnerActor || !TaskRunnerPrepared) {
// waiting for initialization
@@ -382,8 +382,8 @@ private:
void OnPullResponse(TEvPullDataResponse::TPtr& ev, const NActors::TActorContext& ctx) {
Y_UNUSED(ctx);
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(TRACE) << "TDqWorker::OnPullResponse";
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(TRACE, ProviderDq) << "TDqWorker::OnPullResponse";
Stat.AddCounters(ev->Get()->Record);
@@ -531,7 +531,7 @@ private:
auto hasFreeSpace = freeSpace == ev->Get()->InputChannelFreeSpace.end()
|| freeSpace->second > 0;
if (hasFreeSpace) {
- YQL_LOG(TRACE) << "Send TEvPullDataRequest to " <<
+ YQL_CLOG(TRACE, ProviderDq) << "Send TEvPullDataRequest to " <<
channel.ActorID << " from " <<
SelfId();
Send(channel.ActorID, MakeHolder<TEvPullDataRequest>(INPUT_SIZE), IEventHandle::FlagTrackDelivery);
@@ -615,18 +615,18 @@ private:
for (const auto& [actorId, channel] : InputMap) {
if (!channel.Finished) {
if (channel.Requested) {
- YQL_LOG(DEBUG) << "Input " << JobDebugInfo(actorId) << (now - channel.RequestTime) << " Requested? " << channel.Requested;
+ YQL_CLOG(DEBUG, ProviderDq) << "Input " << JobDebugInfo(actorId) << (now - channel.RequestTime) << " Requested? " << channel.Requested;
if (RuntimeData) {
RuntimeData->UpdateChannelInputDelay(now - channel.RequestTime);
}
} else {
- YQL_LOG(DEBUG) << "Input " << JobDebugInfo(actorId) << (now - channel.ResponseTime) << " Requested? " << channel.Requested;
+ YQL_CLOG(DEBUG, ProviderDq) << "Input " << JobDebugInfo(actorId) << (now - channel.ResponseTime) << " Requested? " << channel.Requested;
if (RuntimeData) {
RuntimeData->UpdateChannelInputDelay(now - channel.ResponseTime);
}
}
} else {
- YQL_LOG(DEBUG) << "Input " << JobDebugInfo(actorId) << " Finished";
+ YQL_CLOG(DEBUG, ProviderDq) << "Input " << JobDebugInfo(actorId) << " Finished";
if (RuntimeData) {
RuntimeData->UpdateChannelInputDelay(TDuration::Seconds(0));
}
@@ -635,12 +635,12 @@ private:
for (const auto& [actorId, channel] : OutputMap) {
if (!channel.Finished) {
- YQL_LOG(DEBUG) << "Output " << JobDebugInfo(actorId) << (now - channel.RequestTime);
+ YQL_CLOG(DEBUG, ProviderDq) << "Output " << JobDebugInfo(actorId) << (now - channel.RequestTime);
if (RuntimeData) {
RuntimeData->UpdateChannelOutputDelay(now - channel.RequestTime);
}
} else {
- YQL_LOG(DEBUG) << "Output " << JobDebugInfo(actorId) << " Finished";
+ YQL_CLOG(DEBUG, ProviderDq) << "Output " << JobDebugInfo(actorId) << " Finished";
if (RuntimeData) {
RuntimeData->UpdateChannelOutputDelay(TDuration::Seconds(0));
}
diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
index b4bc07fbef1..455adef20bc 100644
--- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
+++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
@@ -21,6 +21,7 @@
#define PERFORM_RULE(func, ...) \
do { \
if (auto result = func(__VA_ARGS__); result.Raw() != node.Raw()) { \
+ YQL_CLOG(DEBUG, ProviderDq) << #func; \
node = result; \
return node.Ptr(); \
} \
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index f672c9ded64..03e69a9fef3 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -137,7 +137,7 @@ namespace NYql::NDqs {
auto query = expr.Maybe<TDqQuery>();
const auto maxTasksPerOperation = settings->MaxTasksPerOperation.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerOperation);
- YQL_LOG(DEBUG) << "Execution Plan " << NCommon::ExprToPrettyString(ExprContext, *DqExprRoot);
+ YQL_CLOG(DEBUG, ProviderDq) << "Execution Plan " << NCommon::ExprToPrettyString(ExprContext, *DqExprRoot);
auto stages = GetStages(DqExprRoot);
YQL_ENSURE(!stages.empty());
@@ -149,9 +149,9 @@ namespace NYql::NDqs {
for (const auto& stage : stages) {
const bool hasDqSource = HasDqSource(stage);
if ((hasDqSource || HasReadWraps(stage.Program().Ptr())) && BuildReadStage(settings, stage, hasDqSource, canFallback)) {
- YQL_LOG(DEBUG) << "Read stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr());
+ YQL_CLOG(TRACE, ProviderDq) << "Read stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr());
} else {
- YQL_LOG(DEBUG) << "Common stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr());
+ YQL_CLOG(TRACE, ProviderDq) << "Common stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr());
NDq::CommonBuildTasks(TasksGraph, stage);
}
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 229b248b03f..de3a6fd3878 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -147,7 +147,7 @@ public:
TVector<NDqProto::TData> rows;
{
auto guard = runner->BindAllocator(State->Settings->MemoryLimit.Get().GetOrElse(0));
- YQL_LOG(DEBUG) << " NDq::ERunStatus " << runner->Run();
+ YQL_CLOG(DEBUG, ProviderDq) << " NDq::ERunStatus " << runner->Run();
NDq::ERunStatus status;
while ((status = runner->Run()) == NDq::ERunStatus::PendingOutput || status == NDq::ERunStatus::Finished) {
@@ -270,7 +270,7 @@ private:
std::tuple<TString, TString> GetPathAndObjectId(const TFilePathWithMd5& pathWithMd5) const {
if (pathWithMd5.Md5.empty()) {
- YQL_LOG(WARN) << "Empty md5 for " << pathWithMd5.Path;
+ YQL_CLOG(WARN, ProviderDq) << "Empty md5 for " << pathWithMd5.Path;
}
return GetPathAndObjectId(pathWithMd5.Path,
pathWithMd5.Md5.empty()
@@ -337,7 +337,7 @@ private:
auto block = b.second;
auto filePath = block->FrozenFile->GetPath().GetPath();
auto fullFileName = localRun ? filePath : TUserDataStorage::MakeRelativeName(b.first.Alias());
- YQL_LOG(DEBUG) << "Path resolve " << filePath << "|"<< fullFileName;
+ YQL_CLOG(DEBUG, ProviderDq) << "Path resolve " << filePath << "|"<< fullFileName;
// validate
switch (block->Type) {
case EUserDataType::URL:
@@ -421,14 +421,14 @@ private:
const TString udfName(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef());
const auto moduleName = ModuleName(udfName);
- YQL_LOG(DEBUG) << "Try to resolve " << moduleName;
+ YQL_CLOG(DEBUG, ProviderDq) << "Try to resolve " << moduleName;
TMaybe<TFilePathWithMd5> udfPathWithMd5 = State->TypeCtx->UdfResolver->GetSystemModulePath(moduleName);
YQL_ENSURE(udfPathWithMd5.Defined());
TString filePath, objectId;
std::tie(filePath, objectId) = GetPathAndObjectId(*udfPathWithMd5);
- YQL_LOG(DEBUG) << "File|Md5 " << filePath << "|" << objectId;
+ YQL_CLOG(DEBUG, ProviderDq) << "File|Md5 " << filePath << "|" << objectId;
if (!filePath.StartsWith(NKikimr::NMiniKQL::StaticModulePrefix)) {
auto f = IDqGateway::TFileResource();
@@ -465,7 +465,7 @@ private:
i64 dataLimit = static_cast<i64>(4_GB);
bool fallbackFlag = false;
if (sizeSum > dataLimit) {
- YQL_LOG(INFO) << "Too much data: " << sizeSum << " > " << dataLimit;
+ YQL_CLOG(WARN, ProviderDq) << "Too much data: " << sizeSum << " > " << dataLimit;
fallbackFlag = true;
}
@@ -504,7 +504,7 @@ private:
StartCounter("FreezeUsedFiles");
if (const auto filesRes = NCommon::FreezeUsedFiles(*input, files, *State->TypeCtx, ctx, [](const TString&){return true;}, crutches); filesRes.first.Level != TStatus::Ok) {
if (filesRes.first.Level != TStatus::Error) {
- YQL_LOG(DEBUG) << "Freezing files for " << input->Content() << " (UniqueId=" << input->UniqueId() << ")";
+ YQL_CLOG(DEBUG, ProviderDq) << "Freezing files for " << input->Content() << " (UniqueId=" << input->UniqueId() << ")";
}
return filesRes;
}
@@ -558,7 +558,7 @@ private:
bool fallbackFlag = BuildUploadList(uploadList, localRun, explorer, typeEnv, files);
if (fallbackFlag) {
- YQL_LOG(DEBUG) << "Fallback: " << NCommon::ExprToPrettyString(ctx, *input);
+ YQL_CLOG(DEBUG, ProviderDq) << "Fallback: " << NCommon::ExprToPrettyString(ctx, *input);
return Fallback();
} else {
*lambda = SerializeRuntimeNode(root, typeEnv);
@@ -621,7 +621,7 @@ private:
}
TStatusCallbackPair HandleResult(const TExprNode::TPtr& input, TExprContext& ctx) {
- YQL_LOG(DEBUG) << "Executing " << input->Content() << " (UniqueId=" << input->UniqueId() << ")";
+ YQL_CLOG(DEBUG, ProviderDq) << "Executing " << input->Content() << " (UniqueId=" << input->UniqueId() << ")";
if (State->ExternalUser) {
return Fallback();
@@ -726,7 +726,7 @@ private:
FlushStatisticsToState();
return WrapFutureCallback(future, [localRun, startTime, type, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
- YQL_LOG(DEBUG) << state->SessionId << " WrapFutureCallback";
+ YQL_CLOG(DEBUG, ProviderDq) << state->SessionId << " WrapFutureCallback";
auto duration = TInstant::Now() - startTime;
if (state->Metrics) {
@@ -749,7 +749,7 @@ private:
return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error);
}
- YQL_LOG(DEBUG) << "Fallback from gateway: " << NCommon::ExprToPrettyString(ctx, *input);
+ YQL_CLOG(DEBUG, ProviderDq) << "Fallback from gateway: " << NCommon::ExprToPrettyString(ctx, *input);
TIssue warning(ctx.GetPosition(input->Pos()), "DQ cannot execute the query");
warning.Severity = TSeverityIds::S_INFO;
ctx.IssueManager.RaiseIssue(warning);
@@ -921,7 +921,7 @@ private:
auto filesRes = NCommon::FreezeUsedFiles(*optimizedInput, files, *State->TypeCtx, ctx, [](const TString&){return true;}, crutches);
if (filesRes.first.Level != TStatus::Ok) {
if (filesRes.first.Level != TStatus::Error) {
- YQL_LOG(DEBUG) << "Freezing files for " << input->Content() << " (UniqueId=" << input->UniqueId() << ")";
+ YQL_CLOG(DEBUG, ProviderDq) << "Freezing files for " << input->Content() << " (UniqueId=" << input->UniqueId() << ")";
}
return filesRes;
}
@@ -1067,8 +1067,6 @@ private:
int level = 0;
// TODO: remove copy-paste
return WrapFutureCallback(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
- YQL_LOG(DEBUG) << state->SessionId << " WrapFutureCallback";
-
auto duration = TInstant::Now() - startTime;
if (state->Metrics) {
state->Metrics->SetCounter("dq", "TotalExecutionTime", duration.MilliSeconds());
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp
index 1a0053350a6..ee2aec976ab 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp
@@ -61,7 +61,7 @@ public:
try {
return promise.GetFuture().GetValueSync();
} catch (...) {
- YQL_LOG(INFO) << "DqControl IsReady Exception " << CurrentExceptionMessage();
+ YQL_CLOG(INFO, ProviderDq) << "DqControl IsReady Exception " << CurrentExceptionMessage();
return false;
}
}
@@ -110,11 +110,11 @@ public:
}
for (const auto& [path, objectId] : udfs){
- YQL_LOG(DEBUG) << "DQ control, adding file: " << path << " with objectId " << objectId;
+ YQL_CLOG(DEBUG, ProviderDq) << "DQ control, adding file: " << path << " with objectId " << objectId;
TString newPath, newObjectId;
std::tie(newPath, newObjectId) = GetPathAndObjectId(path, objectId, objectId);
- YQL_LOG(DEBUG) << "DQ control, rewrite path/objectId: " << newPath << ", " << newObjectId;
+ YQL_CLOG(DEBUG, ProviderDq) << "DQ control, rewrite path/objectId: " << newPath << ", " << newObjectId;
TFileResource r;
r.SetLocalPath(newPath);
r.SetObjectType(Yql::DqsProto::TFile::EUDF_FILE);
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
index 2341b28ad44..86f8af26a13 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
@@ -52,7 +52,7 @@ public:
template<typename RespType>
void OnResponse(NThreading::TPromise<TResult> promise, TString sessionId, NGrpc::TGrpcStatus&& status, RespType&& resp, const THashMap<TString, TString>& modulesMapping, bool alwaysFallback = false)
{
- YQL_LOG_CTX_SCOPE(sessionId);
+ YQL_LOG_CTX_ROOT_SCOPE(sessionId);
YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::callback";
{
@@ -235,7 +235,7 @@ public:
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard) override
{
- YQL_LOG_CTX_SCOPE(sessionId);
+ YQL_LOG_CTX_ROOT_SCOPE(sessionId);
auto tasks = plan.GetTasks();
Yql::DqsProto::ExecuteGraphRequest queryPB;
@@ -281,7 +281,7 @@ public:
RunningQueries.emplace(sessionId, std::make_pair(progressWriter, TString("")));
}
- YQL_LOG(DEBUG) << "Send query of size " << queryPB.ByteSizeLong();
+ YQL_CLOG(DEBUG, ProviderDq) << "Send query of size " << queryPB.ByteSizeLong();
return WithRetry<Yql::DqsProto::ExecuteGraphResponse>(
sessionId,
@@ -293,7 +293,7 @@ public:
}
NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override {
- YQL_LOG_CTX_SCOPE(sessionId);
+ YQL_LOG_CTX_ROOT_SCOPE(sessionId);
YQL_CLOG(INFO, ProviderDq) << "OpenSession";
Yql::DqsProto::OpenSessionRequest request;
request.SetSession(sessionId);
@@ -305,7 +305,7 @@ public:
auto promise = NThreading::NewPromise<void>();
auto callback = [this, promise, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::OpenSessionResponse&& resp) mutable {
Y_UNUSED(resp);
- YQL_LOG_CTX_SCOPE(sessionId);
+ YQL_LOG_CTX_ROOT_SCOPE(sessionId);
if (status.Ok()) {
YQL_CLOG(INFO, ProviderDq) << "OpenSession OK";
SchedulePingSessionRequest(sessionId);
@@ -394,7 +394,7 @@ public:
Yql::DqsProto::PingSessionResponse&&) mutable
{
if (status.GRpcStatusCode == grpc::INVALID_ARGUMENT) {
- YQL_LOG(INFO) << "Session closed " << sessionId;
+ YQL_CLOG(INFO, ProviderDq) << "Session closed " << sessionId;
} else {
SchedulePingSessionRequest(sessionId);
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
index 92db4290741..5521d96e15b 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
@@ -84,10 +84,10 @@ TDataProviderInitializer GetDqDataProviderInitializer(
Y_UNUSED(timeProvider);
if (dqGateway) { // nullptr in yqlrun
auto t = TInstant::Now();
- YQL_LOG(DEBUG) << "OpenSession " << sessionId;
+ YQL_CLOG(DEBUG, ProviderDq) << "OpenSession " << sessionId;
auto future = dqGateway->OpenSession(sessionId, username);
future.Subscribe([sessionId, t] (const auto& ) {
- YQL_LOG(DEBUG) << "OpenSession " << sessionId << " complete in " << (TInstant::Now()-t).MilliSeconds();
+ YQL_CLOG(DEBUG, ProviderDq) << "OpenSession " << sessionId << " complete in " << (TInstant::Now()-t).MilliSeconds();
});
return future;
} else {
@@ -101,7 +101,7 @@ TDataProviderInitializer GetDqDataProviderInitializer(
}
if (dqGateway) { // nullptr in yqlrun
- YQL_LOG(DEBUG) << "CloseSession " << sessionId;
+ YQL_CLOG(DEBUG, ProviderDq) << "CloseSession " << sessionId;
dqGateway->CloseSession(sessionId);
}
};
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
index 058c99a9fcc..d8c8ab7b87b 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
@@ -90,7 +90,7 @@ public:
}
if (!good || (hasJoin && dataSize > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB))) {
- YQL_LOG(DEBUG) << "good: " << good << " hasJoin: " << hasJoin << " dataSize: " << dataSize;
+ YQL_CLOG(DEBUG, ProviderDq) << "good: " << good << " hasJoin: " << hasJoin << " dataSize: " << dataSize;
return TStatus::Ok;
}
}
@@ -116,6 +116,7 @@ public:
}, ctx, TOptimizeExprSettings{State_->TypeCtx});
if (input != output) {
+ YQL_CLOG(DEBUG, ProviderDq) << "DqsRecapture";
// TODO: Add before/after recapture transformers
State_->TypeCtx->DqCaptured = true;
// TODO: drop this after implementing DQS ConstraintTransformer
@@ -129,7 +130,7 @@ public:
private:
void AddInfo(TExprContext& ctx, const TString& message) const {
- YQL_LOG(DEBUG) << message;
+ YQL_CLOG(DEBUG, ProviderDq) << message;
TIssue info("DQ cannot execute the query. Cause: " + message);
info.Severity = TSeverityIds::S_INFO;
ctx.IssueManager.RaiseIssue(info);
diff --git a/ydb/library/yql/providers/dq/runtime/file_cache.cpp b/ydb/library/yql/providers/dq/runtime/file_cache.cpp
index 18005eb97fe..4806574716c 100644
--- a/ydb/library/yql/providers/dq/runtime/file_cache.cpp
+++ b/ydb/library/yql/providers/dq/runtime/file_cache.cpp
@@ -48,7 +48,7 @@ void TFileCache::Scan()
UsedSize += file.Size;
- YQL_LOG(DEBUG) << file.Name << "|" << file.ObjectId;
+ YQL_CLOG(DEBUG, ProviderDq) << file.Name << "|" << file.ObjectId;
allFiles.emplace_back(std::move(file));
}
@@ -84,7 +84,7 @@ void TFileCache::Clean() {
if (maybeFile != Files.end()) {
auto path = GetDir(objectId) + "/" + maybeFile->second.Name;
- YQL_LOG(DEBUG) << "Remove File " << path << " UsedSize " << ToString(UsedSize) << " FileSize " << ToString(maybeFile->second.Size);
+ YQL_CLOG(DEBUG, ProviderDq) << "Remove File " << path << " UsedSize " << ToString(UsedSize) << " FileSize " << ToString(maybeFile->second.Size);
UsedSize -= maybeFile->second.Size;
NFs::Remove(path);
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index cfbccb9a7a8..1d8fd7a28a7 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -746,7 +746,7 @@ public:
settings.AllowGeneratorsInUnboxedValues = true;
for (const auto& x: taskMeta.GetSecureParams()) {
settings.SecureParams[x.first] = x.second;
- YQL_LOG(DEBUG) << "SecureParam " << x.first << ":XXX";
+ YQL_CLOG(DEBUG, ProviderDq) << "SecureParam " << x.first << ":XXX";
}
settings.OptLLVM = DqConfiguration->OptLLVM.Get().GetOrElse("");
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp
index b771c1df7f9..c6d535cfcc5 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.cpp
+++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp
@@ -96,8 +96,8 @@ namespace NYql::NDqs {
}
void OnPoison() {
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << __FUNCTION__ ;
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ ;
ReplyError(grpc::UNAVAILABLE, "Unexpected error");
*ClientDisconnectedCounter += 1;
}
@@ -107,7 +107,7 @@ namespace NYql::NDqs {
}
void DoBootstrap(const NActors::TActorContext& ctx) {
- YQL_LOG_CTX_SCOPE(TraceId);
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
if (!CtxSubscribed) {
auto selfId = ctx.SelfID;
auto* actorSystem = ctx.ExecutorThread.ActorSystem;
@@ -192,8 +192,8 @@ namespace NYql::NDqs {
void OnReturnResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) {
auto& result = ev->Get()->Record;
Y_UNUSED(ctx);
- YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "TServiceProxyActor::OnReturnResult " << result.GetMetric().size();
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnReturnResult " << result.GetMetric().size();
QueryStat.AddCounters(result);
bool retriable;
@@ -211,14 +211,14 @@ namespace NYql::NDqs {
QueryStat.AddCounter(RetryName, TDuration::MilliSeconds(0));
NYql::TIssues issues;
NYql::IssuesFromMessage(result.GetIssues(), issues);
- YQL_LOG(WARN) << RetryName << " " << Retry << " Issues: " << issues.ToString();
+ YQL_CLOG(WARN, ProviderDq) << RetryName << " " << Retry << " Issues: " << issues.ToString();
DoRetry();
} else {
auto needFallback = NCommon::NeedFallback(statusCode);
if (result.GetIssues().size() > 0) {
NYql::TIssues issues;
NYql::IssuesFromMessage(result.GetIssues(), issues);
- YQL_LOG(WARN) << "Issues: " << issues.ToString();
+ YQL_CLOG(WARN, ProviderDq) << "Issues: " << issues.ToString();
*ErrorCounter += 1;
}
if (needFallback) {
@@ -290,7 +290,7 @@ namespace NYql::NDqs {
}
void DoRetry() override {
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this](const auto& ev) {
if (ev->Get()->Record.GetErrorMessage()) {
TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage());
@@ -326,7 +326,7 @@ namespace NYql::NDqs {
THolder<Yql::DqsProto::ExecuteGraphRequest> ModifiedRequest;
void DoPassAway() override {
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
Send(GraphExecutionEventsActorId, new TEvents::TEvPoison());
TServiceProxyActor::DoPassAway();
}
@@ -346,7 +346,7 @@ namespace NYql::NDqs {
}
void Bootstrap() override {
- YQL_LOG(DEBUG) << "TServiceProxyActor::OnExecuteGraph";
+ YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnExecuteGraph";
SendEvent(NYql::NDqProto::EGraphExecutionEventType::START, SerializeGraphDescriptor(), [this](const TEvGraphExecutionEvent::TPtr& ev) {
if (ev->Get()->Record.GetErrorMessage()) {
@@ -386,7 +386,7 @@ namespace NYql::NDqs {
}
void FinishBootstrap(const NDqProto::TGraphExecutionEvent::TMap& params) {
- YQL_LOG(DEBUG) << __FUNCTION__;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
MergeTaskMetas(params);
auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime));
@@ -431,7 +431,7 @@ namespace NYql::NDqs {
}
Send(GraphExecutionEventsActorId, new TEvGraphExecutionEvent(record));
Synchronize<TEvGraphExecutionEvent>([callback, traceId = TraceId](TEvGraphExecutionEvent::TPtr& ev) {
- YQL_LOG_CTX_SCOPE(traceId);
+ YQL_LOG_CTX_ROOT_SCOPE(traceId);
Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC);
callback(ev);
});
@@ -499,7 +499,7 @@ namespace NYql::NDqs {
TString message = TStringBuilder()
<< "Bad session: "
<< request->GetSession();
- YQL_LOG(DEBUG) << message;
+ YQL_CLOG(DEBUG, ProviderDq) << message;
ctx->ReplyError(grpc::INVALID_ARGUMENT, message);
return;
}
@@ -548,7 +548,7 @@ namespace NYql::NDqs {
auto* request = dynamic_cast<const Yql::DqsProto::PingSessionRequest*>(ctx->GetRequest());
Y_VERIFY(!!request);
- YQL_LOG(DEBUG) << "PingSession " << request->GetSession();
+ YQL_CLOG(TRACE, ProviderDq) << "PingSession " << request->GetSession();
Yql::DqsProto::PingSessionResponse result;
auto session = Sessions.GetSession(request->GetSession());
@@ -556,7 +556,7 @@ namespace NYql::NDqs {
TString message = TStringBuilder()
<< "Bad session: "
<< request->GetSession();
- YQL_LOG(DEBUG) << message;
+ YQL_CLOG(DEBUG, ProviderDq) << message;
ctx->ReplyError(grpc::INVALID_ARGUMENT, message);
} else {
ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
@@ -568,7 +568,7 @@ namespace NYql::NDqs {
auto* request = dynamic_cast<const Yql::DqsProto::OpenSessionRequest*>(ctx->GetRequest());
Y_VERIFY(!!request);
- YQL_LOG(DEBUG) << "OpenSession for " << request->GetSession() << " " << request->GetUsername();
+ YQL_CLOG(DEBUG, ProviderDq) << "OpenSession for " << request->GetSession() << " " << request->GetUsername();
Yql::DqsProto::OpenSessionResponse result;
if (Sessions.OpenSession(request->GetSession(), request->GetUsername())) {
@@ -605,15 +605,15 @@ namespace NYql::NDqs {
stat.Aggregate(s);
}
- YQL_LOG(DEBUG) << "SentEvents: " << stat.SentEvents;
- YQL_LOG(DEBUG) << "ReceivedEvents: " << stat.ReceivedEvents;
- YQL_LOG(DEBUG) << "NonDeliveredEvents: " << stat.NonDeliveredEvents;
- YQL_LOG(DEBUG) << "EmptyMailboxActivation: " << stat.EmptyMailboxActivation;
+ YQL_CLOG(DEBUG, ProviderDq) << "SentEvents: " << stat.SentEvents;
+ YQL_CLOG(DEBUG, ProviderDq) << "ReceivedEvents: " << stat.ReceivedEvents;
+ YQL_CLOG(DEBUG, ProviderDq) << "NonDeliveredEvents: " << stat.NonDeliveredEvents;
+ YQL_CLOG(DEBUG, ProviderDq) << "EmptyMailboxActivation: " << stat.EmptyMailboxActivation;
Sessions.PrintInfo();
for (ui32 i = 0; i < stat.ActorsAliveByActivity.size(); i=i+1) {
if (stat.ActorsAliveByActivity[i]) {
- YQL_LOG(DEBUG) << "ActorsAliveByActivity[" << i << "]=" << stat.ActorsAliveByActivity[i];
+ YQL_CLOG(DEBUG, ProviderDq) << "ActorsAliveByActivity[" << i << "]=" << stat.ActorsAliveByActivity[i];
}
}
@@ -624,7 +624,7 @@ namespace NYql::NDqs {
ctx->Reply(result, Ydb::StatusIds::SUCCESS);
},
[ctx] () mutable {
- YQL_LOG(DEBUG) << "ClusterStatus failed";
+ YQL_CLOG(INFO, ProviderDq) << "ClusterStatus failed";
ctx->ReplyError(grpc::UNAVAILABLE, "Error");
},
TDuration::MilliSeconds(2000));
@@ -645,7 +645,7 @@ namespace NYql::NDqs {
ctx->Reply(result, Ydb::StatusIds::SUCCESS);
},
[ctx] () mutable {
- YQL_LOG(DEBUG) << "OperationStopResponse failed";
+ YQL_CLOG(INFO, ProviderDq) << "OperationStopResponse failed";
ctx->ReplyError(grpc::UNAVAILABLE, "Error");
},
TDuration::MilliSeconds(2000));
@@ -667,7 +667,7 @@ namespace NYql::NDqs {
ctx->Reply(result, Ydb::StatusIds::SUCCESS);
},
[ctx] () mutable {
- YQL_LOG(DEBUG) << "QueryStatus failed";
+ YQL_CLOG(INFO, ProviderDq) << "QueryStatus failed";
ctx->ReplyError(grpc::UNAVAILABLE, "Error");
},
TDuration::MilliSeconds(2000));
@@ -701,7 +701,7 @@ namespace NYql::NDqs {
ctx->Reply(result, Ydb::StatusIds::SUCCESS);
},
[ctx] () mutable {
- YQL_LOG(DEBUG) << "RegisterNode failed";
+ YQL_CLOG(INFO, ProviderDq) << "RegisterNode failed";
ctx->ReplyError(grpc::UNAVAILABLE, "Error");
},
TDuration::MilliSeconds(5000));
@@ -760,7 +760,7 @@ namespace NYql::NDqs {
ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
},
[ctx] () mutable {
- YQL_LOG(DEBUG) << "IsReadyForRevision failed";
+ YQL_CLOG(INFO, ProviderDq) << "IsReadyForRevision failed";
ctx->ReplyError(grpc::UNAVAILABLE, "Error");
},
TDuration::MilliSeconds(2000));
@@ -783,7 +783,7 @@ namespace NYql::NDqs {
ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
},
[ctx] () mutable {
- YQL_LOG(DEBUG) << "Routes failed";
+ YQL_CLOG(INFO, ProviderDq) << "Routes failed";
ctx->ReplyError(grpc::UNAVAILABLE, "Error");
},
TDuration::MilliSeconds(5000));
diff --git a/ydb/library/yql/providers/dq/service/grpc_session.cpp b/ydb/library/yql/providers/dq/service/grpc_session.cpp
index 38e95f66681..2de87f9d46a 100644
--- a/ydb/library/yql/providers/dq/service/grpc_session.cpp
+++ b/ydb/library/yql/providers/dq/service/grpc_session.cpp
@@ -85,7 +85,7 @@ void TSessionStorage::Clean(TInstant before) {
it != SessionsByLastUpdate.end(); )
{
if (it->LastUpdate < before) {
- YQL_LOG(INFO) << "Drop session by timeout " << it->SessionId;
+ YQL_CLOG(INFO, ProviderDq) << "Drop session by timeout " << it->SessionId;
Sessions.erase(it->SessionId);
it = SessionsByLastUpdate.erase(it);
} else {
@@ -97,10 +97,10 @@ void TSessionStorage::Clean(TInstant before) {
}
void TSessionStorage::PrintInfo() const {
- YQL_LOG(INFO) << "SessionsByLastUpdate: " << SessionsByLastUpdate.size();
- YQL_LOG(DEBUG) << "Sessions: " << Sessions.size();
+ YQL_CLOG(INFO, ProviderDq) << "SessionsByLastUpdate: " << SessionsByLastUpdate.size();
+ YQL_CLOG(DEBUG, ProviderDq) << "Sessions: " << Sessions.size();
ui64 currenSessionsCounter = *SessionsCounter;
- YQL_LOG(DEBUG) << "SessionsCounter: " << currenSessionsCounter;
+ YQL_CLOG(DEBUG, ProviderDq) << "SessionsCounter: " << currenSessionsCounter;
}
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
index bef368a961f..9a0baa3f8b3 100644
--- a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
+++ b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
@@ -36,7 +36,7 @@ namespace NYql::NDqs {
if (message.find("ICP01 ready to work") != TString::npos) {
return;
}
- YQL_LOG(DEBUG) << message;
+ YQL_CLOG(DEBUG, ProviderDq) << message;
}
void ReopenLog() override { }
@@ -102,7 +102,7 @@ namespace NYql::NDqs {
#define SET_VALUE(name) \
if (icSettings.Has ## name()) { \
schedulerConfig.name = icSettings.Get ## name (); \
- YQL_LOG(DEBUG) << "Scheduler IC " << #name << " set to " << schedulerConfig.name; \
+ YQL_CLOG(DEBUG, ProviderDq) << "Scheduler IC " << #name << " set to " << schedulerConfig.name; \
}
SET_VALUE(ResolutionMicroseconds);
@@ -119,7 +119,7 @@ namespace NYql::NDqs {
setup->Scheduler = CreateSchedulerThread(schedulerConfig);
setup->MaxActivityType = maxActivityType;
- YQL_LOG(DEBUG) << "Initializing local services";
+ YQL_CLOG(DEBUG, ProviderDq) << "Initializing local services";
setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0));
if (IActor* schedulerActor = CreateSchedulerActor(schedulerConfig)) {
TActorId schedulerActorId = MakeSchedulerActorId();
@@ -143,7 +143,7 @@ namespace NYql::NDqs {
TIntrusivePtr<TTableNameserverSetup> nameserverTable = new TTableNameserverSetup();
THashSet<ui32> staticNodeId;
- YQL_LOG(DEBUG) << "Initializing node table";
+ YQL_CLOG(DEBUG, ProviderDq) << "Initializing node table";
nameserverTable->StaticNodeTable[nodeId] = std::make_pair(interconnectAddress, port);
setup->LocalServices.emplace_back(
@@ -164,13 +164,13 @@ namespace NYql::NDqs {
#define SET_DURATION(name) \
{ \
icCommon->Settings.name = TDuration::MilliSeconds(icSettings.Get ## name ## Ms()); \
- YQL_LOG(DEBUG) << "IC " << #name << " set to " << icCommon->Settings.name; \
+ YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \
}
#define SET_VALUE(name) \
{ \
icCommon->Settings.name = icSettings.Get ## name(); \
- YQL_LOG(DEBUG) << "IC " << #name << " set to " << icCommon->Settings.name; \
+ YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \
}
SET_DURATION(Handshake);
@@ -198,7 +198,7 @@ namespace NYql::NDqs {
ui32 maxNodeId = static_cast<ui32>(ENodeIdLimits::MaxWorkerNodeId);
- YQL_LOG(DEBUG) << "Initializing proxy actors";
+ YQL_CLOG(DEBUG, ProviderDq) << "Initializing proxy actors";
setup->Interconnect.ProxyActors.resize(maxNodeId + 1);
for (ui32 id = 1; id <= maxNodeId; ++id) {
if (nodeId != id) {
@@ -208,10 +208,10 @@ namespace NYql::NDqs {
}
// start listener
- YQL_LOG(DEBUG) << "Start listener";
+ YQL_CLOG(DEBUG, ProviderDq) << "Start listener";
{
icCommon->TechnicalSelfHostName = interconnectAddress;
- YQL_LOG(INFO) << "Start listener " << interconnectAddress << ":" << port << " socket: " << socket;
+ YQL_CLOG(INFO, ProviderDq) << "Start listener " << interconnectAddress << ":" << port << " socket: " << socket;
IActor* listener;
TMaybe<SOCKET> maybeSocket = socket < 0
? Nothing()
@@ -224,7 +224,7 @@ namespace NYql::NDqs {
TActorSetupCmd(listener, TMailboxType::ReadAsFilled, 0));
}
- YQL_LOG(DEBUG) << "Actor initialization complete";
+ YQL_CLOG(DEBUG, ProviderDq) << "Actor initialization complete";
#ifdef _unix_
signal(SIGPIPE, SIG_IGN);
diff --git a/ydb/library/yql/providers/dq/service/service_node.cpp b/ydb/library/yql/providers/dq/service/service_node.cpp
index cda24ec6cf7..cdf05b6661f 100644
--- a/ydb/library/yql/providers/dq/service/service_node.cpp
+++ b/ydb/library/yql/providers/dq/service/service_node.cpp
@@ -30,7 +30,7 @@ namespace NYql {
private:
void Start() override {
- YQL_LOG(DEBUG) << "Start GRPC Listener";
+ YQL_CLOG(DEBUG, ProviderDq) << "Start GRPC Listener";
Poller.Start();
StartRead();
}
@@ -40,7 +40,7 @@ namespace NYql {
}
void StartRead() {
- YQL_LOG(TRACE) << "Read next GRPC event";
+ YQL_CLOG(TRACE, ProviderDq) << "Read next GRPC event";
Poller.StartRead(Listener, [&](const TIntrusivePtr<TSharedDescriptor>& ss) {
return Accept(ss);
});
@@ -54,14 +54,14 @@ namespace NYql {
NInterconnect::TAddress address;
r = socket->Accept(address);
if (r >= 0) {
- YQL_LOG(TRACE) << "New GRPC connection";
+ YQL_CLOG(TRACE, ProviderDq) << "New GRPC connection";
grpc::experimental::ExternalConnectionAcceptor::NewConnectionParameters params;
SetNonBlock(r, true);
params.listener_fd = -1; // static_cast<int>(Socket);
params.fd = r;
Acceptor->HandleNewConnection(&params);
} else if (-r != EAGAIN && -r != EWOULDBLOCK) {
- YQL_LOG(DEBUG) << "Unknown error code " + ToString(r);
+ YQL_CLOG(DEBUG, ProviderDq) << "Unknown error code " + ToString(r);
}
}
@@ -122,7 +122,7 @@ namespace NYql {
{ }
};
- YQL_LOG(INFO) << "Starting GRPC on " << Config.GrpcPort;
+ YQL_CLOG(INFO, ProviderDq) << "Starting GRPC on " << Config.GrpcPort;
IExternalListener::TPtr listener = nullptr;
if (Config.GrpcSocket >= 0) {
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index dd7b70fb165..7bdee232e6e 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -131,7 +131,7 @@ public:
return true;
#else
int status;
- YQL_LOG(DEBUG) << "Check Pid " << Pid;
+ YQL_CLOG(DEBUG, ProviderDq) << "Check Pid " << Pid;
return waitpid(Pid, &status, WNOHANG) <= 0;
#endif
}
@@ -256,7 +256,7 @@ struct TProcessHolder {
auto it = Processes.begin();
while (it != Processes.end()) {
if (!it->second->IsAlive()) {
- YQL_LOG(DEBUG) << "Remove dead process";
+ YQL_CLOG(DEBUG, ProviderDq) << "Remove dead process";
stopList.emplace_back(std::move(it->second));
it = Processes.erase(it);
} else {
@@ -320,11 +320,11 @@ public:
ContainerName = portoSettings.ContainerNamePrefix + "/" + ContainerName;
}
- YQL_LOG(DEBUG) << "HardLink " << ExeName << "'" << WorkDir + "/" + name << "'";
+ YQL_CLOG(DEBUG, ProviderDq) << "HardLink " << ExeName << "'" << WorkDir + "/" + name << "'";
if (NFs::HardLink(ExeName, WorkDir + "/" + name)) {
ExeName = WorkDir + "/" + name;
} else {
- YQL_LOG(DEBUG) << "HardLink Failed " << ExeName << "'" << WorkDir + "/" + name << "'";
+ YQL_CLOG(DEBUG, ProviderDq) << "HardLink Failed " << ExeName << "'" << WorkDir + "/" + name << "'";
}
}
@@ -354,13 +354,13 @@ private:
cmd3.Run().Wait();
}
} catch (...) {
- YQL_LOG(DEBUG) << "Cannot set anon_limit: " << CurrentExceptionMessage();
+ YQL_CLOG(DEBUG, ProviderDq) << "Cannot set anon_limit: " << CurrentExceptionMessage();
}
try {
TShellCommand cmd(PortoCtl, {"destroy", ContainerName});
cmd.Run().Wait();
} catch (...) {
- YQL_LOG(DEBUG) << "Cannot destroy: " << CurrentExceptionMessage();
+ YQL_CLOG(DEBUG, ProviderDq) << "Cannot destroy: " << CurrentExceptionMessage();
}
TChildProcess::Kill();
}
@@ -1226,9 +1226,9 @@ public:
while ((size = input.Read(buf, sizeof(buf))) > 0) {
auto str = TString(buf, size);
Stderr += str;
- YQL_LOG(DEBUG) << "stderr (" << StageId << " " << TraceId << " ) > `" << str << "'";
+ YQL_CLOG(DEBUG, ProviderDq) << "stderr (" << StageId << " " << TraceId << " ) > `" << str << "'";
}
- YQL_LOG(DEBUG) << "stderr (" << StageId << " " << TraceId << " ) finished";
+ YQL_CLOG(DEBUG, ProviderDq) << "stderr (" << StageId << " " << TraceId << " ) finished";
}
ui64 GetTaskId() const override {
@@ -1740,7 +1740,7 @@ private:
{
auto localPath = MakeLocalPath(name.GetPath());
NFs::MakeDirectoryRecursive(base + "/" + localPath.Parent().GetPath(), NFs::FP_NONSECRET_FILE, false);
- YQL_LOG(DEBUG) << "HardLink '" << path << "-> '" << (base + "/" + localPath.GetPath()) << "'";
+ YQL_CLOG(DEBUG, ProviderDq) << "HardLink '" << path << "-> '" << (base + "/" + localPath.GetPath()) << "'";
YQL_ENSURE(NFs::HardLink(path, base + "/" + localPath.GetPath()));
return localPath.GetPath();
}
@@ -1764,7 +1764,7 @@ private:
portoSettings.Enable = EnablePorto && conf->EnablePorto.Get().GetOrElse(TDqSettings::TDefault::EnablePorto);
portoSettings.MemoryLimit = conf->_PortoMemoryLimit.Get().GetOrElse(TDqSettings::TDefault::PortoMemoryLimit);
if (portoSettings.Enable) {
- YQL_LOG(DEBUG) << "Porto enabled";
+ YQL_CLOG(DEBUG, ProviderDq) << "Porto enabled";
}
TString exePath;
@@ -1804,9 +1804,9 @@ private:
} else {
command = MakeHolder<TChildProcess>(exePath, args, env, cacheDir + "/Slot-" + ToString(containerId));
}
- YQL_LOG(DEBUG) << "Executing " << exePath;
+ YQL_CLOG(DEBUG, ProviderDq) << "Executing " << exePath;
for (const auto& arg: args) {
- YQL_LOG(DEBUG) << "Arg: " << arg;
+ YQL_CLOG(DEBUG, ProviderDq) << "Arg: " << arg;
}
command->Run();
return command;
diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp
index 634b25fb075..0a1caee54c4 100644
--- a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp
@@ -174,7 +174,7 @@ namespace NYql::NDqs {
}
}
for (const auto& k : toDrop) {
- YQL_LOG(DEBUG) << "Remove resource " << k << " from worker " << GetGuidAsString(WorkerId);
+ YQL_CLOG(DEBUG, ProviderDq) << "Remove resource " << k << " from worker " << GetGuidAsString(WorkerId);
Resources.erase(k);
}
*FileRemoveCounter += toDrop.size();
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index 2538334a295..653db074676 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -120,7 +120,7 @@ private:
void Deallocate(ui32 nodeId) {
TVector<ui64> toDeallocate;
- YQL_LOG(DEBUG) << "Deallocate " << nodeId;
+ YQL_CLOG(DEBUG, ProviderDq) << "Deallocate " << nodeId;
for (const auto& [k, v] : AllocatedWorkers) {
if (v.Sender.NodeId() == nodeId) {
toDeallocate.push_back(k);
@@ -135,7 +135,7 @@ private:
void Deallocate(const NActors::TActorId& senderId) {
TVector<ui64> toDeallocate;
- YQL_LOG(DEBUG) << "Deallocate " << senderId;
+ YQL_CLOG(DEBUG, ProviderDq) << "Deallocate " << senderId;
for (const auto& [k, v] : AllocatedWorkers) {
if (v.Sender == senderId) {
toDeallocate.push_back(k);
@@ -149,7 +149,7 @@ private:
void OnDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev)
{
- YQL_LOG(DEBUG) << "Disconnected " << ev->Get()->NodeId;
+ YQL_CLOG(DEBUG, ProviderDq) << "Disconnected " << ev->Get()->NodeId;
Unsubscribe(ev->Get()->NodeId);
Deallocate(ev->Get()->NodeId);
}
@@ -159,7 +159,7 @@ private:
Y_VERIFY(ev->Get()->Reason == TEvents::TEvUndelivered::Disconnected
|| ev->Get()->Reason == TEvents::TEvUndelivered::ReasonActorUnknown);
- YQL_LOG(DEBUG) << "Undelivered " << ev->Sender;
+ YQL_CLOG(DEBUG, ProviderDq) << "Undelivered " << ev->Sender;
switch (ev->Get()->Reason) {
case TEvents::TEvUndelivered::Disconnected:
@@ -174,13 +174,13 @@ private:
}
void OnConfigureFailureInjector(TEvConfigureFailureInjectorRequest::TPtr& ev) {
- YQL_LOG(DEBUG) << "TEvConfigureFailureInjectorRequest ";
+ YQL_CLOG(DEBUG, ProviderDq) << "TEvConfigureFailureInjectorRequest ";
auto& request = ev->Get()->Record.GetRequest();
YQL_ENSURE(request.GetNodeId() == SelfId().NodeId(), "Wrong node id!");
TFailureInjector::Set(request.GetName(), request.GetSkip(), request.GetCountOfFails());
- YQL_LOG(DEBUG) << "Failure injector is configured " << request.GetName();
+ YQL_CLOG(DEBUG, ProviderDq) << "Failure injector is configured " << request.GetName();
auto response = MakeHolder<TEvConfigureFailureInjectorResponse>();
auto* r = response->Record.MutableResponse();
@@ -207,8 +207,8 @@ private:
return;
}
- YQL_LOG_CTX_SCOPE(ev->Get()->Record.GetTraceId());
- YQL_LOG(DEBUG) << "TLocalWorkerManager::TEvAllocateWorkersRequest " << resourceId;
+ YQL_LOG_CTX_ROOT_SCOPE(ev->Get()->Record.GetTraceId());
+ YQL_CLOG(DEBUG, ProviderDq) << "TLocalWorkerManager::TEvAllocateWorkersRequest " << resourceId;
TFailureInjector::Reach("allocate_workers_failure", [] { ::_exit(1); });
auto& allocationInfo = AllocatedWorkers[resourceId];
@@ -245,7 +245,7 @@ private:
THolder<NActors::IActor> actor;
if (createComputeActor) {
- YQL_LOG(DEBUG) << "Create compute actor: " << computeActorType;
+ YQL_CLOG(DEBUG, ProviderDq) << "Create compute actor: " << computeActorType;
actor.Reset(
NYql::CreateComputeActor(
Options,
@@ -281,7 +281,7 @@ private:
void OnFreeWorkers(TEvFreeWorkersNotify::TPtr& ev) {
ui64 resourceId = ev->Get()->Record.GetResourceId();
- YQL_LOG(DEBUG) << "TEvFreeWorkersNotify " << resourceId;
+ YQL_CLOG(DEBUG, ProviderDq) << "TEvFreeWorkersNotify " << resourceId;
FreeGroup(resourceId, ev->Sender);
}
@@ -291,7 +291,7 @@ private:
}
void FreeGroup(ui64 id, NActors::TActorId sender = NActors::TActorId()) {
- YQL_LOG(DEBUG) << "Free Group " << id;
+ YQL_CLOG(DEBUG, ProviderDq) << "Free Group " << id;
auto it = AllocatedWorkers.find(id);
if (it != AllocatedWorkers.end()) {
for (const auto& actorId : it->second.WorkerActors) {
@@ -300,7 +300,7 @@ private:
if (sender && it->second.Sender != sender) {
Options.Counters.FreeGroupError->Inc();
- YQL_LOG(ERROR) << "Free Group " << id << " mismatched alloc-free senders: " << it->second.Sender << " and " << sender << " TxId: " << it->second.TxId;
+ YQL_CLOG(ERROR, ProviderDq) << "Free Group " << id << " mismatched alloc-free senders: " << it->second.Sender << " and " << sender << " TxId: " << it->second.TxId;
}
MemoryQuoter->Free(it->second.TxId, 0);
@@ -318,7 +318,7 @@ private:
}
}
for (const auto& id : todelete) {
- YQL_LOG(DEBUG) << "Free on deadline: " << id;
+ YQL_CLOG(DEBUG, ProviderDq) << "Free on deadline: " << id;
FreeGroup(id);
}
}