diff options
author | grigoriypisar <[email protected]> | 2025-08-26 21:30:21 +0300 |
---|---|---|
committer | grigoriypisar <[email protected]> | 2025-08-26 22:39:37 +0300 |
commit | 198afb5d9f1b0c8a133e331e020bbc231adbfd0c (patch) | |
tree | ab6e7c2cdbf5c172b5e994ecf1291f90f799a5e3 | |
parent | ae449eb2faf891d1d548a670549a952961009006 (diff) |
fixed YT provider state leak
Перенос исправления из github: <https://github.com/ydb-platform/ydb/pull/22817>
Исправлена циклическая зависимость из IntrusivePtr:
[YtState](https://nda.ya.ru/t/J6yQpZti7J8HqG -\> [Gateway](https://nda.ya.ru/t/QTw9-d3Y7J8HqH -\> [Sessions\_](https://nda.ya.ru/t/CbIrRfy_7J8HqJ -\> [StatWriter](https://nda.ya.ru/t/ZQiPgPQ97J8HqK -\> [YtState](https://nda.ya.ru/t/HyJmOnA-7J8HqR
commit_hash:c90e57ad30e2b05fa94eaac72786b1b9087d00c8
9 files changed, 133 insertions, 81 deletions
diff --git a/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp b/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp index e767b3237b9..574422d7033 100644 --- a/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp +++ b/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp @@ -186,7 +186,7 @@ private: Y_UNIT_TEST_SUITE(YtNativeGateway) { -std::pair<TIntrusivePtr<TYtState>, IYtGateway::TPtr> InitTest(const NTesting::TPortHolder& port, TTypeAnnotationContext* types) { +std::pair<std::shared_ptr<TYtState>, IYtGateway::TPtr> InitTest(const NTesting::TPortHolder& port, TTypeAnnotationContext* types) { TYtNativeServices nativeServices; auto gatewaysConfig = MakeGatewaysConfig(port); nativeServices.Config = std::make_shared<TYtGatewayConfig>(gatewaysConfig.GetYt()); @@ -194,7 +194,7 @@ std::pair<TIntrusivePtr<TYtState>, IYtGateway::TPtr> InitTest(const NTesting::TP nativeServices.SecretMasker = CreateDummySecretMasker(); auto ytGateway = CreateYtNativeGateway(nativeServices); - auto ytState = MakeIntrusive<TYtState>(types); + auto ytState = std::make_shared<TYtState>(types); ytState->Gateway = ytGateway; InitializeYtGateway(ytGateway, ytState); diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 913d30ecc4b..03219980a32 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -152,7 +152,7 @@ namespace { class TYtDqIntegration: public TDqIntegrationBase { public: - TYtDqIntegration(TYtState* state) + TYtDqIntegration(TYtState::TWeakPtr state) : State_(state) { } @@ -175,7 +175,11 @@ public: flattenPaths.push_back(pathInfo); } } - auto result = EstimateDataSize(flattenPaths, Nothing(), *State_, ctx); + + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + + auto result = EstimateDataSize(flattenPaths, Nothing(), *ytState, ctx); size_t statIdx = 0; size_t pathIdx = 0; for (const auto& [idx, pathInfos]: Enumerate(groupIdPathInfos)) { @@ -228,12 +232,15 @@ public: groupIdPathInfos.push_back(pathInfos); } - if (auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); settings.CanFallback && chunksCount > maxChunks) { + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + + if (auto maxChunks = ytState->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); settings.CanFallback && chunksCount > maxChunks) { throw TFallbackError() << DqFallbackErrorMessageWrap( TStringBuilder() << "table with too many chunks: " << chunksCount << " > " << maxChunks); } if (hasErasure) { - if (auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster)) { + if (auto codecCpu = ytState->Configuration->ErasureCodecCpuForDq.Get(cluster)) { dataSizePerJob = Max(ui64(dataSizePerJob / *codecCpu), 10_KB); } else { hasErasure = false; @@ -242,7 +249,7 @@ public: auto maxTasks = settings.MaxPartitions; ui64 maxDataSizePerJob = 0; - if (State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false)) { + if (ytState->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false)) { TVector<TYtPathInfo::TPtr> paths; TVector<TString> keys; TMaybe<double> sample; @@ -253,7 +260,7 @@ public: for (const auto& [pathId, pathInfo] : Enumerate(pathInfos)) { auto tableName = pathInfo->Table->Name; if (pathInfo->Table->IsAnonymous && !TYtTableInfo::HasSubstAnonymousLabel(pathInfo->Table->FromNode.Cast())) { - tableName = State_->AnonymousLabels.Value(std::make_pair(cluster, tableName), TString()); + tableName = ytState->AnonymousLabels.Value(std::make_pair(cluster, tableName), TString()); YQL_ENSURE(tableName, "Unaccounted anonymous table: " << pathInfo->Table->Name); pathInfo->Table->Name = tableName; } @@ -266,12 +273,12 @@ public: dataSizePerJob /= *sample; } - auto res = State_->Gateway->GetTablePartitions(NYql::IYtGateway::TGetTablePartitionsOptions(State_->SessionId) + auto res = ytState->Gateway->GetTablePartitions(NYql::IYtGateway::TGetTablePartitionsOptions(ytState->SessionId) .Cluster(cluster) .MaxPartitions(maxTasks) .DataSizePerJob(dataSizePerJob) .AdjustDataWeightPerPartition(!settings.CanFallback) - .Config(State_->Configuration->Snapshot()) + .Config(ytState->Configuration->Snapshot()) .Paths(std::move(paths))); if (!res.Success()) { const auto message = DqFallbackErrorMessageWrap("failed to partition table"); @@ -375,11 +382,14 @@ public: } bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override { + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + if (TYtConfigure::Match(&node)) { if (node.ChildrenSize() >= 5) { if (node.Child(2)->Content() == "Attr" && node.Child(3)->Content() == "maxrowweight") { if (FromString<NSize::TSize>(node.Child(4)->Content()).GetValue()>NSize::FromMegaBytes(128)) { - State_->OnlyNativeExecution = true; + ytState->OnlyNativeExecution = true; return false; } else { return true; @@ -391,7 +401,7 @@ public: auto pragma = node.Child(3)->Content(); if (UNSUPPORTED_YT_PRAGMAS.contains(pragma)) { AddInfo(ctx, TStringBuilder() << "unsupported yt pragma: " << pragma, skipIssues); - State_->OnlyNativeExecution = true; + ytState->OnlyNativeExecution = true; return false; } @@ -400,7 +410,7 @@ public: for (const auto& pool : pools) { if (!POOL_TREES_WHITELIST.contains(pool)) { AddInfo(ctx, TStringBuilder() << "unsupported pool tree: " << pool, skipIssues); - State_->OnlyNativeExecution = true; + ytState->OnlyNativeExecution = true; return false; } } @@ -411,16 +421,19 @@ public: } bool CanRead(const TExprNode& node, TExprContext& ctx, bool skipIssues) override { + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + if (TYtConfigure::Match(&node)) { return CheckPragmas(node, ctx, skipIssues); } else if (auto maybeRead = TMaybeNode<TYtReadTable>(&node)) { auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue(); - if (!State_->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) { - AddMessage(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues, State_->PassiveExecution); + if (!ytState->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) { + AddMessage(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues, ytState->PassiveExecution); return false; } - const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false); - const auto enableDynamicStoreRead = State_->Configuration->EnableDynamicStoreReadInDQ.Get().GetOrElse(false); + const auto canUseYtPartitioningApi = ytState->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false); + const auto enableDynamicStoreRead = ytState->Configuration->EnableDynamicStoreReadInDQ.Get().GetOrElse(false); ui64 chunksCount = 0ull; for (auto section: maybeRead.Cast().Input()) { if (HasSettingsExcept(maybeRead.Cast().Input().Item(0).Settings().Ref(), DqReadSupportedSettings) || HasNonEmptyKeyFilter(maybeRead.Cast().Input().Item(0))) { @@ -433,43 +446,43 @@ public: } } } - AddMessage(ctx, info, skipIssues, State_->PassiveExecution); + AddMessage(ctx, info, skipIssues, ytState->PassiveExecution); return false; } for (auto path: section.Paths()) { if (!path.Table().Maybe<TYtTable>()) { - AddMessage(ctx, "non-table path", skipIssues, State_->PassiveExecution); + AddMessage(ctx, "non-table path", skipIssues, ytState->PassiveExecution); return false; } else { auto pathInfo = TYtPathInfo(path); auto tableInfo = pathInfo.Table; auto epoch = TEpochInfo::Parse(path.Table().Maybe<TYtTable>().CommitEpoch().Ref()); if (!tableInfo->Stat) { - AddMessage(ctx, "table without statistics", skipIssues, State_->PassiveExecution); + AddMessage(ctx, "table without statistics", skipIssues, ytState->PassiveExecution); return false; } else if (!tableInfo->RowSpec) { - AddMessage(ctx, "table without row spec", skipIssues, State_->PassiveExecution); + AddMessage(ctx, "table without row spec", skipIssues, ytState->PassiveExecution); return false; } else if (!tableInfo->Meta) { - AddMessage(ctx, "table without meta", skipIssues, State_->PassiveExecution); + AddMessage(ctx, "table without meta", skipIssues, ytState->PassiveExecution); return false; } else if (tableInfo->IsAnonymous) { - AddMessage(ctx, "anonymous table", skipIssues, State_->PassiveExecution); + AddMessage(ctx, "anonymous table", skipIssues, ytState->PassiveExecution); return false; } else if ((!epoch.Empty() && *epoch.Get() > 0)) { - AddMessage(ctx, "table with non-empty epoch", skipIssues, State_->PassiveExecution); + AddMessage(ctx, "table with non-empty epoch", skipIssues, ytState->PassiveExecution); return false; } else if (NYql::HasSetting(tableInfo->Settings.Ref(), EYtSettingType::WithQB)) { - AddMessage(ctx, "table with QB2 premapper", skipIssues, State_->PassiveExecution); + AddMessage(ctx, "table with QB2 premapper", skipIssues, ytState->PassiveExecution); return false; } else if (pathInfo.Ranges && !canUseYtPartitioningApi) { - AddMessage(ctx, "table with ranges", skipIssues, State_->PassiveExecution); + AddMessage(ctx, "table with ranges", skipIssues, ytState->PassiveExecution); return false; } else if (tableInfo->Meta->IsDynamic && !canUseYtPartitioningApi) { - AddMessage(ctx, "dynamic table", skipIssues, State_->PassiveExecution); + AddMessage(ctx, "dynamic table", skipIssues, ytState->PassiveExecution); return false; } else if (tableInfo->Meta->IsDynamic && tableInfo->Meta->Attrs.contains("enable_dynamic_store_read") && !enableDynamicStoreRead) { - AddMessage(ctx, "dynamic store read", skipIssues, State_->PassiveExecution); + AddMessage(ctx, "dynamic store read", skipIssues, ytState->PassiveExecution); return false; } @@ -477,8 +490,8 @@ public: } } } - if (auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); chunksCount > maxChunks) { - AddMessage(ctx, TStringBuilder() << "table with too many chunks: " << chunksCount << " > " << maxChunks, skipIssues, State_->PassiveExecution); + if (auto maxChunks = ytState->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); chunksCount > maxChunks) { + AddMessage(ctx, TStringBuilder() << "table with too many chunks: " << chunksCount << " > " << maxChunks, skipIssues, ytState->PassiveExecution); return false; } return true; @@ -488,18 +501,21 @@ public: } bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&) override { + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + auto wrap = node.Cast<TDqReadWideWrap>(); auto maybeRead = wrap.Input().Maybe<TYtReadTable>(); if (!maybeRead) { return false; } - if (!State_->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) { + if (!ytState->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) { return false; } - auto supportedTypes = State_->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES); - auto supportedDataTypes = State_->Configuration->BlockReaderSupportedDataTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES); + auto supportedTypes = ytState->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES); + auto supportedDataTypes = ytState->Configuration->BlockReaderSupportedDataTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES); const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back())->Cast<TStructExprType>(); if (!CheckBlockReaderSupportedTypes(supportedTypes, supportedDataTypes, structType, ctx, ctx.GetPosition(node.Pos()))) { return false; @@ -510,12 +526,12 @@ public: subTypeAnn.emplace_back(type->GetItemType()); } - if (!State_->Types->ArrowResolver) { + if (!ytState->Types->ArrowResolver) { BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "no arrow resolver provided"); return false; } - if (State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(node.Pos()), subTypeAnn, ctx) != IArrowResolver::EStatus::OK) { + if (ytState->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(node.Pos()), subTypeAnn, ctx) != IArrowResolver::EStatus::OK) { BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "arrow resolver don't support these types"); return false; } @@ -575,12 +591,15 @@ public: } TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override { + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + TVector<bool> hasErasurePerNode; hasErasurePerNode.reserve(nodes.size()); TVector<ui64> dataSizes(nodes.size()); THashMap<TString, TVector<std::pair<const TExprNode*, bool>>> clusterToNodesAndErasure; THashMap<TString, TVector<TVector<TYtPathInfo::TPtr>>> clusterToGroups; - const auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); + const auto maxChunks = ytState->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); ui64 chunksCount = 0u; for (const auto &node_: nodes) { @@ -590,8 +609,8 @@ public: auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue(); auto& groupIdPathInfo = clusterToGroups[cluster]; - const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false); - const auto enableDynamicStoreRead = State_->Configuration->EnableDynamicStoreReadInDQ.Get().GetOrElse(false); + const auto canUseYtPartitioningApi = ytState->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false); + const auto enableDynamicStoreRead = ytState->Configuration->EnableDynamicStoreReadInDQ.Get().GetOrElse(false); auto input = maybeRead.Cast().Input(); for (auto section: input) { @@ -635,7 +654,7 @@ public: ui64 dataSize = 0; for (auto& [cluster, info]: clusterToNodesAndErasure) { auto res = EstimateColumnStats(ctx, clusterToGroups[cluster], dataSize); - auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster); + auto codecCpu = ytState->Configuration->ErasureCodecCpuForDq.Get(cluster); if (!codecCpu) { continue; } @@ -663,10 +682,13 @@ public: } TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings&) override { + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + if (auto maybeYtReadTable = TMaybeNode<TYtReadTable>(read)) { TMaybeNode<TCoSecureParam> secParams; const auto cluster = maybeYtReadTable.Cast().DataSource().Cluster(); - if (State_->Configuration->Auth.Get().GetOrElse(TString()) || State_->Configuration->Tokens.Value(cluster, "")) { + if (ytState->Configuration->Auth.Get().GetOrElse(TString()) || ytState->Configuration->Tokens.Value(cluster, "")) { secParams = Build<TCoSecureParam>(ctx, read->Pos()).Name().Build(TString("cluster:default_").append(cluster)).Done(); } return Build<TDqReadWrap>(ctx, read->Pos()) @@ -679,8 +701,11 @@ public: } TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) override { + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + if (auto maybeWrite = TMaybeNode<TYtWriteTable>(write)) { - if (State_->Configuration->_EnableYtDqProcessWriteConstraints.Get().GetOrElse(DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS)) { + if (ytState->Configuration->_EnableYtDqProcessWriteConstraints.Get().GetOrElse(DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS)) { const auto& content = maybeWrite.Cast().Content(); if (TYtMaterialize::Match(&SkipCallables(content.Ref(), {TCoSort::CallableName(), TCoTopSort::CallableName(), TCoAssumeSorted::CallableName(), TCoAssumeConstraints::CallableName()}))) { return write; @@ -764,14 +789,17 @@ public: } TMaybe<bool> CanWrite(const TExprNode& node, TExprContext& ctx) override { + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + if (auto maybeWrite = TMaybeNode<TYtWriteTable>(&node)) { auto cluster = TString{maybeWrite.Cast().DataSink().Cluster().Value()}; auto tableName = TString{TYtTableInfo::GetTableLabel(maybeWrite.Cast().Table())}; auto epoch = TEpochInfo::Parse(maybeWrite.Cast().Table().CommitEpoch().Ref()); - auto tableDesc = State_->TablesData->GetTable(cluster, tableName, epoch); + auto tableDesc = ytState->TablesData->GetTable(cluster, tableName, epoch); - if (!State_->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) { + if (!ytState->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) { AddInfo(ctx, TStringBuilder() << "disabled for cluster " << cluster, false); return false; } @@ -785,7 +813,7 @@ public: return false; } - if (!State_->Configuration->_EnableYtDqProcessWriteConstraints.Get().GetOrElse(DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS)) { + if (!ytState->Configuration->_EnableYtDqProcessWriteConstraints.Get().GetOrElse(DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS)) { const auto content = maybeWrite.Cast().Content().Raw(); if (const auto sorted = content->GetConstraint<TSortedConstraintNode>()) { if (const auto distinct = content->GetConstraint<TDistinctConstraintNode>()) { @@ -807,8 +835,11 @@ public: } void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override { - RegisterDqYtMkqlCompilers(compiler, State_); - State_->Gateway->RegisterMkqlCompiler(compiler); + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + + RegisterDqYtMkqlCompilers(compiler, ytState); + ytState->Gateway->RegisterMkqlCompiler(compiler); } bool CanFallback() override { @@ -870,9 +901,13 @@ public: } return true; }); + + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + TString cluster; if (usedClusters.empty()) { - cluster = State_->Configuration->DefaultCluster.Get().GetOrElse(State_->Gateway->GetDefaultClusterName()); + cluster = ytState->Configuration->DefaultCluster.Get().GetOrElse(ytState->Gateway->GetDefaultClusterName()); } else { cluster = TString{*usedClusters.begin()}; } @@ -880,12 +915,12 @@ public: const auto type = GetSequenceItemType(input->Pos(), input->GetTypeAnn(), false, ctx); YQL_ENSURE(type); - TYtOutTableInfo outTableInfo(type->Cast<TStructExprType>(), State_->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE, order); + TYtOutTableInfo outTableInfo(type->Cast<TStructExprType>(), ytState->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE, order); - const auto res = State_->Gateway->PrepareFullResultTable( - IYtGateway::TFullResultTableOptions(State_->SessionId) + const auto res = ytState->Gateway->PrepareFullResultTable( + IYtGateway::TFullResultTableOptions(ytState->SessionId) .Cluster(cluster) - .Config(State_->Configuration->GetSettingsForNode(resOrPull.Origin().Ref())) + .Config(ytState->Configuration->GetSettingsForNode(resOrPull.Origin().Ref())) .OutTable(outTableInfo) ); @@ -895,11 +930,11 @@ public: if (res.ExternalTransactionId) { param("external_tx", *res.ExternalTransactionId); } - } else if (auto externalTx = State_->Configuration->ExternalTx.Get(cluster).GetOrElse(TGUID())) { + } else if (auto externalTx = ytState->Configuration->ExternalTx.Get(cluster).GetOrElse(TGUID())) { param("external_tx", GetGuidAsString(externalTx)); } TString tokenName; - if (auto auth = State_->Configuration->Auth.Get().GetOrElse(TString())) { + if (auto auth = ytState->Configuration->Auth.Get().GetOrElse(TString())) { tokenName = TString("cluster:default_").append(cluster); if (!secureParams.contains(tokenName)) { secureParams[tokenName] = auth; @@ -957,15 +992,18 @@ public: } virtual void NotifyDqTimeout() override { - State_->IsDqTimeout = true; + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + + ytState->IsDqTimeout = true; } private: - TYtState* State_; + TYtState::TWeakPtr State_; }; -THolder<IDqIntegration> CreateYtDqIntegration(TYtState* state) { - Y_ABORT_UNLESS(state); +THolder<IDqIntegration> CreateYtDqIntegration(TYtState::TWeakPtr state) { + YQL_ENSURE(!state.expired()); return MakeHolder<TYtDqIntegration>(state); } diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.h b/yt/yql/providers/yt/provider/yql_yt_dq_integration.h index 9fce4842cfe..05301e268b6 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.h +++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.h @@ -8,7 +8,7 @@ namespace NYql { -THolder<IDqIntegration> CreateYtDqIntegration(TYtState* state); +THolder<IDqIntegration> CreateYtDqIntegration(TYtState::TWeakPtr state); // TODO move to yql/core bool CheckSupportedTypesOld(const TTypeAnnotationNode::TListType& typesToCheck, const TSet<TString>& supportedTypes, const TSet<NUdf::EDataSlot>& supportedDataTypes, std::function<void(const TString&)> unsupportedTypeHandler); diff --git a/yt/yql/providers/yt/provider/yql_yt_provider.cpp b/yt/yql/providers/yt/provider/yql_yt_provider.cpp index 2a82da29f00..dd065bc4d37 100644 --- a/yt/yql/providers/yt/provider/yql_yt_provider.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_provider.cpp @@ -338,18 +338,18 @@ void TYtState::LeaveEvaluation(ui64 id) { } } -std::pair<TIntrusivePtr<TYtState>, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId, +std::pair<std::shared_ptr<TYtState>, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId, const TYtGatewayConfig* ytGatewayConfig, TIntrusivePtr<TTypeAnnotationContext> typeCtx, const IOptimizerFactory::TPtr& optFactory, const IDqHelper::TPtr& helper) { - auto ytState = MakeIntrusive<TYtState>(typeCtx.Get()); + auto ytState = std::make_shared<TYtState>(typeCtx.Get()); ytState->SessionId = sessionId; ytState->Gateway = gateway; - ytState->DqIntegration_ = CreateYtDqIntegration(ytState.Get()); + ytState->DqIntegration_ = CreateYtDqIntegration(ytState); ytState->OptimizerFactory_ = optFactory; ytState->DqHelper = helper; - ytState->YtflowIntegration_ = CreateYtYtflowIntegration(ytState.Get()); - ytState->YtflowOptimization_ = CreateYtYtflowOptimization(ytState.Get()); + ytState->YtflowIntegration_ = CreateYtYtflowIntegration(ytState); + ytState->YtflowOptimization_ = CreateYtYtflowOptimization(ytState); if (ytGatewayConfig) { std::unordered_set<std::string_view> groups; @@ -372,7 +372,14 @@ std::pair<TIntrusivePtr<TYtState>, TStatWriter> CreateYtNativeState(IYtGateway:: ytState->Configuration->Init(*ytGatewayConfig, filter, *typeCtx); } - TStatWriter statWriter = [ytState](ui32 publicId, const TVector<TOperationStatistics::TEntry>& stat) { + TYtState::TWeakPtr weakState = ytState; + + TStatWriter statWriter = [weakState](ui32 publicId, const TVector<TOperationStatistics::TEntry>& stat) { + auto ytState = weakState.lock(); + if (!ytState) { + return; + } + with_lock(ytState->StatisticsMutex) { for (size_t i = 0; i < stat.size(); ++i) { ytState->Statistics[publicId].Entries.push_back(stat[i]); @@ -411,7 +418,7 @@ TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gat info.SupportsHidden = true; const TYtGatewayConfig* ytGatewayConfig = gatewaysConfig ? &gatewaysConfig->GetYt() : nullptr; - TIntrusivePtr<TYtState> ytState; + TYtState::TPtr ytState; TStatWriter statWriter; std::tie(ytState, statWriter) = CreateYtNativeState(gateway, userName, sessionId, ytGatewayConfig, typeCtx, optFactory, helper); ytState->PlanLimits = planLimits; diff --git a/yt/yql/providers/yt/provider/yql_yt_provider.h b/yt/yql/providers/yt/provider/yql_yt_provider.h index 468a69c7807..3f11c78df4e 100644 --- a/yt/yql/providers/yt/provider/yql_yt_provider.h +++ b/yt/yql/providers/yt/provider/yql_yt_provider.h @@ -87,8 +87,9 @@ private: }; -struct TYtState : public TThrRefBase { - using TPtr = TIntrusivePtr<TYtState>; +struct TYtState { + using TPtr = std::shared_ptr<TYtState>; + using TWeakPtr = std::weak_ptr<TYtState>; void Reset(); void EnterEvaluation(ui64 id); @@ -138,7 +139,7 @@ private: class TYtGatewayConfig; -std::pair<TIntrusivePtr<TYtState>, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId, +std::pair<std::shared_ptr<TYtState>, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId, const TYtGatewayConfig* ytGatewayConfig, TIntrusivePtr<TTypeAnnotationContext> typeCtx, const IOptimizerFactory::TPtr& optFactory, const IDqHelper::TPtr& helper); TIntrusivePtr<IDataProvider> CreateYtDataSource(TYtState::TPtr state); diff --git a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp index 2024ef2ca70..734c0eb80e7 100644 --- a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp @@ -21,7 +21,7 @@ using namespace NNodes; class TYtYtflowIntegration: public IYtflowIntegration { public: - TYtYtflowIntegration(TYtState* state) + TYtYtflowIntegration(TYtState::TWeakPtr state) : State_(state) { } @@ -81,10 +81,13 @@ public: auto tableName = TString(TYtTableInfo::GetTableLabel(maybeWriteTable.Cast().Table())); auto commitEpoch = TEpochInfo::Parse(maybeWriteTable.Cast().Table().CommitEpoch().Ref()); - auto tableDesc = State_->TablesData->GetTable( + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + + auto tableDesc = ytState->TablesData->GetTable( cluster, tableName, 0); - auto commitTableDesc = State_->TablesData->GetTable( + auto commitTableDesc = ytState->TablesData->GetTable( cluster, tableName, commitEpoch); if (!tableDesc.Meta->IsDynamic @@ -171,7 +174,10 @@ public: auto cluster = TString(maybeWriteTable.Cast().DataSink().Cluster().Value()); auto tableName = TString(TYtTableInfo::GetTableLabel(maybeWriteTable.Cast().Table())); - auto tableDesc = State_->TablesData->GetTable( + auto ytState = State_.lock(); + YQL_ENSURE(ytState); + + auto tableDesc = ytState->TablesData->GetTable( cluster, tableName, 0); sinkSettings.SetDoesExist(tableDesc.Meta->DoesExist); @@ -197,11 +203,11 @@ private: } private: - TYtState* State_; + TYtState::TWeakPtr State_; }; -THolder<IYtflowIntegration> CreateYtYtflowIntegration(TYtState* state) { - Y_ABORT_UNLESS(state); +THolder<IYtflowIntegration> CreateYtYtflowIntegration(TYtState::TWeakPtr state) { + YQL_ENSURE(!state.expired()); return MakeHolder<TYtYtflowIntegration>(state); } diff --git a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.h b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.h index 481880e06c5..6e3fa3dc75b 100644 --- a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.h +++ b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.h @@ -9,6 +9,6 @@ namespace NYql { -THolder<IYtflowIntegration> CreateYtYtflowIntegration(TYtState* state); +THolder<IYtflowIntegration> CreateYtYtflowIntegration(TYtState::TWeakPtr state); } // namespace NYql diff --git a/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.cpp b/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.cpp index 0191633524d..33bf2486def 100644 --- a/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.cpp @@ -17,7 +17,7 @@ using namespace NNodes; class TYtYtflowOptimization: public IYtflowOptimization { public: - TYtYtflowOptimization(TYtState* state) + TYtYtflowOptimization(TYtState::TWeakPtr state) : State_(state) { Y_UNUSED(State_); @@ -91,11 +91,11 @@ public: } private: - TYtState* State_; + TYtState::TWeakPtr State_; }; -THolder<IYtflowOptimization> CreateYtYtflowOptimization(TYtState* state) { - Y_ABORT_UNLESS(state); +THolder<IYtflowOptimization> CreateYtYtflowOptimization(TYtState::TWeakPtr state) { + YQL_ENSURE(!state.expired()); return MakeHolder<TYtYtflowOptimization>(state); } diff --git a/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.h b/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.h index 87e1bf575c0..131f59d1f39 100644 --- a/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.h +++ b/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.h @@ -9,6 +9,6 @@ namespace NYql { -THolder<IYtflowOptimization> CreateYtYtflowOptimization(TYtState* state); +THolder<IYtflowOptimization> CreateYtYtflowOptimization(TYtState::TWeakPtr state); } // namespace NYql |