aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Ozeritskiy <aozeritsky@gmail.com>2022-06-30 04:11:21 +0300
committerAlexey Ozeritskiy <aozeritsky@gmail.com>2022-06-30 04:11:21 +0300
commitc98d93e22358d17b43d0c558445af14895557c6e (patch)
tree87e3b8894f5de851bca47886b147e1929c71857a
parent735d9c3127b2fd3472830948c1f4a0d9be1fdab3 (diff)
downloadydb-c98d93e22358d17b43d0c558445af14895557c6e.tar.gz
KIKIMR-15111: Move out some dirs
ref:0e4ad63f175fa8e05da557c75aab9f9e9f9cf9b8
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp222
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h14
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.cpp850
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.h52
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_session.cpp106
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_session.h57
-rw-r--r--ydb/library/yql/providers/dq/service/interconnect_helpers.cpp303
-rw-r--r--ydb/library/yql/providers/dq/service/interconnect_helpers.h49
-rw-r--r--ydb/library/yql/providers/dq/service/service_node.cpp166
-rw-r--r--ydb/library/yql/providers/dq/service/service_node.h48
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp231
11 files changed, 0 insertions, 2098 deletions
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
deleted file mode 100644
index 997d1f7bdd..0000000000
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ /dev/null
@@ -1,222 +0,0 @@
-#include "yql_dq_gateway_local.h"
-
-#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
-#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h>
-
-#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h>
-#include <ydb/library/yql/providers/dq/service/service_node.h>
-
-#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h>
-
-#include <ydb/library/yql/utils/range_walker.h>
-#include <ydb/library/yql/utils/bind_in_range.h>
-
-#include <library/cpp/messagebus/network.h>
-
-#include <util/system/env.h>
-
-namespace NYql {
-
-using namespace NActors;
-using NDqs::MakeWorkerManagerActorID;
-
-class TLocalServiceHolder {
-public:
- TLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
- TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
- NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
- {
- ui32 nodeId = 1;
-
- TString hostName;
- TString localAddress;
- std::tie(hostName, localAddress) = NDqs::GetLocalAddress();
-
- NDqs::TServiceNodeConfig config = {
- nodeId,
- localAddress,
- hostName,
- static_cast<ui16>(interconnectPort.Addr.GetPort()),
- static_cast<ui16>(grpcPort.Addr.GetPort()),
- 0, // mbus
- interconnectPort.Socket.Get()->Release(),
- grpcPort.Socket.Get()->Release(),
- };
-
- ServiceNode = MakeHolder<TServiceNode>(
- config,
- 1,
- CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq)));
-
- NDqs::TLocalWorkerManagerOptions lwmOptions;
- lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, true);
- lwmOptions.AsyncIoFactory = std::move(asyncIoFactory);
- lwmOptions.FunctionRegistry = functionRegistry;
- lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
- lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
- [=](const NDqProto::TDqTask& task, const NDq::TLogFunc& )
- {
- return lwmOptions.Factory->Get(task);
- });
- auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);
-
- ServiceNode->AddLocalService(
- MakeWorkerManagerActorID(nodeId),
- TActorSetupCmd(resman, TMailboxType::Simple, 0));
-
- ServiceNode->StartActorSystem();
-
- ServiceNode->StartService(dqTaskPreprocessorFactories);
- }
-
- ~TLocalServiceHolder()
- {
- ServiceNode->Stop();
- }
-
-private:
- THolder<TServiceNode> ServiceNode;
-};
-
-class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalImpl>
-{
- struct TRequest {
- TString SessionId;
- NDqs::TPlan Plan;
- TVector<TString> Columns;
- THashMap<TString, TString> SecureParams;
- THashMap<TString, TString> GraphParams;
- TDqSettings::TPtr Settings;
- IDqGateway::TDqProgressWriter ProgressWriter;
- THashMap<TString, TString> ModulesMapping;
- bool Discard;
- NThreading::TPromise<IDqGateway::TResult> Result;
- };
-
-public:
- TDqGatewayLocalImpl(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway)
- : LocalService(std::move(localService))
- , Gateway(gateway)
- , DeterministicMode(!!GetEnv("YQL_DETERMINISTIC_MODE"))
- { }
-
- NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) {
- return Gateway->OpenSession(sessionId, username);
- }
-
- void CloseSession(const TString& sessionId) {
- return Gateway->CloseSession(sessionId);
- }
-
- NThreading::TFuture<IDqGateway::TResult>
- ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
- const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
- const TDqSettings::TPtr& settings,
- const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
- bool discard)
- {
-
- NThreading::TFuture<IDqGateway::TResult> result;
- {
- TGuard<TMutex> lock(Mutex);
- Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<IDqGateway::TResult>()});
- result = Queue.back().Result;
- }
-
- TryExecuteNext();
-
- return result;
- }
-
-private:
- void TryExecuteNext() {
- TGuard<TMutex> lock(Mutex);
- if (!Queue.empty() && (!DeterministicMode || Inflight == 0)) {
- auto request = std::move(Queue.front()); Queue.pop_front();
- Inflight++;
- lock.Release();
-
- auto weak = weak_from_this();
-
- Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard)
- .Apply([promise=request.Result, weak](const NThreading::TFuture<IDqGateway::TResult>& result) mutable {
- try {
- promise.SetValue(result.GetValue());
- } catch (...) {
- promise.SetException(std::current_exception());
- }
-
- if (auto ptr = weak.lock()) {
- {
- TGuard<TMutex> lock(ptr->Mutex);
- ptr->Inflight--;
- }
-
- ptr->TryExecuteNext();
- }
- });
- }
- }
-
- THolder<TLocalServiceHolder> LocalService;
- IDqGateway::TPtr Gateway;
- const bool DeterministicMode;
- TMutex Mutex;
- TList<TRequest> Queue;
- int Inflight = 0;
-};
-
-class TDqGatewayLocal : public IDqGateway {
-public:
- TDqGatewayLocal(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway)
- : Impl(std::make_shared<TDqGatewayLocalImpl>(std::move(localService), gateway))
- {}
-
- NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override {
- return Impl->OpenSession(sessionId, username);
- }
-
- void CloseSession(const TString& sessionId) override {
- return Impl->CloseSession(sessionId);
- }
-
- NThreading::TFuture<TResult>
- ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
- const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
- const TDqSettings::TPtr& settings,
- const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
- bool discard) override
- {
- return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams,
- settings, progressWriter, modulesMapping, discard);
- }
-
-private:
- std::shared_ptr<TDqGatewayLocalImpl> Impl;
-};
-
-THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
- TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
- NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
- NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
-{
- return MakeHolder<TLocalServiceHolder>(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(asyncIoFactory));
-}
-
-TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
- TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
- NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
-{
- int startPort = 31337;
- TRangeWalker<int> portWalker(startPort, startPort+100);
- auto interconnectPort = BindInRange(portWalker)[1];
- auto grpcPort = BindInRange(portWalker)[1];
-
- return new TDqGatewayLocal(
- CreateLocalServiceHolder(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(asyncIoFactory)),
- CreateDqGateway(std::get<0>(NDqs::GetLocalAddress()), grpcPort.Addr.GetPort()));
-}
-
-} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
deleted file mode 100644
index 01634057da..0000000000
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#pragma once
-
-#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
-#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
-
-namespace NYql {
-
-TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
- TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
- NDq::IDqAsyncIoFactory::TPtr = nullptr);
-
-} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp
deleted file mode 100644
index c6d535cfcc..0000000000
--- a/ydb/library/yql/providers/dq/service/grpc_service.cpp
+++ /dev/null
@@ -1,850 +0,0 @@
-#include "grpc_service.h"
-
-#include <ydb/library/yql/utils/log/log.h>
-
-#include <ydb/library/yql/providers/dq/actors/actor_helpers.h>
-#include <ydb/library/yql/providers/dq/actors/executer_actor.h>
-#include <ydb/library/yql/providers/dq/worker_manager/interface/events.h>
-#include <ydb/library/yql/providers/dq/actors/execution_helpers.h>
-#include <ydb/library/yql/providers/dq/actors/result_aggregator.h>
-#include <ydb/library/yql/providers/dq/actors/events.h>
-#include <ydb/library/yql/providers/dq/actors/task_controller.h>
-#include <ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h>
-
-#include <ydb/library/yql/providers/dq/counters/counters.h>
-#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
-#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
-
-//#include <yql/tools/yqlworker/dq/worker_manager/benchmark.h>
-
-#include <ydb/library/yql/public/issue/yql_issue_message.h>
-
-#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
-
-#include <library/cpp/grpc/server/grpc_counters.h>
-#include <ydb/public/api/protos/ydb_status_codes.pb.h>
-
-#include <library/cpp/actors/interconnect/interconnect.h>
-#include <library/cpp/actors/helpers/future_callback.h>
-#include <library/cpp/build_info/build_info.h>
-#include <library/cpp/svnversion/svnversion.h>
-
-#include <util/string/split.h>
-#include <util/system/env.h>
-
-namespace NYql::NDqs {
- using namespace NYql::NDqs;
- using namespace NKikimr;
- using namespace NThreading;
- using namespace NMonitoring;
- using namespace NActors;
-
- namespace {
- NGrpc::ICounterBlockPtr BuildCB(TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, const TString& name) {
- auto grpcCB = counters->GetSubgroup("rpc_name", name);
- return MakeIntrusive<NGrpc::TCounterBlock>(
- grpcCB->GetCounter("total", true),
- grpcCB->GetCounter("infly", true),
- grpcCB->GetCounter("notOkReq", true),
- grpcCB->GetCounter("notOkResp", true),
- grpcCB->GetCounter("reqBytes", true),
- grpcCB->GetCounter("inflyReqBytes", true),
- grpcCB->GetCounter("resBytes", true),
- grpcCB->GetCounter("notAuth", true),
- grpcCB->GetCounter("resExh", true),
- grpcCB);
- }
-
- template<typename RequestType, typename ResponseType>
- class TServiceProxyActor: public TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>> {
- public:
- static constexpr char ActorName[] = "SERVICE_PROXY";
- static constexpr char RetryName[] = "OperationRetry";
-
- explicit TServiceProxyActor(
- NGrpc::IRequestContextBase* ctx,
- const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters,
- const TString& traceId, const TString& username)
- : TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>>(&TServiceProxyActor::Handler)
- , Ctx(ctx)
- , Counters(counters)
- , ServiceProxyActorCounters(counters->GetSubgroup("component", "ServiceProxyActor"))
- , ClientDisconnectedCounter(ServiceProxyActorCounters->GetCounter("ClientDisconnected", /*derivative=*/ true))
- , RetryCounter(ServiceProxyActorCounters->GetCounter(RetryName, /*derivative=*/ true))
- , FallbackCounter(ServiceProxyActorCounters->GetCounter("Fallback", /*derivative=*/ true))
- , ErrorCounter(ServiceProxyActorCounters->GetCounter("UnrecoverableError", /*derivative=*/ true))
- , Request(dynamic_cast<const RequestType*>(ctx->GetRequest()))
- , TraceId(traceId)
- , Username(username)
- , Promise(NewPromise<void>())
- {
- Settings->Dispatch(Request->GetSettings());
- Settings->FreezeDefaults();
-
- MaxRetries = Settings->MaxRetries.Get().GetOrElse(MaxRetries);
- RetryBackoff = TDuration::MilliSeconds(Settings->RetryBackoffMs.Get().GetOrElse(RetryBackoff.MilliSeconds()));
- }
-
- STRICT_STFUNC(Handler, {
- HFunc(TEvQueryResponse, OnReturnResult);
- cFunc(TEvents::TEvPoison::EventType, OnPoison);
- SFunc(TEvents::TEvBootstrap, DoBootstrap)
- })
-
- TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override {
- return new IEventHandle(self, parentId, new TEvents::TEvBootstrap(), 0);
- }
-
- void OnPoison() {
- YQL_LOG_CTX_ROOT_SCOPE(TraceId);
- YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ ;
- ReplyError(grpc::UNAVAILABLE, "Unexpected error");
- *ClientDisconnectedCounter += 1;
- }
-
- void DoPassAway() override {
- Promise.SetValue();
- }
-
- void DoBootstrap(const NActors::TActorContext& ctx) {
- YQL_LOG_CTX_ROOT_SCOPE(TraceId);
- if (!CtxSubscribed) {
- auto selfId = ctx.SelfID;
- auto* actorSystem = ctx.ExecutorThread.ActorSystem;
- Ctx->GetFinishFuture().Subscribe([selfId, actorSystem](const NGrpc::IRequestContextBase::TAsyncFinishResult& future) {
- Y_VERIFY(future.HasValue());
- if (future.GetValue() == NGrpc::IRequestContextBase::EFinishStatus::CANCEL) {
- actorSystem->Send(selfId, new TEvents::TEvPoison());
- }
- });
- CtxSubscribed = true;
- }
- Bootstrap();
- }
-
- virtual void Bootstrap() = 0;
-
- void SendResponse(TEvQueryResponse::TPtr& ev)
- {
- auto& result = ev->Get()->Record;
- Yql::DqsProto::ExecuteQueryResult queryResult;
- queryResult.Mutableresult()->CopyFrom(result.resultset());
- queryResult.set_yson(result.yson());
-
- bool needFallback;
- auto statusCode = result.GetStatusCode();
- if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) {
- needFallback = NCommon::NeedFallback(statusCode);
- if (needFallback != result.GetDeprecatedNeedFallback()) {
- Counters->GetSubgroup("MistmatchedNeedFallback", needFallback ? "True" : "False")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(statusCode))->Inc();
- }
- } else {
- needFallback = result.GetDeprecatedNeedFallback();
- }
-
- if (needFallback) {
- NYql::TIssue rootIssue("Fatal Error");
- rootIssue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
- NYql::TIssues issues;
- NYql::IssuesFromMessage(result.GetIssues(), issues);
- for (const auto& issue: issues) {
- rootIssue.AddSubIssue(MakeIntrusive<TIssue>(issue));
- }
- result.MutableIssues()->Clear();
- NYql::IssuesToMessage({rootIssue}, result.MutableIssues());
- }
-
- for (const auto& [k, v] : QueryStat.Get()) {
- std::map<TString, TString> labels;
- TString prefix, name;
- if (NCommon::ParseCounterName(&prefix, &labels, &name, k)) {
- if (prefix == "Actor") {
- auto group = Counters->GetSubgroup("counters", "Actor");
- for (const auto& [k, v] : labels) {
- group = group->GetSubgroup(k, v);
- }
- group->GetHistogram(name, ExponentialHistogram(10, 2, 50))->Collect(v.Sum);
- }
- }
- }
- auto aggregatedQueryStat = AggregateQueryStatsByStage(QueryStat, Task2Stage);
-
- aggregatedQueryStat.FlushCounters(ResponseBuffer);
-
- auto& operation = *ResponseBuffer.mutable_operation();
- operation.Setready(true);
- operation.Mutableresult()->PackFrom(queryResult);
- *operation.Mutableissues() = result.GetIssues();
- ResponseBuffer.SetTruncated(result.GetTruncated());
-
- Reply(Ydb::StatusIds::SUCCESS, result.GetIssues().size() > 0);
- }
-
- virtual void DoRetry()
- {
- this->CleanupChildren();
- auto selfId = this->SelfId();
- TActivationContext::Schedule(RetryBackoff, new IEventHandle(selfId, selfId, new TEvents::TEvBootstrap(), 0));
- Retry += 1;
- *RetryCounter +=1 ;
- }
-
- void OnReturnResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) {
- auto& result = ev->Get()->Record;
- Y_UNUSED(ctx);
- YQL_LOG_CTX_ROOT_SCOPE(TraceId);
- YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnReturnResult " << result.GetMetric().size();
- QueryStat.AddCounters(result);
-
- bool retriable;
- auto statusCode = result.GetStatusCode();
- if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) {
- retriable = NCommon::IsRetriable(statusCode);
- if (retriable != result.GetDeprecatedRetriable()) {
- Counters->GetSubgroup("MistmatchedRetriable", retriable ? "True" : "False")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(statusCode))->Inc();
- }
- } else {
- retriable = result.GetDeprecatedRetriable();
- }
-
- if (result.GetIssues().size() > 0 && retriable && Retry < MaxRetries) {
- QueryStat.AddCounter(RetryName, TDuration::MilliSeconds(0));
- NYql::TIssues issues;
- NYql::IssuesFromMessage(result.GetIssues(), issues);
- YQL_CLOG(WARN, ProviderDq) << RetryName << " " << Retry << " Issues: " << issues.ToString();
- DoRetry();
- } else {
- auto needFallback = NCommon::NeedFallback(statusCode);
- if (result.GetIssues().size() > 0) {
- NYql::TIssues issues;
- NYql::IssuesFromMessage(result.GetIssues(), issues);
- YQL_CLOG(WARN, ProviderDq) << "Issues: " << issues.ToString();
- *ErrorCounter += 1;
- }
- if (needFallback) {
- // TODO: Remove GetNeedFallback, use only issue codes!
- *FallbackCounter += 1;
- }
- SendResponse(ev);
- }
- }
-
- TFuture<void> GetFuture() {
- return Promise.GetFuture();
- }
-
- virtual void ReplyError(grpc::StatusCode code, const TString& msg) {
- Ctx->ReplyError(code, msg);
- this->PassAway();
- }
-
- virtual void Reply(ui32 status, bool hasIssues) {
- Y_UNUSED(hasIssues);
- Ctx->Reply(&ResponseBuffer, status);
- this->PassAway();
- }
-
- private:
- NGrpc::IRequestContextBase* Ctx;
- bool CtxSubscribed = false;
- ResponseType ResponseBuffer;
-
- protected:
- TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
- TIntrusivePtr<NMonitoring::TDynamicCounters> ServiceProxyActorCounters;
- TDynamicCounters::TCounterPtr ClientDisconnectedCounter;
- TDynamicCounters::TCounterPtr RetryCounter;
- TDynamicCounters::TCounterPtr FallbackCounter;
- TDynamicCounters::TCounterPtr ErrorCounter;
-
- const RequestType* Request;
- const TString TraceId;
- const TString Username;
- TPromise<void> Promise;
- const TInstant RequestStartTime = TInstant::Now();
-
- TDqConfiguration::TPtr Settings = MakeIntrusive<TDqConfiguration>();
-
- int Retry = 0;
- int MaxRetries = 10;
- TDuration RetryBackoff = TDuration::MilliSeconds(1000);
-
- NYql::TCounters QueryStat;
- THashMap<ui64, ui64> Task2Stage;
-
- void RestoreRequest() {
- Request = dynamic_cast<const RequestType*>(Ctx->GetRequest());
- }
- };
-
- class TExecuteGraphProxyActor: public TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse> {
- public:
- using TBase = TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse>;
- TExecuteGraphProxyActor(NGrpc::IRequestContextBase* ctx,
- const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters,
- const TString& traceId, const TString& username,
- const NActors::TActorId& graphExecutionEventsActorId)
- : TServiceProxyActor(ctx, counters, traceId, username)
- , GraphExecutionEventsActorId(graphExecutionEventsActorId)
- {
- }
-
- void DoRetry() override {
- YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
- SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this](const auto& ev) {
- if (ev->Get()->Record.GetErrorMessage()) {
- TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage());
- } else {
- RestoreRequest();
- ModifiedRequest.Reset();
- TBase::DoRetry();
- }
- });
- }
-
- void Reply(ui32 status, bool hasIssues) override {
- auto eventType = hasIssues
- ? NYql::NDqProto::EGraphExecutionEventType::FAIL
- : NYql::NDqProto::EGraphExecutionEventType::SUCCESS;
- SendEvent(eventType, nullptr, [this, status, hasIssues](const auto& ev) {
- if (!hasIssues && ev->Get()->Record.GetErrorMessage()) {
- TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage());
- } else {
- TBase::Reply(status, hasIssues);
- }
- });
- }
-
- void ReplyError(grpc::StatusCode code, const TString& msg) override {
- SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this, code, msg](const auto& ev) {
- Y_UNUSED(ev);
- TBase::ReplyError(code, msg);
- });
- }
-
- private:
- THolder<Yql::DqsProto::ExecuteGraphRequest> ModifiedRequest;
-
- void DoPassAway() override {
- YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
- Send(GraphExecutionEventsActorId, new TEvents::TEvPoison());
- TServiceProxyActor::DoPassAway();
- }
-
- NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor SerializeGraphDescriptor() const {
- NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor result;
-
- for (const auto& kv : Request->GetSecureParams()) {
- result.MutableSecureParams()->MutableData()->insert(kv);
- }
-
- for (const auto& kv : Request->GetGraphParams()) {
- result.MutableGraphParams()->MutableData()->insert(kv);
- }
-
- return result;
- }
-
- void Bootstrap() override {
- YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnExecuteGraph";
-
- SendEvent(NYql::NDqProto::EGraphExecutionEventType::START, SerializeGraphDescriptor(), [this](const TEvGraphExecutionEvent::TPtr& ev) {
- if (ev->Get()->Record.GetErrorMessage()) {
- TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage());
- } else {
- NDqProto::TGraphExecutionEvent::TMap params;
- ev->Get()->Record.GetMessage().UnpackTo(&params);
- FinishBootstrap(params);
- }
- });
- }
-
- void MergeTaskMetas(const NDqProto::TGraphExecutionEvent::TMap& params) {
- if (!params.data().empty()) {
- for (size_t i = 0; i < Request->TaskSize(); ++i) {
- if (!ModifiedRequest) {
- ModifiedRequest.Reset(new Yql::DqsProto::ExecuteGraphRequest());
- ModifiedRequest->CopyFrom(*Request);
- }
-
- auto* task = ModifiedRequest->MutableTask(i);
-
- Yql::DqsProto::TTaskMeta taskMeta;
- task->GetMeta().UnpackTo(&taskMeta);
-
- for (const auto&[key, value] : params.data()) {
- (*taskMeta.MutableTaskParams())[key] = value;
- }
-
- task->MutableMeta()->PackFrom(taskMeta);
- }
- }
-
- if (ModifiedRequest) {
- Request = ModifiedRequest.Get();
- }
- }
-
- void FinishBootstrap(const NDqProto::TGraphExecutionEvent::TMap& params) {
- YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
- MergeTaskMetas(params);
-
- auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime));
-
- TVector<TString> columns;
- columns.reserve(Request->GetColumns().size());
- for (const auto& column : Request->GetColumns()) {
- columns.push_back(column);
- }
- for (const auto& task : Request->GetTask()) {
- Yql::DqsProto::TTaskMeta taskMeta;
- task.GetMeta().UnpackTo(&taskMeta);
- Task2Stage[task.GetId()] = taskMeta.GetStageId();
- }
- THashMap<TString, TString> secureParams;
- for (const auto& x : Request->GetSecureParams()) {
- secureParams[x.first] = x.second;
- }
- auto resultId = RegisterChild(NExecutionHelpers::MakeResultAggregator(
- columns,
- executerId,
- TraceId,
- secureParams,
- Settings,
- Request->GetResultType(),
- Request->GetDiscard(),
- GraphExecutionEventsActorId).Release());
- auto controlId = Settings->EnableComputeActor.Get().GetOrElse(false) == false ? resultId
- : RegisterChild(NYql::MakeTaskController(TraceId, executerId, resultId, Settings, NYql::NCommon::TServiceCounters(Counters, nullptr, "")).Release());
- Send(executerId, MakeHolder<TEvGraphRequest>(
- *Request,
- controlId,
- resultId));
- }
-
- template <class TPayload, class TCallback>
- void SendEvent(NYql::NDqProto::EGraphExecutionEventType eventType, const TPayload& payload, TCallback callback) {
- NDqProto::TGraphExecutionEvent record;
- record.SetEventType(eventType);
- if constexpr (!std::is_same_v<TPayload, std::nullptr_t>) {
- record.MutableMessage()->PackFrom(payload);
- }
- Send(GraphExecutionEventsActorId, new TEvGraphExecutionEvent(record));
- Synchronize<TEvGraphExecutionEvent>([callback, traceId = TraceId](TEvGraphExecutionEvent::TPtr& ev) {
- YQL_LOG_CTX_ROOT_SCOPE(traceId);
- Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC);
- callback(ev);
- });
- }
-
- NActors::TActorId GraphExecutionEventsActorId;
- };
-
- TString GetVersionString() {
- TStringBuilder sb;
- sb << GetProgramSvnVersion() << "\n";
- sb << GetBuildInfo();
- TString sandboxTaskId = GetSandboxTaskId();
- if (sandboxTaskId != TString("0")) {
- sb << "\nSandbox task id: " << sandboxTaskId;
- }
-
- return sb;
- }
- }
-
- TDqsGrpcService::TDqsGrpcService(
- NActors::TActorSystem& system,
- TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
- const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories)
- : ActorSystem(system)
- , Counters(std::move(counters))
- , DqTaskPreprocessorFactories(dqTaskPreprocessorFactories)
- , Promise(NewPromise<void>())
- , RunningRequests(0)
- , Stopping(false)
- , Sessions(&system, Counters->GetSubgroup("component", "grpc")->GetCounter("Sessions"))
- { }
-
-#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
- do { \
- MakeIntrusive<NGrpc::TGRpcRequest<Yql::DqsProto::IN, Yql::DqsProto::OUT, TDqsGrpcService>>( \
- this, \
- &Service_, \
- CQ, \
- [this](NGrpc::IRequestContextBase* ctx) { ACTION }, \
- &Yql::DqsProto::DqService::AsyncService::Request##NAME, \
- #NAME, \
- logger, \
- BuildCB(Counters, #NAME)) \
- ->Run(); \
- } while (0)
-
- void TDqsGrpcService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) {
- using namespace google::protobuf;
-
- CQ = cq;
-
- using TDqTaskPreprocessorCollection = std::vector<NYql::IDqTaskPreprocessor::TPtr>;
-
- ADD_REQUEST(ExecuteGraph, ExecuteGraphRequest, ExecuteGraphResponse, {
- TGuard<TMutex> lock(Mutex);
- if (Stopping) {
- ctx->ReplyError(grpc::UNAVAILABLE, "Terminating in progress");
- return;
- }
- auto* request = dynamic_cast<const Yql::DqsProto::ExecuteGraphRequest*>(ctx->GetRequest());
- auto session = Sessions.GetSession(request->GetSession());
- if (!session) {
- TString message = TStringBuilder()
- << "Bad session: "
- << request->GetSession();
- YQL_CLOG(DEBUG, ProviderDq) << message;
- ctx->ReplyError(grpc::INVALID_ARGUMENT, message);
- return;
- }
-
- TDqTaskPreprocessorCollection taskPreprocessors;
- for (const auto& factory: DqTaskPreprocessorFactories) {
- taskPreprocessors.push_back(factory());
- }
-
- auto graphExecutionEventsActorId = ActorSystem.Register(NDqs::MakeGraphExecutionEventsActor(request->GetSession(), std::move(taskPreprocessors)));
-
- RunningRequests++;
- auto actor = MakeHolder<TExecuteGraphProxyActor>(ctx, Counters, request->GetSession(), session->GetUsername(), graphExecutionEventsActorId);
- auto future = actor->GetFuture();
- auto actorId = ActorSystem.Register(actor.Release());
- future.Apply([session, actorId, this] (const TFuture<void>&) mutable {
- RunningRequests--;
- if (Stopping && !RunningRequests) {
- Promise.SetValue();
- }
-
- session->DeleteRequest(actorId);
- });
- session->AddRequest(actorId);
- });
-
- ADD_REQUEST(SvnRevision, SvnRevisionRequest, SvnRevisionResponse, {
- Y_UNUSED(this);
- Yql::DqsProto::SvnRevisionResponse result;
- result.SetRevision(GetVersionString());
- ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
- });
-
- ADD_REQUEST(CloseSession, CloseSessionRequest, CloseSessionResponse, {
- Y_UNUSED(this);
- auto* request = dynamic_cast<const Yql::DqsProto::CloseSessionRequest*>(ctx->GetRequest());
- Y_VERIFY(!!request);
-
- Yql::DqsProto::CloseSessionResponse result;
- Sessions.CloseSession(request->GetSession());
- ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
- });
-
- ADD_REQUEST(PingSession, PingSessionRequest, PingSessionResponse, {
- Y_UNUSED(this);
- auto* request = dynamic_cast<const Yql::DqsProto::PingSessionRequest*>(ctx->GetRequest());
- Y_VERIFY(!!request);
-
- YQL_CLOG(TRACE, ProviderDq) << "PingSession " << request->GetSession();
-
- Yql::DqsProto::PingSessionResponse result;
- auto session = Sessions.GetSession(request->GetSession());
- if (!session) {
- TString message = TStringBuilder()
- << "Bad session: "
- << request->GetSession();
- YQL_CLOG(DEBUG, ProviderDq) << message;
- ctx->ReplyError(grpc::INVALID_ARGUMENT, message);
- } else {
- ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
- }
- });
-
- ADD_REQUEST(OpenSession, OpenSessionRequest, OpenSessionResponse, {
- Y_UNUSED(this);
- auto* request = dynamic_cast<const Yql::DqsProto::OpenSessionRequest*>(ctx->GetRequest());
- Y_VERIFY(!!request);
-
- YQL_CLOG(DEBUG, ProviderDq) << "OpenSession for " << request->GetSession() << " " << request->GetUsername();
-
- Yql::DqsProto::OpenSessionResponse result;
- if (Sessions.OpenSession(request->GetSession(), request->GetUsername())) {
- ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
- } else {
- ctx->ReplyError(grpc::INVALID_ARGUMENT, "Session `" + request->GetSession() + "' exists");
- }
- });
-
- ADD_REQUEST(JobStop, JobStopRequest, JobStopResponse, {
- auto* request = dynamic_cast<const Yql::DqsProto::JobStopRequest*>(ctx->GetRequest());
- Y_VERIFY(!!request);
-
- auto ev = MakeHolder<TEvJobStop>(*request);
-
- auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::JobStopResponse>(ctx->GetArena());
- ctx->Reply(result, Ydb::StatusIds::SUCCESS);
-
- ActorSystem.Send(MakeWorkerManagerActorID(ActorSystem.NodeId), ev.Release());
- });
-
- ADD_REQUEST(ClusterStatus, ClusterStatusRequest, ClusterStatusResponse, {
- auto ev = MakeHolder<TEvClusterStatus>();
-
- using ResultEv = TEvClusterStatusResponse;
-
- TExecutorPoolStats poolStats;
-
- TExecutorThreadStats stat;
- TVector<TExecutorThreadStats> stats;
- ActorSystem.GetPoolStats(0, poolStats, stats);
-
- for (const auto& s : stats) {
- stat.Aggregate(s);
- }
-
- YQL_CLOG(DEBUG, ProviderDq) << "SentEvents: " << stat.SentEvents;
- YQL_CLOG(DEBUG, ProviderDq) << "ReceivedEvents: " << stat.ReceivedEvents;
- YQL_CLOG(DEBUG, ProviderDq) << "NonDeliveredEvents: " << stat.NonDeliveredEvents;
- YQL_CLOG(DEBUG, ProviderDq) << "EmptyMailboxActivation: " << stat.EmptyMailboxActivation;
- Sessions.PrintInfo();
-
- for (ui32 i = 0; i < stat.ActorsAliveByActivity.size(); i=i+1) {
- if (stat.ActorsAliveByActivity[i]) {
- YQL_CLOG(DEBUG, ProviderDq) << "ActorsAliveByActivity[" << i << "]=" << stat.ActorsAliveByActivity[i];
- }
- }
-
- auto callback = MakeHolder<TRichActorFutureCallback<ResultEv>>(
- [ctx] (TAutoPtr<TEventHandle<ResultEv>>& event) mutable {
- auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::ClusterStatusResponse>(ctx->GetArena());
- result->MergeFrom(event->Get()->Record.GetResponse());
- ctx->Reply(result, Ydb::StatusIds::SUCCESS);
- },
- [ctx] () mutable {
- YQL_CLOG(INFO, ProviderDq) << "ClusterStatus failed";
- ctx->ReplyError(grpc::UNAVAILABLE, "Error");
- },
- TDuration::MilliSeconds(2000));
-
- TActorId callbackId = ActorSystem.Register(callback.Release());
-
- ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery));
- });
-
- ADD_REQUEST(OperationStop, OperationStopRequest, OperationStopResponse, {
- auto* request = dynamic_cast<const Yql::DqsProto::OperationStopRequest*>(ctx->GetRequest());
- auto ev = MakeHolder<TEvOperationStop>(*request);
-
- auto callback = MakeHolder<TRichActorFutureCallback<TEvOperationStopResponse>>(
- [ctx] (TAutoPtr<TEventHandle<TEvOperationStopResponse>>& event) mutable {
- Y_UNUSED(event);
- auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::OperationStopResponse>(ctx->GetArena());
- ctx->Reply(result, Ydb::StatusIds::SUCCESS);
- },
- [ctx] () mutable {
- YQL_CLOG(INFO, ProviderDq) << "OperationStopResponse failed";
- ctx->ReplyError(grpc::UNAVAILABLE, "Error");
- },
- TDuration::MilliSeconds(2000));
-
- TActorId callbackId = ActorSystem.Register(callback.Release());
-
- ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery));
- });
-
- ADD_REQUEST(QueryStatus, QueryStatusRequest, QueryStatusResponse, {
- auto* request = dynamic_cast<const Yql::DqsProto::QueryStatusRequest*>(ctx->GetRequest());
-
- auto ev = MakeHolder<TEvQueryStatus>(*request);
-
- auto callback = MakeHolder<TRichActorFutureCallback<TEvQueryStatusResponse>>(
- [ctx] (TAutoPtr<TEventHandle<TEvQueryStatusResponse>>& event) mutable {
- auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::QueryStatusResponse>(ctx->GetArena());
- result->MergeFrom(event->Get()->Record.GetResponse());
- ctx->Reply(result, Ydb::StatusIds::SUCCESS);
- },
- [ctx] () mutable {
- YQL_CLOG(INFO, ProviderDq) << "QueryStatus failed";
- ctx->ReplyError(grpc::UNAVAILABLE, "Error");
- },
- TDuration::MilliSeconds(2000));
-
- TActorId callbackId = ActorSystem.Register(callback.Release());
-
- ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery));
- });
-
- ADD_REQUEST(RegisterNode, RegisterNodeRequest, RegisterNodeResponse, {
- auto* request = dynamic_cast<const Yql::DqsProto::RegisterNodeRequest*>(ctx->GetRequest());
- Y_VERIFY(!!request);
-
- if (!request->GetPort()
- || request->GetRole().empty()
- || request->GetAddress().empty()
- || request->GetRevision().empty())
- {
- ctx->ReplyError(grpc::INVALID_ARGUMENT, "Invalid argument");
- return;
- }
-
- auto ev = MakeHolder<TEvRegisterNode>(*request);
-
- using ResultEv = TEvRegisterNodeResponse;
-
- auto callback = MakeHolder<TRichActorFutureCallback<ResultEv>>(
- [ctx] (TAutoPtr<TEventHandle<ResultEv>>& event) mutable {
- auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::RegisterNodeResponse>(ctx->GetArena());
- result->MergeFrom(event->Get()->Record.GetResponse());
- ctx->Reply(result, Ydb::StatusIds::SUCCESS);
- },
- [ctx] () mutable {
- YQL_CLOG(INFO, ProviderDq) << "RegisterNode failed";
- ctx->ReplyError(grpc::UNAVAILABLE, "Error");
- },
- TDuration::MilliSeconds(5000));
-
- TActorId callbackId = ActorSystem.Register(callback.Release());
-
- ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery));
- });
-
- ADD_REQUEST(GetMaster, GetMasterRequest, GetMasterResponse, {
- auto* request = dynamic_cast<const Yql::DqsProto::GetMasterRequest*>(ctx->GetRequest());
- Y_VERIFY(!!request);
-
- auto requestEvent = MakeHolder<TEvGetMasterRequest>();
-
- auto callback = MakeHolder<TActorFutureCallback<TEvGetMasterResponse>>(
- [ctx] (TAutoPtr<TEventHandle<TEvGetMasterResponse>>& event) mutable {
- auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::GetMasterResponse>(ctx->GetArena());
- result->MergeFrom(event->Get()->Record.GetResponse());
- ctx->Reply(result, Ydb::StatusIds::SUCCESS);
- });
-
- TActorId callbackId = ActorSystem.Register(callback.Release());
-
- ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, requestEvent.Release()));
- });
-
- ADD_REQUEST(ConfigureFailureInjector, ConfigureFailureInjectorRequest, ConfigureFailureInjectorResponse,{
- auto* request = dynamic_cast<const Yql::DqsProto::ConfigureFailureInjectorRequest*>(ctx->GetRequest());
- Y_VERIFY(!!request);
-
- auto requestEvent = MakeHolder<TEvConfigureFailureInjectorRequest>(*request);
-
- auto callback = MakeHolder<TActorFutureCallback<TEvConfigureFailureInjectorResponse>>(
- [ctx] (TAutoPtr<TEventHandle<TEvConfigureFailureInjectorResponse>>& event) mutable {
- auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::ConfigureFailureInjectorResponse>(ctx->GetArena());
- result->MergeFrom(event->Get()->Record.GetResponse());
- ctx->Reply(result, Ydb::StatusIds::SUCCESS);
- });
-
- TActorId callbackId = ActorSystem.Register(callback.Release());
-
- ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, requestEvent.Release()));
- });
-
- ADD_REQUEST(IsReady, IsReadyRequest, IsReadyResponse, {
- auto* request = dynamic_cast<const Yql::DqsProto::IsReadyRequest*>(ctx->GetRequest());
- Y_VERIFY(!!request);
-
- auto ev = MakeHolder<TEvIsReady>(*request);
-
- auto callback = MakeHolder<TRichActorFutureCallback<TEvIsReadyResponse>>(
- [ctx] (TAutoPtr<TEventHandle<TEvIsReadyResponse>>& event) mutable {
- Yql::DqsProto::IsReadyResponse result;
- result.SetIsReady(event->Get()->Record.GetIsReady());
- ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
- },
- [ctx] () mutable {
- YQL_CLOG(INFO, ProviderDq) << "IsReadyForRevision failed";
- ctx->ReplyError(grpc::UNAVAILABLE, "Error");
- },
- TDuration::MilliSeconds(2000));
-
- TActorId callbackId = ActorSystem.Register(callback.Release());
-
- ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release()));
- });
-
- ADD_REQUEST(Routes, RoutesRequest, RoutesResponse, {
- auto* request = dynamic_cast<const Yql::DqsProto::RoutesRequest*>(ctx->GetRequest());
- Y_VERIFY(!!request);
-
- auto ev = MakeHolder<TEvRoutesRequest>();
-
- auto callback = MakeHolder<TRichActorFutureCallback<TEvRoutesResponse>>(
- [ctx] (TAutoPtr<TEventHandle<TEvRoutesResponse>>& event) mutable {
- Yql::DqsProto::RoutesResponse result;
- result.MergeFrom(event->Get()->Record.GetResponse());
- ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
- },
- [ctx] () mutable {
- YQL_CLOG(INFO, ProviderDq) << "Routes failed";
- ctx->ReplyError(grpc::UNAVAILABLE, "Error");
- },
- TDuration::MilliSeconds(5000));
-
- TActorId callbackId = ActorSystem.Register(callback.Release());
-
- ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(request->GetNodeId()), callbackId, ev.Release()));
- });
-
-/* 1. move grpc to providers/dq, 2. move benchmark to providers/dq 3. uncomment
- ADD_REQUEST(Benchmark, BenchmarkRequest, BenchmarkResponse, {
- auto* req = dynamic_cast<const Yql::DqsProto::BenchmarkRequest*>(ctx->GetRequest());
- Y_VERIFY(!!req);
-
- TWorkerManagerBenchmarkOptions options;
- if (req->GetWorkerCount()) {
- options.WorkerCount = req->GetWorkerCount();
- }
- if (req->GetInflight()) {
- options.Inflight = req->GetInflight();
- }
- if (req->GetTotalRequests()) {
- options.TotalRequests = req->GetTotalRequests();
- }
- if (req->GetMaxRunTimeMs()) {
- options.MaxRunTimeMs = TDuration::MilliSeconds(req->GetMaxRunTimeMs());
- }
-
- auto benchmarkId = ActorSystem.Register(
- CreateWorkerManagerBenchmark(
- MakeWorkerManagerActorID(ActorSystem.NodeId), options
- ));
-
- ActorSystem.Send(benchmarkId, new TEvents::TEvBootstrap);
-
- auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::BenchmarkResponse>(ctx->GetArena());
- ctx->Reply(result, Ydb::StatusIds::SUCCESS);
- });
-*/
- }
-
- void TDqsGrpcService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter = limiter;
- }
-
- bool TDqsGrpcService::IncRequest() {
- return Limiter->Inc();
- }
-
- void TDqsGrpcService::DecRequest() {
- Limiter->Dec();
- Y_ASSERT(Limiter->GetCurrentInFlight() >= 0);
- }
-
- TFuture<void> TDqsGrpcService::Stop() {
- TGuard<TMutex> lock(Mutex);
- Stopping = true;
- auto future = Promise.GetFuture();
- if (RunningRequests == 0) {
- Promise.SetValue();
- }
- return future;
- }
-}
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.h b/ydb/library/yql/providers/dq/service/grpc_service.h
deleted file mode 100644
index fa2a835c8c..0000000000
--- a/ydb/library/yql/providers/dq/service/grpc_service.h
+++ /dev/null
@@ -1,52 +0,0 @@
-#pragma once
-
-#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
-
-#include <ydb/library/yql/providers/dq/api/grpc/api.grpc.pb.h>
-#include <ydb/library/yql/providers/dq/api/protos/service.pb.h>
-
-#include <ydb/library/yql/minikql/mkql_function_registry.h>
-
-#include <library/cpp/grpc/server/grpc_request.h>
-#include <library/cpp/grpc/server/grpc_server.h>
-
-#include <library/cpp/actors/core/actorsystem.h>
-#include <library/cpp/actors/core/event_local.h>
-#include <library/cpp/actors/core/events.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <library/cpp/threading/future/future.h>
-
-#include "grpc_session.h"
-
-namespace NYql::NDqs {
- class TDatabaseManager;
-
- class TDqsGrpcService: public NGrpc::TGrpcServiceBase<Yql::DqsProto::DqService> {
- public:
- TDqsGrpcService(NActors::TActorSystem& system,
- TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
- const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
-
- NThreading::TFuture<void> Stop();
-
- private:
- NActors::TActorSystem& ActorSystem;
- grpc::ServerCompletionQueue* CQ = nullptr;
- NGrpc::TGlobalLimiter* Limiter = nullptr;
-
- TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
- TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories;
- TMutex Mutex;
- NThreading::TPromise<void> Promise;
- std::atomic<ui64> RunningRequests;
- std::atomic<bool> Stopping;
-
- TSessionStorage Sessions;
- };
-}
diff --git a/ydb/library/yql/providers/dq/service/grpc_session.cpp b/ydb/library/yql/providers/dq/service/grpc_session.cpp
deleted file mode 100644
index 2de87f9d46..0000000000
--- a/ydb/library/yql/providers/dq/service/grpc_session.cpp
+++ /dev/null
@@ -1,106 +0,0 @@
-#include "grpc_session.h"
-
-#include <ydb/library/yql/utils/log/log.h>
-
-namespace NYql::NDqs {
-
-TSession::~TSession() {
- TGuard<TMutex> lock(Mutex);
- for (auto actorId : Requests) {
- ActorSystem->Send(actorId, new NActors::TEvents::TEvPoison());
- }
-}
-
-void TSession::DeleteRequest(const NActors::TActorId& actorId)
-{
- TGuard<TMutex> lock(Mutex);
- Requests.erase(actorId);
-}
-
-void TSession::AddRequest(const NActors::TActorId& actorId)
-{
- TGuard<TMutex> lock(Mutex);
- Requests.insert(actorId);
-}
-
-TSessionStorage::TSessionStorage(
- NActors::TActorSystem* actorSystem,
- const NMonitoring::TDynamicCounters::TCounterPtr& sessionsCounter)
- : ActorSystem(actorSystem)
- , SessionsCounter(sessionsCounter)
-{ }
-
-void TSessionStorage::CloseSession(const TString& sessionId)
-{
- TGuard<TMutex> lock(SessionMutex);
- auto it = Sessions.find(sessionId);
- if (it == Sessions.end()) {
- return;
- }
- SessionsByLastUpdate.erase(it->second.Iterator);
- Sessions.erase(it);
- *SessionsCounter = Sessions.size();
-}
-
-std::shared_ptr<TSession> TSessionStorage::GetSession(const TString& sessionId)
-{
- Clean(TInstant::Now() - TDuration::Minutes(10));
-
- TGuard<TMutex> lock(SessionMutex);
- auto it = Sessions.find(sessionId);
- if (it == Sessions.end()) {
- return std::shared_ptr<TSession>();
- } else {
- SessionsByLastUpdate.erase(it->second.Iterator);
- SessionsByLastUpdate.push_back({TInstant::Now(), sessionId});
- it->second.Iterator = SessionsByLastUpdate.end();
- it->second.Iterator--;
- return it->second.Session;
- }
-}
-
-bool TSessionStorage::OpenSession(const TString& sessionId, const TString& username)
-{
- TGuard<TMutex> lock(SessionMutex);
- if (Sessions.contains(sessionId)) {
- return false;
- }
-
- SessionsByLastUpdate.push_back({TInstant::Now(), sessionId});
- auto it = SessionsByLastUpdate.end(); --it;
-
- Sessions[sessionId] = TSessionAndIterator {
- std::make_shared<TSession>(username, ActorSystem),
- it
- };
-
- *SessionsCounter = Sessions.size();
-
- return true;
-}
-
-void TSessionStorage::Clean(TInstant before) {
- TGuard<TMutex> lock(SessionMutex);
- for (TSessionsByLastUpdate::iterator it = SessionsByLastUpdate.begin();
- it != SessionsByLastUpdate.end(); )
- {
- if (it->LastUpdate < before) {
- YQL_CLOG(INFO, ProviderDq) << "Drop session by timeout " << it->SessionId;
- Sessions.erase(it->SessionId);
- it = SessionsByLastUpdate.erase(it);
- } else {
- break;
- }
- }
-
- *SessionsCounter = Sessions.size();
-}
-
-void TSessionStorage::PrintInfo() const {
- YQL_CLOG(INFO, ProviderDq) << "SessionsByLastUpdate: " << SessionsByLastUpdate.size();
- YQL_CLOG(DEBUG, ProviderDq) << "Sessions: " << Sessions.size();
- ui64 currenSessionsCounter = *SessionsCounter;
- YQL_CLOG(DEBUG, ProviderDq) << "SessionsCounter: " << currenSessionsCounter;
-}
-
-} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/service/grpc_session.h b/ydb/library/yql/providers/dq/service/grpc_session.h
deleted file mode 100644
index 56e592a757..0000000000
--- a/ydb/library/yql/providers/dq/service/grpc_session.h
+++ /dev/null
@@ -1,57 +0,0 @@
-#pragma once
-#include <library/cpp/actors/core/actorsystem.h>
-
-namespace NYql::NDqs {
-
-class TSession {
-public:
- TSession(const TString& username, NActors::TActorSystem* actorSystem)
- : Username(username)
- , ActorSystem(actorSystem)
- { }
-
- ~TSession();
-
- const TString& GetUsername() const {
- return Username;
- }
-
- void DeleteRequest(const NActors::TActorId& id);
- void AddRequest(const NActors::TActorId& id);
-
-private:
- TMutex Mutex;
- const TString Username;
- NActors::TActorSystem* ActorSystem;
- THashSet<NActors::TActorId> Requests;
-};
-
-class TSessionStorage {
-public:
- TSessionStorage(
- NActors::TActorSystem* actorSystem,
- const NMonitoring::TDynamicCounters::TCounterPtr& sessionsCounter);
- void CloseSession(const TString& sessionId);
- std::shared_ptr<TSession> GetSession(const TString& sessionId);
- bool OpenSession(const TString& sessionId, const TString& username);
- void Clean(TInstant before);
- void PrintInfo() const;
-
-private:
- NActors::TActorSystem* ActorSystem;
- NMonitoring::TDynamicCounters::TCounterPtr SessionsCounter;
- TMutex SessionMutex;
- struct TTimeAndSessionId {
- TInstant LastUpdate;
- TString SessionId;
- };
- using TSessionsByLastUpdate = TList<TTimeAndSessionId>;
- TSessionsByLastUpdate SessionsByLastUpdate;
- struct TSessionAndIterator {
- std::shared_ptr<TSession> Session;
- TSessionsByLastUpdate::iterator Iterator;
- };
- THashMap<TString, TSessionAndIterator> Sessions;
-};
-
-} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
deleted file mode 100644
index 9a0baa3f8b..0000000000
--- a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
+++ /dev/null
@@ -1,303 +0,0 @@
-#include "interconnect_helpers.h"
-#include "service_node.h"
-
-#include "grpc_service.h"
-
-#include <library/cpp/actors/helpers/selfping_actor.h>
-
-#include <ydb/library/yql/utils/log/log.h>
-#include <ydb/library/yql/utils/backtrace/backtrace.h>
-#include <ydb/library/yql/utils/yql_panic.h>
-
-#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
-
-#include <library/cpp/actors/core/executor_pool_basic.h>
-#include <library/cpp/actors/core/scheduler_basic.h>
-#include <library/cpp/actors/core/scheduler_actor.h>
-#include <library/cpp/actors/dnsresolver/dnsresolver.h>
-#include <library/cpp/actors/interconnect/interconnect.h>
-#include <library/cpp/actors/interconnect/interconnect_common.h>
-#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
-#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
-#include <library/cpp/actors/interconnect/poller_actor.h>
-#include <library/cpp/yson/node/node_io.h>
-
-#include <util/stream/file.h>
-#include <util/system/env.h>
-
-namespace NYql::NDqs {
- using namespace NActors;
- using namespace NActors::NDnsResolver;
- using namespace NGrpc;
-
- class TYqlLogBackend: public TLogBackend {
- void WriteData(const TLogRecord& rec) override {
- TString message(rec.Data, rec.Len);
- if (message.find("ICP01 ready to work") != TString::npos) {
- return;
- }
- YQL_CLOG(DEBUG, ProviderDq) << message;
- }
-
- void ReopenLog() override { }
- };
-
- static void InitSelfPingActor(NActors::TActorSystemSetup* setup, NMonitoring::TDynamicCounterPtr rootCounters)
- {
- const TDuration selfPingInterval = TDuration::MilliSeconds(10);
-
- const auto counters = rootCounters->GetSubgroup("counters", "utils");
-
- for (size_t poolId = 0; poolId < setup->GetExecutorsCount(); ++poolId) {
- const auto& poolName = setup->GetPoolName(poolId);
- auto poolGroup = counters->GetSubgroup("execpool", poolName);
- auto counter = poolGroup->GetCounter("SelfPingMaxUs", false);
- auto cpuTimeCounter = poolGroup->GetCounter("CpuMatBenchNs", false);
- IActor* selfPingActor = CreateSelfPingActor(selfPingInterval, counter, cpuTimeCounter);
- setup->LocalServices.push_back(
- std::make_pair(TActorId(),
- TActorSetupCmd(selfPingActor,
- TMailboxType::HTSwap,
- poolId)));
- }
- }
-
- std::tuple<THolder<NActors::TActorSystemSetup>, TIntrusivePtr<NActors::NLog::TSettings>> BuildActorSetup(
- ui32 nodeId,
- TString interconnectAddress,
- ui16 port,
- SOCKET socket,
- TVector<ui32> threads,
- NMonitoring::TDynamicCounterPtr counters,
- const TNameserverFactory& nameserverFactory,
- const NYql::NProto::TDqConfig::TICSettings& icSettings)
- {
- auto setup = MakeHolder<TActorSystemSetup>();
-
- setup->NodeId = nodeId;
-
- const int maxActivityType = NActors::GetActivityTypeCount();
- if (threads.empty()) {
- threads = {icSettings.GetThreads()};
- }
-
- setup->ExecutorsCount = threads.size();
- setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
- for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
- setup->Executors[i] = new TBasicExecutorPool(
- i,
- threads[i],
- 50,
- "pool-"+ToString(i), // poolName
- nullptr, // affinity
- NActors::TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX, // timePerMailbox
- NActors::TBasicExecutorPool::DEFAULT_EVENTS_PER_MAILBOX, // eventsPermailbox
- 0, // realtimePriority
- maxActivityType // maxActivityType
- );
- }
- auto schedulerConfig = TSchedulerConfig();
- schedulerConfig.MonCounters = counters;
-
-#define SET_VALUE(name) \
- if (icSettings.Has ## name()) { \
- schedulerConfig.name = icSettings.Get ## name (); \
- YQL_CLOG(DEBUG, ProviderDq) << "Scheduler IC " << #name << " set to " << schedulerConfig.name; \
- }
-
- SET_VALUE(ResolutionMicroseconds);
- SET_VALUE(SpinThreshold);
- SET_VALUE(ProgressThreshold);
- SET_VALUE(UseSchedulerActor);
- SET_VALUE(RelaxedSendPaceEventsPerSecond);
- SET_VALUE(RelaxedSendPaceEventsPerCycle);
- SET_VALUE(RelaxedSendThresholdEventsPerSecond);
- SET_VALUE(RelaxedSendThresholdEventsPerCycle);
-
-#undef SET_VALUE
-
- setup->Scheduler = CreateSchedulerThread(schedulerConfig);
- setup->MaxActivityType = maxActivityType;
-
- YQL_CLOG(DEBUG, ProviderDq) << "Initializing local services";
- setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0));
- if (IActor* schedulerActor = CreateSchedulerActor(schedulerConfig)) {
- TActorId schedulerActorId = MakeSchedulerActorId();
- setup->LocalServices.emplace_back(schedulerActorId, TActorSetupCmd(schedulerActor, TMailboxType::ReadAsFilled, 0));
- }
-
- NActors::TActorId loggerActorId(nodeId, "logger");
- auto logSettings = MakeIntrusive<NActors::NLog::TSettings>(loggerActorId,
- 0, NActors::NLog::PRI_INFO);
- static TString defaultComponent = "ActorLib";
- logSettings->Append(0, 1024, [&](NActors::NLog::EComponent) -> const TString & { return defaultComponent; });
- TString wtf = "";
- logSettings->SetLevel(NActors::NLog::PRI_DEBUG, 535 /*NKikimrServices::KQP_COMPUTE*/, wtf);
- logSettings->SetLevel(NActors::NLog::PRI_DEBUG, 713 /*NKikimrServices::YQL_PROXY*/, wtf);
- NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(
- logSettings,
- new TYqlLogBackend,
- counters->GetSubgroup("logger", "counters"));
- setup->LocalServices.emplace_back(logSettings->LoggerActorId, TActorSetupCmd(loggerActor, TMailboxType::Simple, 0));
-
- TIntrusivePtr<TTableNameserverSetup> nameserverTable = new TTableNameserverSetup();
- THashSet<ui32> staticNodeId;
-
- YQL_CLOG(DEBUG, ProviderDq) << "Initializing node table";
- nameserverTable->StaticNodeTable[nodeId] = std::make_pair(interconnectAddress, port);
-
- setup->LocalServices.emplace_back(
- MakeDnsResolverActorId(), TActorSetupCmd(CreateOnDemandDnsResolver(), TMailboxType::ReadAsFilled, 0));
-
- setup->LocalServices.emplace_back(
- GetNameserviceActorId(), TActorSetupCmd(nameserverFactory(nameserverTable), TMailboxType::ReadAsFilled, 0));
-
-
- InitSelfPingActor(setup.Get(), counters);
-
- TIntrusivePtr<TInterconnectProxyCommon> icCommon = new TInterconnectProxyCommon();
- icCommon->NameserviceId = GetNameserviceActorId();
- Y_UNUSED(counters);
- //icCommon->MonCounters = counters->GetSubgroup("counters", "interconnect");
- icCommon->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>();
-
-#define SET_DURATION(name) \
- { \
- icCommon->Settings.name = TDuration::MilliSeconds(icSettings.Get ## name ## Ms()); \
- YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \
- }
-
-#define SET_VALUE(name) \
- { \
- icCommon->Settings.name = icSettings.Get ## name(); \
- YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \
- }
-
- SET_DURATION(Handshake);
- SET_DURATION(DeadPeer);
- SET_DURATION(CloseOnIdle);
-
- SET_VALUE(SendBufferDieLimitInMB);
- SET_VALUE(TotalInflightAmountOfData);
- SET_VALUE(MergePerPeerCounters);
- SET_VALUE(MergePerDataCenterCounters);
- SET_VALUE(TCPSocketBufferSize);
-
- SET_DURATION(PingPeriod);
- SET_DURATION(ForceConfirmPeriod);
- SET_DURATION(LostConnection);
- SET_DURATION(BatchPeriod);
-
- SET_DURATION(MessagePendingTimeout);
-
- SET_VALUE(MessagePendingSize);
- SET_VALUE(MaxSerializedEventSize);
-
-#undef SET_DURATION
-#undef SET_VALUE
-
- ui32 maxNodeId = static_cast<ui32>(ENodeIdLimits::MaxWorkerNodeId);
-
- YQL_CLOG(DEBUG, ProviderDq) << "Initializing proxy actors";
- setup->Interconnect.ProxyActors.resize(maxNodeId + 1);
- for (ui32 id = 1; id <= maxNodeId; ++id) {
- if (nodeId != id) {
- IActor* actor = new TInterconnectProxyTCP(id, icCommon);
- setup->Interconnect.ProxyActors[id] = TActorSetupCmd(actor, TMailboxType::ReadAsFilled, 0);
- }
- }
-
- // start listener
- YQL_CLOG(DEBUG, ProviderDq) << "Start listener";
- {
- icCommon->TechnicalSelfHostName = interconnectAddress;
- YQL_CLOG(INFO, ProviderDq) << "Start listener " << interconnectAddress << ":" << port << " socket: " << socket;
- IActor* listener;
- TMaybe<SOCKET> maybeSocket = socket < 0
- ? Nothing()
- : TMaybe<SOCKET>(socket);
-
- listener = new NActors::TInterconnectListenerTCP(interconnectAddress, port, icCommon, maybeSocket);
-
- setup->LocalServices.emplace_back(
- MakeInterconnectListenerActorId(false),
- TActorSetupCmd(listener, TMailboxType::ReadAsFilled, 0));
- }
-
- YQL_CLOG(DEBUG, ProviderDq) << "Actor initialization complete";
-
-#ifdef _unix_
- signal(SIGPIPE, SIG_IGN);
-#endif
-
- return std::make_tuple(std::move(setup), logSettings);
- }
-
- std::tuple<TString, TString> GetLocalAddress(const TString* overrideHostname) {
- constexpr auto MaxLocalHostNameLength = 4096;
- std::array<char, MaxLocalHostNameLength> buffer;
- buffer.fill(0);
- TString hostName;
- TString localAddress;
-
- int result = gethostname(buffer.data(), buffer.size() - 1);
- if (result != 0) {
- Cerr << "gethostname failed for " << std::string_view{buffer.data(), buffer.size()} << " error " << strerror(errno) << Endl;
- return std::make_tuple(hostName, localAddress);
- }
-
- if (overrideHostname) {
- memcpy(&buffer[0], overrideHostname->c_str(), Min<int>(
- overrideHostname->size()+1, buffer.size()-1
- ));
- }
-
- hostName = &buffer[0];
-
- addrinfo request;
- memset(&request, 0, sizeof(request));
- request.ai_family = AF_INET6;
- request.ai_socktype = SOCK_STREAM;
-
- addrinfo* response = nullptr;
- result = getaddrinfo(buffer.data(), nullptr, &request, &response);
- if (result != 0) {
- Cerr << "getaddrinfo failed for " << std::string_view{buffer.data(), buffer.size()} << " error " << gai_strerror(result) << Endl;
- return std::make_tuple(hostName, localAddress);
- }
-
- std::unique_ptr<addrinfo, void (*)(addrinfo*)> holder(response, &freeaddrinfo);
-
- if (!response->ai_addr) {
- Cerr << "getaddrinfo failed: no ai_addr" << Endl;
- return std::make_tuple(hostName, localAddress);
- }
-
- auto* sa = response->ai_addr;
- Y_VERIFY(sa->sa_family == AF_INET6);
- inet_ntop(AF_INET6, &(((struct sockaddr_in6*)sa)->sin6_addr),
- &buffer[0], buffer.size() - 1);
-
- localAddress = &buffer[0];
-
- return std::make_tuple(hostName, localAddress);
- }
-
- std::tuple<TString, TString> GetUserToken(const TMaybe<TString>& maybeUser, const TMaybe<TString>& maybeTokenFile)
- {
- auto home = GetEnv("HOME");
- auto systemUser = GetEnv("USER");
-
- TString userName = maybeUser
- ? *maybeUser
- : systemUser;
-
- TString tokenFile = maybeTokenFile
- ? *maybeTokenFile
- : home + "/.yt/token";
-
- TString token = TFileInput(tokenFile).ReadLine();
-
- return std::make_tuple(userName, token);
- }
-}
diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.h b/ydb/library/yql/providers/dq/service/interconnect_helpers.h
deleted file mode 100644
index 9009c8c179..0000000000
--- a/ydb/library/yql/providers/dq/service/interconnect_helpers.h
+++ /dev/null
@@ -1,49 +0,0 @@
-#pragma once
-
-#include <library/cpp/actors/core/actorsystem.h>
-#include <library/cpp/actors/interconnect/poller_tcp.h>
-#include <library/cpp/actors/interconnect/interconnect.h>
-#include <library/cpp/yson/node/node.h>
-
-#include <ydb/library/yql/providers/dq/config/config.pb.h>
-
-namespace NYql::NDqs {
-
-enum class ENodeIdLimits {
- MinServiceNodeId = 1,
- MaxServiceNodeId = 512, // excluding
- MinWorkerNodeId = 512,
- MaxWorkerNodeId = 8192, // excluding
-};
-
-using TNameserverFactory = std::function<NActors::IActor*(const TIntrusivePtr<NActors::TTableNameserverSetup>& setup)>;
-
-struct TServiceNodeConfig {
- ui32 NodeId;
- TString InterconnectAddress;
- TString GrpcHostname;
- ui16 Port;
- ui16 GrpcPort = 8080;
- ui16 MbusPort = 0;
- SOCKET Socket = -1;
- SOCKET GrpcSocket = -1;
- NYql::NProto::TDqConfig::TICSettings ICSettings = NYql::NProto::TDqConfig::TICSettings();
- TNameserverFactory NameserverFactory = [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) {
- return CreateNameserverTable(setup);
- };
-};
-
-std::tuple<TString, TString> GetLocalAddress(const TString* hostname = nullptr);
-std::tuple<TString, TString> GetUserToken(const TMaybe<TString>& user, const TMaybe<TString>& tokenFile);
-
-std::tuple<THolder<NActors::TActorSystemSetup>, TIntrusivePtr<NActors::NLog::TSettings>> BuildActorSetup(
- ui32 nodeId,
- TString interconnectAddress,
- ui16 port,
- SOCKET socket,
- TVector<ui32> threads,
- NMonitoring::TDynamicCounterPtr counters,
- const TNameserverFactory& nameserverFactory,
- const NYql::NProto::TDqConfig::TICSettings& icSettings = NYql::NProto::TDqConfig::TICSettings());
-
-} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/service/service_node.cpp b/ydb/library/yql/providers/dq/service/service_node.cpp
deleted file mode 100644
index cdf05b6661..0000000000
--- a/ydb/library/yql/providers/dq/service/service_node.cpp
+++ /dev/null
@@ -1,166 +0,0 @@
-#include "service_node.h"
-
-#include <ydb/library/yql/utils/log/log.h>
-#include <ydb/library/yql/providers/common/metrics/metrics_registry.h>
-#include <ydb/library/yql/utils/yql_panic.h>
-
-#include <ydb/library/yql/providers/dq/actors/execution_helpers.h>
-
-#include <library/cpp/grpc/server/actors/logger.h>
-
-#include <utility>
-
-namespace NYql {
- using namespace NActors;
- using namespace NGrpc;
- using namespace NYql::NDqs;
-
- class TGrpcExternalListener: public IExternalListener {
- public:
- TGrpcExternalListener(SOCKET socket)
- : Socket(socket)
- , Listener(MakeIntrusive<NInterconnect::TStreamSocket>(Socket))
- {
- SetNonBlock(socket, true);
- }
-
- void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) override {
- Acceptor = std::move(acceptor);
- }
-
- private:
- void Start() override {
- YQL_CLOG(DEBUG, ProviderDq) << "Start GRPC Listener";
- Poller.Start();
- StartRead();
- }
-
- void Stop() override {
- Poller.Stop();
- }
-
- void StartRead() {
- YQL_CLOG(TRACE, ProviderDq) << "Read next GRPC event";
- Poller.StartRead(Listener, [&](const TIntrusivePtr<TSharedDescriptor>& ss) {
- return Accept(ss);
- });
- }
-
- TDelegate Accept(const TIntrusivePtr<TSharedDescriptor>& ss)
- {
- NInterconnect::TStreamSocket* socket = (NInterconnect::TStreamSocket*)ss.Get();
- int r = 0;
- while (r >= 0) {
- NInterconnect::TAddress address;
- r = socket->Accept(address);
- if (r >= 0) {
- YQL_CLOG(TRACE, ProviderDq) << "New GRPC connection";
- grpc::experimental::ExternalConnectionAcceptor::NewConnectionParameters params;
- SetNonBlock(r, true);
- params.listener_fd = -1; // static_cast<int>(Socket);
- params.fd = r;
- Acceptor->HandleNewConnection(&params);
- } else if (-r != EAGAIN && -r != EWOULDBLOCK) {
- YQL_CLOG(DEBUG, ProviderDq) << "Unknown error code " + ToString(r);
- }
- }
-
- return [this] {
- StartRead();
- };
- }
-
- SOCKET Socket;
- TIntrusivePtr<NInterconnect::TStreamSocket> Listener;
- NInterconnect::TPollerThreads Poller;
- std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> Acceptor;
- };
-
- TServiceNode::TServiceNode(
- const TServiceNodeConfig& config,
- ui32 threads,
- IMetricsRegistryPtr metricsRegistry)
- : Config(config)
- , Threads(threads)
- , MetricsRegistry(std::move(metricsRegistry))
- {
- std::tie(Setup, LogSettings) = BuildActorSetup(
- Config.NodeId,
- Config.InterconnectAddress,
- Config.Port,
- Config.Socket,
- {Threads, 8},
- MetricsRegistry->GetSensors(),
- Config.NameserverFactory,
- Config.ICSettings);
- }
-
- void TServiceNode::AddLocalService(TActorId actorId, const TActorSetupCmd& service) {
- YQL_ENSURE(!ActorSystem);
- Setup->LocalServices.emplace_back(actorId, service);
- }
-
- NActors::TActorSystem* TServiceNode::StartActorSystem(void* appData) {
- Y_VERIFY(!ActorSystem);
-
- ActorSystem = MakeHolder<NActors::TActorSystem>(Setup, appData, LogSettings);
- ActorSystem->Start();
-
- return ActorSystem.Get();
- }
-
- void TServiceNode::StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) {
- class TCustomOption : public grpc::ServerBuilderOption {
- public:
- TCustomOption() { }
-
- void UpdateArguments(grpc::ChannelArguments *args) override {
- args->SetInt(GRPC_ARG_ALLOW_REUSEPORT, 1);
- }
-
- void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
- { }
- };
-
- YQL_CLOG(INFO, ProviderDq) << "Starting GRPC on " << Config.GrpcPort;
-
- IExternalListener::TPtr listener = nullptr;
- if (Config.GrpcSocket >= 0) {
- listener = MakeIntrusive<TGrpcExternalListener>(Config.GrpcSocket);
- }
-
- auto options = TServerOptions()
- // .SetHost(CurrentNode->Address)
- .SetHost("[::]")
- .SetPort(Config.GrpcPort)
- .SetExternalListener(listener)
- .SetWorkerThreads(2)
- .SetGRpcMemoryQuotaBytes(1024 * 1024 * 1024)
- .SetMaxMessageSize(1024 * 1024 * 256)
- .SetMaxGlobalRequestInFlight(50000)
- .SetUseAuth(false)
- .SetKeepAliveEnable(true)
- .SetKeepAliveIdleTimeoutTriggerSec(360)
- .SetKeepAliveMaxProbeCount(3)
- .SetKeepAliveProbeIntervalSec(1)
- .SetServerBuilderMutator([](grpc::ServerBuilder& builder) {
- builder.SetOption(std::make_unique<TCustomOption>());
- })
- .SetLogger(CreateActorSystemLogger(*ActorSystem, 413)); // 413 - NKikimrServices::GRPC_SERVER
-
- Server = MakeHolder<TGRpcServer>(options);
- Service = TIntrusivePtr<IGRpcService>(new TDqsGrpcService(*ActorSystem, MetricsRegistry->GetSensors(), dqTaskPreprocessorFactories));
- Server->AddService(Service);
- Server->Start();
- }
-
- void TServiceNode::Stop(TDuration timeout) {
- (static_cast<TDqsGrpcService*>(Service.Get()))->Stop().Wait(timeout);
-
- Server->Stop();
- for (auto id : ActorIds) {
- ActorSystem->Send(id, new NActors::TEvents::TEvPoison);
- }
- ActorSystem->Stop();
- }
-} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/service/service_node.h b/ydb/library/yql/providers/dq/service/service_node.h
deleted file mode 100644
index 32f904901d..0000000000
--- a/ydb/library/yql/providers/dq/service/service_node.h
+++ /dev/null
@@ -1,48 +0,0 @@
-#pragma once
-
-#include "grpc_service.h"
-#include "interconnect_helpers.h"
-
-#include <ydb/library/yql/providers/common/metrics/metrics_registry.h>
-#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
-
-#include <ydb/library/yql/minikql/mkql_function_registry.h>
-
-#include <library/cpp/actors/core/executor_pool_basic.h>
-#include <library/cpp/actors/core/scheduler_basic.h>
-#include <library/cpp/actors/interconnect/interconnect.h>
-#include <library/cpp/actors/interconnect/interconnect_common.h>
-#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
-#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
-#include <library/cpp/actors/interconnect/poller_actor.h>
-
-namespace NYql {
- class TServiceNode {
- public:
- TServiceNode(
- const NDqs::TServiceNodeConfig& config,
- ui32 threads,
- IMetricsRegistryPtr metricsRegistry);
-
- void AddLocalService(NActors::TActorId actorId, const NActors::TActorSetupCmd& service);
- NActors::TActorSystem* StartActorSystem(void* appData = nullptr);
- void StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories);
-
- void Stop(TDuration time = TDuration::Max());
-
- NActors::TActorSystemSetup* GetSetup() const {
- return Setup.Get();
- }
-
- private:
- NDqs::TServiceNodeConfig Config;
- ui32 Threads;
- IMetricsRegistryPtr MetricsRegistry;
- THolder<NActors::TActorSystemSetup> Setup;
- TIntrusivePtr<NActors::NLog::TSettings> LogSettings;
- THolder<NActors::TActorSystem> ActorSystem;
- TVector<NActors::TActorId> ActorIds;
- THolder<NGrpc::TGRpcServer> Server;
- TIntrusivePtr<NGrpc::IGRpcService> Service;
- };
-}
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp
deleted file mode 100644
index 7635df0469..0000000000
--- a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp
+++ /dev/null
@@ -1,231 +0,0 @@
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h>
-#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h>
-
-#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h>
-
-#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
-#include <ydb/library/yql/providers/dq/provider/yql_dq_provider.h>
-
-#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
-#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
-#include <ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h>
-#include <ydb/library/yql/providers/pq/provider/yql_pq_provider.h>
-
-#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
-#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
-
-#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
-#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
-#include <ydb/library/yql/minikql/mkql_function_registry.h>
-
-#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
-#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
-
-#include <ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.h>
-
-#include <ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h>
-
-#include <ydb/library/yql/core/facade/yql_facade.h>
-#include <ydb/library/yql/utils/log/log.h>
-#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
-
-#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h>
-#include <ydb/library/yql/core/file_storage/file_storage.h>
-
-#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
-
-#include <util/stream/tee.h>
-#include <util/string/cast.h>
-
-namespace NYql {
-
-NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver) {
- auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
- RegisterDqPqReadActorFactory(*factory, driver, nullptr);
-
- RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
- return factory;
-}
-
-bool RunPqProgram(
- const TString& code,
- bool optimizeOnly,
- bool printExpr = false,
- bool printTrace = false,
- TString* errorsMessage = nullptr) {
- NLog::YqlLoggerScope logger("cerr", false);
- NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::DEBUG);
- NLog::YqlLogger().SetComponentLevel(NLog::EComponent::ProviderRtmr, NLog::ELevel::DEBUG);
-
- IOutputStream* errorsOutput = &Cerr;
- TMaybe<TStringOutput> errorsMessageOutput;
- TMaybe<TTeeOutput> tee;
- if (errorsMessage) {
- errorsMessageOutput.ConstructInPlace(*errorsMessage);
- tee.ConstructInPlace(&*errorsMessageOutput, &Cerr);
- errorsOutput = &*tee;
- }
-
- // Gateways config.
- TGatewaysConfig gatewaysConfig;
- // pq
- {
- auto& pqClusterConfig = *gatewaysConfig.MutablePq()->MutableClusterMapping()->Add();
- pqClusterConfig.SetName("lb");
- pqClusterConfig.SetClusterType(NYql::TPqClusterConfig::CT_PERS_QUEUE);
- pqClusterConfig.SetEndpoint("lb.ru");
- pqClusterConfig.SetConfigManagerEndpoint("cm.lb.ru");
- pqClusterConfig.SetTvmId(777);
- }
-
- // solomon
- {
- auto& solomonClusterConfig = *gatewaysConfig.MutableSolomon()->MutableClusterMapping()->Add();
- solomonClusterConfig.SetName("sol");
- solomonClusterConfig.SetCluster("sol.ru");
- }
-
- // dq
- {
- auto& dqCfg = *gatewaysConfig.MutableDq();
- auto* setting = dqCfg.AddDefaultSettings();
- setting->SetName("EnableComputeActor");
- setting->SetValue("1");
- }
-
- auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone();
- TVector<TDataProviderInitializer> dataProvidersInit;
-
- // pq
- auto pqGateway = MakeIntrusive<TDummyPqGateway>();
- pqGateway->AddDummyTopic(TDummyTopic("lb", "my_in_topic"));
- pqGateway->AddDummyTopic(TDummyTopic("lb", "my_out_topic"));
- dataProvidersInit.push_back(GetPqDataProviderInitializer(std::move(pqGateway)));
-
- // solomon
- auto solomonGateway = CreateSolomonGateway(gatewaysConfig.GetSolomon());
- dataProvidersInit.push_back(GetSolomonDataProviderInitializer(std::move(solomonGateway)));
-
- // dq
- auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
- NYql::GetCommonDqFactory(),
- NKikimr::NMiniKQL::GetYqlFactory()
- });
-
- auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({
- NYql::CreateCommonDqTaskTransformFactory()
- });
-
- const auto driverConfig = NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr"));
- NYdb::TDriver driver(driverConfig);
- auto dqGateway = CreateLocalDqGateway(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, {}, CreateAsyncIoFactory(driver));
-
- auto storage = NYql::CreateFileStorage({});
- dataProvidersInit.push_back(NYql::GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage));
-
- TExprContext moduleCtx;
- IModuleResolver::TPtr moduleResolver;
- YQL_ENSURE(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver));
-
- TProgramFactory factory(true, functionRegistry.Get(), 0ULL, dataProvidersInit, "ut");
-
- factory.SetGatewaysConfig(&gatewaysConfig);
- factory.SetModules(moduleResolver);
-
- TProgramPtr program = factory.Create("program", code);
- program->ConfigureYsonResultFormat(NYson::EYsonFormat::Text);
-
- Cerr << "Parse SQL..." << Endl;
- NSQLTranslation::TTranslationSettings sqlSettings;
- sqlSettings.SyntaxVersion = 1;
- sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
- sqlSettings.Flags.insert("DqEngineEnable");
- sqlSettings.Flags.insert("DqEngineForce");
-
- sqlSettings.ClusterMapping["lb"] = PqProviderName;
- sqlSettings.ClusterMapping["sol"] = SolomonProviderName;
- if (!program->ParseSql(sqlSettings)) {
- program->PrintErrorsTo(*errorsOutput);
- return false;
- }
- program->AstRoot()->PrettyPrintTo(Cerr, NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote);
-
-
- Cerr << "Compile..." << Endl;
- if (!program->Compile("user")) {
- program->PrintErrorsTo(*errorsOutput);
- return false;
- }
-
- auto exprOut = printExpr ? &Cout : nullptr;
- auto traceOpt = printTrace ? &Cerr : nullptr;
-
- TProgram::TStatus status = TProgram::TStatus::Error;
- if (optimizeOnly) {
- Cerr << "Optimize..." << Endl;
- status = program->Optimize("user", traceOpt, nullptr, exprOut);
- } else {
- Cerr << "Run..." << Endl;
- status = program->Run("user", traceOpt, nullptr, exprOut);
- }
-
- if (status == TProgram::TStatus::Error) {
- if (printTrace) {
- program->Print(traceOpt, nullptr);
- }
- program->PrintErrorsTo(*errorsOutput);
- return false;
- }
-
- driver.Stop(true);
-
- Cerr << "Done." << Endl;
- return true;
-}
-
-Y_UNIT_TEST_SUITE(YqlPqSimpleTests) {
-
- Y_UNIT_TEST(SelectWithNoSchema) {
- auto code = R"(
-USE lb;
-PRAGMA pq.Consumer="my_test_consumer";
-INSERT INTO my_out_topic
-SELECT Data FROM my_in_topic WHERE Data < "100";
- )";
- TString errorMessage;
- auto res = RunPqProgram(code, true, true, true, &errorMessage);
- UNIT_ASSERT_C(res, errorMessage);
- }
-
- Y_UNIT_TEST(SelectWithSchema) {
- auto code = R"(
-USE lb;
-PRAGMA pq.Consumer="my_test_consumer";
-
-INSERT INTO my_out_topic
-SELECT CAST(y as string) || x FROM lb.object(my_in_topic, "json") WITH SCHEMA (Int32 as y, String as x)
- )";
- TString errorMessage;
- auto res = RunPqProgram(code, true, true, true, &errorMessage);
- UNIT_ASSERT_C(res, errorMessage);
- }
-
- Y_UNIT_TEST(SelectStarWithSchema) {
- auto code = R"(
-USE lb;
-PRAGMA pq.Consumer="my_test_consumer";
-
-$q = SELECT * FROM lb.object(my_in_topic, "json") WITH SCHEMA (Int32 as y, String as x);
-INSERT INTO my_out_topic
-SELECT x FROM $q
- )";
- TString errorMessage;
- auto res = RunPqProgram(code, true, true, true, &errorMessage);
- UNIT_ASSERT_C(res, errorMessage);
- }
-
-}
-
-} // NYql