diff options
author | Alexey Ozeritskiy <[email protected]> | 2022-06-25 02:45:39 +0300 |
---|---|---|
committer | Alexey Ozeritskiy <[email protected]> | 2022-06-25 02:45:39 +0300 |
commit | ea9530d813f474e68e596f573611284a000f01a6 (patch) | |
tree | a80c55f81ab3bc278fbafeccca5ca9594d6f9b67 | |
parent | d362d692242721c003d1002fc959b459eef9fe9d (diff) |
Use 1 thread in dq gateway
ref:7d2ec19fba507eda78786981b79493fc99700f96
3 files changed, 214 insertions, 112 deletions
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index d473b7308e1..997d1f7bddf 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -216,7 +216,7 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio return new TDqGatewayLocal( CreateLocalServiceHolder(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(asyncIoFactory)), - CreateDqGateway(std::get<0>(NDqs::GetLocalAddress()), grpcPort.Addr.GetPort(), 8)); + CreateDqGateway(std::get<0>(NDqs::GetLocalAddress()), grpcPort.Addr.GetPort())); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp index 96c21c88ab2..82b6907affe 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp @@ -19,47 +19,40 @@ namespace NYql { -class TDqGateway: public IDqGateway -{ - enum ETaskPriority { - PRIO_NORMAL = 1, - PRIO_RT = 2 - }; +using namespace NThreading; +class TDqGatewayImpl: public std::enable_shared_from_this<TDqGatewayImpl> +{ public: - TDqGateway(const TString& host, int port, int threads, const TString& vanillaJobPath, const TString& vanillaJobMd5, TDuration timeout = TDuration::Minutes(60), TDuration requestTimeout = TDuration::Max()) + using TResult = IDqGateway::TResult; + using TDqProgressWriter = IDqGateway::TDqProgressWriter; + + TDqGatewayImpl(const TString& host, int port, const TString& vanillaJobPath, const TString& vanillaJobMd5, TDuration timeout, TDuration requestTimeout) : GrpcConf(TStringBuilder() << host << ":" << port, requestTimeout) , GrpcClient(1) , Service(GrpcClient.CreateGRpcServiceConnection<Yql::DqsProto::DqService>(GrpcConf)) , VanillaJobPath(vanillaJobPath) , VanillaJobMd5(vanillaJobMd5) - , TaskScheduler(threads) - , RtTaskScheduler(1) + , TaskScheduler(1) , OpenSessionTimeout(timeout) { TaskScheduler.Start(); - RtTaskScheduler.Start(); } - TString GetVanillaJobPath() override { + TString GetVanillaJobPath() { return VanillaJobPath; } - TString GetVanillaJobMd5() override { + TString GetVanillaJobMd5() { return VanillaJobMd5; } template<typename RespType> - void OnResponse(NThreading::TPromise<TResult> promise, TString sessionId, NGrpc::TGrpcStatus&& status, RespType&& resp, const THashMap<TString, TString>& modulesMapping, bool alwaysFallback = false) + void OnResponse(TPromise<TResult> promise, TString sessionId, NGrpc::TGrpcStatus&& status, RespType&& resp, const THashMap<TString, TString>& modulesMapping, bool alwaysFallback = false) { YQL_LOG_CTX_ROOT_SCOPE(sessionId); YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::callback"; - { - TGuard<TMutex> lock(ProgressMutex); - RunningQueries.erase(sessionId); - } - TResult result; bool error = false; @@ -149,35 +142,23 @@ public: result.AddIssues(result.Issues); } - Async([promise=std::move(promise), result=std::move(result)]() mutable { promise.SetValue(result); }); + promise.SetValue(result); } - NThreading::TFuture<void> Delay(TDuration duration, ETaskPriority prio = PRIO_NORMAL) { - NThreading::TPromise<void> promise = NThreading::NewPromise(); + TFuture<void> Delay(TDuration duration) { + TPromise<void> promise = NewPromise(); auto future = promise.GetFuture(); - auto& taskScheduler = prio == PRIO_NORMAL ? TaskScheduler : RtTaskScheduler; - - if (!taskScheduler.Add(MakeIntrusive<TDelay>(promise), TInstant::Now() + duration)) { + if (!TaskScheduler.Add(MakeIntrusive<TDelay>(promise), TInstant::Now() + duration)) { promise.SetException("cannot delay"); } return future; } - void Async(const std::function<void(void)>& f) { - NThreading::TPromise<void> promise = NThreading::NewPromise(); - - promise.GetFuture().Apply([=](const NThreading::TFuture<void>&) { - f(); - }); - - Y_VERIFY(TaskScheduler.Add(MakeIntrusive<TDelay>(promise), TInstant())); - } - template <typename TResponse, typename TRequest, typename TStub> - NThreading::TFuture<TResult> WithRetry( + TFuture<TResult> WithRetry( const TString& sessionId, const TRequest& queryPB, TStub stub, @@ -186,11 +167,19 @@ public: const THashMap<TString, TString>& modulesMapping ) { auto backoff = TDuration::MilliSeconds(settings->RetryBackoffMs.Get().GetOrElse(1000)); - auto promise = NThreading::NewPromise<TResult>(); + auto promise = NewPromise<TResult>(); auto fallbackPolicy = settings->FallbackPolicy.Get().GetOrElse("default"); auto alwaysFallback = fallbackPolicy == "always"; - auto callback = [this, promise, sessionId, alwaysFallback, modulesMapping](NGrpc::TGrpcStatus&& status, TResponse&& resp) mutable { - return OnResponse(std::move(promise), std::move(sessionId), std::move(status), std::move(resp), modulesMapping, alwaysFallback); + auto self = weak_from_this(); + auto callback = [self, promise, sessionId, alwaysFallback, modulesMapping](NGrpc::TGrpcStatus&& status, TResponse&& resp) mutable { + auto this_ = self.lock(); + if (!this_) { + YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId; + promise.SetException("Gateway was closed"); + return; + } + + this_->OnResponse(std::move(promise), std::move(sessionId), std::move(status), std::move(resp), modulesMapping, alwaysFallback); }; Service->DoRequest<TRequest, TResponse>(queryPB, callback, stub); @@ -199,41 +188,48 @@ public: TGuard<TMutex> lock(ProgressMutex); auto i = RunningQueries.find(sessionId); if (i != RunningQueries.end()) { - if (i->second.first) { + if (i->second.ProgressWriter) { ScheduleQueryStatusRequest(sessionId); } } else { - return NThreading::MakeFuture(TResult()); + return MakeFuture(TResult()); } } - return promise.GetFuture().Apply([=](const NThreading::TFuture<TResult>& result) { + return promise.GetFuture().Apply([=](const TFuture<TResult>& result) { if (result.HasException()) { return result; } auto value = result.GetValue(); - if (value.Success() || retry == 0 || !value.Retriable) { + auto this_ = self.lock(); + + if (value.Success() || retry == 0 || !value.Retriable || !this_) { return result; } - return Delay(backoff) - .Apply([=](const NThreading::TFuture<void>& result) { + return this_->Delay(backoff) + .Apply([=](const TFuture<void>& result) { + auto this_ = self.lock(); try { result.TryRethrow(); + if (!this_) { + YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId; + throw std::runtime_error("Gateway was closed"); + } } catch (...) { - return NThreading::MakeErrorFuture<TResult>(std::current_exception()); + return MakeErrorFuture<TResult>(std::current_exception()); } - return WithRetry<TResponse>(sessionId, queryPB, stub, retry - 1, settings, modulesMapping); + return this_->WithRetry<TResponse>(sessionId, queryPB, stub, retry - 1, settings, modulesMapping); }); }); } - NThreading::TFuture<TResult> + TFuture<TResult> ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns, const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, - bool discard) override + bool discard) { YQL_LOG_CTX_ROOT_SCOPE(sessionId); @@ -275,52 +271,92 @@ public: int retry = settings->MaxRetries.Get().GetOrElse(5); + TFuture<void> sessionFuture; { TGuard<TMutex> lock(ProgressMutex); - RunningQueries.emplace(sessionId, std::make_pair(progressWriter, TString(""))); + auto it = RunningQueries.find(sessionId); + if (it == RunningQueries.end()) { + YQL_CLOG(DEBUG, ProviderDq) << "Session was closed: " << sessionId; + return MakeErrorFuture<TResult>(std::make_exception_ptr(std::runtime_error("Session was closed"))); + } + it->second.ProgressWriter = progressWriter; + sessionFuture = it->second.OpenSessionFuture; } YQL_CLOG(DEBUG, ProviderDq) << "Send query of size " << queryPB.ByteSizeLong(); - return WithRetry<Yql::DqsProto::ExecuteGraphResponse>( - sessionId, - queryPB, - &Yql::DqsProto::DqService::Stub::AsyncExecuteGraph, - retry, - settings, - modulesMapping); + auto self = weak_from_this(); + return sessionFuture.Apply([self, sessionId, queryPB, retry, settings, modulesMapping](const TFuture<void>& ) { + auto this_ = self.lock(); + if (!this_) { + YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId; + return MakeErrorFuture<TResult>(std::make_exception_ptr(std::runtime_error("Gateway was closed"))); + } + + return this_->WithRetry<Yql::DqsProto::ExecuteGraphResponse>( + sessionId, + queryPB, + &Yql::DqsProto::DqService::Stub::AsyncExecuteGraph, + retry, + settings, + modulesMapping); + }); } - NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override { + TFuture<void> OpenSession(const TString& sessionId, const TString& username) { YQL_LOG_CTX_ROOT_SCOPE(sessionId); YQL_CLOG(INFO, ProviderDq) << "OpenSession"; Yql::DqsProto::OpenSessionRequest request; request.SetSession(sessionId); request.SetUsername(username); + { + TGuard<TMutex> lock(ProgressMutex); + if (RunningQueries.find(sessionId) != RunningQueries.end()) { + return MakeFuture(); + } + } + NGrpc::TCallMeta meta; meta.Timeout = OpenSessionTimeout; - auto promise = NThreading::NewPromise<void>(); - auto callback = [this, promise, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::OpenSessionResponse&& resp) mutable { + auto promise = NewPromise<void>(); + auto self = weak_from_this(); + auto callback = [self, promise, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::OpenSessionResponse&& resp) mutable { Y_UNUSED(resp); YQL_LOG_CTX_ROOT_SCOPE(sessionId); + auto this_ = self.lock(); + if (!this_) { + YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId; + promise.SetException("Gateway was closed"); + return; + } if (status.Ok()) { YQL_CLOG(INFO, ProviderDq) << "OpenSession OK"; - SchedulePingSessionRequest(sessionId); - Async([promise=std::move(promise)]() mutable { promise.SetValue(); }); + this_->SchedulePingSessionRequest(sessionId); + promise.SetValue(); } else { YQL_CLOG(ERROR, ProviderDq) << "OpenSession error: " << status.Msg; - Async([promise=std::move(promise), status]() mutable { promise.SetException(status.Msg); }); + promise.SetException(status.Msg); } }; Service->DoRequest<Yql::DqsProto::OpenSessionRequest, Yql::DqsProto::OpenSessionResponse>( request, callback, &Yql::DqsProto::DqService::Stub::AsyncOpenSession, meta); - return promise.GetFuture(); + + { + TGuard<TMutex> lock(ProgressMutex); + RunningQueries.emplace(sessionId, TSession { + std::optional<TDqProgressWriter> {}, + "", + promise.GetFuture() + }); + } + + return MakeFuture(); } - void CloseSession(const TString& sessionId) override { + void CloseSession(const TString& sessionId) { Yql::DqsProto::CloseSessionRequest request; request.SetSession(sessionId); @@ -338,70 +374,95 @@ public: request, callback, &Yql::DqsProto::DqService::Stub::AsyncCloseSession); } + void OnRequestQueryStatus(const TString& sessionId, const TString& status, bool ok) { + TGuard<TMutex> lock(ProgressMutex); + TString stage; + TDqProgressWriter* dqProgressWriter = nullptr; + auto it = RunningQueries.find(sessionId); + if (it != RunningQueries.end() && ok) { + dqProgressWriter = it->second.ProgressWriter ? &*it->second.ProgressWriter:nullptr; + auto lastStatus = it->second.Status; + if (dqProgressWriter && lastStatus != status) { + stage = status; + it->second.Status = stage; + } + + ScheduleQueryStatusRequest(sessionId); + } else if (it != RunningQueries.end()) { + it->second.ProgressWriter = {}; + } + + if (!stage.empty() && dqProgressWriter) { + (*dqProgressWriter)(stage); + } + } + void RequestQueryStatus(const TString& sessionId) { Yql::DqsProto::QueryStatusRequest request; request.SetSession(sessionId); - IDqGateway::TPtr self = this; - auto callback = [this, self, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::QueryStatusResponse&& resp) { - if (status.Ok()) { - TGuard<TMutex> lock(ProgressMutex); - TString stage; - TDqProgressWriter* dqProgressWriter = nullptr; - auto it = RunningQueries.find(sessionId); - if (it != RunningQueries.end()) { - dqProgressWriter = &it->second.first; - auto lastStatus = it->second.second; - if (dqProgressWriter && lastStatus != resp.GetStatus()) { - stage = resp.GetStatus(); - it->second.second = stage; - } - - ScheduleQueryStatusRequest(sessionId); - } - - if (!stage.empty() && dqProgressWriter) { - (*dqProgressWriter)(stage); - } - } else { - TGuard<TMutex> lock(ProgressMutex); - RunningQueries.erase(sessionId); + auto self = weak_from_this(); + auto callback = [self, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::QueryStatusResponse&& resp) { + auto this_ = self.lock(); + if (!this_) { + return; } + + this_->OnRequestQueryStatus(sessionId, resp.GetStatus(), status.Ok()); }; Service->DoRequest<Yql::DqsProto::QueryStatusRequest, Yql::DqsProto::QueryStatusResponse>( request, callback, &Yql::DqsProto::DqService::Stub::AsyncQueryStatus, {}, nullptr); } + void StartQueryStatusRequest(const TString& sessionId, bool ok) { + TGuard<TMutex> lock(ProgressMutex); + auto it = RunningQueries.find(sessionId); + if (it != RunningQueries.end() && ok) { + RequestQueryStatus(sessionId); + } else if (it != RunningQueries.end()) { + it->second.ProgressWriter = {}; + } + } + void ScheduleQueryStatusRequest(const TString& sessionId) { - Delay(TDuration::MilliSeconds(1000)).Subscribe([this, sessionId](NThreading::TFuture<void> fut) { - if (fut.HasException()) { - TGuard<TMutex> lock(ProgressMutex); - RunningQueries.erase(sessionId); - } else { - TGuard<TMutex> lock(ProgressMutex); - auto it = RunningQueries.find(sessionId); - if (it != RunningQueries.end()) { - RequestQueryStatus(sessionId); - } + auto self = weak_from_this(); + Delay(TDuration::MilliSeconds(1000)).Subscribe([self, sessionId](TFuture<void> fut) { + auto this_ = self.lock(); + if (!this_) { + return; } + + this_->StartQueryStatusRequest(sessionId, !fut.HasException()); }); } void SchedulePingSessionRequest(const TString& sessionId) { - auto callback = [this, sessionId]( + auto self = weak_from_this(); + auto callback = [self, sessionId]( NGrpc::TGrpcStatus&& status, Yql::DqsProto::PingSessionResponse&&) mutable { + auto this_ = self.lock(); + if (!this_) { + return; + } + if (status.GRpcStatusCode == grpc::INVALID_ARGUMENT) { YQL_CLOG(INFO, ProviderDq) << "Session closed " << sessionId; } else { - SchedulePingSessionRequest(sessionId); + this_->SchedulePingSessionRequest(sessionId); } }; - Delay(TDuration::Seconds(10), PRIO_RT).Subscribe([this, callback, sessionId](const NThreading::TFuture<void>&) { + Delay(TDuration::Seconds(10)).Subscribe([self, callback, sessionId](const TFuture<void>&) { + auto this_ = self.lock(); + if (!this_) { + return; + } + Yql::DqsProto::PingSessionRequest query; query.SetSession(sessionId); - Service->DoRequest<Yql::DqsProto::PingSessionRequest, Yql::DqsProto::PingSessionResponse>( + + this_->Service->DoRequest<Yql::DqsProto::PingSessionRequest, Yql::DqsProto::PingSessionResponse>( query, callback, &Yql::DqsProto::DqService::Stub::AsyncPingSession); @@ -409,7 +470,7 @@ public: } struct TDelay: public TTaskScheduler::ITask { - TDelay(NThreading::TPromise<void> p) + TDelay(TPromise<void> p) : Promise(std::move(p)) { } @@ -418,7 +479,7 @@ public: return TInstant::Max(); } - NThreading::TPromise<void> Promise; + TPromise<void> Promise; }; private: @@ -428,22 +489,63 @@ private: TMutex ProgressMutex; TMutex Mutex; - THashMap<TString, std::pair<TDqProgressWriter, TString>> RunningQueries; + + struct TSession { + std::optional<TDqProgressWriter> ProgressWriter; + TString Status; + TFuture<void> OpenSessionFuture; + }; + THashMap<TString, TSession> RunningQueries; TString VanillaJobPath; TString VanillaJobMd5; TTaskScheduler TaskScheduler; - TTaskScheduler RtTaskScheduler; - const TDuration OpenSessionTimeout; }; -TIntrusivePtr<IDqGateway> CreateDqGateway(const TString& host, int port, int threads) { - return new TDqGateway(host, port, threads, "", ""); +class TDqGateway: public IDqGateway { +public: + TDqGateway(const TString& host, int port, const TString& vanillaJobPath, const TString& vanillaJobMd5, TDuration timeout = TDuration::Minutes(60), TDuration requestTimeout = TDuration::Max()) + : Impl(std::make_shared<TDqGatewayImpl>(host, port, vanillaJobPath, vanillaJobMd5, timeout, requestTimeout)) + { } + + TFuture<void> OpenSession(const TString& sessionId, const TString& username) override + { + return Impl->OpenSession(sessionId, username); + } + + void CloseSession(const TString& sessionId) override + { + Impl->CloseSession(sessionId); + } + + TFuture<TResult> ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns, + const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams, + const TDqSettings::TPtr& settings, + const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, + bool discard) override + { + return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard); + } + + TString GetVanillaJobPath() override { + return Impl->GetVanillaJobPath(); + } + + TString GetVanillaJobMd5() override { + return Impl->GetVanillaJobMd5(); + } + +private: + std::shared_ptr<TDqGatewayImpl> Impl; +}; + +TIntrusivePtr<IDqGateway> CreateDqGateway(const TString& host, int port) { + return new TDqGateway(host, port, "", ""); } TIntrusivePtr<IDqGateway> CreateDqGateway(const NProto::TDqConfig& config) { - return new TDqGateway("localhost", config.GetPort(), 8, + return new TDqGateway("localhost", config.GetPort(), config.GetYtBackends()[0].GetVanillaJob(), config.GetYtBackends()[0].GetVanillaJobMd5(), TDuration::MilliSeconds(config.GetOpenSessionTimeoutMs()), diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h index b335230e0bd..cf648de0bea 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h @@ -79,7 +79,7 @@ public: } }; -TIntrusivePtr<IDqGateway> CreateDqGateway(const TString& host, int port, int threads); +TIntrusivePtr<IDqGateway> CreateDqGateway(const TString& host, int port); TIntrusivePtr<IDqGateway> CreateDqGateway(const NProto::TDqConfig& config); } // namespace NYql |