summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Ozeritskiy <[email protected]>2022-06-25 02:45:39 +0300
committerAlexey Ozeritskiy <[email protected]>2022-06-25 02:45:39 +0300
commitea9530d813f474e68e596f573611284a000f01a6 (patch)
treea80c55f81ab3bc278fbafeccca5ca9594d6f9b67
parentd362d692242721c003d1002fc959b459eef9fe9d (diff)
Use 1 thread in dq gateway
ref:7d2ec19fba507eda78786981b79493fc99700f96
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp2
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp322
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.h2
3 files changed, 214 insertions, 112 deletions
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index d473b7308e1..997d1f7bddf 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -216,7 +216,7 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio
return new TDqGatewayLocal(
CreateLocalServiceHolder(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(asyncIoFactory)),
- CreateDqGateway(std::get<0>(NDqs::GetLocalAddress()), grpcPort.Addr.GetPort(), 8));
+ CreateDqGateway(std::get<0>(NDqs::GetLocalAddress()), grpcPort.Addr.GetPort()));
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
index 96c21c88ab2..82b6907affe 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
@@ -19,47 +19,40 @@
namespace NYql {
-class TDqGateway: public IDqGateway
-{
- enum ETaskPriority {
- PRIO_NORMAL = 1,
- PRIO_RT = 2
- };
+using namespace NThreading;
+class TDqGatewayImpl: public std::enable_shared_from_this<TDqGatewayImpl>
+{
public:
- TDqGateway(const TString& host, int port, int threads, const TString& vanillaJobPath, const TString& vanillaJobMd5, TDuration timeout = TDuration::Minutes(60), TDuration requestTimeout = TDuration::Max())
+ using TResult = IDqGateway::TResult;
+ using TDqProgressWriter = IDqGateway::TDqProgressWriter;
+
+ TDqGatewayImpl(const TString& host, int port, const TString& vanillaJobPath, const TString& vanillaJobMd5, TDuration timeout, TDuration requestTimeout)
: GrpcConf(TStringBuilder() << host << ":" << port, requestTimeout)
, GrpcClient(1)
, Service(GrpcClient.CreateGRpcServiceConnection<Yql::DqsProto::DqService>(GrpcConf))
, VanillaJobPath(vanillaJobPath)
, VanillaJobMd5(vanillaJobMd5)
- , TaskScheduler(threads)
- , RtTaskScheduler(1)
+ , TaskScheduler(1)
, OpenSessionTimeout(timeout)
{
TaskScheduler.Start();
- RtTaskScheduler.Start();
}
- TString GetVanillaJobPath() override {
+ TString GetVanillaJobPath() {
return VanillaJobPath;
}
- TString GetVanillaJobMd5() override {
+ TString GetVanillaJobMd5() {
return VanillaJobMd5;
}
template<typename RespType>
- void OnResponse(NThreading::TPromise<TResult> promise, TString sessionId, NGrpc::TGrpcStatus&& status, RespType&& resp, const THashMap<TString, TString>& modulesMapping, bool alwaysFallback = false)
+ void OnResponse(TPromise<TResult> promise, TString sessionId, NGrpc::TGrpcStatus&& status, RespType&& resp, const THashMap<TString, TString>& modulesMapping, bool alwaysFallback = false)
{
YQL_LOG_CTX_ROOT_SCOPE(sessionId);
YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::callback";
- {
- TGuard<TMutex> lock(ProgressMutex);
- RunningQueries.erase(sessionId);
- }
-
TResult result;
bool error = false;
@@ -149,35 +142,23 @@ public:
result.AddIssues(result.Issues);
}
- Async([promise=std::move(promise), result=std::move(result)]() mutable { promise.SetValue(result); });
+ promise.SetValue(result);
}
- NThreading::TFuture<void> Delay(TDuration duration, ETaskPriority prio = PRIO_NORMAL) {
- NThreading::TPromise<void> promise = NThreading::NewPromise();
+ TFuture<void> Delay(TDuration duration) {
+ TPromise<void> promise = NewPromise();
auto future = promise.GetFuture();
- auto& taskScheduler = prio == PRIO_NORMAL ? TaskScheduler : RtTaskScheduler;
-
- if (!taskScheduler.Add(MakeIntrusive<TDelay>(promise), TInstant::Now() + duration)) {
+ if (!TaskScheduler.Add(MakeIntrusive<TDelay>(promise), TInstant::Now() + duration)) {
promise.SetException("cannot delay");
}
return future;
}
- void Async(const std::function<void(void)>& f) {
- NThreading::TPromise<void> promise = NThreading::NewPromise();
-
- promise.GetFuture().Apply([=](const NThreading::TFuture<void>&) {
- f();
- });
-
- Y_VERIFY(TaskScheduler.Add(MakeIntrusive<TDelay>(promise), TInstant()));
- }
-
template <typename TResponse, typename TRequest, typename TStub>
- NThreading::TFuture<TResult> WithRetry(
+ TFuture<TResult> WithRetry(
const TString& sessionId,
const TRequest& queryPB,
TStub stub,
@@ -186,11 +167,19 @@ public:
const THashMap<TString, TString>& modulesMapping
) {
auto backoff = TDuration::MilliSeconds(settings->RetryBackoffMs.Get().GetOrElse(1000));
- auto promise = NThreading::NewPromise<TResult>();
+ auto promise = NewPromise<TResult>();
auto fallbackPolicy = settings->FallbackPolicy.Get().GetOrElse("default");
auto alwaysFallback = fallbackPolicy == "always";
- auto callback = [this, promise, sessionId, alwaysFallback, modulesMapping](NGrpc::TGrpcStatus&& status, TResponse&& resp) mutable {
- return OnResponse(std::move(promise), std::move(sessionId), std::move(status), std::move(resp), modulesMapping, alwaysFallback);
+ auto self = weak_from_this();
+ auto callback = [self, promise, sessionId, alwaysFallback, modulesMapping](NGrpc::TGrpcStatus&& status, TResponse&& resp) mutable {
+ auto this_ = self.lock();
+ if (!this_) {
+ YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId;
+ promise.SetException("Gateway was closed");
+ return;
+ }
+
+ this_->OnResponse(std::move(promise), std::move(sessionId), std::move(status), std::move(resp), modulesMapping, alwaysFallback);
};
Service->DoRequest<TRequest, TResponse>(queryPB, callback, stub);
@@ -199,41 +188,48 @@ public:
TGuard<TMutex> lock(ProgressMutex);
auto i = RunningQueries.find(sessionId);
if (i != RunningQueries.end()) {
- if (i->second.first) {
+ if (i->second.ProgressWriter) {
ScheduleQueryStatusRequest(sessionId);
}
} else {
- return NThreading::MakeFuture(TResult());
+ return MakeFuture(TResult());
}
}
- return promise.GetFuture().Apply([=](const NThreading::TFuture<TResult>& result) {
+ return promise.GetFuture().Apply([=](const TFuture<TResult>& result) {
if (result.HasException()) {
return result;
}
auto value = result.GetValue();
- if (value.Success() || retry == 0 || !value.Retriable) {
+ auto this_ = self.lock();
+
+ if (value.Success() || retry == 0 || !value.Retriable || !this_) {
return result;
}
- return Delay(backoff)
- .Apply([=](const NThreading::TFuture<void>& result) {
+ return this_->Delay(backoff)
+ .Apply([=](const TFuture<void>& result) {
+ auto this_ = self.lock();
try {
result.TryRethrow();
+ if (!this_) {
+ YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId;
+ throw std::runtime_error("Gateway was closed");
+ }
} catch (...) {
- return NThreading::MakeErrorFuture<TResult>(std::current_exception());
+ return MakeErrorFuture<TResult>(std::current_exception());
}
- return WithRetry<TResponse>(sessionId, queryPB, stub, retry - 1, settings, modulesMapping);
+ return this_->WithRetry<TResponse>(sessionId, queryPB, stub, retry - 1, settings, modulesMapping);
});
});
}
- NThreading::TFuture<TResult>
+ TFuture<TResult>
ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
- bool discard) override
+ bool discard)
{
YQL_LOG_CTX_ROOT_SCOPE(sessionId);
@@ -275,52 +271,92 @@ public:
int retry = settings->MaxRetries.Get().GetOrElse(5);
+ TFuture<void> sessionFuture;
{
TGuard<TMutex> lock(ProgressMutex);
- RunningQueries.emplace(sessionId, std::make_pair(progressWriter, TString("")));
+ auto it = RunningQueries.find(sessionId);
+ if (it == RunningQueries.end()) {
+ YQL_CLOG(DEBUG, ProviderDq) << "Session was closed: " << sessionId;
+ return MakeErrorFuture<TResult>(std::make_exception_ptr(std::runtime_error("Session was closed")));
+ }
+ it->second.ProgressWriter = progressWriter;
+ sessionFuture = it->second.OpenSessionFuture;
}
YQL_CLOG(DEBUG, ProviderDq) << "Send query of size " << queryPB.ByteSizeLong();
- return WithRetry<Yql::DqsProto::ExecuteGraphResponse>(
- sessionId,
- queryPB,
- &Yql::DqsProto::DqService::Stub::AsyncExecuteGraph,
- retry,
- settings,
- modulesMapping);
+ auto self = weak_from_this();
+ return sessionFuture.Apply([self, sessionId, queryPB, retry, settings, modulesMapping](const TFuture<void>& ) {
+ auto this_ = self.lock();
+ if (!this_) {
+ YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId;
+ return MakeErrorFuture<TResult>(std::make_exception_ptr(std::runtime_error("Gateway was closed")));
+ }
+
+ return this_->WithRetry<Yql::DqsProto::ExecuteGraphResponse>(
+ sessionId,
+ queryPB,
+ &Yql::DqsProto::DqService::Stub::AsyncExecuteGraph,
+ retry,
+ settings,
+ modulesMapping);
+ });
}
- NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override {
+ TFuture<void> OpenSession(const TString& sessionId, const TString& username) {
YQL_LOG_CTX_ROOT_SCOPE(sessionId);
YQL_CLOG(INFO, ProviderDq) << "OpenSession";
Yql::DqsProto::OpenSessionRequest request;
request.SetSession(sessionId);
request.SetUsername(username);
+ {
+ TGuard<TMutex> lock(ProgressMutex);
+ if (RunningQueries.find(sessionId) != RunningQueries.end()) {
+ return MakeFuture();
+ }
+ }
+
NGrpc::TCallMeta meta;
meta.Timeout = OpenSessionTimeout;
- auto promise = NThreading::NewPromise<void>();
- auto callback = [this, promise, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::OpenSessionResponse&& resp) mutable {
+ auto promise = NewPromise<void>();
+ auto self = weak_from_this();
+ auto callback = [self, promise, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::OpenSessionResponse&& resp) mutable {
Y_UNUSED(resp);
YQL_LOG_CTX_ROOT_SCOPE(sessionId);
+ auto this_ = self.lock();
+ if (!this_) {
+ YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId;
+ promise.SetException("Gateway was closed");
+ return;
+ }
if (status.Ok()) {
YQL_CLOG(INFO, ProviderDq) << "OpenSession OK";
- SchedulePingSessionRequest(sessionId);
- Async([promise=std::move(promise)]() mutable { promise.SetValue(); });
+ this_->SchedulePingSessionRequest(sessionId);
+ promise.SetValue();
} else {
YQL_CLOG(ERROR, ProviderDq) << "OpenSession error: " << status.Msg;
- Async([promise=std::move(promise), status]() mutable { promise.SetException(status.Msg); });
+ promise.SetException(status.Msg);
}
};
Service->DoRequest<Yql::DqsProto::OpenSessionRequest, Yql::DqsProto::OpenSessionResponse>(
request, callback, &Yql::DqsProto::DqService::Stub::AsyncOpenSession, meta);
- return promise.GetFuture();
+
+ {
+ TGuard<TMutex> lock(ProgressMutex);
+ RunningQueries.emplace(sessionId, TSession {
+ std::optional<TDqProgressWriter> {},
+ "",
+ promise.GetFuture()
+ });
+ }
+
+ return MakeFuture();
}
- void CloseSession(const TString& sessionId) override {
+ void CloseSession(const TString& sessionId) {
Yql::DqsProto::CloseSessionRequest request;
request.SetSession(sessionId);
@@ -338,70 +374,95 @@ public:
request, callback, &Yql::DqsProto::DqService::Stub::AsyncCloseSession);
}
+ void OnRequestQueryStatus(const TString& sessionId, const TString& status, bool ok) {
+ TGuard<TMutex> lock(ProgressMutex);
+ TString stage;
+ TDqProgressWriter* dqProgressWriter = nullptr;
+ auto it = RunningQueries.find(sessionId);
+ if (it != RunningQueries.end() && ok) {
+ dqProgressWriter = it->second.ProgressWriter ? &*it->second.ProgressWriter:nullptr;
+ auto lastStatus = it->second.Status;
+ if (dqProgressWriter && lastStatus != status) {
+ stage = status;
+ it->second.Status = stage;
+ }
+
+ ScheduleQueryStatusRequest(sessionId);
+ } else if (it != RunningQueries.end()) {
+ it->second.ProgressWriter = {};
+ }
+
+ if (!stage.empty() && dqProgressWriter) {
+ (*dqProgressWriter)(stage);
+ }
+ }
+
void RequestQueryStatus(const TString& sessionId) {
Yql::DqsProto::QueryStatusRequest request;
request.SetSession(sessionId);
- IDqGateway::TPtr self = this;
- auto callback = [this, self, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::QueryStatusResponse&& resp) {
- if (status.Ok()) {
- TGuard<TMutex> lock(ProgressMutex);
- TString stage;
- TDqProgressWriter* dqProgressWriter = nullptr;
- auto it = RunningQueries.find(sessionId);
- if (it != RunningQueries.end()) {
- dqProgressWriter = &it->second.first;
- auto lastStatus = it->second.second;
- if (dqProgressWriter && lastStatus != resp.GetStatus()) {
- stage = resp.GetStatus();
- it->second.second = stage;
- }
-
- ScheduleQueryStatusRequest(sessionId);
- }
-
- if (!stage.empty() && dqProgressWriter) {
- (*dqProgressWriter)(stage);
- }
- } else {
- TGuard<TMutex> lock(ProgressMutex);
- RunningQueries.erase(sessionId);
+ auto self = weak_from_this();
+ auto callback = [self, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::QueryStatusResponse&& resp) {
+ auto this_ = self.lock();
+ if (!this_) {
+ return;
}
+
+ this_->OnRequestQueryStatus(sessionId, resp.GetStatus(), status.Ok());
};
Service->DoRequest<Yql::DqsProto::QueryStatusRequest, Yql::DqsProto::QueryStatusResponse>(
request, callback, &Yql::DqsProto::DqService::Stub::AsyncQueryStatus, {}, nullptr);
}
+ void StartQueryStatusRequest(const TString& sessionId, bool ok) {
+ TGuard<TMutex> lock(ProgressMutex);
+ auto it = RunningQueries.find(sessionId);
+ if (it != RunningQueries.end() && ok) {
+ RequestQueryStatus(sessionId);
+ } else if (it != RunningQueries.end()) {
+ it->second.ProgressWriter = {};
+ }
+ }
+
void ScheduleQueryStatusRequest(const TString& sessionId) {
- Delay(TDuration::MilliSeconds(1000)).Subscribe([this, sessionId](NThreading::TFuture<void> fut) {
- if (fut.HasException()) {
- TGuard<TMutex> lock(ProgressMutex);
- RunningQueries.erase(sessionId);
- } else {
- TGuard<TMutex> lock(ProgressMutex);
- auto it = RunningQueries.find(sessionId);
- if (it != RunningQueries.end()) {
- RequestQueryStatus(sessionId);
- }
+ auto self = weak_from_this();
+ Delay(TDuration::MilliSeconds(1000)).Subscribe([self, sessionId](TFuture<void> fut) {
+ auto this_ = self.lock();
+ if (!this_) {
+ return;
}
+
+ this_->StartQueryStatusRequest(sessionId, !fut.HasException());
});
}
void SchedulePingSessionRequest(const TString& sessionId) {
- auto callback = [this, sessionId](
+ auto self = weak_from_this();
+ auto callback = [self, sessionId](
NGrpc::TGrpcStatus&& status,
Yql::DqsProto::PingSessionResponse&&) mutable
{
+ auto this_ = self.lock();
+ if (!this_) {
+ return;
+ }
+
if (status.GRpcStatusCode == grpc::INVALID_ARGUMENT) {
YQL_CLOG(INFO, ProviderDq) << "Session closed " << sessionId;
} else {
- SchedulePingSessionRequest(sessionId);
+ this_->SchedulePingSessionRequest(sessionId);
}
};
- Delay(TDuration::Seconds(10), PRIO_RT).Subscribe([this, callback, sessionId](const NThreading::TFuture<void>&) {
+ Delay(TDuration::Seconds(10)).Subscribe([self, callback, sessionId](const TFuture<void>&) {
+ auto this_ = self.lock();
+ if (!this_) {
+ return;
+ }
+
Yql::DqsProto::PingSessionRequest query;
query.SetSession(sessionId);
- Service->DoRequest<Yql::DqsProto::PingSessionRequest, Yql::DqsProto::PingSessionResponse>(
+
+ this_->Service->DoRequest<Yql::DqsProto::PingSessionRequest, Yql::DqsProto::PingSessionResponse>(
query,
callback,
&Yql::DqsProto::DqService::Stub::AsyncPingSession);
@@ -409,7 +470,7 @@ public:
}
struct TDelay: public TTaskScheduler::ITask {
- TDelay(NThreading::TPromise<void> p)
+ TDelay(TPromise<void> p)
: Promise(std::move(p))
{ }
@@ -418,7 +479,7 @@ public:
return TInstant::Max();
}
- NThreading::TPromise<void> Promise;
+ TPromise<void> Promise;
};
private:
@@ -428,22 +489,63 @@ private:
TMutex ProgressMutex;
TMutex Mutex;
- THashMap<TString, std::pair<TDqProgressWriter, TString>> RunningQueries;
+
+ struct TSession {
+ std::optional<TDqProgressWriter> ProgressWriter;
+ TString Status;
+ TFuture<void> OpenSessionFuture;
+ };
+ THashMap<TString, TSession> RunningQueries;
TString VanillaJobPath;
TString VanillaJobMd5;
TTaskScheduler TaskScheduler;
- TTaskScheduler RtTaskScheduler;
-
const TDuration OpenSessionTimeout;
};
-TIntrusivePtr<IDqGateway> CreateDqGateway(const TString& host, int port, int threads) {
- return new TDqGateway(host, port, threads, "", "");
+class TDqGateway: public IDqGateway {
+public:
+ TDqGateway(const TString& host, int port, const TString& vanillaJobPath, const TString& vanillaJobMd5, TDuration timeout = TDuration::Minutes(60), TDuration requestTimeout = TDuration::Max())
+ : Impl(std::make_shared<TDqGatewayImpl>(host, port, vanillaJobPath, vanillaJobMd5, timeout, requestTimeout))
+ { }
+
+ TFuture<void> OpenSession(const TString& sessionId, const TString& username) override
+ {
+ return Impl->OpenSession(sessionId, username);
+ }
+
+ void CloseSession(const TString& sessionId) override
+ {
+ Impl->CloseSession(sessionId);
+ }
+
+ TFuture<TResult> ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
+ const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
+ const TDqSettings::TPtr& settings,
+ const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
+ bool discard) override
+ {
+ return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard);
+ }
+
+ TString GetVanillaJobPath() override {
+ return Impl->GetVanillaJobPath();
+ }
+
+ TString GetVanillaJobMd5() override {
+ return Impl->GetVanillaJobMd5();
+ }
+
+private:
+ std::shared_ptr<TDqGatewayImpl> Impl;
+};
+
+TIntrusivePtr<IDqGateway> CreateDqGateway(const TString& host, int port) {
+ return new TDqGateway(host, port, "", "");
}
TIntrusivePtr<IDqGateway> CreateDqGateway(const NProto::TDqConfig& config) {
- return new TDqGateway("localhost", config.GetPort(), 8,
+ return new TDqGateway("localhost", config.GetPort(),
config.GetYtBackends()[0].GetVanillaJob(),
config.GetYtBackends()[0].GetVanillaJobMd5(),
TDuration::MilliSeconds(config.GetOpenSessionTimeoutMs()),
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
index b335230e0bd..cf648de0bea 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
@@ -79,7 +79,7 @@ public:
}
};
-TIntrusivePtr<IDqGateway> CreateDqGateway(const TString& host, int port, int threads);
+TIntrusivePtr<IDqGateway> CreateDqGateway(const TString& host, int port);
TIntrusivePtr<IDqGateway> CreateDqGateway(const NProto::TDqConfig& config);
} // namespace NYql