diff options
author | ogorod <ogorod@yandex-team.ru> | 2022-02-10 16:52:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:17 +0300 |
commit | 993eb9943ef23adacc29ccc3cb2fbb943ce2d06e (patch) | |
tree | ab7fbbf3253d4c0e2793218f09378908beb025fb | |
parent | d826f6b7bb2a949001eda2068f1e8c68072efa19 (diff) | |
download | ydb-993eb9943ef23adacc29ccc3cb2fbb943ce2d06e.tar.gz |
Restoring authorship annotation for <ogorod@yandex-team.ru>. Commit 2 of 2.
32 files changed, 561 insertions, 561 deletions
diff --git a/ydb/core/kqp/ut/kqp_explain_ut.cpp b/ydb/core/kqp/ut/kqp_explain_ut.cpp index 78db386cd4b..c3a06836367 100644 --- a/ydb/core/kqp/ut/kqp_explain_ut.cpp +++ b/ydb/core/kqp/ut/kqp_explain_ut.cpp @@ -545,7 +545,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) { NJson::TJsonValue plan; NJson::ReadJsonTree(*res.PlanJson, &plan, true); - auto join = FindPlanNodeByKv(plan, "Node Type", "FullJoin (JoinDict)"); + auto join = FindPlanNodeByKv(plan, "Node Type", "FullJoin (JoinDict)"); UNIT_ASSERT(join.IsDefined()); auto left = FindPlanNodeByKv(join, "Table", "EightShard"); UNIT_ASSERT(left.IsDefined()); diff --git a/ydb/core/kqp/ut/kqp_join_ut.cpp b/ydb/core/kqp/ut/kqp_join_ut.cpp index e084d0a3c08..8fa59d31b36 100644 --- a/ydb/core/kqp/ut/kqp_join_ut.cpp +++ b/ydb/core/kqp/ut/kqp_join_ut.cpp @@ -281,7 +281,7 @@ Y_UNIT_TEST_SUITE(KqpJoin) { .EndList().Build() .Build(); - + const TString query = Q_(R"( DECLARE $in AS List<Struct<k: Int32>>; SELECT * FROM AS_TABLE($in) AS t1 @@ -915,31 +915,31 @@ Y_UNIT_TEST_SUITE(KqpJoin) { ])", FormatResultSetYson(result.GetResultSet(0))); } - Y_UNIT_TEST_NEW_ENGINE(FullOuterJoinSizeCheck) { - TKikimrRunner kikimr; - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - CreateSampleTables(session); - + Y_UNIT_TEST_NEW_ENGINE(FullOuterJoinSizeCheck) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + CreateSampleTables(session); + auto result = session.ExecuteDataQuery(Q_(R"( - SELECT COUNT(*) - FROM `/Root/Join1_1` as left - FULL OUTER JOIN `/Root/Join1_2` as right - ON left.Fk21 = right.Key1 - UNION ALL - SELECT COUNT(*) - FROM `/Root/Join1_2` as right - FULL OUTER JOIN `/Root/Join1_1` as left - ON left.Fk21 = right.Key1 + SELECT COUNT(*) + FROM `/Root/Join1_1` as left + FULL OUTER JOIN `/Root/Join1_2` as right + ON left.Fk21 = right.Key1 + UNION ALL + SELECT COUNT(*) + FROM `/Root/Join1_2` as right + FULL OUTER JOIN `/Root/Join1_1` as left + ON left.Fk21 = right.Key1 )"), TTxControl::BeginTx().CommitTx()).GetValueSync(); - - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson( - "[[12u];[12u]]", // numbers MUST be same + + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson( + "[[12u];[12u]]", // numbers MUST be same FormatResultSetYson(result.GetResultSet(0)) - ); - } - + ); + } + Y_UNIT_TEST_NEW_ENGINE(CrossJoinCount) { TKikimrRunner kikimr; auto db = kikimr.GetTableClient(); diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h index 665ce5b211f..3ffbdc041da 100644 --- a/ydb/library/yql/core/yql_data_provider.h +++ b/ydb/library/yql/core/yql_data_provider.h @@ -177,7 +177,7 @@ struct TYqlOperationOptions; struct TOperationProgress; class TGatewaysConfig; - + using TOperationProgressWriter = std::function<void(const TOperationProgress&)>; enum class ESourceSyntax { diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h index 261cff982da..71cc7be6cb8 100644 --- a/ydb/library/yql/dq/common/dq_common.h +++ b/ydb/library/yql/dq/common/dq_common.h @@ -56,8 +56,8 @@ struct TBaseDqExecuterEvents { ES_RESULT_SET, ES_DQ_FAILURE, ES_GRAPH, - ES_GRAPH_FINISHED, - ES_GRAPH_EXECUTION_EVENT, + ES_GRAPH_FINISHED, + ES_GRAPH_EXECUTION_EVENT, }; }; @@ -71,9 +71,9 @@ struct TBaseDqDataEvents { ES_PING_REQUEST, ES_PING_RESPONSE, ES_CONTINUE_RUN, - - ES_FULL_RESULT_WRITER_STATUS_REQUEST, - ES_FULL_RESULT_WRITER_STATUS_RESPONSE, + + ES_FULL_RESULT_WRITER_STATUS_REQUEST, + ES_FULL_RESULT_WRITER_STATUS_RESPONSE, }; }; diff --git a/ydb/library/yql/dq/connections.md b/ydb/library/yql/dq/connections.md index 15fd77355f0..09da3d1d255 100644 --- a/ydb/library/yql/dq/connections.md +++ b/ydb/library/yql/dq/connections.md @@ -4,7 +4,7 @@ SQL-запрос преобразуется во внутреннее предс DQ-граф состоит из стадий (_DqStage_), соединённых рёбрами (коннекшинами типа _DqConnection_). Каждая стадия исполняется в виде некоторого количества тасков (_DqTask_). Например, читающая таблицу стадия может -исполняться в N тасков, где N - это количество шардов таблицы. +исполняться в N тасков, где N - это количество шардов таблицы. # Виды коннекшинов @@ -85,7 +85,7 @@ DQ-граф состоит из стадий (_DqStage_), соединённых | DqStage_1: DqTask_1, ..., DqTask_N | +----------------+--------------+------+ |\ /| - | \ ... / | DqCnHashShuffle + | \ ... / | DqCnHashShuffle | \ / | V V V V +--------------------------------------+ @@ -115,4 +115,4 @@ DQ-граф состоит из стадий (_DqStage_), соединённых Используется только для inplace-update под прагмой. В этом случае обе стадии выполняются на одном шарде и последовательно. ## KqpCnShuffleShard -Задумывалось для межшардового взаимодействия. Нигде не используется. +Задумывалось для межшардового взаимодействия. Нигде не используется. diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h index 8824e41dbdf..2c46061f60d 100644 --- a/ydb/library/yql/dq/tasks/dq_connection_builder.h +++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h @@ -19,14 +19,14 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp // Current assumptions: // 1. `Broadcast` can not be the 1st stage input unless it's a single input - // 2. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll` !!! or `HashShuffle` !!! + // 2. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll` !!! or `HashShuffle` !!! if (inputIndex == 0) { YQL_ENSURE(stage.Inputs().Size() == 1 || !input.Maybe<NNodes::TDqCnBroadcast>()); } else { - YQL_ENSURE( - input.Maybe<NNodes::TDqCnBroadcast>() || - input.Maybe<NNodes::TDqCnUnionAll>() || - input.Maybe<NNodes::TDqCnHashShuffle>(), "" << input.Ref().Content()); + YQL_ENSURE( + input.Maybe<NNodes::TDqCnBroadcast>() || + input.Maybe<NNodes::TDqCnUnionAll>() || + input.Maybe<NNodes::TDqCnHashShuffle>(), "" << input.Ref().Content()); } if (auto maybeCnShuffle = input.Maybe<NNodes::TDqCnHashShuffle>()) { diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp index 49676339aa9..4da4d13d034 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp @@ -55,10 +55,10 @@ void TDqIntegrationBase::FillSinkSettings(const TExprNode& node, ::google::proto } void TDqIntegrationBase::Annotate(const TExprNode& node, THashMap<TString, TString>& params) { - Y_UNUSED(node); + Y_UNUSED(node); Y_UNUSED(params); -} - +} + bool TDqIntegrationBase::PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) { Y_UNUSED(root); Y_UNUSED(ctx); diff --git a/ydb/library/yql/providers/common/metrics/metrics_registry.cpp b/ydb/library/yql/providers/common/metrics/metrics_registry.cpp index d42866be6dd..1fb7bdcbba7 100644 --- a/ydb/library/yql/providers/common/metrics/metrics_registry.cpp +++ b/ydb/library/yql/providers/common/metrics/metrics_registry.cpp @@ -46,7 +46,7 @@ private: label->SetValue(labelValue); } - void OnHistogram(const TString& labelName, const TString& labelValue, NMonitoring::IHistogramSnapshotPtr snapshot, bool) override { + void OnHistogram(const TString& labelName, const TString& labelValue, NMonitoring::IHistogramSnapshotPtr snapshot, bool) override { if (Invalidate_) { return; } @@ -195,7 +195,7 @@ public: : false); } - IMetricsRegistryPtr Personalized(const TString& userName) const override { + IMetricsRegistryPtr Personalized(const TString& userName) const override { return new TMetricsRegistryImpl(Sensors_, MakeMaybe(userName)); } diff --git a/ydb/library/yql/providers/dq/actors/actor_helpers.h b/ydb/library/yql/providers/dq/actors/actor_helpers.h index 28f862bf1f5..b92603ab0fb 100644 --- a/ydb/library/yql/providers/dq/actors/actor_helpers.h +++ b/ydb/library/yql/providers/dq/actors/actor_helpers.h @@ -7,17 +7,17 @@ #include <ydb/library/yql/utils/actors/rich_actor.h> #include <ydb/library/yql/utils/log/log.h> - + namespace NYql { -enum EExecutorPoolType { - Main, +enum EExecutorPoolType { + Main, FullResultWriter, - - TotalCount, -}; - -template <typename EventType> + + TotalCount, +}; + +template <typename EventType> struct TRichActorFutureCallback : public TRichActor<TRichActorFutureCallback<EventType>> { using TCallback = std::function<void(TAutoPtr<NActors::TEventHandle<EventType>>&)>; using TFailure = std::function<void(void)>; @@ -79,98 +79,98 @@ private: } }; -template <class TDerived> -class TSynchronizableRichActor : public TRichActor<TDerived> { -public: - using TBase = TRichActor<TDerived>; - template <class TEvType> - using TCallback = std::function<void(typename TEvType::TPtr&)>; - using TAbstractCallback = std::function<void(TAutoPtr<NActors::IEventHandle>&)>; - using THandler = typename TBase::TReceiveFunc; - - enum ESyncState { - E_IDLE, - E_SYNC_REQUESTED, - E_SYNC_RECEIVED, - }; - - template <class... Args> - explicit TSynchronizableRichActor(Args&&... args) - : TRichActor<TDerived>(std::forward<Args>(args)...) {} - - template <class TEvType> - void Synchronize(TCallback<TEvType> callback) { - switch (SyncState_) { - case E_SYNC_REQUESTED: - throw yexception() << "Synchronization was already requested"; - break; - case E_IDLE: - [[fallthrough]]; - case E_SYNC_RECEIVED: - InterruptedHandler_ = TBase::CurrentStateFunc(); - SyncCallback_ = [callback](TAutoPtr<NActors::IEventHandle>& ev) { - auto* x = reinterpret_cast<typename TEvType::TPtr*>(&ev); - callback(*x); - }; - TBase::Become(&TSynchronizableRichActor::SyncHandler); - ExpectedEventType_ = TEvType::EventType; - SyncState_ = E_SYNC_REQUESTED; - break; - } - } - -protected: - void AddCriticalEventType(ui32 type) { - CriticalEventTypes_.insert(type); - } - -private: - THandler InterruptedHandler_{nullptr}; - TDeque<TAutoPtr<NActors::IEventHandle>> DelayedEvents_{}; - TAbstractCallback SyncCallback_{nullptr}; - ESyncState SyncState_{E_IDLE}; - ui32 ExpectedEventType_{0}; - THashSet<ui32> CriticalEventTypes_{}; - - void SyncHandler(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) { - const ui32 etype = ev->GetTypeRewrite(); - if (etype == ExpectedEventType_) { - OnSync(ev, ctx); - } else if (CriticalEventTypes_.contains(etype)) { - (this->*InterruptedHandler_)(ev, ctx); - } else { - EnqueueEvent(ev); - } - } - - void OnSync(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) { - YQL_LOG(DEBUG) << "OnSync(): delayed messages " << DelayedEvents_.size(); - SyncState_ = E_SYNC_RECEIVED; - TBase::Become(InterruptedHandler_); - SyncCallback_(ev); - if (SyncState_ == E_SYNC_REQUESTED) { - return; - } - SyncCallback_ = nullptr; - - while (!DelayedEvents_.empty()) { - auto event = std::move(DelayedEvents_.front()); - DelayedEvents_.pop_front(); - InterruptedHandler_ = TBase::CurrentStateFunc(); - (this->*InterruptedHandler_)(event, ctx); - if (SyncState_ == E_SYNC_REQUESTED) { - return; - } - } - - InterruptedHandler_ = nullptr; - ExpectedEventType_ = 0; - SyncState_ = E_IDLE; - } - - void EnqueueEvent(TAutoPtr<NActors::IEventHandle>& ev) { - DelayedEvents_.emplace_back(ev.Release()); - } -}; - +template <class TDerived> +class TSynchronizableRichActor : public TRichActor<TDerived> { +public: + using TBase = TRichActor<TDerived>; + template <class TEvType> + using TCallback = std::function<void(typename TEvType::TPtr&)>; + using TAbstractCallback = std::function<void(TAutoPtr<NActors::IEventHandle>&)>; + using THandler = typename TBase::TReceiveFunc; + + enum ESyncState { + E_IDLE, + E_SYNC_REQUESTED, + E_SYNC_RECEIVED, + }; + + template <class... Args> + explicit TSynchronizableRichActor(Args&&... args) + : TRichActor<TDerived>(std::forward<Args>(args)...) {} + + template <class TEvType> + void Synchronize(TCallback<TEvType> callback) { + switch (SyncState_) { + case E_SYNC_REQUESTED: + throw yexception() << "Synchronization was already requested"; + break; + case E_IDLE: + [[fallthrough]]; + case E_SYNC_RECEIVED: + InterruptedHandler_ = TBase::CurrentStateFunc(); + SyncCallback_ = [callback](TAutoPtr<NActors::IEventHandle>& ev) { + auto* x = reinterpret_cast<typename TEvType::TPtr*>(&ev); + callback(*x); + }; + TBase::Become(&TSynchronizableRichActor::SyncHandler); + ExpectedEventType_ = TEvType::EventType; + SyncState_ = E_SYNC_REQUESTED; + break; + } + } + +protected: + void AddCriticalEventType(ui32 type) { + CriticalEventTypes_.insert(type); + } + +private: + THandler InterruptedHandler_{nullptr}; + TDeque<TAutoPtr<NActors::IEventHandle>> DelayedEvents_{}; + TAbstractCallback SyncCallback_{nullptr}; + ESyncState SyncState_{E_IDLE}; + ui32 ExpectedEventType_{0}; + THashSet<ui32> CriticalEventTypes_{}; + + void SyncHandler(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) { + const ui32 etype = ev->GetTypeRewrite(); + if (etype == ExpectedEventType_) { + OnSync(ev, ctx); + } else if (CriticalEventTypes_.contains(etype)) { + (this->*InterruptedHandler_)(ev, ctx); + } else { + EnqueueEvent(ev); + } + } + + void OnSync(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) { + YQL_LOG(DEBUG) << "OnSync(): delayed messages " << DelayedEvents_.size(); + SyncState_ = E_SYNC_RECEIVED; + TBase::Become(InterruptedHandler_); + SyncCallback_(ev); + if (SyncState_ == E_SYNC_REQUESTED) { + return; + } + SyncCallback_ = nullptr; + + while (!DelayedEvents_.empty()) { + auto event = std::move(DelayedEvents_.front()); + DelayedEvents_.pop_front(); + InterruptedHandler_ = TBase::CurrentStateFunc(); + (this->*InterruptedHandler_)(event, ctx); + if (SyncState_ == E_SYNC_REQUESTED) { + return; + } + } + + InterruptedHandler_ = nullptr; + ExpectedEventType_ = 0; + SyncState_ = E_IDLE; + } + + void EnqueueEvent(TAutoPtr<NActors::IEventHandle>& ev) { + DelayedEvents_.emplace_back(ev.Release()); + } +}; + } // namespace NYql diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp index 07369e66e05..892c5cccd70 100644 --- a/ydb/library/yql/providers/dq/actors/events.cpp +++ b/ydb/library/yql/providers/dq/actors/events.cpp @@ -36,10 +36,10 @@ namespace NYql::NDqs { Record = std::move(proto); } - TEvGraphExecutionEvent::TEvGraphExecutionEvent(NDqProto::TGraphExecutionEvent& evt) { - Record = evt; - } - + TEvGraphExecutionEvent::TEvGraphExecutionEvent(NDqProto::TGraphExecutionEvent& evt) { + Record = evt; + } + TEvPullDataRequest::TEvPullDataRequest(ui32 rowThreshold) { Record.SetRowThreshold(rowThreshold); } @@ -47,8 +47,8 @@ namespace NYql::NDqs { TEvPullDataResponse::TEvPullDataResponse(NYql::NDqProto::TPullResponse& data) { Record.Swap(&data); } - - TEvFullResultWriterStatusResponse::TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data) { - Record.CopyFrom(data); - } + + TEvFullResultWriterStatusResponse::TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data) { + Record.CopyFrom(data); + } } diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h index b0e3817e725..594921560ae 100644 --- a/ydb/library/yql/providers/dq/actors/events.h +++ b/ydb/library/yql/providers/dq/actors/events.h @@ -45,12 +45,12 @@ namespace NYql::NDqs { DEFINE_SIMPLE_NONLOCAL_EVENT(TEvPullResult, ""); }; - struct TEvGraphExecutionEvent - : NActors::TEventPB<TEvGraphExecutionEvent, NYql::NDqProto::TGraphExecutionEvent, TDqExecuterEvents::ES_GRAPH_EXECUTION_EVENT> { - TEvGraphExecutionEvent() = default; - explicit TEvGraphExecutionEvent(NDqProto::TGraphExecutionEvent& evt); - }; - + struct TEvGraphExecutionEvent + : NActors::TEventPB<TEvGraphExecutionEvent, NYql::NDqProto::TGraphExecutionEvent, TDqExecuterEvents::ES_GRAPH_EXECUTION_EVENT> { + TEvGraphExecutionEvent() = default; + explicit TEvGraphExecutionEvent(NDqProto::TGraphExecutionEvent& evt); + }; + using TDqDataEvents = NDq::TBaseDqDataEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>; struct TEvPullDataRequest @@ -75,19 +75,19 @@ namespace NYql::NDqs { TEvPingResponse() = default; }; - struct TEvFullResultWriterStatusRequest - : NActors::TEventPB<TEvFullResultWriterStatusRequest, NYql::NDqProto::TFullResultWriterStatusRequest, - TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_REQUEST> { - TEvFullResultWriterStatusRequest() = default; - }; - - struct TEvFullResultWriterStatusResponse - : NActors::TEventPB<TEvFullResultWriterStatusResponse, NYql::NDqProto::TFullResultWriterStatusResponse, - TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_RESPONSE> { - TEvFullResultWriterStatusResponse() = default; - explicit TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data); - }; - + struct TEvFullResultWriterStatusRequest + : NActors::TEventPB<TEvFullResultWriterStatusRequest, NYql::NDqProto::TFullResultWriterStatusRequest, + TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_REQUEST> { + TEvFullResultWriterStatusRequest() = default; + }; + + struct TEvFullResultWriterStatusResponse + : NActors::TEventPB<TEvFullResultWriterStatusResponse, NYql::NDqProto::TFullResultWriterStatusResponse, + TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_RESPONSE> { + TEvFullResultWriterStatusResponse() = default; + explicit TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data); + }; + struct TEvGraphFinished : NActors::TEventBase<TEvGraphFinished, TDqExecuterEvents::ES_GRAPH_FINISHED> { DEFINE_SIMPLE_NONLOCAL_EVENT(TEvGraphFinished, ""); }; diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp index 49bbc83e444..927ac060436 100644 --- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp +++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp @@ -4,7 +4,7 @@ #include <ydb/library/yql/providers/dq/actors/actor_helpers.h> #include <ydb/library/yql/providers/dq/actors/events.h> #include <ydb/library/yql/providers/dq/api/protos/service.pb.h> - + #include <ydb/library/yql/core/issue/yql_issue.h> #include <ydb/library/yql/utils/log/log.h> @@ -14,7 +14,7 @@ #include <library/cpp/actors/core/actor.h> #include <util/generic/size_literals.h> -#include <util/system/env.h> +#include <util/system/env.h> #include <utility> @@ -34,65 +34,65 @@ public: , TraceID(traceId) , ResultBuilder(MakeHolder<TProtoBuilder>(resultType, TVector<TString>())) , FullResultWriter(std::move(writer)) - , AggregatorID(aggregatorId) + , AggregatorID(aggregatorId) { - } - -private: - STRICT_STFUNC(Handler, { - cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) - HFunc(TEvPullDataResponse, OnPullResponse) - HFunc(TEvFullResultWriterStatusRequest, OnStatusRequest) - }) - - void PassAway() override { + } + +private: + STRICT_STFUNC(Handler, { + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) + HFunc(TEvPullDataResponse, OnPullResponse) + HFunc(TEvFullResultWriterStatusRequest, OnStatusRequest) + }) + + void PassAway() override { YQL_LOG_CTX_SCOPE(TraceID); YQL_LOG(DEBUG) << __FUNCTION__; - try { - FullResultWriter->Abort(); - } catch (...) { - YQL_LOG(WARN) << "FullResultWriter->Abort(): " << CurrentExceptionMessage(); - } + try { + FullResultWriter->Abort(); + } catch (...) { + YQL_LOG(WARN) << "FullResultWriter->Abort(): " << CurrentExceptionMessage(); + } ResultBuilder.Reset(); - FullResultWriter.Reset(); - - Send(AggregatorID, MakeHolder<NActors::TEvents::TEvGone>()); - + FullResultWriter.Reset(); + + Send(AggregatorID, MakeHolder<NActors::TEvents::TEvGone>()); + NActors::TActor<TThis>::PassAway(); - } - - void OnStatusRequest(TEvFullResultWriterStatusRequest::TPtr&, const NActors::TActorContext&) { + } + + void OnStatusRequest(TEvFullResultWriterStatusRequest::TPtr&, const NActors::TActorContext&) { YQL_LOG_CTX_SCOPE(TraceID); - NDqProto::TFullResultWriterStatusResponse response; - response.SetBytesReceived(BytesReceived); - if (ErrorMessage) { - response.SetErrorMessage(*ErrorMessage); - } - Send(AggregatorID, new TEvFullResultWriterStatusResponse(response)); - } - - void OnPullResponse(TEvPullDataResponse::TPtr& ev, const NActors::TActorContext&) { + NDqProto::TFullResultWriterStatusResponse response; + response.SetBytesReceived(BytesReceived); + if (ErrorMessage) { + response.SetErrorMessage(*ErrorMessage); + } + Send(AggregatorID, new TEvFullResultWriterStatusResponse(response)); + } + + void OnPullResponse(TEvPullDataResponse::TPtr& ev, const NActors::TActorContext&) { YQL_LOG_CTX_SCOPE(TraceID); - auto& response = ev->Get()->Record; - - switch (response.GetResponseType()) { - case NDqProto::FINISH: - Finish(); - break; - case NDqProto::CONTINUE: - Continue(response); - break; - default: - YQL_ENSURE(false, "Unsupported pull request"); - break; - } - } - - void Finish() { + auto& response = ev->Get()->Record; + + switch (response.GetResponseType()) { + case NDqProto::FINISH: + Finish(); + break; + case NDqProto::CONTINUE: + Continue(response); + break; + default: + YQL_ENSURE(false, "Unsupported pull request"); + break; + } + } + + void Finish() { YQL_LOG(DEBUG) << __FUNCTION__; - try { + try { TFailureInjector::Reach("full_result_fail_on_finish", [] { throw yexception() << "full_result_fail_on_finish"; }); - FullResultWriter->Finish(); + FullResultWriter->Finish(); if (ErrorMessage) { TIssue issue(*ErrorMessage); issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); @@ -100,44 +100,44 @@ private: } else { Send(AggregatorID, MakeHolder<TEvDqFailure>().Release()); } - } catch (...) { - TIssue issue(CurrentExceptionMessage()); - issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); - if (ErrorMessage) { - issue.AddSubIssue(MakeIntrusive<TIssue>(*ErrorMessage)); - } + } catch (...) { + TIssue issue(CurrentExceptionMessage()); + issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); + if (ErrorMessage) { + issue.AddSubIssue(MakeIntrusive<TIssue>(*ErrorMessage)); + } Send(AggregatorID, MakeHolder<TEvDqFailure>(issue, false, false).Release()); - } + } Send(SelfId(), MakeHolder<NActors::TEvents::TEvPoison>()); - } - - void Continue(NDqProto::TPullResponse& response) { - YQL_LOG(DEBUG) << "Continue -- RowCount = " << FullResultWriter->GetRowCount(); - ui64 respSize = response.ByteSizeLong(); - WriteToFullResultTable(response.GetData()); - BytesReceived += respSize; - } - - void WriteToFullResultTable(const NDqProto::TData& data) { - if (ErrorMessage) { - YQL_LOG(DEBUG) << "Failed to write previous chunk, aborting"; - return; - } - - try { + } + + void Continue(NDqProto::TPullResponse& response) { + YQL_LOG(DEBUG) << "Continue -- RowCount = " << FullResultWriter->GetRowCount(); + ui64 respSize = response.ByteSizeLong(); + WriteToFullResultTable(response.GetData()); + BytesReceived += respSize; + } + + void WriteToFullResultTable(const NDqProto::TData& data) { + if (ErrorMessage) { + YQL_LOG(DEBUG) << "Failed to write previous chunk, aborting"; + return; + } + + try { TFailureInjector::Reach("full_result_fail_on_write", [] { throw yexception() << "full_result_fail_on_write"; }); ResultBuilder->WriteData(data, [writer = FullResultWriter.Get()](const NUdf::TUnboxedValuePod& value) { writer->AddRow(value); return true; }); - } catch (...) { - ErrorMessage = CurrentExceptionMessage(); - } - - if (ErrorMessage) { - YQL_LOG(DEBUG) << "An error occurred: " << *ErrorMessage; - } - } + } catch (...) { + ErrorMessage = CurrentExceptionMessage(); + } + + if (ErrorMessage) { + YQL_LOG(DEBUG) << "An error occurred: " << *ErrorMessage; + } + } private: const TString TraceID; THolder<TProtoBuilder> ResultBuilder; @@ -146,16 +146,16 @@ private: ui64 BytesReceived{0}; TMaybe<TString> ErrorMessage; -}; - - -THolder<NActors::IActor> MakeFullResultWriterActor( +}; + + +THolder<NActors::IActor> MakeFullResultWriterActor( const TString& traceId, const TString& resultType, THolder<IDqFullResultWriter>&& writer, const NActors::TActorId& aggregatorId) { return MakeHolder<TFullResultWriterActor>(traceId, resultType, std::move(writer), aggregatorId); -} - +} + } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.h b/ydb/library/yql/providers/dq/actors/full_result_writer.h index 8b317696440..4d4345522ad 100644 --- a/ydb/library/yql/providers/dq/actors/full_result_writer.h +++ b/ydb/library/yql/providers/dq/actors/full_result_writer.h @@ -1,14 +1,14 @@ #pragma once #include <ydb/library/yql/providers/dq/interface/yql_dq_full_result_writer.h> -#include <library/cpp/actors/core/actor.h> - +#include <library/cpp/actors/core/actor.h> + namespace NYql::NDqs { -THolder<NActors::IActor> MakeFullResultWriterActor( +THolder<NActors::IActor> MakeFullResultWriterActor( const TString& traceId, const TString& resultType, THolder<IDqFullResultWriter>&& writer, const NActors::TActorId& aggregatorId); - + } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp index 281283d45e8..3ff4fb16588 100644 --- a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp @@ -1,128 +1,128 @@ -#include "graph_execution_events_actor.h" +#include "graph_execution_events_actor.h" #include "actor_helpers.h" #include "full_result_writer.h" #include "events.h" - + #include <library/cpp/actors/core/event_pb.h> - + #include <util/generic/yexception.h> namespace NYql::NDqs { - -class TGraphExecutionEventsActor : public TRichActor<TGraphExecutionEventsActor> { -public: + +class TGraphExecutionEventsActor : public TRichActor<TGraphExecutionEventsActor> { +public: static constexpr const char ActorName[] = "YQL_DQ_GRAPH_EXECUTION_EVENTS_ACTOR"; - + TGraphExecutionEventsActor(const TString& traceID, std::vector<IDqTaskPreprocessor::TPtr>&& taskPreprocessors) - : TRichActor<TGraphExecutionEventsActor>(&TGraphExecutionEventsActor::Handler) + : TRichActor<TGraphExecutionEventsActor>(&TGraphExecutionEventsActor::Handler) , TraceID(traceID) , TaskPreprocessors(std::move(taskPreprocessors)) { } - -private: + +private: const TString TraceID; std::vector<IDqTaskPreprocessor::TPtr> TaskPreprocessors; - - STRICT_STFUNC(Handler, { - HFunc(NDqs::TEvGraphExecutionEvent, OnEvent); - cFunc(NActors::TEvents::TEvPoison::EventType, PassAway); - }) - + + STRICT_STFUNC(Handler, { + HFunc(NDqs::TEvGraphExecutionEvent, OnEvent); + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway); + }) + void DoPassAway() override { YQL_LOG_CTX_SCOPE(TraceID); } - void OnEvent(NDqs::TEvGraphExecutionEvent::TPtr& ev, const NActors::TActorContext&) { + void OnEvent(NDqs::TEvGraphExecutionEvent::TPtr& ev, const NActors::TActorContext&) { YQL_LOG_CTX_SCOPE(TraceID); YQL_LOG(DEBUG) << __FUNCTION__ << ' ' << NYql::NDqProto::EGraphExecutionEventType_Name(ev->Get()->Record.GetEventType()); - + try { switch (ev->Get()->Record.GetEventType()) { - case NYql::NDqProto::EGraphExecutionEventType::START: { + case NYql::NDqProto::EGraphExecutionEventType::START: { NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor payload; ev->Get()->Record.GetMessage().UnpackTo(&payload); OnStart(ev->Sender, payload); break; } - case NYql::NDqProto::EGraphExecutionEventType::SUCCESS: + case NYql::NDqProto::EGraphExecutionEventType::SUCCESS: OnSuccess(ev->Sender); - break; - case NYql::NDqProto::EGraphExecutionEventType::FAIL: + break; + case NYql::NDqProto::EGraphExecutionEventType::FAIL: OnFail(ev->Sender); - break; + break; case NYql::NDqProto::EGraphExecutionEventType::FULL_RESULT: { NDqProto::TGraphExecutionEvent::TFullResultDescriptor payload; ev->Get()->Record.GetMessage().UnpackTo(&payload); OnFullResult(ev->Sender, payload); break; } - default: + default: Reply(ev->Sender); - break; + break; } } catch (...) { TString message = TStringBuilder() << "Error on TEvGraphExecutionEvent: " << CurrentExceptionMessage(); YQL_LOG(ERROR) << message; Reply(ev->Sender, message); - } - } - - template <class TKey, class TValue> - static THashMap<TKey, TValue> AsHashMap(const google::protobuf::Map<TKey, TValue>& map) { - return THashMap<TKey, TValue>(map.begin(), map.end()); - } - + } + } + + template <class TKey, class TValue> + static THashMap<TKey, TValue> AsHashMap(const google::protobuf::Map<TKey, TValue>& map) { + return THashMap<TKey, TValue>(map.begin(), map.end()); + } + void OnStart(NActors::TActorId replyTo, const NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor& payload) { YQL_LOG(DEBUG) << __FUNCTION__; - const auto& secureParams = AsHashMap(payload.GetSecureParams().GetData()); + const auto& secureParams = AsHashMap(payload.GetSecureParams().GetData()); const auto& graphParams = AsHashMap(payload.GetGraphParams().GetData()); - + NDqProto::TGraphExecutionEvent::TMap response; - + auto* addedParams = response.MutableData(); for (const auto& preprocessor: TaskPreprocessors) { auto newParams = preprocessor->GetTaskParams(graphParams, secureParams); for (const auto& [k, v] : newParams) { addedParams->insert({k, v}); - } - } - + } + } + Reply(replyTo, response); - } - + } + void Reply(NActors::TActorId replyTo, TString msg = "") const { - NYql::NDqProto::TGraphExecutionEvent sync; - sync.SetEventType(NDqProto::EGraphExecutionEventType::SYNC); + NYql::NDqProto::TGraphExecutionEvent sync; + sync.SetEventType(NDqProto::EGraphExecutionEventType::SYNC); if (msg) { sync.SetErrorMessage(msg); } Send(replyTo, MakeHolder<NDqs::TEvGraphExecutionEvent>(sync)); - } - - template <class TPayload> + } + + template <class TPayload> void Reply(NActors::TActorId replyTo, const TPayload& resp) const { - NYql::NDqProto::TGraphExecutionEvent sync; - sync.SetEventType(NDqProto::EGraphExecutionEventType::SYNC); - sync.MutableMessage()->PackFrom(resp); + NYql::NDqProto::TGraphExecutionEvent sync; + sync.SetEventType(NDqProto::EGraphExecutionEventType::SYNC); + sync.MutableMessage()->PackFrom(resp); Send(replyTo, MakeHolder<NDqs::TEvGraphExecutionEvent>(sync)); - } - + } + void OnFail(NActors::TActorId replyTo) { YQL_LOG(DEBUG) << __FUNCTION__; for (const auto& preprocessor: TaskPreprocessors) { preprocessor->Finish(false); - } + } Reply(replyTo); - } - + } + void OnSuccess(NActors::TActorId replyTo) { YQL_LOG(DEBUG) << __FUNCTION__; for (const auto& preprocessor: TaskPreprocessors) { preprocessor->Finish(true); - } + } Reply(replyTo); - } + } void OnFullResult(NActors::TActorId replyTo, const NDqProto::TGraphExecutionEvent::TFullResultDescriptor& payload) { YQL_LOG(DEBUG) << __FUNCTION__; @@ -143,10 +143,10 @@ private: Reply(replyTo, TString{"Failed to create full result writer"}); } } -}; - +}; + NActors::IActor* MakeGraphExecutionEventsActor(const TString& traceID, std::vector<IDqTaskPreprocessor::TPtr>&& taskPreprocessors) { return new TGraphExecutionEventsActor(traceID, std::move(taskPreprocessors)); -} - +} + } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h index fb042e6ed3e..73e6eba0fb0 100644 --- a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h +++ b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h @@ -1,10 +1,10 @@ -#pragma once - -#include <library/cpp/actors/core/actor.h> +#pragma once + +#include <library/cpp/actors/core/actor.h> #include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h> - + namespace NYql::NDqs { - + NActors::IActor* MakeGraphExecutionEventsActor(const TString& traceID, std::vector<IDqTaskPreprocessor::TPtr>&& taskPreprocessors); - + } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/dq/actors/proto_builder.cpp b/ydb/library/yql/providers/dq/actors/proto_builder.cpp index b49717ed918..e4b16403c3b 100644 --- a/ydb/library/yql/providers/dq/actors/proto_builder.cpp +++ b/ydb/library/yql/providers/dq/actors/proto_builder.cpp @@ -140,7 +140,7 @@ bool TProtoBuilder::WriteData(const TVector<NDqProto::TData>& rows, const std::f } return true; } - + Ydb::ResultSet TProtoBuilder::BuildResultSet(const TVector<NYql::NDqProto::TData>& data) { Ydb::ResultSet resultSet; auto structType = AS_TYPE(TStructType, ResultType); @@ -150,8 +150,8 @@ Ydb::ResultSet TProtoBuilder::BuildResultSet(const TVector<NYql::NDqProto::TData const ui32 memberIndex = ColumnOrder.empty() ? i : ColumnOrder[i]; column.set_name(TString(structType->GetMemberName(memberIndex))); ExportTypeToProto(structType->GetMemberType(memberIndex), *column.mutable_type()); - } - + } + WriteData(data, [&](const NYql::NUdf::TUnboxedValuePod& value) { ExportValueToProto(ResultType, value, *resultSet.add_rows(), &ColumnOrder); return true; diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp index 40b9a389fee..59ae95c3af5 100644 --- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp +++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp @@ -94,7 +94,7 @@ private: YQL_LOG_CTX_SCOPE(TraceId); \ YQL_LOG(DEBUG) << "Unexpected event " << ( #TEvType ); \ }) - + STRICT_STFUNC(Handler, { HFunc(TEvPullResult, OnPullResult); HFunc(TEvReadyState, OnReadyState); @@ -114,7 +114,7 @@ private: cFunc(TEvents::TEvGone::EventType, OnFullResultWriterShutdown) }) - + STRICT_STFUNC(ShutdownHandler, { HFunc(TEvents::TEvGone, OnShutdownQueryResult); HANDLER_STUB(TEvPullResult) @@ -127,7 +127,7 @@ private: HANDLER_STUB(TEvents::TEvUndelivered) HANDLER_STUB(TEvents::TEvWakeup) }) - + void DoPassAway() override { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << __FUNCTION__; @@ -147,7 +147,7 @@ private: if (PullRequestTimeout && now - PullRequestStartTime > PullRequestTimeout) { OnError("Timeout " + ToString(SourceID.NodeId()), true, true); } - + if (PingTimeout && now - PingStartTime > PingTimeout) { OnError("PingTimeout " + ToString(SourceID.NodeId()), true, true); } @@ -176,7 +176,7 @@ private: AddCriticalEventType(TEvents::TEvWakeup::EventType); AddCriticalEventType(TEvPingResponse::EventType); } - + void OnPullResult(TEvPullResult::TPtr&, const TActorContext&) { YQL_LOG_CTX_SCOPE(TraceId); PullRequestStartTime = TInstant::Now(); @@ -274,17 +274,17 @@ private: issue.Severity = TSeverityIds::S_WARNING; Finish(/*truncated = */ true, {issue}); } - } - } + } + } } } - + void FlushCurrent() { YQL_LOG(DEBUG) << __FUNCTION__; YQL_ENSURE(!Truncated); YQL_ENSURE(!FullResultWriterID); YQL_ENSURE(Settings->EnableFullResultWrite.Get().GetOrElse(false)); - + NDqProto::TGraphExecutionEvent record; record.SetEventType(NDqProto::EGraphExecutionEventType::FULL_RESULT); NDqProto::TGraphExecutionEvent::TFullResultDescriptor payload; @@ -294,7 +294,7 @@ private: Synchronize<TEvGraphExecutionEvent>([this](TEvGraphExecutionEvent::TPtr& ev) { Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC); YQL_LOG_CTX_SCOPE(TraceId); - + if (auto msg = ev->Get()->Record.GetErrorMessage()) { OnError(msg, false, true); } else { @@ -303,10 +303,10 @@ private: FullResultWriterID = NActors::ActorIdFromProto(fullResultWriterProto); Truncated = true; WriteAllDataPartsToFullResultTable(); - } + } }); } - + bool CanSendToFullResultWriter() { // TODO Customize return FullResultSentBytes - FullResultReceivedBytes <= 32_MB; @@ -328,11 +328,11 @@ private: YQL_LOG(DEBUG) << "Received error message: " << ev->Get()->Record.GetErrorMessage(); OnError(ev->Get()->Record.GetErrorMessage(), false, false); return; - } + } callback(); }); } - + void WriteAllDataPartsToFullResultTable() { while (FullResultSentDataParts < DataParts.size() && CanSendToFullResultWriter()) { UnsafeWriteToFullResultTable(DataParts[FullResultSentDataParts]); @@ -348,7 +348,7 @@ private: WriteAllDataPartsToFullResultTable(); }); } - + void WriteToFullResultTable(TAutoPtr<NDqProto::TData> data) { if (CanSendToFullResultWriter()) { UnsafeWriteToFullResultTable(*data); @@ -358,7 +358,7 @@ private: WriteToFullResultTable(data); }); } - + void UnsafeWriteToFullResultTable(const NDqProto::TData& data) { NDqProto::TPullResponse response; response.SetResponseType(EPullResponseType::CONTINUE); @@ -401,47 +401,47 @@ private: YQL_LOG_CTX_SCOPE(TraceId); YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); YQL_LOG(DEBUG) << "Shutting down TResultAggregator"; - + BlockingActors.clear(); if (FullResultWriterID) { BlockingActors.insert(FullResultWriterID); Send(FullResultWriterID, MakeHolder<TEvents::TEvPoison>()); } - + YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors"; QueryResponse.Reset(ev->Release().Release()); Become(&TResultAggregator::ShutdownHandler); Send(SelfId(), MakeHolder<TEvents::TEvGone>()); } - + void OnShutdownQueryResult(TEvents::TEvGone::TPtr& ev, const TActorContext&) { YQL_LOG_CTX_SCOPE(TraceId); auto iter = BlockingActors.find(ev->Sender); if (iter != BlockingActors.end()) { BlockingActors.erase(iter); } - + YQL_LOG(DEBUG) << "Shutting down TResultAggregator, " << BlockingActors.size() << " blocking actors left"; - + if (BlockingActors.empty()) { EndOnQueryResult(); } } - + void EndOnQueryResult() { YQL_LOG(DEBUG) << __FUNCTION__; NDqProto::TQueryResponse result = QueryResponse->Record; - + YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty()); FlushCounters(result); - + if (ResultYsonWriter) { ResultYsonWriter->OnEndList(); ResultYsonWriter.Destroy(); } ResultYsonOut.Destroy(); - + *result.MutableYson() = ResultYson; if (!FinishIssues.Empty()) { @@ -486,11 +486,11 @@ private: THashSet<TActorId> BlockingActors; THolder<TEvQueryResponse> QueryResponse; }; - + class TResultPrinter: public TActor<TResultPrinter> { public: static constexpr char ActorName[] = "YQL_DQ_RESULT_PRINTER"; - + TResultPrinter(IOutputStream& output, NThreading::TPromise<void>& promise) : TActor<TResultPrinter>(&TResultPrinter::Handler) , Output(output) diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.h b/ydb/library/yql/providers/dq/actors/result_aggregator.h index c69ab078296..450bc2de88b 100644 --- a/ydb/library/yql/providers/dq/actors/result_aggregator.h +++ b/ydb/library/yql/providers/dq/actors/result_aggregator.h @@ -14,7 +14,7 @@ namespace NYql::NDqs::NExecutionHelpers { const TString& traceId, const THashMap<TString, TString>& secureParams, const TDqConfiguration::TPtr& settings, - const TString& resultType, + const TString& resultType, bool discard, const NActors::TActorId& graphExecutionEventsId); } // namespace NYql::NDqs::NExecutionHelpers diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.cpp b/ydb/library/yql/providers/dq/actors/result_receiver.cpp index 19d98f84029..7fdb73f257b 100644 --- a/ydb/library/yql/providers/dq/actors/result_receiver.cpp +++ b/ydb/library/yql/providers/dq/actors/result_receiver.cpp @@ -74,7 +74,7 @@ private: void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev, const TActorContext&) { YQL_LOG_CTX_SCOPE(TraceId); - + auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>(); if (!Discard) { @@ -166,9 +166,9 @@ private: TIssues Issues; ui64 Size = 0; ui64 Rows = 0; - // const Yql::DqsProto::TFullResultTable FullResultTable; + // const Yql::DqsProto::TFullResultTable FullResultTable; const THashMap<TString, TString> SecureParams; - // THolder<IFullResultWriter> FullResultWriter; + // THolder<IFullResultWriter> FullResultWriter; THolder<TProtoBuilder> ResultBuilder; bool Discard = false; }; diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.h b/ydb/library/yql/providers/dq/actors/result_receiver.h index 1a8c49b7b01..968bcede101 100644 --- a/ydb/library/yql/providers/dq/actors/result_receiver.h +++ b/ydb/library/yql/providers/dq/actors/result_receiver.h @@ -12,7 +12,7 @@ THolder<NActors::IActor> MakeResultReceiver( const NActors::TActorId& executerId, const TString& traceId, const TDqConfiguration::TPtr& settings, -// const Yql::DqsProto::TFullResultTable& resultTable, +// const Yql::DqsProto::TFullResultTable& resultTable, const THashMap<TString, TString>& secureParams, const TString& resultBuilder, bool discard diff --git a/ydb/library/yql/providers/dq/actors/ya.make b/ydb/library/yql/providers/dq/actors/ya.make index dab7b1395df..14e93b587da 100644 --- a/ydb/library/yql/providers/dq/actors/ya.make +++ b/ydb/library/yql/providers/dq/actors/ya.make @@ -3,14 +3,14 @@ LIBRARY() OWNER(g:yql) SRCS( - compute_actor.cpp + compute_actor.cpp events.cpp executer_actor.cpp execution_helpers.cpp - graph_execution_events_actor.cpp + graph_execution_events_actor.cpp resource_allocator.cpp - task_controller.cpp - worker_actor.cpp + task_controller.cpp + worker_actor.cpp result_aggregator.cpp result_receiver.cpp full_result_writer.cpp diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index 3db88fca9d8..09aecfcf923 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -3,7 +3,7 @@ option cc_enable_arenas = true; package NYql.NDqProto; -import "google/protobuf/any.proto"; +import "google/protobuf/any.proto"; import "ydb/library/yql/dq/proto/dq_transport.proto"; import "ydb/library/yql/dq/proto/dq_tasks.proto"; import "ydb/library/yql/providers/dq/api/protos/service.proto"; @@ -205,41 +205,41 @@ message TReadyState { repeated NActorsProto.TActorId ActorId = 5; // used for compute actor } -enum EGraphExecutionEventType { - SYNC = 0; - START = 1; - FAIL = 2; - SUCCESS = 3; +enum EGraphExecutionEventType { + SYNC = 0; + START = 1; + FAIL = 2; + SUCCESS = 3; FULL_RESULT = 4; -} - -message TGraphExecutionEvent { - EGraphExecutionEventType EventType = 1; - google.protobuf.Any Message = 2; +} + +message TGraphExecutionEvent { + EGraphExecutionEventType EventType = 1; + google.protobuf.Any Message = 2; optional string ErrorMessage = 3; - - message TMap { - map<string, bytes> Data = 1; - } - - message TExecuteGraphDescriptor { + + message TMap { + map<string, bytes> Data = 1; + } + + message TExecuteGraphDescriptor { TMap SecureParams = 1; TMap GraphParams = 2; - } + } message TFullResultDescriptor { bytes ResultType = 1; } -} - -message TFullResultWriterStatusRequest { -} - -message TFullResultWriterStatusResponse { - uint64 BytesReceived = 1; - optional string ErrorMessage = 2; -} - +} + +message TFullResultWriterStatusRequest { +} + +message TFullResultWriterStatusResponse { + uint64 BytesReceived = 1; + optional string ErrorMessage = 2; +} + message TDqTaskPrepareResult { bool Result = 1; repeated TMetric Metric = 2; diff --git a/ydb/library/yql/providers/dq/interface/ya.make b/ydb/library/yql/providers/dq/interface/ya.make index 4fb104c900e..8d72c16c03a 100644 --- a/ydb/library/yql/providers/dq/interface/ya.make +++ b/ydb/library/yql/providers/dq/interface/ya.make @@ -4,7 +4,7 @@ OWNER(g:yql) SRCS( yql_dq_integration.cpp - yql_dq_task_preprocessor.cpp + yql_dq_task_preprocessor.cpp yql_dq_task_transform.cpp ) diff --git a/ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h b/ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h index f4f5aa03324..f70d4f6a917 100644 --- a/ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h +++ b/ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h @@ -1,25 +1,25 @@ -#pragma once - +#pragma once + #include "yql_dq_full_result_writer.h" -#include <util/generic/ptr.h> - +#include <util/generic/ptr.h> + #include <vector> -namespace NYql { - -class IDqTaskPreprocessor : public TThrRefBase { -public: - using TPtr = TIntrusivePtr<IDqTaskPreprocessor>; - - virtual ~IDqTaskPreprocessor() = default; - +namespace NYql { + +class IDqTaskPreprocessor : public TThrRefBase { +public: + using TPtr = TIntrusivePtr<IDqTaskPreprocessor>; + + virtual ~IDqTaskPreprocessor() = default; + virtual THashMap<TString, TString> GetTaskParams(const THashMap<TString, TString>& graphParams, const THashMap<TString, TString>& secureParams) = 0; virtual void Finish(bool success) = 0; virtual THolder<IDqFullResultWriter> CreateFullResultWriter() = 0; -}; - -using TDqTaskPreprocessorFactory = std::function<IDqTaskPreprocessor::TPtr()>; +}; + +using TDqTaskPreprocessorFactory = std::function<IDqTaskPreprocessor::TPtr()>; using TDqTaskPreprocessorFactoryCollection = std::vector<TDqTaskPreprocessorFactory>; - + } // namespace NYql diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index ba0d6a8ac8b..1a8d3172e41 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -86,8 +86,8 @@ public: TActorSetupCmd(resman, TMailboxType::Simple, 0)); ServiceNode->StartActorSystem(); - - ServiceNode->StartService(dqTaskPreprocessorFactories); + + ServiceNode->StartService(dqTaskPreprocessorFactories); } ~TLocalServiceHolder() @@ -115,7 +115,7 @@ public: return Gateway->CloseSession(sessionId); } - NThreading::TFuture<TResult> + NThreading::TFuture<TResult> ExecutePlan(const TString& sessionId, NDqs::IDqsExecutionPlanner& plan, const TVector<TString>& columns, const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams, const TDqSettings::TPtr& settings, diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index f796f341fe4..858e3da45a0 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -35,10 +35,10 @@ public: AddHandler(0, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<false>)); AddHandler(0, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput)); AddHandler(0, &TCoExtendBase::Match, HNDL(BuildExtendStage)); - AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft)); + AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft)); AddHandler(0, &TDqJoin::Match, HNDL(PushJoinToStage<false>)); AddHandler(0, &TDqJoin::Match, HNDL(BuildJoin<false>)); - AddHandler(0, &TDqJoin::Match, HNDL(BuildJoinDict<false>)); + AddHandler(0, &TDqJoin::Match, HNDL(BuildJoinDict<false>)); AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>)); AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>)); AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>)); @@ -58,7 +58,7 @@ public: AddHandler(1, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<true>)); AddHandler(1, &TDqJoin::Match, HNDL(PushJoinToStage<true>)); AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>)); - AddHandler(1, &TDqJoin::Match, HNDL(BuildJoinDict<true>)); + AddHandler(1, &TDqJoin::Match, HNDL(BuildJoinDict<true>)); AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>)); AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>)); AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>)); @@ -261,10 +261,10 @@ protected: return DqBuildExtendStage(node, ctx); } - TMaybeNode<TExprBase> RewriteRightJoinToLeft(TExprBase node, TExprContext& ctx) { - return DqRewriteRightJoinToLeft(node, ctx); - } - + TMaybeNode<TExprBase> RewriteRightJoinToLeft(TExprBase node, TExprContext& ctx) { + return DqRewriteRightJoinToLeft(node, ctx); + } + template <bool IsGlobal> TMaybeNode<TExprBase> PushJoinToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { return DqPushJoinToStage(node, ctx, optCtx, *getParents(), IsGlobal); @@ -273,21 +273,21 @@ protected: template <bool IsGlobal> TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { auto join = node.Cast<TDqJoin>(); - const TParentsMap* parentsMap = getParents(); - if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) { + const TParentsMap* parentsMap = getParents(); + if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) { return node; } return DqBuildPhyJoin(join, false /* TODO */, ctx, optCtx); - } - - template <bool IsGlobal> - TMaybeNode<TExprBase> BuildJoinDict(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - Y_UNUSED(optCtx); - - auto join = node.Cast<TDqJoin>(); + } + + template <bool IsGlobal> + TMaybeNode<TExprBase> BuildJoinDict(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + Y_UNUSED(optCtx); + + auto join = node.Cast<TDqJoin>(); const TParentsMap* parentsMap = getParents(); - if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) { + if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) { return node; } @@ -302,24 +302,24 @@ protected: return DqBuildScalarPrecompute(node, ctx, optCtx); } -private: - bool JoinPrerequisitesVerify(TDqJoin join, const TParentsMap* parentsMap, bool isGlobal) const { - // KqpBuildJoin copy/paste - if (!join.LeftInput().Maybe<TDqCnUnionAll>()) { - return false; - } - if (!join.RightInput().Maybe<TDqCnUnionAll>()) { - return false; - } - - if (!IsSingleConsumerConnection(join.LeftInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) { - return false; - } - if (!IsSingleConsumerConnection(join.RightInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) { - return false; - } - return true; - } +private: + bool JoinPrerequisitesVerify(TDqJoin join, const TParentsMap* parentsMap, bool isGlobal) const { + // KqpBuildJoin copy/paste + if (!join.LeftInput().Maybe<TDqCnUnionAll>()) { + return false; + } + if (!join.RightInput().Maybe<TDqCnUnionAll>()) { + return false; + } + + if (!IsSingleConsumerConnection(join.LeftInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) { + return false; + } + if (!IsSingleConsumerConnection(join.RightInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) { + return false; + } + return true; + } }; THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx) { diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp index fcf950cb416..2341b28ad44 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp @@ -228,7 +228,7 @@ public: }); } - NThreading::TFuture<TResult> + NThreading::TFuture<TResult> ExecutePlan(const TString& sessionId, NDqs::IDqsExecutionPlanner& plan, const TVector<TString>& columns, const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams, const TDqSettings::TPtr& settings, @@ -257,12 +257,12 @@ public: *queryPB.AddColumns() = column; } settings->Save(queryPB); - - { + + { auto& secParams = *queryPB.MutableSecureParams(); - for (const auto&[k, v] : secureParams) { - secParams[k] = v; - } + for (const auto&[k, v] : secureParams) { + secParams[k] = v; + } } { diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h index d1e313b9129..d562776033f 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h @@ -63,13 +63,13 @@ public: virtual void CloseSession(const TString& sessionId) = 0; - virtual NThreading::TFuture<TResult> + virtual NThreading::TFuture<TResult> ExecutePlan(const TString& sessionId, NDqs::IDqsExecutionPlanner& plan, const TVector<TString>& columns, const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, bool discard) = 0; - + virtual TString GetVanillaJobPath() { return ""; } diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index b6de6cf2e2c..65a9e500d9d 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -56,7 +56,7 @@ namespace NYql::NDqs { } template<typename RequestType, typename ResponseType> - class TServiceProxyActor: public TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>> { + class TServiceProxyActor: public TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>> { public: static constexpr char ActorName[] = "SERVICE_PROXY"; @@ -64,7 +64,7 @@ namespace NYql::NDqs { NGrpc::IRequestContextBase* ctx, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, const TString& traceId, const TString& username) - : TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>>(&TServiceProxyActor::Handler) + : TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>>(&TServiceProxyActor::Handler) , Ctx(ctx) , Counters(counters) , ServiceProxyActorCounters(counters->GetSubgroup("component", "ServiceProxyActor")) @@ -157,18 +157,18 @@ namespace NYql::NDqs { } auto aggregatedQueryStat = AggregateQueryStatsByStage(QueryStat, Task2Stage); - aggregatedQueryStat.FlushCounters(ResponseBuffer); + aggregatedQueryStat.FlushCounters(ResponseBuffer); - auto& operation = *ResponseBuffer.mutable_operation(); + auto& operation = *ResponseBuffer.mutable_operation(); operation.Setready(true); operation.Mutableresult()->PackFrom(queryResult); *operation.Mutableissues() = result.GetIssues(); - ResponseBuffer.SetTruncated(result.GetTruncated()); + ResponseBuffer.SetTruncated(result.GetTruncated()); Reply(Ydb::StatusIds::SUCCESS, result.GetIssues().size() > 0); } - virtual void DoRetry() + virtual void DoRetry() { this->CleanupChildren(); auto selfId = this->SelfId(); @@ -178,7 +178,7 @@ namespace NYql::NDqs { } void OnReturnResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) { - Y_UNUSED(ctx); + Y_UNUSED(ctx); YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << "TServiceProxyActor::OnReturnResult " << ev->Get()->Record.GetMetric().size(); QueryStat.AddCounters(ev->Get()->Record); @@ -187,7 +187,7 @@ namespace NYql::NDqs { NYql::TIssues issues; NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); YQL_LOG(WARN) << "Retry " << Retry << " Issues: " << issues.ToString(); - DoRetry(); + DoRetry(); } else { if (ev->Get()->Record.GetIssues().size() > 0) { NYql::TIssues issues; @@ -207,21 +207,21 @@ namespace NYql::NDqs { return Promise.GetFuture(); } - virtual void ReplyError(grpc::StatusCode code, const TString& msg) { + virtual void ReplyError(grpc::StatusCode code, const TString& msg) { Ctx->ReplyError(code, msg); this->PassAway(); } virtual void Reply(ui32 status, bool hasIssues) { Y_UNUSED(hasIssues); - Ctx->Reply(&ResponseBuffer, status); + Ctx->Reply(&ResponseBuffer, status); this->PassAway(); } private: NGrpc::IRequestContextBase* Ctx; bool CtxSubscribed = false; - ResponseType ResponseBuffer; + ResponseType ResponseBuffer; protected: TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; @@ -245,27 +245,27 @@ namespace NYql::NDqs { NYql::TCounters QueryStat; THashMap<ui64, ui64> Task2Stage; - - void RestoreRequest() { - Request = dynamic_cast<const RequestType*>(Ctx->GetRequest()); - } + + void RestoreRequest() { + Request = dynamic_cast<const RequestType*>(Ctx->GetRequest()); + } }; class TExecuteGraphProxyActor: public TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse> { public: - using TBase = TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse>; + using TBase = TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse>; TExecuteGraphProxyActor(NGrpc::IRequestContextBase* ctx, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, - const TString& traceId, const TString& username, - const NActors::TActorId& graphExecutionEventsActorId) + const TString& traceId, const TString& username, + const NActors::TActorId& graphExecutionEventsActorId) : TServiceProxyActor(ctx, counters, traceId, username) - , GraphExecutionEventsActorId(graphExecutionEventsActorId) - { - } + , GraphExecutionEventsActorId(graphExecutionEventsActorId) + { + } - void DoRetry() override { + void DoRetry() override { YQL_LOG(DEBUG) << __FUNCTION__; - SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this](const auto& ev) { + SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this](const auto& ev) { if (ev->Get()->Record.GetErrorMessage()) { TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage()); } else { @@ -273,9 +273,9 @@ namespace NYql::NDqs { ModifiedRequest.Reset(); TBase::DoRetry(); } - }); - } - + }); + } + void Reply(ui32 status, bool hasIssues) override { auto eventType = hasIssues ? NYql::NDqProto::EGraphExecutionEventType::FAIL @@ -286,39 +286,39 @@ namespace NYql::NDqs { } else { TBase::Reply(status, hasIssues); } - }); - } - - void ReplyError(grpc::StatusCode code, const TString& msg) override { - SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this, code, msg](const auto& ev) { - Y_UNUSED(ev); - TBase::ReplyError(code, msg); - }); - } - + }); + } + + void ReplyError(grpc::StatusCode code, const TString& msg) override { + SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this, code, msg](const auto& ev) { + Y_UNUSED(ev); + TBase::ReplyError(code, msg); + }); + } + private: THolder<Yql::DqsProto::ExecuteGraphRequest> ModifiedRequest; - - void DoPassAway() override { + + void DoPassAway() override { YQL_LOG(DEBUG) << __FUNCTION__; - Send(GraphExecutionEventsActorId, new TEvents::TEvPoison()); - TServiceProxyActor::DoPassAway(); - } - + Send(GraphExecutionEventsActorId, new TEvents::TEvPoison()); + TServiceProxyActor::DoPassAway(); + } + NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor SerializeGraphDescriptor() const { - NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor result; - - for (const auto& kv : Request->GetSecureParams()) { - result.MutableSecureParams()->MutableData()->insert(kv); - } - + NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor result; + + for (const auto& kv : Request->GetSecureParams()) { + result.MutableSecureParams()->MutableData()->insert(kv); + } + for (const auto& kv : Request->GetGraphParams()) { result.MutableGraphParams()->MutableData()->insert(kv); } - return result; - } - + return result; + } + void Bootstrap() override { YQL_LOG(DEBUG) << "TServiceProxyActor::OnExecuteGraph"; @@ -330,9 +330,9 @@ namespace NYql::NDqs { ev->Get()->Record.GetMessage().UnpackTo(¶ms); FinishBootstrap(params); } - }); - } - + }); + } + void MergeTaskMetas(const NDqProto::TGraphExecutionEvent::TMap& params) { if (!params.data().empty()) { for (size_t i = 0; i < Request->TaskSize(); ++i) { @@ -340,29 +340,29 @@ namespace NYql::NDqs { ModifiedRequest.Reset(new Yql::DqsProto::ExecuteGraphRequest()); ModifiedRequest->CopyFrom(*Request); } - + auto* task = ModifiedRequest->MutableTask(i); - + Yql::DqsProto::TTaskMeta taskMeta; task->GetMeta().UnpackTo(&taskMeta); - + for (const auto&[key, value] : params.data()) { (*taskMeta.MutableTaskParams())[key] = value; } - + task->MutableMeta()->PackFrom(taskMeta); - } - } - - if (ModifiedRequest) { - Request = ModifiedRequest.Get(); - } - } - + } + } + + if (ModifiedRequest) { + Request = ModifiedRequest.Get(); + } + } + void FinishBootstrap(const NDqProto::TGraphExecutionEvent::TMap& params) { YQL_LOG(DEBUG) << __FUNCTION__; MergeTaskMetas(params); - + auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime)); TVector<TString> columns; @@ -379,7 +379,7 @@ namespace NYql::NDqs { for (const auto& x : Request->GetSecureParams()) { secureParams[x.first] = x.second; } - auto resultId = RegisterChild(NExecutionHelpers::MakeResultAggregator( + auto resultId = RegisterChild(NExecutionHelpers::MakeResultAggregator( columns, executerId, TraceId, @@ -395,23 +395,23 @@ namespace NYql::NDqs { controlId, resultId)); } - - template <class TPayload, class TCallback> - void SendEvent(NYql::NDqProto::EGraphExecutionEventType eventType, const TPayload& payload, TCallback callback) { - NDqProto::TGraphExecutionEvent record; - record.SetEventType(eventType); - if constexpr (!std::is_same_v<TPayload, std::nullptr_t>) { - record.MutableMessage()->PackFrom(payload); - } - Send(GraphExecutionEventsActorId, new TEvGraphExecutionEvent(record)); + + template <class TPayload, class TCallback> + void SendEvent(NYql::NDqProto::EGraphExecutionEventType eventType, const TPayload& payload, TCallback callback) { + NDqProto::TGraphExecutionEvent record; + record.SetEventType(eventType); + if constexpr (!std::is_same_v<TPayload, std::nullptr_t>) { + record.MutableMessage()->PackFrom(payload); + } + Send(GraphExecutionEventsActorId, new TEvGraphExecutionEvent(record)); Synchronize<TEvGraphExecutionEvent>([callback, traceId = TraceId](TEvGraphExecutionEvent::TPtr& ev) { YQL_LOG_CTX_SCOPE(traceId); - Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC); - callback(ev); - }); - } - - NActors::TActorId GraphExecutionEventsActorId; + Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC); + callback(ev); + }); + } + + NActors::TActorId GraphExecutionEventsActorId; }; TString GetVersionString() { @@ -427,13 +427,13 @@ namespace NYql::NDqs { } } - TDqsGrpcService::TDqsGrpcService( - NActors::TActorSystem& system, - TIntrusivePtr<NMonitoring::TDynamicCounters> counters, - const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) + TDqsGrpcService::TDqsGrpcService( + NActors::TActorSystem& system, + TIntrusivePtr<NMonitoring::TDynamicCounters> counters, + const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) : ActorSystem(system) , Counters(std::move(counters)) - , DqTaskPreprocessorFactories(dqTaskPreprocessorFactories) + , DqTaskPreprocessorFactories(dqTaskPreprocessorFactories) , Promise(NewPromise<void>()) , RunningRequests(0) , Stopping(false) @@ -460,7 +460,7 @@ namespace NYql::NDqs { CQ = cq; using TDqTaskPreprocessorCollection = std::vector<NYql::IDqTaskPreprocessor::TPtr>; - + ADD_REQUEST(ExecuteGraph, ExecuteGraphRequest, ExecuteGraphResponse, { TGuard<TMutex> lock(Mutex); if (Stopping) { @@ -478,13 +478,13 @@ namespace NYql::NDqs { return; } - TDqTaskPreprocessorCollection taskPreprocessors; + TDqTaskPreprocessorCollection taskPreprocessors; for (const auto& factory: DqTaskPreprocessorFactories) { taskPreprocessors.push_back(factory()); - } - + } + auto graphExecutionEventsActorId = ActorSystem.Register(NDqs::MakeGraphExecutionEventsActor(request->GetSession(), std::move(taskPreprocessors))); - + RunningRequests++; auto actor = MakeHolder<TExecuteGraphProxyActor>(ctx, Counters, request->GetSession(), session->GetUsername(), graphExecutionEventsActorId); auto future = actor->GetFuture(); diff --git a/ydb/library/yql/providers/dq/service/grpc_service.h b/ydb/library/yql/providers/dq/service/grpc_service.h index 8997b723b49..fa2a835c8c6 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.h +++ b/ydb/library/yql/providers/dq/service/grpc_service.h @@ -1,7 +1,7 @@ #pragma once #include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h> - + #include <ydb/library/yql/providers/dq/api/grpc/api.grpc.pb.h> #include <ydb/library/yql/providers/dq/api/protos/service.pb.h> @@ -24,8 +24,8 @@ namespace NYql::NDqs { class TDqsGrpcService: public NGrpc::TGrpcServiceBase<Yql::DqsProto::DqService> { public: TDqsGrpcService(NActors::TActorSystem& system, - TIntrusivePtr<NMonitoring::TDynamicCounters> counters, - const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories); + TIntrusivePtr<NMonitoring::TDynamicCounters> counters, + const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories); void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; @@ -41,7 +41,7 @@ namespace NYql::NDqs { NGrpc::TGlobalLimiter* Limiter = nullptr; TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; - TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories; + TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories; TMutex Mutex; NThreading::TPromise<void> Promise; std::atomic<ui64> RunningRequests; diff --git a/ydb/library/yql/providers/dq/service/service_node.cpp b/ydb/library/yql/providers/dq/service/service_node.cpp index 1f68a427589..cda24ec6cf7 100644 --- a/ydb/library/yql/providers/dq/service/service_node.cpp +++ b/ydb/library/yql/providers/dq/service/service_node.cpp @@ -109,7 +109,7 @@ namespace NYql { return ActorSystem.Get(); } - void TServiceNode::StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) { + void TServiceNode::StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) { class TCustomOption : public grpc::ServerBuilderOption { public: TCustomOption() { } @@ -149,7 +149,7 @@ namespace NYql { .SetLogger(CreateActorSystemLogger(*ActorSystem, 413)); // 413 - NKikimrServices::GRPC_SERVER Server = MakeHolder<TGRpcServer>(options); - Service = TIntrusivePtr<IGRpcService>(new TDqsGrpcService(*ActorSystem, MetricsRegistry->GetSensors(), dqTaskPreprocessorFactories)); + Service = TIntrusivePtr<IGRpcService>(new TDqsGrpcService(*ActorSystem, MetricsRegistry->GetSensors(), dqTaskPreprocessorFactories)); Server->AddService(Service); Server->Start(); } diff --git a/ydb/library/yql/providers/dq/service/service_node.h b/ydb/library/yql/providers/dq/service/service_node.h index 9c6ac5df5fb..32f904901dc 100644 --- a/ydb/library/yql/providers/dq/service/service_node.h +++ b/ydb/library/yql/providers/dq/service/service_node.h @@ -26,7 +26,7 @@ namespace NYql { void AddLocalService(NActors::TActorId actorId, const NActors::TActorSetupCmd& service); NActors::TActorSystem* StartActorSystem(void* appData = nullptr); - void StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories); + void StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories); void Stop(TDuration time = TDuration::Max()); |