aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaly Stoyan <vitstn@gmail.com>2022-06-17 11:11:46 +0300
committerVitaly Stoyan <vitstn@gmail.com>2022-06-17 11:11:46 +0300
commit20de76014dc5dc07f17d727febf1d49dc14c7996 (patch)
tree3a91955032fded6acaae9d57e527e8ba51e6c691
parent14263edb92185e889ea9e66d400cfd8eb6007125 (diff)
downloadydb-20de76014dc5dc07f17d727febf1d49dc14c7996.tar.gz
fixed empty list access & use weak ptr from callback
ref:ff169b8fbfd15bb1e40748f466c0260c53725078
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp62
1 files changed, 48 insertions, 14 deletions
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index a4dab7ec996..cf0f58785ea 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -78,7 +78,7 @@ private:
THolder<TServiceNode> ServiceNode;
};
-class TDqGatewayLocal: public IDqGateway
+class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalImpl>
{
struct TRequest {
TString SessionId;
@@ -87,39 +87,39 @@ class TDqGatewayLocal: public IDqGateway
THashMap<TString, TString> SecureParams;
THashMap<TString, TString> GraphParams;
TDqSettings::TPtr Settings;
- TDqProgressWriter ProgressWriter;
+ IDqGateway::TDqProgressWriter ProgressWriter;
THashMap<TString, TString> ModulesMapping;
bool Discard;
- NThreading::TPromise<TResult> Result;
+ NThreading::TPromise<IDqGateway::TResult> Result;
};
public:
- TDqGatewayLocal(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway)
+ TDqGatewayLocalImpl(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway)
: LocalService(std::move(localService))
, Gateway(gateway)
, DeterministicMode(!!GetEnv("YQL_DETERMINISTIC_MODE"))
{ }
- NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override {
+ NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) {
return Gateway->OpenSession(sessionId, username);
}
- void CloseSession(const TString& sessionId) override {
+ void CloseSession(const TString& sessionId) {
return Gateway->CloseSession(sessionId);
}
- NThreading::TFuture<TResult>
+ NThreading::TFuture<IDqGateway::TResult>
ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
- const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
- bool discard) override
+ const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
+ bool discard)
{
- NThreading::TFuture<TResult> result;
+ NThreading::TFuture<IDqGateway::TResult> result;
{
TGuard<TMutex> lock(Mutex);
- Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<TResult>()});
+ Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<IDqGateway::TResult>()});
result = Queue.back().Result;
}
@@ -131,18 +131,23 @@ public:
private:
void TryExecuteNext() {
TGuard<TMutex> lock(Mutex);
- if (!DeterministicMode || Queue.size() == 1) {
+ if (!Queue.empty() && (!DeterministicMode || Queue.size() == 1)) {
auto request = std::move(Queue.front()); Queue.pop_front();
lock.Release();
+ auto weak = weak_from_this();
+
Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard)
- .Apply([promise=request.Result, this](const NThreading::TFuture<TResult>& result) mutable {
+ .Apply([promise=request.Result, weak](const NThreading::TFuture<IDqGateway::TResult>& result) mutable {
try {
promise.SetValue(result.GetValue());
} catch (...) {
promise.SetException(std::current_exception());
}
- TryExecuteNext();
+
+ if (auto ptr = weak.lock()) {
+ ptr->TryExecuteNext();
+ }
});
}
}
@@ -154,6 +159,35 @@ private:
TList<TRequest> Queue;
};
+class TDqGatewayLocal : public IDqGateway {
+public:
+ TDqGatewayLocal(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway)
+ : Impl(std::make_shared<TDqGatewayLocalImpl>(std::move(localService), gateway))
+ {}
+
+ NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override {
+ return Impl->OpenSession(sessionId, username);
+ }
+
+ void CloseSession(const TString& sessionId) override {
+ return Impl->CloseSession(sessionId);
+ }
+
+ NThreading::TFuture<TResult>
+ ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
+ const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
+ const TDqSettings::TPtr& settings,
+ const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
+ bool discard) override
+ {
+ return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams,
+ settings, progressWriter, modulesMapping, discard);
+ }
+
+private:
+ std::shared_ptr<TDqGatewayLocalImpl> Impl;
+};
+
THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,