summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <[email protected]>2023-09-20 16:38:38 +0300
committergalaxycrab <[email protected]>2023-09-20 17:21:52 +0300
commit3f677764092395bae1d88fb95d99ec865f9ce76d (patch)
tree761586e3bc4554ae8b09e3b652b73ebfa5503cb4
parentbdff2ab3ac00395a854f7d009457d8e482bb1085 (diff)
KIKIMR-18961 Add async processing in KQP runner
-rw-r--r--ydb/core/kqp/host/kqp_runner.cpp319
-rw-r--r--ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp44
-rw-r--r--ydb/library/yql/core/yql_graph_transformer.cpp84
-rw-r--r--ydb/library/yql/core/yql_graph_transformer.h3
-rw-r--r--ydb/library/yql/providers/common/config/yql_dispatch.h4
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp1
6 files changed, 313 insertions, 142 deletions
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index 2b5eb4cb8a0..f77295229d8 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -31,31 +31,233 @@ using namespace NThreading;
namespace {
-class TPhysicalAsyncRunResult : public TKqpAsyncResultBase<IKikimrQueryExecutor::TQueryResult, false> {
-public:
+class TPrepareNewEngineAsyncResult : public IKikimrAsyncResult<IKikimrQueryExecutor::TQueryResult> {
using TResult = IKikimrQueryExecutor::TQueryResult;
- TPhysicalAsyncRunResult(const TExprNode::TPtr& queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer,
- const TKqlTransformContext& transformCtx)
- : TKqpAsyncResultBase(queryRoot, exprCtx, transformer)
- , TransformCtx(transformCtx) {}
+public:
+ TPrepareNewEngineAsyncResult(const TString& cluster,
+ const TKiDataQueryBlocks& dataQueryBlocks,
+ TExprContext& ctx,
+ const TIntrusivePtr<TKqlTransformContext>& transformCtx,
+ const TIntrusivePtr<TKqpOptimizeContext>& optimizeCtx,
+ TTypeAnnotationContext& typesCtx,
+ const TIntrusivePtr<TKqpBuildQueryContext>& buildQueryCtx,
+ const IKikimrQueryExecutor::TExecuteSettings& settings,
+ bool sysColumnsEnabled,
+ const NMiniKQL::IFunctionRegistry& funcRegistry,
+ const TKikimrConfiguration::TPtr& config,
+ IGraphTransformer* preparedExplainTransformer,
+ IGraphTransformer* physicalOptimizeTransformer,
+ IGraphTransformer* physicalBuildTxsTransformer,
+ IGraphTransformer* physicalBuildQueryTransformer,
+ IGraphTransformer* physicalPeepholeTransformer)
+ : Cluster(cluster)
+ , DataQueryBlocks(dataQueryBlocks)
+ , ExprCtx(ctx)
+ , TransformCtx(transformCtx)
+ , OptimizeCtx(optimizeCtx)
+ , TypesCtx(typesCtx)
+ , BuildQueryCtx(buildQueryCtx)
+ , Settings(settings)
+ , SysColumnsEnabled(sysColumnsEnabled)
+ , FuncRegistry(funcRegistry)
+ , Config(config)
+ , PreparedExplainTransformer(preparedExplainTransformer)
+ , PhysicalBuildQueryTransformer(physicalBuildQueryTransformer)
+ , PhysicalPeepholeTransformer(physicalPeepholeTransformer)
+ {
+ OptimizeTransformer = CreateCompositeGraphTransformer(
+ {
+ TTransformStage{ *physicalOptimizeTransformer, "PhysicalOptimize", TIssuesIds::DEFAULT_ERROR },
+ TTransformStage{ *physicalBuildTxsTransformer, "PhysicalBuildTxs", TIssuesIds::DEFAULT_ERROR },
+ },
+ true
+ );
+ }
+
+ bool HasResult() const override {
+ return Result.Defined();
+ }
+
+ TResult GetResult() override {
+ Y_VERIFY(Result.Defined());
+ TResult result = std::move(*Result);
+ Result = Nothing();
+ return result;
+ }
+
+ NThreading::TFuture<bool> Continue() override {
+ if (Result.Defined()) {
+ return NThreading::MakeFuture<bool>(true);
+ }
+
+ if (!KqlQueryBlocks) {
+ return Start();
+ }
+
+ if (TransformInProgress) {
+ return ContinueOptimization();
+ }
+
+ if (CurrentTransformer == OptimizeTransformer.Get()) {
+ return OnOptimizeTransformerOk();
+ }
+
+ if (CurrentTransformer == PreparedExplainTransformer) {
+ return OnPreparedExplainTransformerOk();
+ }
+
+ Y_VERIFY(Result.Defined());
+ return NThreading::MakeFuture<bool>(true);
+ }
+
+private:
+ NThreading::TFuture<bool> SetResult(TResult&& result) {
+ Result = std::move(result);
+ return NThreading::MakeFuture<bool>(true);
+ }
+
+ NThreading::TFuture<bool> Start() {
+ KqlQueryBlocks = BuildKqlQuery(DataQueryBlocks, *TransformCtx->Tables, ExprCtx, SysColumnsEnabled, OptimizeCtx, TypesCtx);
+ if (!KqlQueryBlocks) {
+ return SetResult(ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues()));
+ }
+
+ Query = KqlQueryBlocks->Ptr();
+ YQL_CLOG(DEBUG, ProviderKqp) << "Initial KQL query: " << KqpExprToPrettyString(*Query, ExprCtx);
+
+ TransformCtx->Reset();
+ BuildQueryCtx->Reset();
+
+ OptimizedQuery = Query;
+ return StartOptimization(OptimizeTransformer.Get(), OptimizedQuery);
+ }
+
+ NThreading::TFuture<bool> StartOptimization(IGraphTransformer* transformer, TExprNode::TPtr& transformRoot) {
+ CurrentTransformer = transformer;
+ ApplyAsyncChanges = false;
+ TransformInProgress = true;
+ CurrentTransformRoot = &transformRoot;
+ CurrentTransformer->Rewind();
+ return ContinueOptimization();
+ }
+
+ NThreading::TFuture<bool> ContinueOptimization() { // true if transformation is finished
+ NThreading::TFuture<IGraphTransformer::TStatus> transformResultFuture =
+ AsyncTransform(*CurrentTransformer, *CurrentTransformRoot, ExprCtx, ApplyAsyncChanges);
+ Y_VERIFY(!transformResultFuture.HasException()); // AsyncTransform catches exceptions
+ if (transformResultFuture.HasValue()) {
+ IGraphTransformer::TStatus status = transformResultFuture.ExtractValue();
+ Y_VERIFY(status.Level != IGraphTransformer::TStatus::Repeat);
+ if (status.Level == IGraphTransformer::TStatus::Error) {
+ return SetResult(ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues()));
+ }
+ if (status.Level == IGraphTransformer::TStatus::Ok) {
+ TransformInProgress = false;
+ return NThreading::MakeFuture<bool>(false);
+ }
+ }
+
+ ApplyAsyncChanges = true;
+ return transformResultFuture.Apply(
+ [](const NThreading::TFuture<IGraphTransformer::TStatus>&) {
+ return false;
+ }
+ );
+ }
+
+ NThreading::TFuture<bool> OnOptimizeTransformerOk() {
+ YQL_CLOG(TRACE, ProviderKqp) << "OptimizeTransformer: "
+ << TransformerStatsToYson(OptimizeTransformer->GetStatistics());
+ YQL_CLOG(DEBUG, ProviderKqp) << "Optimized KQL query: " << KqpExprToPrettyString(*OptimizedQuery, ExprCtx);
+
+ PhysicalBuildQueryTransformer->Rewind();
+ IGraphTransformer::TStatus status = InstantTransform(*PhysicalBuildQueryTransformer, OptimizedQuery, ExprCtx);
+ if (status != IGraphTransformer::TStatus::Ok) {
+ ExprCtx.AddError(TIssue(ExprCtx.GetPosition(Query->Pos()), "Failed to build physical query."));
+ return SetResult(ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues()));
+ }
+
+ YQL_CLOG(TRACE, ProviderKqp) << "PhysicalBuildQueryTransformer: "
+ << TransformerStatsToYson(PhysicalBuildQueryTransformer->GetStatistics());
+ PhysicalPeepholeTransformer->Rewind();
+ PeepholeOptimizedQuery = OptimizedQuery;
+ status = InstantTransform(*PhysicalPeepholeTransformer, PeepholeOptimizedQuery, ExprCtx);
+ if (status != IGraphTransformer::TStatus::Ok) {
+ ExprCtx.AddError(TIssue(ExprCtx.GetPosition(Query->Pos()), "Failed peephole."));
+ return SetResult(ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues()));
+ }
+
+ YQL_CLOG(TRACE, ProviderKqp) << "PhysicalPeepholeTransformer: "
+ << TransformerStatsToYson(PhysicalPeepholeTransformer->GetStatistics());
+ YQL_CLOG(DEBUG, ProviderKqp) << "Physical KQL query: " << KqpExprToPrettyString(*OptimizedQuery, ExprCtx);
+ YQL_CLOG(DEBUG, ProviderKqp) << "Physical KQL query after peephole: " << KqpExprToPrettyString(*PeepholeOptimizedQuery, ExprCtx);
+
+ auto& preparedQuery = *TransformCtx->QueryCtx->PreparingQuery;
+ TKqpPhysicalQuery physicalQuery(PeepholeOptimizedQuery);
+
+ auto compiler = CreateKqpQueryCompiler(Cluster, OptimizeCtx->Tables, FuncRegistry, TypesCtx, Config);
+ auto ret = compiler->CompilePhysicalQuery(physicalQuery, DataQueryBlocks, *preparedQuery.MutablePhysicalQuery(), ExprCtx);
+ if (!ret) {
+ ExprCtx.AddError(TIssue(ExprCtx.GetPosition(Query->Pos()), "Failed to compile physical query."));
+ return SetResult(ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues()));
+ }
+
+ preparedQuery.SetVersion(NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1);
+ // TODO(sk): only on stats mode or if explain-only
+ return StartOptimization(PreparedExplainTransformer, OptimizedQuery);
+ }
- void FillResult(TResult& queryResult) const override {
+ NThreading::TFuture<bool> OnPreparedExplainTransformerOk() {
+ Result.ConstructInPlace();
+ Result->ProtobufArenaPtr.reset(new google::protobuf::Arena());
+
+ Result->SetSuccess();
TVector<NKikimrMiniKQL::TResult*> results;
- for (auto& phyResult : TransformCtx.PhysicalQueryResults) {
+ for (auto& phyResult : TransformCtx->PhysicalQueryResults) {
auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
- queryResult.ProtobufArenaPtr.get());
+ Result->ProtobufArenaPtr.get());
result->CopyFrom(phyResult);
results.push_back(result);
}
- queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats);
- queryResult.Results = std::move(results);
+ Result->QueryStats.CopyFrom(TransformCtx->QueryStats);
+ Result->Results = std::move(results);
+ return NThreading::MakeFuture<bool>(true);
}
private:
- const TKqlTransformContext& TransformCtx;
+ TString Cluster;
+ TKiDataQueryBlocks DataQueryBlocks;
+ TExprContext& ExprCtx;
+ TIntrusivePtr<TKqlTransformContext> TransformCtx;
+ TIntrusivePtr<TKqpOptimizeContext> OptimizeCtx;
+ TTypeAnnotationContext& TypesCtx;
+ TIntrusivePtr<TKqpBuildQueryContext> BuildQueryCtx;
+ IKikimrQueryExecutor::TExecuteSettings Settings;
+ bool SysColumnsEnabled = false;
+ const NMiniKQL::IFunctionRegistry& FuncRegistry;
+ TKikimrConfiguration::TPtr Config;
+
+ // Transformers
+ IGraphTransformer* PreparedExplainTransformer = nullptr;
+ IGraphTransformer* PhysicalBuildQueryTransformer = nullptr;
+ IGraphTransformer* PhysicalPeepholeTransformer = nullptr;
+ TAutoPtr<IGraphTransformer> OptimizeTransformer;
+
+ // State
+ TMaybe<TResult> Result;
+ TMaybe<NYql::NNodes::TKqlQueryList> KqlQueryBlocks;
+ TExprNode::TPtr Query;
+ TExprNode::TPtr OptimizedQuery;
+ TExprNode::TPtr PeepholeOptimizedQuery;
+
+ IGraphTransformer* CurrentTransformer = nullptr;
+ TExprNode::TPtr* CurrentTransformRoot = nullptr;
+ NThreading::TFuture<IGraphTransformer::TStatus> CurrentTransformerStatus;
+ bool ApplyAsyncChanges = false;
+ bool TransformInProgress = false;
};
class TKqpRunner : public IKqpRunner {
@@ -85,7 +287,9 @@ public:
PhysicalOptimizeTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx)
.AddServiceTransformers()
.Add(TLogExprTransformer::Sync("PhysicalOptimizeTransformer", logComp, logLevel), "LogPhysicalOptimize")
+ .AddPreTypeAnnotation()
.AddExpressionEvaluation(FuncRegistry)
+ .AddIOAnnotation()
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(),
*typesCtx, Config))
.Add(CreateKqpCheckQueryTransformer(), "CheckKqlQuery")
@@ -245,79 +449,24 @@ private:
YQL_ENSURE(false, "PrepareQueryNewEngine, unexpected query type: " << queryType);
}
- auto kqlQueryBlocks = BuildKqlQuery(dataQueryBlocks, *TransformCtx->Tables, ctx, sysColumnsEnabled, OptimizeCtx, TypesCtx);
- if (!kqlQueryBlocks) {
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
-
- auto query = kqlQueryBlocks->Ptr();
- YQL_CLOG(DEBUG, ProviderKqp) << "Initial KQL query: " << KqpExprToPrettyString(*query, ctx);
-
- TransformCtx->Reset();
-
- PhysicalOptimizeTransformer->Rewind();
- auto optimizedQuery = query;
- auto status = InstantTransform(*PhysicalOptimizeTransformer, optimizedQuery, ctx);
- if (status != IGraphTransformer::TStatus::Ok) {
- ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to optimize query."));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
-
- YQL_CLOG(TRACE, ProviderKqp) << "PhysicalOptimizeTransformer: "
- << TransformerStatsToYson(PhysicalOptimizeTransformer->GetStatistics());
- YQL_CLOG(DEBUG, ProviderKqp) << "Optimized KQL query: " << KqpExprToPrettyString(*optimizedQuery, ctx);
-
- BuildQueryCtx->Reset();
- PhysicalBuildTxsTransformer->Rewind();
- auto builtTxsQuery = optimizedQuery;
- status = InstantTransform(*PhysicalBuildTxsTransformer, builtTxsQuery, ctx);
- if (status != IGraphTransformer::TStatus::Ok) {
- ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to build physical txs."));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
-
- YQL_CLOG(TRACE, ProviderKqp) << "PhysicalBuildTxsTransformer: "
- << TransformerStatsToYson(PhysicalOptimizeTransformer->GetStatistics());
-
- PhysicalBuildQueryTransformer->Rewind();
- auto builtQuery = builtTxsQuery;
- status = InstantTransform(*PhysicalBuildQueryTransformer, builtQuery, ctx);
- if (status != IGraphTransformer::TStatus::Ok) {
- ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to build physical query."));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
-
- YQL_CLOG(TRACE, ProviderKqp) << "PhysicalBuildQueryTransformer: "
- << TransformerStatsToYson(PhysicalBuildQueryTransformer->GetStatistics());
-
- PhysicalPeepholeTransformer->Rewind();
- auto transformedQuery = builtQuery;
- status = InstantTransform(*PhysicalPeepholeTransformer, transformedQuery, ctx);
- if (status != IGraphTransformer::TStatus::Ok) {
- ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed peephole."));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(
- ctx.IssueManager.GetIssues()));
- }
-
- YQL_CLOG(TRACE, ProviderKqp) << "PhysicalPeepholeTransformer: "
- << TransformerStatsToYson(PhysicalPeepholeTransformer->GetStatistics());
- YQL_CLOG(DEBUG, ProviderKqp) << "Physical KQL query: " << KqpExprToPrettyString(*builtQuery, ctx);
- YQL_CLOG(DEBUG, ProviderKqp) << "Physical KQL query after peephole: " << KqpExprToPrettyString(*transformedQuery, ctx);
-
- auto& preparedQuery = *TransformCtx->QueryCtx->PreparingQuery;
- TKqpPhysicalQuery physicalQuery(transformedQuery);
-
- auto compiler = CreateKqpQueryCompiler(Cluster, OptimizeCtx->Tables, FuncRegistry, TypesCtx, Config);
- auto ret = compiler->CompilePhysicalQuery(physicalQuery, dataQueryBlocks, *preparedQuery.MutablePhysicalQuery(), ctx);
- if (!ret) {
- ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to compile physical query."));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
-
- preparedQuery.SetVersion(NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1);
- // TODO(sk): only on stats mode or if explain-only
- PreparedExplainTransformer->Rewind();
- return MakeIntrusive<TPhysicalAsyncRunResult>(builtQuery, ctx, *PreparedExplainTransformer, *TransformCtx);
+ return MakeIntrusive<TPrepareNewEngineAsyncResult>(
+ Cluster,
+ dataQueryBlocks,
+ ctx,
+ TransformCtx,
+ OptimizeCtx,
+ TypesCtx,
+ BuildQueryCtx,
+ settings,
+ sysColumnsEnabled,
+ FuncRegistry,
+ Config,
+ PreparedExplainTransformer.Get(),
+ PhysicalOptimizeTransformer.Get(),
+ PhysicalBuildTxsTransformer.Get(),
+ PhysicalBuildQueryTransformer.Get(),
+ PhysicalPeepholeTransformer.Get()
+ );
}
static bool MergeFlagValue(const TMaybe<bool>& configFlag, const TMaybe<bool>& flag) {
diff --git a/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp b/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp
index ee57eff0906..e69ba094bba 100644
--- a/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp
+++ b/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp
@@ -2,6 +2,8 @@
#include <ydb/library/yql/core/yql_graph_transformer.h>
+#include <util/generic/yexception.h>
+
namespace NKikimr::NKqp::NOpt {
using namespace NYql;
@@ -29,38 +31,54 @@ public:
TKqlQueryList queryBlocks(input);
TVector<TKqlQuery> transformedQueryBlocks;
transformedQueryBlocks.reserve(queryBlocks.Size());
- for (auto queryBlock : queryBlocks) {
- auto transformed = queryBlock.Ptr();
- auto status = InstantTransform(*QueryBlockTransformer, transformed, ctx);
- transformedQueryBlocks.emplace_back(std::move(transformed));
- if (status.Level != IGraphTransformer::TStatus::Ok) {
+ // Already transformed previous time
+ for (size_t transformedBlock = 0; transformedBlock < CurrentBlock; ++transformedBlock) {
+ transformedQueryBlocks.emplace_back(queryBlocks.Item(transformedBlock));
+ }
+
+ // Current
+ TStatus status = TStatus::Ok;
+ for (; CurrentBlock < queryBlocks.Size(); ++CurrentBlock) {
+ TExprNode::TPtr transformed = queryBlocks.Item(CurrentBlock).Ptr();
+ status = AsyncTransformStep(*QueryBlockTransformer, transformed, ctx, false);
+ transformedQueryBlocks.emplace_back(std::move(transformed));
+ YQL_ENSURE(status.Level != TStatus::Repeat);
+ if (status.Level == TStatus::Ok) {
+ QueryBlockTransformer->Rewind();
+ continue;
+ }
+ if (status.Level == TStatus::Error) {
return status;
}
+ // Async
+ break;
+ }
- QueryBlockTransformer->Rewind();
+ // Not yet transformed
+ for (size_t nonTransformed = CurrentBlock + 1; nonTransformed < queryBlocks.Size(); ++nonTransformed) {
+ transformedQueryBlocks.emplace_back(queryBlocks.Item(nonTransformed).Ptr());
}
output = Build<TKqlQueryList>(ctx, queryBlocks.Pos())
.Add(transformedQueryBlocks)
.Done().Ptr();
- return TStatus::Ok;
+ return status;
}
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) override {
- Y_UNUSED(input);
- return NThreading::MakeFuture();
+ YQL_ENSURE(CurrentBlock < input.ChildrenSize());
+ return QueryBlockTransformer->GetAsyncFuture(*input.Child(CurrentBlock));
}
TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
- Y_UNUSED(ctx);
- output = input;
- return TStatus::Ok;
+ return QueryBlockTransformer->ApplyAsyncChanges(input, output, ctx);
}
private:
TAutoPtr<IGraphTransformer> QueryBlockTransformer;
+ size_t CurrentBlock = 0;
};
} // namespace
@@ -70,5 +88,3 @@ TAutoPtr<IGraphTransformer> CreateKqpQueryBlocksTransformer(TAutoPtr<IGraphTrans
}
} // namespace NKikimr::NKqp::NOpt
-
-
diff --git a/ydb/library/yql/core/yql_graph_transformer.cpp b/ydb/library/yql/core/yql_graph_transformer.cpp
index dd3aa951df7..8d0f6c45434 100644
--- a/ydb/library/yql/core/yql_graph_transformer.cpp
+++ b/ydb/library/yql/core/yql_graph_transformer.cpp
@@ -277,42 +277,10 @@ IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNo
return IGraphTransformer::TStatus::Error;
}
-IGraphTransformer::TStatus InstantTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool breakOnRestart) {
- try {
- for (; ctx.RepeatTransformCounter < ctx.RepeatTransformLimit; ++ctx.RepeatTransformCounter) {
- TExprNode::TPtr newRoot;
- auto status = transformer.Transform(root, newRoot, ctx);
- if (newRoot) {
- root = newRoot;
- }
-
- switch (status.Level) {
- case IGraphTransformer::TStatus::Ok:
- case IGraphTransformer::TStatus::Error:
- return status;
- case IGraphTransformer::TStatus::Repeat:
- if (breakOnRestart && status.HasRestart) {
- return status;
- }
-
- continue;
- case IGraphTransformer::TStatus::Async:
- ctx.AddError(TIssue(ctx.GetPosition(root->Pos()), "Instant transform can not be delayed"));
- return IGraphTransformer::TStatus::Error;
- default:
- YQL_ENSURE(false, "Unknown status");
- }
- }
- AddTooManyTransformationsError(root->Pos(), "InstantTransform", ctx);
- }
- catch (const std::exception& e) {
- ctx.AddError(ExceptionToIssue(e));
- }
- return IGraphTransformer::TStatus::Error;
-}
-
-NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx,
- bool applyAsyncChanges) {
+IGraphTransformer::TStatus AsyncTransformStepImpl(IGraphTransformer& transformer, TExprNode::TPtr& root,
+ TExprContext& ctx, bool applyAsyncChanges, bool breakOnRestart,
+ const TStringBuf& name)
+{
try {
if (applyAsyncChanges) {
TExprNode::TPtr newRoot;
@@ -326,7 +294,10 @@ NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer
case IGraphTransformer::TStatus::Error:
break;
case IGraphTransformer::TStatus::Repeat:
- return AsyncTransform(transformer, root, ctx, false /* no async changes */);
+ if (breakOnRestart && status.HasRestart) {
+ return status;
+ }
+ return AsyncTransformStepImpl(transformer, root, ctx, false /* no async changes */, breakOnRestart, name);
case IGraphTransformer::TStatus::Async:
YQL_ENSURE(false, "Async status is forbidden for ApplyAsyncChanges");
break;
@@ -334,7 +305,7 @@ NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer
YQL_ENSURE(false, "Unknown status");
break;
}
- return NThreading::MakeFuture(status);
+ return status;
}
for (; ctx.RepeatTransformCounter < ctx.RepeatTransformLimit; ++ctx.RepeatTransformCounter) {
TExprNode::TPtr newRoot;
@@ -346,8 +317,11 @@ NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer
switch (status.Level) {
case IGraphTransformer::TStatus::Ok:
case IGraphTransformer::TStatus::Error:
- return NThreading::MakeFuture(status);
+ return status;
case IGraphTransformer::TStatus::Repeat:
+ if (breakOnRestart && status.HasRestart) {
+ return status;
+ }
// if (currentTime - startTime >= threshold) return NThreading::MakeFuture(IGraphTransformer::TStatus::Yield);
continue;
case IGraphTransformer::TStatus::Async:
@@ -358,20 +332,44 @@ NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer
break;
}
if (ctx.RepeatTransformCounter >= ctx.RepeatTransformLimit) {
- AddTooManyTransformationsError(root->Pos(), "AsyncTransform", ctx);
- return NThreading::MakeFuture(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error));
+ AddTooManyTransformationsError(root->Pos(), name, ctx);
+ return IGraphTransformer::TStatus::Error;
}
}
catch (const std::exception& e) {
ctx.AddError(ExceptionToIssue(e));
- return NThreading::MakeFuture(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ return IGraphTransformer::TStatus::Async;
+}
+
+IGraphTransformer::TStatus InstantTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool breakOnRestart) {
+ IGraphTransformer::TStatus status = AsyncTransformStepImpl(transformer, root, ctx, false, breakOnRestart, "InstantTransform");
+ if (status.Level == IGraphTransformer::TStatus::Async) {
+ ctx.AddError(TIssue(ctx.GetPosition(root->Pos()), "Instant transform can not be delayed"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ return status;
+}
+
+IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root,
+ TExprContext& ctx, bool applyAsyncChanges)
+{
+ return AsyncTransformStepImpl(transformer, root, ctx, applyAsyncChanges, false, "AsyncTransformStep");
+}
+
+NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx,
+ bool applyAsyncChanges) {
+ IGraphTransformer::TStatus status = AsyncTransformStepImpl(transformer, root, ctx, applyAsyncChanges, false, "AsyncTransform");
+ if (status.Level != IGraphTransformer::TStatus::Async) {
+ return NThreading::MakeFuture(status);
}
return transformer.GetAsyncFuture(*root).Apply(
[] (const NThreading::TFuture<void>&) mutable -> NThreading::TFuture<IGraphTransformer::TStatus> {
return NThreading::MakeFuture(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Async));
});
-
}
void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges,
diff --git a/ydb/library/yql/core/yql_graph_transformer.h b/ydb/library/yql/core/yql_graph_transformer.h
index 26e508fc369..8fed6120ce5 100644
--- a/ydb/library/yql/core/yql_graph_transformer.h
+++ b/ydb/library/yql/core/yql_graph_transformer.h
@@ -233,6 +233,9 @@ NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer
void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges,
std::function<void(const IGraphTransformer::TStatus&)> asyncCallback);
+IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root,
+ TExprContext& ctx, bool applyAsyncChanges);
+
class TSyncTransformerBase : public TGraphTransformerBase {
public:
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
diff --git a/ydb/library/yql/providers/common/config/yql_dispatch.h b/ydb/library/yql/providers/common/config/yql_dispatch.h
index 8a75a81530a..3973508d24d 100644
--- a/ydb/library/yql/providers/common/config/yql_dispatch.h
+++ b/ydb/library/yql/providers/common/config/yql_dispatch.h
@@ -313,6 +313,10 @@ public:
ValidClusters.insert(validClusters.begin(), validClusters.end());
}
+ void AddValidCluster(const TString& cluster) {
+ ValidClusters.insert(cluster);
+ }
+
template <typename TType, bool RUNTIME>
TSettingHandlerImpl<TType, RUNTIME>& AddSetting(const TString& name, TConfSetting<TType, RUNTIME>& setting) {
TIntrusivePtr<TSettingHandlerImpl<TType, RUNTIME>> handler = new TSettingHandlerImpl<TType, RUNTIME>(name, setting);
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp
index c149913f535..25257c554c6 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp
@@ -30,6 +30,7 @@ public:
{}
void AddCluster(const TString& name, const THashMap<TString, TString>& properties) override {
+ State_->Configuration->AddValidCluster(name);
auto& settings = State_->Configuration->Clusters[name];
settings.Url = properties.Value("location", "");
auto signReference = properties.Value("serviceAccountIdSignatureReference", "");