diff options
author | matway <matway@yandex-team.ru> | 2022-02-10 16:52:19 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:19 +0300 |
commit | 969a4575432dd5cbdcdd3be410af8ddfb9b0cc1a (patch) | |
tree | ab7fbbf3253d4c0e2793218f09378908beb025fb | |
parent | cd2afdc55d7e1c82d56a5c63453af6ebfe33101d (diff) | |
download | ydb-969a4575432dd5cbdcdd3be410af8ddfb9b0cc1a.tar.gz |
Restoring authorship annotation for <matway@yandex-team.ru>. Commit 2 of 2.
18 files changed, 390 insertions, 390 deletions
diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp index 72b757077be..892c5cccd70 100644 --- a/ydb/library/yql/providers/dq/actors/events.cpp +++ b/ydb/library/yql/providers/dq/actors/events.cpp @@ -13,9 +13,9 @@ namespace NYql::NDqs { Record.SetNeedFallback(needFallback); } - TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) { - Record = std::move(queryResult); - } + TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) { + Record = std::move(queryResult); + } TEvGraphRequest::TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId) { @@ -29,8 +29,8 @@ namespace NYql::NDqs { TEvReadyState::TEvReadyState(NActors::TActorId sourceId, TString type) { NActors::ActorIdToProto(sourceId, Record.MutableSourceId()); - *Record.MutableResultType() = std::move(type); - } + *Record.MutableResultType() = std::move(type); + } TEvReadyState::TEvReadyState(NDqProto::TReadyState&& proto) { Record = std::move(proto); @@ -40,13 +40,13 @@ namespace NYql::NDqs { Record = evt; } - TEvPullDataRequest::TEvPullDataRequest(ui32 rowThreshold) { - Record.SetRowThreshold(rowThreshold); - } + TEvPullDataRequest::TEvPullDataRequest(ui32 rowThreshold) { + Record.SetRowThreshold(rowThreshold); + } - TEvPullDataResponse::TEvPullDataResponse(NYql::NDqProto::TPullResponse& data) { - Record.Swap(&data); - } + TEvPullDataResponse::TEvPullDataResponse(NYql::NDqProto::TPullResponse& data) { + Record.Swap(&data); + } TEvFullResultWriterStatusResponse::TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data) { Record.CopyFrom(data); diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h index 97c404aae86..594921560ae 100644 --- a/ydb/library/yql/providers/dq/actors/events.h +++ b/ydb/library/yql/providers/dq/actors/events.h @@ -12,38 +12,38 @@ #include <library/cpp/actors/core/events.h> namespace NYql::NDqs { - using TDqExecuterEvents = NDq::TBaseDqExecuterEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>; + using TDqExecuterEvents = NDq::TBaseDqExecuterEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>; struct TEvDqTask : NActors::TEventPB<TEvDqTask, NDqProto::TDqTaskRequest, TDqExecuterEvents::ES_DQ_TASK> { - TEvDqTask() = default; - explicit TEvDqTask(NDqProto::TDqTask task); - }; + TEvDqTask() = default; + explicit TEvDqTask(NDqProto::TDqTask task); + }; struct TEvDqFailure : NActors::TEventPB<TEvDqFailure, NDqProto::TDqFailure, TDqExecuterEvents::ES_DQ_FAILURE> { TEvDqFailure() = default; explicit TEvDqFailure(const TIssue& issue, bool retriable = false, bool needFallback = false); }; - struct TEvQueryResponse - : NActors::TEventPB<TEvQueryResponse, NDqProto::TQueryResponse, TDqExecuterEvents::ES_RESULT_SET> { - TEvQueryResponse() = default; - explicit TEvQueryResponse(NDqProto::TQueryResponse&& queryResult); - }; + struct TEvQueryResponse + : NActors::TEventPB<TEvQueryResponse, NDqProto::TQueryResponse, TDqExecuterEvents::ES_RESULT_SET> { + TEvQueryResponse() = default; + explicit TEvQueryResponse(NDqProto::TQueryResponse&& queryResult); + }; struct TEvGraphRequest : NActors::TEventPB<TEvGraphRequest, NDqProto::TGraphRequest, TDqExecuterEvents::ES_GRAPH> { TEvGraphRequest() = default; TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId = {}); }; - struct TEvReadyState : NActors::TEventPB<TEvReadyState, NDqProto::TReadyState, TDqExecuterEvents::ES_READY_TO_PULL> { - TEvReadyState() = default; + struct TEvReadyState : NActors::TEventPB<TEvReadyState, NDqProto::TReadyState, TDqExecuterEvents::ES_READY_TO_PULL> { + TEvReadyState() = default; TEvReadyState(NActors::TActorId sourceId, TString type); explicit TEvReadyState(NDqProto::TReadyState&& proto); - }; + }; - struct TEvPullResult : NActors::TEventBase<TEvPullResult, TDqExecuterEvents::ES_PULL_RESULT> { - DEFINE_SIMPLE_NONLOCAL_EVENT(TEvPullResult, ""); - }; + struct TEvPullResult : NActors::TEventBase<TEvPullResult, TDqExecuterEvents::ES_PULL_RESULT> { + DEFINE_SIMPLE_NONLOCAL_EVENT(TEvPullResult, ""); + }; struct TEvGraphExecutionEvent : NActors::TEventPB<TEvGraphExecutionEvent, NYql::NDqProto::TGraphExecutionEvent, TDqExecuterEvents::ES_GRAPH_EXECUTION_EVENT> { @@ -51,19 +51,19 @@ namespace NYql::NDqs { explicit TEvGraphExecutionEvent(NDqProto::TGraphExecutionEvent& evt); }; - using TDqDataEvents = NDq::TBaseDqDataEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>; + using TDqDataEvents = NDq::TBaseDqDataEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>; - struct TEvPullDataRequest - : NActors::TEventPB<TEvPullDataRequest, NYql::NDqProto::TPullRequest, TDqDataEvents::ES_PULL_REQUEST> { - TEvPullDataRequest() = default; - explicit TEvPullDataRequest(ui32 rowThreshold); - }; + struct TEvPullDataRequest + : NActors::TEventPB<TEvPullDataRequest, NYql::NDqProto::TPullRequest, TDqDataEvents::ES_PULL_REQUEST> { + TEvPullDataRequest() = default; + explicit TEvPullDataRequest(ui32 rowThreshold); + }; - struct TEvPullDataResponse - : NActors::TEventPB<TEvPullDataResponse, NYql::NDqProto::TPullResponse, TDqDataEvents::ES_PULL_RESPONSE> { - TEvPullDataResponse() = default; - explicit TEvPullDataResponse(NDqProto::TPullResponse& data); - }; + struct TEvPullDataResponse + : NActors::TEventPB<TEvPullDataResponse, NYql::NDqProto::TPullResponse, TDqDataEvents::ES_PULL_RESPONSE> { + TEvPullDataResponse() = default; + explicit TEvPullDataResponse(NDqProto::TPullResponse& data); + }; struct TEvPingRequest : NActors::TEventPB<TEvPingRequest, NYql::NDqProto::TPingRequest, TDqDataEvents::ES_PING_REQUEST> { @@ -91,4 +91,4 @@ namespace NYql::NDqs { struct TEvGraphFinished : NActors::TEventBase<TEvGraphFinished, TDqExecuterEvents::ES_GRAPH_FINISHED> { DEFINE_SIMPLE_NONLOCAL_EVENT(TEvGraphFinished, ""); }; -} +} diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index f7649de8df1..91f1caee748 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -90,7 +90,7 @@ private: Finish(/*retriable=*/ false, /*needFallback=*/ true); }) cFunc(TEvents::TEvWakeup::EventType, OnWakeup) - }) + }) Yql::DqsProto::TWorkerFilter GetPragmaFilter() { Yql::DqsProto::TWorkerFilter pragmaFilter; @@ -304,30 +304,30 @@ private: AddCounters(ev->Get()->Record); FlushCounter("AllocateWorkers"); - auto& response = ev->Get()->Record; - switch (response.GetTResponseCase()) { - case TAllocateWorkersResponse::kWorkers: - break; - case TAllocateWorkersResponse::kError: { + auto& response = ev->Get()->Record; + switch (response.GetTResponseCase()) { + case TAllocateWorkersResponse::kWorkers: + break; + case TAllocateWorkersResponse::kError: { YQL_LOG(ERROR) << "Error on allocate workers " << ev->Get()->Record.GetError().GetMessage() << ":" << static_cast<int>(ev->Get()->Record.GetError().GetErrorCode()); Issues.AddIssue(TIssue(ev->Get()->Record.GetError().GetMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR)); Finish(/*retriable = */ true, /*fallback = */ true); - return; - } + return; + } case TAllocateWorkersResponse::kNodes: - case TAllocateWorkersResponse::TRESPONSE_NOT_SET: - YQL_ENSURE(false, "Unexpected allocate result"); + case TAllocateWorkersResponse::TRESPONSE_NOT_SET: + YQL_ENSURE(false, "Unexpected allocate result"); } - auto& workerGroup = response.GetWorkers(); + auto& workerGroup = response.GetWorkers(); ResourceId = workerGroup.GetResourceId(); YQL_LOG(DEBUG) << "Allocated resource " << ResourceId; TVector<NActors::TActorId> workers; for (const auto& actorIdProto : workerGroup.GetWorkerActor()) { workers.emplace_back(NActors::ActorIdFromProto(actorIdProto)); - } + } auto tasks = ExecutionPlanner->GetTasks(workers); @@ -354,7 +354,7 @@ private: YQL_LOG(INFO) << workers.size() << " workers allocated"; - YQL_ENSURE(workers.size() == tasks.size()); + YQL_ENSURE(workers.size() == tasks.size()); auto res = MakeHolder<TEvReadyState>(ExecutionPlanner->GetSourceID(), ExecutionPlanner->GetResultType()); @@ -376,8 +376,8 @@ private: *res->Record.AddTask() = tasks[i]; ActorIdToProto(workers[i], res->Record.AddActorId()); } - } - + } + WorkersAllocated = true; ExecutionStart = TInstant::Now(); diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp index d2bd91c822b..59ae95c3af5 100644 --- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp +++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp @@ -514,7 +514,7 @@ private: YQL_ENSURE(ysonNode.GetType() == NYT::TNode::EType::List); for (const auto& row : ysonNode.AsList()) { Output << NYT::NodeToYsonString(row) << "\n"; - } + } } Promise.SetValue(); diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index c7954022511..f4503a60b2e 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -84,14 +84,14 @@ public: , TaskRunnerActorFactory(taskRunnerActorFactory) , RuntimeData(runtimeData) , TraceId(traceId) - { + { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << "TDqWorker created "; if (RuntimeData) { RuntimeData->OnWorkerStart(TraceId); } - } + } ~TDqWorker() { @@ -105,9 +105,9 @@ public: void DoPassAway() override { YQL_LOG_CTX_SCOPE(TraceId); - for (const auto& inputs : InputMap) { - Send(inputs.first, new NActors::TEvents::TEvPoison()); - } + for (const auto& inputs : InputMap) { + Send(inputs.first, new NActors::TEvents::TEvPoison()); + } YQL_LOG(DEBUG) << "TDqWorker passed away "; if (Actor) { @@ -140,7 +140,7 @@ private: HFunc(TEvError, OnErrorFromPipe); HFunc(TEvContinueRun, OnContinueRun); cFunc(TEvents::TEvWakeup::EventType, OnWakeup); - }) + }) TString ParseStatus(const TString& input, bool* retriableFlag, bool* fallbackFlag) { TString result; @@ -424,7 +424,7 @@ private: Send(TaskRunnerActor, new TEvPop(outChannel.ChannelId)); } - + void OnChannelPopFinished(TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext& ctx) { try { auto outputActorId = OutChannelId2ActorId[ev->Get()->ChannelId]; @@ -522,16 +522,16 @@ private: } NActors::TActorId ResolveEndpoint(const TEndpoint& ep) { - switch (ep.GetEndpointTypeCase()) { - case TEndpoint::kActorId: + switch (ep.GetEndpointTypeCase()) { + case TEndpoint::kActorId: return NActors::ActorIdFromProto(ep.GetActorId()); case TEndpoint::kUri: Y_ENSURE(false, "kUri is not supported"); - case TEndpoint::kTabletId: - Y_ENSURE(false, "Tablets not supported by dqs"); - case TEndpoint::ENDPOINTTYPE_NOT_SET: { - Y_ENSURE(false, "Endpoint must be set"); - } break; + case TEndpoint::kTabletId: + Y_ENSURE(false, "Tablets not supported by dqs"); + case TEndpoint::ENDPOINTTYPE_NOT_SET: { + Y_ENSURE(false, "Endpoint must be set"); + } break; } } @@ -604,10 +604,10 @@ private: Stat.AddCounters2(ev->Get()->Sensors); - switch (res) { + switch (res) { case ERunStatus::Finished: { TaskFinished = true; - break; + break; } case ERunStatus::PendingInput: { auto now = TInstant::Now(); @@ -629,7 +629,7 @@ private: Stat.AddCounter("ReadTimeout", static_cast<ui64>(1)); OnError("PullTimeout " + TimeoutInfo(channel.ActorID, now, channel.RequestTime), false, true); } - } + } } for (auto& [inputIndex, source] : SourcesMap) { @@ -654,7 +654,7 @@ private: Actor->SourcePush(0, index, std::move(batch), space, finished); } - break; + break; } case ERunStatus::PendingOutput: { for (auto& [index, sink] : SinksMap) { @@ -663,9 +663,9 @@ private: Send(TaskRunnerActor, new TEvSinkPop(index, sinkActorFreeSpaceBeforeSend)); } } - break; + break; } - } + } } TString TimeoutInfo(TActorId actorID, TInstant now, TInstant startTime) { diff --git a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h index 27ad76c2d05..04a077b4a5d 100644 --- a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h +++ b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h @@ -8,4 +8,4 @@ namespace NYql::NNodes { #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.decl.inl.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.defs.inl.h> -} +} diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp index 144c3d5bf66..b4bc07fbef1 100644 --- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp +++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp @@ -27,77 +27,77 @@ } while (0) namespace NYql::NDqs { - using namespace NYql; - using namespace NYql::NDq; - using namespace NYql::NNodes; - - using TStatus = IGraphTransformer::TStatus; - - THolder<IGraphTransformer> CreateDqsWrapListsOptTransformer() { - return CreateFunctorTransformer( - [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus { - YQL_ENSURE(input->GetTypeAnn() != nullptr); - YQL_ENSURE(input->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List); - if (TExprBase(input).Maybe<TDqCnUnionAll>()) { - return TStatus(TStatus::ELevel::Ok, false); - } - - output = Build<TDqCnUnionAll>(ctx, input->Pos()) // clang-format off - .Output() + using namespace NYql; + using namespace NYql::NDq; + using namespace NYql::NNodes; + + using TStatus = IGraphTransformer::TStatus; + + THolder<IGraphTransformer> CreateDqsWrapListsOptTransformer() { + return CreateFunctorTransformer( + [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus { + YQL_ENSURE(input->GetTypeAnn() != nullptr); + YQL_ENSURE(input->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List); + if (TExprBase(input).Maybe<TDqCnUnionAll>()) { + return TStatus(TStatus::ELevel::Ok, false); + } + + output = Build<TDqCnUnionAll>(ctx, input->Pos()) // clang-format off + .Output() .Stage<TDqStage>() - .Inputs() - .Build() - .Program() - .Args({}) + .Inputs() + .Build() + .Program() + .Args({}) .Body<TCoIterator>() .List(input) - .Build() - .Build() + .Build() + .Build() .Settings(TDqStageSettings().BuildNode(ctx, input->Pos())) - .Build() - .Index() - .Build("0") - .Build() - .Done().Ptr(); // clang-format on - - return TStatus(TStatus::ELevel::Repeat, true); - }); - } - - THolder<NYql::IGraphTransformer> CreateDqsBuildTransformer() { - return CreateFunctorTransformer([](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus { - TExprBase node(input); - if (node.Maybe<TDqCnResult>()) { - return TStatus::Ok; + .Build() + .Index() + .Build("0") + .Build() + .Done().Ptr(); // clang-format on + + return TStatus(TStatus::ELevel::Repeat, true); + }); + } + + THolder<NYql::IGraphTransformer> CreateDqsBuildTransformer() { + return CreateFunctorTransformer([](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus { + TExprBase node(input); + if (node.Maybe<TDqCnResult>()) { + return TStatus::Ok; } - if (!node.Maybe<TDqCnUnionAll>()) { - ctx.AddError(TIssue(input->Pos(ctx), "Last connection must be union all")); - output = input; - return TStatus::Error; + if (!node.Maybe<TDqCnUnionAll>()) { + ctx.AddError(TIssue(input->Pos(ctx), "Last connection must be union all")); + output = input; + return TStatus::Error; } - output = Build<TDqCnResult>(ctx, input->Pos()) // clang-format off - .Output() + output = Build<TDqCnResult>(ctx, input->Pos()) // clang-format off + .Output() .Stage<TDqStage>() - .Inputs() - .Add(node.Cast<TDqCnUnionAll>()) - .Build() - .Program() - .Args({"row"}) + .Inputs() + .Add(node.Cast<TDqCnUnionAll>()) + .Build() + .Program() + .Args({"row"}) .Body("row") .Build() .Settings(TDqStageSettings().BuildNode(ctx, node.Pos())) .Build() - .Index() - .Build("0") + .Index() + .Build("0") .Build() .ColumnHints() // TODO: set column hints .Build() - .Done().Ptr(); // clang-format on - return TStatus(IGraphTransformer::TStatus::Repeat, true); - }); - } + .Done().Ptr(); // clang-format on + return TStatus(IGraphTransformer::TStatus::Repeat, true); + }); + } THolder<IGraphTransformer> CreateDqsRewritePhyCallablesTransformer() { return CreateFunctorTransformer([](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { @@ -116,22 +116,22 @@ namespace NYql::NDqs { }); } - namespace NPeephole { + namespace NPeephole { class TDqsPeepholeTransformer: public TSyncTransformerBase { - public: + public: TDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer, TTypeAnnotationContext& typesCtx) - : TypeAnnTransformer(std::move(typeAnnTransformer)) - , TypesCtx(typesCtx) - { - } + : TypeAnnTransformer(std::move(typeAnnTransformer)) + , TypesCtx(typesCtx) + { + } - TStatus DoTransform(TExprNode::TPtr inputExpr, TExprNode::TPtr& outputExpr, TExprContext& ctx) final { - if (Optimized) { - outputExpr = inputExpr; - return TStatus::Ok; - } + TStatus DoTransform(TExprNode::TPtr inputExpr, TExprNode::TPtr& outputExpr, TExprContext& ctx) final { + if (Optimized) { + outputExpr = inputExpr; + return TStatus::Ok; + } auto transformer = CreateDqsRewritePhyCallablesTransformer(); auto status = InstantTransform(*transformer, inputExpr, ctx); @@ -150,20 +150,20 @@ namespace NYql::NDqs { outputExpr = inputExpr; Optimized = true; return TStatus::Ok; - } - - void Rewind() final { - Optimized = false; - } - - private: - THolder<IGraphTransformer> TypeAnnTransformer; - TTypeAnnotationContext& TypesCtx; - bool Optimized = false; - }; + } + + void Rewind() final { + Optimized = false; + } + + private: + THolder<IGraphTransformer> TypeAnnTransformer; + TTypeAnnotationContext& TypesCtx; + bool Optimized = false; + }; } - THolder<IGraphTransformer> CreateDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer, TTypeAnnotationContext& typesCtx) { + THolder<IGraphTransformer> CreateDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer, TTypeAnnotationContext& typesCtx) { return MakeHolder<NPeephole::TDqsPeepholeTransformer>(std::move(typeAnnTransformer), typesCtx); } @@ -176,5 +176,5 @@ namespace NYql::NDqs { } return status; }); - } + } } diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.h b/ydb/library/yql/providers/dq/opt/dqs_opt.h index 2337a6d2a35..6c9e1a7c18b 100644 --- a/ydb/library/yql/providers/dq/opt/dqs_opt.h +++ b/ydb/library/yql/providers/dq/opt/dqs_opt.h @@ -5,12 +5,12 @@ #include <ydb/library/yql/dq/proto/dq_tasks.pb.h> namespace NYql::NDqs { - class TDatabaseManager; - - THolder<IGraphTransformer> CreateDqsWrapListsOptTransformer(); + class TDatabaseManager; + + THolder<IGraphTransformer> CreateDqsWrapListsOptTransformer(); THolder<IGraphTransformer> CreateDqsFinalizingOptTransformer(); - THolder<IGraphTransformer> CreateDqsBuildTransformer(); + THolder<IGraphTransformer> CreateDqsBuildTransformer(); THolder<IGraphTransformer> CreateDqsRewritePhyCallablesTransformer(); - THolder<IGraphTransformer> CreateDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer, TTypeAnnotationContext& typesCtx); + THolder<IGraphTransformer> CreateDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer, TTypeAnnotationContext& typesCtx); } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/planner/dqs_task_graph.h b/ydb/library/yql/providers/dq/planner/dqs_task_graph.h index 207d7a091f0..0b62dfce4a9 100644 --- a/ydb/library/yql/providers/dq/planner/dqs_task_graph.h +++ b/ydb/library/yql/providers/dq/planner/dqs_task_graph.h @@ -6,24 +6,24 @@ #include <library/cpp/actors/core/actorid.h> namespace NYql::NDqs { - struct TStageInfoMeta { + struct TStageInfoMeta { NNodes::TDqPhyStage Stage; - }; + }; - struct TTaskInputMeta { - }; + struct TTaskInputMeta { + }; - struct TTaskOutputMeta {}; + struct TTaskOutputMeta {}; struct TTaskMeta { THashMap<TString, TString> TaskParams; TString ClusterNameHint; }; - using TStageInfo = NYql::NDq::TStageInfo<TStageInfoMeta>; - using TTaskOutput = NYql::NDq::TTaskOutput<TTaskOutputMeta>; - using TTaskOutputType = NYql::NDq::TTaskOutputType; - using TTaskInput = NYql::NDq::TTaskInput<TTaskInputMeta>; - using TTask = NYql::NDq::TTask<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>; - using TDqsTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>; -} + using TStageInfo = NYql::NDq::TStageInfo<TStageInfoMeta>; + using TTaskOutput = NYql::NDq::TTaskOutput<TTaskOutputMeta>; + using TTaskOutputType = NYql::NDq::TTaskOutputType; + using TTaskInput = NYql::NDq::TTaskInput<TTaskInputMeta>; + using TTask = NYql::NDq::TTask<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>; + using TDqsTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>; +} diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index 0323e78d026..ec404c0a6dc 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -39,26 +39,26 @@ using namespace NKikimr::NMiniKQL; using namespace Yql::DqsProto; namespace NYql::NDqs { - namespace { + namespace { TVector<TDqPhyStage> GetStages(const TExprNode::TPtr& exprRoot) { TVector<TDqPhyStage> stages; - VisitExpr( + VisitExpr( exprRoot, - [](const TExprNode::TPtr& exprNode) { - const auto& node = TExprBase(exprNode); - return !node.Maybe<TCoLambda>(); - }, - [&stages](const TExprNode::TPtr& exprNode) { - const auto& node = TExprBase(exprNode); + [](const TExprNode::TPtr& exprNode) { + const auto& node = TExprBase(exprNode); + return !node.Maybe<TCoLambda>(); + }, + [&stages](const TExprNode::TPtr& exprNode) { + const auto& node = TExprBase(exprNode); if (auto maybeStage = node.Maybe<TDqPhyStage>()) { - stages.push_back(maybeStage.Cast()); - } + stages.push_back(maybeStage.Cast()); + } - return true; - }); + return true; + }); - return stages; - } + return stages; + } bool HasReadWraps(const TExprNode::TPtr& node) { bool result = false; @@ -82,19 +82,19 @@ namespace NYql::NDqs { } TDqsExecutionPlanner::TDqsExecutionPlanner(TIntrusivePtr<TTypeAnnotationContext> typeContext, - NYql::TExprContext& exprContext, + NYql::TExprContext& exprContext, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NYql::TExprNode::TPtr dqExprRoot, NActors::TActorId executerID, NActors::TActorId resultID) : TypeContext(std::move(typeContext)) - , ExprContext(exprContext) + , ExprContext(exprContext) , FunctionRegistry(functionRegistry) , DqExprRoot(std::move(dqExprRoot)) - , ExecuterID(executerID) - , ResultID(resultID) - { - } + , ExecuterID(executerID) + , ResultID(resultID) + { + } void TDqsExecutionPlanner::Clear() { TasksGraph.Clear(); @@ -138,20 +138,20 @@ namespace NYql::NDqs { YQL_LOG(DEBUG) << "Execution Plan " << NCommon::ExprToPrettyString(ExprContext, *DqExprRoot); auto stages = GetStages(DqExprRoot); - YQL_ENSURE(!stages.empty()); - - for (const auto& stage : stages) { - TasksGraph.AddStageInfo(TStageInfo(stage, {stage})); - } + YQL_ENSURE(!stages.empty()); + + for (const auto& stage : stages) { + TasksGraph.AddStageInfo(TStageInfo(stage, {stage})); + } - for (const auto& stage : stages) { + for (const auto& stage : stages) { const bool hasDqSource = HasDqSource(stage); if ((hasDqSource || HasReadWraps(stage.Program().Ptr())) && BuildReadStage(settings, stage, hasDqSource, canFallback)) { YQL_LOG(DEBUG) << "Read stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr()); - } else { + } else { YQL_LOG(DEBUG) << "Common stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr()); - NDq::CommonBuildTasks(TasksGraph, stage); - } + NDq::CommonBuildTasks(TasksGraph, stage); + } // Sinks if (auto maybeDqSinksList = stage.Sinks()) { @@ -183,7 +183,7 @@ namespace NYql::NDqs { } } - BuildConnections(stage); + BuildConnections(stage); if (canFallback && TasksGraph.GetTasks().size() > maxTasksPerOperation) { break; @@ -210,10 +210,10 @@ namespace NYql::NDqs { output.Channels.emplace_back(channel.Id); SourceTaskID = resultTask.Id; } - + BuildCheckpointingMode(); - return TasksGraph.GetTasks().size(); + return TasksGraph.GetTasks().size(); } bool TDqsExecutionPlanner::IsEgressTask(const TDqsTasksGraph::TTaskType& task) const { @@ -314,17 +314,17 @@ namespace NYql::NDqs { } TVector<TDqTask> TDqsExecutionPlanner::GetTasks(const TVector<NActors::TActorId>& workers) { - auto& tasks = TasksGraph.GetTasks(); - YQL_ENSURE(tasks.size() == workers.size()); + auto& tasks = TasksGraph.GetTasks(); + YQL_ENSURE(tasks.size() == workers.size()); - for (size_t i = 0; i < workers.size(); i++) { - tasks[i].ComputeActorId = workers[i]; - } + for (size_t i = 0; i < workers.size(); i++) { + tasks[i].ComputeActorId = workers[i]; + } THashMap<TStageId, std::tuple<TString, ui64>> stagePrograms = BuildAllPrograms(); TVector<TDqTask> plan; THashSet<TString> clusterNameHints; - for (const auto& task : tasks) { + for (const auto& task : tasks) { if (task.Meta.ClusterNameHint) { clusterNameHints.insert(task.Meta.ClusterNameHint); } @@ -332,7 +332,7 @@ namespace NYql::NDqs { for (const auto& task : tasks) { Yql::DqsProto::TTaskMeta taskMeta; TDqTask taskDesc; - taskDesc.SetId(task.Id); + taskDesc.SetId(task.Id); NActors::ActorIdToProto(ExecuterID, taskDesc.MutableExecuter()->MutableActorId()); auto& taskParams = *taskMeta.MutableTaskParams(); for (const auto& [k, v]: task.Meta.TaskParams) { @@ -344,27 +344,27 @@ namespace NYql::NDqs { taskMeta.SetClusterNameHint(task.Meta.ClusterNameHint); } - for (auto& input : task.Inputs) { - auto& inputDesc = *taskDesc.AddInputs(); + for (auto& input : task.Inputs) { + auto& inputDesc = *taskDesc.AddInputs(); if (input.SourceSettings) { auto* sourceProto = inputDesc.MutableSource(); *sourceProto->MutableSettings() = *input.SourceSettings; sourceProto->SetType(input.SourceType); } else { FillInputDesc(inputDesc, input); - } - } + } + } - for (auto& output : task.Outputs) { - FillOutputDesc(*taskDesc.AddOutputs(), output); + for (auto& output : task.Outputs) { + FillOutputDesc(*taskDesc.AddOutputs(), output); } auto& transform = *taskDesc.MutableOutputTransform(); transform.SetType(task.OutputTransform.Type); transform.SetFunctionName(task.OutputTransform.FunctionName); - auto& program = *taskDesc.MutableProgram(); - program.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0); + auto& program = *taskDesc.MutableProgram(); + program.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0); TString programStr; ui64 stageId; std::tie(programStr, stageId) = stagePrograms[task.StageId]; @@ -372,12 +372,12 @@ namespace NYql::NDqs { taskMeta.SetStageId(stageId); taskDesc.MutableMeta()->PackFrom(taskMeta); taskDesc.SetStageId(stageId); - + plan.emplace_back(std::move(taskDesc)); } - return plan; - } + return plan; + } NActors::TActorId TDqsExecutionPlanner::GetSourceID() const { if (SourceID) { @@ -406,10 +406,10 @@ namespace NYql::NDqs { return SerializeNode(type, typeEnv); } return {}; - } + } bool TDqsExecutionPlanner::BuildReadStage(const TDqSettings::TPtr& settings, const TDqPhyStage& stage, bool dqSource, bool canFallback) { - auto& stageInfo = TasksGraph.GetStageInfo(stage); + auto& stageInfo = TasksGraph.GetStageInfo(stage); for (ui32 i = 0; i < stageInfo.InputsCount; i++) { const auto& input = stage.Inputs().Item(i); @@ -483,7 +483,7 @@ namespace NYql::NDqs { transform.Type = stageSettings.TransformType; transform.FunctionName = stageSettings.TransformName; } - } + } return !parts.empty(); } @@ -496,8 +496,8 @@ namespace NYql::NDqs { void TDqsExecutionPlanner::BuildConnections(const NNodes::TDqPhyStage& stage) { NDq::TChannelLogFunc logFunc = [](ui64, ui64, ui64, TStringBuf, bool) {}; - for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) { - const auto& input = stage.Inputs().Item(inputIndex); + for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) { + const auto& input = stage.Inputs().Item(inputIndex); if (input.Maybe<TDqConnection>()) { BUILD_CONNECTION(TDqCnUnionAll, BuildUnionAllChannels); BUILD_CONNECTION(TDqCnHashShuffle, BuildHashShuffleChannels); @@ -508,25 +508,25 @@ namespace NYql::NDqs { } else { YQL_ENSURE(input.Maybe<TDqSource>()); } - } + } } #undef BUILD_CONNECTION THashMap<TStageId, std::tuple<TString,ui64>> TDqsExecutionPlanner::BuildAllPrograms() { - using namespace NKikimr::NMiniKQL; + using namespace NKikimr::NMiniKQL; THashMap<TStageId, std::tuple<TString,ui64>> result; - TScopedAlloc alloc(NKikimr::TAlignedPagePoolCounters(), FunctionRegistry->SupportsSizedAllocators()); - TTypeEnvironment typeEnv(alloc); + TScopedAlloc alloc(NKikimr::TAlignedPagePoolCounters(), FunctionRegistry->SupportsSizedAllocators()); + TTypeEnvironment typeEnv(alloc); TVector<NNodes::TExprBase> fakeReads; - NCommon::TMkqlCommonCallableCompiler compiler; + NCommon::TMkqlCommonCallableCompiler compiler; RegisterDqsMkqlCompilers(compiler, *TypeContext); - for (const auto& stageInfo : TasksGraph.GetStagesInfo()) { - const auto& stage = stageInfo.second.Meta.Stage; - auto paramsType = NDq::CollectParameters(stage.Program(), ExprContext); - YQL_ENSURE(paramsType->GetItems().empty()); // TODO support parameters + for (const auto& stageInfo : TasksGraph.GetStagesInfo()) { + const auto& stage = stageInfo.second.Meta.Stage; + auto paramsType = NDq::CollectParameters(stage.Program(), ExprContext); + YQL_ENSURE(paramsType->GetItems().empty()); // TODO support parameters auto settings = NDq::TDqStageSettings::Parse(stage); ui64 stageId = stage.Ref().UniqueId(); @@ -555,32 +555,32 @@ namespace NYql::NDqs { stage.Program(), *paramsType, compiler, typeEnv, *FunctionRegistry, ExprContext, fakeReads), stageId); - } - - return result; + } + + return result; } - void TDqsExecutionPlanner::FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel) { - channelDesc.SetId(channel.Id); - channelDesc.SetSrcTaskId(channel.SrcTask); - channelDesc.SetDstTaskId(channel.DstTask); + void TDqsExecutionPlanner::FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel) { + channelDesc.SetId(channel.Id); + channelDesc.SetSrcTaskId(channel.SrcTask); + channelDesc.SetDstTaskId(channel.DstTask); channelDesc.SetCheckpointingMode(channel.CheckpointingMode); - if (channel.SrcTask) { + if (channel.SrcTask) { NActors::ActorIdToProto(TasksGraph.GetTask(channel.SrcTask).ComputeActorId, - channelDesc.MutableSrcEndpoint()->MutableActorId()); - } + channelDesc.MutableSrcEndpoint()->MutableActorId()); + } - if (channel.DstTask) { + if (channel.DstTask) { NActors::ActorIdToProto(TasksGraph.GetTask(channel.DstTask).ComputeActorId, - channelDesc.MutableDstEndpoint()->MutableActorId()); - } else { - auto& stageInfo = TasksGraph.GetStageInfo(TasksGraph.GetTask(channel.SrcTask).StageId); - YQL_ENSURE(stageInfo.Tasks.size() == 1); - YQL_ENSURE(!SourceID); + channelDesc.MutableDstEndpoint()->MutableActorId()); + } else { + auto& stageInfo = TasksGraph.GetStageInfo(TasksGraph.GetTask(channel.SrcTask).StageId); + YQL_ENSURE(stageInfo.Tasks.size() == 1); + YQL_ENSURE(!SourceID); ActorIdToProto(ResultID, channelDesc.MutableDstEndpoint()->MutableActorId()); - SourceID = TasksGraph.GetTask(channel.SrcTask).ComputeActorId; - } + SourceID = TasksGraph.GetTask(channel.SrcTask).ComputeActorId; + } } void TDqsExecutionPlanner::FillInputDesc(NDqProto::TTaskInput& inputDesc, const TTaskInput& input) { @@ -609,28 +609,28 @@ namespace NYql::NDqs { } } - void TDqsExecutionPlanner::FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) { - switch (output.Type) { - case TTaskOutputType::Map: - YQL_ENSURE(output.Channels.size() == 1); + void TDqsExecutionPlanner::FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) { + switch (output.Type) { + case TTaskOutputType::Map: + YQL_ENSURE(output.Channels.size() == 1); outputDesc.MutableMap(); //->SetChannelId(output.Channels[0]); - break; + break; - case TTaskOutputType::HashPartition: { + case TTaskOutputType::HashPartition: { YQL_ENSURE(output.Channels.size() == output.PartitionsCount); - auto& hashPartitionDesc = *outputDesc.MutableHashPartition(); - for (auto& column : output.KeyColumns) { - hashPartitionDesc.AddKeyColumns(column); - } - hashPartitionDesc.SetPartitionsCount(output.PartitionsCount); - break; - } - - case TTaskOutputType::Broadcast: { + auto& hashPartitionDesc = *outputDesc.MutableHashPartition(); + for (auto& column : output.KeyColumns) { + hashPartitionDesc.AddKeyColumns(column); + } + hashPartitionDesc.SetPartitionsCount(output.PartitionsCount); + break; + } + + case TTaskOutputType::Broadcast: { //for (const auto channel : output.Channels) { outputDesc.MutableBroadcast(); //->AddChannelIds(channel); //} - break; + break; } case TTaskOutputType::Sink: { @@ -643,14 +643,14 @@ namespace NYql::NDqs { break; } - case TTaskOutputType::Undefined: { + case TTaskOutputType::Undefined: { YQL_ENSURE(false, "Unexpected task output type `TTaskOutputType::Undefined`"); } } - for (auto& channel : output.Channels) { - auto& channelDesc = *outputDesc.AddChannels(); - FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel)); + for (auto& channel : output.Channels) { + auto& channelDesc = *outputDesc.AddChannels(); + FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel)); } } diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.h b/ydb/library/yql/providers/dq/planner/execution_planner.h index 537c878e2ce..280600c8f7b 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.h +++ b/ydb/library/yql/providers/dq/planner/execution_planner.h @@ -15,7 +15,7 @@ namespace NYql::NDqs { class IDqsExecutionPlanner { - public: + public: virtual ~IDqsExecutionPlanner() = default; virtual TVector<NDqProto::TDqTask> GetTasks(const TVector<NActors::TActorId>& workers) = 0; virtual TVector<NDqProto::TDqTask>& GetTasks() = 0; @@ -26,7 +26,7 @@ namespace NYql::NDqs { class TDqsExecutionPlanner: public IDqsExecutionPlanner { public: explicit TDqsExecutionPlanner(TIntrusivePtr<TTypeAnnotationContext> typeContext, - NYql::TExprContext& exprContext, + NYql::TExprContext& exprContext, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NYql::TExprNode::TPtr dqExprRoot, NActors::TActorId executerID = NActors::TActorId(), @@ -49,35 +49,35 @@ namespace NYql::NDqs { PublicIds = publicIds; } - private: + private: bool BuildReadStage(const TDqSettings::TPtr& settings, const NNodes::TDqPhyStage& stage, bool dqSource, bool canFallback); void BuildConnections(const NNodes::TDqPhyStage& stage); THashMap<NDq::TStageId, std::tuple<TString,ui64>> BuildAllPrograms(); - void FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel); + void FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel); void FillInputDesc(NDqProto::TTaskInput& inputDesc, const TTaskInput& input); - void FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output); + void FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output); void GatherPhyMapping(THashMap<std::tuple<TString, TString>, TString>& clusters, THashMap<std::tuple<TString, TString, TString>, TString>& tables); void BuildCheckpointingMode(); bool IsEgressTask(const TDqsTasksGraph::TTaskType& task) const; - private: - TIntrusivePtr<TTypeAnnotationContext> TypeContext; - NYql::TExprContext& ExprContext; + private: + TIntrusivePtr<TTypeAnnotationContext> TypeContext; + NYql::TExprContext& ExprContext; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; NYql::TExprNode::TPtr DqExprRoot; - TVector<const TTypeAnnotationNode*> InputType; + TVector<const TTypeAnnotationNode*> InputType; NActors::TActorId ExecuterID; NActors::TActorId ResultID; TMaybe<NActors::TActorId> SourceID = {}; - ui64 SourceTaskID = 0; + ui64 SourceTaskID = 0; ui64 _MaxDataSizePerJob = 0; - TDqsTasksGraph TasksGraph; + TDqsTasksGraph TasksGraph; TVector<NDqProto::TDqTask> Tasks; THashMap<ui64, ui32> PublicIds; - }; + }; // Execution planner for TRuntimeNode class TDqsSingleExecutionPlanner: public IDqsExecutionPlanner { @@ -132,4 +132,4 @@ namespace NYql::NDqs { TMaybe<NActors::TActorId> SourceID = {}; }; -} +} diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index 01017efd449..65a9e500d9d 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -33,31 +33,31 @@ #include <util/system/env.h> namespace NYql::NDqs { - using namespace NYql::NDqs; - using namespace NKikimr; + using namespace NYql::NDqs; + using namespace NKikimr; using namespace NThreading; using namespace NMonitoring; using namespace NActors; - namespace { + namespace { NGrpc::ICounterBlockPtr BuildCB(TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, const TString& name) { - auto grpcCB = counters->GetSubgroup("rpc_name", name); + auto grpcCB = counters->GetSubgroup("rpc_name", name); return MakeIntrusive<NGrpc::TCounterBlock>( - grpcCB->GetCounter("total", true), - grpcCB->GetCounter("infly", true), - grpcCB->GetCounter("notOkReq", true), - grpcCB->GetCounter("notOkResp", true), - grpcCB->GetCounter("reqBytes", true), - grpcCB->GetCounter("inflyReqBytes", true), - grpcCB->GetCounter("resBytes", true), - grpcCB->GetCounter("notAuth", true), - grpcCB->GetCounter("resExh", true), - grpcCB); - } + grpcCB->GetCounter("total", true), + grpcCB->GetCounter("infly", true), + grpcCB->GetCounter("notOkReq", true), + grpcCB->GetCounter("notOkResp", true), + grpcCB->GetCounter("reqBytes", true), + grpcCB->GetCounter("inflyReqBytes", true), + grpcCB->GetCounter("resBytes", true), + grpcCB->GetCounter("notAuth", true), + grpcCB->GetCounter("resExh", true), + grpcCB); + } template<typename RequestType, typename ResponseType> class TServiceProxyActor: public TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>> { - public: + public: static constexpr char ActorName[] = "SERVICE_PROXY"; explicit TServiceProxyActor( @@ -76,23 +76,23 @@ namespace NYql::NDqs { , TraceId(traceId) , Username(username) , Promise(NewPromise<void>()) - { + { Settings->Dispatch(Request->GetSettings()); Settings->FreezeDefaults(); MaxRetries = Settings->MaxRetries.Get().GetOrElse(MaxRetries); RetryBackoff = TDuration::MilliSeconds(Settings->RetryBackoffMs.Get().GetOrElse(RetryBackoff.MilliSeconds())); - } + } - STRICT_STFUNC(Handler, { - HFunc(TEvQueryResponse, OnReturnResult); + STRICT_STFUNC(Handler, { + HFunc(TEvQueryResponse, OnReturnResult); cFunc(TEvents::TEvPoison::EventType, OnPoison); SFunc(TEvents::TEvBootstrap, DoBootstrap) - }) + }) TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override { return new IEventHandle(self, parentId, new TEvents::TEvBootstrap(), 0); - } + } void OnPoison() { YQL_LOG_CTX_SCOPE(TraceId); @@ -126,8 +126,8 @@ namespace NYql::NDqs { void SendResponse(TEvQueryResponse::TPtr& ev) { auto& result = ev->Get()->Record; - Yql::DqsProto::ExecuteQueryResult queryResult; - queryResult.Mutableresult()->CopyFrom(result.resultset()); + Yql::DqsProto::ExecuteQueryResult queryResult; + queryResult.Mutableresult()->CopyFrom(result.resultset()); queryResult.set_yson(result.yson()); if (result.GetNeedFallback()) { @@ -160,9 +160,9 @@ namespace NYql::NDqs { aggregatedQueryStat.FlushCounters(ResponseBuffer); auto& operation = *ResponseBuffer.mutable_operation(); - operation.Setready(true); - operation.Mutableresult()->PackFrom(queryResult); - *operation.Mutableissues() = result.GetIssues(); + operation.Setready(true); + operation.Mutableresult()->PackFrom(queryResult); + *operation.Mutableissues() = result.GetIssues(); ResponseBuffer.SetTruncated(result.GetTruncated()); Reply(Ydb::StatusIds::SUCCESS, result.GetIssues().size() > 0); @@ -201,7 +201,7 @@ namespace NYql::NDqs { } SendResponse(ev); } - } + } TFuture<void> GetFuture() { return Promise.GetFuture(); @@ -249,7 +249,7 @@ namespace NYql::NDqs { void RestoreRequest() { Request = dynamic_cast<const RequestType*>(Ctx->GetRequest()); } - }; + }; class TExecuteGraphProxyActor: public TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse> { public: @@ -414,25 +414,25 @@ namespace NYql::NDqs { NActors::TActorId GraphExecutionEventsActorId; }; - TString GetVersionString() { - TStringBuilder sb; - sb << GetProgramSvnVersion() << "\n"; - sb << GetBuildInfo(); - TString sandboxTaskId = GetSandboxTaskId(); - if (sandboxTaskId != TString("0")) { - sb << "\nSandbox task id: " << sandboxTaskId; - } - - return sb; - } + TString GetVersionString() { + TStringBuilder sb; + sb << GetProgramSvnVersion() << "\n"; + sb << GetBuildInfo(); + TString sandboxTaskId = GetSandboxTaskId(); + if (sandboxTaskId != TString("0")) { + sb << "\nSandbox task id: " << sandboxTaskId; + } + + return sb; + } } TDqsGrpcService::TDqsGrpcService( NActors::TActorSystem& system, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) - : ActorSystem(system) - , Counters(std::move(counters)) + : ActorSystem(system) + , Counters(std::move(counters)) , DqTaskPreprocessorFactories(dqTaskPreprocessorFactories) , Promise(NewPromise<void>()) , RunningRequests(0) @@ -455,9 +455,9 @@ namespace NYql::NDqs { } while (0) void TDqsGrpcService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) { - using namespace google::protobuf; + using namespace google::protobuf; - CQ = cq; + CQ = cq; using TDqTaskPreprocessorCollection = std::vector<NYql::IDqTaskPreprocessor::TPtr>; @@ -500,12 +500,12 @@ namespace NYql::NDqs { session->AddRequest(actorId); }); - ADD_REQUEST(SvnRevision, SvnRevisionRequest, SvnRevisionResponse, { - Y_UNUSED(this); - Yql::DqsProto::SvnRevisionResponse result; - result.SetRevision(GetVersionString()); + ADD_REQUEST(SvnRevision, SvnRevisionRequest, SvnRevisionResponse, { + Y_UNUSED(this); + Yql::DqsProto::SvnRevisionResponse result; + result.SetRevision(GetVersionString()); ctx->Reply(&result, Ydb::StatusIds::SUCCESS); - }); + }); ADD_REQUEST(CloseSession, CloseSessionRequest, CloseSessionResponse, { Y_UNUSED(this); @@ -797,20 +797,20 @@ namespace NYql::NDqs { ctx->Reply(result, Ydb::StatusIds::SUCCESS); }); */ - } + } void TDqsGrpcService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter = limiter; - } + Limiter = limiter; + } - bool TDqsGrpcService::IncRequest() { - return Limiter->Inc(); - } + bool TDqsGrpcService::IncRequest() { + return Limiter->Inc(); + } - void TDqsGrpcService::DecRequest() { - Limiter->Dec(); - Y_ASSERT(Limiter->GetCurrentInFlight() >= 0); - } + void TDqsGrpcService::DecRequest() { + Limiter->Dec(); + Y_ASSERT(Limiter->GetCurrentInFlight() >= 0); + } TFuture<void> TDqsGrpcService::Stop() { TGuard<TMutex> lock(Mutex); diff --git a/ydb/library/yql/providers/dq/service/grpc_service.h b/ydb/library/yql/providers/dq/service/grpc_service.h index 9210e3fdf86..fa2a835c8c6 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.h +++ b/ydb/library/yql/providers/dq/service/grpc_service.h @@ -19,10 +19,10 @@ #include "grpc_session.h" namespace NYql::NDqs { - class TDatabaseManager; + class TDatabaseManager; class TDqsGrpcService: public NGrpc::TGrpcServiceBase<Yql::DqsProto::DqService> { - public: + public: TDqsGrpcService(NActors::TActorSystem& system, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories); @@ -30,23 +30,23 @@ namespace NYql::NDqs { void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - bool IncRequest(); - void DecRequest(); + bool IncRequest(); + void DecRequest(); NThreading::TFuture<void> Stop(); - private: - NActors::TActorSystem& ActorSystem; - grpc::ServerCompletionQueue* CQ = nullptr; + private: + NActors::TActorSystem& ActorSystem; + grpc::ServerCompletionQueue* CQ = nullptr; NGrpc::TGlobalLimiter* Limiter = nullptr; - TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; + TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories; TMutex Mutex; - NThreading::TPromise<void> Promise; + NThreading::TPromise<void> Promise; std::atomic<ui64> RunningRequests; std::atomic<bool> Stopping; TSessionStorage Sessions; - }; -} + }; +} diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp index 50ec2d66c75..bef368a961f 100644 --- a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp +++ b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp @@ -26,7 +26,7 @@ #include <util/system/env.h> namespace NYql::NDqs { - using namespace NActors; + using namespace NActors; using namespace NActors::NDnsResolver; using namespace NGrpc; @@ -204,7 +204,7 @@ namespace NYql::NDqs { if (nodeId != id) { IActor* actor = new TInterconnectProxyTCP(id, icCommon); setup->Interconnect.ProxyActors[id] = TActorSetupCmd(actor, TMailboxType::ReadAsFilled, 0); - } + } } // start listener @@ -226,9 +226,9 @@ namespace NYql::NDqs { YQL_LOG(DEBUG) << "Actor initialization complete"; -#ifdef _unix_ +#ifdef _unix_ signal(SIGPIPE, SIG_IGN); -#endif +#endif return std::make_tuple(std::move(setup), logSettings); } diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.h b/ydb/library/yql/providers/dq/service/interconnect_helpers.h index 831445476bc..9009c8c179b 100644 --- a/ydb/library/yql/providers/dq/service/interconnect_helpers.h +++ b/ydb/library/yql/providers/dq/service/interconnect_helpers.h @@ -30,7 +30,7 @@ struct TServiceNodeConfig { NYql::NProto::TDqConfig::TICSettings ICSettings = NYql::NProto::TDqConfig::TICSettings(); TNameserverFactory NameserverFactory = [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) { return CreateNameserverTable(setup); - }; + }; }; std::tuple<TString, TString> GetLocalAddress(const TString* hostname = nullptr); diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp index c563fdb1f16..9044dc47e30 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp @@ -11,17 +11,17 @@ namespace NYql::NDqs { const TString& user, const TMaybe<ui64>& globalResourceId) { - Record.SetCount(count); + Record.SetCount(count); Record.SetUser(user); if (globalResourceId) { Record.SetResourceId(*globalResourceId); } Record.SetIsForwarded(false); - } + } - TEvAllocateWorkersResponse::TEvAllocateWorkersResponse() { + TEvAllocateWorkersResponse::TEvAllocateWorkersResponse() { Record.MutableError()->SetMessage("Unknown error"); - } + } TEvAllocateWorkersResponse::TEvAllocateWorkersResponse(const TString& error, NYql::NDqProto::EErrorCode code) { Record.MutableError()->SetMessage(error); @@ -29,12 +29,12 @@ namespace NYql::NDqs { } TEvAllocateWorkersResponse::TEvAllocateWorkersResponse(ui64 resourceId, const TVector<NActors::TActorId>& ids) { - auto& group = *Record.MutableWorkers(); + auto& group = *Record.MutableWorkers(); group.SetResourceId(resourceId); - for (const auto& actorId : ids) { + for (const auto& actorId : ids) { NActors::ActorIdToProto(actorId, group.AddWorkerActor()); - } - } + } + } TEvAllocateWorkersResponse::TEvAllocateWorkersResponse(ui64 resourceId, const TVector<ui32>& nodes) { auto& group = *Record.MutableNodes(); diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/events.h b/ydb/library/yql/providers/dq/worker_manager/interface/events.h index 595f2abe773..a2d0c9f5409 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/events.h +++ b/ydb/library/yql/providers/dq/worker_manager/interface/events.h @@ -17,31 +17,31 @@ namespace NYql::NDqs { using TDqResManEvents = NDq::TBaseDqResManEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>; - struct TEvAllocateWorkersRequest : NActors::TEventPB<TEvAllocateWorkersRequest, NYql::NDqProto::TAllocateWorkersRequest, - TDqResManEvents::ES_ALLOCATE_WORKERS_REQUEST> { - TEvAllocateWorkersRequest() = default; + struct TEvAllocateWorkersRequest : NActors::TEventPB<TEvAllocateWorkersRequest, NYql::NDqProto::TAllocateWorkersRequest, + TDqResManEvents::ES_ALLOCATE_WORKERS_REQUEST> { + TEvAllocateWorkersRequest() = default; explicit TEvAllocateWorkersRequest( ui32 count, const TString& user, const TMaybe<ui64>& globalResourceId = TMaybe<ui64>()); - }; + }; - struct TEvAllocateWorkersResponse - : NActors::TEventPB<TEvAllocateWorkersResponse, NYql::NDqProto::TAllocateWorkersResponse, - TDqResManEvents::ES_ALLOCATE_WORKERS_RESPONSE> { - TEvAllocateWorkersResponse(); + struct TEvAllocateWorkersResponse + : NActors::TEventPB<TEvAllocateWorkersResponse, NYql::NDqProto::TAllocateWorkersResponse, + TDqResManEvents::ES_ALLOCATE_WORKERS_RESPONSE> { + TEvAllocateWorkersResponse(); explicit TEvAllocateWorkersResponse(const TString& error, NYql::NDqProto::EErrorCode code = NYql::NDqProto::EUNKNOWN); explicit TEvAllocateWorkersResponse(ui64 resourceId, const TVector<NActors::TActorId>& ids); explicit TEvAllocateWorkersResponse(ui64 resourceId, const TVector<TWorkerInfo::TPtr>& workerInfos); explicit TEvAllocateWorkersResponse(ui64 resourceId, const TVector<ui32>& nodes); - }; + }; - struct TEvFreeWorkersNotify : NActors::TEventPB<TEvFreeWorkersNotify, NYql::NDqProto::TFreeWorkersNotify, - TDqResManEvents::ES_FREE_WORKERS_NOTIFICATION> { - TEvFreeWorkersNotify() = default; + struct TEvFreeWorkersNotify : NActors::TEventPB<TEvFreeWorkersNotify, NYql::NDqProto::TFreeWorkersNotify, + TDqResManEvents::ES_FREE_WORKERS_NOTIFICATION> { + TEvFreeWorkersNotify() = default; explicit TEvFreeWorkersNotify(ui64 id); - }; + }; struct TEvRegisterNode : NActors::TEventPB<TEvRegisterNode, NYql::NDqProto::TEvRegisterNode, TDqResManEvents::ES_REGISTER_NODE> { @@ -153,8 +153,8 @@ using TDqResManEvents = NDq::TBaseDqResManEvents<NActors::TEvents::EEventSpace:: }; inline NActors::TActorId MakeWorkerManagerActorID(ui32 nodeId) { - char x[12] = {'r', 'e', 's', 'm', 'a', 'n'}; - memcpy(x + 7, &nodeId, sizeof(ui32)); + char x[12] = {'r', 'e', 's', 'm', 'a', 'n'}; + memcpy(x + 7, &nodeId, sizeof(ui32)); return NActors::TActorId(nodeId, TStringBuf(x, 12)); - } + } } diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index e8dd40f5316..5ecf0a4a852 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -111,9 +111,9 @@ private: void DoPassAway() override { for (const auto& [resourceId, _] : AllocatedWorkers) { FreeGroup(resourceId); - } + } - AllocatedWorkers.clear(); + AllocatedWorkers.clear(); _exit(0); } @@ -215,7 +215,7 @@ private: auto traceId = ev->Get()->Record.GetTraceId(); allocationInfo.TxId = traceId; - auto count = ev->Get()->Record.GetCount(); + auto count = ev->Get()->Record.GetCount(); Y_VERIFY(count > 0); @@ -270,8 +270,8 @@ private: } Options.Counters.ActiveWorkers->Add(count); - } - + } + Send(ev->Sender, MakeHolder<TEvAllocateWorkersResponse>(resourceId, allocationInfo.WorkerActors), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, |