summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <[email protected]>2025-08-26 21:30:21 +0300
committergrigoriypisar <[email protected]>2025-08-26 22:39:37 +0300
commit198afb5d9f1b0c8a133e331e020bbc231adbfd0c (patch)
treeab6e7c2cdbf5c172b5e994ecf1291f90f799a5e3
parentae449eb2faf891d1d548a670549a952961009006 (diff)
fixed YT provider state leak
Перенос исправления из github: <https://github.com/ydb-platform/ydb/pull/22817> Исправлена циклическая зависимость из IntrusivePtr: [YtState](https://nda.ya.ru/t/J6yQpZti7J8HqG -\> [Gateway](https://nda.ya.ru/t/QTw9-d3Y7J8HqH -\> [Sessions\_](https://nda.ya.ru/t/CbIrRfy_7J8HqJ -\> [StatWriter](https://nda.ya.ru/t/ZQiPgPQ97J8HqK -\> [YtState](https://nda.ya.ru/t/HyJmOnA-7J8HqR commit_hash:c90e57ad30e2b05fa94eaac72786b1b9087d00c8
-rw-r--r--yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp4
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp148
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_dq_integration.h2
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_provider.cpp21
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_provider.h7
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp20
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_ytflow_integration.h2
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.cpp8
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.h2
9 files changed, 133 insertions, 81 deletions
diff --git a/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp b/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp
index e767b3237b9..574422d7033 100644
--- a/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp
+++ b/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp
@@ -186,7 +186,7 @@ private:
Y_UNIT_TEST_SUITE(YtNativeGateway) {
-std::pair<TIntrusivePtr<TYtState>, IYtGateway::TPtr> InitTest(const NTesting::TPortHolder& port, TTypeAnnotationContext* types) {
+std::pair<std::shared_ptr<TYtState>, IYtGateway::TPtr> InitTest(const NTesting::TPortHolder& port, TTypeAnnotationContext* types) {
TYtNativeServices nativeServices;
auto gatewaysConfig = MakeGatewaysConfig(port);
nativeServices.Config = std::make_shared<TYtGatewayConfig>(gatewaysConfig.GetYt());
@@ -194,7 +194,7 @@ std::pair<TIntrusivePtr<TYtState>, IYtGateway::TPtr> InitTest(const NTesting::TP
nativeServices.SecretMasker = CreateDummySecretMasker();
auto ytGateway = CreateYtNativeGateway(nativeServices);
- auto ytState = MakeIntrusive<TYtState>(types);
+ auto ytState = std::make_shared<TYtState>(types);
ytState->Gateway = ytGateway;
InitializeYtGateway(ytGateway, ytState);
diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
index 913d30ecc4b..03219980a32 100644
--- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
@@ -152,7 +152,7 @@ namespace {
class TYtDqIntegration: public TDqIntegrationBase {
public:
- TYtDqIntegration(TYtState* state)
+ TYtDqIntegration(TYtState::TWeakPtr state)
: State_(state)
{
}
@@ -175,7 +175,11 @@ public:
flattenPaths.push_back(pathInfo);
}
}
- auto result = EstimateDataSize(flattenPaths, Nothing(), *State_, ctx);
+
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
+ auto result = EstimateDataSize(flattenPaths, Nothing(), *ytState, ctx);
size_t statIdx = 0;
size_t pathIdx = 0;
for (const auto& [idx, pathInfos]: Enumerate(groupIdPathInfos)) {
@@ -228,12 +232,15 @@ public:
groupIdPathInfos.push_back(pathInfos);
}
- if (auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); settings.CanFallback && chunksCount > maxChunks) {
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
+ if (auto maxChunks = ytState->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); settings.CanFallback && chunksCount > maxChunks) {
throw TFallbackError() << DqFallbackErrorMessageWrap( TStringBuilder() << "table with too many chunks: " << chunksCount << " > " << maxChunks);
}
if (hasErasure) {
- if (auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster)) {
+ if (auto codecCpu = ytState->Configuration->ErasureCodecCpuForDq.Get(cluster)) {
dataSizePerJob = Max(ui64(dataSizePerJob / *codecCpu), 10_KB);
} else {
hasErasure = false;
@@ -242,7 +249,7 @@ public:
auto maxTasks = settings.MaxPartitions;
ui64 maxDataSizePerJob = 0;
- if (State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false)) {
+ if (ytState->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false)) {
TVector<TYtPathInfo::TPtr> paths;
TVector<TString> keys;
TMaybe<double> sample;
@@ -253,7 +260,7 @@ public:
for (const auto& [pathId, pathInfo] : Enumerate(pathInfos)) {
auto tableName = pathInfo->Table->Name;
if (pathInfo->Table->IsAnonymous && !TYtTableInfo::HasSubstAnonymousLabel(pathInfo->Table->FromNode.Cast())) {
- tableName = State_->AnonymousLabels.Value(std::make_pair(cluster, tableName), TString());
+ tableName = ytState->AnonymousLabels.Value(std::make_pair(cluster, tableName), TString());
YQL_ENSURE(tableName, "Unaccounted anonymous table: " << pathInfo->Table->Name);
pathInfo->Table->Name = tableName;
}
@@ -266,12 +273,12 @@ public:
dataSizePerJob /= *sample;
}
- auto res = State_->Gateway->GetTablePartitions(NYql::IYtGateway::TGetTablePartitionsOptions(State_->SessionId)
+ auto res = ytState->Gateway->GetTablePartitions(NYql::IYtGateway::TGetTablePartitionsOptions(ytState->SessionId)
.Cluster(cluster)
.MaxPartitions(maxTasks)
.DataSizePerJob(dataSizePerJob)
.AdjustDataWeightPerPartition(!settings.CanFallback)
- .Config(State_->Configuration->Snapshot())
+ .Config(ytState->Configuration->Snapshot())
.Paths(std::move(paths)));
if (!res.Success()) {
const auto message = DqFallbackErrorMessageWrap("failed to partition table");
@@ -375,11 +382,14 @@ public:
}
bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override {
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
if (TYtConfigure::Match(&node)) {
if (node.ChildrenSize() >= 5) {
if (node.Child(2)->Content() == "Attr" && node.Child(3)->Content() == "maxrowweight") {
if (FromString<NSize::TSize>(node.Child(4)->Content()).GetValue()>NSize::FromMegaBytes(128)) {
- State_->OnlyNativeExecution = true;
+ ytState->OnlyNativeExecution = true;
return false;
} else {
return true;
@@ -391,7 +401,7 @@ public:
auto pragma = node.Child(3)->Content();
if (UNSUPPORTED_YT_PRAGMAS.contains(pragma)) {
AddInfo(ctx, TStringBuilder() << "unsupported yt pragma: " << pragma, skipIssues);
- State_->OnlyNativeExecution = true;
+ ytState->OnlyNativeExecution = true;
return false;
}
@@ -400,7 +410,7 @@ public:
for (const auto& pool : pools) {
if (!POOL_TREES_WHITELIST.contains(pool)) {
AddInfo(ctx, TStringBuilder() << "unsupported pool tree: " << pool, skipIssues);
- State_->OnlyNativeExecution = true;
+ ytState->OnlyNativeExecution = true;
return false;
}
}
@@ -411,16 +421,19 @@ public:
}
bool CanRead(const TExprNode& node, TExprContext& ctx, bool skipIssues) override {
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
if (TYtConfigure::Match(&node)) {
return CheckPragmas(node, ctx, skipIssues);
} else if (auto maybeRead = TMaybeNode<TYtReadTable>(&node)) {
auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue();
- if (!State_->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) {
- AddMessage(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues, State_->PassiveExecution);
+ if (!ytState->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) {
+ AddMessage(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues, ytState->PassiveExecution);
return false;
}
- const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false);
- const auto enableDynamicStoreRead = State_->Configuration->EnableDynamicStoreReadInDQ.Get().GetOrElse(false);
+ const auto canUseYtPartitioningApi = ytState->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false);
+ const auto enableDynamicStoreRead = ytState->Configuration->EnableDynamicStoreReadInDQ.Get().GetOrElse(false);
ui64 chunksCount = 0ull;
for (auto section: maybeRead.Cast().Input()) {
if (HasSettingsExcept(maybeRead.Cast().Input().Item(0).Settings().Ref(), DqReadSupportedSettings) || HasNonEmptyKeyFilter(maybeRead.Cast().Input().Item(0))) {
@@ -433,43 +446,43 @@ public:
}
}
}
- AddMessage(ctx, info, skipIssues, State_->PassiveExecution);
+ AddMessage(ctx, info, skipIssues, ytState->PassiveExecution);
return false;
}
for (auto path: section.Paths()) {
if (!path.Table().Maybe<TYtTable>()) {
- AddMessage(ctx, "non-table path", skipIssues, State_->PassiveExecution);
+ AddMessage(ctx, "non-table path", skipIssues, ytState->PassiveExecution);
return false;
} else {
auto pathInfo = TYtPathInfo(path);
auto tableInfo = pathInfo.Table;
auto epoch = TEpochInfo::Parse(path.Table().Maybe<TYtTable>().CommitEpoch().Ref());
if (!tableInfo->Stat) {
- AddMessage(ctx, "table without statistics", skipIssues, State_->PassiveExecution);
+ AddMessage(ctx, "table without statistics", skipIssues, ytState->PassiveExecution);
return false;
} else if (!tableInfo->RowSpec) {
- AddMessage(ctx, "table without row spec", skipIssues, State_->PassiveExecution);
+ AddMessage(ctx, "table without row spec", skipIssues, ytState->PassiveExecution);
return false;
} else if (!tableInfo->Meta) {
- AddMessage(ctx, "table without meta", skipIssues, State_->PassiveExecution);
+ AddMessage(ctx, "table without meta", skipIssues, ytState->PassiveExecution);
return false;
} else if (tableInfo->IsAnonymous) {
- AddMessage(ctx, "anonymous table", skipIssues, State_->PassiveExecution);
+ AddMessage(ctx, "anonymous table", skipIssues, ytState->PassiveExecution);
return false;
} else if ((!epoch.Empty() && *epoch.Get() > 0)) {
- AddMessage(ctx, "table with non-empty epoch", skipIssues, State_->PassiveExecution);
+ AddMessage(ctx, "table with non-empty epoch", skipIssues, ytState->PassiveExecution);
return false;
} else if (NYql::HasSetting(tableInfo->Settings.Ref(), EYtSettingType::WithQB)) {
- AddMessage(ctx, "table with QB2 premapper", skipIssues, State_->PassiveExecution);
+ AddMessage(ctx, "table with QB2 premapper", skipIssues, ytState->PassiveExecution);
return false;
} else if (pathInfo.Ranges && !canUseYtPartitioningApi) {
- AddMessage(ctx, "table with ranges", skipIssues, State_->PassiveExecution);
+ AddMessage(ctx, "table with ranges", skipIssues, ytState->PassiveExecution);
return false;
} else if (tableInfo->Meta->IsDynamic && !canUseYtPartitioningApi) {
- AddMessage(ctx, "dynamic table", skipIssues, State_->PassiveExecution);
+ AddMessage(ctx, "dynamic table", skipIssues, ytState->PassiveExecution);
return false;
} else if (tableInfo->Meta->IsDynamic && tableInfo->Meta->Attrs.contains("enable_dynamic_store_read") && !enableDynamicStoreRead) {
- AddMessage(ctx, "dynamic store read", skipIssues, State_->PassiveExecution);
+ AddMessage(ctx, "dynamic store read", skipIssues, ytState->PassiveExecution);
return false;
}
@@ -477,8 +490,8 @@ public:
}
}
}
- if (auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); chunksCount > maxChunks) {
- AddMessage(ctx, TStringBuilder() << "table with too many chunks: " << chunksCount << " > " << maxChunks, skipIssues, State_->PassiveExecution);
+ if (auto maxChunks = ytState->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); chunksCount > maxChunks) {
+ AddMessage(ctx, TStringBuilder() << "table with too many chunks: " << chunksCount << " > " << maxChunks, skipIssues, ytState->PassiveExecution);
return false;
}
return true;
@@ -488,18 +501,21 @@ public:
}
bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&) override {
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
auto wrap = node.Cast<TDqReadWideWrap>();
auto maybeRead = wrap.Input().Maybe<TYtReadTable>();
if (!maybeRead) {
return false;
}
- if (!State_->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) {
+ if (!ytState->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) {
return false;
}
- auto supportedTypes = State_->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES);
- auto supportedDataTypes = State_->Configuration->BlockReaderSupportedDataTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES);
+ auto supportedTypes = ytState->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES);
+ auto supportedDataTypes = ytState->Configuration->BlockReaderSupportedDataTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES);
const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back())->Cast<TStructExprType>();
if (!CheckBlockReaderSupportedTypes(supportedTypes, supportedDataTypes, structType, ctx, ctx.GetPosition(node.Pos()))) {
return false;
@@ -510,12 +526,12 @@ public:
subTypeAnn.emplace_back(type->GetItemType());
}
- if (!State_->Types->ArrowResolver) {
+ if (!ytState->Types->ArrowResolver) {
BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "no arrow resolver provided");
return false;
}
- if (State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(node.Pos()), subTypeAnn, ctx) != IArrowResolver::EStatus::OK) {
+ if (ytState->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(node.Pos()), subTypeAnn, ctx) != IArrowResolver::EStatus::OK) {
BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "arrow resolver don't support these types");
return false;
}
@@ -575,12 +591,15 @@ public:
}
TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override {
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
TVector<bool> hasErasurePerNode;
hasErasurePerNode.reserve(nodes.size());
TVector<ui64> dataSizes(nodes.size());
THashMap<TString, TVector<std::pair<const TExprNode*, bool>>> clusterToNodesAndErasure;
THashMap<TString, TVector<TVector<TYtPathInfo::TPtr>>> clusterToGroups;
- const auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ);
+ const auto maxChunks = ytState->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ);
ui64 chunksCount = 0u;
for (const auto &node_: nodes) {
@@ -590,8 +609,8 @@ public:
auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue();
auto& groupIdPathInfo = clusterToGroups[cluster];
- const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false);
- const auto enableDynamicStoreRead = State_->Configuration->EnableDynamicStoreReadInDQ.Get().GetOrElse(false);
+ const auto canUseYtPartitioningApi = ytState->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false);
+ const auto enableDynamicStoreRead = ytState->Configuration->EnableDynamicStoreReadInDQ.Get().GetOrElse(false);
auto input = maybeRead.Cast().Input();
for (auto section: input) {
@@ -635,7 +654,7 @@ public:
ui64 dataSize = 0;
for (auto& [cluster, info]: clusterToNodesAndErasure) {
auto res = EstimateColumnStats(ctx, clusterToGroups[cluster], dataSize);
- auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster);
+ auto codecCpu = ytState->Configuration->ErasureCodecCpuForDq.Get(cluster);
if (!codecCpu) {
continue;
}
@@ -663,10 +682,13 @@ public:
}
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings&) override {
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
if (auto maybeYtReadTable = TMaybeNode<TYtReadTable>(read)) {
TMaybeNode<TCoSecureParam> secParams;
const auto cluster = maybeYtReadTable.Cast().DataSource().Cluster();
- if (State_->Configuration->Auth.Get().GetOrElse(TString()) || State_->Configuration->Tokens.Value(cluster, "")) {
+ if (ytState->Configuration->Auth.Get().GetOrElse(TString()) || ytState->Configuration->Tokens.Value(cluster, "")) {
secParams = Build<TCoSecureParam>(ctx, read->Pos()).Name().Build(TString("cluster:default_").append(cluster)).Done();
}
return Build<TDqReadWrap>(ctx, read->Pos())
@@ -679,8 +701,11 @@ public:
}
TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) override {
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
if (auto maybeWrite = TMaybeNode<TYtWriteTable>(write)) {
- if (State_->Configuration->_EnableYtDqProcessWriteConstraints.Get().GetOrElse(DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS)) {
+ if (ytState->Configuration->_EnableYtDqProcessWriteConstraints.Get().GetOrElse(DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS)) {
const auto& content = maybeWrite.Cast().Content();
if (TYtMaterialize::Match(&SkipCallables(content.Ref(), {TCoSort::CallableName(), TCoTopSort::CallableName(), TCoAssumeSorted::CallableName(), TCoAssumeConstraints::CallableName()}))) {
return write;
@@ -764,14 +789,17 @@ public:
}
TMaybe<bool> CanWrite(const TExprNode& node, TExprContext& ctx) override {
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
if (auto maybeWrite = TMaybeNode<TYtWriteTable>(&node)) {
auto cluster = TString{maybeWrite.Cast().DataSink().Cluster().Value()};
auto tableName = TString{TYtTableInfo::GetTableLabel(maybeWrite.Cast().Table())};
auto epoch = TEpochInfo::Parse(maybeWrite.Cast().Table().CommitEpoch().Ref());
- auto tableDesc = State_->TablesData->GetTable(cluster, tableName, epoch);
+ auto tableDesc = ytState->TablesData->GetTable(cluster, tableName, epoch);
- if (!State_->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) {
+ if (!ytState->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) {
AddInfo(ctx, TStringBuilder() << "disabled for cluster " << cluster, false);
return false;
}
@@ -785,7 +813,7 @@ public:
return false;
}
- if (!State_->Configuration->_EnableYtDqProcessWriteConstraints.Get().GetOrElse(DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS)) {
+ if (!ytState->Configuration->_EnableYtDqProcessWriteConstraints.Get().GetOrElse(DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS)) {
const auto content = maybeWrite.Cast().Content().Raw();
if (const auto sorted = content->GetConstraint<TSortedConstraintNode>()) {
if (const auto distinct = content->GetConstraint<TDistinctConstraintNode>()) {
@@ -807,8 +835,11 @@ public:
}
void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override {
- RegisterDqYtMkqlCompilers(compiler, State_);
- State_->Gateway->RegisterMkqlCompiler(compiler);
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
+ RegisterDqYtMkqlCompilers(compiler, ytState);
+ ytState->Gateway->RegisterMkqlCompiler(compiler);
}
bool CanFallback() override {
@@ -870,9 +901,13 @@ public:
}
return true;
});
+
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
TString cluster;
if (usedClusters.empty()) {
- cluster = State_->Configuration->DefaultCluster.Get().GetOrElse(State_->Gateway->GetDefaultClusterName());
+ cluster = ytState->Configuration->DefaultCluster.Get().GetOrElse(ytState->Gateway->GetDefaultClusterName());
} else {
cluster = TString{*usedClusters.begin()};
}
@@ -880,12 +915,12 @@ public:
const auto type = GetSequenceItemType(input->Pos(), input->GetTypeAnn(), false, ctx);
YQL_ENSURE(type);
- TYtOutTableInfo outTableInfo(type->Cast<TStructExprType>(), State_->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE, order);
+ TYtOutTableInfo outTableInfo(type->Cast<TStructExprType>(), ytState->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE, order);
- const auto res = State_->Gateway->PrepareFullResultTable(
- IYtGateway::TFullResultTableOptions(State_->SessionId)
+ const auto res = ytState->Gateway->PrepareFullResultTable(
+ IYtGateway::TFullResultTableOptions(ytState->SessionId)
.Cluster(cluster)
- .Config(State_->Configuration->GetSettingsForNode(resOrPull.Origin().Ref()))
+ .Config(ytState->Configuration->GetSettingsForNode(resOrPull.Origin().Ref()))
.OutTable(outTableInfo)
);
@@ -895,11 +930,11 @@ public:
if (res.ExternalTransactionId) {
param("external_tx", *res.ExternalTransactionId);
}
- } else if (auto externalTx = State_->Configuration->ExternalTx.Get(cluster).GetOrElse(TGUID())) {
+ } else if (auto externalTx = ytState->Configuration->ExternalTx.Get(cluster).GetOrElse(TGUID())) {
param("external_tx", GetGuidAsString(externalTx));
}
TString tokenName;
- if (auto auth = State_->Configuration->Auth.Get().GetOrElse(TString())) {
+ if (auto auth = ytState->Configuration->Auth.Get().GetOrElse(TString())) {
tokenName = TString("cluster:default_").append(cluster);
if (!secureParams.contains(tokenName)) {
secureParams[tokenName] = auth;
@@ -957,15 +992,18 @@ public:
}
virtual void NotifyDqTimeout() override {
- State_->IsDqTimeout = true;
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
+ ytState->IsDqTimeout = true;
}
private:
- TYtState* State_;
+ TYtState::TWeakPtr State_;
};
-THolder<IDqIntegration> CreateYtDqIntegration(TYtState* state) {
- Y_ABORT_UNLESS(state);
+THolder<IDqIntegration> CreateYtDqIntegration(TYtState::TWeakPtr state) {
+ YQL_ENSURE(!state.expired());
return MakeHolder<TYtDqIntegration>(state);
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.h b/yt/yql/providers/yt/provider/yql_yt_dq_integration.h
index 9fce4842cfe..05301e268b6 100644
--- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.h
+++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.h
@@ -8,7 +8,7 @@
namespace NYql {
-THolder<IDqIntegration> CreateYtDqIntegration(TYtState* state);
+THolder<IDqIntegration> CreateYtDqIntegration(TYtState::TWeakPtr state);
// TODO move to yql/core
bool CheckSupportedTypesOld(const TTypeAnnotationNode::TListType& typesToCheck, const TSet<TString>& supportedTypes, const TSet<NUdf::EDataSlot>& supportedDataTypes, std::function<void(const TString&)> unsupportedTypeHandler);
diff --git a/yt/yql/providers/yt/provider/yql_yt_provider.cpp b/yt/yql/providers/yt/provider/yql_yt_provider.cpp
index 2a82da29f00..dd065bc4d37 100644
--- a/yt/yql/providers/yt/provider/yql_yt_provider.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_provider.cpp
@@ -338,18 +338,18 @@ void TYtState::LeaveEvaluation(ui64 id) {
}
}
-std::pair<TIntrusivePtr<TYtState>, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId,
+std::pair<std::shared_ptr<TYtState>, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId,
const TYtGatewayConfig* ytGatewayConfig, TIntrusivePtr<TTypeAnnotationContext> typeCtx,
const IOptimizerFactory::TPtr& optFactory, const IDqHelper::TPtr& helper)
{
- auto ytState = MakeIntrusive<TYtState>(typeCtx.Get());
+ auto ytState = std::make_shared<TYtState>(typeCtx.Get());
ytState->SessionId = sessionId;
ytState->Gateway = gateway;
- ytState->DqIntegration_ = CreateYtDqIntegration(ytState.Get());
+ ytState->DqIntegration_ = CreateYtDqIntegration(ytState);
ytState->OptimizerFactory_ = optFactory;
ytState->DqHelper = helper;
- ytState->YtflowIntegration_ = CreateYtYtflowIntegration(ytState.Get());
- ytState->YtflowOptimization_ = CreateYtYtflowOptimization(ytState.Get());
+ ytState->YtflowIntegration_ = CreateYtYtflowIntegration(ytState);
+ ytState->YtflowOptimization_ = CreateYtYtflowOptimization(ytState);
if (ytGatewayConfig) {
std::unordered_set<std::string_view> groups;
@@ -372,7 +372,14 @@ std::pair<TIntrusivePtr<TYtState>, TStatWriter> CreateYtNativeState(IYtGateway::
ytState->Configuration->Init(*ytGatewayConfig, filter, *typeCtx);
}
- TStatWriter statWriter = [ytState](ui32 publicId, const TVector<TOperationStatistics::TEntry>& stat) {
+ TYtState::TWeakPtr weakState = ytState;
+
+ TStatWriter statWriter = [weakState](ui32 publicId, const TVector<TOperationStatistics::TEntry>& stat) {
+ auto ytState = weakState.lock();
+ if (!ytState) {
+ return;
+ }
+
with_lock(ytState->StatisticsMutex) {
for (size_t i = 0; i < stat.size(); ++i) {
ytState->Statistics[publicId].Entries.push_back(stat[i]);
@@ -411,7 +418,7 @@ TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gat
info.SupportsHidden = true;
const TYtGatewayConfig* ytGatewayConfig = gatewaysConfig ? &gatewaysConfig->GetYt() : nullptr;
- TIntrusivePtr<TYtState> ytState;
+ TYtState::TPtr ytState;
TStatWriter statWriter;
std::tie(ytState, statWriter) = CreateYtNativeState(gateway, userName, sessionId, ytGatewayConfig, typeCtx, optFactory, helper);
ytState->PlanLimits = planLimits;
diff --git a/yt/yql/providers/yt/provider/yql_yt_provider.h b/yt/yql/providers/yt/provider/yql_yt_provider.h
index 468a69c7807..3f11c78df4e 100644
--- a/yt/yql/providers/yt/provider/yql_yt_provider.h
+++ b/yt/yql/providers/yt/provider/yql_yt_provider.h
@@ -87,8 +87,9 @@ private:
};
-struct TYtState : public TThrRefBase {
- using TPtr = TIntrusivePtr<TYtState>;
+struct TYtState {
+ using TPtr = std::shared_ptr<TYtState>;
+ using TWeakPtr = std::weak_ptr<TYtState>;
void Reset();
void EnterEvaluation(ui64 id);
@@ -138,7 +139,7 @@ private:
class TYtGatewayConfig;
-std::pair<TIntrusivePtr<TYtState>, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId,
+std::pair<std::shared_ptr<TYtState>, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId,
const TYtGatewayConfig* ytGatewayConfig, TIntrusivePtr<TTypeAnnotationContext> typeCtx,
const IOptimizerFactory::TPtr& optFactory, const IDqHelper::TPtr& helper);
TIntrusivePtr<IDataProvider> CreateYtDataSource(TYtState::TPtr state);
diff --git a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp
index 2024ef2ca70..734c0eb80e7 100644
--- a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp
@@ -21,7 +21,7 @@ using namespace NNodes;
class TYtYtflowIntegration: public IYtflowIntegration {
public:
- TYtYtflowIntegration(TYtState* state)
+ TYtYtflowIntegration(TYtState::TWeakPtr state)
: State_(state)
{
}
@@ -81,10 +81,13 @@ public:
auto tableName = TString(TYtTableInfo::GetTableLabel(maybeWriteTable.Cast().Table()));
auto commitEpoch = TEpochInfo::Parse(maybeWriteTable.Cast().Table().CommitEpoch().Ref());
- auto tableDesc = State_->TablesData->GetTable(
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
+ auto tableDesc = ytState->TablesData->GetTable(
cluster, tableName, 0);
- auto commitTableDesc = State_->TablesData->GetTable(
+ auto commitTableDesc = ytState->TablesData->GetTable(
cluster, tableName, commitEpoch);
if (!tableDesc.Meta->IsDynamic
@@ -171,7 +174,10 @@ public:
auto cluster = TString(maybeWriteTable.Cast().DataSink().Cluster().Value());
auto tableName = TString(TYtTableInfo::GetTableLabel(maybeWriteTable.Cast().Table()));
- auto tableDesc = State_->TablesData->GetTable(
+ auto ytState = State_.lock();
+ YQL_ENSURE(ytState);
+
+ auto tableDesc = ytState->TablesData->GetTable(
cluster, tableName, 0);
sinkSettings.SetDoesExist(tableDesc.Meta->DoesExist);
@@ -197,11 +203,11 @@ private:
}
private:
- TYtState* State_;
+ TYtState::TWeakPtr State_;
};
-THolder<IYtflowIntegration> CreateYtYtflowIntegration(TYtState* state) {
- Y_ABORT_UNLESS(state);
+THolder<IYtflowIntegration> CreateYtYtflowIntegration(TYtState::TWeakPtr state) {
+ YQL_ENSURE(!state.expired());
return MakeHolder<TYtYtflowIntegration>(state);
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.h b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.h
index 481880e06c5..6e3fa3dc75b 100644
--- a/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.h
+++ b/yt/yql/providers/yt/provider/yql_yt_ytflow_integration.h
@@ -9,6 +9,6 @@
namespace NYql {
-THolder<IYtflowIntegration> CreateYtYtflowIntegration(TYtState* state);
+THolder<IYtflowIntegration> CreateYtYtflowIntegration(TYtState::TWeakPtr state);
} // namespace NYql
diff --git a/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.cpp b/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.cpp
index 0191633524d..33bf2486def 100644
--- a/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.cpp
@@ -17,7 +17,7 @@ using namespace NNodes;
class TYtYtflowOptimization: public IYtflowOptimization {
public:
- TYtYtflowOptimization(TYtState* state)
+ TYtYtflowOptimization(TYtState::TWeakPtr state)
: State_(state)
{
Y_UNUSED(State_);
@@ -91,11 +91,11 @@ public:
}
private:
- TYtState* State_;
+ TYtState::TWeakPtr State_;
};
-THolder<IYtflowOptimization> CreateYtYtflowOptimization(TYtState* state) {
- Y_ABORT_UNLESS(state);
+THolder<IYtflowOptimization> CreateYtYtflowOptimization(TYtState::TWeakPtr state) {
+ YQL_ENSURE(!state.expired());
return MakeHolder<TYtYtflowOptimization>(state);
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.h b/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.h
index 87e1bf575c0..131f59d1f39 100644
--- a/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.h
+++ b/yt/yql/providers/yt/provider/yql_yt_ytflow_optimize.h
@@ -9,6 +9,6 @@
namespace NYql {
-THolder<IYtflowOptimization> CreateYtYtflowOptimization(TYtState* state);
+THolder<IYtflowOptimization> CreateYtYtflowOptimization(TYtState::TWeakPtr state);
} // namespace NYql