aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgritukan <gritukan@yandex-team.com>2023-12-07 02:29:59 +0300
committergritukan <gritukan@yandex-team.com>2023-12-07 02:54:42 +0300
commit6c084f56f0fc117bdf761a3d5f0b9234cf818aee (patch)
tree6b733f71f482e400f8512d97366418e24eff06b1
parentc1e7152671340863e91efd85d9be2435ccfc1988 (diff)
downloadydb-6c084f56f0fc117bdf761a3d5f0b9234cf818aee.tar.gz
Make async query abort for YQL
-rw-r--r--ydb/core/fq/libs/gateway/empty_gateway.cpp3
-rw-r--r--ydb/library/yql/core/facade/yql_facade.cpp109
-rw-r--r--ydb/library/yql/core/facade/yql_facade.h7
-rw-r--r--ydb/library/yql/core/yql_data_provider.h6
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp6
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp8
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.h10
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp6
-rw-r--r--ydb/library/yql/providers/pg/provider/yql_pg_provider.cpp3
-rw-r--r--ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp4
-rw-r--r--ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h2
-rw-r--r--ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp6
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_gateway.h2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp4
-rw-r--r--ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp6
-rw-r--r--ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp60
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp53
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp5
-rw-r--r--ydb/library/yql/providers/yt/gateway/profile/yql_yt_profiling.cpp8
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_gateway.h4
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp17
21 files changed, 219 insertions, 110 deletions
diff --git a/ydb/core/fq/libs/gateway/empty_gateway.cpp b/ydb/core/fq/libs/gateway/empty_gateway.cpp
index 36981bad7d..72fd0a342f 100644
--- a/ydb/core/fq/libs/gateway/empty_gateway.cpp
+++ b/ydb/core/fq/libs/gateway/empty_gateway.cpp
@@ -21,8 +21,9 @@ public:
return result;
}
- void CloseSession(const TString& action) override {
+ NThreading::TFuture<void> CloseSessionAsync(const TString& action) override {
Y_UNUSED(action);
+ return NThreading::MakeFuture();
}
NThreading::TFuture<TResult> ExecutePlan(
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp
index ef3adf31c1..77d958d103 100644
--- a/ydb/library/yql/core/facade/yql_facade.cpp
+++ b/ydb/library/yql/core/facade/yql_facade.cpp
@@ -301,9 +301,11 @@ TProgram::TProgram(
TProgram::~TProgram() {
try {
- CloseLastSession();
+ CloseLastSession().GetValueSync();
// stop all non complete execution before deleting TExprCtx
- DataProviders_.clear();
+ with_lock (DataProvidersLock_) {
+ DataProviders_.clear();
+ }
} catch (...) {
Cerr << CurrentExceptionMessage() << Endl;
}
@@ -682,7 +684,12 @@ TProgram::TFutureStatus TProgram::ValidateAsync(const TString& username, IOutput
}
TypeCtx_->IsReadOnly = true;
- for (const auto& dp : DataProviders_) {
+ TVector<TDataProviderInfo> dataProviders;
+ with_lock (DataProvidersLock_) {
+ dataProviders = DataProviders_;
+ }
+
+ for (const auto& dp : dataProviders) {
if (!dp.RemoteClusterProvider || !dp.RemoteValidate) {
continue;
}
@@ -749,7 +756,12 @@ TProgram::TFutureStatus TProgram::OptimizeAsync(
}
TypeCtx_->IsReadOnly = true;
- for (const auto& dp : DataProviders_) {
+ TVector<TDataProviderInfo> dataProviders;
+ with_lock (DataProvidersLock_) {
+ dataProviders = DataProviders_;
+ }
+
+ for (const auto& dp : dataProviders) {
if (!dp.RemoteClusterProvider || !dp.RemoteOptimize) {
continue;
}
@@ -813,6 +825,11 @@ TProgram::TFutureStatus TProgram::OptimizeAsyncWithConfig(
}
TypeCtx_->IsReadOnly = true;
+ TVector<TDataProviderInfo> dataProviders;
+ with_lock (DataProvidersLock_) {
+ dataProviders = DataProviders_;
+ }
+
for (const auto& dp : DataProviders_) {
if (!dp.RemoteClusterProvider || !dp.RemoteOptimize) {
continue;
@@ -944,6 +961,11 @@ TProgram::TFutureStatus TProgram::RunAsync(
}
TypeCtx_->IsReadOnly = (HiddenMode_ != EHiddenMode::Disable);
+ TVector<TDataProviderInfo> dataProviders;
+ with_lock (DataProvidersLock_) {
+ dataProviders = DataProviders_;
+ }
+
for (const auto& dp : DataProviders_) {
if (!dp.RemoteClusterProvider || !dp.RemoteRun) {
continue;
@@ -1018,6 +1040,11 @@ TProgram::TFutureStatus TProgram::RunAsyncWithConfig(
}
TypeCtx_->IsReadOnly = (HiddenMode_ != EHiddenMode::Disable);
+ TVector<TDataProviderInfo> dataProviders;
+ with_lock (DataProvidersLock_) {
+ dataProviders = DataProviders_;
+ }
+
for (const auto& dp : DataProviders_) {
if (!dp.RemoteClusterProvider || !dp.RemoteRun) {
continue;
@@ -1428,39 +1455,63 @@ TProgram::TFutureStatus TProgram::ContinueAsync() {
return AsyncTransformWithFallback(true);
}
-void TProgram::Abort()
+NThreading::TFuture<void> TProgram::Abort()
{
- CloseLastSession();
+ return CloseLastSession();
}
-void TProgram::CleanupLastSession() {
+NThreading::TFuture<void> TProgram::CleanupLastSession() {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
TString sessionId = GetSessionId();
if (sessionId.empty()) {
- return;
+ return MakeFuture();
}
- for (const auto& dp : DataProviders_) {
+ TVector<TDataProviderInfo> dataProviders;
+ with_lock (DataProvidersLock_) {
+ dataProviders = DataProviders_;
+ }
+
+ TVector<NThreading::TFuture<void>> cleanupFutures;
+ cleanupFutures.reserve(dataProviders.size());
+ for (const auto& dp : dataProviders) {
if (dp.CleanupSession) {
dp.CleanupSession(sessionId);
}
+ if (dp.CleanupSessionAsync) {
+ cleanupFutures.push_back(dp.CleanupSessionAsync(sessionId));
+ }
}
+
+ return NThreading::WaitExceptionOrAll(cleanupFutures);
}
-void TProgram::CloseLastSession() {
+NThreading::TFuture<void> TProgram::CloseLastSession() {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
TString sessionId = TakeSessionId();
if (sessionId.empty()) {
- return;
+ return MakeFuture();
}
- for (const auto& dp : DataProviders_) {
+ TVector<TDataProviderInfo> dataProviders;
+ with_lock (DataProvidersLock_) {
+ dataProviders = DataProviders_;
+ }
+
+ TVector<NThreading::TFuture<void>> closeFutures;
+ closeFutures.reserve(dataProviders.size());
+ for (const auto& dp : dataProviders) {
if (dp.CloseSession) {
dp.CloseSession(sessionId);
}
+ if (dp.CloseSessionAsync) {
+ dp.CloseSessionAsync(sessionId);
+ }
}
+
+ return NThreading::WaitExceptionOrAll(closeFutures);
}
TString TProgram::ResultsAsString() const {
@@ -1523,7 +1574,9 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us
}
providerNames.insert(dp.Names.begin(), dp.Names.end());
- DataProviders_.emplace_back(dp);
+ with_lock (DataProvidersLock_) {
+ DataProviders_.emplace_back(dp);
+ }
if (dp.Source) {
typeAnnotationContext->AddDataSource(dp.Names, dp.Source);
}
@@ -1592,11 +1645,13 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us
TFuture<void> TProgram::OpenSession(const TString& username)
{
TVector<TFuture<void>> openFutures;
- for (const auto& dp : DataProviders_) {
- if (dp.OpenSession) {
- auto future = dp.OpenSession(SessionId_, username, ProgressWriter_, OperationOptions_,
- RandomProvider_, TimeProvider_);
- openFutures.push_back(future);
+ with_lock (DataProvidersLock_) {
+ for (const auto& dp : DataProviders_) {
+ if (dp.OpenSession) {
+ auto future = dp.OpenSession(SessionId_, username, ProgressWriter_, OperationOptions_,
+ RandomProvider_, TimeProvider_);
+ openFutures.push_back(future);
+ }
}
}
@@ -1627,20 +1682,26 @@ void TProgram::Print(IOutputStream* exprOut, IOutputStream* planOut, bool cleanP
}
bool TProgram::HasActiveProcesses() {
- for (const auto& dp : DataProviders_) {
- if (dp.HasActiveProcesses && dp.HasActiveProcesses()) {
- return true;
+ with_lock (DataProvidersLock_) {
+ for (const auto& dp : DataProviders_) {
+ if (dp.HasActiveProcesses && dp.HasActiveProcesses()) {
+ return true;
+ }
}
}
+
return false;
}
bool TProgram::NeedWaitForActiveProcesses() {
- for (const auto& dp : DataProviders_) {
- if (dp.HasActiveProcesses && dp.HasActiveProcesses() && dp.WaitForActiveProcesses) {
- return true;
+ with_lock (DataProvidersLock_) {
+ for (const auto& dp : DataProviders_) {
+ if (dp.HasActiveProcesses && dp.HasActiveProcesses() && dp.WaitForActiveProcesses) {
+ return true;
+ }
}
}
+
return false;
}
diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h
index 3f1fdb0f28..5252ba2d0f 100644
--- a/ydb/library/yql/core/facade/yql_facade.h
+++ b/ydb/library/yql/core/facade/yql_facade.h
@@ -186,7 +186,7 @@ public:
bool HasActiveProcesses();
bool NeedWaitForActiveProcesses();
- void Abort();
+ NThreading::TFuture<void> Abort();
inline TIssues Issues() {
if (ExprCtx_) {
@@ -362,8 +362,8 @@ private:
NThreading::TFuture<void> OpenSession(const TString& username);
- void CleanupLastSession();
- void CloseLastSession();
+ NThreading::TFuture<void> CleanupLastSession();
+ NThreading::TFuture<void> CloseLastSession();
TFutureStatus RemoteKikimrValidate(const TString& cluster);
TFutureStatus RemoteKikimrOptimize(const TString& cluster, const IPipelineConfigurator* pipelineConf);
@@ -384,6 +384,7 @@ private:
const TIntrusivePtr<ITimeProvider> TimeProvider_;
const ui64 NextUniqueId_;
TVector<TDataProviderInitializer> DataProvidersInit_;
+ TAdaptiveLock DataProvidersLock_;
TVector<TDataProviderInfo> DataProviders_;
TYqlOperationOptions OperationOptions_;
TCredentials::TPtr Credentials_;
diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h
index fd8a69594b..7859c88374 100644
--- a/ydb/library/yql/core/yql_data_provider.h
+++ b/ydb/library/yql/core/yql_data_provider.h
@@ -236,10 +236,14 @@ struct TDataProviderInfo {
std::function<bool()> HasActiveProcesses;
+ // COMPAT(gritukan): Remove it after Arcadia migration.
std::function<void(const TString& sessionId)> CloseSession;
-
std::function<void(const TString& sessionId)> CleanupSession;
+ std::function<NThreading::TFuture<void>(const TString& sessionId)> CloseSessionAsync;
+
+ std::function<NThreading::TFuture<void>(const TString& sessionId)> CleanupSessionAsync;
+
std::function<TString(const TString& url, const TString& alias)> TokenResolver;
};
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index e4d92817ea..372b7be9fc 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -140,8 +140,8 @@ public:
return Gateway->OpenSession(sessionId, username);
}
- void CloseSession(const TString& sessionId) {
- return Gateway->CloseSession(sessionId);
+ NThreading::TFuture<void> CloseSession(const TString& sessionId) {
+ return Gateway->CloseSessionAsync(sessionId);
}
NThreading::TFuture<IDqGateway::TResult>
@@ -220,7 +220,7 @@ public:
return Impl->OpenSession(sessionId, username);
}
- void CloseSession(const TString& sessionId) override {
+ NThreading::TFuture<void> CloseSessionAsync(const TString& sessionId) override {
return Impl->CloseSession(sessionId);
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
index 466be06e7e..9e9691c37d 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
@@ -364,7 +364,7 @@ public:
return MakeFuture();
}
- void CloseSession(const TString& sessionId) {
+ TFuture<void> CloseSession(const TString& sessionId) {
Yql::DqsProto::CloseSessionRequest request;
request.SetSession(sessionId);
@@ -380,6 +380,8 @@ public:
Service->DoRequest<Yql::DqsProto::CloseSessionRequest, Yql::DqsProto::CloseSessionResponse>(
request, callback, &Yql::DqsProto::DqService::Stub::AsyncCloseSession);
+
+ return MakeFuture();
}
void OnRequestQueryStatus(const TString& sessionId, const TString& status, bool ok) {
@@ -535,9 +537,9 @@ public:
return Impl->OpenSession(sessionId, username);
}
- void CloseSession(const TString& sessionId) override
+ TFuture<void> CloseSessionAsync(const TString& sessionId) override
{
- Impl->CloseSession(sessionId);
+ return Impl->CloseSession(sessionId);
}
TFuture<TResult> ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
index 6b87aaed60..42d07c21f0 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
@@ -60,7 +60,15 @@ public:
virtual NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) = 0;
- virtual void CloseSession(const TString& sessionId) = 0;
+ // TODO(gritukan): Leave only CloseSessionAsync after Arcadia migration and make it pure virtual.
+ virtual void CloseSession(const TString& sessionId) {
+ Y_UNUSED(sessionId);
+ }
+
+ virtual NThreading::TFuture<void> CloseSessionAsync(const TString& sessionId) {
+ Y_UNUSED(sessionId);
+ return NThreading::MakeFuture();
+ }
virtual NThreading::TFuture<TResult>
ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
index 4877f4f458..c8c16b7447 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
@@ -115,7 +115,7 @@ TDataProviderInitializer GetDqDataProviderInitializer(
}
};
- info.CloseSession = [dqGateway, metrics](const TString& sessionId) {
+ info.CloseSessionAsync = [dqGateway, metrics](const TString& sessionId) {
if (metrics) {
metrics->IncCounter("dq", "CloseSession");
}
@@ -123,7 +123,11 @@ TDataProviderInitializer GetDqDataProviderInitializer(
if (dqGateway) { // nullptr in yqlrun
YQL_CLOG(DEBUG, ProviderDq) << "CloseSession " << sessionId;
dqGateway->CloseSession(sessionId);
+
+ return dqGateway->CloseSessionAsync(sessionId);
}
+
+ return NThreading::MakeFuture();
};
return info;
diff --git a/ydb/library/yql/providers/pg/provider/yql_pg_provider.cpp b/ydb/library/yql/providers/pg/provider/yql_pg_provider.cpp
index 0cf3a1ccc2..a781eabf00 100644
--- a/ydb/library/yql/providers/pg/provider/yql_pg_provider.cpp
+++ b/ydb/library/yql/providers/pg/provider/yql_pg_provider.cpp
@@ -51,8 +51,9 @@ TDataProviderInitializer GetPgDataProviderInitializer() {
return NThreading::MakeFuture();
};
- info.CloseSession = [](const TString& sessionId) {
+ info.CloseSessionAsync = [](const TString& sessionId) {
Y_UNUSED(sessionId);
+ return NThreading::MakeFuture();
};
return info;
diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp
index b04f11ac02..ea1eb61432 100644
--- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp
+++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp
@@ -18,11 +18,13 @@ NThreading::TFuture<void> TDummyPqGateway::OpenSession(const TString& sessionId,
return NThreading::MakeFuture();
}
-void TDummyPqGateway::CloseSession(const TString& sessionId) {
+NThreading::TFuture<void> TDummyPqGateway::CloseSession(const TString& sessionId) {
with_lock (Mutex) {
Y_ENSURE(IsIn(OpenedSessions, sessionId), "Session " << sessionId << " is not opened in pq gateway");
OpenedSessions.erase(sessionId);
}
+
+ return NThreading::MakeFuture();
}
NPq::NConfigurationManager::TAsyncDescribePathResult TDummyPqGateway::DescribePath(const TString& sessionId, const TString& cluster, const TString& database, const TString& path, const TString& token) {
diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h
index 56cd4604de..fe838c5b1f 100644
--- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h
+++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h
@@ -32,7 +32,7 @@ public:
public:
NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override;
- void CloseSession(const TString& sessionId) override;
+ NThreading::TFuture<void> CloseSession(const TString& sessionId) override;
::NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(
const TString& sessionId,
diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp b/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp
index 59f5d1b59f..cdd6d3c09a 100644
--- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp
+++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp
@@ -17,7 +17,7 @@ public:
~TPqNativeGateway();
NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override;
- void CloseSession(const TString& sessionId) override;
+ NThreading::TFuture<void> CloseSession(const TString& sessionId) override;
NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(
const TString& sessionId,
@@ -108,10 +108,12 @@ NThreading::TFuture<void> TPqNativeGateway::OpenSession(const TString& sessionId
return NThreading::MakeFuture();
}
-void TPqNativeGateway::CloseSession(const TString& sessionId) {
+NThreading::TFuture<void> TPqNativeGateway::CloseSession(const TString& sessionId) {
with_lock (Mutex) {
Sessions.erase(sessionId);
}
+
+ return NThreading::MakeFuture();
}
TPqSession::TPtr TPqNativeGateway::GetExistingSession(const TString& sessionId) const {
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h b/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h
index bb9ac32ff0..f46931022f 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h
@@ -18,7 +18,7 @@ struct IPqGateway : public TThrRefBase {
};
virtual NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) = 0;
- virtual void CloseSession(const TString& sessionId) = 0;
+ virtual NThreading::TFuture<void> CloseSession(const TString& sessionId) = 0;
// CM API.
virtual ::NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(const TString& sessionId, const TString& cluster, const TString& database, const TString& path, const TString& token) = 0;
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp
index 953f8ed9f7..911733f15f 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp
@@ -58,8 +58,8 @@ TDataProviderInitializer GetPqDataProviderInitializer(
return gateway->OpenSession(sessionId, username);
};
- info.CloseSession = [gateway](const TString& sessionId) {
- gateway->CloseSession(sessionId);
+ info.CloseSessionAsync = [gateway](const TString& sessionId) {
+ return gateway->CloseSession(sessionId);
};
return info;
diff --git a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp
index 071ac579d2..e26de5605a 100644
--- a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp
+++ b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp
@@ -356,12 +356,14 @@ public:
}
}
- void CloseSession(TCloseSessionOptions&& options) final {
+ NThreading::TFuture<void> CloseSession(TCloseSessionOptions&& options) final {
Sessions.erase(options.SessionId());
+ return MakeFuture();
}
- void CleanupSession(TCleanupSessionOptions&& options) final {
+ NThreading::TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final {
Y_UNUSED(options);
+ return MakeFuture();
}
template<typename T>
diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp
index 9ad5e08813..37ccd690b5 100644
--- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp
+++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp
@@ -406,49 +406,43 @@ void TTransactionCache::AbortAll() {
with_lock(Lock_) {
txMap.swap(TxMap_);
}
+
+ TString error;
+ auto abortTx = [&] (const ITransactionPtr& tx) {
+ try {
+ tx->Abort();
+ } catch (...) {
+ YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
+
+ // Store first abort error.
+ if (error.empty()) {
+ error = "Failed to abort transaction " + GetGuidAsString(tx->GetId()) + ": " + CurrentExceptionMessage();
+ }
+ }
+ };
+
for (auto& item : txMap) {
auto entry = item.second;
for (auto& item: entry->SnapshotTxs) {
- try {
- YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Snapshot tx " << GetGuidAsString(item.second->GetId());
- item.second->Abort();
- } catch (...) {
- YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
- }
+ YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Snapshot tx " << GetGuidAsString(item.second->GetId());
+ abortTx(item.second);
}
for (auto& item : entry->CheckpointTxs) {
- try {
- YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Checkpoint tx " << GetGuidAsString(item.second->GetId());
- item.second->Abort();
- } catch (...) {
- YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
- }
+ YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Checkpoint tx " << GetGuidAsString(item.second->GetId());
+ abortTx(item.second);
}
for (auto& item: entry->WriteTxs) {
- try {
- YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Write tx " << GetGuidAsString(item.second->GetId());
- item.second->Abort();
- } catch (...) {
- YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
- }
+ YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Write tx " << GetGuidAsString(item.second->GetId());
+ abortTx(item.second);
}
if (entry->BinarySnapshotTx) {
YQL_CLOG(INFO, ProviderYt) << "AbortAll(): Aborting BinarySnapshot tx " << GetGuidAsString(entry->BinarySnapshotTx->GetId());
- try {
- entry->BinarySnapshotTx->Abort();
- } catch (...) {
- YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
- }
+ abortTx(entry->BinarySnapshotTx);
}
-
if (entry->Tx) {
YQL_CLOG(INFO, ProviderYt) << "Aborting tx " << GetGuidAsString(entry->Tx->GetId()) << " on " << item.first;
- try {
- entry->Tx->Abort();
- } catch (...) {
- YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
- }
+ abortTx(entry->Tx);
}
if (entry->Client) {
@@ -456,10 +450,16 @@ void TTransactionCache::AbortAll() {
try {
entry->Client->Shutdown();
} catch (...) {
- YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
+ if (!error) {
+ error = "Failed to shut down client: " + CurrentExceptionMessage();
+ }
}
}
}
+
+ if (error) {
+ ythrow yexception() << error;
+ }
}
} // NYql
diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
index e84e1db8cf..71fabca7d0 100644
--- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -230,36 +230,49 @@ public:
}
}
- void CloseSession(TCloseSessionOptions&& options) final {
+ TFuture<void> CloseSession(TCloseSessionOptions&& options) final {
YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
- try {
- with_lock(Mutex_) {
- auto it = Sessions_.find(options.SessionId());
- if (it != Sessions_.end()) {
- auto session = it->second;
- Sessions_.erase(it);
+
+ with_lock(Mutex_) {
+ auto it = Sessions_.find(options.SessionId());
+ if (it != Sessions_.end()) {
+ auto session = it->second;
+ Sessions_.erase(it);
+ try {
session->Close();
- session.Drop();
+ } catch (...) {
+ YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
+ return MakeErrorFuture<void>(std::current_exception());
}
}
- } catch (const yexception& e) {
- YQL_CLOG(ERROR, ProviderYt) << e.what();
}
+
+ return MakeFuture();
}
- void CleanupSession(TCleanupSessionOptions&& options) final {
+ TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final {
YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
- try {
- if (auto session = GetSession(options.SessionId(), false)) {
- session->TxCache_.AbortAll();
- if (session->OperationSemaphore) {
- session->OperationSemaphore->Cancel();
- session->OperationSemaphore.Drop();
- }
+
+ if (auto session = GetSession(options.SessionId(), false)) {
+ if (session->OperationSemaphore) {
+ session->OperationSemaphore->Cancel();
+ session->OperationSemaphore.Drop();
}
- } catch (...) {
- YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
+ auto logCtx = NYql::NLog::CurrentLogContextPath();
+ return session->Queue_->Async([session, logCtx] {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx);
+ try {
+ session->TxCache_.AbortAll();
+ } catch (...) {
+ YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
+ return MakeErrorFuture<void>(std::current_exception());
+ }
+
+ return MakeFuture();
+ });
}
+
+ return MakeFuture();
}
TFuture<TFinalizeResult> Finalize(TFinalizeOptions&& options) final {
diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp
index d9eef11818..0bc230fe11 100644
--- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp
+++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp
@@ -33,11 +33,16 @@ void TSession::Close() {
if (OperationSemaphore) {
OperationSemaphore->Cancel();
}
+
try {
TxCache_.AbortAll();
} catch (...) {
YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
+ OpTracker_->Stop();
+ Queue_->Stop();
+ throw;
}
+
OpTracker_->Stop();
Queue_->Stop();
}
diff --git a/ydb/library/yql/providers/yt/gateway/profile/yql_yt_profiling.cpp b/ydb/library/yql/providers/yt/gateway/profile/yql_yt_profiling.cpp
index 07b3bc31b1..7eda12658b 100644
--- a/ydb/library/yql/providers/yt/gateway/profile/yql_yt_profiling.cpp
+++ b/ydb/library/yql/providers/yt/gateway/profile/yql_yt_profiling.cpp
@@ -21,14 +21,14 @@ public:
Slave_->OpenSession(std::move(options));
}
- void CloseSession(TCloseSessionOptions&& options) final {
+ TFuture<void> CloseSession(TCloseSessionOptions&& options) final {
YQL_PROFILE_FUNC(TRACE);
- Slave_->CloseSession(std::move(options));
+ return Slave_->CloseSession(std::move(options));
}
- void CleanupSession(TCleanupSessionOptions&& options) final {
+ TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final {
YQL_PROFILE_FUNC(TRACE);
- Slave_->CleanupSession(std::move(options));
+ return Slave_->CleanupSession(std::move(options));
}
TFuture<TFinalizeResult> Finalize(TFinalizeOptions&& options) final {
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h
index b01ee60a31..d947f58407 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h
@@ -562,9 +562,9 @@ public:
virtual void OpenSession(TOpenSessionOptions&& options) = 0;
- virtual void CloseSession(TCloseSessionOptions&& options) = 0;
+ virtual NThreading::TFuture<void> CloseSession(TCloseSessionOptions&& options) = 0;
- virtual void CleanupSession(TCleanupSessionOptions&& options) = 0;
+ virtual NThreading::TFuture<void> CleanupSession(TCleanupSessionOptions&& options) = 0;
virtual NThreading::TFuture<TFinalizeResult> Finalize(TFinalizeOptions&& options) = 0;
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp
index 77301f3da7..0b7bd3551d 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp
@@ -406,15 +406,18 @@ TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gat
return NThreading::MakeFuture();
};
- info.CleanupSession = [ytState, gateway](const TString& sessionId) {
- gateway->CleanupSession(IYtGateway::TCleanupSessionOptions(sessionId));
+ info.CleanupSessionAsync = [ytState, gateway](const TString& sessionId) {
+ return gateway->CleanupSession(IYtGateway::TCleanupSessionOptions(sessionId));
};
- info.CloseSession = [ytState, gateway](const TString& sessionId) {
- gateway->CloseSession(IYtGateway::TCloseSessionOptions(sessionId));
- // do manual cleanup; otherwise there may be dead nodes at program termination
- // in setup with several providers
- ytState->TablesData->CleanupCompiledSQL();
+ info.CloseSessionAsync = [ytState, gateway](const TString& sessionId) {
+ return gateway->CloseSession(IYtGateway::TCloseSessionOptions(sessionId)).Apply([ytState](const NThreading::TFuture<void>& future) {
+ // do manual cleanup; otherwise there may be dead nodes at program termination
+ // in setup with several providers
+ ytState->TablesData->CleanupCompiledSQL();
+
+ future.TryRethrow();
+ });
};
info.TokenResolver = [ytState, gateway](const TString& url, const TString& alias) -> TString {