aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorogorod <ogorod@yandex-team.ru>2022-02-10 16:52:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:52:17 +0300
commit993eb9943ef23adacc29ccc3cb2fbb943ce2d06e (patch)
treeab7fbbf3253d4c0e2793218f09378908beb025fb
parentd826f6b7bb2a949001eda2068f1e8c68072efa19 (diff)
downloadydb-993eb9943ef23adacc29ccc3cb2fbb943ce2d06e.tar.gz
Restoring authorship annotation for <ogorod@yandex-team.ru>. Commit 2 of 2.
-rw-r--r--ydb/core/kqp/ut/kqp_explain_ut.cpp2
-rw-r--r--ydb/core/kqp/ut/kqp_join_ut.cpp46
-rw-r--r--ydb/library/yql/core/yql_data_provider.h2
-rw-r--r--ydb/library/yql/dq/common/dq_common.h10
-rw-r--r--ydb/library/yql/dq/connections.md6
-rw-r--r--ydb/library/yql/dq/tasks/dq_connection_builder.h10
-rw-r--r--ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp6
-rw-r--r--ydb/library/yql/providers/common/metrics/metrics_registry.cpp4
-rw-r--r--ydb/library/yql/providers/dq/actors/actor_helpers.h204
-rw-r--r--ydb/library/yql/providers/dq/actors/events.cpp16
-rw-r--r--ydb/library/yql/providers/dq/actors/events.h38
-rw-r--r--ydb/library/yql/providers/dq/actors/full_result_writer.cpp178
-rw-r--r--ydb/library/yql/providers/dq/actors/full_result_writer.h8
-rw-r--r--ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp120
-rw-r--r--ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h12
-rw-r--r--ydb/library/yql/providers/dq/actors/proto_builder.cpp6
-rw-r--r--ydb/library/yql/providers/dq/actors/result_aggregator.cpp54
-rw-r--r--ydb/library/yql/providers/dq/actors/result_aggregator.h2
-rw-r--r--ydb/library/yql/providers/dq/actors/result_receiver.cpp6
-rw-r--r--ydb/library/yql/providers/dq/actors/result_receiver.h2
-rw-r--r--ydb/library/yql/providers/dq/actors/ya.make8
-rw-r--r--ydb/library/yql/providers/dq/api/protos/dqs.proto56
-rw-r--r--ydb/library/yql/providers/dq/interface/ya.make2
-rw-r--r--ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h32
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp6
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp70
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp12
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.h4
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.cpp186
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.h8
-rw-r--r--ydb/library/yql/providers/dq/service/service_node.cpp4
-rw-r--r--ydb/library/yql/providers/dq/service/service_node.h2
32 files changed, 561 insertions, 561 deletions
diff --git a/ydb/core/kqp/ut/kqp_explain_ut.cpp b/ydb/core/kqp/ut/kqp_explain_ut.cpp
index 78db386cd4b..c3a06836367 100644
--- a/ydb/core/kqp/ut/kqp_explain_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_explain_ut.cpp
@@ -545,7 +545,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
NJson::TJsonValue plan;
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
- auto join = FindPlanNodeByKv(plan, "Node Type", "FullJoin (JoinDict)");
+ auto join = FindPlanNodeByKv(plan, "Node Type", "FullJoin (JoinDict)");
UNIT_ASSERT(join.IsDefined());
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
UNIT_ASSERT(left.IsDefined());
diff --git a/ydb/core/kqp/ut/kqp_join_ut.cpp b/ydb/core/kqp/ut/kqp_join_ut.cpp
index e084d0a3c08..8fa59d31b36 100644
--- a/ydb/core/kqp/ut/kqp_join_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_join_ut.cpp
@@ -281,7 +281,7 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
.EndList().Build()
.Build();
-
+
const TString query = Q_(R"(
DECLARE $in AS List<Struct<k: Int32>>;
SELECT * FROM AS_TABLE($in) AS t1
@@ -915,31 +915,31 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
])", FormatResultSetYson(result.GetResultSet(0)));
}
- Y_UNIT_TEST_NEW_ENGINE(FullOuterJoinSizeCheck) {
- TKikimrRunner kikimr;
- auto db = kikimr.GetTableClient();
- auto session = db.CreateSession().GetValueSync().GetSession();
- CreateSampleTables(session);
-
+ Y_UNIT_TEST_NEW_ENGINE(FullOuterJoinSizeCheck) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ CreateSampleTables(session);
+
auto result = session.ExecuteDataQuery(Q_(R"(
- SELECT COUNT(*)
- FROM `/Root/Join1_1` as left
- FULL OUTER JOIN `/Root/Join1_2` as right
- ON left.Fk21 = right.Key1
- UNION ALL
- SELECT COUNT(*)
- FROM `/Root/Join1_2` as right
- FULL OUTER JOIN `/Root/Join1_1` as left
- ON left.Fk21 = right.Key1
+ SELECT COUNT(*)
+ FROM `/Root/Join1_1` as left
+ FULL OUTER JOIN `/Root/Join1_2` as right
+ ON left.Fk21 = right.Key1
+ UNION ALL
+ SELECT COUNT(*)
+ FROM `/Root/Join1_2` as right
+ FULL OUTER JOIN `/Root/Join1_1` as left
+ ON left.Fk21 = right.Key1
)"), TTxControl::BeginTx().CommitTx()).GetValueSync();
-
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- CompareYson(
- "[[12u];[12u]]", // numbers MUST be same
+
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(
+ "[[12u];[12u]]", // numbers MUST be same
FormatResultSetYson(result.GetResultSet(0))
- );
- }
-
+ );
+ }
+
Y_UNIT_TEST_NEW_ENGINE(CrossJoinCount) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h
index 665ce5b211f..3ffbdc041da 100644
--- a/ydb/library/yql/core/yql_data_provider.h
+++ b/ydb/library/yql/core/yql_data_provider.h
@@ -177,7 +177,7 @@ struct TYqlOperationOptions;
struct TOperationProgress;
class TGatewaysConfig;
-
+
using TOperationProgressWriter = std::function<void(const TOperationProgress&)>;
enum class ESourceSyntax {
diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h
index 261cff982da..71cc7be6cb8 100644
--- a/ydb/library/yql/dq/common/dq_common.h
+++ b/ydb/library/yql/dq/common/dq_common.h
@@ -56,8 +56,8 @@ struct TBaseDqExecuterEvents {
ES_RESULT_SET,
ES_DQ_FAILURE,
ES_GRAPH,
- ES_GRAPH_FINISHED,
- ES_GRAPH_EXECUTION_EVENT,
+ ES_GRAPH_FINISHED,
+ ES_GRAPH_EXECUTION_EVENT,
};
};
@@ -71,9 +71,9 @@ struct TBaseDqDataEvents {
ES_PING_REQUEST,
ES_PING_RESPONSE,
ES_CONTINUE_RUN,
-
- ES_FULL_RESULT_WRITER_STATUS_REQUEST,
- ES_FULL_RESULT_WRITER_STATUS_RESPONSE,
+
+ ES_FULL_RESULT_WRITER_STATUS_REQUEST,
+ ES_FULL_RESULT_WRITER_STATUS_RESPONSE,
};
};
diff --git a/ydb/library/yql/dq/connections.md b/ydb/library/yql/dq/connections.md
index 15fd77355f0..09da3d1d255 100644
--- a/ydb/library/yql/dq/connections.md
+++ b/ydb/library/yql/dq/connections.md
@@ -4,7 +4,7 @@ SQL-запрос преобразуется во внутреннее предс
DQ-граф состоит из стадий (_DqStage_), соединённых рёбрами (коннекшинами типа _DqConnection_).
Каждая стадия исполняется в виде некоторого количества тасков (_DqTask_). Например, читающая таблицу стадия может
-исполняться в N тасков, где N - это количество шардов таблицы.
+исполняться в N тасков, где N - это количество шардов таблицы.
# Виды коннекшинов
@@ -85,7 +85,7 @@ DQ-граф состоит из стадий (_DqStage_), соединённых
| DqStage_1: DqTask_1, ..., DqTask_N |
+----------------+--------------+------+
|\ /|
- | \ ... / | DqCnHashShuffle
+ | \ ... / | DqCnHashShuffle
| \ / |
V V V V
+--------------------------------------+
@@ -115,4 +115,4 @@ DQ-граф состоит из стадий (_DqStage_), соединённых
Используется только для inplace-update под прагмой. В этом случае обе стадии выполняются на одном шарде и последовательно.
## KqpCnShuffleShard
-Задумывалось для межшардового взаимодействия. Нигде не используется.
+Задумывалось для межшардового взаимодействия. Нигде не используется.
diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h
index 8824e41dbdf..2c46061f60d 100644
--- a/ydb/library/yql/dq/tasks/dq_connection_builder.h
+++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h
@@ -19,14 +19,14 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp
// Current assumptions:
// 1. `Broadcast` can not be the 1st stage input unless it's a single input
- // 2. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll` !!! or `HashShuffle` !!!
+ // 2. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll` !!! or `HashShuffle` !!!
if (inputIndex == 0) {
YQL_ENSURE(stage.Inputs().Size() == 1 || !input.Maybe<NNodes::TDqCnBroadcast>());
} else {
- YQL_ENSURE(
- input.Maybe<NNodes::TDqCnBroadcast>() ||
- input.Maybe<NNodes::TDqCnUnionAll>() ||
- input.Maybe<NNodes::TDqCnHashShuffle>(), "" << input.Ref().Content());
+ YQL_ENSURE(
+ input.Maybe<NNodes::TDqCnBroadcast>() ||
+ input.Maybe<NNodes::TDqCnUnionAll>() ||
+ input.Maybe<NNodes::TDqCnHashShuffle>(), "" << input.Ref().Content());
}
if (auto maybeCnShuffle = input.Maybe<NNodes::TDqCnHashShuffle>()) {
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
index 49676339aa9..4da4d13d034 100644
--- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
+++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
@@ -55,10 +55,10 @@ void TDqIntegrationBase::FillSinkSettings(const TExprNode& node, ::google::proto
}
void TDqIntegrationBase::Annotate(const TExprNode& node, THashMap<TString, TString>& params) {
- Y_UNUSED(node);
+ Y_UNUSED(node);
Y_UNUSED(params);
-}
-
+}
+
bool TDqIntegrationBase::PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) {
Y_UNUSED(root);
Y_UNUSED(ctx);
diff --git a/ydb/library/yql/providers/common/metrics/metrics_registry.cpp b/ydb/library/yql/providers/common/metrics/metrics_registry.cpp
index d42866be6dd..1fb7bdcbba7 100644
--- a/ydb/library/yql/providers/common/metrics/metrics_registry.cpp
+++ b/ydb/library/yql/providers/common/metrics/metrics_registry.cpp
@@ -46,7 +46,7 @@ private:
label->SetValue(labelValue);
}
- void OnHistogram(const TString& labelName, const TString& labelValue, NMonitoring::IHistogramSnapshotPtr snapshot, bool) override {
+ void OnHistogram(const TString& labelName, const TString& labelValue, NMonitoring::IHistogramSnapshotPtr snapshot, bool) override {
if (Invalidate_) {
return;
}
@@ -195,7 +195,7 @@ public:
: false);
}
- IMetricsRegistryPtr Personalized(const TString& userName) const override {
+ IMetricsRegistryPtr Personalized(const TString& userName) const override {
return new TMetricsRegistryImpl(Sensors_, MakeMaybe(userName));
}
diff --git a/ydb/library/yql/providers/dq/actors/actor_helpers.h b/ydb/library/yql/providers/dq/actors/actor_helpers.h
index 28f862bf1f5..b92603ab0fb 100644
--- a/ydb/library/yql/providers/dq/actors/actor_helpers.h
+++ b/ydb/library/yql/providers/dq/actors/actor_helpers.h
@@ -7,17 +7,17 @@
#include <ydb/library/yql/utils/actors/rich_actor.h>
#include <ydb/library/yql/utils/log/log.h>
-
+
namespace NYql {
-enum EExecutorPoolType {
- Main,
+enum EExecutorPoolType {
+ Main,
FullResultWriter,
-
- TotalCount,
-};
-
-template <typename EventType>
+
+ TotalCount,
+};
+
+template <typename EventType>
struct TRichActorFutureCallback : public TRichActor<TRichActorFutureCallback<EventType>> {
using TCallback = std::function<void(TAutoPtr<NActors::TEventHandle<EventType>>&)>;
using TFailure = std::function<void(void)>;
@@ -79,98 +79,98 @@ private:
}
};
-template <class TDerived>
-class TSynchronizableRichActor : public TRichActor<TDerived> {
-public:
- using TBase = TRichActor<TDerived>;
- template <class TEvType>
- using TCallback = std::function<void(typename TEvType::TPtr&)>;
- using TAbstractCallback = std::function<void(TAutoPtr<NActors::IEventHandle>&)>;
- using THandler = typename TBase::TReceiveFunc;
-
- enum ESyncState {
- E_IDLE,
- E_SYNC_REQUESTED,
- E_SYNC_RECEIVED,
- };
-
- template <class... Args>
- explicit TSynchronizableRichActor(Args&&... args)
- : TRichActor<TDerived>(std::forward<Args>(args)...) {}
-
- template <class TEvType>
- void Synchronize(TCallback<TEvType> callback) {
- switch (SyncState_) {
- case E_SYNC_REQUESTED:
- throw yexception() << "Synchronization was already requested";
- break;
- case E_IDLE:
- [[fallthrough]];
- case E_SYNC_RECEIVED:
- InterruptedHandler_ = TBase::CurrentStateFunc();
- SyncCallback_ = [callback](TAutoPtr<NActors::IEventHandle>& ev) {
- auto* x = reinterpret_cast<typename TEvType::TPtr*>(&ev);
- callback(*x);
- };
- TBase::Become(&TSynchronizableRichActor::SyncHandler);
- ExpectedEventType_ = TEvType::EventType;
- SyncState_ = E_SYNC_REQUESTED;
- break;
- }
- }
-
-protected:
- void AddCriticalEventType(ui32 type) {
- CriticalEventTypes_.insert(type);
- }
-
-private:
- THandler InterruptedHandler_{nullptr};
- TDeque<TAutoPtr<NActors::IEventHandle>> DelayedEvents_{};
- TAbstractCallback SyncCallback_{nullptr};
- ESyncState SyncState_{E_IDLE};
- ui32 ExpectedEventType_{0};
- THashSet<ui32> CriticalEventTypes_{};
-
- void SyncHandler(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) {
- const ui32 etype = ev->GetTypeRewrite();
- if (etype == ExpectedEventType_) {
- OnSync(ev, ctx);
- } else if (CriticalEventTypes_.contains(etype)) {
- (this->*InterruptedHandler_)(ev, ctx);
- } else {
- EnqueueEvent(ev);
- }
- }
-
- void OnSync(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) {
- YQL_LOG(DEBUG) << "OnSync(): delayed messages " << DelayedEvents_.size();
- SyncState_ = E_SYNC_RECEIVED;
- TBase::Become(InterruptedHandler_);
- SyncCallback_(ev);
- if (SyncState_ == E_SYNC_REQUESTED) {
- return;
- }
- SyncCallback_ = nullptr;
-
- while (!DelayedEvents_.empty()) {
- auto event = std::move(DelayedEvents_.front());
- DelayedEvents_.pop_front();
- InterruptedHandler_ = TBase::CurrentStateFunc();
- (this->*InterruptedHandler_)(event, ctx);
- if (SyncState_ == E_SYNC_REQUESTED) {
- return;
- }
- }
-
- InterruptedHandler_ = nullptr;
- ExpectedEventType_ = 0;
- SyncState_ = E_IDLE;
- }
-
- void EnqueueEvent(TAutoPtr<NActors::IEventHandle>& ev) {
- DelayedEvents_.emplace_back(ev.Release());
- }
-};
-
+template <class TDerived>
+class TSynchronizableRichActor : public TRichActor<TDerived> {
+public:
+ using TBase = TRichActor<TDerived>;
+ template <class TEvType>
+ using TCallback = std::function<void(typename TEvType::TPtr&)>;
+ using TAbstractCallback = std::function<void(TAutoPtr<NActors::IEventHandle>&)>;
+ using THandler = typename TBase::TReceiveFunc;
+
+ enum ESyncState {
+ E_IDLE,
+ E_SYNC_REQUESTED,
+ E_SYNC_RECEIVED,
+ };
+
+ template <class... Args>
+ explicit TSynchronizableRichActor(Args&&... args)
+ : TRichActor<TDerived>(std::forward<Args>(args)...) {}
+
+ template <class TEvType>
+ void Synchronize(TCallback<TEvType> callback) {
+ switch (SyncState_) {
+ case E_SYNC_REQUESTED:
+ throw yexception() << "Synchronization was already requested";
+ break;
+ case E_IDLE:
+ [[fallthrough]];
+ case E_SYNC_RECEIVED:
+ InterruptedHandler_ = TBase::CurrentStateFunc();
+ SyncCallback_ = [callback](TAutoPtr<NActors::IEventHandle>& ev) {
+ auto* x = reinterpret_cast<typename TEvType::TPtr*>(&ev);
+ callback(*x);
+ };
+ TBase::Become(&TSynchronizableRichActor::SyncHandler);
+ ExpectedEventType_ = TEvType::EventType;
+ SyncState_ = E_SYNC_REQUESTED;
+ break;
+ }
+ }
+
+protected:
+ void AddCriticalEventType(ui32 type) {
+ CriticalEventTypes_.insert(type);
+ }
+
+private:
+ THandler InterruptedHandler_{nullptr};
+ TDeque<TAutoPtr<NActors::IEventHandle>> DelayedEvents_{};
+ TAbstractCallback SyncCallback_{nullptr};
+ ESyncState SyncState_{E_IDLE};
+ ui32 ExpectedEventType_{0};
+ THashSet<ui32> CriticalEventTypes_{};
+
+ void SyncHandler(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) {
+ const ui32 etype = ev->GetTypeRewrite();
+ if (etype == ExpectedEventType_) {
+ OnSync(ev, ctx);
+ } else if (CriticalEventTypes_.contains(etype)) {
+ (this->*InterruptedHandler_)(ev, ctx);
+ } else {
+ EnqueueEvent(ev);
+ }
+ }
+
+ void OnSync(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) {
+ YQL_LOG(DEBUG) << "OnSync(): delayed messages " << DelayedEvents_.size();
+ SyncState_ = E_SYNC_RECEIVED;
+ TBase::Become(InterruptedHandler_);
+ SyncCallback_(ev);
+ if (SyncState_ == E_SYNC_REQUESTED) {
+ return;
+ }
+ SyncCallback_ = nullptr;
+
+ while (!DelayedEvents_.empty()) {
+ auto event = std::move(DelayedEvents_.front());
+ DelayedEvents_.pop_front();
+ InterruptedHandler_ = TBase::CurrentStateFunc();
+ (this->*InterruptedHandler_)(event, ctx);
+ if (SyncState_ == E_SYNC_REQUESTED) {
+ return;
+ }
+ }
+
+ InterruptedHandler_ = nullptr;
+ ExpectedEventType_ = 0;
+ SyncState_ = E_IDLE;
+ }
+
+ void EnqueueEvent(TAutoPtr<NActors::IEventHandle>& ev) {
+ DelayedEvents_.emplace_back(ev.Release());
+ }
+};
+
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp
index 07369e66e05..892c5cccd70 100644
--- a/ydb/library/yql/providers/dq/actors/events.cpp
+++ b/ydb/library/yql/providers/dq/actors/events.cpp
@@ -36,10 +36,10 @@ namespace NYql::NDqs {
Record = std::move(proto);
}
- TEvGraphExecutionEvent::TEvGraphExecutionEvent(NDqProto::TGraphExecutionEvent& evt) {
- Record = evt;
- }
-
+ TEvGraphExecutionEvent::TEvGraphExecutionEvent(NDqProto::TGraphExecutionEvent& evt) {
+ Record = evt;
+ }
+
TEvPullDataRequest::TEvPullDataRequest(ui32 rowThreshold) {
Record.SetRowThreshold(rowThreshold);
}
@@ -47,8 +47,8 @@ namespace NYql::NDqs {
TEvPullDataResponse::TEvPullDataResponse(NYql::NDqProto::TPullResponse& data) {
Record.Swap(&data);
}
-
- TEvFullResultWriterStatusResponse::TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data) {
- Record.CopyFrom(data);
- }
+
+ TEvFullResultWriterStatusResponse::TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data) {
+ Record.CopyFrom(data);
+ }
}
diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h
index b0e3817e725..594921560ae 100644
--- a/ydb/library/yql/providers/dq/actors/events.h
+++ b/ydb/library/yql/providers/dq/actors/events.h
@@ -45,12 +45,12 @@ namespace NYql::NDqs {
DEFINE_SIMPLE_NONLOCAL_EVENT(TEvPullResult, "");
};
- struct TEvGraphExecutionEvent
- : NActors::TEventPB<TEvGraphExecutionEvent, NYql::NDqProto::TGraphExecutionEvent, TDqExecuterEvents::ES_GRAPH_EXECUTION_EVENT> {
- TEvGraphExecutionEvent() = default;
- explicit TEvGraphExecutionEvent(NDqProto::TGraphExecutionEvent& evt);
- };
-
+ struct TEvGraphExecutionEvent
+ : NActors::TEventPB<TEvGraphExecutionEvent, NYql::NDqProto::TGraphExecutionEvent, TDqExecuterEvents::ES_GRAPH_EXECUTION_EVENT> {
+ TEvGraphExecutionEvent() = default;
+ explicit TEvGraphExecutionEvent(NDqProto::TGraphExecutionEvent& evt);
+ };
+
using TDqDataEvents = NDq::TBaseDqDataEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>;
struct TEvPullDataRequest
@@ -75,19 +75,19 @@ namespace NYql::NDqs {
TEvPingResponse() = default;
};
- struct TEvFullResultWriterStatusRequest
- : NActors::TEventPB<TEvFullResultWriterStatusRequest, NYql::NDqProto::TFullResultWriterStatusRequest,
- TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_REQUEST> {
- TEvFullResultWriterStatusRequest() = default;
- };
-
- struct TEvFullResultWriterStatusResponse
- : NActors::TEventPB<TEvFullResultWriterStatusResponse, NYql::NDqProto::TFullResultWriterStatusResponse,
- TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_RESPONSE> {
- TEvFullResultWriterStatusResponse() = default;
- explicit TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data);
- };
-
+ struct TEvFullResultWriterStatusRequest
+ : NActors::TEventPB<TEvFullResultWriterStatusRequest, NYql::NDqProto::TFullResultWriterStatusRequest,
+ TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_REQUEST> {
+ TEvFullResultWriterStatusRequest() = default;
+ };
+
+ struct TEvFullResultWriterStatusResponse
+ : NActors::TEventPB<TEvFullResultWriterStatusResponse, NYql::NDqProto::TFullResultWriterStatusResponse,
+ TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_RESPONSE> {
+ TEvFullResultWriterStatusResponse() = default;
+ explicit TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data);
+ };
+
struct TEvGraphFinished : NActors::TEventBase<TEvGraphFinished, TDqExecuterEvents::ES_GRAPH_FINISHED> {
DEFINE_SIMPLE_NONLOCAL_EVENT(TEvGraphFinished, "");
};
diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
index 49bbc83e444..927ac060436 100644
--- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
+++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
@@ -4,7 +4,7 @@
#include <ydb/library/yql/providers/dq/actors/actor_helpers.h>
#include <ydb/library/yql/providers/dq/actors/events.h>
#include <ydb/library/yql/providers/dq/api/protos/service.pb.h>
-
+
#include <ydb/library/yql/core/issue/yql_issue.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -14,7 +14,7 @@
#include <library/cpp/actors/core/actor.h>
#include <util/generic/size_literals.h>
-#include <util/system/env.h>
+#include <util/system/env.h>
#include <utility>
@@ -34,65 +34,65 @@ public:
, TraceID(traceId)
, ResultBuilder(MakeHolder<TProtoBuilder>(resultType, TVector<TString>()))
, FullResultWriter(std::move(writer))
- , AggregatorID(aggregatorId)
+ , AggregatorID(aggregatorId)
{
- }
-
-private:
- STRICT_STFUNC(Handler, {
- cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
- HFunc(TEvPullDataResponse, OnPullResponse)
- HFunc(TEvFullResultWriterStatusRequest, OnStatusRequest)
- })
-
- void PassAway() override {
+ }
+
+private:
+ STRICT_STFUNC(Handler, {
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
+ HFunc(TEvPullDataResponse, OnPullResponse)
+ HFunc(TEvFullResultWriterStatusRequest, OnStatusRequest)
+ })
+
+ void PassAway() override {
YQL_LOG_CTX_SCOPE(TraceID);
YQL_LOG(DEBUG) << __FUNCTION__;
- try {
- FullResultWriter->Abort();
- } catch (...) {
- YQL_LOG(WARN) << "FullResultWriter->Abort(): " << CurrentExceptionMessage();
- }
+ try {
+ FullResultWriter->Abort();
+ } catch (...) {
+ YQL_LOG(WARN) << "FullResultWriter->Abort(): " << CurrentExceptionMessage();
+ }
ResultBuilder.Reset();
- FullResultWriter.Reset();
-
- Send(AggregatorID, MakeHolder<NActors::TEvents::TEvGone>());
-
+ FullResultWriter.Reset();
+
+ Send(AggregatorID, MakeHolder<NActors::TEvents::TEvGone>());
+
NActors::TActor<TThis>::PassAway();
- }
-
- void OnStatusRequest(TEvFullResultWriterStatusRequest::TPtr&, const NActors::TActorContext&) {
+ }
+
+ void OnStatusRequest(TEvFullResultWriterStatusRequest::TPtr&, const NActors::TActorContext&) {
YQL_LOG_CTX_SCOPE(TraceID);
- NDqProto::TFullResultWriterStatusResponse response;
- response.SetBytesReceived(BytesReceived);
- if (ErrorMessage) {
- response.SetErrorMessage(*ErrorMessage);
- }
- Send(AggregatorID, new TEvFullResultWriterStatusResponse(response));
- }
-
- void OnPullResponse(TEvPullDataResponse::TPtr& ev, const NActors::TActorContext&) {
+ NDqProto::TFullResultWriterStatusResponse response;
+ response.SetBytesReceived(BytesReceived);
+ if (ErrorMessage) {
+ response.SetErrorMessage(*ErrorMessage);
+ }
+ Send(AggregatorID, new TEvFullResultWriterStatusResponse(response));
+ }
+
+ void OnPullResponse(TEvPullDataResponse::TPtr& ev, const NActors::TActorContext&) {
YQL_LOG_CTX_SCOPE(TraceID);
- auto& response = ev->Get()->Record;
-
- switch (response.GetResponseType()) {
- case NDqProto::FINISH:
- Finish();
- break;
- case NDqProto::CONTINUE:
- Continue(response);
- break;
- default:
- YQL_ENSURE(false, "Unsupported pull request");
- break;
- }
- }
-
- void Finish() {
+ auto& response = ev->Get()->Record;
+
+ switch (response.GetResponseType()) {
+ case NDqProto::FINISH:
+ Finish();
+ break;
+ case NDqProto::CONTINUE:
+ Continue(response);
+ break;
+ default:
+ YQL_ENSURE(false, "Unsupported pull request");
+ break;
+ }
+ }
+
+ void Finish() {
YQL_LOG(DEBUG) << __FUNCTION__;
- try {
+ try {
TFailureInjector::Reach("full_result_fail_on_finish", [] { throw yexception() << "full_result_fail_on_finish"; });
- FullResultWriter->Finish();
+ FullResultWriter->Finish();
if (ErrorMessage) {
TIssue issue(*ErrorMessage);
issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
@@ -100,44 +100,44 @@ private:
} else {
Send(AggregatorID, MakeHolder<TEvDqFailure>().Release());
}
- } catch (...) {
- TIssue issue(CurrentExceptionMessage());
- issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
- if (ErrorMessage) {
- issue.AddSubIssue(MakeIntrusive<TIssue>(*ErrorMessage));
- }
+ } catch (...) {
+ TIssue issue(CurrentExceptionMessage());
+ issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
+ if (ErrorMessage) {
+ issue.AddSubIssue(MakeIntrusive<TIssue>(*ErrorMessage));
+ }
Send(AggregatorID, MakeHolder<TEvDqFailure>(issue, false, false).Release());
- }
+ }
Send(SelfId(), MakeHolder<NActors::TEvents::TEvPoison>());
- }
-
- void Continue(NDqProto::TPullResponse& response) {
- YQL_LOG(DEBUG) << "Continue -- RowCount = " << FullResultWriter->GetRowCount();
- ui64 respSize = response.ByteSizeLong();
- WriteToFullResultTable(response.GetData());
- BytesReceived += respSize;
- }
-
- void WriteToFullResultTable(const NDqProto::TData& data) {
- if (ErrorMessage) {
- YQL_LOG(DEBUG) << "Failed to write previous chunk, aborting";
- return;
- }
-
- try {
+ }
+
+ void Continue(NDqProto::TPullResponse& response) {
+ YQL_LOG(DEBUG) << "Continue -- RowCount = " << FullResultWriter->GetRowCount();
+ ui64 respSize = response.ByteSizeLong();
+ WriteToFullResultTable(response.GetData());
+ BytesReceived += respSize;
+ }
+
+ void WriteToFullResultTable(const NDqProto::TData& data) {
+ if (ErrorMessage) {
+ YQL_LOG(DEBUG) << "Failed to write previous chunk, aborting";
+ return;
+ }
+
+ try {
TFailureInjector::Reach("full_result_fail_on_write", [] { throw yexception() << "full_result_fail_on_write"; });
ResultBuilder->WriteData(data, [writer = FullResultWriter.Get()](const NUdf::TUnboxedValuePod& value) {
writer->AddRow(value);
return true;
});
- } catch (...) {
- ErrorMessage = CurrentExceptionMessage();
- }
-
- if (ErrorMessage) {
- YQL_LOG(DEBUG) << "An error occurred: " << *ErrorMessage;
- }
- }
+ } catch (...) {
+ ErrorMessage = CurrentExceptionMessage();
+ }
+
+ if (ErrorMessage) {
+ YQL_LOG(DEBUG) << "An error occurred: " << *ErrorMessage;
+ }
+ }
private:
const TString TraceID;
THolder<TProtoBuilder> ResultBuilder;
@@ -146,16 +146,16 @@ private:
ui64 BytesReceived{0};
TMaybe<TString> ErrorMessage;
-};
-
-
-THolder<NActors::IActor> MakeFullResultWriterActor(
+};
+
+
+THolder<NActors::IActor> MakeFullResultWriterActor(
const TString& traceId,
const TString& resultType,
THolder<IDqFullResultWriter>&& writer,
const NActors::TActorId& aggregatorId)
{
return MakeHolder<TFullResultWriterActor>(traceId, resultType, std::move(writer), aggregatorId);
-}
-
+}
+
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.h b/ydb/library/yql/providers/dq/actors/full_result_writer.h
index 8b317696440..4d4345522ad 100644
--- a/ydb/library/yql/providers/dq/actors/full_result_writer.h
+++ b/ydb/library/yql/providers/dq/actors/full_result_writer.h
@@ -1,14 +1,14 @@
#pragma once
#include <ydb/library/yql/providers/dq/interface/yql_dq_full_result_writer.h>
-#include <library/cpp/actors/core/actor.h>
-
+#include <library/cpp/actors/core/actor.h>
+
namespace NYql::NDqs {
-THolder<NActors::IActor> MakeFullResultWriterActor(
+THolder<NActors::IActor> MakeFullResultWriterActor(
const TString& traceId,
const TString& resultType,
THolder<IDqFullResultWriter>&& writer,
const NActors::TActorId& aggregatorId);
-
+
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp
index 281283d45e8..3ff4fb16588 100644
--- a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.cpp
@@ -1,128 +1,128 @@
-#include "graph_execution_events_actor.h"
+#include "graph_execution_events_actor.h"
#include "actor_helpers.h"
#include "full_result_writer.h"
#include "events.h"
-
+
#include <library/cpp/actors/core/event_pb.h>
-
+
#include <util/generic/yexception.h>
namespace NYql::NDqs {
-
-class TGraphExecutionEventsActor : public TRichActor<TGraphExecutionEventsActor> {
-public:
+
+class TGraphExecutionEventsActor : public TRichActor<TGraphExecutionEventsActor> {
+public:
static constexpr const char ActorName[] = "YQL_DQ_GRAPH_EXECUTION_EVENTS_ACTOR";
-
+
TGraphExecutionEventsActor(const TString& traceID, std::vector<IDqTaskPreprocessor::TPtr>&& taskPreprocessors)
- : TRichActor<TGraphExecutionEventsActor>(&TGraphExecutionEventsActor::Handler)
+ : TRichActor<TGraphExecutionEventsActor>(&TGraphExecutionEventsActor::Handler)
, TraceID(traceID)
, TaskPreprocessors(std::move(taskPreprocessors))
{
}
-
-private:
+
+private:
const TString TraceID;
std::vector<IDqTaskPreprocessor::TPtr> TaskPreprocessors;
-
- STRICT_STFUNC(Handler, {
- HFunc(NDqs::TEvGraphExecutionEvent, OnEvent);
- cFunc(NActors::TEvents::TEvPoison::EventType, PassAway);
- })
-
+
+ STRICT_STFUNC(Handler, {
+ HFunc(NDqs::TEvGraphExecutionEvent, OnEvent);
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway);
+ })
+
void DoPassAway() override {
YQL_LOG_CTX_SCOPE(TraceID);
}
- void OnEvent(NDqs::TEvGraphExecutionEvent::TPtr& ev, const NActors::TActorContext&) {
+ void OnEvent(NDqs::TEvGraphExecutionEvent::TPtr& ev, const NActors::TActorContext&) {
YQL_LOG_CTX_SCOPE(TraceID);
YQL_LOG(DEBUG) << __FUNCTION__ << ' ' << NYql::NDqProto::EGraphExecutionEventType_Name(ev->Get()->Record.GetEventType());
-
+
try {
switch (ev->Get()->Record.GetEventType()) {
- case NYql::NDqProto::EGraphExecutionEventType::START: {
+ case NYql::NDqProto::EGraphExecutionEventType::START: {
NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor payload;
ev->Get()->Record.GetMessage().UnpackTo(&payload);
OnStart(ev->Sender, payload);
break;
}
- case NYql::NDqProto::EGraphExecutionEventType::SUCCESS:
+ case NYql::NDqProto::EGraphExecutionEventType::SUCCESS:
OnSuccess(ev->Sender);
- break;
- case NYql::NDqProto::EGraphExecutionEventType::FAIL:
+ break;
+ case NYql::NDqProto::EGraphExecutionEventType::FAIL:
OnFail(ev->Sender);
- break;
+ break;
case NYql::NDqProto::EGraphExecutionEventType::FULL_RESULT: {
NDqProto::TGraphExecutionEvent::TFullResultDescriptor payload;
ev->Get()->Record.GetMessage().UnpackTo(&payload);
OnFullResult(ev->Sender, payload);
break;
}
- default:
+ default:
Reply(ev->Sender);
- break;
+ break;
}
} catch (...) {
TString message = TStringBuilder() << "Error on TEvGraphExecutionEvent: " << CurrentExceptionMessage();
YQL_LOG(ERROR) << message;
Reply(ev->Sender, message);
- }
- }
-
- template <class TKey, class TValue>
- static THashMap<TKey, TValue> AsHashMap(const google::protobuf::Map<TKey, TValue>& map) {
- return THashMap<TKey, TValue>(map.begin(), map.end());
- }
-
+ }
+ }
+
+ template <class TKey, class TValue>
+ static THashMap<TKey, TValue> AsHashMap(const google::protobuf::Map<TKey, TValue>& map) {
+ return THashMap<TKey, TValue>(map.begin(), map.end());
+ }
+
void OnStart(NActors::TActorId replyTo, const NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor& payload) {
YQL_LOG(DEBUG) << __FUNCTION__;
- const auto& secureParams = AsHashMap(payload.GetSecureParams().GetData());
+ const auto& secureParams = AsHashMap(payload.GetSecureParams().GetData());
const auto& graphParams = AsHashMap(payload.GetGraphParams().GetData());
-
+
NDqProto::TGraphExecutionEvent::TMap response;
-
+
auto* addedParams = response.MutableData();
for (const auto& preprocessor: TaskPreprocessors) {
auto newParams = preprocessor->GetTaskParams(graphParams, secureParams);
for (const auto& [k, v] : newParams) {
addedParams->insert({k, v});
- }
- }
-
+ }
+ }
+
Reply(replyTo, response);
- }
-
+ }
+
void Reply(NActors::TActorId replyTo, TString msg = "") const {
- NYql::NDqProto::TGraphExecutionEvent sync;
- sync.SetEventType(NDqProto::EGraphExecutionEventType::SYNC);
+ NYql::NDqProto::TGraphExecutionEvent sync;
+ sync.SetEventType(NDqProto::EGraphExecutionEventType::SYNC);
if (msg) {
sync.SetErrorMessage(msg);
}
Send(replyTo, MakeHolder<NDqs::TEvGraphExecutionEvent>(sync));
- }
-
- template <class TPayload>
+ }
+
+ template <class TPayload>
void Reply(NActors::TActorId replyTo, const TPayload& resp) const {
- NYql::NDqProto::TGraphExecutionEvent sync;
- sync.SetEventType(NDqProto::EGraphExecutionEventType::SYNC);
- sync.MutableMessage()->PackFrom(resp);
+ NYql::NDqProto::TGraphExecutionEvent sync;
+ sync.SetEventType(NDqProto::EGraphExecutionEventType::SYNC);
+ sync.MutableMessage()->PackFrom(resp);
Send(replyTo, MakeHolder<NDqs::TEvGraphExecutionEvent>(sync));
- }
-
+ }
+
void OnFail(NActors::TActorId replyTo) {
YQL_LOG(DEBUG) << __FUNCTION__;
for (const auto& preprocessor: TaskPreprocessors) {
preprocessor->Finish(false);
- }
+ }
Reply(replyTo);
- }
-
+ }
+
void OnSuccess(NActors::TActorId replyTo) {
YQL_LOG(DEBUG) << __FUNCTION__;
for (const auto& preprocessor: TaskPreprocessors) {
preprocessor->Finish(true);
- }
+ }
Reply(replyTo);
- }
+ }
void OnFullResult(NActors::TActorId replyTo, const NDqProto::TGraphExecutionEvent::TFullResultDescriptor& payload) {
YQL_LOG(DEBUG) << __FUNCTION__;
@@ -143,10 +143,10 @@ private:
Reply(replyTo, TString{"Failed to create full result writer"});
}
}
-};
-
+};
+
NActors::IActor* MakeGraphExecutionEventsActor(const TString& traceID, std::vector<IDqTaskPreprocessor::TPtr>&& taskPreprocessors) {
return new TGraphExecutionEventsActor(traceID, std::move(taskPreprocessors));
-}
-
+}
+
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h
index fb042e6ed3e..73e6eba0fb0 100644
--- a/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h
+++ b/ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h
@@ -1,10 +1,10 @@
-#pragma once
-
-#include <library/cpp/actors/core/actor.h>
+#pragma once
+
+#include <library/cpp/actors/core/actor.h>
#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
-
+
namespace NYql::NDqs {
-
+
NActors::IActor* MakeGraphExecutionEventsActor(const TString& traceID, std::vector<IDqTaskPreprocessor::TPtr>&& taskPreprocessors);
-
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/dq/actors/proto_builder.cpp b/ydb/library/yql/providers/dq/actors/proto_builder.cpp
index b49717ed918..e4b16403c3b 100644
--- a/ydb/library/yql/providers/dq/actors/proto_builder.cpp
+++ b/ydb/library/yql/providers/dq/actors/proto_builder.cpp
@@ -140,7 +140,7 @@ bool TProtoBuilder::WriteData(const TVector<NDqProto::TData>& rows, const std::f
}
return true;
}
-
+
Ydb::ResultSet TProtoBuilder::BuildResultSet(const TVector<NYql::NDqProto::TData>& data) {
Ydb::ResultSet resultSet;
auto structType = AS_TYPE(TStructType, ResultType);
@@ -150,8 +150,8 @@ Ydb::ResultSet TProtoBuilder::BuildResultSet(const TVector<NYql::NDqProto::TData
const ui32 memberIndex = ColumnOrder.empty() ? i : ColumnOrder[i];
column.set_name(TString(structType->GetMemberName(memberIndex)));
ExportTypeToProto(structType->GetMemberType(memberIndex), *column.mutable_type());
- }
-
+ }
+
WriteData(data, [&](const NYql::NUdf::TUnboxedValuePod& value) {
ExportValueToProto(ResultType, value, *resultSet.add_rows(), &ColumnOrder);
return true;
diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
index 40b9a389fee..59ae95c3af5 100644
--- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
+++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
@@ -94,7 +94,7 @@ private:
YQL_LOG_CTX_SCOPE(TraceId); \
YQL_LOG(DEBUG) << "Unexpected event " << ( #TEvType ); \
})
-
+
STRICT_STFUNC(Handler, {
HFunc(TEvPullResult, OnPullResult);
HFunc(TEvReadyState, OnReadyState);
@@ -114,7 +114,7 @@ private:
cFunc(TEvents::TEvGone::EventType, OnFullResultWriterShutdown)
})
-
+
STRICT_STFUNC(ShutdownHandler, {
HFunc(TEvents::TEvGone, OnShutdownQueryResult);
HANDLER_STUB(TEvPullResult)
@@ -127,7 +127,7 @@ private:
HANDLER_STUB(TEvents::TEvUndelivered)
HANDLER_STUB(TEvents::TEvWakeup)
})
-
+
void DoPassAway() override {
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(DEBUG) << __FUNCTION__;
@@ -147,7 +147,7 @@ private:
if (PullRequestTimeout && now - PullRequestStartTime > PullRequestTimeout) {
OnError("Timeout " + ToString(SourceID.NodeId()), true, true);
}
-
+
if (PingTimeout && now - PingStartTime > PingTimeout) {
OnError("PingTimeout " + ToString(SourceID.NodeId()), true, true);
}
@@ -176,7 +176,7 @@ private:
AddCriticalEventType(TEvents::TEvWakeup::EventType);
AddCriticalEventType(TEvPingResponse::EventType);
}
-
+
void OnPullResult(TEvPullResult::TPtr&, const TActorContext&) {
YQL_LOG_CTX_SCOPE(TraceId);
PullRequestStartTime = TInstant::Now();
@@ -274,17 +274,17 @@ private:
issue.Severity = TSeverityIds::S_WARNING;
Finish(/*truncated = */ true, {issue});
}
- }
- }
+ }
+ }
}
}
-
+
void FlushCurrent() {
YQL_LOG(DEBUG) << __FUNCTION__;
YQL_ENSURE(!Truncated);
YQL_ENSURE(!FullResultWriterID);
YQL_ENSURE(Settings->EnableFullResultWrite.Get().GetOrElse(false));
-
+
NDqProto::TGraphExecutionEvent record;
record.SetEventType(NDqProto::EGraphExecutionEventType::FULL_RESULT);
NDqProto::TGraphExecutionEvent::TFullResultDescriptor payload;
@@ -294,7 +294,7 @@ private:
Synchronize<TEvGraphExecutionEvent>([this](TEvGraphExecutionEvent::TPtr& ev) {
Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC);
YQL_LOG_CTX_SCOPE(TraceId);
-
+
if (auto msg = ev->Get()->Record.GetErrorMessage()) {
OnError(msg, false, true);
} else {
@@ -303,10 +303,10 @@ private:
FullResultWriterID = NActors::ActorIdFromProto(fullResultWriterProto);
Truncated = true;
WriteAllDataPartsToFullResultTable();
- }
+ }
});
}
-
+
bool CanSendToFullResultWriter() {
// TODO Customize
return FullResultSentBytes - FullResultReceivedBytes <= 32_MB;
@@ -328,11 +328,11 @@ private:
YQL_LOG(DEBUG) << "Received error message: " << ev->Get()->Record.GetErrorMessage();
OnError(ev->Get()->Record.GetErrorMessage(), false, false);
return;
- }
+ }
callback();
});
}
-
+
void WriteAllDataPartsToFullResultTable() {
while (FullResultSentDataParts < DataParts.size() && CanSendToFullResultWriter()) {
UnsafeWriteToFullResultTable(DataParts[FullResultSentDataParts]);
@@ -348,7 +348,7 @@ private:
WriteAllDataPartsToFullResultTable();
});
}
-
+
void WriteToFullResultTable(TAutoPtr<NDqProto::TData> data) {
if (CanSendToFullResultWriter()) {
UnsafeWriteToFullResultTable(*data);
@@ -358,7 +358,7 @@ private:
WriteToFullResultTable(data);
});
}
-
+
void UnsafeWriteToFullResultTable(const NDqProto::TData& data) {
NDqProto::TPullResponse response;
response.SetResponseType(EPullResponseType::CONTINUE);
@@ -401,47 +401,47 @@ private:
YQL_LOG_CTX_SCOPE(TraceId);
YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty());
YQL_LOG(DEBUG) << "Shutting down TResultAggregator";
-
+
BlockingActors.clear();
if (FullResultWriterID) {
BlockingActors.insert(FullResultWriterID);
Send(FullResultWriterID, MakeHolder<TEvents::TEvPoison>());
}
-
+
YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors";
QueryResponse.Reset(ev->Release().Release());
Become(&TResultAggregator::ShutdownHandler);
Send(SelfId(), MakeHolder<TEvents::TEvGone>());
}
-
+
void OnShutdownQueryResult(TEvents::TEvGone::TPtr& ev, const TActorContext&) {
YQL_LOG_CTX_SCOPE(TraceId);
auto iter = BlockingActors.find(ev->Sender);
if (iter != BlockingActors.end()) {
BlockingActors.erase(iter);
}
-
+
YQL_LOG(DEBUG) << "Shutting down TResultAggregator, " << BlockingActors.size() << " blocking actors left";
-
+
if (BlockingActors.empty()) {
EndOnQueryResult();
}
}
-
+
void EndOnQueryResult() {
YQL_LOG(DEBUG) << __FUNCTION__;
NDqProto::TQueryResponse result = QueryResponse->Record;
-
+
YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty());
FlushCounters(result);
-
+
if (ResultYsonWriter) {
ResultYsonWriter->OnEndList();
ResultYsonWriter.Destroy();
}
ResultYsonOut.Destroy();
-
+
*result.MutableYson() = ResultYson;
if (!FinishIssues.Empty()) {
@@ -486,11 +486,11 @@ private:
THashSet<TActorId> BlockingActors;
THolder<TEvQueryResponse> QueryResponse;
};
-
+
class TResultPrinter: public TActor<TResultPrinter> {
public:
static constexpr char ActorName[] = "YQL_DQ_RESULT_PRINTER";
-
+
TResultPrinter(IOutputStream& output, NThreading::TPromise<void>& promise)
: TActor<TResultPrinter>(&TResultPrinter::Handler)
, Output(output)
diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.h b/ydb/library/yql/providers/dq/actors/result_aggregator.h
index c69ab078296..450bc2de88b 100644
--- a/ydb/library/yql/providers/dq/actors/result_aggregator.h
+++ b/ydb/library/yql/providers/dq/actors/result_aggregator.h
@@ -14,7 +14,7 @@ namespace NYql::NDqs::NExecutionHelpers {
const TString& traceId,
const THashMap<TString, TString>& secureParams,
const TDqConfiguration::TPtr& settings,
- const TString& resultType,
+ const TString& resultType,
bool discard,
const NActors::TActorId& graphExecutionEventsId);
} // namespace NYql::NDqs::NExecutionHelpers
diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.cpp b/ydb/library/yql/providers/dq/actors/result_receiver.cpp
index 19d98f84029..7fdb73f257b 100644
--- a/ydb/library/yql/providers/dq/actors/result_receiver.cpp
+++ b/ydb/library/yql/providers/dq/actors/result_receiver.cpp
@@ -74,7 +74,7 @@ private:
void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev, const TActorContext&) {
YQL_LOG_CTX_SCOPE(TraceId);
-
+
auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>();
if (!Discard) {
@@ -166,9 +166,9 @@ private:
TIssues Issues;
ui64 Size = 0;
ui64 Rows = 0;
- // const Yql::DqsProto::TFullResultTable FullResultTable;
+ // const Yql::DqsProto::TFullResultTable FullResultTable;
const THashMap<TString, TString> SecureParams;
- // THolder<IFullResultWriter> FullResultWriter;
+ // THolder<IFullResultWriter> FullResultWriter;
THolder<TProtoBuilder> ResultBuilder;
bool Discard = false;
};
diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.h b/ydb/library/yql/providers/dq/actors/result_receiver.h
index 1a8c49b7b01..968bcede101 100644
--- a/ydb/library/yql/providers/dq/actors/result_receiver.h
+++ b/ydb/library/yql/providers/dq/actors/result_receiver.h
@@ -12,7 +12,7 @@ THolder<NActors::IActor> MakeResultReceiver(
const NActors::TActorId& executerId,
const TString& traceId,
const TDqConfiguration::TPtr& settings,
-// const Yql::DqsProto::TFullResultTable& resultTable,
+// const Yql::DqsProto::TFullResultTable& resultTable,
const THashMap<TString, TString>& secureParams,
const TString& resultBuilder,
bool discard
diff --git a/ydb/library/yql/providers/dq/actors/ya.make b/ydb/library/yql/providers/dq/actors/ya.make
index dab7b1395df..14e93b587da 100644
--- a/ydb/library/yql/providers/dq/actors/ya.make
+++ b/ydb/library/yql/providers/dq/actors/ya.make
@@ -3,14 +3,14 @@ LIBRARY()
OWNER(g:yql)
SRCS(
- compute_actor.cpp
+ compute_actor.cpp
events.cpp
executer_actor.cpp
execution_helpers.cpp
- graph_execution_events_actor.cpp
+ graph_execution_events_actor.cpp
resource_allocator.cpp
- task_controller.cpp
- worker_actor.cpp
+ task_controller.cpp
+ worker_actor.cpp
result_aggregator.cpp
result_receiver.cpp
full_result_writer.cpp
diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto
index 3db88fca9d8..09aecfcf923 100644
--- a/ydb/library/yql/providers/dq/api/protos/dqs.proto
+++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -3,7 +3,7 @@ option cc_enable_arenas = true;
package NYql.NDqProto;
-import "google/protobuf/any.proto";
+import "google/protobuf/any.proto";
import "ydb/library/yql/dq/proto/dq_transport.proto";
import "ydb/library/yql/dq/proto/dq_tasks.proto";
import "ydb/library/yql/providers/dq/api/protos/service.proto";
@@ -205,41 +205,41 @@ message TReadyState {
repeated NActorsProto.TActorId ActorId = 5; // used for compute actor
}
-enum EGraphExecutionEventType {
- SYNC = 0;
- START = 1;
- FAIL = 2;
- SUCCESS = 3;
+enum EGraphExecutionEventType {
+ SYNC = 0;
+ START = 1;
+ FAIL = 2;
+ SUCCESS = 3;
FULL_RESULT = 4;
-}
-
-message TGraphExecutionEvent {
- EGraphExecutionEventType EventType = 1;
- google.protobuf.Any Message = 2;
+}
+
+message TGraphExecutionEvent {
+ EGraphExecutionEventType EventType = 1;
+ google.protobuf.Any Message = 2;
optional string ErrorMessage = 3;
-
- message TMap {
- map<string, bytes> Data = 1;
- }
-
- message TExecuteGraphDescriptor {
+
+ message TMap {
+ map<string, bytes> Data = 1;
+ }
+
+ message TExecuteGraphDescriptor {
TMap SecureParams = 1;
TMap GraphParams = 2;
- }
+ }
message TFullResultDescriptor {
bytes ResultType = 1;
}
-}
-
-message TFullResultWriterStatusRequest {
-}
-
-message TFullResultWriterStatusResponse {
- uint64 BytesReceived = 1;
- optional string ErrorMessage = 2;
-}
-
+}
+
+message TFullResultWriterStatusRequest {
+}
+
+message TFullResultWriterStatusResponse {
+ uint64 BytesReceived = 1;
+ optional string ErrorMessage = 2;
+}
+
message TDqTaskPrepareResult {
bool Result = 1;
repeated TMetric Metric = 2;
diff --git a/ydb/library/yql/providers/dq/interface/ya.make b/ydb/library/yql/providers/dq/interface/ya.make
index 4fb104c900e..8d72c16c03a 100644
--- a/ydb/library/yql/providers/dq/interface/ya.make
+++ b/ydb/library/yql/providers/dq/interface/ya.make
@@ -4,7 +4,7 @@ OWNER(g:yql)
SRCS(
yql_dq_integration.cpp
- yql_dq_task_preprocessor.cpp
+ yql_dq_task_preprocessor.cpp
yql_dq_task_transform.cpp
)
diff --git a/ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h b/ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h
index f4f5aa03324..f70d4f6a917 100644
--- a/ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h
+++ b/ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h
@@ -1,25 +1,25 @@
-#pragma once
-
+#pragma once
+
#include "yql_dq_full_result_writer.h"
-#include <util/generic/ptr.h>
-
+#include <util/generic/ptr.h>
+
#include <vector>
-namespace NYql {
-
-class IDqTaskPreprocessor : public TThrRefBase {
-public:
- using TPtr = TIntrusivePtr<IDqTaskPreprocessor>;
-
- virtual ~IDqTaskPreprocessor() = default;
-
+namespace NYql {
+
+class IDqTaskPreprocessor : public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<IDqTaskPreprocessor>;
+
+ virtual ~IDqTaskPreprocessor() = default;
+
virtual THashMap<TString, TString> GetTaskParams(const THashMap<TString, TString>& graphParams, const THashMap<TString, TString>& secureParams) = 0;
virtual void Finish(bool success) = 0;
virtual THolder<IDqFullResultWriter> CreateFullResultWriter() = 0;
-};
-
-using TDqTaskPreprocessorFactory = std::function<IDqTaskPreprocessor::TPtr()>;
+};
+
+using TDqTaskPreprocessorFactory = std::function<IDqTaskPreprocessor::TPtr()>;
using TDqTaskPreprocessorFactoryCollection = std::vector<TDqTaskPreprocessorFactory>;
-
+
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index ba0d6a8ac8b..1a8d3172e41 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -86,8 +86,8 @@ public:
TActorSetupCmd(resman, TMailboxType::Simple, 0));
ServiceNode->StartActorSystem();
-
- ServiceNode->StartService(dqTaskPreprocessorFactories);
+
+ ServiceNode->StartService(dqTaskPreprocessorFactories);
}
~TLocalServiceHolder()
@@ -115,7 +115,7 @@ public:
return Gateway->CloseSession(sessionId);
}
- NThreading::TFuture<TResult>
+ NThreading::TFuture<TResult>
ExecutePlan(const TString& sessionId, NDqs::IDqsExecutionPlanner& plan, const TVector<TString>& columns,
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index f796f341fe4..858e3da45a0 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -35,10 +35,10 @@ public:
AddHandler(0, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<false>));
AddHandler(0, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput));
AddHandler(0, &TCoExtendBase::Match, HNDL(BuildExtendStage));
- AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft));
+ AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft));
AddHandler(0, &TDqJoin::Match, HNDL(PushJoinToStage<false>));
AddHandler(0, &TDqJoin::Match, HNDL(BuildJoin<false>));
- AddHandler(0, &TDqJoin::Match, HNDL(BuildJoinDict<false>));
+ AddHandler(0, &TDqJoin::Match, HNDL(BuildJoinDict<false>));
AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>));
AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>));
AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>));
@@ -58,7 +58,7 @@ public:
AddHandler(1, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<true>));
AddHandler(1, &TDqJoin::Match, HNDL(PushJoinToStage<true>));
AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>));
- AddHandler(1, &TDqJoin::Match, HNDL(BuildJoinDict<true>));
+ AddHandler(1, &TDqJoin::Match, HNDL(BuildJoinDict<true>));
AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>));
AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>));
AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>));
@@ -261,10 +261,10 @@ protected:
return DqBuildExtendStage(node, ctx);
}
- TMaybeNode<TExprBase> RewriteRightJoinToLeft(TExprBase node, TExprContext& ctx) {
- return DqRewriteRightJoinToLeft(node, ctx);
- }
-
+ TMaybeNode<TExprBase> RewriteRightJoinToLeft(TExprBase node, TExprContext& ctx) {
+ return DqRewriteRightJoinToLeft(node, ctx);
+ }
+
template <bool IsGlobal>
TMaybeNode<TExprBase> PushJoinToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
return DqPushJoinToStage(node, ctx, optCtx, *getParents(), IsGlobal);
@@ -273,21 +273,21 @@ protected:
template <bool IsGlobal>
TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
auto join = node.Cast<TDqJoin>();
- const TParentsMap* parentsMap = getParents();
- if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) {
+ const TParentsMap* parentsMap = getParents();
+ if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) {
return node;
}
return DqBuildPhyJoin(join, false /* TODO */, ctx, optCtx);
- }
-
- template <bool IsGlobal>
- TMaybeNode<TExprBase> BuildJoinDict(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
- Y_UNUSED(optCtx);
-
- auto join = node.Cast<TDqJoin>();
+ }
+
+ template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildJoinDict(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
+ Y_UNUSED(optCtx);
+
+ auto join = node.Cast<TDqJoin>();
const TParentsMap* parentsMap = getParents();
- if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) {
+ if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) {
return node;
}
@@ -302,24 +302,24 @@ protected:
return DqBuildScalarPrecompute(node, ctx, optCtx);
}
-private:
- bool JoinPrerequisitesVerify(TDqJoin join, const TParentsMap* parentsMap, bool isGlobal) const {
- // KqpBuildJoin copy/paste
- if (!join.LeftInput().Maybe<TDqCnUnionAll>()) {
- return false;
- }
- if (!join.RightInput().Maybe<TDqCnUnionAll>()) {
- return false;
- }
-
- if (!IsSingleConsumerConnection(join.LeftInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) {
- return false;
- }
- if (!IsSingleConsumerConnection(join.RightInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) {
- return false;
- }
- return true;
- }
+private:
+ bool JoinPrerequisitesVerify(TDqJoin join, const TParentsMap* parentsMap, bool isGlobal) const {
+ // KqpBuildJoin copy/paste
+ if (!join.LeftInput().Maybe<TDqCnUnionAll>()) {
+ return false;
+ }
+ if (!join.RightInput().Maybe<TDqCnUnionAll>()) {
+ return false;
+ }
+
+ if (!IsSingleConsumerConnection(join.LeftInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) {
+ return false;
+ }
+ if (!IsSingleConsumerConnection(join.RightInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) {
+ return false;
+ }
+ return true;
+ }
};
THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx) {
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
index fcf950cb416..2341b28ad44 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
@@ -228,7 +228,7 @@ public:
});
}
- NThreading::TFuture<TResult>
+ NThreading::TFuture<TResult>
ExecutePlan(const TString& sessionId, NDqs::IDqsExecutionPlanner& plan, const TVector<TString>& columns,
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
@@ -257,12 +257,12 @@ public:
*queryPB.AddColumns() = column;
}
settings->Save(queryPB);
-
- {
+
+ {
auto& secParams = *queryPB.MutableSecureParams();
- for (const auto&[k, v] : secureParams) {
- secParams[k] = v;
- }
+ for (const auto&[k, v] : secureParams) {
+ secParams[k] = v;
+ }
}
{
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
index d1e313b9129..d562776033f 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
@@ -63,13 +63,13 @@ public:
virtual void CloseSession(const TString& sessionId) = 0;
- virtual NThreading::TFuture<TResult>
+ virtual NThreading::TFuture<TResult>
ExecutePlan(const TString& sessionId, NDqs::IDqsExecutionPlanner& plan, const TVector<TString>& columns,
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard) = 0;
-
+
virtual TString GetVanillaJobPath() {
return "";
}
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp
index b6de6cf2e2c..65a9e500d9d 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.cpp
+++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp
@@ -56,7 +56,7 @@ namespace NYql::NDqs {
}
template<typename RequestType, typename ResponseType>
- class TServiceProxyActor: public TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>> {
+ class TServiceProxyActor: public TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>> {
public:
static constexpr char ActorName[] = "SERVICE_PROXY";
@@ -64,7 +64,7 @@ namespace NYql::NDqs {
NGrpc::IRequestContextBase* ctx,
const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters,
const TString& traceId, const TString& username)
- : TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>>(&TServiceProxyActor::Handler)
+ : TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>>(&TServiceProxyActor::Handler)
, Ctx(ctx)
, Counters(counters)
, ServiceProxyActorCounters(counters->GetSubgroup("component", "ServiceProxyActor"))
@@ -157,18 +157,18 @@ namespace NYql::NDqs {
}
auto aggregatedQueryStat = AggregateQueryStatsByStage(QueryStat, Task2Stage);
- aggregatedQueryStat.FlushCounters(ResponseBuffer);
+ aggregatedQueryStat.FlushCounters(ResponseBuffer);
- auto& operation = *ResponseBuffer.mutable_operation();
+ auto& operation = *ResponseBuffer.mutable_operation();
operation.Setready(true);
operation.Mutableresult()->PackFrom(queryResult);
*operation.Mutableissues() = result.GetIssues();
- ResponseBuffer.SetTruncated(result.GetTruncated());
+ ResponseBuffer.SetTruncated(result.GetTruncated());
Reply(Ydb::StatusIds::SUCCESS, result.GetIssues().size() > 0);
}
- virtual void DoRetry()
+ virtual void DoRetry()
{
this->CleanupChildren();
auto selfId = this->SelfId();
@@ -178,7 +178,7 @@ namespace NYql::NDqs {
}
void OnReturnResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) {
- Y_UNUSED(ctx);
+ Y_UNUSED(ctx);
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(DEBUG) << "TServiceProxyActor::OnReturnResult " << ev->Get()->Record.GetMetric().size();
QueryStat.AddCounters(ev->Get()->Record);
@@ -187,7 +187,7 @@ namespace NYql::NDqs {
NYql::TIssues issues;
NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
YQL_LOG(WARN) << "Retry " << Retry << " Issues: " << issues.ToString();
- DoRetry();
+ DoRetry();
} else {
if (ev->Get()->Record.GetIssues().size() > 0) {
NYql::TIssues issues;
@@ -207,21 +207,21 @@ namespace NYql::NDqs {
return Promise.GetFuture();
}
- virtual void ReplyError(grpc::StatusCode code, const TString& msg) {
+ virtual void ReplyError(grpc::StatusCode code, const TString& msg) {
Ctx->ReplyError(code, msg);
this->PassAway();
}
virtual void Reply(ui32 status, bool hasIssues) {
Y_UNUSED(hasIssues);
- Ctx->Reply(&ResponseBuffer, status);
+ Ctx->Reply(&ResponseBuffer, status);
this->PassAway();
}
private:
NGrpc::IRequestContextBase* Ctx;
bool CtxSubscribed = false;
- ResponseType ResponseBuffer;
+ ResponseType ResponseBuffer;
protected:
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
@@ -245,27 +245,27 @@ namespace NYql::NDqs {
NYql::TCounters QueryStat;
THashMap<ui64, ui64> Task2Stage;
-
- void RestoreRequest() {
- Request = dynamic_cast<const RequestType*>(Ctx->GetRequest());
- }
+
+ void RestoreRequest() {
+ Request = dynamic_cast<const RequestType*>(Ctx->GetRequest());
+ }
};
class TExecuteGraphProxyActor: public TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse> {
public:
- using TBase = TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse>;
+ using TBase = TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse>;
TExecuteGraphProxyActor(NGrpc::IRequestContextBase* ctx,
const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters,
- const TString& traceId, const TString& username,
- const NActors::TActorId& graphExecutionEventsActorId)
+ const TString& traceId, const TString& username,
+ const NActors::TActorId& graphExecutionEventsActorId)
: TServiceProxyActor(ctx, counters, traceId, username)
- , GraphExecutionEventsActorId(graphExecutionEventsActorId)
- {
- }
+ , GraphExecutionEventsActorId(graphExecutionEventsActorId)
+ {
+ }
- void DoRetry() override {
+ void DoRetry() override {
YQL_LOG(DEBUG) << __FUNCTION__;
- SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this](const auto& ev) {
+ SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this](const auto& ev) {
if (ev->Get()->Record.GetErrorMessage()) {
TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage());
} else {
@@ -273,9 +273,9 @@ namespace NYql::NDqs {
ModifiedRequest.Reset();
TBase::DoRetry();
}
- });
- }
-
+ });
+ }
+
void Reply(ui32 status, bool hasIssues) override {
auto eventType = hasIssues
? NYql::NDqProto::EGraphExecutionEventType::FAIL
@@ -286,39 +286,39 @@ namespace NYql::NDqs {
} else {
TBase::Reply(status, hasIssues);
}
- });
- }
-
- void ReplyError(grpc::StatusCode code, const TString& msg) override {
- SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this, code, msg](const auto& ev) {
- Y_UNUSED(ev);
- TBase::ReplyError(code, msg);
- });
- }
-
+ });
+ }
+
+ void ReplyError(grpc::StatusCode code, const TString& msg) override {
+ SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this, code, msg](const auto& ev) {
+ Y_UNUSED(ev);
+ TBase::ReplyError(code, msg);
+ });
+ }
+
private:
THolder<Yql::DqsProto::ExecuteGraphRequest> ModifiedRequest;
-
- void DoPassAway() override {
+
+ void DoPassAway() override {
YQL_LOG(DEBUG) << __FUNCTION__;
- Send(GraphExecutionEventsActorId, new TEvents::TEvPoison());
- TServiceProxyActor::DoPassAway();
- }
-
+ Send(GraphExecutionEventsActorId, new TEvents::TEvPoison());
+ TServiceProxyActor::DoPassAway();
+ }
+
NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor SerializeGraphDescriptor() const {
- NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor result;
-
- for (const auto& kv : Request->GetSecureParams()) {
- result.MutableSecureParams()->MutableData()->insert(kv);
- }
-
+ NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor result;
+
+ for (const auto& kv : Request->GetSecureParams()) {
+ result.MutableSecureParams()->MutableData()->insert(kv);
+ }
+
for (const auto& kv : Request->GetGraphParams()) {
result.MutableGraphParams()->MutableData()->insert(kv);
}
- return result;
- }
-
+ return result;
+ }
+
void Bootstrap() override {
YQL_LOG(DEBUG) << "TServiceProxyActor::OnExecuteGraph";
@@ -330,9 +330,9 @@ namespace NYql::NDqs {
ev->Get()->Record.GetMessage().UnpackTo(&params);
FinishBootstrap(params);
}
- });
- }
-
+ });
+ }
+
void MergeTaskMetas(const NDqProto::TGraphExecutionEvent::TMap& params) {
if (!params.data().empty()) {
for (size_t i = 0; i < Request->TaskSize(); ++i) {
@@ -340,29 +340,29 @@ namespace NYql::NDqs {
ModifiedRequest.Reset(new Yql::DqsProto::ExecuteGraphRequest());
ModifiedRequest->CopyFrom(*Request);
}
-
+
auto* task = ModifiedRequest->MutableTask(i);
-
+
Yql::DqsProto::TTaskMeta taskMeta;
task->GetMeta().UnpackTo(&taskMeta);
-
+
for (const auto&[key, value] : params.data()) {
(*taskMeta.MutableTaskParams())[key] = value;
}
-
+
task->MutableMeta()->PackFrom(taskMeta);
- }
- }
-
- if (ModifiedRequest) {
- Request = ModifiedRequest.Get();
- }
- }
-
+ }
+ }
+
+ if (ModifiedRequest) {
+ Request = ModifiedRequest.Get();
+ }
+ }
+
void FinishBootstrap(const NDqProto::TGraphExecutionEvent::TMap& params) {
YQL_LOG(DEBUG) << __FUNCTION__;
MergeTaskMetas(params);
-
+
auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime));
TVector<TString> columns;
@@ -379,7 +379,7 @@ namespace NYql::NDqs {
for (const auto& x : Request->GetSecureParams()) {
secureParams[x.first] = x.second;
}
- auto resultId = RegisterChild(NExecutionHelpers::MakeResultAggregator(
+ auto resultId = RegisterChild(NExecutionHelpers::MakeResultAggregator(
columns,
executerId,
TraceId,
@@ -395,23 +395,23 @@ namespace NYql::NDqs {
controlId,
resultId));
}
-
- template <class TPayload, class TCallback>
- void SendEvent(NYql::NDqProto::EGraphExecutionEventType eventType, const TPayload& payload, TCallback callback) {
- NDqProto::TGraphExecutionEvent record;
- record.SetEventType(eventType);
- if constexpr (!std::is_same_v<TPayload, std::nullptr_t>) {
- record.MutableMessage()->PackFrom(payload);
- }
- Send(GraphExecutionEventsActorId, new TEvGraphExecutionEvent(record));
+
+ template <class TPayload, class TCallback>
+ void SendEvent(NYql::NDqProto::EGraphExecutionEventType eventType, const TPayload& payload, TCallback callback) {
+ NDqProto::TGraphExecutionEvent record;
+ record.SetEventType(eventType);
+ if constexpr (!std::is_same_v<TPayload, std::nullptr_t>) {
+ record.MutableMessage()->PackFrom(payload);
+ }
+ Send(GraphExecutionEventsActorId, new TEvGraphExecutionEvent(record));
Synchronize<TEvGraphExecutionEvent>([callback, traceId = TraceId](TEvGraphExecutionEvent::TPtr& ev) {
YQL_LOG_CTX_SCOPE(traceId);
- Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC);
- callback(ev);
- });
- }
-
- NActors::TActorId GraphExecutionEventsActorId;
+ Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC);
+ callback(ev);
+ });
+ }
+
+ NActors::TActorId GraphExecutionEventsActorId;
};
TString GetVersionString() {
@@ -427,13 +427,13 @@ namespace NYql::NDqs {
}
}
- TDqsGrpcService::TDqsGrpcService(
- NActors::TActorSystem& system,
- TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
- const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories)
+ TDqsGrpcService::TDqsGrpcService(
+ NActors::TActorSystem& system,
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
+ const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories)
: ActorSystem(system)
, Counters(std::move(counters))
- , DqTaskPreprocessorFactories(dqTaskPreprocessorFactories)
+ , DqTaskPreprocessorFactories(dqTaskPreprocessorFactories)
, Promise(NewPromise<void>())
, RunningRequests(0)
, Stopping(false)
@@ -460,7 +460,7 @@ namespace NYql::NDqs {
CQ = cq;
using TDqTaskPreprocessorCollection = std::vector<NYql::IDqTaskPreprocessor::TPtr>;
-
+
ADD_REQUEST(ExecuteGraph, ExecuteGraphRequest, ExecuteGraphResponse, {
TGuard<TMutex> lock(Mutex);
if (Stopping) {
@@ -478,13 +478,13 @@ namespace NYql::NDqs {
return;
}
- TDqTaskPreprocessorCollection taskPreprocessors;
+ TDqTaskPreprocessorCollection taskPreprocessors;
for (const auto& factory: DqTaskPreprocessorFactories) {
taskPreprocessors.push_back(factory());
- }
-
+ }
+
auto graphExecutionEventsActorId = ActorSystem.Register(NDqs::MakeGraphExecutionEventsActor(request->GetSession(), std::move(taskPreprocessors)));
-
+
RunningRequests++;
auto actor = MakeHolder<TExecuteGraphProxyActor>(ctx, Counters, request->GetSession(), session->GetUsername(), graphExecutionEventsActorId);
auto future = actor->GetFuture();
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.h b/ydb/library/yql/providers/dq/service/grpc_service.h
index 8997b723b49..fa2a835c8c6 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.h
+++ b/ydb/library/yql/providers/dq/service/grpc_service.h
@@ -1,7 +1,7 @@
#pragma once
#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
-
+
#include <ydb/library/yql/providers/dq/api/grpc/api.grpc.pb.h>
#include <ydb/library/yql/providers/dq/api/protos/service.pb.h>
@@ -24,8 +24,8 @@ namespace NYql::NDqs {
class TDqsGrpcService: public NGrpc::TGrpcServiceBase<Yql::DqsProto::DqService> {
public:
TDqsGrpcService(NActors::TActorSystem& system,
- TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
- const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories);
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
+ const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories);
void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
@@ -41,7 +41,7 @@ namespace NYql::NDqs {
NGrpc::TGlobalLimiter* Limiter = nullptr;
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
- TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories;
+ TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories;
TMutex Mutex;
NThreading::TPromise<void> Promise;
std::atomic<ui64> RunningRequests;
diff --git a/ydb/library/yql/providers/dq/service/service_node.cpp b/ydb/library/yql/providers/dq/service/service_node.cpp
index 1f68a427589..cda24ec6cf7 100644
--- a/ydb/library/yql/providers/dq/service/service_node.cpp
+++ b/ydb/library/yql/providers/dq/service/service_node.cpp
@@ -109,7 +109,7 @@ namespace NYql {
return ActorSystem.Get();
}
- void TServiceNode::StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) {
+ void TServiceNode::StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) {
class TCustomOption : public grpc::ServerBuilderOption {
public:
TCustomOption() { }
@@ -149,7 +149,7 @@ namespace NYql {
.SetLogger(CreateActorSystemLogger(*ActorSystem, 413)); // 413 - NKikimrServices::GRPC_SERVER
Server = MakeHolder<TGRpcServer>(options);
- Service = TIntrusivePtr<IGRpcService>(new TDqsGrpcService(*ActorSystem, MetricsRegistry->GetSensors(), dqTaskPreprocessorFactories));
+ Service = TIntrusivePtr<IGRpcService>(new TDqsGrpcService(*ActorSystem, MetricsRegistry->GetSensors(), dqTaskPreprocessorFactories));
Server->AddService(Service);
Server->Start();
}
diff --git a/ydb/library/yql/providers/dq/service/service_node.h b/ydb/library/yql/providers/dq/service/service_node.h
index 9c6ac5df5fb..32f904901dc 100644
--- a/ydb/library/yql/providers/dq/service/service_node.h
+++ b/ydb/library/yql/providers/dq/service/service_node.h
@@ -26,7 +26,7 @@ namespace NYql {
void AddLocalService(NActors::TActorId actorId, const NActors::TActorSetupCmd& service);
NActors::TActorSystem* StartActorSystem(void* appData = nullptr);
- void StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories);
+ void StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories);
void Stop(TDuration time = TDuration::Max());