diff options
author | Vitaly Stoyan <vitstn@gmail.com> | 2022-06-17 11:11:46 +0300 |
---|---|---|
committer | Vitaly Stoyan <vitstn@gmail.com> | 2022-06-17 11:11:46 +0300 |
commit | 20de76014dc5dc07f17d727febf1d49dc14c7996 (patch) | |
tree | 3a91955032fded6acaae9d57e527e8ba51e6c691 | |
parent | 14263edb92185e889ea9e66d400cfd8eb6007125 (diff) | |
download | ydb-20de76014dc5dc07f17d727febf1d49dc14c7996.tar.gz |
fixed empty list access & use weak ptr from callback
ref:ff169b8fbfd15bb1e40748f466c0260c53725078
-rw-r--r-- | ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp | 62 |
1 files changed, 48 insertions, 14 deletions
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index a4dab7ec996..cf0f58785ea 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -78,7 +78,7 @@ private: THolder<TServiceNode> ServiceNode; }; -class TDqGatewayLocal: public IDqGateway +class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalImpl> { struct TRequest { TString SessionId; @@ -87,39 +87,39 @@ class TDqGatewayLocal: public IDqGateway THashMap<TString, TString> SecureParams; THashMap<TString, TString> GraphParams; TDqSettings::TPtr Settings; - TDqProgressWriter ProgressWriter; + IDqGateway::TDqProgressWriter ProgressWriter; THashMap<TString, TString> ModulesMapping; bool Discard; - NThreading::TPromise<TResult> Result; + NThreading::TPromise<IDqGateway::TResult> Result; }; public: - TDqGatewayLocal(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway) + TDqGatewayLocalImpl(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway) : LocalService(std::move(localService)) , Gateway(gateway) , DeterministicMode(!!GetEnv("YQL_DETERMINISTIC_MODE")) { } - NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override { + NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) { return Gateway->OpenSession(sessionId, username); } - void CloseSession(const TString& sessionId) override { + void CloseSession(const TString& sessionId) { return Gateway->CloseSession(sessionId); } - NThreading::TFuture<TResult> + NThreading::TFuture<IDqGateway::TResult> ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns, const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams, const TDqSettings::TPtr& settings, - const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, - bool discard) override + const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, + bool discard) { - NThreading::TFuture<TResult> result; + NThreading::TFuture<IDqGateway::TResult> result; { TGuard<TMutex> lock(Mutex); - Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<TResult>()}); + Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<IDqGateway::TResult>()}); result = Queue.back().Result; } @@ -131,18 +131,23 @@ public: private: void TryExecuteNext() { TGuard<TMutex> lock(Mutex); - if (!DeterministicMode || Queue.size() == 1) { + if (!Queue.empty() && (!DeterministicMode || Queue.size() == 1)) { auto request = std::move(Queue.front()); Queue.pop_front(); lock.Release(); + auto weak = weak_from_this(); + Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard) - .Apply([promise=request.Result, this](const NThreading::TFuture<TResult>& result) mutable { + .Apply([promise=request.Result, weak](const NThreading::TFuture<IDqGateway::TResult>& result) mutable { try { promise.SetValue(result.GetValue()); } catch (...) { promise.SetException(std::current_exception()); } - TryExecuteNext(); + + if (auto ptr = weak.lock()) { + ptr->TryExecuteNext(); + } }); } } @@ -154,6 +159,35 @@ private: TList<TRequest> Queue; }; +class TDqGatewayLocal : public IDqGateway { +public: + TDqGatewayLocal(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway) + : Impl(std::make_shared<TDqGatewayLocalImpl>(std::move(localService), gateway)) + {} + + NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override { + return Impl->OpenSession(sessionId, username); + } + + void CloseSession(const TString& sessionId) override { + return Impl->CloseSession(sessionId); + } + + NThreading::TFuture<TResult> + ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns, + const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams, + const TDqSettings::TPtr& settings, + const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, + bool discard) override + { + return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, + settings, progressWriter, modulesMapping, discard); + } + +private: + std::shared_ptr<TDqGatewayLocalImpl> Impl; +}; + THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, |