diff options
author | Alexey Ozeritskiy <aozeritsky@gmail.com> | 2022-06-30 04:11:21 +0300 |
---|---|---|
committer | Alexey Ozeritskiy <aozeritsky@gmail.com> | 2022-06-30 04:11:21 +0300 |
commit | c98d93e22358d17b43d0c558445af14895557c6e (patch) | |
tree | 87e3b8894f5de851bca47886b147e1929c71857a | |
parent | 735d9c3127b2fd3472830948c1f4a0d9be1fdab3 (diff) | |
download | ydb-c98d93e22358d17b43d0c558445af14895557c6e.tar.gz |
KIKIMR-15111: Move out some dirs
ref:0e4ad63f175fa8e05da557c75aab9f9e9f9cf9b8
11 files changed, 0 insertions, 2098 deletions
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp deleted file mode 100644 index 997d1f7bdd..0000000000 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ /dev/null @@ -1,222 +0,0 @@ -#include "yql_dq_gateway_local.h" - -#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> -#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h> - -#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h> -#include <ydb/library/yql/providers/dq/service/service_node.h> - -#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h> - -#include <ydb/library/yql/utils/range_walker.h> -#include <ydb/library/yql/utils/bind_in_range.h> - -#include <library/cpp/messagebus/network.h> - -#include <util/system/env.h> - -namespace NYql { - -using namespace NActors; -using NDqs::MakeWorkerManagerActorID; - -class TLocalServiceHolder { -public: - TLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, - TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort, - NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) - { - ui32 nodeId = 1; - - TString hostName; - TString localAddress; - std::tie(hostName, localAddress) = NDqs::GetLocalAddress(); - - NDqs::TServiceNodeConfig config = { - nodeId, - localAddress, - hostName, - static_cast<ui16>(interconnectPort.Addr.GetPort()), - static_cast<ui16>(grpcPort.Addr.GetPort()), - 0, // mbus - interconnectPort.Socket.Get()->Release(), - grpcPort.Socket.Get()->Release(), - }; - - ServiceNode = MakeHolder<TServiceNode>( - config, - 1, - CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq))); - - NDqs::TLocalWorkerManagerOptions lwmOptions; - lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, true); - lwmOptions.AsyncIoFactory = std::move(asyncIoFactory); - lwmOptions.FunctionRegistry = functionRegistry; - lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory(); - lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( - [=](const NDqProto::TDqTask& task, const NDq::TLogFunc& ) - { - return lwmOptions.Factory->Get(task); - }); - auto resman = NDqs::CreateLocalWorkerManager(lwmOptions); - - ServiceNode->AddLocalService( - MakeWorkerManagerActorID(nodeId), - TActorSetupCmd(resman, TMailboxType::Simple, 0)); - - ServiceNode->StartActorSystem(); - - ServiceNode->StartService(dqTaskPreprocessorFactories); - } - - ~TLocalServiceHolder() - { - ServiceNode->Stop(); - } - -private: - THolder<TServiceNode> ServiceNode; -}; - -class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalImpl> -{ - struct TRequest { - TString SessionId; - NDqs::TPlan Plan; - TVector<TString> Columns; - THashMap<TString, TString> SecureParams; - THashMap<TString, TString> GraphParams; - TDqSettings::TPtr Settings; - IDqGateway::TDqProgressWriter ProgressWriter; - THashMap<TString, TString> ModulesMapping; - bool Discard; - NThreading::TPromise<IDqGateway::TResult> Result; - }; - -public: - TDqGatewayLocalImpl(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway) - : LocalService(std::move(localService)) - , Gateway(gateway) - , DeterministicMode(!!GetEnv("YQL_DETERMINISTIC_MODE")) - { } - - NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) { - return Gateway->OpenSession(sessionId, username); - } - - void CloseSession(const TString& sessionId) { - return Gateway->CloseSession(sessionId); - } - - NThreading::TFuture<IDqGateway::TResult> - ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns, - const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams, - const TDqSettings::TPtr& settings, - const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, - bool discard) - { - - NThreading::TFuture<IDqGateway::TResult> result; - { - TGuard<TMutex> lock(Mutex); - Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<IDqGateway::TResult>()}); - result = Queue.back().Result; - } - - TryExecuteNext(); - - return result; - } - -private: - void TryExecuteNext() { - TGuard<TMutex> lock(Mutex); - if (!Queue.empty() && (!DeterministicMode || Inflight == 0)) { - auto request = std::move(Queue.front()); Queue.pop_front(); - Inflight++; - lock.Release(); - - auto weak = weak_from_this(); - - Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard) - .Apply([promise=request.Result, weak](const NThreading::TFuture<IDqGateway::TResult>& result) mutable { - try { - promise.SetValue(result.GetValue()); - } catch (...) { - promise.SetException(std::current_exception()); - } - - if (auto ptr = weak.lock()) { - { - TGuard<TMutex> lock(ptr->Mutex); - ptr->Inflight--; - } - - ptr->TryExecuteNext(); - } - }); - } - } - - THolder<TLocalServiceHolder> LocalService; - IDqGateway::TPtr Gateway; - const bool DeterministicMode; - TMutex Mutex; - TList<TRequest> Queue; - int Inflight = 0; -}; - -class TDqGatewayLocal : public IDqGateway { -public: - TDqGatewayLocal(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway) - : Impl(std::make_shared<TDqGatewayLocalImpl>(std::move(localService), gateway)) - {} - - NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override { - return Impl->OpenSession(sessionId, username); - } - - void CloseSession(const TString& sessionId) override { - return Impl->CloseSession(sessionId); - } - - NThreading::TFuture<TResult> - ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns, - const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams, - const TDqSettings::TPtr& settings, - const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, - bool discard) override - { - return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, - settings, progressWriter, modulesMapping, discard); - } - -private: - std::shared_ptr<TDqGatewayLocalImpl> Impl; -}; - -THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - NKikimr::NMiniKQL::TComputationNodeFactory compFactory, - TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, - NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort, - NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) -{ - return MakeHolder<TLocalServiceHolder>(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(asyncIoFactory)); -} - -TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - NKikimr::NMiniKQL::TComputationNodeFactory compFactory, - TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, - NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) -{ - int startPort = 31337; - TRangeWalker<int> portWalker(startPort, startPort+100); - auto interconnectPort = BindInRange(portWalker)[1]; - auto grpcPort = BindInRange(portWalker)[1]; - - return new TDqGatewayLocal( - CreateLocalServiceHolder(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(asyncIoFactory)), - CreateDqGateway(std::get<0>(NDqs::GetLocalAddress()), grpcPort.Addr.GetPort())); -} - -} // namespace NYql diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h deleted file mode 100644 index 01634057da..0000000000 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h +++ /dev/null @@ -1,14 +0,0 @@ -#pragma once - -#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> -#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> - -namespace NYql { - -TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - NKikimr::NMiniKQL::TComputationNodeFactory compFactory, - TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, - NDq::IDqAsyncIoFactory::TPtr = nullptr); - -} // namespace NYql diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp deleted file mode 100644 index c6d535cfcc..0000000000 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ /dev/null @@ -1,850 +0,0 @@ -#include "grpc_service.h" - -#include <ydb/library/yql/utils/log/log.h> - -#include <ydb/library/yql/providers/dq/actors/actor_helpers.h> -#include <ydb/library/yql/providers/dq/actors/executer_actor.h> -#include <ydb/library/yql/providers/dq/worker_manager/interface/events.h> -#include <ydb/library/yql/providers/dq/actors/execution_helpers.h> -#include <ydb/library/yql/providers/dq/actors/result_aggregator.h> -#include <ydb/library/yql/providers/dq/actors/events.h> -#include <ydb/library/yql/providers/dq/actors/task_controller.h> -#include <ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h> - -#include <ydb/library/yql/providers/dq/counters/counters.h> -#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> -#include <ydb/library/yql/providers/dq/common/yql_dq_common.h> - -//#include <yql/tools/yqlworker/dq/worker_manager/benchmark.h> - -#include <ydb/library/yql/public/issue/yql_issue_message.h> - -#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> - -#include <library/cpp/grpc/server/grpc_counters.h> -#include <ydb/public/api/protos/ydb_status_codes.pb.h> - -#include <library/cpp/actors/interconnect/interconnect.h> -#include <library/cpp/actors/helpers/future_callback.h> -#include <library/cpp/build_info/build_info.h> -#include <library/cpp/svnversion/svnversion.h> - -#include <util/string/split.h> -#include <util/system/env.h> - -namespace NYql::NDqs { - using namespace NYql::NDqs; - using namespace NKikimr; - using namespace NThreading; - using namespace NMonitoring; - using namespace NActors; - - namespace { - NGrpc::ICounterBlockPtr BuildCB(TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, const TString& name) { - auto grpcCB = counters->GetSubgroup("rpc_name", name); - return MakeIntrusive<NGrpc::TCounterBlock>( - grpcCB->GetCounter("total", true), - grpcCB->GetCounter("infly", true), - grpcCB->GetCounter("notOkReq", true), - grpcCB->GetCounter("notOkResp", true), - grpcCB->GetCounter("reqBytes", true), - grpcCB->GetCounter("inflyReqBytes", true), - grpcCB->GetCounter("resBytes", true), - grpcCB->GetCounter("notAuth", true), - grpcCB->GetCounter("resExh", true), - grpcCB); - } - - template<typename RequestType, typename ResponseType> - class TServiceProxyActor: public TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>> { - public: - static constexpr char ActorName[] = "SERVICE_PROXY"; - static constexpr char RetryName[] = "OperationRetry"; - - explicit TServiceProxyActor( - NGrpc::IRequestContextBase* ctx, - const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, - const TString& traceId, const TString& username) - : TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>>(&TServiceProxyActor::Handler) - , Ctx(ctx) - , Counters(counters) - , ServiceProxyActorCounters(counters->GetSubgroup("component", "ServiceProxyActor")) - , ClientDisconnectedCounter(ServiceProxyActorCounters->GetCounter("ClientDisconnected", /*derivative=*/ true)) - , RetryCounter(ServiceProxyActorCounters->GetCounter(RetryName, /*derivative=*/ true)) - , FallbackCounter(ServiceProxyActorCounters->GetCounter("Fallback", /*derivative=*/ true)) - , ErrorCounter(ServiceProxyActorCounters->GetCounter("UnrecoverableError", /*derivative=*/ true)) - , Request(dynamic_cast<const RequestType*>(ctx->GetRequest())) - , TraceId(traceId) - , Username(username) - , Promise(NewPromise<void>()) - { - Settings->Dispatch(Request->GetSettings()); - Settings->FreezeDefaults(); - - MaxRetries = Settings->MaxRetries.Get().GetOrElse(MaxRetries); - RetryBackoff = TDuration::MilliSeconds(Settings->RetryBackoffMs.Get().GetOrElse(RetryBackoff.MilliSeconds())); - } - - STRICT_STFUNC(Handler, { - HFunc(TEvQueryResponse, OnReturnResult); - cFunc(TEvents::TEvPoison::EventType, OnPoison); - SFunc(TEvents::TEvBootstrap, DoBootstrap) - }) - - TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override { - return new IEventHandle(self, parentId, new TEvents::TEvBootstrap(), 0); - } - - void OnPoison() { - YQL_LOG_CTX_ROOT_SCOPE(TraceId); - YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ ; - ReplyError(grpc::UNAVAILABLE, "Unexpected error"); - *ClientDisconnectedCounter += 1; - } - - void DoPassAway() override { - Promise.SetValue(); - } - - void DoBootstrap(const NActors::TActorContext& ctx) { - YQL_LOG_CTX_ROOT_SCOPE(TraceId); - if (!CtxSubscribed) { - auto selfId = ctx.SelfID; - auto* actorSystem = ctx.ExecutorThread.ActorSystem; - Ctx->GetFinishFuture().Subscribe([selfId, actorSystem](const NGrpc::IRequestContextBase::TAsyncFinishResult& future) { - Y_VERIFY(future.HasValue()); - if (future.GetValue() == NGrpc::IRequestContextBase::EFinishStatus::CANCEL) { - actorSystem->Send(selfId, new TEvents::TEvPoison()); - } - }); - CtxSubscribed = true; - } - Bootstrap(); - } - - virtual void Bootstrap() = 0; - - void SendResponse(TEvQueryResponse::TPtr& ev) - { - auto& result = ev->Get()->Record; - Yql::DqsProto::ExecuteQueryResult queryResult; - queryResult.Mutableresult()->CopyFrom(result.resultset()); - queryResult.set_yson(result.yson()); - - bool needFallback; - auto statusCode = result.GetStatusCode(); - if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { - needFallback = NCommon::NeedFallback(statusCode); - if (needFallback != result.GetDeprecatedNeedFallback()) { - Counters->GetSubgroup("MistmatchedNeedFallback", needFallback ? "True" : "False")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(statusCode))->Inc(); - } - } else { - needFallback = result.GetDeprecatedNeedFallback(); - } - - if (needFallback) { - NYql::TIssue rootIssue("Fatal Error"); - rootIssue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); - NYql::TIssues issues; - NYql::IssuesFromMessage(result.GetIssues(), issues); - for (const auto& issue: issues) { - rootIssue.AddSubIssue(MakeIntrusive<TIssue>(issue)); - } - result.MutableIssues()->Clear(); - NYql::IssuesToMessage({rootIssue}, result.MutableIssues()); - } - - for (const auto& [k, v] : QueryStat.Get()) { - std::map<TString, TString> labels; - TString prefix, name; - if (NCommon::ParseCounterName(&prefix, &labels, &name, k)) { - if (prefix == "Actor") { - auto group = Counters->GetSubgroup("counters", "Actor"); - for (const auto& [k, v] : labels) { - group = group->GetSubgroup(k, v); - } - group->GetHistogram(name, ExponentialHistogram(10, 2, 50))->Collect(v.Sum); - } - } - } - auto aggregatedQueryStat = AggregateQueryStatsByStage(QueryStat, Task2Stage); - - aggregatedQueryStat.FlushCounters(ResponseBuffer); - - auto& operation = *ResponseBuffer.mutable_operation(); - operation.Setready(true); - operation.Mutableresult()->PackFrom(queryResult); - *operation.Mutableissues() = result.GetIssues(); - ResponseBuffer.SetTruncated(result.GetTruncated()); - - Reply(Ydb::StatusIds::SUCCESS, result.GetIssues().size() > 0); - } - - virtual void DoRetry() - { - this->CleanupChildren(); - auto selfId = this->SelfId(); - TActivationContext::Schedule(RetryBackoff, new IEventHandle(selfId, selfId, new TEvents::TEvBootstrap(), 0)); - Retry += 1; - *RetryCounter +=1 ; - } - - void OnReturnResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) { - auto& result = ev->Get()->Record; - Y_UNUSED(ctx); - YQL_LOG_CTX_ROOT_SCOPE(TraceId); - YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnReturnResult " << result.GetMetric().size(); - QueryStat.AddCounters(result); - - bool retriable; - auto statusCode = result.GetStatusCode(); - if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { - retriable = NCommon::IsRetriable(statusCode); - if (retriable != result.GetDeprecatedRetriable()) { - Counters->GetSubgroup("MistmatchedRetriable", retriable ? "True" : "False")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(statusCode))->Inc(); - } - } else { - retriable = result.GetDeprecatedRetriable(); - } - - if (result.GetIssues().size() > 0 && retriable && Retry < MaxRetries) { - QueryStat.AddCounter(RetryName, TDuration::MilliSeconds(0)); - NYql::TIssues issues; - NYql::IssuesFromMessage(result.GetIssues(), issues); - YQL_CLOG(WARN, ProviderDq) << RetryName << " " << Retry << " Issues: " << issues.ToString(); - DoRetry(); - } else { - auto needFallback = NCommon::NeedFallback(statusCode); - if (result.GetIssues().size() > 0) { - NYql::TIssues issues; - NYql::IssuesFromMessage(result.GetIssues(), issues); - YQL_CLOG(WARN, ProviderDq) << "Issues: " << issues.ToString(); - *ErrorCounter += 1; - } - if (needFallback) { - // TODO: Remove GetNeedFallback, use only issue codes! - *FallbackCounter += 1; - } - SendResponse(ev); - } - } - - TFuture<void> GetFuture() { - return Promise.GetFuture(); - } - - virtual void ReplyError(grpc::StatusCode code, const TString& msg) { - Ctx->ReplyError(code, msg); - this->PassAway(); - } - - virtual void Reply(ui32 status, bool hasIssues) { - Y_UNUSED(hasIssues); - Ctx->Reply(&ResponseBuffer, status); - this->PassAway(); - } - - private: - NGrpc::IRequestContextBase* Ctx; - bool CtxSubscribed = false; - ResponseType ResponseBuffer; - - protected: - TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; - TIntrusivePtr<NMonitoring::TDynamicCounters> ServiceProxyActorCounters; - TDynamicCounters::TCounterPtr ClientDisconnectedCounter; - TDynamicCounters::TCounterPtr RetryCounter; - TDynamicCounters::TCounterPtr FallbackCounter; - TDynamicCounters::TCounterPtr ErrorCounter; - - const RequestType* Request; - const TString TraceId; - const TString Username; - TPromise<void> Promise; - const TInstant RequestStartTime = TInstant::Now(); - - TDqConfiguration::TPtr Settings = MakeIntrusive<TDqConfiguration>(); - - int Retry = 0; - int MaxRetries = 10; - TDuration RetryBackoff = TDuration::MilliSeconds(1000); - - NYql::TCounters QueryStat; - THashMap<ui64, ui64> Task2Stage; - - void RestoreRequest() { - Request = dynamic_cast<const RequestType*>(Ctx->GetRequest()); - } - }; - - class TExecuteGraphProxyActor: public TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse> { - public: - using TBase = TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse>; - TExecuteGraphProxyActor(NGrpc::IRequestContextBase* ctx, - const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, - const TString& traceId, const TString& username, - const NActors::TActorId& graphExecutionEventsActorId) - : TServiceProxyActor(ctx, counters, traceId, username) - , GraphExecutionEventsActorId(graphExecutionEventsActorId) - { - } - - void DoRetry() override { - YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; - SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this](const auto& ev) { - if (ev->Get()->Record.GetErrorMessage()) { - TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage()); - } else { - RestoreRequest(); - ModifiedRequest.Reset(); - TBase::DoRetry(); - } - }); - } - - void Reply(ui32 status, bool hasIssues) override { - auto eventType = hasIssues - ? NYql::NDqProto::EGraphExecutionEventType::FAIL - : NYql::NDqProto::EGraphExecutionEventType::SUCCESS; - SendEvent(eventType, nullptr, [this, status, hasIssues](const auto& ev) { - if (!hasIssues && ev->Get()->Record.GetErrorMessage()) { - TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage()); - } else { - TBase::Reply(status, hasIssues); - } - }); - } - - void ReplyError(grpc::StatusCode code, const TString& msg) override { - SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this, code, msg](const auto& ev) { - Y_UNUSED(ev); - TBase::ReplyError(code, msg); - }); - } - - private: - THolder<Yql::DqsProto::ExecuteGraphRequest> ModifiedRequest; - - void DoPassAway() override { - YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; - Send(GraphExecutionEventsActorId, new TEvents::TEvPoison()); - TServiceProxyActor::DoPassAway(); - } - - NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor SerializeGraphDescriptor() const { - NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor result; - - for (const auto& kv : Request->GetSecureParams()) { - result.MutableSecureParams()->MutableData()->insert(kv); - } - - for (const auto& kv : Request->GetGraphParams()) { - result.MutableGraphParams()->MutableData()->insert(kv); - } - - return result; - } - - void Bootstrap() override { - YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnExecuteGraph"; - - SendEvent(NYql::NDqProto::EGraphExecutionEventType::START, SerializeGraphDescriptor(), [this](const TEvGraphExecutionEvent::TPtr& ev) { - if (ev->Get()->Record.GetErrorMessage()) { - TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage()); - } else { - NDqProto::TGraphExecutionEvent::TMap params; - ev->Get()->Record.GetMessage().UnpackTo(¶ms); - FinishBootstrap(params); - } - }); - } - - void MergeTaskMetas(const NDqProto::TGraphExecutionEvent::TMap& params) { - if (!params.data().empty()) { - for (size_t i = 0; i < Request->TaskSize(); ++i) { - if (!ModifiedRequest) { - ModifiedRequest.Reset(new Yql::DqsProto::ExecuteGraphRequest()); - ModifiedRequest->CopyFrom(*Request); - } - - auto* task = ModifiedRequest->MutableTask(i); - - Yql::DqsProto::TTaskMeta taskMeta; - task->GetMeta().UnpackTo(&taskMeta); - - for (const auto&[key, value] : params.data()) { - (*taskMeta.MutableTaskParams())[key] = value; - } - - task->MutableMeta()->PackFrom(taskMeta); - } - } - - if (ModifiedRequest) { - Request = ModifiedRequest.Get(); - } - } - - void FinishBootstrap(const NDqProto::TGraphExecutionEvent::TMap& params) { - YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; - MergeTaskMetas(params); - - auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime)); - - TVector<TString> columns; - columns.reserve(Request->GetColumns().size()); - for (const auto& column : Request->GetColumns()) { - columns.push_back(column); - } - for (const auto& task : Request->GetTask()) { - Yql::DqsProto::TTaskMeta taskMeta; - task.GetMeta().UnpackTo(&taskMeta); - Task2Stage[task.GetId()] = taskMeta.GetStageId(); - } - THashMap<TString, TString> secureParams; - for (const auto& x : Request->GetSecureParams()) { - secureParams[x.first] = x.second; - } - auto resultId = RegisterChild(NExecutionHelpers::MakeResultAggregator( - columns, - executerId, - TraceId, - secureParams, - Settings, - Request->GetResultType(), - Request->GetDiscard(), - GraphExecutionEventsActorId).Release()); - auto controlId = Settings->EnableComputeActor.Get().GetOrElse(false) == false ? resultId - : RegisterChild(NYql::MakeTaskController(TraceId, executerId, resultId, Settings, NYql::NCommon::TServiceCounters(Counters, nullptr, "")).Release()); - Send(executerId, MakeHolder<TEvGraphRequest>( - *Request, - controlId, - resultId)); - } - - template <class TPayload, class TCallback> - void SendEvent(NYql::NDqProto::EGraphExecutionEventType eventType, const TPayload& payload, TCallback callback) { - NDqProto::TGraphExecutionEvent record; - record.SetEventType(eventType); - if constexpr (!std::is_same_v<TPayload, std::nullptr_t>) { - record.MutableMessage()->PackFrom(payload); - } - Send(GraphExecutionEventsActorId, new TEvGraphExecutionEvent(record)); - Synchronize<TEvGraphExecutionEvent>([callback, traceId = TraceId](TEvGraphExecutionEvent::TPtr& ev) { - YQL_LOG_CTX_ROOT_SCOPE(traceId); - Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC); - callback(ev); - }); - } - - NActors::TActorId GraphExecutionEventsActorId; - }; - - TString GetVersionString() { - TStringBuilder sb; - sb << GetProgramSvnVersion() << "\n"; - sb << GetBuildInfo(); - TString sandboxTaskId = GetSandboxTaskId(); - if (sandboxTaskId != TString("0")) { - sb << "\nSandbox task id: " << sandboxTaskId; - } - - return sb; - } - } - - TDqsGrpcService::TDqsGrpcService( - NActors::TActorSystem& system, - TIntrusivePtr<NMonitoring::TDynamicCounters> counters, - const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) - : ActorSystem(system) - , Counters(std::move(counters)) - , DqTaskPreprocessorFactories(dqTaskPreprocessorFactories) - , Promise(NewPromise<void>()) - , RunningRequests(0) - , Stopping(false) - , Sessions(&system, Counters->GetSubgroup("component", "grpc")->GetCounter("Sessions")) - { } - -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - do { \ - MakeIntrusive<NGrpc::TGRpcRequest<Yql::DqsProto::IN, Yql::DqsProto::OUT, TDqsGrpcService>>( \ - this, \ - &Service_, \ - CQ, \ - [this](NGrpc::IRequestContextBase* ctx) { ACTION }, \ - &Yql::DqsProto::DqService::AsyncService::Request##NAME, \ - #NAME, \ - logger, \ - BuildCB(Counters, #NAME)) \ - ->Run(); \ - } while (0) - - void TDqsGrpcService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) { - using namespace google::protobuf; - - CQ = cq; - - using TDqTaskPreprocessorCollection = std::vector<NYql::IDqTaskPreprocessor::TPtr>; - - ADD_REQUEST(ExecuteGraph, ExecuteGraphRequest, ExecuteGraphResponse, { - TGuard<TMutex> lock(Mutex); - if (Stopping) { - ctx->ReplyError(grpc::UNAVAILABLE, "Terminating in progress"); - return; - } - auto* request = dynamic_cast<const Yql::DqsProto::ExecuteGraphRequest*>(ctx->GetRequest()); - auto session = Sessions.GetSession(request->GetSession()); - if (!session) { - TString message = TStringBuilder() - << "Bad session: " - << request->GetSession(); - YQL_CLOG(DEBUG, ProviderDq) << message; - ctx->ReplyError(grpc::INVALID_ARGUMENT, message); - return; - } - - TDqTaskPreprocessorCollection taskPreprocessors; - for (const auto& factory: DqTaskPreprocessorFactories) { - taskPreprocessors.push_back(factory()); - } - - auto graphExecutionEventsActorId = ActorSystem.Register(NDqs::MakeGraphExecutionEventsActor(request->GetSession(), std::move(taskPreprocessors))); - - RunningRequests++; - auto actor = MakeHolder<TExecuteGraphProxyActor>(ctx, Counters, request->GetSession(), session->GetUsername(), graphExecutionEventsActorId); - auto future = actor->GetFuture(); - auto actorId = ActorSystem.Register(actor.Release()); - future.Apply([session, actorId, this] (const TFuture<void>&) mutable { - RunningRequests--; - if (Stopping && !RunningRequests) { - Promise.SetValue(); - } - - session->DeleteRequest(actorId); - }); - session->AddRequest(actorId); - }); - - ADD_REQUEST(SvnRevision, SvnRevisionRequest, SvnRevisionResponse, { - Y_UNUSED(this); - Yql::DqsProto::SvnRevisionResponse result; - result.SetRevision(GetVersionString()); - ctx->Reply(&result, Ydb::StatusIds::SUCCESS); - }); - - ADD_REQUEST(CloseSession, CloseSessionRequest, CloseSessionResponse, { - Y_UNUSED(this); - auto* request = dynamic_cast<const Yql::DqsProto::CloseSessionRequest*>(ctx->GetRequest()); - Y_VERIFY(!!request); - - Yql::DqsProto::CloseSessionResponse result; - Sessions.CloseSession(request->GetSession()); - ctx->Reply(&result, Ydb::StatusIds::SUCCESS); - }); - - ADD_REQUEST(PingSession, PingSessionRequest, PingSessionResponse, { - Y_UNUSED(this); - auto* request = dynamic_cast<const Yql::DqsProto::PingSessionRequest*>(ctx->GetRequest()); - Y_VERIFY(!!request); - - YQL_CLOG(TRACE, ProviderDq) << "PingSession " << request->GetSession(); - - Yql::DqsProto::PingSessionResponse result; - auto session = Sessions.GetSession(request->GetSession()); - if (!session) { - TString message = TStringBuilder() - << "Bad session: " - << request->GetSession(); - YQL_CLOG(DEBUG, ProviderDq) << message; - ctx->ReplyError(grpc::INVALID_ARGUMENT, message); - } else { - ctx->Reply(&result, Ydb::StatusIds::SUCCESS); - } - }); - - ADD_REQUEST(OpenSession, OpenSessionRequest, OpenSessionResponse, { - Y_UNUSED(this); - auto* request = dynamic_cast<const Yql::DqsProto::OpenSessionRequest*>(ctx->GetRequest()); - Y_VERIFY(!!request); - - YQL_CLOG(DEBUG, ProviderDq) << "OpenSession for " << request->GetSession() << " " << request->GetUsername(); - - Yql::DqsProto::OpenSessionResponse result; - if (Sessions.OpenSession(request->GetSession(), request->GetUsername())) { - ctx->Reply(&result, Ydb::StatusIds::SUCCESS); - } else { - ctx->ReplyError(grpc::INVALID_ARGUMENT, "Session `" + request->GetSession() + "' exists"); - } - }); - - ADD_REQUEST(JobStop, JobStopRequest, JobStopResponse, { - auto* request = dynamic_cast<const Yql::DqsProto::JobStopRequest*>(ctx->GetRequest()); - Y_VERIFY(!!request); - - auto ev = MakeHolder<TEvJobStop>(*request); - - auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::JobStopResponse>(ctx->GetArena()); - ctx->Reply(result, Ydb::StatusIds::SUCCESS); - - ActorSystem.Send(MakeWorkerManagerActorID(ActorSystem.NodeId), ev.Release()); - }); - - ADD_REQUEST(ClusterStatus, ClusterStatusRequest, ClusterStatusResponse, { - auto ev = MakeHolder<TEvClusterStatus>(); - - using ResultEv = TEvClusterStatusResponse; - - TExecutorPoolStats poolStats; - - TExecutorThreadStats stat; - TVector<TExecutorThreadStats> stats; - ActorSystem.GetPoolStats(0, poolStats, stats); - - for (const auto& s : stats) { - stat.Aggregate(s); - } - - YQL_CLOG(DEBUG, ProviderDq) << "SentEvents: " << stat.SentEvents; - YQL_CLOG(DEBUG, ProviderDq) << "ReceivedEvents: " << stat.ReceivedEvents; - YQL_CLOG(DEBUG, ProviderDq) << "NonDeliveredEvents: " << stat.NonDeliveredEvents; - YQL_CLOG(DEBUG, ProviderDq) << "EmptyMailboxActivation: " << stat.EmptyMailboxActivation; - Sessions.PrintInfo(); - - for (ui32 i = 0; i < stat.ActorsAliveByActivity.size(); i=i+1) { - if (stat.ActorsAliveByActivity[i]) { - YQL_CLOG(DEBUG, ProviderDq) << "ActorsAliveByActivity[" << i << "]=" << stat.ActorsAliveByActivity[i]; - } - } - - auto callback = MakeHolder<TRichActorFutureCallback<ResultEv>>( - [ctx] (TAutoPtr<TEventHandle<ResultEv>>& event) mutable { - auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::ClusterStatusResponse>(ctx->GetArena()); - result->MergeFrom(event->Get()->Record.GetResponse()); - ctx->Reply(result, Ydb::StatusIds::SUCCESS); - }, - [ctx] () mutable { - YQL_CLOG(INFO, ProviderDq) << "ClusterStatus failed"; - ctx->ReplyError(grpc::UNAVAILABLE, "Error"); - }, - TDuration::MilliSeconds(2000)); - - TActorId callbackId = ActorSystem.Register(callback.Release()); - - ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery)); - }); - - ADD_REQUEST(OperationStop, OperationStopRequest, OperationStopResponse, { - auto* request = dynamic_cast<const Yql::DqsProto::OperationStopRequest*>(ctx->GetRequest()); - auto ev = MakeHolder<TEvOperationStop>(*request); - - auto callback = MakeHolder<TRichActorFutureCallback<TEvOperationStopResponse>>( - [ctx] (TAutoPtr<TEventHandle<TEvOperationStopResponse>>& event) mutable { - Y_UNUSED(event); - auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::OperationStopResponse>(ctx->GetArena()); - ctx->Reply(result, Ydb::StatusIds::SUCCESS); - }, - [ctx] () mutable { - YQL_CLOG(INFO, ProviderDq) << "OperationStopResponse failed"; - ctx->ReplyError(grpc::UNAVAILABLE, "Error"); - }, - TDuration::MilliSeconds(2000)); - - TActorId callbackId = ActorSystem.Register(callback.Release()); - - ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery)); - }); - - ADD_REQUEST(QueryStatus, QueryStatusRequest, QueryStatusResponse, { - auto* request = dynamic_cast<const Yql::DqsProto::QueryStatusRequest*>(ctx->GetRequest()); - - auto ev = MakeHolder<TEvQueryStatus>(*request); - - auto callback = MakeHolder<TRichActorFutureCallback<TEvQueryStatusResponse>>( - [ctx] (TAutoPtr<TEventHandle<TEvQueryStatusResponse>>& event) mutable { - auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::QueryStatusResponse>(ctx->GetArena()); - result->MergeFrom(event->Get()->Record.GetResponse()); - ctx->Reply(result, Ydb::StatusIds::SUCCESS); - }, - [ctx] () mutable { - YQL_CLOG(INFO, ProviderDq) << "QueryStatus failed"; - ctx->ReplyError(grpc::UNAVAILABLE, "Error"); - }, - TDuration::MilliSeconds(2000)); - - TActorId callbackId = ActorSystem.Register(callback.Release()); - - ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery)); - }); - - ADD_REQUEST(RegisterNode, RegisterNodeRequest, RegisterNodeResponse, { - auto* request = dynamic_cast<const Yql::DqsProto::RegisterNodeRequest*>(ctx->GetRequest()); - Y_VERIFY(!!request); - - if (!request->GetPort() - || request->GetRole().empty() - || request->GetAddress().empty() - || request->GetRevision().empty()) - { - ctx->ReplyError(grpc::INVALID_ARGUMENT, "Invalid argument"); - return; - } - - auto ev = MakeHolder<TEvRegisterNode>(*request); - - using ResultEv = TEvRegisterNodeResponse; - - auto callback = MakeHolder<TRichActorFutureCallback<ResultEv>>( - [ctx] (TAutoPtr<TEventHandle<ResultEv>>& event) mutable { - auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::RegisterNodeResponse>(ctx->GetArena()); - result->MergeFrom(event->Get()->Record.GetResponse()); - ctx->Reply(result, Ydb::StatusIds::SUCCESS); - }, - [ctx] () mutable { - YQL_CLOG(INFO, ProviderDq) << "RegisterNode failed"; - ctx->ReplyError(grpc::UNAVAILABLE, "Error"); - }, - TDuration::MilliSeconds(5000)); - - TActorId callbackId = ActorSystem.Register(callback.Release()); - - ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery)); - }); - - ADD_REQUEST(GetMaster, GetMasterRequest, GetMasterResponse, { - auto* request = dynamic_cast<const Yql::DqsProto::GetMasterRequest*>(ctx->GetRequest()); - Y_VERIFY(!!request); - - auto requestEvent = MakeHolder<TEvGetMasterRequest>(); - - auto callback = MakeHolder<TActorFutureCallback<TEvGetMasterResponse>>( - [ctx] (TAutoPtr<TEventHandle<TEvGetMasterResponse>>& event) mutable { - auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::GetMasterResponse>(ctx->GetArena()); - result->MergeFrom(event->Get()->Record.GetResponse()); - ctx->Reply(result, Ydb::StatusIds::SUCCESS); - }); - - TActorId callbackId = ActorSystem.Register(callback.Release()); - - ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, requestEvent.Release())); - }); - - ADD_REQUEST(ConfigureFailureInjector, ConfigureFailureInjectorRequest, ConfigureFailureInjectorResponse,{ - auto* request = dynamic_cast<const Yql::DqsProto::ConfigureFailureInjectorRequest*>(ctx->GetRequest()); - Y_VERIFY(!!request); - - auto requestEvent = MakeHolder<TEvConfigureFailureInjectorRequest>(*request); - - auto callback = MakeHolder<TActorFutureCallback<TEvConfigureFailureInjectorResponse>>( - [ctx] (TAutoPtr<TEventHandle<TEvConfigureFailureInjectorResponse>>& event) mutable { - auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::ConfigureFailureInjectorResponse>(ctx->GetArena()); - result->MergeFrom(event->Get()->Record.GetResponse()); - ctx->Reply(result, Ydb::StatusIds::SUCCESS); - }); - - TActorId callbackId = ActorSystem.Register(callback.Release()); - - ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, requestEvent.Release())); - }); - - ADD_REQUEST(IsReady, IsReadyRequest, IsReadyResponse, { - auto* request = dynamic_cast<const Yql::DqsProto::IsReadyRequest*>(ctx->GetRequest()); - Y_VERIFY(!!request); - - auto ev = MakeHolder<TEvIsReady>(*request); - - auto callback = MakeHolder<TRichActorFutureCallback<TEvIsReadyResponse>>( - [ctx] (TAutoPtr<TEventHandle<TEvIsReadyResponse>>& event) mutable { - Yql::DqsProto::IsReadyResponse result; - result.SetIsReady(event->Get()->Record.GetIsReady()); - ctx->Reply(&result, Ydb::StatusIds::SUCCESS); - }, - [ctx] () mutable { - YQL_CLOG(INFO, ProviderDq) << "IsReadyForRevision failed"; - ctx->ReplyError(grpc::UNAVAILABLE, "Error"); - }, - TDuration::MilliSeconds(2000)); - - TActorId callbackId = ActorSystem.Register(callback.Release()); - - ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release())); - }); - - ADD_REQUEST(Routes, RoutesRequest, RoutesResponse, { - auto* request = dynamic_cast<const Yql::DqsProto::RoutesRequest*>(ctx->GetRequest()); - Y_VERIFY(!!request); - - auto ev = MakeHolder<TEvRoutesRequest>(); - - auto callback = MakeHolder<TRichActorFutureCallback<TEvRoutesResponse>>( - [ctx] (TAutoPtr<TEventHandle<TEvRoutesResponse>>& event) mutable { - Yql::DqsProto::RoutesResponse result; - result.MergeFrom(event->Get()->Record.GetResponse()); - ctx->Reply(&result, Ydb::StatusIds::SUCCESS); - }, - [ctx] () mutable { - YQL_CLOG(INFO, ProviderDq) << "Routes failed"; - ctx->ReplyError(grpc::UNAVAILABLE, "Error"); - }, - TDuration::MilliSeconds(5000)); - - TActorId callbackId = ActorSystem.Register(callback.Release()); - - ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(request->GetNodeId()), callbackId, ev.Release())); - }); - -/* 1. move grpc to providers/dq, 2. move benchmark to providers/dq 3. uncomment - ADD_REQUEST(Benchmark, BenchmarkRequest, BenchmarkResponse, { - auto* req = dynamic_cast<const Yql::DqsProto::BenchmarkRequest*>(ctx->GetRequest()); - Y_VERIFY(!!req); - - TWorkerManagerBenchmarkOptions options; - if (req->GetWorkerCount()) { - options.WorkerCount = req->GetWorkerCount(); - } - if (req->GetInflight()) { - options.Inflight = req->GetInflight(); - } - if (req->GetTotalRequests()) { - options.TotalRequests = req->GetTotalRequests(); - } - if (req->GetMaxRunTimeMs()) { - options.MaxRunTimeMs = TDuration::MilliSeconds(req->GetMaxRunTimeMs()); - } - - auto benchmarkId = ActorSystem.Register( - CreateWorkerManagerBenchmark( - MakeWorkerManagerActorID(ActorSystem.NodeId), options - )); - - ActorSystem.Send(benchmarkId, new TEvents::TEvBootstrap); - - auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::BenchmarkResponse>(ctx->GetArena()); - ctx->Reply(result, Ydb::StatusIds::SUCCESS); - }); -*/ - } - - void TDqsGrpcService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter = limiter; - } - - bool TDqsGrpcService::IncRequest() { - return Limiter->Inc(); - } - - void TDqsGrpcService::DecRequest() { - Limiter->Dec(); - Y_ASSERT(Limiter->GetCurrentInFlight() >= 0); - } - - TFuture<void> TDqsGrpcService::Stop() { - TGuard<TMutex> lock(Mutex); - Stopping = true; - auto future = Promise.GetFuture(); - if (RunningRequests == 0) { - Promise.SetValue(); - } - return future; - } -} diff --git a/ydb/library/yql/providers/dq/service/grpc_service.h b/ydb/library/yql/providers/dq/service/grpc_service.h deleted file mode 100644 index fa2a835c8c..0000000000 --- a/ydb/library/yql/providers/dq/service/grpc_service.h +++ /dev/null @@ -1,52 +0,0 @@ -#pragma once - -#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h> - -#include <ydb/library/yql/providers/dq/api/grpc/api.grpc.pb.h> -#include <ydb/library/yql/providers/dq/api/protos/service.pb.h> - -#include <ydb/library/yql/minikql/mkql_function_registry.h> - -#include <library/cpp/grpc/server/grpc_request.h> -#include <library/cpp/grpc/server/grpc_server.h> - -#include <library/cpp/actors/core/actorsystem.h> -#include <library/cpp/actors/core/event_local.h> -#include <library/cpp/actors/core/events.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> -#include <library/cpp/threading/future/future.h> - -#include "grpc_session.h" - -namespace NYql::NDqs { - class TDatabaseManager; - - class TDqsGrpcService: public NGrpc::TGrpcServiceBase<Yql::DqsProto::DqService> { - public: - TDqsGrpcService(NActors::TActorSystem& system, - TIntrusivePtr<NMonitoring::TDynamicCounters> counters, - const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); - - NThreading::TFuture<void> Stop(); - - private: - NActors::TActorSystem& ActorSystem; - grpc::ServerCompletionQueue* CQ = nullptr; - NGrpc::TGlobalLimiter* Limiter = nullptr; - - TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; - TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories; - TMutex Mutex; - NThreading::TPromise<void> Promise; - std::atomic<ui64> RunningRequests; - std::atomic<bool> Stopping; - - TSessionStorage Sessions; - }; -} diff --git a/ydb/library/yql/providers/dq/service/grpc_session.cpp b/ydb/library/yql/providers/dq/service/grpc_session.cpp deleted file mode 100644 index 2de87f9d46..0000000000 --- a/ydb/library/yql/providers/dq/service/grpc_session.cpp +++ /dev/null @@ -1,106 +0,0 @@ -#include "grpc_session.h" - -#include <ydb/library/yql/utils/log/log.h> - -namespace NYql::NDqs { - -TSession::~TSession() { - TGuard<TMutex> lock(Mutex); - for (auto actorId : Requests) { - ActorSystem->Send(actorId, new NActors::TEvents::TEvPoison()); - } -} - -void TSession::DeleteRequest(const NActors::TActorId& actorId) -{ - TGuard<TMutex> lock(Mutex); - Requests.erase(actorId); -} - -void TSession::AddRequest(const NActors::TActorId& actorId) -{ - TGuard<TMutex> lock(Mutex); - Requests.insert(actorId); -} - -TSessionStorage::TSessionStorage( - NActors::TActorSystem* actorSystem, - const NMonitoring::TDynamicCounters::TCounterPtr& sessionsCounter) - : ActorSystem(actorSystem) - , SessionsCounter(sessionsCounter) -{ } - -void TSessionStorage::CloseSession(const TString& sessionId) -{ - TGuard<TMutex> lock(SessionMutex); - auto it = Sessions.find(sessionId); - if (it == Sessions.end()) { - return; - } - SessionsByLastUpdate.erase(it->second.Iterator); - Sessions.erase(it); - *SessionsCounter = Sessions.size(); -} - -std::shared_ptr<TSession> TSessionStorage::GetSession(const TString& sessionId) -{ - Clean(TInstant::Now() - TDuration::Minutes(10)); - - TGuard<TMutex> lock(SessionMutex); - auto it = Sessions.find(sessionId); - if (it == Sessions.end()) { - return std::shared_ptr<TSession>(); - } else { - SessionsByLastUpdate.erase(it->second.Iterator); - SessionsByLastUpdate.push_back({TInstant::Now(), sessionId}); - it->second.Iterator = SessionsByLastUpdate.end(); - it->second.Iterator--; - return it->second.Session; - } -} - -bool TSessionStorage::OpenSession(const TString& sessionId, const TString& username) -{ - TGuard<TMutex> lock(SessionMutex); - if (Sessions.contains(sessionId)) { - return false; - } - - SessionsByLastUpdate.push_back({TInstant::Now(), sessionId}); - auto it = SessionsByLastUpdate.end(); --it; - - Sessions[sessionId] = TSessionAndIterator { - std::make_shared<TSession>(username, ActorSystem), - it - }; - - *SessionsCounter = Sessions.size(); - - return true; -} - -void TSessionStorage::Clean(TInstant before) { - TGuard<TMutex> lock(SessionMutex); - for (TSessionsByLastUpdate::iterator it = SessionsByLastUpdate.begin(); - it != SessionsByLastUpdate.end(); ) - { - if (it->LastUpdate < before) { - YQL_CLOG(INFO, ProviderDq) << "Drop session by timeout " << it->SessionId; - Sessions.erase(it->SessionId); - it = SessionsByLastUpdate.erase(it); - } else { - break; - } - } - - *SessionsCounter = Sessions.size(); -} - -void TSessionStorage::PrintInfo() const { - YQL_CLOG(INFO, ProviderDq) << "SessionsByLastUpdate: " << SessionsByLastUpdate.size(); - YQL_CLOG(DEBUG, ProviderDq) << "Sessions: " << Sessions.size(); - ui64 currenSessionsCounter = *SessionsCounter; - YQL_CLOG(DEBUG, ProviderDq) << "SessionsCounter: " << currenSessionsCounter; -} - -} // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/service/grpc_session.h b/ydb/library/yql/providers/dq/service/grpc_session.h deleted file mode 100644 index 56e592a757..0000000000 --- a/ydb/library/yql/providers/dq/service/grpc_session.h +++ /dev/null @@ -1,57 +0,0 @@ -#pragma once -#include <library/cpp/actors/core/actorsystem.h> - -namespace NYql::NDqs { - -class TSession { -public: - TSession(const TString& username, NActors::TActorSystem* actorSystem) - : Username(username) - , ActorSystem(actorSystem) - { } - - ~TSession(); - - const TString& GetUsername() const { - return Username; - } - - void DeleteRequest(const NActors::TActorId& id); - void AddRequest(const NActors::TActorId& id); - -private: - TMutex Mutex; - const TString Username; - NActors::TActorSystem* ActorSystem; - THashSet<NActors::TActorId> Requests; -}; - -class TSessionStorage { -public: - TSessionStorage( - NActors::TActorSystem* actorSystem, - const NMonitoring::TDynamicCounters::TCounterPtr& sessionsCounter); - void CloseSession(const TString& sessionId); - std::shared_ptr<TSession> GetSession(const TString& sessionId); - bool OpenSession(const TString& sessionId, const TString& username); - void Clean(TInstant before); - void PrintInfo() const; - -private: - NActors::TActorSystem* ActorSystem; - NMonitoring::TDynamicCounters::TCounterPtr SessionsCounter; - TMutex SessionMutex; - struct TTimeAndSessionId { - TInstant LastUpdate; - TString SessionId; - }; - using TSessionsByLastUpdate = TList<TTimeAndSessionId>; - TSessionsByLastUpdate SessionsByLastUpdate; - struct TSessionAndIterator { - std::shared_ptr<TSession> Session; - TSessionsByLastUpdate::iterator Iterator; - }; - THashMap<TString, TSessionAndIterator> Sessions; -}; - -} // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp deleted file mode 100644 index 9a0baa3f8b..0000000000 --- a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp +++ /dev/null @@ -1,303 +0,0 @@ -#include "interconnect_helpers.h" -#include "service_node.h" - -#include "grpc_service.h" - -#include <library/cpp/actors/helpers/selfping_actor.h> - -#include <ydb/library/yql/utils/log/log.h> -#include <ydb/library/yql/utils/backtrace/backtrace.h> -#include <ydb/library/yql/utils/yql_panic.h> - -#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> - -#include <library/cpp/actors/core/executor_pool_basic.h> -#include <library/cpp/actors/core/scheduler_basic.h> -#include <library/cpp/actors/core/scheduler_actor.h> -#include <library/cpp/actors/dnsresolver/dnsresolver.h> -#include <library/cpp/actors/interconnect/interconnect.h> -#include <library/cpp/actors/interconnect/interconnect_common.h> -#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> -#include <library/cpp/actors/interconnect/interconnect_tcp_server.h> -#include <library/cpp/actors/interconnect/poller_actor.h> -#include <library/cpp/yson/node/node_io.h> - -#include <util/stream/file.h> -#include <util/system/env.h> - -namespace NYql::NDqs { - using namespace NActors; - using namespace NActors::NDnsResolver; - using namespace NGrpc; - - class TYqlLogBackend: public TLogBackend { - void WriteData(const TLogRecord& rec) override { - TString message(rec.Data, rec.Len); - if (message.find("ICP01 ready to work") != TString::npos) { - return; - } - YQL_CLOG(DEBUG, ProviderDq) << message; - } - - void ReopenLog() override { } - }; - - static void InitSelfPingActor(NActors::TActorSystemSetup* setup, NMonitoring::TDynamicCounterPtr rootCounters) - { - const TDuration selfPingInterval = TDuration::MilliSeconds(10); - - const auto counters = rootCounters->GetSubgroup("counters", "utils"); - - for (size_t poolId = 0; poolId < setup->GetExecutorsCount(); ++poolId) { - const auto& poolName = setup->GetPoolName(poolId); - auto poolGroup = counters->GetSubgroup("execpool", poolName); - auto counter = poolGroup->GetCounter("SelfPingMaxUs", false); - auto cpuTimeCounter = poolGroup->GetCounter("CpuMatBenchNs", false); - IActor* selfPingActor = CreateSelfPingActor(selfPingInterval, counter, cpuTimeCounter); - setup->LocalServices.push_back( - std::make_pair(TActorId(), - TActorSetupCmd(selfPingActor, - TMailboxType::HTSwap, - poolId))); - } - } - - std::tuple<THolder<NActors::TActorSystemSetup>, TIntrusivePtr<NActors::NLog::TSettings>> BuildActorSetup( - ui32 nodeId, - TString interconnectAddress, - ui16 port, - SOCKET socket, - TVector<ui32> threads, - NMonitoring::TDynamicCounterPtr counters, - const TNameserverFactory& nameserverFactory, - const NYql::NProto::TDqConfig::TICSettings& icSettings) - { - auto setup = MakeHolder<TActorSystemSetup>(); - - setup->NodeId = nodeId; - - const int maxActivityType = NActors::GetActivityTypeCount(); - if (threads.empty()) { - threads = {icSettings.GetThreads()}; - } - - setup->ExecutorsCount = threads.size(); - setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]); - for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { - setup->Executors[i] = new TBasicExecutorPool( - i, - threads[i], - 50, - "pool-"+ToString(i), // poolName - nullptr, // affinity - NActors::TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX, // timePerMailbox - NActors::TBasicExecutorPool::DEFAULT_EVENTS_PER_MAILBOX, // eventsPermailbox - 0, // realtimePriority - maxActivityType // maxActivityType - ); - } - auto schedulerConfig = TSchedulerConfig(); - schedulerConfig.MonCounters = counters; - -#define SET_VALUE(name) \ - if (icSettings.Has ## name()) { \ - schedulerConfig.name = icSettings.Get ## name (); \ - YQL_CLOG(DEBUG, ProviderDq) << "Scheduler IC " << #name << " set to " << schedulerConfig.name; \ - } - - SET_VALUE(ResolutionMicroseconds); - SET_VALUE(SpinThreshold); - SET_VALUE(ProgressThreshold); - SET_VALUE(UseSchedulerActor); - SET_VALUE(RelaxedSendPaceEventsPerSecond); - SET_VALUE(RelaxedSendPaceEventsPerCycle); - SET_VALUE(RelaxedSendThresholdEventsPerSecond); - SET_VALUE(RelaxedSendThresholdEventsPerCycle); - -#undef SET_VALUE - - setup->Scheduler = CreateSchedulerThread(schedulerConfig); - setup->MaxActivityType = maxActivityType; - - YQL_CLOG(DEBUG, ProviderDq) << "Initializing local services"; - setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); - if (IActor* schedulerActor = CreateSchedulerActor(schedulerConfig)) { - TActorId schedulerActorId = MakeSchedulerActorId(); - setup->LocalServices.emplace_back(schedulerActorId, TActorSetupCmd(schedulerActor, TMailboxType::ReadAsFilled, 0)); - } - - NActors::TActorId loggerActorId(nodeId, "logger"); - auto logSettings = MakeIntrusive<NActors::NLog::TSettings>(loggerActorId, - 0, NActors::NLog::PRI_INFO); - static TString defaultComponent = "ActorLib"; - logSettings->Append(0, 1024, [&](NActors::NLog::EComponent) -> const TString & { return defaultComponent; }); - TString wtf = ""; - logSettings->SetLevel(NActors::NLog::PRI_DEBUG, 535 /*NKikimrServices::KQP_COMPUTE*/, wtf); - logSettings->SetLevel(NActors::NLog::PRI_DEBUG, 713 /*NKikimrServices::YQL_PROXY*/, wtf); - NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor( - logSettings, - new TYqlLogBackend, - counters->GetSubgroup("logger", "counters")); - setup->LocalServices.emplace_back(logSettings->LoggerActorId, TActorSetupCmd(loggerActor, TMailboxType::Simple, 0)); - - TIntrusivePtr<TTableNameserverSetup> nameserverTable = new TTableNameserverSetup(); - THashSet<ui32> staticNodeId; - - YQL_CLOG(DEBUG, ProviderDq) << "Initializing node table"; - nameserverTable->StaticNodeTable[nodeId] = std::make_pair(interconnectAddress, port); - - setup->LocalServices.emplace_back( - MakeDnsResolverActorId(), TActorSetupCmd(CreateOnDemandDnsResolver(), TMailboxType::ReadAsFilled, 0)); - - setup->LocalServices.emplace_back( - GetNameserviceActorId(), TActorSetupCmd(nameserverFactory(nameserverTable), TMailboxType::ReadAsFilled, 0)); - - - InitSelfPingActor(setup.Get(), counters); - - TIntrusivePtr<TInterconnectProxyCommon> icCommon = new TInterconnectProxyCommon(); - icCommon->NameserviceId = GetNameserviceActorId(); - Y_UNUSED(counters); - //icCommon->MonCounters = counters->GetSubgroup("counters", "interconnect"); - icCommon->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>(); - -#define SET_DURATION(name) \ - { \ - icCommon->Settings.name = TDuration::MilliSeconds(icSettings.Get ## name ## Ms()); \ - YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \ - } - -#define SET_VALUE(name) \ - { \ - icCommon->Settings.name = icSettings.Get ## name(); \ - YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \ - } - - SET_DURATION(Handshake); - SET_DURATION(DeadPeer); - SET_DURATION(CloseOnIdle); - - SET_VALUE(SendBufferDieLimitInMB); - SET_VALUE(TotalInflightAmountOfData); - SET_VALUE(MergePerPeerCounters); - SET_VALUE(MergePerDataCenterCounters); - SET_VALUE(TCPSocketBufferSize); - - SET_DURATION(PingPeriod); - SET_DURATION(ForceConfirmPeriod); - SET_DURATION(LostConnection); - SET_DURATION(BatchPeriod); - - SET_DURATION(MessagePendingTimeout); - - SET_VALUE(MessagePendingSize); - SET_VALUE(MaxSerializedEventSize); - -#undef SET_DURATION -#undef SET_VALUE - - ui32 maxNodeId = static_cast<ui32>(ENodeIdLimits::MaxWorkerNodeId); - - YQL_CLOG(DEBUG, ProviderDq) << "Initializing proxy actors"; - setup->Interconnect.ProxyActors.resize(maxNodeId + 1); - for (ui32 id = 1; id <= maxNodeId; ++id) { - if (nodeId != id) { - IActor* actor = new TInterconnectProxyTCP(id, icCommon); - setup->Interconnect.ProxyActors[id] = TActorSetupCmd(actor, TMailboxType::ReadAsFilled, 0); - } - } - - // start listener - YQL_CLOG(DEBUG, ProviderDq) << "Start listener"; - { - icCommon->TechnicalSelfHostName = interconnectAddress; - YQL_CLOG(INFO, ProviderDq) << "Start listener " << interconnectAddress << ":" << port << " socket: " << socket; - IActor* listener; - TMaybe<SOCKET> maybeSocket = socket < 0 - ? Nothing() - : TMaybe<SOCKET>(socket); - - listener = new NActors::TInterconnectListenerTCP(interconnectAddress, port, icCommon, maybeSocket); - - setup->LocalServices.emplace_back( - MakeInterconnectListenerActorId(false), - TActorSetupCmd(listener, TMailboxType::ReadAsFilled, 0)); - } - - YQL_CLOG(DEBUG, ProviderDq) << "Actor initialization complete"; - -#ifdef _unix_ - signal(SIGPIPE, SIG_IGN); -#endif - - return std::make_tuple(std::move(setup), logSettings); - } - - std::tuple<TString, TString> GetLocalAddress(const TString* overrideHostname) { - constexpr auto MaxLocalHostNameLength = 4096; - std::array<char, MaxLocalHostNameLength> buffer; - buffer.fill(0); - TString hostName; - TString localAddress; - - int result = gethostname(buffer.data(), buffer.size() - 1); - if (result != 0) { - Cerr << "gethostname failed for " << std::string_view{buffer.data(), buffer.size()} << " error " << strerror(errno) << Endl; - return std::make_tuple(hostName, localAddress); - } - - if (overrideHostname) { - memcpy(&buffer[0], overrideHostname->c_str(), Min<int>( - overrideHostname->size()+1, buffer.size()-1 - )); - } - - hostName = &buffer[0]; - - addrinfo request; - memset(&request, 0, sizeof(request)); - request.ai_family = AF_INET6; - request.ai_socktype = SOCK_STREAM; - - addrinfo* response = nullptr; - result = getaddrinfo(buffer.data(), nullptr, &request, &response); - if (result != 0) { - Cerr << "getaddrinfo failed for " << std::string_view{buffer.data(), buffer.size()} << " error " << gai_strerror(result) << Endl; - return std::make_tuple(hostName, localAddress); - } - - std::unique_ptr<addrinfo, void (*)(addrinfo*)> holder(response, &freeaddrinfo); - - if (!response->ai_addr) { - Cerr << "getaddrinfo failed: no ai_addr" << Endl; - return std::make_tuple(hostName, localAddress); - } - - auto* sa = response->ai_addr; - Y_VERIFY(sa->sa_family == AF_INET6); - inet_ntop(AF_INET6, &(((struct sockaddr_in6*)sa)->sin6_addr), - &buffer[0], buffer.size() - 1); - - localAddress = &buffer[0]; - - return std::make_tuple(hostName, localAddress); - } - - std::tuple<TString, TString> GetUserToken(const TMaybe<TString>& maybeUser, const TMaybe<TString>& maybeTokenFile) - { - auto home = GetEnv("HOME"); - auto systemUser = GetEnv("USER"); - - TString userName = maybeUser - ? *maybeUser - : systemUser; - - TString tokenFile = maybeTokenFile - ? *maybeTokenFile - : home + "/.yt/token"; - - TString token = TFileInput(tokenFile).ReadLine(); - - return std::make_tuple(userName, token); - } -} diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.h b/ydb/library/yql/providers/dq/service/interconnect_helpers.h deleted file mode 100644 index 9009c8c179..0000000000 --- a/ydb/library/yql/providers/dq/service/interconnect_helpers.h +++ /dev/null @@ -1,49 +0,0 @@ -#pragma once - -#include <library/cpp/actors/core/actorsystem.h> -#include <library/cpp/actors/interconnect/poller_tcp.h> -#include <library/cpp/actors/interconnect/interconnect.h> -#include <library/cpp/yson/node/node.h> - -#include <ydb/library/yql/providers/dq/config/config.pb.h> - -namespace NYql::NDqs { - -enum class ENodeIdLimits { - MinServiceNodeId = 1, - MaxServiceNodeId = 512, // excluding - MinWorkerNodeId = 512, - MaxWorkerNodeId = 8192, // excluding -}; - -using TNameserverFactory = std::function<NActors::IActor*(const TIntrusivePtr<NActors::TTableNameserverSetup>& setup)>; - -struct TServiceNodeConfig { - ui32 NodeId; - TString InterconnectAddress; - TString GrpcHostname; - ui16 Port; - ui16 GrpcPort = 8080; - ui16 MbusPort = 0; - SOCKET Socket = -1; - SOCKET GrpcSocket = -1; - NYql::NProto::TDqConfig::TICSettings ICSettings = NYql::NProto::TDqConfig::TICSettings(); - TNameserverFactory NameserverFactory = [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) { - return CreateNameserverTable(setup); - }; -}; - -std::tuple<TString, TString> GetLocalAddress(const TString* hostname = nullptr); -std::tuple<TString, TString> GetUserToken(const TMaybe<TString>& user, const TMaybe<TString>& tokenFile); - -std::tuple<THolder<NActors::TActorSystemSetup>, TIntrusivePtr<NActors::NLog::TSettings>> BuildActorSetup( - ui32 nodeId, - TString interconnectAddress, - ui16 port, - SOCKET socket, - TVector<ui32> threads, - NMonitoring::TDynamicCounterPtr counters, - const TNameserverFactory& nameserverFactory, - const NYql::NProto::TDqConfig::TICSettings& icSettings = NYql::NProto::TDqConfig::TICSettings()); - -} // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/service/service_node.cpp b/ydb/library/yql/providers/dq/service/service_node.cpp deleted file mode 100644 index cdf05b6661..0000000000 --- a/ydb/library/yql/providers/dq/service/service_node.cpp +++ /dev/null @@ -1,166 +0,0 @@ -#include "service_node.h" - -#include <ydb/library/yql/utils/log/log.h> -#include <ydb/library/yql/providers/common/metrics/metrics_registry.h> -#include <ydb/library/yql/utils/yql_panic.h> - -#include <ydb/library/yql/providers/dq/actors/execution_helpers.h> - -#include <library/cpp/grpc/server/actors/logger.h> - -#include <utility> - -namespace NYql { - using namespace NActors; - using namespace NGrpc; - using namespace NYql::NDqs; - - class TGrpcExternalListener: public IExternalListener { - public: - TGrpcExternalListener(SOCKET socket) - : Socket(socket) - , Listener(MakeIntrusive<NInterconnect::TStreamSocket>(Socket)) - { - SetNonBlock(socket, true); - } - - void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) override { - Acceptor = std::move(acceptor); - } - - private: - void Start() override { - YQL_CLOG(DEBUG, ProviderDq) << "Start GRPC Listener"; - Poller.Start(); - StartRead(); - } - - void Stop() override { - Poller.Stop(); - } - - void StartRead() { - YQL_CLOG(TRACE, ProviderDq) << "Read next GRPC event"; - Poller.StartRead(Listener, [&](const TIntrusivePtr<TSharedDescriptor>& ss) { - return Accept(ss); - }); - } - - TDelegate Accept(const TIntrusivePtr<TSharedDescriptor>& ss) - { - NInterconnect::TStreamSocket* socket = (NInterconnect::TStreamSocket*)ss.Get(); - int r = 0; - while (r >= 0) { - NInterconnect::TAddress address; - r = socket->Accept(address); - if (r >= 0) { - YQL_CLOG(TRACE, ProviderDq) << "New GRPC connection"; - grpc::experimental::ExternalConnectionAcceptor::NewConnectionParameters params; - SetNonBlock(r, true); - params.listener_fd = -1; // static_cast<int>(Socket); - params.fd = r; - Acceptor->HandleNewConnection(¶ms); - } else if (-r != EAGAIN && -r != EWOULDBLOCK) { - YQL_CLOG(DEBUG, ProviderDq) << "Unknown error code " + ToString(r); - } - } - - return [this] { - StartRead(); - }; - } - - SOCKET Socket; - TIntrusivePtr<NInterconnect::TStreamSocket> Listener; - NInterconnect::TPollerThreads Poller; - std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> Acceptor; - }; - - TServiceNode::TServiceNode( - const TServiceNodeConfig& config, - ui32 threads, - IMetricsRegistryPtr metricsRegistry) - : Config(config) - , Threads(threads) - , MetricsRegistry(std::move(metricsRegistry)) - { - std::tie(Setup, LogSettings) = BuildActorSetup( - Config.NodeId, - Config.InterconnectAddress, - Config.Port, - Config.Socket, - {Threads, 8}, - MetricsRegistry->GetSensors(), - Config.NameserverFactory, - Config.ICSettings); - } - - void TServiceNode::AddLocalService(TActorId actorId, const TActorSetupCmd& service) { - YQL_ENSURE(!ActorSystem); - Setup->LocalServices.emplace_back(actorId, service); - } - - NActors::TActorSystem* TServiceNode::StartActorSystem(void* appData) { - Y_VERIFY(!ActorSystem); - - ActorSystem = MakeHolder<NActors::TActorSystem>(Setup, appData, LogSettings); - ActorSystem->Start(); - - return ActorSystem.Get(); - } - - void TServiceNode::StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) { - class TCustomOption : public grpc::ServerBuilderOption { - public: - TCustomOption() { } - - void UpdateArguments(grpc::ChannelArguments *args) override { - args->SetInt(GRPC_ARG_ALLOW_REUSEPORT, 1); - } - - void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override - { } - }; - - YQL_CLOG(INFO, ProviderDq) << "Starting GRPC on " << Config.GrpcPort; - - IExternalListener::TPtr listener = nullptr; - if (Config.GrpcSocket >= 0) { - listener = MakeIntrusive<TGrpcExternalListener>(Config.GrpcSocket); - } - - auto options = TServerOptions() - // .SetHost(CurrentNode->Address) - .SetHost("[::]") - .SetPort(Config.GrpcPort) - .SetExternalListener(listener) - .SetWorkerThreads(2) - .SetGRpcMemoryQuotaBytes(1024 * 1024 * 1024) - .SetMaxMessageSize(1024 * 1024 * 256) - .SetMaxGlobalRequestInFlight(50000) - .SetUseAuth(false) - .SetKeepAliveEnable(true) - .SetKeepAliveIdleTimeoutTriggerSec(360) - .SetKeepAliveMaxProbeCount(3) - .SetKeepAliveProbeIntervalSec(1) - .SetServerBuilderMutator([](grpc::ServerBuilder& builder) { - builder.SetOption(std::make_unique<TCustomOption>()); - }) - .SetLogger(CreateActorSystemLogger(*ActorSystem, 413)); // 413 - NKikimrServices::GRPC_SERVER - - Server = MakeHolder<TGRpcServer>(options); - Service = TIntrusivePtr<IGRpcService>(new TDqsGrpcService(*ActorSystem, MetricsRegistry->GetSensors(), dqTaskPreprocessorFactories)); - Server->AddService(Service); - Server->Start(); - } - - void TServiceNode::Stop(TDuration timeout) { - (static_cast<TDqsGrpcService*>(Service.Get()))->Stop().Wait(timeout); - - Server->Stop(); - for (auto id : ActorIds) { - ActorSystem->Send(id, new NActors::TEvents::TEvPoison); - } - ActorSystem->Stop(); - } -} // namespace NYql diff --git a/ydb/library/yql/providers/dq/service/service_node.h b/ydb/library/yql/providers/dq/service/service_node.h deleted file mode 100644 index 32f904901d..0000000000 --- a/ydb/library/yql/providers/dq/service/service_node.h +++ /dev/null @@ -1,48 +0,0 @@ -#pragma once - -#include "grpc_service.h" -#include "interconnect_helpers.h" - -#include <ydb/library/yql/providers/common/metrics/metrics_registry.h> -#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h> - -#include <ydb/library/yql/minikql/mkql_function_registry.h> - -#include <library/cpp/actors/core/executor_pool_basic.h> -#include <library/cpp/actors/core/scheduler_basic.h> -#include <library/cpp/actors/interconnect/interconnect.h> -#include <library/cpp/actors/interconnect/interconnect_common.h> -#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> -#include <library/cpp/actors/interconnect/interconnect_tcp_server.h> -#include <library/cpp/actors/interconnect/poller_actor.h> - -namespace NYql { - class TServiceNode { - public: - TServiceNode( - const NDqs::TServiceNodeConfig& config, - ui32 threads, - IMetricsRegistryPtr metricsRegistry); - - void AddLocalService(NActors::TActorId actorId, const NActors::TActorSetupCmd& service); - NActors::TActorSystem* StartActorSystem(void* appData = nullptr); - void StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories); - - void Stop(TDuration time = TDuration::Max()); - - NActors::TActorSystemSetup* GetSetup() const { - return Setup.Get(); - } - - private: - NDqs::TServiceNodeConfig Config; - ui32 Threads; - IMetricsRegistryPtr MetricsRegistry; - THolder<NActors::TActorSystemSetup> Setup; - TIntrusivePtr<NActors::NLog::TSettings> LogSettings; - THolder<NActors::TActorSystem> ActorSystem; - TVector<NActors::TActorId> ActorIds; - THolder<NGrpc::TGRpcServer> Server; - TIntrusivePtr<NGrpc::IGRpcService> Service; - }; -} diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp deleted file mode 100644 index 7635df0469..0000000000 --- a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp +++ /dev/null @@ -1,231 +0,0 @@ -#include <library/cpp/testing/unittest/registar.h> - -#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h> -#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h> - -#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h> - -#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> -#include <ydb/library/yql/providers/dq/provider/yql_dq_provider.h> - -#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h> -#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h> -#include <ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h> -#include <ydb/library/yql/providers/pq/provider/yql_pq_provider.h> - -#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h> -#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h> - -#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> -#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> -#include <ydb/library/yql/minikql/mkql_function_registry.h> - -#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> -#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> - -#include <ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.h> - -#include <ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h> - -#include <ydb/library/yql/core/facade/yql_facade.h> -#include <ydb/library/yql/utils/log/log.h> -#include <ydb/library/yql/core/services/mounts/yql_mounts.h> - -#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h> -#include <ydb/library/yql/core/file_storage/file_storage.h> - -#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> - -#include <util/stream/tee.h> -#include <util/string/cast.h> - -namespace NYql { - -NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver) { - auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); - RegisterDqPqReadActorFactory(*factory, driver, nullptr); - - RegisterDqPqWriteActorFactory(*factory, driver, nullptr); - return factory; -} - -bool RunPqProgram( - const TString& code, - bool optimizeOnly, - bool printExpr = false, - bool printTrace = false, - TString* errorsMessage = nullptr) { - NLog::YqlLoggerScope logger("cerr", false); - NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::DEBUG); - NLog::YqlLogger().SetComponentLevel(NLog::EComponent::ProviderRtmr, NLog::ELevel::DEBUG); - - IOutputStream* errorsOutput = &Cerr; - TMaybe<TStringOutput> errorsMessageOutput; - TMaybe<TTeeOutput> tee; - if (errorsMessage) { - errorsMessageOutput.ConstructInPlace(*errorsMessage); - tee.ConstructInPlace(&*errorsMessageOutput, &Cerr); - errorsOutput = &*tee; - } - - // Gateways config. - TGatewaysConfig gatewaysConfig; - // pq - { - auto& pqClusterConfig = *gatewaysConfig.MutablePq()->MutableClusterMapping()->Add(); - pqClusterConfig.SetName("lb"); - pqClusterConfig.SetClusterType(NYql::TPqClusterConfig::CT_PERS_QUEUE); - pqClusterConfig.SetEndpoint("lb.ru"); - pqClusterConfig.SetConfigManagerEndpoint("cm.lb.ru"); - pqClusterConfig.SetTvmId(777); - } - - // solomon - { - auto& solomonClusterConfig = *gatewaysConfig.MutableSolomon()->MutableClusterMapping()->Add(); - solomonClusterConfig.SetName("sol"); - solomonClusterConfig.SetCluster("sol.ru"); - } - - // dq - { - auto& dqCfg = *gatewaysConfig.MutableDq(); - auto* setting = dqCfg.AddDefaultSettings(); - setting->SetName("EnableComputeActor"); - setting->SetValue("1"); - } - - auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone(); - TVector<TDataProviderInitializer> dataProvidersInit; - - // pq - auto pqGateway = MakeIntrusive<TDummyPqGateway>(); - pqGateway->AddDummyTopic(TDummyTopic("lb", "my_in_topic")); - pqGateway->AddDummyTopic(TDummyTopic("lb", "my_out_topic")); - dataProvidersInit.push_back(GetPqDataProviderInitializer(std::move(pqGateway))); - - // solomon - auto solomonGateway = CreateSolomonGateway(gatewaysConfig.GetSolomon()); - dataProvidersInit.push_back(GetSolomonDataProviderInitializer(std::move(solomonGateway))); - - // dq - auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({ - NYql::GetCommonDqFactory(), - NKikimr::NMiniKQL::GetYqlFactory() - }); - - auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({ - NYql::CreateCommonDqTaskTransformFactory() - }); - - const auto driverConfig = NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr")); - NYdb::TDriver driver(driverConfig); - auto dqGateway = CreateLocalDqGateway(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, {}, CreateAsyncIoFactory(driver)); - - auto storage = NYql::CreateFileStorage({}); - dataProvidersInit.push_back(NYql::GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage)); - - TExprContext moduleCtx; - IModuleResolver::TPtr moduleResolver; - YQL_ENSURE(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver)); - - TProgramFactory factory(true, functionRegistry.Get(), 0ULL, dataProvidersInit, "ut"); - - factory.SetGatewaysConfig(&gatewaysConfig); - factory.SetModules(moduleResolver); - - TProgramPtr program = factory.Create("program", code); - program->ConfigureYsonResultFormat(NYson::EYsonFormat::Text); - - Cerr << "Parse SQL..." << Endl; - NSQLTranslation::TTranslationSettings sqlSettings; - sqlSettings.SyntaxVersion = 1; - sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable; - sqlSettings.Flags.insert("DqEngineEnable"); - sqlSettings.Flags.insert("DqEngineForce"); - - sqlSettings.ClusterMapping["lb"] = PqProviderName; - sqlSettings.ClusterMapping["sol"] = SolomonProviderName; - if (!program->ParseSql(sqlSettings)) { - program->PrintErrorsTo(*errorsOutput); - return false; - } - program->AstRoot()->PrettyPrintTo(Cerr, NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote); - - - Cerr << "Compile..." << Endl; - if (!program->Compile("user")) { - program->PrintErrorsTo(*errorsOutput); - return false; - } - - auto exprOut = printExpr ? &Cout : nullptr; - auto traceOpt = printTrace ? &Cerr : nullptr; - - TProgram::TStatus status = TProgram::TStatus::Error; - if (optimizeOnly) { - Cerr << "Optimize..." << Endl; - status = program->Optimize("user", traceOpt, nullptr, exprOut); - } else { - Cerr << "Run..." << Endl; - status = program->Run("user", traceOpt, nullptr, exprOut); - } - - if (status == TProgram::TStatus::Error) { - if (printTrace) { - program->Print(traceOpt, nullptr); - } - program->PrintErrorsTo(*errorsOutput); - return false; - } - - driver.Stop(true); - - Cerr << "Done." << Endl; - return true; -} - -Y_UNIT_TEST_SUITE(YqlPqSimpleTests) { - - Y_UNIT_TEST(SelectWithNoSchema) { - auto code = R"( -USE lb; -PRAGMA pq.Consumer="my_test_consumer"; -INSERT INTO my_out_topic -SELECT Data FROM my_in_topic WHERE Data < "100"; - )"; - TString errorMessage; - auto res = RunPqProgram(code, true, true, true, &errorMessage); - UNIT_ASSERT_C(res, errorMessage); - } - - Y_UNIT_TEST(SelectWithSchema) { - auto code = R"( -USE lb; -PRAGMA pq.Consumer="my_test_consumer"; - -INSERT INTO my_out_topic -SELECT CAST(y as string) || x FROM lb.object(my_in_topic, "json") WITH SCHEMA (Int32 as y, String as x) - )"; - TString errorMessage; - auto res = RunPqProgram(code, true, true, true, &errorMessage); - UNIT_ASSERT_C(res, errorMessage); - } - - Y_UNIT_TEST(SelectStarWithSchema) { - auto code = R"( -USE lb; -PRAGMA pq.Consumer="my_test_consumer"; - -$q = SELECT * FROM lb.object(my_in_topic, "json") WITH SCHEMA (Int32 as y, String as x); -INSERT INTO my_out_topic -SELECT x FROM $q - )"; - TString errorMessage; - auto res = RunPqProgram(code, true, true, true, &errorMessage); - UNIT_ASSERT_C(res, errorMessage); - } - -} - -} // NYql |