diff options
author | udovichenko-r <rvu@ydb.tech> | 2022-12-08 22:47:14 +0300 |
---|---|---|
committer | udovichenko-r <rvu@ydb.tech> | 2022-12-08 22:47:14 +0300 |
commit | 8fdf497e036a8b96b30dcc1c0db56822d4e9244f (patch) | |
tree | ed071349554e175d66123f9ec4d9b3c0104bbfa3 | |
parent | 904b24c0f6381f6212ca11248c92b05503e89ee5 (diff) | |
download | ydb-8fdf497e036a8b96b30dcc1c0db56822d4e9244f.tar.gz |
[yql] Add missing Rewind()s (second part)
28 files changed, 141 insertions, 7 deletions
diff --git a/ydb/core/client/minikql_compile/yql_expr_minikql.cpp b/ydb/core/client/minikql_compile/yql_expr_minikql.cpp index c025b8fe73..9224f16eab 100644 --- a/ydb/core/client/minikql_compile/yql_expr_minikql.cpp +++ b/ydb/core/client/minikql_compile/yql_expr_minikql.cpp @@ -333,6 +333,9 @@ public: return CallableTransformer->Transform(input, output, ctx); } + void Rewind() final { + } + private: static bool CheckKeyColumn(const TStringBuf& columnName, ui32 keyIndex, IDbSchemeResolver::TTableResult* lookup, TExprNode& node, TExprContext& ctx) { auto column = lookup->Columns.FindPtr(columnName); diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 2048202842..a7b4c6ed9b 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -422,6 +422,8 @@ public: return TStatus::Error; } + void Rewind() final { + } }; class TPrepareDataQueryAstTransformer : public TGraphTransformerBase { diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 626aa75021..f747f0c060 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -55,6 +55,9 @@ public: } } + void Rewind() final { + } + private: EPhysicalTxType GetPhyTxType(bool allStagesArePure) { if (QueryType == EKikimrQueryType::Scan) { @@ -549,7 +552,6 @@ public: } void Rewind() final { - TSyncTransformerBase::Rewind(); DataTxTransformer->Rewind(); ScanTxTransformer->Rewind(); } diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp index 69db4a3026..0f2516dc74 100644 --- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp +++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp @@ -355,7 +355,6 @@ public: } void Rewind() final { - TSyncTransformerBase::Rewind(); TxTransformer->Rewind(); } diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index c1dd2b0df8..4598c970bb 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -201,6 +201,11 @@ public: return TStatus::Ok; } + void Rewind() final { + LoadResults.clear(); + AsyncFuture = {}; + } + private: TIntrusivePtr<IKikimrGateway> Gateway; TIntrusivePtr<TKikimrSessionContext> SessionCtx; diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 5d5eeb2e87..b8985e5b3a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -218,6 +218,8 @@ public: return TStatus::Ok; } + void Rewind() final { + } private: struct TExecInfo { TKiExecDataQuery Node; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index 35a34545f0..e989605dac 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -15,6 +15,8 @@ class TKiSourceVisitorTransformer: public TSyncTransformerBase { public: TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + void Rewind() override { + } private: virtual TStatus HandleKiRead(NNodes::TKiReadBase node, TExprContext& ctx) = 0; virtual TStatus HandleRead(NNodes::TExprBase node, TExprContext& ctx) = 0; @@ -26,6 +28,8 @@ class TKiSinkVisitorTransformer : public TSyncTransformerBase { public: TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + void Rewind() override { + } private: virtual TStatus HandleWriteTable(NNodes::TKiWriteTable node, TExprContext& ctx) = 0; virtual TStatus HandleUpdateTable(NNodes::TKiUpdateTable node, TExprContext& ctx) = 0; diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index edbf7734c0..c09f06078d 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -11945,6 +11945,15 @@ template <NKikimr::NUdf::EDataSlot DataSlot> return ret; } + void Rewind() final { + for (auto& x : Types.DataSources) { + x->GetIntentDeterminationTransformer().Rewind(); + } + for (auto& x : Types.DataSinks) { + x->GetIntentDeterminationTransformer().Rewind(); + } + } + private: IGraphTransformer::TStatus DetermineIntents(IDataProvider& dataProvider, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { diff --git a/ydb/library/yql/core/yql_callable_transform.h b/ydb/library/yql/core/yql_callable_transform.h index d508152029..f0c0a83729 100644 --- a/ydb/library/yql/core/yql_callable_transform.h +++ b/ydb/library/yql/core/yql_callable_transform.h @@ -134,6 +134,10 @@ public: return status; } + void Rewind() override { + PendingNodes.clear(); + } + protected: IDataProvider* ParseCommit(const TExprNode& input, TExprContext& ctx) { if (!EnsureMinArgsCount(input, 2, ctx)) { diff --git a/ydb/library/yql/core/yql_expr_constraint.cpp b/ydb/library/yql/core/yql_expr_constraint.cpp index 425c53a017..c7c0ced7fc 100644 --- a/ydb/library/yql/core/yql_expr_constraint.cpp +++ b/ydb/library/yql/core/yql_expr_constraint.cpp @@ -2621,6 +2621,9 @@ public: Y_UNUSED(ctx); return UpdateAllChildLambdasConstraints(*input); } + + void Rewind() final { + } }; class TConstraintTransformer : public TGraphTransformerBase { @@ -2688,6 +2691,14 @@ public: return combinedStatus; } + void Rewind() final { + CallableTransformer->Rewind(); + CallableInputs.clear(); + Processed.clear(); + HasRenames = false; + CurrentFunctions = {}; + } + private: TStatus TransformNode(const TExprNode::TPtr& start, TExprNode::TPtr& output, TExprContext& ctx) { output = start; diff --git a/ydb/library/yql/core/yql_gc_transformer.cpp b/ydb/library/yql/core/yql_gc_transformer.cpp index d74099da4d..a9e4e5a9ff 100644 --- a/ydb/library/yql/core/yql_gc_transformer.cpp +++ b/ydb/library/yql/core/yql_gc_transformer.cpp @@ -67,6 +67,11 @@ public: return TStatus::Ok; } + void Rewind() final { + LastGcCount = 0; + CurrentThreshold = 0; + } + private: ui64 LastGcCount = 0; ui64 CurrentThreshold = 0; diff --git a/ydb/library/yql/core/yql_graph_transformer.h b/ydb/library/yql/core/yql_graph_transformer.h index 3e902c462c..4512ff4e3f 100644 --- a/ydb/library/yql/core/yql_graph_transformer.h +++ b/ydb/library/yql/core/yql_graph_transformer.h @@ -100,7 +100,7 @@ public: virtual TStatus Transform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0; virtual NThreading::TFuture<void> GetAsyncFuture(const TExprNode& input) = 0; virtual TStatus ApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0; - virtual void Rewind() {} + virtual void Rewind() = 0; virtual TStatistics GetStatistics() const { return TStatistics::NotPresent(); } }; @@ -248,7 +248,7 @@ public: } }; -class TNullTransformer : public TSyncTransformerBase { +class TNullTransformer final: public TSyncTransformerBase { public: TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { output = input; @@ -256,10 +256,12 @@ public: return IGraphTransformer::TStatus::Ok; } + void Rewind() final { + } }; template <typename TFunctor> -class TFunctorTransformer : public TSyncTransformerBase { +class TFunctorTransformer final: public TSyncTransformerBase { public: TFunctorTransformer(TFunctor functor) : Functor(std::move(functor)) {} @@ -271,6 +273,9 @@ public: return status; } + void Rewind() final { + } + private: TFunctor Functor; }; diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp index ffc27fa84f..b07be281fe 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp @@ -308,6 +308,9 @@ public: return RemapExpr(input, output, replaces, ctx, settings); } + void Rewind() final { + } + private: const bool AllowDependantConsumers; }; @@ -377,6 +380,8 @@ public: #endif return status; } + void Rewind() final { + } }; } // namespace diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp index ed5db76023..8b7bc177d6 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp @@ -109,6 +109,12 @@ public: } return TStatus::Ok; } + + void Rewind() final { + AsyncFuture_ = {}; + FullResolvedIds_.clear(); + DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + } private: const TClickHouseState::TPtr State_; diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_load_meta.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_load_meta.cpp index b4ca9df568..59bf8c0c58 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_load_meta.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_load_meta.cpp @@ -190,6 +190,11 @@ public: return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr)); } + + void Rewind() final { + Results_.clear(); + AsyncFuture_ = {}; + } private: static void OnDiscovery(const TMapType::mapped_type& res, IHTTPGateway::TResult&& result, NThreading::TPromise<void>& promise) { *res = std::move(result); diff --git a/ydb/library/yql/providers/common/config/yql_configuration_transformer.h b/ydb/library/yql/providers/common/config/yql_configuration_transformer.h index 38997103ff..f114a3c369 100644 --- a/ydb/library/yql/providers/common/config/yql_configuration_transformer.h +++ b/ydb/library/yql/providers/common/config/yql_configuration_transformer.h @@ -18,6 +18,8 @@ public: const TString& provider, const THashSet<TStringBuf>& configureCallables = {}); TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + void Rewind() final { + } protected: virtual bool HandleAttr(TPositionHandle pos, const TString& cluster, const TString& name, diff --git a/ydb/library/yql/providers/common/transform/yql_visit.h b/ydb/library/yql/providers/common/transform/yql_visit.h index 5e6212579e..586d87fcfc 100644 --- a/ydb/library/yql/providers/common/transform/yql_visit.h +++ b/ydb/library/yql/providers/common/transform/yql_visit.h @@ -23,6 +23,9 @@ public: TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + void Rewind() final { + } + bool CanParse(const TExprNode& node) const { return Handlers.contains(node.Content()); } diff --git a/ydb/library/yql/providers/config/yql_config_provider.cpp b/ydb/library/yql/providers/config/yql_config_provider.cpp index 3f20f6c24f..250137c828 100644 --- a/ydb/library/yql/providers/config/yql_config_provider.cpp +++ b/ydb/library/yql/providers/config/yql_config_provider.cpp @@ -114,6 +114,8 @@ namespace { ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Failed to execute node: " << input->Content())); return TStatus::Error; } + void Rewind() final { + } private: const TTypeAnnotationContext& Types; diff --git a/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp b/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp index 1dcab11d1f..444819319f 100644 --- a/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp +++ b/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp @@ -83,6 +83,11 @@ public: return TStatus::Ok; } + void Rewind() final { + ResolverContext.Reset(); + AllFutures = {}; + } + private: TDqFunctionState::TPtr State; NThreading::TFuture<void> AllFutures; @@ -94,4 +99,4 @@ THolder<IGraphTransformer> CreateDqFunctionMetaLoader(TDqFunctionState::TPtr sta return THolder(new TDqFunctionResolverTransform(state)); } -}
\ No newline at end of file +} diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp index e73655b6c2..e5437828e2 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp @@ -60,6 +60,12 @@ public: return TStatus::Ok; } + void Rewind() final { + AsyncFuture_ = {}; + FullResolvedIds_.clear(); + DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + } + private: const TPqState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp index b71a33474e..4eef17a78d 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp @@ -62,6 +62,12 @@ public: return TStatus::Ok; } + void Rewind() final { + AsyncFuture_ = {}; + FullResolvedIds_.clear(); + DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + } + private: const TPqState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp index 827a4e2fd4..d3fa81dca9 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp @@ -139,6 +139,12 @@ private: } } + void Rewind() final { + PendingWriteTopics_.clear(); + PendingReadTopics_.clear(); + AsyncFuture_ = {}; + } + private: TPqState::TPtr State_; // (cluster, topic) -> meta diff --git a/ydb/library/yql/providers/result/provider/yql_result_provider.cpp b/ydb/library/yql/providers/result/provider/yql_result_provider.cpp index 97c200ffac..3074c37559 100644 --- a/ydb/library/yql/providers/result/provider/yql_result_provider.cpp +++ b/ydb/library/yql/providers/result/provider/yql_result_provider.cpp @@ -267,7 +267,21 @@ namespace { return status; } - private: + void Rewind() final { + DelegatedProvider = nullptr; + DelegatedNode = nullptr; + DelegatedNodeOutput = nullptr; + + CommittedPullSize = 0; + PullOverflow = false; + + CommittedFillSize = 0; + FillOverflow = false; + + ResultWriter.Drop(); + } + + private: template <class TTarget> bool& GetOverflowFlagAndCommitedSize(ui64& committed); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 7cee2e1550..44d463de94 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -79,6 +79,13 @@ public: , Lister_(IS3Lister::Make(gateway, State_->Configuration->MaxDiscoveryFilesPerQuery, State_->Configuration->MaxInflightListsPerQuery, State_->Configuration->AllowLocalFiles)) {} + void Rewind() final { + PendingRequests_.clear(); + RequestsByNode_.clear(); + GenColumnsByNode_.clear(); + AllFuture_ = {}; + } + private: TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { output = input; diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp index 880811beee..73a21638be 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp @@ -68,6 +68,8 @@ public: return status; } + void Rewind() final { + } private: TSolomonState::TPtr State_; }; diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp index 42324bb3de..c4ddad581d 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp @@ -33,6 +33,9 @@ public: return TStatus::Ok; } + void Rewind() final { + } + private: TSolomonState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp index 36e9890a55..1aea23417f 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp @@ -96,6 +96,12 @@ public: } return TStatus::Ok; } + + void Rewind() final { + AsyncFuture_ = {}; + FullResolvedIds_.clear(); + DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + } private: const TYdbState::TPtr State_; diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_load_meta.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_load_meta.cpp index 0d735d0b98..235e22ade1 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_load_meta.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_load_meta.cpp @@ -264,6 +264,11 @@ public: return failed ? TStatus::Error : TStatus::Ok; } + void Rewind() final { + Clients_ = std::make_shared<TCluster2ClientPerSnapshotHandle>(); + PendingTables_ = std::make_shared<TTableKey2DescribeTableResult>(); + AsyncFuture_ = {}; + } private: const NYdb::TDriver Driver_; const TYdbState::TPtr State_; |