aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormatway <matway@yandex-team.ru>2022-02-10 16:52:19 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:52:19 +0300
commit969a4575432dd5cbdcdd3be410af8ddfb9b0cc1a (patch)
treeab7fbbf3253d4c0e2793218f09378908beb025fb
parentcd2afdc55d7e1c82d56a5c63453af6ebfe33101d (diff)
downloadydb-969a4575432dd5cbdcdd3be410af8ddfb9b0cc1a.tar.gz
Restoring authorship annotation for <matway@yandex-team.ru>. Commit 2 of 2.
-rw-r--r--ydb/library/yql/providers/dq/actors/events.cpp22
-rw-r--r--ydb/library/yql/providers/dq/actors/events.h54
-rw-r--r--ydb/library/yql/providers/dq/actors/executer_actor.cpp30
-rw-r--r--ydb/library/yql/providers/dq/actors/result_aggregator.cpp2
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp40
-rw-r--r--ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h2
-rw-r--r--ydb/library/yql/providers/dq/opt/dqs_opt.cpp158
-rw-r--r--ydb/library/yql/providers/dq/opt/dqs_opt.h10
-rw-r--r--ydb/library/yql/providers/dq/planner/dqs_task_graph.h24
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp204
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.h26
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.cpp118
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.h22
-rw-r--r--ydb/library/yql/providers/dq/service/interconnect_helpers.cpp8
-rw-r--r--ydb/library/yql/providers/dq/service/interconnect_helpers.h2
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/interface/events.cpp16
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/interface/events.h32
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp10
18 files changed, 390 insertions, 390 deletions
diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp
index 72b757077be..892c5cccd70 100644
--- a/ydb/library/yql/providers/dq/actors/events.cpp
+++ b/ydb/library/yql/providers/dq/actors/events.cpp
@@ -13,9 +13,9 @@ namespace NYql::NDqs {
Record.SetNeedFallback(needFallback);
}
- TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) {
- Record = std::move(queryResult);
- }
+ TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) {
+ Record = std::move(queryResult);
+ }
TEvGraphRequest::TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId)
{
@@ -29,8 +29,8 @@ namespace NYql::NDqs {
TEvReadyState::TEvReadyState(NActors::TActorId sourceId, TString type) {
NActors::ActorIdToProto(sourceId, Record.MutableSourceId());
- *Record.MutableResultType() = std::move(type);
- }
+ *Record.MutableResultType() = std::move(type);
+ }
TEvReadyState::TEvReadyState(NDqProto::TReadyState&& proto) {
Record = std::move(proto);
@@ -40,13 +40,13 @@ namespace NYql::NDqs {
Record = evt;
}
- TEvPullDataRequest::TEvPullDataRequest(ui32 rowThreshold) {
- Record.SetRowThreshold(rowThreshold);
- }
+ TEvPullDataRequest::TEvPullDataRequest(ui32 rowThreshold) {
+ Record.SetRowThreshold(rowThreshold);
+ }
- TEvPullDataResponse::TEvPullDataResponse(NYql::NDqProto::TPullResponse& data) {
- Record.Swap(&data);
- }
+ TEvPullDataResponse::TEvPullDataResponse(NYql::NDqProto::TPullResponse& data) {
+ Record.Swap(&data);
+ }
TEvFullResultWriterStatusResponse::TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data) {
Record.CopyFrom(data);
diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h
index 97c404aae86..594921560ae 100644
--- a/ydb/library/yql/providers/dq/actors/events.h
+++ b/ydb/library/yql/providers/dq/actors/events.h
@@ -12,38 +12,38 @@
#include <library/cpp/actors/core/events.h>
namespace NYql::NDqs {
- using TDqExecuterEvents = NDq::TBaseDqExecuterEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>;
+ using TDqExecuterEvents = NDq::TBaseDqExecuterEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>;
struct TEvDqTask : NActors::TEventPB<TEvDqTask, NDqProto::TDqTaskRequest, TDqExecuterEvents::ES_DQ_TASK> {
- TEvDqTask() = default;
- explicit TEvDqTask(NDqProto::TDqTask task);
- };
+ TEvDqTask() = default;
+ explicit TEvDqTask(NDqProto::TDqTask task);
+ };
struct TEvDqFailure : NActors::TEventPB<TEvDqFailure, NDqProto::TDqFailure, TDqExecuterEvents::ES_DQ_FAILURE> {
TEvDqFailure() = default;
explicit TEvDqFailure(const TIssue& issue, bool retriable = false, bool needFallback = false);
};
- struct TEvQueryResponse
- : NActors::TEventPB<TEvQueryResponse, NDqProto::TQueryResponse, TDqExecuterEvents::ES_RESULT_SET> {
- TEvQueryResponse() = default;
- explicit TEvQueryResponse(NDqProto::TQueryResponse&& queryResult);
- };
+ struct TEvQueryResponse
+ : NActors::TEventPB<TEvQueryResponse, NDqProto::TQueryResponse, TDqExecuterEvents::ES_RESULT_SET> {
+ TEvQueryResponse() = default;
+ explicit TEvQueryResponse(NDqProto::TQueryResponse&& queryResult);
+ };
struct TEvGraphRequest : NActors::TEventPB<TEvGraphRequest, NDqProto::TGraphRequest, TDqExecuterEvents::ES_GRAPH> {
TEvGraphRequest() = default;
TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId = {});
};
- struct TEvReadyState : NActors::TEventPB<TEvReadyState, NDqProto::TReadyState, TDqExecuterEvents::ES_READY_TO_PULL> {
- TEvReadyState() = default;
+ struct TEvReadyState : NActors::TEventPB<TEvReadyState, NDqProto::TReadyState, TDqExecuterEvents::ES_READY_TO_PULL> {
+ TEvReadyState() = default;
TEvReadyState(NActors::TActorId sourceId, TString type);
explicit TEvReadyState(NDqProto::TReadyState&& proto);
- };
+ };
- struct TEvPullResult : NActors::TEventBase<TEvPullResult, TDqExecuterEvents::ES_PULL_RESULT> {
- DEFINE_SIMPLE_NONLOCAL_EVENT(TEvPullResult, "");
- };
+ struct TEvPullResult : NActors::TEventBase<TEvPullResult, TDqExecuterEvents::ES_PULL_RESULT> {
+ DEFINE_SIMPLE_NONLOCAL_EVENT(TEvPullResult, "");
+ };
struct TEvGraphExecutionEvent
: NActors::TEventPB<TEvGraphExecutionEvent, NYql::NDqProto::TGraphExecutionEvent, TDqExecuterEvents::ES_GRAPH_EXECUTION_EVENT> {
@@ -51,19 +51,19 @@ namespace NYql::NDqs {
explicit TEvGraphExecutionEvent(NDqProto::TGraphExecutionEvent& evt);
};
- using TDqDataEvents = NDq::TBaseDqDataEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>;
+ using TDqDataEvents = NDq::TBaseDqDataEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>;
- struct TEvPullDataRequest
- : NActors::TEventPB<TEvPullDataRequest, NYql::NDqProto::TPullRequest, TDqDataEvents::ES_PULL_REQUEST> {
- TEvPullDataRequest() = default;
- explicit TEvPullDataRequest(ui32 rowThreshold);
- };
+ struct TEvPullDataRequest
+ : NActors::TEventPB<TEvPullDataRequest, NYql::NDqProto::TPullRequest, TDqDataEvents::ES_PULL_REQUEST> {
+ TEvPullDataRequest() = default;
+ explicit TEvPullDataRequest(ui32 rowThreshold);
+ };
- struct TEvPullDataResponse
- : NActors::TEventPB<TEvPullDataResponse, NYql::NDqProto::TPullResponse, TDqDataEvents::ES_PULL_RESPONSE> {
- TEvPullDataResponse() = default;
- explicit TEvPullDataResponse(NDqProto::TPullResponse& data);
- };
+ struct TEvPullDataResponse
+ : NActors::TEventPB<TEvPullDataResponse, NYql::NDqProto::TPullResponse, TDqDataEvents::ES_PULL_RESPONSE> {
+ TEvPullDataResponse() = default;
+ explicit TEvPullDataResponse(NDqProto::TPullResponse& data);
+ };
struct TEvPingRequest
: NActors::TEventPB<TEvPingRequest, NYql::NDqProto::TPingRequest, TDqDataEvents::ES_PING_REQUEST> {
@@ -91,4 +91,4 @@ namespace NYql::NDqs {
struct TEvGraphFinished : NActors::TEventBase<TEvGraphFinished, TDqExecuterEvents::ES_GRAPH_FINISHED> {
DEFINE_SIMPLE_NONLOCAL_EVENT(TEvGraphFinished, "");
};
-}
+}
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index f7649de8df1..91f1caee748 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -90,7 +90,7 @@ private:
Finish(/*retriable=*/ false, /*needFallback=*/ true);
})
cFunc(TEvents::TEvWakeup::EventType, OnWakeup)
- })
+ })
Yql::DqsProto::TWorkerFilter GetPragmaFilter() {
Yql::DqsProto::TWorkerFilter pragmaFilter;
@@ -304,30 +304,30 @@ private:
AddCounters(ev->Get()->Record);
FlushCounter("AllocateWorkers");
- auto& response = ev->Get()->Record;
- switch (response.GetTResponseCase()) {
- case TAllocateWorkersResponse::kWorkers:
- break;
- case TAllocateWorkersResponse::kError: {
+ auto& response = ev->Get()->Record;
+ switch (response.GetTResponseCase()) {
+ case TAllocateWorkersResponse::kWorkers:
+ break;
+ case TAllocateWorkersResponse::kError: {
YQL_LOG(ERROR) << "Error on allocate workers "
<< ev->Get()->Record.GetError().GetMessage() << ":"
<< static_cast<int>(ev->Get()->Record.GetError().GetErrorCode());
Issues.AddIssue(TIssue(ev->Get()->Record.GetError().GetMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR));
Finish(/*retriable = */ true, /*fallback = */ true);
- return;
- }
+ return;
+ }
case TAllocateWorkersResponse::kNodes:
- case TAllocateWorkersResponse::TRESPONSE_NOT_SET:
- YQL_ENSURE(false, "Unexpected allocate result");
+ case TAllocateWorkersResponse::TRESPONSE_NOT_SET:
+ YQL_ENSURE(false, "Unexpected allocate result");
}
- auto& workerGroup = response.GetWorkers();
+ auto& workerGroup = response.GetWorkers();
ResourceId = workerGroup.GetResourceId();
YQL_LOG(DEBUG) << "Allocated resource " << ResourceId;
TVector<NActors::TActorId> workers;
for (const auto& actorIdProto : workerGroup.GetWorkerActor()) {
workers.emplace_back(NActors::ActorIdFromProto(actorIdProto));
- }
+ }
auto tasks = ExecutionPlanner->GetTasks(workers);
@@ -354,7 +354,7 @@ private:
YQL_LOG(INFO) << workers.size() << " workers allocated";
- YQL_ENSURE(workers.size() == tasks.size());
+ YQL_ENSURE(workers.size() == tasks.size());
auto res = MakeHolder<TEvReadyState>(ExecutionPlanner->GetSourceID(), ExecutionPlanner->GetResultType());
@@ -376,8 +376,8 @@ private:
*res->Record.AddTask() = tasks[i];
ActorIdToProto(workers[i], res->Record.AddActorId());
}
- }
-
+ }
+
WorkersAllocated = true;
ExecutionStart = TInstant::Now();
diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
index d2bd91c822b..59ae95c3af5 100644
--- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
+++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
@@ -514,7 +514,7 @@ private:
YQL_ENSURE(ysonNode.GetType() == NYT::TNode::EType::List);
for (const auto& row : ysonNode.AsList()) {
Output << NYT::NodeToYsonString(row) << "\n";
- }
+ }
}
Promise.SetValue();
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index c7954022511..f4503a60b2e 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -84,14 +84,14 @@ public:
, TaskRunnerActorFactory(taskRunnerActorFactory)
, RuntimeData(runtimeData)
, TraceId(traceId)
- {
+ {
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(DEBUG) << "TDqWorker created ";
if (RuntimeData) {
RuntimeData->OnWorkerStart(TraceId);
}
- }
+ }
~TDqWorker()
{
@@ -105,9 +105,9 @@ public:
void DoPassAway() override {
YQL_LOG_CTX_SCOPE(TraceId);
- for (const auto& inputs : InputMap) {
- Send(inputs.first, new NActors::TEvents::TEvPoison());
- }
+ for (const auto& inputs : InputMap) {
+ Send(inputs.first, new NActors::TEvents::TEvPoison());
+ }
YQL_LOG(DEBUG) << "TDqWorker passed away ";
if (Actor) {
@@ -140,7 +140,7 @@ private:
HFunc(TEvError, OnErrorFromPipe);
HFunc(TEvContinueRun, OnContinueRun);
cFunc(TEvents::TEvWakeup::EventType, OnWakeup);
- })
+ })
TString ParseStatus(const TString& input, bool* retriableFlag, bool* fallbackFlag) {
TString result;
@@ -424,7 +424,7 @@ private:
Send(TaskRunnerActor, new TEvPop(outChannel.ChannelId));
}
-
+
void OnChannelPopFinished(TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext& ctx) {
try {
auto outputActorId = OutChannelId2ActorId[ev->Get()->ChannelId];
@@ -522,16 +522,16 @@ private:
}
NActors::TActorId ResolveEndpoint(const TEndpoint& ep) {
- switch (ep.GetEndpointTypeCase()) {
- case TEndpoint::kActorId:
+ switch (ep.GetEndpointTypeCase()) {
+ case TEndpoint::kActorId:
return NActors::ActorIdFromProto(ep.GetActorId());
case TEndpoint::kUri:
Y_ENSURE(false, "kUri is not supported");
- case TEndpoint::kTabletId:
- Y_ENSURE(false, "Tablets not supported by dqs");
- case TEndpoint::ENDPOINTTYPE_NOT_SET: {
- Y_ENSURE(false, "Endpoint must be set");
- } break;
+ case TEndpoint::kTabletId:
+ Y_ENSURE(false, "Tablets not supported by dqs");
+ case TEndpoint::ENDPOINTTYPE_NOT_SET: {
+ Y_ENSURE(false, "Endpoint must be set");
+ } break;
}
}
@@ -604,10 +604,10 @@ private:
Stat.AddCounters2(ev->Get()->Sensors);
- switch (res) {
+ switch (res) {
case ERunStatus::Finished: {
TaskFinished = true;
- break;
+ break;
}
case ERunStatus::PendingInput: {
auto now = TInstant::Now();
@@ -629,7 +629,7 @@ private:
Stat.AddCounter("ReadTimeout", static_cast<ui64>(1));
OnError("PullTimeout " + TimeoutInfo(channel.ActorID, now, channel.RequestTime), false, true);
}
- }
+ }
}
for (auto& [inputIndex, source] : SourcesMap) {
@@ -654,7 +654,7 @@ private:
Actor->SourcePush(0, index, std::move(batch), space, finished);
}
- break;
+ break;
}
case ERunStatus::PendingOutput: {
for (auto& [index, sink] : SinksMap) {
@@ -663,9 +663,9 @@ private:
Send(TaskRunnerActor, new TEvSinkPop(index, sinkActorFreeSpaceBeforeSend));
}
}
- break;
+ break;
}
- }
+ }
}
TString TimeoutInfo(TActorId actorID, TInstant now, TInstant startTime) {
diff --git a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h
index 27ad76c2d05..04a077b4a5d 100644
--- a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h
+++ b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h
@@ -8,4 +8,4 @@ namespace NYql::NNodes {
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.decl.inl.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.defs.inl.h>
-}
+}
diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
index 144c3d5bf66..b4bc07fbef1 100644
--- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
+++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
@@ -27,77 +27,77 @@
} while (0)
namespace NYql::NDqs {
- using namespace NYql;
- using namespace NYql::NDq;
- using namespace NYql::NNodes;
-
- using TStatus = IGraphTransformer::TStatus;
-
- THolder<IGraphTransformer> CreateDqsWrapListsOptTransformer() {
- return CreateFunctorTransformer(
- [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
- YQL_ENSURE(input->GetTypeAnn() != nullptr);
- YQL_ENSURE(input->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List);
- if (TExprBase(input).Maybe<TDqCnUnionAll>()) {
- return TStatus(TStatus::ELevel::Ok, false);
- }
-
- output = Build<TDqCnUnionAll>(ctx, input->Pos()) // clang-format off
- .Output()
+ using namespace NYql;
+ using namespace NYql::NDq;
+ using namespace NYql::NNodes;
+
+ using TStatus = IGraphTransformer::TStatus;
+
+ THolder<IGraphTransformer> CreateDqsWrapListsOptTransformer() {
+ return CreateFunctorTransformer(
+ [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
+ YQL_ENSURE(input->GetTypeAnn() != nullptr);
+ YQL_ENSURE(input->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List);
+ if (TExprBase(input).Maybe<TDqCnUnionAll>()) {
+ return TStatus(TStatus::ELevel::Ok, false);
+ }
+
+ output = Build<TDqCnUnionAll>(ctx, input->Pos()) // clang-format off
+ .Output()
.Stage<TDqStage>()
- .Inputs()
- .Build()
- .Program()
- .Args({})
+ .Inputs()
+ .Build()
+ .Program()
+ .Args({})
.Body<TCoIterator>()
.List(input)
- .Build()
- .Build()
+ .Build()
+ .Build()
.Settings(TDqStageSettings().BuildNode(ctx, input->Pos()))
- .Build()
- .Index()
- .Build("0")
- .Build()
- .Done().Ptr(); // clang-format on
-
- return TStatus(TStatus::ELevel::Repeat, true);
- });
- }
-
- THolder<NYql::IGraphTransformer> CreateDqsBuildTransformer() {
- return CreateFunctorTransformer([](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
- TExprBase node(input);
- if (node.Maybe<TDqCnResult>()) {
- return TStatus::Ok;
+ .Build()
+ .Index()
+ .Build("0")
+ .Build()
+ .Done().Ptr(); // clang-format on
+
+ return TStatus(TStatus::ELevel::Repeat, true);
+ });
+ }
+
+ THolder<NYql::IGraphTransformer> CreateDqsBuildTransformer() {
+ return CreateFunctorTransformer([](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
+ TExprBase node(input);
+ if (node.Maybe<TDqCnResult>()) {
+ return TStatus::Ok;
}
- if (!node.Maybe<TDqCnUnionAll>()) {
- ctx.AddError(TIssue(input->Pos(ctx), "Last connection must be union all"));
- output = input;
- return TStatus::Error;
+ if (!node.Maybe<TDqCnUnionAll>()) {
+ ctx.AddError(TIssue(input->Pos(ctx), "Last connection must be union all"));
+ output = input;
+ return TStatus::Error;
}
- output = Build<TDqCnResult>(ctx, input->Pos()) // clang-format off
- .Output()
+ output = Build<TDqCnResult>(ctx, input->Pos()) // clang-format off
+ .Output()
.Stage<TDqStage>()
- .Inputs()
- .Add(node.Cast<TDqCnUnionAll>())
- .Build()
- .Program()
- .Args({"row"})
+ .Inputs()
+ .Add(node.Cast<TDqCnUnionAll>())
+ .Build()
+ .Program()
+ .Args({"row"})
.Body("row")
.Build()
.Settings(TDqStageSettings().BuildNode(ctx, node.Pos()))
.Build()
- .Index()
- .Build("0")
+ .Index()
+ .Build("0")
.Build()
.ColumnHints() // TODO: set column hints
.Build()
- .Done().Ptr(); // clang-format on
- return TStatus(IGraphTransformer::TStatus::Repeat, true);
- });
- }
+ .Done().Ptr(); // clang-format on
+ return TStatus(IGraphTransformer::TStatus::Repeat, true);
+ });
+ }
THolder<IGraphTransformer> CreateDqsRewritePhyCallablesTransformer() {
return CreateFunctorTransformer([](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
@@ -116,22 +116,22 @@ namespace NYql::NDqs {
});
}
- namespace NPeephole {
+ namespace NPeephole {
class TDqsPeepholeTransformer: public TSyncTransformerBase {
- public:
+ public:
TDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer,
TTypeAnnotationContext& typesCtx)
- : TypeAnnTransformer(std::move(typeAnnTransformer))
- , TypesCtx(typesCtx)
- {
- }
+ : TypeAnnTransformer(std::move(typeAnnTransformer))
+ , TypesCtx(typesCtx)
+ {
+ }
- TStatus DoTransform(TExprNode::TPtr inputExpr, TExprNode::TPtr& outputExpr, TExprContext& ctx) final {
- if (Optimized) {
- outputExpr = inputExpr;
- return TStatus::Ok;
- }
+ TStatus DoTransform(TExprNode::TPtr inputExpr, TExprNode::TPtr& outputExpr, TExprContext& ctx) final {
+ if (Optimized) {
+ outputExpr = inputExpr;
+ return TStatus::Ok;
+ }
auto transformer = CreateDqsRewritePhyCallablesTransformer();
auto status = InstantTransform(*transformer, inputExpr, ctx);
@@ -150,20 +150,20 @@ namespace NYql::NDqs {
outputExpr = inputExpr;
Optimized = true;
return TStatus::Ok;
- }
-
- void Rewind() final {
- Optimized = false;
- }
-
- private:
- THolder<IGraphTransformer> TypeAnnTransformer;
- TTypeAnnotationContext& TypesCtx;
- bool Optimized = false;
- };
+ }
+
+ void Rewind() final {
+ Optimized = false;
+ }
+
+ private:
+ THolder<IGraphTransformer> TypeAnnTransformer;
+ TTypeAnnotationContext& TypesCtx;
+ bool Optimized = false;
+ };
}
- THolder<IGraphTransformer> CreateDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer, TTypeAnnotationContext& typesCtx) {
+ THolder<IGraphTransformer> CreateDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer, TTypeAnnotationContext& typesCtx) {
return MakeHolder<NPeephole::TDqsPeepholeTransformer>(std::move(typeAnnTransformer), typesCtx);
}
@@ -176,5 +176,5 @@ namespace NYql::NDqs {
}
return status;
});
- }
+ }
}
diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.h b/ydb/library/yql/providers/dq/opt/dqs_opt.h
index 2337a6d2a35..6c9e1a7c18b 100644
--- a/ydb/library/yql/providers/dq/opt/dqs_opt.h
+++ b/ydb/library/yql/providers/dq/opt/dqs_opt.h
@@ -5,12 +5,12 @@
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
namespace NYql::NDqs {
- class TDatabaseManager;
-
- THolder<IGraphTransformer> CreateDqsWrapListsOptTransformer();
+ class TDatabaseManager;
+
+ THolder<IGraphTransformer> CreateDqsWrapListsOptTransformer();
THolder<IGraphTransformer> CreateDqsFinalizingOptTransformer();
- THolder<IGraphTransformer> CreateDqsBuildTransformer();
+ THolder<IGraphTransformer> CreateDqsBuildTransformer();
THolder<IGraphTransformer> CreateDqsRewritePhyCallablesTransformer();
- THolder<IGraphTransformer> CreateDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer, TTypeAnnotationContext& typesCtx);
+ THolder<IGraphTransformer> CreateDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer, TTypeAnnotationContext& typesCtx);
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/planner/dqs_task_graph.h b/ydb/library/yql/providers/dq/planner/dqs_task_graph.h
index 207d7a091f0..0b62dfce4a9 100644
--- a/ydb/library/yql/providers/dq/planner/dqs_task_graph.h
+++ b/ydb/library/yql/providers/dq/planner/dqs_task_graph.h
@@ -6,24 +6,24 @@
#include <library/cpp/actors/core/actorid.h>
namespace NYql::NDqs {
- struct TStageInfoMeta {
+ struct TStageInfoMeta {
NNodes::TDqPhyStage Stage;
- };
+ };
- struct TTaskInputMeta {
- };
+ struct TTaskInputMeta {
+ };
- struct TTaskOutputMeta {};
+ struct TTaskOutputMeta {};
struct TTaskMeta {
THashMap<TString, TString> TaskParams;
TString ClusterNameHint;
};
- using TStageInfo = NYql::NDq::TStageInfo<TStageInfoMeta>;
- using TTaskOutput = NYql::NDq::TTaskOutput<TTaskOutputMeta>;
- using TTaskOutputType = NYql::NDq::TTaskOutputType;
- using TTaskInput = NYql::NDq::TTaskInput<TTaskInputMeta>;
- using TTask = NYql::NDq::TTask<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>;
- using TDqsTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>;
-}
+ using TStageInfo = NYql::NDq::TStageInfo<TStageInfoMeta>;
+ using TTaskOutput = NYql::NDq::TTaskOutput<TTaskOutputMeta>;
+ using TTaskOutputType = NYql::NDq::TTaskOutputType;
+ using TTaskInput = NYql::NDq::TTaskInput<TTaskInputMeta>;
+ using TTask = NYql::NDq::TTask<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>;
+ using TDqsTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>;
+}
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index 0323e78d026..ec404c0a6dc 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -39,26 +39,26 @@ using namespace NKikimr::NMiniKQL;
using namespace Yql::DqsProto;
namespace NYql::NDqs {
- namespace {
+ namespace {
TVector<TDqPhyStage> GetStages(const TExprNode::TPtr& exprRoot) {
TVector<TDqPhyStage> stages;
- VisitExpr(
+ VisitExpr(
exprRoot,
- [](const TExprNode::TPtr& exprNode) {
- const auto& node = TExprBase(exprNode);
- return !node.Maybe<TCoLambda>();
- },
- [&stages](const TExprNode::TPtr& exprNode) {
- const auto& node = TExprBase(exprNode);
+ [](const TExprNode::TPtr& exprNode) {
+ const auto& node = TExprBase(exprNode);
+ return !node.Maybe<TCoLambda>();
+ },
+ [&stages](const TExprNode::TPtr& exprNode) {
+ const auto& node = TExprBase(exprNode);
if (auto maybeStage = node.Maybe<TDqPhyStage>()) {
- stages.push_back(maybeStage.Cast());
- }
+ stages.push_back(maybeStage.Cast());
+ }
- return true;
- });
+ return true;
+ });
- return stages;
- }
+ return stages;
+ }
bool HasReadWraps(const TExprNode::TPtr& node) {
bool result = false;
@@ -82,19 +82,19 @@ namespace NYql::NDqs {
}
TDqsExecutionPlanner::TDqsExecutionPlanner(TIntrusivePtr<TTypeAnnotationContext> typeContext,
- NYql::TExprContext& exprContext,
+ NYql::TExprContext& exprContext,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NYql::TExprNode::TPtr dqExprRoot,
NActors::TActorId executerID,
NActors::TActorId resultID)
: TypeContext(std::move(typeContext))
- , ExprContext(exprContext)
+ , ExprContext(exprContext)
, FunctionRegistry(functionRegistry)
, DqExprRoot(std::move(dqExprRoot))
- , ExecuterID(executerID)
- , ResultID(resultID)
- {
- }
+ , ExecuterID(executerID)
+ , ResultID(resultID)
+ {
+ }
void TDqsExecutionPlanner::Clear() {
TasksGraph.Clear();
@@ -138,20 +138,20 @@ namespace NYql::NDqs {
YQL_LOG(DEBUG) << "Execution Plan " << NCommon::ExprToPrettyString(ExprContext, *DqExprRoot);
auto stages = GetStages(DqExprRoot);
- YQL_ENSURE(!stages.empty());
-
- for (const auto& stage : stages) {
- TasksGraph.AddStageInfo(TStageInfo(stage, {stage}));
- }
+ YQL_ENSURE(!stages.empty());
+
+ for (const auto& stage : stages) {
+ TasksGraph.AddStageInfo(TStageInfo(stage, {stage}));
+ }
- for (const auto& stage : stages) {
+ for (const auto& stage : stages) {
const bool hasDqSource = HasDqSource(stage);
if ((hasDqSource || HasReadWraps(stage.Program().Ptr())) && BuildReadStage(settings, stage, hasDqSource, canFallback)) {
YQL_LOG(DEBUG) << "Read stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr());
- } else {
+ } else {
YQL_LOG(DEBUG) << "Common stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr());
- NDq::CommonBuildTasks(TasksGraph, stage);
- }
+ NDq::CommonBuildTasks(TasksGraph, stage);
+ }
// Sinks
if (auto maybeDqSinksList = stage.Sinks()) {
@@ -183,7 +183,7 @@ namespace NYql::NDqs {
}
}
- BuildConnections(stage);
+ BuildConnections(stage);
if (canFallback && TasksGraph.GetTasks().size() > maxTasksPerOperation) {
break;
@@ -210,10 +210,10 @@ namespace NYql::NDqs {
output.Channels.emplace_back(channel.Id);
SourceTaskID = resultTask.Id;
}
-
+
BuildCheckpointingMode();
- return TasksGraph.GetTasks().size();
+ return TasksGraph.GetTasks().size();
}
bool TDqsExecutionPlanner::IsEgressTask(const TDqsTasksGraph::TTaskType& task) const {
@@ -314,17 +314,17 @@ namespace NYql::NDqs {
}
TVector<TDqTask> TDqsExecutionPlanner::GetTasks(const TVector<NActors::TActorId>& workers) {
- auto& tasks = TasksGraph.GetTasks();
- YQL_ENSURE(tasks.size() == workers.size());
+ auto& tasks = TasksGraph.GetTasks();
+ YQL_ENSURE(tasks.size() == workers.size());
- for (size_t i = 0; i < workers.size(); i++) {
- tasks[i].ComputeActorId = workers[i];
- }
+ for (size_t i = 0; i < workers.size(); i++) {
+ tasks[i].ComputeActorId = workers[i];
+ }
THashMap<TStageId, std::tuple<TString, ui64>> stagePrograms = BuildAllPrograms();
TVector<TDqTask> plan;
THashSet<TString> clusterNameHints;
- for (const auto& task : tasks) {
+ for (const auto& task : tasks) {
if (task.Meta.ClusterNameHint) {
clusterNameHints.insert(task.Meta.ClusterNameHint);
}
@@ -332,7 +332,7 @@ namespace NYql::NDqs {
for (const auto& task : tasks) {
Yql::DqsProto::TTaskMeta taskMeta;
TDqTask taskDesc;
- taskDesc.SetId(task.Id);
+ taskDesc.SetId(task.Id);
NActors::ActorIdToProto(ExecuterID, taskDesc.MutableExecuter()->MutableActorId());
auto& taskParams = *taskMeta.MutableTaskParams();
for (const auto& [k, v]: task.Meta.TaskParams) {
@@ -344,27 +344,27 @@ namespace NYql::NDqs {
taskMeta.SetClusterNameHint(task.Meta.ClusterNameHint);
}
- for (auto& input : task.Inputs) {
- auto& inputDesc = *taskDesc.AddInputs();
+ for (auto& input : task.Inputs) {
+ auto& inputDesc = *taskDesc.AddInputs();
if (input.SourceSettings) {
auto* sourceProto = inputDesc.MutableSource();
*sourceProto->MutableSettings() = *input.SourceSettings;
sourceProto->SetType(input.SourceType);
} else {
FillInputDesc(inputDesc, input);
- }
- }
+ }
+ }
- for (auto& output : task.Outputs) {
- FillOutputDesc(*taskDesc.AddOutputs(), output);
+ for (auto& output : task.Outputs) {
+ FillOutputDesc(*taskDesc.AddOutputs(), output);
}
auto& transform = *taskDesc.MutableOutputTransform();
transform.SetType(task.OutputTransform.Type);
transform.SetFunctionName(task.OutputTransform.FunctionName);
- auto& program = *taskDesc.MutableProgram();
- program.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);
+ auto& program = *taskDesc.MutableProgram();
+ program.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);
TString programStr;
ui64 stageId;
std::tie(programStr, stageId) = stagePrograms[task.StageId];
@@ -372,12 +372,12 @@ namespace NYql::NDqs {
taskMeta.SetStageId(stageId);
taskDesc.MutableMeta()->PackFrom(taskMeta);
taskDesc.SetStageId(stageId);
-
+
plan.emplace_back(std::move(taskDesc));
}
- return plan;
- }
+ return plan;
+ }
NActors::TActorId TDqsExecutionPlanner::GetSourceID() const {
if (SourceID) {
@@ -406,10 +406,10 @@ namespace NYql::NDqs {
return SerializeNode(type, typeEnv);
}
return {};
- }
+ }
bool TDqsExecutionPlanner::BuildReadStage(const TDqSettings::TPtr& settings, const TDqPhyStage& stage, bool dqSource, bool canFallback) {
- auto& stageInfo = TasksGraph.GetStageInfo(stage);
+ auto& stageInfo = TasksGraph.GetStageInfo(stage);
for (ui32 i = 0; i < stageInfo.InputsCount; i++) {
const auto& input = stage.Inputs().Item(i);
@@ -483,7 +483,7 @@ namespace NYql::NDqs {
transform.Type = stageSettings.TransformType;
transform.FunctionName = stageSettings.TransformName;
}
- }
+ }
return !parts.empty();
}
@@ -496,8 +496,8 @@ namespace NYql::NDqs {
void TDqsExecutionPlanner::BuildConnections(const NNodes::TDqPhyStage& stage) {
NDq::TChannelLogFunc logFunc = [](ui64, ui64, ui64, TStringBuf, bool) {};
- for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) {
- const auto& input = stage.Inputs().Item(inputIndex);
+ for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) {
+ const auto& input = stage.Inputs().Item(inputIndex);
if (input.Maybe<TDqConnection>()) {
BUILD_CONNECTION(TDqCnUnionAll, BuildUnionAllChannels);
BUILD_CONNECTION(TDqCnHashShuffle, BuildHashShuffleChannels);
@@ -508,25 +508,25 @@ namespace NYql::NDqs {
} else {
YQL_ENSURE(input.Maybe<TDqSource>());
}
- }
+ }
}
#undef BUILD_CONNECTION
THashMap<TStageId, std::tuple<TString,ui64>> TDqsExecutionPlanner::BuildAllPrograms() {
- using namespace NKikimr::NMiniKQL;
+ using namespace NKikimr::NMiniKQL;
THashMap<TStageId, std::tuple<TString,ui64>> result;
- TScopedAlloc alloc(NKikimr::TAlignedPagePoolCounters(), FunctionRegistry->SupportsSizedAllocators());
- TTypeEnvironment typeEnv(alloc);
+ TScopedAlloc alloc(NKikimr::TAlignedPagePoolCounters(), FunctionRegistry->SupportsSizedAllocators());
+ TTypeEnvironment typeEnv(alloc);
TVector<NNodes::TExprBase> fakeReads;
- NCommon::TMkqlCommonCallableCompiler compiler;
+ NCommon::TMkqlCommonCallableCompiler compiler;
RegisterDqsMkqlCompilers(compiler, *TypeContext);
- for (const auto& stageInfo : TasksGraph.GetStagesInfo()) {
- const auto& stage = stageInfo.second.Meta.Stage;
- auto paramsType = NDq::CollectParameters(stage.Program(), ExprContext);
- YQL_ENSURE(paramsType->GetItems().empty()); // TODO support parameters
+ for (const auto& stageInfo : TasksGraph.GetStagesInfo()) {
+ const auto& stage = stageInfo.second.Meta.Stage;
+ auto paramsType = NDq::CollectParameters(stage.Program(), ExprContext);
+ YQL_ENSURE(paramsType->GetItems().empty()); // TODO support parameters
auto settings = NDq::TDqStageSettings::Parse(stage);
ui64 stageId = stage.Ref().UniqueId();
@@ -555,32 +555,32 @@ namespace NYql::NDqs {
stage.Program(), *paramsType, compiler, typeEnv, *FunctionRegistry,
ExprContext, fakeReads),
stageId);
- }
-
- return result;
+ }
+
+ return result;
}
- void TDqsExecutionPlanner::FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel) {
- channelDesc.SetId(channel.Id);
- channelDesc.SetSrcTaskId(channel.SrcTask);
- channelDesc.SetDstTaskId(channel.DstTask);
+ void TDqsExecutionPlanner::FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel) {
+ channelDesc.SetId(channel.Id);
+ channelDesc.SetSrcTaskId(channel.SrcTask);
+ channelDesc.SetDstTaskId(channel.DstTask);
channelDesc.SetCheckpointingMode(channel.CheckpointingMode);
- if (channel.SrcTask) {
+ if (channel.SrcTask) {
NActors::ActorIdToProto(TasksGraph.GetTask(channel.SrcTask).ComputeActorId,
- channelDesc.MutableSrcEndpoint()->MutableActorId());
- }
+ channelDesc.MutableSrcEndpoint()->MutableActorId());
+ }
- if (channel.DstTask) {
+ if (channel.DstTask) {
NActors::ActorIdToProto(TasksGraph.GetTask(channel.DstTask).ComputeActorId,
- channelDesc.MutableDstEndpoint()->MutableActorId());
- } else {
- auto& stageInfo = TasksGraph.GetStageInfo(TasksGraph.GetTask(channel.SrcTask).StageId);
- YQL_ENSURE(stageInfo.Tasks.size() == 1);
- YQL_ENSURE(!SourceID);
+ channelDesc.MutableDstEndpoint()->MutableActorId());
+ } else {
+ auto& stageInfo = TasksGraph.GetStageInfo(TasksGraph.GetTask(channel.SrcTask).StageId);
+ YQL_ENSURE(stageInfo.Tasks.size() == 1);
+ YQL_ENSURE(!SourceID);
ActorIdToProto(ResultID, channelDesc.MutableDstEndpoint()->MutableActorId());
- SourceID = TasksGraph.GetTask(channel.SrcTask).ComputeActorId;
- }
+ SourceID = TasksGraph.GetTask(channel.SrcTask).ComputeActorId;
+ }
}
void TDqsExecutionPlanner::FillInputDesc(NDqProto::TTaskInput& inputDesc, const TTaskInput& input) {
@@ -609,28 +609,28 @@ namespace NYql::NDqs {
}
}
- void TDqsExecutionPlanner::FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) {
- switch (output.Type) {
- case TTaskOutputType::Map:
- YQL_ENSURE(output.Channels.size() == 1);
+ void TDqsExecutionPlanner::FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) {
+ switch (output.Type) {
+ case TTaskOutputType::Map:
+ YQL_ENSURE(output.Channels.size() == 1);
outputDesc.MutableMap(); //->SetChannelId(output.Channels[0]);
- break;
+ break;
- case TTaskOutputType::HashPartition: {
+ case TTaskOutputType::HashPartition: {
YQL_ENSURE(output.Channels.size() == output.PartitionsCount);
- auto& hashPartitionDesc = *outputDesc.MutableHashPartition();
- for (auto& column : output.KeyColumns) {
- hashPartitionDesc.AddKeyColumns(column);
- }
- hashPartitionDesc.SetPartitionsCount(output.PartitionsCount);
- break;
- }
-
- case TTaskOutputType::Broadcast: {
+ auto& hashPartitionDesc = *outputDesc.MutableHashPartition();
+ for (auto& column : output.KeyColumns) {
+ hashPartitionDesc.AddKeyColumns(column);
+ }
+ hashPartitionDesc.SetPartitionsCount(output.PartitionsCount);
+ break;
+ }
+
+ case TTaskOutputType::Broadcast: {
//for (const auto channel : output.Channels) {
outputDesc.MutableBroadcast(); //->AddChannelIds(channel);
//}
- break;
+ break;
}
case TTaskOutputType::Sink: {
@@ -643,14 +643,14 @@ namespace NYql::NDqs {
break;
}
- case TTaskOutputType::Undefined: {
+ case TTaskOutputType::Undefined: {
YQL_ENSURE(false, "Unexpected task output type `TTaskOutputType::Undefined`");
}
}
- for (auto& channel : output.Channels) {
- auto& channelDesc = *outputDesc.AddChannels();
- FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel));
+ for (auto& channel : output.Channels) {
+ auto& channelDesc = *outputDesc.AddChannels();
+ FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel));
}
}
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.h b/ydb/library/yql/providers/dq/planner/execution_planner.h
index 537c878e2ce..280600c8f7b 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.h
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.h
@@ -15,7 +15,7 @@
namespace NYql::NDqs {
class IDqsExecutionPlanner {
- public:
+ public:
virtual ~IDqsExecutionPlanner() = default;
virtual TVector<NDqProto::TDqTask> GetTasks(const TVector<NActors::TActorId>& workers) = 0;
virtual TVector<NDqProto::TDqTask>& GetTasks() = 0;
@@ -26,7 +26,7 @@ namespace NYql::NDqs {
class TDqsExecutionPlanner: public IDqsExecutionPlanner {
public:
explicit TDqsExecutionPlanner(TIntrusivePtr<TTypeAnnotationContext> typeContext,
- NYql::TExprContext& exprContext,
+ NYql::TExprContext& exprContext,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NYql::TExprNode::TPtr dqExprRoot,
NActors::TActorId executerID = NActors::TActorId(),
@@ -49,35 +49,35 @@ namespace NYql::NDqs {
PublicIds = publicIds;
}
- private:
+ private:
bool BuildReadStage(const TDqSettings::TPtr& settings, const NNodes::TDqPhyStage& stage, bool dqSource, bool canFallback);
void BuildConnections(const NNodes::TDqPhyStage& stage);
THashMap<NDq::TStageId, std::tuple<TString,ui64>> BuildAllPrograms();
- void FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel);
+ void FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel);
void FillInputDesc(NDqProto::TTaskInput& inputDesc, const TTaskInput& input);
- void FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output);
+ void FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output);
void GatherPhyMapping(THashMap<std::tuple<TString, TString>, TString>& clusters, THashMap<std::tuple<TString, TString, TString>, TString>& tables);
void BuildCheckpointingMode();
bool IsEgressTask(const TDqsTasksGraph::TTaskType& task) const;
- private:
- TIntrusivePtr<TTypeAnnotationContext> TypeContext;
- NYql::TExprContext& ExprContext;
+ private:
+ TIntrusivePtr<TTypeAnnotationContext> TypeContext;
+ NYql::TExprContext& ExprContext;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
NYql::TExprNode::TPtr DqExprRoot;
- TVector<const TTypeAnnotationNode*> InputType;
+ TVector<const TTypeAnnotationNode*> InputType;
NActors::TActorId ExecuterID;
NActors::TActorId ResultID;
TMaybe<NActors::TActorId> SourceID = {};
- ui64 SourceTaskID = 0;
+ ui64 SourceTaskID = 0;
ui64 _MaxDataSizePerJob = 0;
- TDqsTasksGraph TasksGraph;
+ TDqsTasksGraph TasksGraph;
TVector<NDqProto::TDqTask> Tasks;
THashMap<ui64, ui32> PublicIds;
- };
+ };
// Execution planner for TRuntimeNode
class TDqsSingleExecutionPlanner: public IDqsExecutionPlanner {
@@ -132,4 +132,4 @@ namespace NYql::NDqs {
TMaybe<NActors::TActorId> SourceID = {};
};
-}
+}
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp
index 01017efd449..65a9e500d9d 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.cpp
+++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp
@@ -33,31 +33,31 @@
#include <util/system/env.h>
namespace NYql::NDqs {
- using namespace NYql::NDqs;
- using namespace NKikimr;
+ using namespace NYql::NDqs;
+ using namespace NKikimr;
using namespace NThreading;
using namespace NMonitoring;
using namespace NActors;
- namespace {
+ namespace {
NGrpc::ICounterBlockPtr BuildCB(TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, const TString& name) {
- auto grpcCB = counters->GetSubgroup("rpc_name", name);
+ auto grpcCB = counters->GetSubgroup("rpc_name", name);
return MakeIntrusive<NGrpc::TCounterBlock>(
- grpcCB->GetCounter("total", true),
- grpcCB->GetCounter("infly", true),
- grpcCB->GetCounter("notOkReq", true),
- grpcCB->GetCounter("notOkResp", true),
- grpcCB->GetCounter("reqBytes", true),
- grpcCB->GetCounter("inflyReqBytes", true),
- grpcCB->GetCounter("resBytes", true),
- grpcCB->GetCounter("notAuth", true),
- grpcCB->GetCounter("resExh", true),
- grpcCB);
- }
+ grpcCB->GetCounter("total", true),
+ grpcCB->GetCounter("infly", true),
+ grpcCB->GetCounter("notOkReq", true),
+ grpcCB->GetCounter("notOkResp", true),
+ grpcCB->GetCounter("reqBytes", true),
+ grpcCB->GetCounter("inflyReqBytes", true),
+ grpcCB->GetCounter("resBytes", true),
+ grpcCB->GetCounter("notAuth", true),
+ grpcCB->GetCounter("resExh", true),
+ grpcCB);
+ }
template<typename RequestType, typename ResponseType>
class TServiceProxyActor: public TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>> {
- public:
+ public:
static constexpr char ActorName[] = "SERVICE_PROXY";
explicit TServiceProxyActor(
@@ -76,23 +76,23 @@ namespace NYql::NDqs {
, TraceId(traceId)
, Username(username)
, Promise(NewPromise<void>())
- {
+ {
Settings->Dispatch(Request->GetSettings());
Settings->FreezeDefaults();
MaxRetries = Settings->MaxRetries.Get().GetOrElse(MaxRetries);
RetryBackoff = TDuration::MilliSeconds(Settings->RetryBackoffMs.Get().GetOrElse(RetryBackoff.MilliSeconds()));
- }
+ }
- STRICT_STFUNC(Handler, {
- HFunc(TEvQueryResponse, OnReturnResult);
+ STRICT_STFUNC(Handler, {
+ HFunc(TEvQueryResponse, OnReturnResult);
cFunc(TEvents::TEvPoison::EventType, OnPoison);
SFunc(TEvents::TEvBootstrap, DoBootstrap)
- })
+ })
TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override {
return new IEventHandle(self, parentId, new TEvents::TEvBootstrap(), 0);
- }
+ }
void OnPoison() {
YQL_LOG_CTX_SCOPE(TraceId);
@@ -126,8 +126,8 @@ namespace NYql::NDqs {
void SendResponse(TEvQueryResponse::TPtr& ev)
{
auto& result = ev->Get()->Record;
- Yql::DqsProto::ExecuteQueryResult queryResult;
- queryResult.Mutableresult()->CopyFrom(result.resultset());
+ Yql::DqsProto::ExecuteQueryResult queryResult;
+ queryResult.Mutableresult()->CopyFrom(result.resultset());
queryResult.set_yson(result.yson());
if (result.GetNeedFallback()) {
@@ -160,9 +160,9 @@ namespace NYql::NDqs {
aggregatedQueryStat.FlushCounters(ResponseBuffer);
auto& operation = *ResponseBuffer.mutable_operation();
- operation.Setready(true);
- operation.Mutableresult()->PackFrom(queryResult);
- *operation.Mutableissues() = result.GetIssues();
+ operation.Setready(true);
+ operation.Mutableresult()->PackFrom(queryResult);
+ *operation.Mutableissues() = result.GetIssues();
ResponseBuffer.SetTruncated(result.GetTruncated());
Reply(Ydb::StatusIds::SUCCESS, result.GetIssues().size() > 0);
@@ -201,7 +201,7 @@ namespace NYql::NDqs {
}
SendResponse(ev);
}
- }
+ }
TFuture<void> GetFuture() {
return Promise.GetFuture();
@@ -249,7 +249,7 @@ namespace NYql::NDqs {
void RestoreRequest() {
Request = dynamic_cast<const RequestType*>(Ctx->GetRequest());
}
- };
+ };
class TExecuteGraphProxyActor: public TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse> {
public:
@@ -414,25 +414,25 @@ namespace NYql::NDqs {
NActors::TActorId GraphExecutionEventsActorId;
};
- TString GetVersionString() {
- TStringBuilder sb;
- sb << GetProgramSvnVersion() << "\n";
- sb << GetBuildInfo();
- TString sandboxTaskId = GetSandboxTaskId();
- if (sandboxTaskId != TString("0")) {
- sb << "\nSandbox task id: " << sandboxTaskId;
- }
-
- return sb;
- }
+ TString GetVersionString() {
+ TStringBuilder sb;
+ sb << GetProgramSvnVersion() << "\n";
+ sb << GetBuildInfo();
+ TString sandboxTaskId = GetSandboxTaskId();
+ if (sandboxTaskId != TString("0")) {
+ sb << "\nSandbox task id: " << sandboxTaskId;
+ }
+
+ return sb;
+ }
}
TDqsGrpcService::TDqsGrpcService(
NActors::TActorSystem& system,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories)
- : ActorSystem(system)
- , Counters(std::move(counters))
+ : ActorSystem(system)
+ , Counters(std::move(counters))
, DqTaskPreprocessorFactories(dqTaskPreprocessorFactories)
, Promise(NewPromise<void>())
, RunningRequests(0)
@@ -455,9 +455,9 @@ namespace NYql::NDqs {
} while (0)
void TDqsGrpcService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) {
- using namespace google::protobuf;
+ using namespace google::protobuf;
- CQ = cq;
+ CQ = cq;
using TDqTaskPreprocessorCollection = std::vector<NYql::IDqTaskPreprocessor::TPtr>;
@@ -500,12 +500,12 @@ namespace NYql::NDqs {
session->AddRequest(actorId);
});
- ADD_REQUEST(SvnRevision, SvnRevisionRequest, SvnRevisionResponse, {
- Y_UNUSED(this);
- Yql::DqsProto::SvnRevisionResponse result;
- result.SetRevision(GetVersionString());
+ ADD_REQUEST(SvnRevision, SvnRevisionRequest, SvnRevisionResponse, {
+ Y_UNUSED(this);
+ Yql::DqsProto::SvnRevisionResponse result;
+ result.SetRevision(GetVersionString());
ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
- });
+ });
ADD_REQUEST(CloseSession, CloseSessionRequest, CloseSessionResponse, {
Y_UNUSED(this);
@@ -797,20 +797,20 @@ namespace NYql::NDqs {
ctx->Reply(result, Ydb::StatusIds::SUCCESS);
});
*/
- }
+ }
void TDqsGrpcService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter = limiter;
- }
+ Limiter = limiter;
+ }
- bool TDqsGrpcService::IncRequest() {
- return Limiter->Inc();
- }
+ bool TDqsGrpcService::IncRequest() {
+ return Limiter->Inc();
+ }
- void TDqsGrpcService::DecRequest() {
- Limiter->Dec();
- Y_ASSERT(Limiter->GetCurrentInFlight() >= 0);
- }
+ void TDqsGrpcService::DecRequest() {
+ Limiter->Dec();
+ Y_ASSERT(Limiter->GetCurrentInFlight() >= 0);
+ }
TFuture<void> TDqsGrpcService::Stop() {
TGuard<TMutex> lock(Mutex);
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.h b/ydb/library/yql/providers/dq/service/grpc_service.h
index 9210e3fdf86..fa2a835c8c6 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.h
+++ b/ydb/library/yql/providers/dq/service/grpc_service.h
@@ -19,10 +19,10 @@
#include "grpc_session.h"
namespace NYql::NDqs {
- class TDatabaseManager;
+ class TDatabaseManager;
class TDqsGrpcService: public NGrpc::TGrpcServiceBase<Yql::DqsProto::DqService> {
- public:
+ public:
TDqsGrpcService(NActors::TActorSystem& system,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories);
@@ -30,23 +30,23 @@ namespace NYql::NDqs {
void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
- bool IncRequest();
- void DecRequest();
+ bool IncRequest();
+ void DecRequest();
NThreading::TFuture<void> Stop();
- private:
- NActors::TActorSystem& ActorSystem;
- grpc::ServerCompletionQueue* CQ = nullptr;
+ private:
+ NActors::TActorSystem& ActorSystem;
+ grpc::ServerCompletionQueue* CQ = nullptr;
NGrpc::TGlobalLimiter* Limiter = nullptr;
- TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
+ TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories;
TMutex Mutex;
- NThreading::TPromise<void> Promise;
+ NThreading::TPromise<void> Promise;
std::atomic<ui64> RunningRequests;
std::atomic<bool> Stopping;
TSessionStorage Sessions;
- };
-}
+ };
+}
diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
index 50ec2d66c75..bef368a961f 100644
--- a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
+++ b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
@@ -26,7 +26,7 @@
#include <util/system/env.h>
namespace NYql::NDqs {
- using namespace NActors;
+ using namespace NActors;
using namespace NActors::NDnsResolver;
using namespace NGrpc;
@@ -204,7 +204,7 @@ namespace NYql::NDqs {
if (nodeId != id) {
IActor* actor = new TInterconnectProxyTCP(id, icCommon);
setup->Interconnect.ProxyActors[id] = TActorSetupCmd(actor, TMailboxType::ReadAsFilled, 0);
- }
+ }
}
// start listener
@@ -226,9 +226,9 @@ namespace NYql::NDqs {
YQL_LOG(DEBUG) << "Actor initialization complete";
-#ifdef _unix_
+#ifdef _unix_
signal(SIGPIPE, SIG_IGN);
-#endif
+#endif
return std::make_tuple(std::move(setup), logSettings);
}
diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.h b/ydb/library/yql/providers/dq/service/interconnect_helpers.h
index 831445476bc..9009c8c179b 100644
--- a/ydb/library/yql/providers/dq/service/interconnect_helpers.h
+++ b/ydb/library/yql/providers/dq/service/interconnect_helpers.h
@@ -30,7 +30,7 @@ struct TServiceNodeConfig {
NYql::NProto::TDqConfig::TICSettings ICSettings = NYql::NProto::TDqConfig::TICSettings();
TNameserverFactory NameserverFactory = [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) {
return CreateNameserverTable(setup);
- };
+ };
};
std::tuple<TString, TString> GetLocalAddress(const TString* hostname = nullptr);
diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp
index c563fdb1f16..9044dc47e30 100644
--- a/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp
@@ -11,17 +11,17 @@ namespace NYql::NDqs {
const TString& user,
const TMaybe<ui64>& globalResourceId)
{
- Record.SetCount(count);
+ Record.SetCount(count);
Record.SetUser(user);
if (globalResourceId) {
Record.SetResourceId(*globalResourceId);
}
Record.SetIsForwarded(false);
- }
+ }
- TEvAllocateWorkersResponse::TEvAllocateWorkersResponse() {
+ TEvAllocateWorkersResponse::TEvAllocateWorkersResponse() {
Record.MutableError()->SetMessage("Unknown error");
- }
+ }
TEvAllocateWorkersResponse::TEvAllocateWorkersResponse(const TString& error, NYql::NDqProto::EErrorCode code) {
Record.MutableError()->SetMessage(error);
@@ -29,12 +29,12 @@ namespace NYql::NDqs {
}
TEvAllocateWorkersResponse::TEvAllocateWorkersResponse(ui64 resourceId, const TVector<NActors::TActorId>& ids) {
- auto& group = *Record.MutableWorkers();
+ auto& group = *Record.MutableWorkers();
group.SetResourceId(resourceId);
- for (const auto& actorId : ids) {
+ for (const auto& actorId : ids) {
NActors::ActorIdToProto(actorId, group.AddWorkerActor());
- }
- }
+ }
+ }
TEvAllocateWorkersResponse::TEvAllocateWorkersResponse(ui64 resourceId, const TVector<ui32>& nodes) {
auto& group = *Record.MutableNodes();
diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/events.h b/ydb/library/yql/providers/dq/worker_manager/interface/events.h
index 595f2abe773..a2d0c9f5409 100644
--- a/ydb/library/yql/providers/dq/worker_manager/interface/events.h
+++ b/ydb/library/yql/providers/dq/worker_manager/interface/events.h
@@ -17,31 +17,31 @@ namespace NYql::NDqs {
using TDqResManEvents = NDq::TBaseDqResManEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>;
- struct TEvAllocateWorkersRequest : NActors::TEventPB<TEvAllocateWorkersRequest, NYql::NDqProto::TAllocateWorkersRequest,
- TDqResManEvents::ES_ALLOCATE_WORKERS_REQUEST> {
- TEvAllocateWorkersRequest() = default;
+ struct TEvAllocateWorkersRequest : NActors::TEventPB<TEvAllocateWorkersRequest, NYql::NDqProto::TAllocateWorkersRequest,
+ TDqResManEvents::ES_ALLOCATE_WORKERS_REQUEST> {
+ TEvAllocateWorkersRequest() = default;
explicit TEvAllocateWorkersRequest(
ui32 count,
const TString& user,
const TMaybe<ui64>& globalResourceId = TMaybe<ui64>());
- };
+ };
- struct TEvAllocateWorkersResponse
- : NActors::TEventPB<TEvAllocateWorkersResponse, NYql::NDqProto::TAllocateWorkersResponse,
- TDqResManEvents::ES_ALLOCATE_WORKERS_RESPONSE> {
- TEvAllocateWorkersResponse();
+ struct TEvAllocateWorkersResponse
+ : NActors::TEventPB<TEvAllocateWorkersResponse, NYql::NDqProto::TAllocateWorkersResponse,
+ TDqResManEvents::ES_ALLOCATE_WORKERS_RESPONSE> {
+ TEvAllocateWorkersResponse();
explicit TEvAllocateWorkersResponse(const TString& error, NYql::NDqProto::EErrorCode code = NYql::NDqProto::EUNKNOWN);
explicit TEvAllocateWorkersResponse(ui64 resourceId, const TVector<NActors::TActorId>& ids);
explicit TEvAllocateWorkersResponse(ui64 resourceId, const TVector<TWorkerInfo::TPtr>& workerInfos);
explicit TEvAllocateWorkersResponse(ui64 resourceId, const TVector<ui32>& nodes);
- };
+ };
- struct TEvFreeWorkersNotify : NActors::TEventPB<TEvFreeWorkersNotify, NYql::NDqProto::TFreeWorkersNotify,
- TDqResManEvents::ES_FREE_WORKERS_NOTIFICATION> {
- TEvFreeWorkersNotify() = default;
+ struct TEvFreeWorkersNotify : NActors::TEventPB<TEvFreeWorkersNotify, NYql::NDqProto::TFreeWorkersNotify,
+ TDqResManEvents::ES_FREE_WORKERS_NOTIFICATION> {
+ TEvFreeWorkersNotify() = default;
explicit TEvFreeWorkersNotify(ui64 id);
- };
+ };
struct TEvRegisterNode
: NActors::TEventPB<TEvRegisterNode, NYql::NDqProto::TEvRegisterNode, TDqResManEvents::ES_REGISTER_NODE> {
@@ -153,8 +153,8 @@ using TDqResManEvents = NDq::TBaseDqResManEvents<NActors::TEvents::EEventSpace::
};
inline NActors::TActorId MakeWorkerManagerActorID(ui32 nodeId) {
- char x[12] = {'r', 'e', 's', 'm', 'a', 'n'};
- memcpy(x + 7, &nodeId, sizeof(ui32));
+ char x[12] = {'r', 'e', 's', 'm', 'a', 'n'};
+ memcpy(x + 7, &nodeId, sizeof(ui32));
return NActors::TActorId(nodeId, TStringBuf(x, 12));
- }
+ }
}
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index e8dd40f5316..5ecf0a4a852 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -111,9 +111,9 @@ private:
void DoPassAway() override {
for (const auto& [resourceId, _] : AllocatedWorkers) {
FreeGroup(resourceId);
- }
+ }
- AllocatedWorkers.clear();
+ AllocatedWorkers.clear();
_exit(0);
}
@@ -215,7 +215,7 @@ private:
auto traceId = ev->Get()->Record.GetTraceId();
allocationInfo.TxId = traceId;
- auto count = ev->Get()->Record.GetCount();
+ auto count = ev->Get()->Record.GetCount();
Y_VERIFY(count > 0);
@@ -270,8 +270,8 @@ private:
}
Options.Counters.ActiveWorkers->Add(count);
- }
-
+ }
+
Send(ev->Sender,
MakeHolder<TEvAllocateWorkersResponse>(resourceId, allocationInfo.WorkerActors),
IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession,