diff options
author | gritukan <gritukan@yandex-team.com> | 2023-12-07 02:29:59 +0300 |
---|---|---|
committer | gritukan <gritukan@yandex-team.com> | 2023-12-07 02:54:42 +0300 |
commit | 6c084f56f0fc117bdf761a3d5f0b9234cf818aee (patch) | |
tree | 6b733f71f482e400f8512d97366418e24eff06b1 | |
parent | c1e7152671340863e91efd85d9be2435ccfc1988 (diff) | |
download | ydb-6c084f56f0fc117bdf761a3d5f0b9234cf818aee.tar.gz |
Make async query abort for YQL
21 files changed, 219 insertions, 110 deletions
diff --git a/ydb/core/fq/libs/gateway/empty_gateway.cpp b/ydb/core/fq/libs/gateway/empty_gateway.cpp index 36981bad7d..72fd0a342f 100644 --- a/ydb/core/fq/libs/gateway/empty_gateway.cpp +++ b/ydb/core/fq/libs/gateway/empty_gateway.cpp @@ -21,8 +21,9 @@ public: return result; } - void CloseSession(const TString& action) override { + NThreading::TFuture<void> CloseSessionAsync(const TString& action) override { Y_UNUSED(action); + return NThreading::MakeFuture(); } NThreading::TFuture<TResult> ExecutePlan( diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp index ef3adf31c1..77d958d103 100644 --- a/ydb/library/yql/core/facade/yql_facade.cpp +++ b/ydb/library/yql/core/facade/yql_facade.cpp @@ -301,9 +301,11 @@ TProgram::TProgram( TProgram::~TProgram() { try { - CloseLastSession(); + CloseLastSession().GetValueSync(); // stop all non complete execution before deleting TExprCtx - DataProviders_.clear(); + with_lock (DataProvidersLock_) { + DataProviders_.clear(); + } } catch (...) { Cerr << CurrentExceptionMessage() << Endl; } @@ -682,7 +684,12 @@ TProgram::TFutureStatus TProgram::ValidateAsync(const TString& username, IOutput } TypeCtx_->IsReadOnly = true; - for (const auto& dp : DataProviders_) { + TVector<TDataProviderInfo> dataProviders; + with_lock (DataProvidersLock_) { + dataProviders = DataProviders_; + } + + for (const auto& dp : dataProviders) { if (!dp.RemoteClusterProvider || !dp.RemoteValidate) { continue; } @@ -749,7 +756,12 @@ TProgram::TFutureStatus TProgram::OptimizeAsync( } TypeCtx_->IsReadOnly = true; - for (const auto& dp : DataProviders_) { + TVector<TDataProviderInfo> dataProviders; + with_lock (DataProvidersLock_) { + dataProviders = DataProviders_; + } + + for (const auto& dp : dataProviders) { if (!dp.RemoteClusterProvider || !dp.RemoteOptimize) { continue; } @@ -813,6 +825,11 @@ TProgram::TFutureStatus TProgram::OptimizeAsyncWithConfig( } TypeCtx_->IsReadOnly = true; + TVector<TDataProviderInfo> dataProviders; + with_lock (DataProvidersLock_) { + dataProviders = DataProviders_; + } + for (const auto& dp : DataProviders_) { if (!dp.RemoteClusterProvider || !dp.RemoteOptimize) { continue; @@ -944,6 +961,11 @@ TProgram::TFutureStatus TProgram::RunAsync( } TypeCtx_->IsReadOnly = (HiddenMode_ != EHiddenMode::Disable); + TVector<TDataProviderInfo> dataProviders; + with_lock (DataProvidersLock_) { + dataProviders = DataProviders_; + } + for (const auto& dp : DataProviders_) { if (!dp.RemoteClusterProvider || !dp.RemoteRun) { continue; @@ -1018,6 +1040,11 @@ TProgram::TFutureStatus TProgram::RunAsyncWithConfig( } TypeCtx_->IsReadOnly = (HiddenMode_ != EHiddenMode::Disable); + TVector<TDataProviderInfo> dataProviders; + with_lock (DataProvidersLock_) { + dataProviders = DataProviders_; + } + for (const auto& dp : DataProviders_) { if (!dp.RemoteClusterProvider || !dp.RemoteRun) { continue; @@ -1428,39 +1455,63 @@ TProgram::TFutureStatus TProgram::ContinueAsync() { return AsyncTransformWithFallback(true); } -void TProgram::Abort() +NThreading::TFuture<void> TProgram::Abort() { - CloseLastSession(); + return CloseLastSession(); } -void TProgram::CleanupLastSession() { +NThreading::TFuture<void> TProgram::CleanupLastSession() { YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId()); TString sessionId = GetSessionId(); if (sessionId.empty()) { - return; + return MakeFuture(); } - for (const auto& dp : DataProviders_) { + TVector<TDataProviderInfo> dataProviders; + with_lock (DataProvidersLock_) { + dataProviders = DataProviders_; + } + + TVector<NThreading::TFuture<void>> cleanupFutures; + cleanupFutures.reserve(dataProviders.size()); + for (const auto& dp : dataProviders) { if (dp.CleanupSession) { dp.CleanupSession(sessionId); } + if (dp.CleanupSessionAsync) { + cleanupFutures.push_back(dp.CleanupSessionAsync(sessionId)); + } } + + return NThreading::WaitExceptionOrAll(cleanupFutures); } -void TProgram::CloseLastSession() { +NThreading::TFuture<void> TProgram::CloseLastSession() { YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId()); TString sessionId = TakeSessionId(); if (sessionId.empty()) { - return; + return MakeFuture(); } - for (const auto& dp : DataProviders_) { + TVector<TDataProviderInfo> dataProviders; + with_lock (DataProvidersLock_) { + dataProviders = DataProviders_; + } + + TVector<NThreading::TFuture<void>> closeFutures; + closeFutures.reserve(dataProviders.size()); + for (const auto& dp : dataProviders) { if (dp.CloseSession) { dp.CloseSession(sessionId); } + if (dp.CloseSessionAsync) { + dp.CloseSessionAsync(sessionId); + } } + + return NThreading::WaitExceptionOrAll(closeFutures); } TString TProgram::ResultsAsString() const { @@ -1523,7 +1574,9 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us } providerNames.insert(dp.Names.begin(), dp.Names.end()); - DataProviders_.emplace_back(dp); + with_lock (DataProvidersLock_) { + DataProviders_.emplace_back(dp); + } if (dp.Source) { typeAnnotationContext->AddDataSource(dp.Names, dp.Source); } @@ -1592,11 +1645,13 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us TFuture<void> TProgram::OpenSession(const TString& username) { TVector<TFuture<void>> openFutures; - for (const auto& dp : DataProviders_) { - if (dp.OpenSession) { - auto future = dp.OpenSession(SessionId_, username, ProgressWriter_, OperationOptions_, - RandomProvider_, TimeProvider_); - openFutures.push_back(future); + with_lock (DataProvidersLock_) { + for (const auto& dp : DataProviders_) { + if (dp.OpenSession) { + auto future = dp.OpenSession(SessionId_, username, ProgressWriter_, OperationOptions_, + RandomProvider_, TimeProvider_); + openFutures.push_back(future); + } } } @@ -1627,20 +1682,26 @@ void TProgram::Print(IOutputStream* exprOut, IOutputStream* planOut, bool cleanP } bool TProgram::HasActiveProcesses() { - for (const auto& dp : DataProviders_) { - if (dp.HasActiveProcesses && dp.HasActiveProcesses()) { - return true; + with_lock (DataProvidersLock_) { + for (const auto& dp : DataProviders_) { + if (dp.HasActiveProcesses && dp.HasActiveProcesses()) { + return true; + } } } + return false; } bool TProgram::NeedWaitForActiveProcesses() { - for (const auto& dp : DataProviders_) { - if (dp.HasActiveProcesses && dp.HasActiveProcesses() && dp.WaitForActiveProcesses) { - return true; + with_lock (DataProvidersLock_) { + for (const auto& dp : DataProviders_) { + if (dp.HasActiveProcesses && dp.HasActiveProcesses() && dp.WaitForActiveProcesses) { + return true; + } } } + return false; } diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h index 3f1fdb0f28..5252ba2d0f 100644 --- a/ydb/library/yql/core/facade/yql_facade.h +++ b/ydb/library/yql/core/facade/yql_facade.h @@ -186,7 +186,7 @@ public: bool HasActiveProcesses(); bool NeedWaitForActiveProcesses(); - void Abort(); + NThreading::TFuture<void> Abort(); inline TIssues Issues() { if (ExprCtx_) { @@ -362,8 +362,8 @@ private: NThreading::TFuture<void> OpenSession(const TString& username); - void CleanupLastSession(); - void CloseLastSession(); + NThreading::TFuture<void> CleanupLastSession(); + NThreading::TFuture<void> CloseLastSession(); TFutureStatus RemoteKikimrValidate(const TString& cluster); TFutureStatus RemoteKikimrOptimize(const TString& cluster, const IPipelineConfigurator* pipelineConf); @@ -384,6 +384,7 @@ private: const TIntrusivePtr<ITimeProvider> TimeProvider_; const ui64 NextUniqueId_; TVector<TDataProviderInitializer> DataProvidersInit_; + TAdaptiveLock DataProvidersLock_; TVector<TDataProviderInfo> DataProviders_; TYqlOperationOptions OperationOptions_; TCredentials::TPtr Credentials_; diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h index fd8a69594b..7859c88374 100644 --- a/ydb/library/yql/core/yql_data_provider.h +++ b/ydb/library/yql/core/yql_data_provider.h @@ -236,10 +236,14 @@ struct TDataProviderInfo { std::function<bool()> HasActiveProcesses; + // COMPAT(gritukan): Remove it after Arcadia migration. std::function<void(const TString& sessionId)> CloseSession; - std::function<void(const TString& sessionId)> CleanupSession; + std::function<NThreading::TFuture<void>(const TString& sessionId)> CloseSessionAsync; + + std::function<NThreading::TFuture<void>(const TString& sessionId)> CleanupSessionAsync; + std::function<TString(const TString& url, const TString& alias)> TokenResolver; }; diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index e4d92817ea..372b7be9fc 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -140,8 +140,8 @@ public: return Gateway->OpenSession(sessionId, username); } - void CloseSession(const TString& sessionId) { - return Gateway->CloseSession(sessionId); + NThreading::TFuture<void> CloseSession(const TString& sessionId) { + return Gateway->CloseSessionAsync(sessionId); } NThreading::TFuture<IDqGateway::TResult> @@ -220,7 +220,7 @@ public: return Impl->OpenSession(sessionId, username); } - void CloseSession(const TString& sessionId) override { + NThreading::TFuture<void> CloseSessionAsync(const TString& sessionId) override { return Impl->CloseSession(sessionId); } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp index 466be06e7e..9e9691c37d 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp @@ -364,7 +364,7 @@ public: return MakeFuture(); } - void CloseSession(const TString& sessionId) { + TFuture<void> CloseSession(const TString& sessionId) { Yql::DqsProto::CloseSessionRequest request; request.SetSession(sessionId); @@ -380,6 +380,8 @@ public: Service->DoRequest<Yql::DqsProto::CloseSessionRequest, Yql::DqsProto::CloseSessionResponse>( request, callback, &Yql::DqsProto::DqService::Stub::AsyncCloseSession); + + return MakeFuture(); } void OnRequestQueryStatus(const TString& sessionId, const TString& status, bool ok) { @@ -535,9 +537,9 @@ public: return Impl->OpenSession(sessionId, username); } - void CloseSession(const TString& sessionId) override + TFuture<void> CloseSessionAsync(const TString& sessionId) override { - Impl->CloseSession(sessionId); + return Impl->CloseSession(sessionId); } TFuture<TResult> ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns, diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h index 6b87aaed60..42d07c21f0 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h @@ -60,7 +60,15 @@ public: virtual NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) = 0; - virtual void CloseSession(const TString& sessionId) = 0; + // TODO(gritukan): Leave only CloseSessionAsync after Arcadia migration and make it pure virtual. + virtual void CloseSession(const TString& sessionId) { + Y_UNUSED(sessionId); + } + + virtual NThreading::TFuture<void> CloseSessionAsync(const TString& sessionId) { + Y_UNUSED(sessionId); + return NThreading::MakeFuture(); + } virtual NThreading::TFuture<TResult> ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns, diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp index 4877f4f458..c8c16b7447 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp @@ -115,7 +115,7 @@ TDataProviderInitializer GetDqDataProviderInitializer( } }; - info.CloseSession = [dqGateway, metrics](const TString& sessionId) { + info.CloseSessionAsync = [dqGateway, metrics](const TString& sessionId) { if (metrics) { metrics->IncCounter("dq", "CloseSession"); } @@ -123,7 +123,11 @@ TDataProviderInitializer GetDqDataProviderInitializer( if (dqGateway) { // nullptr in yqlrun YQL_CLOG(DEBUG, ProviderDq) << "CloseSession " << sessionId; dqGateway->CloseSession(sessionId); + + return dqGateway->CloseSessionAsync(sessionId); } + + return NThreading::MakeFuture(); }; return info; diff --git a/ydb/library/yql/providers/pg/provider/yql_pg_provider.cpp b/ydb/library/yql/providers/pg/provider/yql_pg_provider.cpp index 0cf3a1ccc2..a781eabf00 100644 --- a/ydb/library/yql/providers/pg/provider/yql_pg_provider.cpp +++ b/ydb/library/yql/providers/pg/provider/yql_pg_provider.cpp @@ -51,8 +51,9 @@ TDataProviderInitializer GetPgDataProviderInitializer() { return NThreading::MakeFuture(); }; - info.CloseSession = [](const TString& sessionId) { + info.CloseSessionAsync = [](const TString& sessionId) { Y_UNUSED(sessionId); + return NThreading::MakeFuture(); }; return info; diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp index b04f11ac02..ea1eb61432 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp @@ -18,11 +18,13 @@ NThreading::TFuture<void> TDummyPqGateway::OpenSession(const TString& sessionId, return NThreading::MakeFuture(); } -void TDummyPqGateway::CloseSession(const TString& sessionId) { +NThreading::TFuture<void> TDummyPqGateway::CloseSession(const TString& sessionId) { with_lock (Mutex) { Y_ENSURE(IsIn(OpenedSessions, sessionId), "Session " << sessionId << " is not opened in pq gateway"); OpenedSessions.erase(sessionId); } + + return NThreading::MakeFuture(); } NPq::NConfigurationManager::TAsyncDescribePathResult TDummyPqGateway::DescribePath(const TString& sessionId, const TString& cluster, const TString& database, const TString& path, const TString& token) { diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h index 56cd4604de..fe838c5b1f 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h @@ -32,7 +32,7 @@ public: public: NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override; - void CloseSession(const TString& sessionId) override; + NThreading::TFuture<void> CloseSession(const TString& sessionId) override; ::NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath( const TString& sessionId, diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp b/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp index 59f5d1b59f..cdd6d3c09a 100644 --- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp +++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp @@ -17,7 +17,7 @@ public: ~TPqNativeGateway(); NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override; - void CloseSession(const TString& sessionId) override; + NThreading::TFuture<void> CloseSession(const TString& sessionId) override; NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath( const TString& sessionId, @@ -108,10 +108,12 @@ NThreading::TFuture<void> TPqNativeGateway::OpenSession(const TString& sessionId return NThreading::MakeFuture(); } -void TPqNativeGateway::CloseSession(const TString& sessionId) { +NThreading::TFuture<void> TPqNativeGateway::CloseSession(const TString& sessionId) { with_lock (Mutex) { Sessions.erase(sessionId); } + + return NThreading::MakeFuture(); } TPqSession::TPtr TPqNativeGateway::GetExistingSession(const TString& sessionId) const { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h b/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h index bb9ac32ff0..f46931022f 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h @@ -18,7 +18,7 @@ struct IPqGateway : public TThrRefBase { }; virtual NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) = 0; - virtual void CloseSession(const TString& sessionId) = 0; + virtual NThreading::TFuture<void> CloseSession(const TString& sessionId) = 0; // CM API. virtual ::NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(const TString& sessionId, const TString& cluster, const TString& database, const TString& path, const TString& token) = 0; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp index 953f8ed9f7..911733f15f 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp @@ -58,8 +58,8 @@ TDataProviderInitializer GetPqDataProviderInitializer( return gateway->OpenSession(sessionId, username); }; - info.CloseSession = [gateway](const TString& sessionId) { - gateway->CloseSession(sessionId); + info.CloseSessionAsync = [gateway](const TString& sessionId) { + return gateway->CloseSession(sessionId); }; return info; diff --git a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp index 071ac579d2..e26de5605a 100644 --- a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp +++ b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp @@ -356,12 +356,14 @@ public: } } - void CloseSession(TCloseSessionOptions&& options) final { + NThreading::TFuture<void> CloseSession(TCloseSessionOptions&& options) final { Sessions.erase(options.SessionId()); + return MakeFuture(); } - void CleanupSession(TCleanupSessionOptions&& options) final { + NThreading::TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final { Y_UNUSED(options); + return MakeFuture(); } template<typename T> diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp index 9ad5e08813..37ccd690b5 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp +++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp @@ -406,49 +406,43 @@ void TTransactionCache::AbortAll() { with_lock(Lock_) { txMap.swap(TxMap_); } + + TString error; + auto abortTx = [&] (const ITransactionPtr& tx) { + try { + tx->Abort(); + } catch (...) { + YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); + + // Store first abort error. + if (error.empty()) { + error = "Failed to abort transaction " + GetGuidAsString(tx->GetId()) + ": " + CurrentExceptionMessage(); + } + } + }; + for (auto& item : txMap) { auto entry = item.second; for (auto& item: entry->SnapshotTxs) { - try { - YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Snapshot tx " << GetGuidAsString(item.second->GetId()); - item.second->Abort(); - } catch (...) { - YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); - } + YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Snapshot tx " << GetGuidAsString(item.second->GetId()); + abortTx(item.second); } for (auto& item : entry->CheckpointTxs) { - try { - YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Checkpoint tx " << GetGuidAsString(item.second->GetId()); - item.second->Abort(); - } catch (...) { - YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); - } + YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Checkpoint tx " << GetGuidAsString(item.second->GetId()); + abortTx(item.second); } for (auto& item: entry->WriteTxs) { - try { - YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Write tx " << GetGuidAsString(item.second->GetId()); - item.second->Abort(); - } catch (...) { - YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); - } + YQL_CLOG(DEBUG, ProviderYt) << "AbortAll(): Aborting Write tx " << GetGuidAsString(item.second->GetId()); + abortTx(item.second); } if (entry->BinarySnapshotTx) { YQL_CLOG(INFO, ProviderYt) << "AbortAll(): Aborting BinarySnapshot tx " << GetGuidAsString(entry->BinarySnapshotTx->GetId()); - try { - entry->BinarySnapshotTx->Abort(); - } catch (...) { - YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); - } + abortTx(entry->BinarySnapshotTx); } - if (entry->Tx) { YQL_CLOG(INFO, ProviderYt) << "Aborting tx " << GetGuidAsString(entry->Tx->GetId()) << " on " << item.first; - try { - entry->Tx->Abort(); - } catch (...) { - YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); - } + abortTx(entry->Tx); } if (entry->Client) { @@ -456,10 +450,16 @@ void TTransactionCache::AbortAll() { try { entry->Client->Shutdown(); } catch (...) { - YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); + if (!error) { + error = "Failed to shut down client: " + CurrentExceptionMessage(); + } } } } + + if (error) { + ythrow yexception() << error; + } } } // NYql diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp index e84e1db8cf..71fabca7d0 100644 --- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -230,36 +230,49 @@ public: } } - void CloseSession(TCloseSessionOptions&& options) final { + TFuture<void> CloseSession(TCloseSessionOptions&& options) final { YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); - try { - with_lock(Mutex_) { - auto it = Sessions_.find(options.SessionId()); - if (it != Sessions_.end()) { - auto session = it->second; - Sessions_.erase(it); + + with_lock(Mutex_) { + auto it = Sessions_.find(options.SessionId()); + if (it != Sessions_.end()) { + auto session = it->second; + Sessions_.erase(it); + try { session->Close(); - session.Drop(); + } catch (...) { + YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); + return MakeErrorFuture<void>(std::current_exception()); } } - } catch (const yexception& e) { - YQL_CLOG(ERROR, ProviderYt) << e.what(); } + + return MakeFuture(); } - void CleanupSession(TCleanupSessionOptions&& options) final { + TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final { YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); - try { - if (auto session = GetSession(options.SessionId(), false)) { - session->TxCache_.AbortAll(); - if (session->OperationSemaphore) { - session->OperationSemaphore->Cancel(); - session->OperationSemaphore.Drop(); - } + + if (auto session = GetSession(options.SessionId(), false)) { + if (session->OperationSemaphore) { + session->OperationSemaphore->Cancel(); + session->OperationSemaphore.Drop(); } - } catch (...) { - YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); + auto logCtx = NYql::NLog::CurrentLogContextPath(); + return session->Queue_->Async([session, logCtx] { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx); + try { + session->TxCache_.AbortAll(); + } catch (...) { + YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); + return MakeErrorFuture<void>(std::current_exception()); + } + + return MakeFuture(); + }); } + + return MakeFuture(); } TFuture<TFinalizeResult> Finalize(TFinalizeOptions&& options) final { diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp index d9eef11818..0bc230fe11 100644 --- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp @@ -33,11 +33,16 @@ void TSession::Close() { if (OperationSemaphore) { OperationSemaphore->Cancel(); } + try { TxCache_.AbortAll(); } catch (...) { YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); + OpTracker_->Stop(); + Queue_->Stop(); + throw; } + OpTracker_->Stop(); Queue_->Stop(); } diff --git a/ydb/library/yql/providers/yt/gateway/profile/yql_yt_profiling.cpp b/ydb/library/yql/providers/yt/gateway/profile/yql_yt_profiling.cpp index 07b3bc31b1..7eda12658b 100644 --- a/ydb/library/yql/providers/yt/gateway/profile/yql_yt_profiling.cpp +++ b/ydb/library/yql/providers/yt/gateway/profile/yql_yt_profiling.cpp @@ -21,14 +21,14 @@ public: Slave_->OpenSession(std::move(options)); } - void CloseSession(TCloseSessionOptions&& options) final { + TFuture<void> CloseSession(TCloseSessionOptions&& options) final { YQL_PROFILE_FUNC(TRACE); - Slave_->CloseSession(std::move(options)); + return Slave_->CloseSession(std::move(options)); } - void CleanupSession(TCleanupSessionOptions&& options) final { + TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final { YQL_PROFILE_FUNC(TRACE); - Slave_->CleanupSession(std::move(options)); + return Slave_->CleanupSession(std::move(options)); } TFuture<TFinalizeResult> Finalize(TFinalizeOptions&& options) final { diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h index b01ee60a31..d947f58407 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h @@ -562,9 +562,9 @@ public: virtual void OpenSession(TOpenSessionOptions&& options) = 0; - virtual void CloseSession(TCloseSessionOptions&& options) = 0; + virtual NThreading::TFuture<void> CloseSession(TCloseSessionOptions&& options) = 0; - virtual void CleanupSession(TCleanupSessionOptions&& options) = 0; + virtual NThreading::TFuture<void> CleanupSession(TCleanupSessionOptions&& options) = 0; virtual NThreading::TFuture<TFinalizeResult> Finalize(TFinalizeOptions&& options) = 0; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp index 77301f3da7..0b7bd3551d 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp @@ -406,15 +406,18 @@ TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gat return NThreading::MakeFuture(); }; - info.CleanupSession = [ytState, gateway](const TString& sessionId) { - gateway->CleanupSession(IYtGateway::TCleanupSessionOptions(sessionId)); + info.CleanupSessionAsync = [ytState, gateway](const TString& sessionId) { + return gateway->CleanupSession(IYtGateway::TCleanupSessionOptions(sessionId)); }; - info.CloseSession = [ytState, gateway](const TString& sessionId) { - gateway->CloseSession(IYtGateway::TCloseSessionOptions(sessionId)); - // do manual cleanup; otherwise there may be dead nodes at program termination - // in setup with several providers - ytState->TablesData->CleanupCompiledSQL(); + info.CloseSessionAsync = [ytState, gateway](const TString& sessionId) { + return gateway->CloseSession(IYtGateway::TCloseSessionOptions(sessionId)).Apply([ytState](const NThreading::TFuture<void>& future) { + // do manual cleanup; otherwise there may be dead nodes at program termination + // in setup with several providers + ytState->TablesData->CleanupCompiledSQL(); + + future.TryRethrow(); + }); }; info.TokenResolver = [ytState, gateway](const TString& url, const TString& alias) -> TString { |