diff options
author | galaxycrab <[email protected]> | 2023-09-20 16:38:38 +0300 |
---|---|---|
committer | galaxycrab <[email protected]> | 2023-09-20 17:21:52 +0300 |
commit | 3f677764092395bae1d88fb95d99ec865f9ce76d (patch) | |
tree | 761586e3bc4554ae8b09e3b652b73ebfa5503cb4 | |
parent | bdff2ab3ac00395a854f7d009457d8e482bb1085 (diff) |
KIKIMR-18961 Add async processing in KQP runner
-rw-r--r-- | ydb/core/kqp/host/kqp_runner.cpp | 319 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp | 44 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_graph_transformer.cpp | 84 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_graph_transformer.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/config/yql_dispatch.h | 4 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp | 1 |
6 files changed, 313 insertions, 142 deletions
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 2b5eb4cb8a0..f77295229d8 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -31,31 +31,233 @@ using namespace NThreading; namespace { -class TPhysicalAsyncRunResult : public TKqpAsyncResultBase<IKikimrQueryExecutor::TQueryResult, false> { -public: +class TPrepareNewEngineAsyncResult : public IKikimrAsyncResult<IKikimrQueryExecutor::TQueryResult> { using TResult = IKikimrQueryExecutor::TQueryResult; - TPhysicalAsyncRunResult(const TExprNode::TPtr& queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer, - const TKqlTransformContext& transformCtx) - : TKqpAsyncResultBase(queryRoot, exprCtx, transformer) - , TransformCtx(transformCtx) {} +public: + TPrepareNewEngineAsyncResult(const TString& cluster, + const TKiDataQueryBlocks& dataQueryBlocks, + TExprContext& ctx, + const TIntrusivePtr<TKqlTransformContext>& transformCtx, + const TIntrusivePtr<TKqpOptimizeContext>& optimizeCtx, + TTypeAnnotationContext& typesCtx, + const TIntrusivePtr<TKqpBuildQueryContext>& buildQueryCtx, + const IKikimrQueryExecutor::TExecuteSettings& settings, + bool sysColumnsEnabled, + const NMiniKQL::IFunctionRegistry& funcRegistry, + const TKikimrConfiguration::TPtr& config, + IGraphTransformer* preparedExplainTransformer, + IGraphTransformer* physicalOptimizeTransformer, + IGraphTransformer* physicalBuildTxsTransformer, + IGraphTransformer* physicalBuildQueryTransformer, + IGraphTransformer* physicalPeepholeTransformer) + : Cluster(cluster) + , DataQueryBlocks(dataQueryBlocks) + , ExprCtx(ctx) + , TransformCtx(transformCtx) + , OptimizeCtx(optimizeCtx) + , TypesCtx(typesCtx) + , BuildQueryCtx(buildQueryCtx) + , Settings(settings) + , SysColumnsEnabled(sysColumnsEnabled) + , FuncRegistry(funcRegistry) + , Config(config) + , PreparedExplainTransformer(preparedExplainTransformer) + , PhysicalBuildQueryTransformer(physicalBuildQueryTransformer) + , PhysicalPeepholeTransformer(physicalPeepholeTransformer) + { + OptimizeTransformer = CreateCompositeGraphTransformer( + { + TTransformStage{ *physicalOptimizeTransformer, "PhysicalOptimize", TIssuesIds::DEFAULT_ERROR }, + TTransformStage{ *physicalBuildTxsTransformer, "PhysicalBuildTxs", TIssuesIds::DEFAULT_ERROR }, + }, + true + ); + } + + bool HasResult() const override { + return Result.Defined(); + } + + TResult GetResult() override { + Y_VERIFY(Result.Defined()); + TResult result = std::move(*Result); + Result = Nothing(); + return result; + } + + NThreading::TFuture<bool> Continue() override { + if (Result.Defined()) { + return NThreading::MakeFuture<bool>(true); + } + + if (!KqlQueryBlocks) { + return Start(); + } + + if (TransformInProgress) { + return ContinueOptimization(); + } + + if (CurrentTransformer == OptimizeTransformer.Get()) { + return OnOptimizeTransformerOk(); + } + + if (CurrentTransformer == PreparedExplainTransformer) { + return OnPreparedExplainTransformerOk(); + } + + Y_VERIFY(Result.Defined()); + return NThreading::MakeFuture<bool>(true); + } + +private: + NThreading::TFuture<bool> SetResult(TResult&& result) { + Result = std::move(result); + return NThreading::MakeFuture<bool>(true); + } + + NThreading::TFuture<bool> Start() { + KqlQueryBlocks = BuildKqlQuery(DataQueryBlocks, *TransformCtx->Tables, ExprCtx, SysColumnsEnabled, OptimizeCtx, TypesCtx); + if (!KqlQueryBlocks) { + return SetResult(ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues())); + } + + Query = KqlQueryBlocks->Ptr(); + YQL_CLOG(DEBUG, ProviderKqp) << "Initial KQL query: " << KqpExprToPrettyString(*Query, ExprCtx); + + TransformCtx->Reset(); + BuildQueryCtx->Reset(); + + OptimizedQuery = Query; + return StartOptimization(OptimizeTransformer.Get(), OptimizedQuery); + } + + NThreading::TFuture<bool> StartOptimization(IGraphTransformer* transformer, TExprNode::TPtr& transformRoot) { + CurrentTransformer = transformer; + ApplyAsyncChanges = false; + TransformInProgress = true; + CurrentTransformRoot = &transformRoot; + CurrentTransformer->Rewind(); + return ContinueOptimization(); + } + + NThreading::TFuture<bool> ContinueOptimization() { // true if transformation is finished + NThreading::TFuture<IGraphTransformer::TStatus> transformResultFuture = + AsyncTransform(*CurrentTransformer, *CurrentTransformRoot, ExprCtx, ApplyAsyncChanges); + Y_VERIFY(!transformResultFuture.HasException()); // AsyncTransform catches exceptions + if (transformResultFuture.HasValue()) { + IGraphTransformer::TStatus status = transformResultFuture.ExtractValue(); + Y_VERIFY(status.Level != IGraphTransformer::TStatus::Repeat); + if (status.Level == IGraphTransformer::TStatus::Error) { + return SetResult(ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues())); + } + if (status.Level == IGraphTransformer::TStatus::Ok) { + TransformInProgress = false; + return NThreading::MakeFuture<bool>(false); + } + } + + ApplyAsyncChanges = true; + return transformResultFuture.Apply( + [](const NThreading::TFuture<IGraphTransformer::TStatus>&) { + return false; + } + ); + } + + NThreading::TFuture<bool> OnOptimizeTransformerOk() { + YQL_CLOG(TRACE, ProviderKqp) << "OptimizeTransformer: " + << TransformerStatsToYson(OptimizeTransformer->GetStatistics()); + YQL_CLOG(DEBUG, ProviderKqp) << "Optimized KQL query: " << KqpExprToPrettyString(*OptimizedQuery, ExprCtx); + + PhysicalBuildQueryTransformer->Rewind(); + IGraphTransformer::TStatus status = InstantTransform(*PhysicalBuildQueryTransformer, OptimizedQuery, ExprCtx); + if (status != IGraphTransformer::TStatus::Ok) { + ExprCtx.AddError(TIssue(ExprCtx.GetPosition(Query->Pos()), "Failed to build physical query.")); + return SetResult(ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues())); + } + + YQL_CLOG(TRACE, ProviderKqp) << "PhysicalBuildQueryTransformer: " + << TransformerStatsToYson(PhysicalBuildQueryTransformer->GetStatistics()); + PhysicalPeepholeTransformer->Rewind(); + PeepholeOptimizedQuery = OptimizedQuery; + status = InstantTransform(*PhysicalPeepholeTransformer, PeepholeOptimizedQuery, ExprCtx); + if (status != IGraphTransformer::TStatus::Ok) { + ExprCtx.AddError(TIssue(ExprCtx.GetPosition(Query->Pos()), "Failed peephole.")); + return SetResult(ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues())); + } + + YQL_CLOG(TRACE, ProviderKqp) << "PhysicalPeepholeTransformer: " + << TransformerStatsToYson(PhysicalPeepholeTransformer->GetStatistics()); + YQL_CLOG(DEBUG, ProviderKqp) << "Physical KQL query: " << KqpExprToPrettyString(*OptimizedQuery, ExprCtx); + YQL_CLOG(DEBUG, ProviderKqp) << "Physical KQL query after peephole: " << KqpExprToPrettyString(*PeepholeOptimizedQuery, ExprCtx); + + auto& preparedQuery = *TransformCtx->QueryCtx->PreparingQuery; + TKqpPhysicalQuery physicalQuery(PeepholeOptimizedQuery); + + auto compiler = CreateKqpQueryCompiler(Cluster, OptimizeCtx->Tables, FuncRegistry, TypesCtx, Config); + auto ret = compiler->CompilePhysicalQuery(physicalQuery, DataQueryBlocks, *preparedQuery.MutablePhysicalQuery(), ExprCtx); + if (!ret) { + ExprCtx.AddError(TIssue(ExprCtx.GetPosition(Query->Pos()), "Failed to compile physical query.")); + return SetResult(ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues())); + } + + preparedQuery.SetVersion(NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1); + // TODO(sk): only on stats mode or if explain-only + return StartOptimization(PreparedExplainTransformer, OptimizedQuery); + } - void FillResult(TResult& queryResult) const override { + NThreading::TFuture<bool> OnPreparedExplainTransformerOk() { + Result.ConstructInPlace(); + Result->ProtobufArenaPtr.reset(new google::protobuf::Arena()); + + Result->SetSuccess(); TVector<NKikimrMiniKQL::TResult*> results; - for (auto& phyResult : TransformCtx.PhysicalQueryResults) { + for (auto& phyResult : TransformCtx->PhysicalQueryResults) { auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>( - queryResult.ProtobufArenaPtr.get()); + Result->ProtobufArenaPtr.get()); result->CopyFrom(phyResult); results.push_back(result); } - queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats); - queryResult.Results = std::move(results); + Result->QueryStats.CopyFrom(TransformCtx->QueryStats); + Result->Results = std::move(results); + return NThreading::MakeFuture<bool>(true); } private: - const TKqlTransformContext& TransformCtx; + TString Cluster; + TKiDataQueryBlocks DataQueryBlocks; + TExprContext& ExprCtx; + TIntrusivePtr<TKqlTransformContext> TransformCtx; + TIntrusivePtr<TKqpOptimizeContext> OptimizeCtx; + TTypeAnnotationContext& TypesCtx; + TIntrusivePtr<TKqpBuildQueryContext> BuildQueryCtx; + IKikimrQueryExecutor::TExecuteSettings Settings; + bool SysColumnsEnabled = false; + const NMiniKQL::IFunctionRegistry& FuncRegistry; + TKikimrConfiguration::TPtr Config; + + // Transformers + IGraphTransformer* PreparedExplainTransformer = nullptr; + IGraphTransformer* PhysicalBuildQueryTransformer = nullptr; + IGraphTransformer* PhysicalPeepholeTransformer = nullptr; + TAutoPtr<IGraphTransformer> OptimizeTransformer; + + // State + TMaybe<TResult> Result; + TMaybe<NYql::NNodes::TKqlQueryList> KqlQueryBlocks; + TExprNode::TPtr Query; + TExprNode::TPtr OptimizedQuery; + TExprNode::TPtr PeepholeOptimizedQuery; + + IGraphTransformer* CurrentTransformer = nullptr; + TExprNode::TPtr* CurrentTransformRoot = nullptr; + NThreading::TFuture<IGraphTransformer::TStatus> CurrentTransformerStatus; + bool ApplyAsyncChanges = false; + bool TransformInProgress = false; }; class TKqpRunner : public IKqpRunner { @@ -85,7 +287,9 @@ public: PhysicalOptimizeTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx) .AddServiceTransformers() .Add(TLogExprTransformer::Sync("PhysicalOptimizeTransformer", logComp, logLevel), "LogPhysicalOptimize") + .AddPreTypeAnnotation() .AddExpressionEvaluation(FuncRegistry) + .AddIOAnnotation() .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config)) .Add(CreateKqpCheckQueryTransformer(), "CheckKqlQuery") @@ -245,79 +449,24 @@ private: YQL_ENSURE(false, "PrepareQueryNewEngine, unexpected query type: " << queryType); } - auto kqlQueryBlocks = BuildKqlQuery(dataQueryBlocks, *TransformCtx->Tables, ctx, sysColumnsEnabled, OptimizeCtx, TypesCtx); - if (!kqlQueryBlocks) { - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } - - auto query = kqlQueryBlocks->Ptr(); - YQL_CLOG(DEBUG, ProviderKqp) << "Initial KQL query: " << KqpExprToPrettyString(*query, ctx); - - TransformCtx->Reset(); - - PhysicalOptimizeTransformer->Rewind(); - auto optimizedQuery = query; - auto status = InstantTransform(*PhysicalOptimizeTransformer, optimizedQuery, ctx); - if (status != IGraphTransformer::TStatus::Ok) { - ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to optimize query.")); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } - - YQL_CLOG(TRACE, ProviderKqp) << "PhysicalOptimizeTransformer: " - << TransformerStatsToYson(PhysicalOptimizeTransformer->GetStatistics()); - YQL_CLOG(DEBUG, ProviderKqp) << "Optimized KQL query: " << KqpExprToPrettyString(*optimizedQuery, ctx); - - BuildQueryCtx->Reset(); - PhysicalBuildTxsTransformer->Rewind(); - auto builtTxsQuery = optimizedQuery; - status = InstantTransform(*PhysicalBuildTxsTransformer, builtTxsQuery, ctx); - if (status != IGraphTransformer::TStatus::Ok) { - ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to build physical txs.")); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } - - YQL_CLOG(TRACE, ProviderKqp) << "PhysicalBuildTxsTransformer: " - << TransformerStatsToYson(PhysicalOptimizeTransformer->GetStatistics()); - - PhysicalBuildQueryTransformer->Rewind(); - auto builtQuery = builtTxsQuery; - status = InstantTransform(*PhysicalBuildQueryTransformer, builtQuery, ctx); - if (status != IGraphTransformer::TStatus::Ok) { - ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to build physical query.")); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } - - YQL_CLOG(TRACE, ProviderKqp) << "PhysicalBuildQueryTransformer: " - << TransformerStatsToYson(PhysicalBuildQueryTransformer->GetStatistics()); - - PhysicalPeepholeTransformer->Rewind(); - auto transformedQuery = builtQuery; - status = InstantTransform(*PhysicalPeepholeTransformer, transformedQuery, ctx); - if (status != IGraphTransformer::TStatus::Ok) { - ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed peephole.")); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>( - ctx.IssueManager.GetIssues())); - } - - YQL_CLOG(TRACE, ProviderKqp) << "PhysicalPeepholeTransformer: " - << TransformerStatsToYson(PhysicalPeepholeTransformer->GetStatistics()); - YQL_CLOG(DEBUG, ProviderKqp) << "Physical KQL query: " << KqpExprToPrettyString(*builtQuery, ctx); - YQL_CLOG(DEBUG, ProviderKqp) << "Physical KQL query after peephole: " << KqpExprToPrettyString(*transformedQuery, ctx); - - auto& preparedQuery = *TransformCtx->QueryCtx->PreparingQuery; - TKqpPhysicalQuery physicalQuery(transformedQuery); - - auto compiler = CreateKqpQueryCompiler(Cluster, OptimizeCtx->Tables, FuncRegistry, TypesCtx, Config); - auto ret = compiler->CompilePhysicalQuery(physicalQuery, dataQueryBlocks, *preparedQuery.MutablePhysicalQuery(), ctx); - if (!ret) { - ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to compile physical query.")); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } - - preparedQuery.SetVersion(NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1); - // TODO(sk): only on stats mode or if explain-only - PreparedExplainTransformer->Rewind(); - return MakeIntrusive<TPhysicalAsyncRunResult>(builtQuery, ctx, *PreparedExplainTransformer, *TransformCtx); + return MakeIntrusive<TPrepareNewEngineAsyncResult>( + Cluster, + dataQueryBlocks, + ctx, + TransformCtx, + OptimizeCtx, + TypesCtx, + BuildQueryCtx, + settings, + sysColumnsEnabled, + FuncRegistry, + Config, + PreparedExplainTransformer.Get(), + PhysicalOptimizeTransformer.Get(), + PhysicalBuildTxsTransformer.Get(), + PhysicalBuildQueryTransformer.Get(), + PhysicalPeepholeTransformer.Get() + ); } static bool MergeFlagValue(const TMaybe<bool>& configFlag, const TMaybe<bool>& flag) { diff --git a/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp b/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp index ee57eff0906..e69ba094bba 100644 --- a/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp +++ b/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp @@ -2,6 +2,8 @@ #include <ydb/library/yql/core/yql_graph_transformer.h> +#include <util/generic/yexception.h> + namespace NKikimr::NKqp::NOpt { using namespace NYql; @@ -29,38 +31,54 @@ public: TKqlQueryList queryBlocks(input); TVector<TKqlQuery> transformedQueryBlocks; transformedQueryBlocks.reserve(queryBlocks.Size()); - for (auto queryBlock : queryBlocks) { - auto transformed = queryBlock.Ptr(); - auto status = InstantTransform(*QueryBlockTransformer, transformed, ctx); - transformedQueryBlocks.emplace_back(std::move(transformed)); - if (status.Level != IGraphTransformer::TStatus::Ok) { + // Already transformed previous time + for (size_t transformedBlock = 0; transformedBlock < CurrentBlock; ++transformedBlock) { + transformedQueryBlocks.emplace_back(queryBlocks.Item(transformedBlock)); + } + + // Current + TStatus status = TStatus::Ok; + for (; CurrentBlock < queryBlocks.Size(); ++CurrentBlock) { + TExprNode::TPtr transformed = queryBlocks.Item(CurrentBlock).Ptr(); + status = AsyncTransformStep(*QueryBlockTransformer, transformed, ctx, false); + transformedQueryBlocks.emplace_back(std::move(transformed)); + YQL_ENSURE(status.Level != TStatus::Repeat); + if (status.Level == TStatus::Ok) { + QueryBlockTransformer->Rewind(); + continue; + } + if (status.Level == TStatus::Error) { return status; } + // Async + break; + } - QueryBlockTransformer->Rewind(); + // Not yet transformed + for (size_t nonTransformed = CurrentBlock + 1; nonTransformed < queryBlocks.Size(); ++nonTransformed) { + transformedQueryBlocks.emplace_back(queryBlocks.Item(nonTransformed).Ptr()); } output = Build<TKqlQueryList>(ctx, queryBlocks.Pos()) .Add(transformedQueryBlocks) .Done().Ptr(); - return TStatus::Ok; + return status; } NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) override { - Y_UNUSED(input); - return NThreading::MakeFuture(); + YQL_ENSURE(CurrentBlock < input.ChildrenSize()); + return QueryBlockTransformer->GetAsyncFuture(*input.Child(CurrentBlock)); } TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override { - Y_UNUSED(ctx); - output = input; - return TStatus::Ok; + return QueryBlockTransformer->ApplyAsyncChanges(input, output, ctx); } private: TAutoPtr<IGraphTransformer> QueryBlockTransformer; + size_t CurrentBlock = 0; }; } // namespace @@ -70,5 +88,3 @@ TAutoPtr<IGraphTransformer> CreateKqpQueryBlocksTransformer(TAutoPtr<IGraphTrans } } // namespace NKikimr::NKqp::NOpt - - diff --git a/ydb/library/yql/core/yql_graph_transformer.cpp b/ydb/library/yql/core/yql_graph_transformer.cpp index dd3aa951df7..8d0f6c45434 100644 --- a/ydb/library/yql/core/yql_graph_transformer.cpp +++ b/ydb/library/yql/core/yql_graph_transformer.cpp @@ -277,42 +277,10 @@ IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNo return IGraphTransformer::TStatus::Error; } -IGraphTransformer::TStatus InstantTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool breakOnRestart) { - try { - for (; ctx.RepeatTransformCounter < ctx.RepeatTransformLimit; ++ctx.RepeatTransformCounter) { - TExprNode::TPtr newRoot; - auto status = transformer.Transform(root, newRoot, ctx); - if (newRoot) { - root = newRoot; - } - - switch (status.Level) { - case IGraphTransformer::TStatus::Ok: - case IGraphTransformer::TStatus::Error: - return status; - case IGraphTransformer::TStatus::Repeat: - if (breakOnRestart && status.HasRestart) { - return status; - } - - continue; - case IGraphTransformer::TStatus::Async: - ctx.AddError(TIssue(ctx.GetPosition(root->Pos()), "Instant transform can not be delayed")); - return IGraphTransformer::TStatus::Error; - default: - YQL_ENSURE(false, "Unknown status"); - } - } - AddTooManyTransformationsError(root->Pos(), "InstantTransform", ctx); - } - catch (const std::exception& e) { - ctx.AddError(ExceptionToIssue(e)); - } - return IGraphTransformer::TStatus::Error; -} - -NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, - bool applyAsyncChanges) { +IGraphTransformer::TStatus AsyncTransformStepImpl(IGraphTransformer& transformer, TExprNode::TPtr& root, + TExprContext& ctx, bool applyAsyncChanges, bool breakOnRestart, + const TStringBuf& name) +{ try { if (applyAsyncChanges) { TExprNode::TPtr newRoot; @@ -326,7 +294,10 @@ NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer case IGraphTransformer::TStatus::Error: break; case IGraphTransformer::TStatus::Repeat: - return AsyncTransform(transformer, root, ctx, false /* no async changes */); + if (breakOnRestart && status.HasRestart) { + return status; + } + return AsyncTransformStepImpl(transformer, root, ctx, false /* no async changes */, breakOnRestart, name); case IGraphTransformer::TStatus::Async: YQL_ENSURE(false, "Async status is forbidden for ApplyAsyncChanges"); break; @@ -334,7 +305,7 @@ NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer YQL_ENSURE(false, "Unknown status"); break; } - return NThreading::MakeFuture(status); + return status; } for (; ctx.RepeatTransformCounter < ctx.RepeatTransformLimit; ++ctx.RepeatTransformCounter) { TExprNode::TPtr newRoot; @@ -346,8 +317,11 @@ NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer switch (status.Level) { case IGraphTransformer::TStatus::Ok: case IGraphTransformer::TStatus::Error: - return NThreading::MakeFuture(status); + return status; case IGraphTransformer::TStatus::Repeat: + if (breakOnRestart && status.HasRestart) { + return status; + } // if (currentTime - startTime >= threshold) return NThreading::MakeFuture(IGraphTransformer::TStatus::Yield); continue; case IGraphTransformer::TStatus::Async: @@ -358,20 +332,44 @@ NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer break; } if (ctx.RepeatTransformCounter >= ctx.RepeatTransformLimit) { - AddTooManyTransformationsError(root->Pos(), "AsyncTransform", ctx); - return NThreading::MakeFuture(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error)); + AddTooManyTransformationsError(root->Pos(), name, ctx); + return IGraphTransformer::TStatus::Error; } } catch (const std::exception& e) { ctx.AddError(ExceptionToIssue(e)); - return NThreading::MakeFuture(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error)); + return IGraphTransformer::TStatus::Error; + } + + return IGraphTransformer::TStatus::Async; +} + +IGraphTransformer::TStatus InstantTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool breakOnRestart) { + IGraphTransformer::TStatus status = AsyncTransformStepImpl(transformer, root, ctx, false, breakOnRestart, "InstantTransform"); + if (status.Level == IGraphTransformer::TStatus::Async) { + ctx.AddError(TIssue(ctx.GetPosition(root->Pos()), "Instant transform can not be delayed")); + return IGraphTransformer::TStatus::Error; + } + return status; +} + +IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root, + TExprContext& ctx, bool applyAsyncChanges) +{ + return AsyncTransformStepImpl(transformer, root, ctx, applyAsyncChanges, false, "AsyncTransformStep"); +} + +NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, + bool applyAsyncChanges) { + IGraphTransformer::TStatus status = AsyncTransformStepImpl(transformer, root, ctx, applyAsyncChanges, false, "AsyncTransform"); + if (status.Level != IGraphTransformer::TStatus::Async) { + return NThreading::MakeFuture(status); } return transformer.GetAsyncFuture(*root).Apply( [] (const NThreading::TFuture<void>&) mutable -> NThreading::TFuture<IGraphTransformer::TStatus> { return NThreading::MakeFuture(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Async)); }); - } void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges, diff --git a/ydb/library/yql/core/yql_graph_transformer.h b/ydb/library/yql/core/yql_graph_transformer.h index 26e508fc369..8fed6120ce5 100644 --- a/ydb/library/yql/core/yql_graph_transformer.h +++ b/ydb/library/yql/core/yql_graph_transformer.h @@ -233,6 +233,9 @@ NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges, std::function<void(const IGraphTransformer::TStatus&)> asyncCallback); +IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root, + TExprContext& ctx, bool applyAsyncChanges); + class TSyncTransformerBase : public TGraphTransformerBase { public: NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final { diff --git a/ydb/library/yql/providers/common/config/yql_dispatch.h b/ydb/library/yql/providers/common/config/yql_dispatch.h index 8a75a81530a..3973508d24d 100644 --- a/ydb/library/yql/providers/common/config/yql_dispatch.h +++ b/ydb/library/yql/providers/common/config/yql_dispatch.h @@ -313,6 +313,10 @@ public: ValidClusters.insert(validClusters.begin(), validClusters.end()); } + void AddValidCluster(const TString& cluster) { + ValidClusters.insert(cluster); + } + template <typename TType, bool RUNTIME> TSettingHandlerImpl<TType, RUNTIME>& AddSetting(const TString& name, TConfSetting<TType, RUNTIME>& setting) { TIntrusivePtr<TSettingHandlerImpl<TType, RUNTIME>> handler = new TSettingHandlerImpl<TType, RUNTIME>(name, setting); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp index c149913f535..25257c554c6 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp @@ -30,6 +30,7 @@ public: {} void AddCluster(const TString& name, const THashMap<TString, TString>& properties) override { + State_->Configuration->AddValidCluster(name); auto& settings = State_->Configuration->Clusters[name]; settings.Url = properties.Value("location", ""); auto signReference = properties.Value("serviceAccountIdSignatureReference", ""); |