aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-11-10 15:58:16 +0300
committeraneporada <aneporada@ydb.tech>2023-11-10 17:33:02 +0300
commit786a2afbefd3c737c8028129a3de52335fe59f9e (patch)
tree3f39b9ab79dc2546e8117f0bc5a3813a087ecc55
parentc540d4e186b249a7d218d745b29d42731913f2e7 (diff)
downloadydb-786a2afbefd3c737c8028129a3de52335fe59f9e.tar.gz
Disable LLVM for block-only stages
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.cpp12
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.h1
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp28
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h4
-rw-r--r--ydb/library/yql/dq/type_ann/CMakeLists.darwin-x86_64.txt12
-rw-r--r--ydb/library/yql/dq/type_ann/CMakeLists.linux-aarch64.txt12
-rw-r--r--ydb/library/yql/dq/type_ann/CMakeLists.linux-x86_64.txt12
-rw-r--r--ydb/library/yql/dq/type_ann/CMakeLists.windows-x86_64.txt12
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.cpp15
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.h10
-rw-r--r--ydb/library/yql/dq/type_ann/ya.make2
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.cpp1
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h1
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp7
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp91
15 files changed, 209 insertions, 11 deletions
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp
index 76ff636413..27bbe18ae2 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp
@@ -3032,6 +3032,18 @@ bool IsWideBlockType(const TTypeAnnotationNode& type) {
return blockLenType->Cast<TDataExprType>()->GetSlot() == EDataSlot::Uint64;
}
+bool IsWideSequenceBlockType(const TTypeAnnotationNode& type) {
+ const TTypeAnnotationNode* itemType = nullptr;
+ if (type.GetKind() == ETypeAnnotationKind::Stream) {
+ itemType = type.Cast<TStreamExprType>()->GetItemType();
+ } else if (type.GetKind() == ETypeAnnotationKind::Flow) {
+ itemType = type.Cast<TFlowExprType>()->GetItemType();
+ } else {
+ return false;
+ }
+ return IsWideBlockType(*itemType);
+}
+
bool IsSupportedAsBlockType(TPositionHandle pos, const TTypeAnnotationNode& type, TExprContext& ctx, TTypeAnnotationContext& types) {
if (!types.ArrowResolver) {
return false;
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h
index 4391b96ddd..90cd276ad4 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.h
+++ b/ydb/library/yql/core/yql_expr_type_annotation.h
@@ -125,6 +125,7 @@ bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& typ
bool EnsureWideStreamType(const TExprNode& node, TExprContext& ctx);
bool EnsureWideStreamType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
bool IsWideBlockType(const TTypeAnnotationNode& type);
+bool IsWideSequenceBlockType(const TTypeAnnotationNode& type);
bool IsSupportedAsBlockType(TPositionHandle pos, const TTypeAnnotationNode& type, TExprContext& ctx, TTypeAnnotationContext& types);
bool EnsureSupportedAsBlockType(TPositionHandle pos, const TTypeAnnotationNode& type, TExprContext& ctx, TTypeAnnotationContext& types);
bool EnsureWideBlockType(TPositionHandle position, const TTypeAnnotationNode& type, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true);
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 03c9c2a3a5..b0ab0b91ea 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -280,11 +280,12 @@ public:
return TaskId;
}
- bool UseSeparatePatternAlloc() const {
- return Context.PatternCache && (Settings.OptLLVM == "OFF" || Settings.UseCacheForLLVM);
+ bool UseSeparatePatternAlloc(const TDqTaskSettings& taskSettings) const {
+ return Context.PatternCache &&
+ (Settings.OptLLVM == "OFF" || taskSettings.IsLLVMDisabled() || Settings.UseCacheForLLVM);
}
- TComputationPatternOpts CreatePatternOpts(TScopedAlloc& alloc, TTypeEnvironment& typeEnv) {
+ TComputationPatternOpts CreatePatternOpts(const TDqTaskSettings& task, TScopedAlloc& alloc, TTypeEnvironment& typeEnv) {
auto validatePolicy = Settings.TerminateOnError ? NUdf::EValidatePolicy::Fail : NUdf::EValidatePolicy::Exception;
auto taskRunnerFactory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
@@ -301,8 +302,14 @@ public:
if (Y_UNLIKELY(CollectFull() && !AllocatedHolder->ProgramParsed.StatsRegistry)) {
AllocatedHolder->ProgramParsed.StatsRegistry = NMiniKQL::CreateDefaultStatsRegistry();
}
+
+ TString optLLVM = Settings.OptLLVM;
+ if (task.IsLLVMDisabled()) {
+ optLLVM = "OFF";
+ }
+
TComputationPatternOpts opts(alloc.Ref(), typeEnv, taskRunnerFactory,
- Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM, EGraphPerProcess::Multi,
+ Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, optLLVM, EGraphPerProcess::Multi,
AllocatedHolder->ProgramParsed.StatsRegistry.Get());
if (!SecureParamsProvider) {
@@ -315,9 +322,10 @@ public:
std::shared_ptr<TPatternCacheEntry> CreateComputationPattern(const TDqTaskSettings& task, const TString& rawProgram, bool forCache, bool& canBeCached) {
canBeCached = true;
- auto entry = TComputationPatternLRUCache::CreateCacheEntry(UseSeparatePatternAlloc());
- auto& patternAlloc = UseSeparatePatternAlloc() ? entry->Alloc : Alloc();
- auto& patternEnv = UseSeparatePatternAlloc() ? entry->Env : TypeEnv();
+ const bool useSeparatePattern = UseSeparatePatternAlloc(task);
+ auto entry = TComputationPatternLRUCache::CreateCacheEntry(useSeparatePattern);
+ auto& patternAlloc = useSeparatePattern ? entry->Alloc : Alloc();
+ auto& patternEnv = useSeparatePattern ? entry->Env : TypeEnv();
patternAlloc.Ref().UseRefLocking = forCache;
{
@@ -411,7 +419,7 @@ public:
LOG(TStringBuilder() << "task: " << TaskId << ", program size: " << programSize
<< ", llvm: `" << Settings.OptLLVM << "`.");
- auto opts = CreatePatternOpts(patternAlloc, patternEnv);
+ auto opts = CreatePatternOpts(task, patternAlloc, patternEnv);
opts.SetPatternEnv(entry);
{
@@ -431,7 +439,7 @@ public:
std::shared_ptr<TPatternCacheEntry> entry;
bool canBeCached;
- if (UseSeparatePatternAlloc() && Context.PatternCache) {
+ if (UseSeparatePatternAlloc(task) && Context.PatternCache) {
auto& cache = Context.PatternCache;
auto ticket = cache->FindOrSubscribe(program.GetRaw());
if (!ticket.HasFuture()) {
@@ -454,7 +462,7 @@ public:
AllocatedHolder->ProgramParsed.PatternCacheEntry = entry;
// clone pattern using TDqTaskRunner's alloc
- auto opts = CreatePatternOpts(Alloc(), TypeEnv());
+ auto opts = CreatePatternOpts(task, Alloc(), TypeEnv());
AllocatedHolder->ProgramParsed.CompGraph = AllocatedHolder->ProgramParsed.GetPattern()->Clone(
opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &TypeEnv()));
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 42df8bb027..d5353a5ecc 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -312,6 +312,10 @@ public:
return Task_->HasUseLlvm();
}
+ bool IsLLVMDisabled() const {
+ return HasUseLlvm() && !GetUseLlvm();
+ }
+
const TVector<google::protobuf::Message*>& GetSourceSettings() const {
return SourceSettings;
}
diff --git a/ydb/library/yql/dq/type_ann/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/type_ann/CMakeLists.darwin-x86_64.txt
index 6516783acb..5ab3e6db14 100644
--- a/ydb/library/yql/dq/type_ann/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/dq/type_ann/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,12 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(yql-dq-type_ann)
target_compile_options(yql-dq-type_ann PRIVATE
@@ -20,7 +26,13 @@ target_link_libraries(yql-dq-type_ann PUBLIC
yql-dq-expr_nodes
yql-dq-proto
providers-common-provider
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-dq-type_ann PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
)
+generate_enum_serilization(yql-dq-type_ann
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.h
+ INCLUDE_HEADERS
+ ydb/library/yql/dq/type_ann/dq_type_ann.h
+)
diff --git a/ydb/library/yql/dq/type_ann/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/type_ann/CMakeLists.linux-aarch64.txt
index acab33a299..365e9c2700 100644
--- a/ydb/library/yql/dq/type_ann/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/dq/type_ann/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,12 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(yql-dq-type_ann)
target_compile_options(yql-dq-type_ann PRIVATE
@@ -21,7 +27,13 @@ target_link_libraries(yql-dq-type_ann PUBLIC
yql-dq-expr_nodes
yql-dq-proto
providers-common-provider
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-dq-type_ann PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
)
+generate_enum_serilization(yql-dq-type_ann
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.h
+ INCLUDE_HEADERS
+ ydb/library/yql/dq/type_ann/dq_type_ann.h
+)
diff --git a/ydb/library/yql/dq/type_ann/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/type_ann/CMakeLists.linux-x86_64.txt
index acab33a299..365e9c2700 100644
--- a/ydb/library/yql/dq/type_ann/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/dq/type_ann/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,12 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(yql-dq-type_ann)
target_compile_options(yql-dq-type_ann PRIVATE
@@ -21,7 +27,13 @@ target_link_libraries(yql-dq-type_ann PUBLIC
yql-dq-expr_nodes
yql-dq-proto
providers-common-provider
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-dq-type_ann PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
)
+generate_enum_serilization(yql-dq-type_ann
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.h
+ INCLUDE_HEADERS
+ ydb/library/yql/dq/type_ann/dq_type_ann.h
+)
diff --git a/ydb/library/yql/dq/type_ann/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/type_ann/CMakeLists.windows-x86_64.txt
index 6516783acb..5ab3e6db14 100644
--- a/ydb/library/yql/dq/type_ann/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/dq/type_ann/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,12 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(yql-dq-type_ann)
target_compile_options(yql-dq-type_ann PRIVATE
@@ -20,7 +26,13 @@ target_link_libraries(yql-dq-type_ann PUBLIC
yql-dq-expr_nodes
yql-dq-proto
providers-common-provider
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-dq-type_ann PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
)
+generate_enum_serilization(yql-dq-type_ann
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.h
+ INCLUDE_HEADERS
+ ydb/library/yql/dq/type_ann/dq_type_ann.h
+)
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
index 013be5f2d2..4b3b235ccd 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
@@ -1119,6 +1119,8 @@ TDqStageSettings TDqStageSettings::Parse(const TDqStageBase& node) {
} else if (name == WideChannelsSettingName) {
settings.WideChannels = true;
settings.OutputNarrowType = tuple.Value().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
+ } else if (name == BlockStatusSettingName) {
+ settings.BlockStatus = FromString<EBlockStatus>(tuple.Value().Cast<TCoAtom>().Value());
}
}
@@ -1141,7 +1143,7 @@ bool TDqStageSettings::Validate(const TExprNode& stage, TExprContext& ctx) {
}
TStringBuf name = setting->Head().Content();
- if (name == IdSettingName || name == LogicalIdSettingName) {
+ if (name == IdSettingName || name == LogicalIdSettingName || name == BlockStatusSettingName) {
if (setting->ChildrenSize() != 2) {
ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain single value"));
return false;
@@ -1155,6 +1157,10 @@ bool TDqStageSettings::Validate(const TExprNode& stage, TExprContext& ctx) {
ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain ui64 value, but got: " << value->Content()));
return false;
}
+ if (name == BlockStatusSettingName && !TryFromString<EBlockStatus>(value->Content())) {
+ ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Unsupported " << name << " value: " << value->Content()));
+ return false;
+ }
} else if (name == WideChannelsSettingName) {
if (setting->ChildrenSize() != 2) {
ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain single value"));
@@ -1229,6 +1235,13 @@ NNodes::TCoNameValueTupleList TDqStageSettings::BuildNode(TExprContext& ctx, TPo
.Done());
}
+ if (BlockStatus.Defined()) {
+ settings.push_back(Build<TCoNameValueTuple>(ctx, pos)
+ .Name().Build(BlockStatusSettingName)
+ .Value<TCoAtom>().Build(ToString(*BlockStatus))
+ .Done());
+ }
+
return Build<TCoNameValueTupleList>(ctx, pos)
.Add(settings)
.Done();
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.h b/ydb/library/yql/dq/type_ann/dq_type_ann.h
index 1df5cbbc02..c73169e8fc 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.h
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.h
@@ -38,6 +38,7 @@ struct TDqStageSettings {
static constexpr TStringBuf IdSettingName = "_id";
static constexpr TStringBuf SinglePartitionSettingName = "_single_partition";
static constexpr TStringBuf WideChannelsSettingName = "_wide_channels";
+ static constexpr TStringBuf BlockStatusSettingName = "_block_status";
ui64 LogicalId = 0;
TString Id;
@@ -46,8 +47,17 @@ struct TDqStageSettings {
bool WideChannels = false;
const TStructExprType* OutputNarrowType = nullptr;
+ enum class EBlockStatus {
+ None,
+ Partial,
+ Full,
+ };
+
+ TMaybe<EBlockStatus> BlockStatus;
+
TDqStageSettings& SetSinglePartition(bool value = true) { SinglePartition = value; return *this; }
TDqStageSettings& SetWideChannels(const TStructExprType& narrowType) { WideChannels = true; OutputNarrowType = &narrowType; return *this; }
+ TDqStageSettings& SetBlockStatus(EBlockStatus status) { BlockStatus = status; return *this; }
static TDqStageSettings New(const NNodes::TDqStageBase& node);
static TDqStageSettings New();
diff --git a/ydb/library/yql/dq/type_ann/ya.make b/ydb/library/yql/dq/type_ann/ya.make
index cb0daa7629..c6a6fe80d8 100644
--- a/ydb/library/yql/dq/type_ann/ya.make
+++ b/ydb/library/yql/dq/type_ann/ya.make
@@ -15,4 +15,6 @@ SRCS(
YQL_LAST_ABI_VERSION()
+GENERATE_ENUM_SERIALIZATION(dq_type_ann.h)
+
END()
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 464a9382a3..5aac22998d 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -86,6 +86,7 @@ TDqConfiguration::TDqConfiguration() {
EnableDqReplicate = true;
}
});
+ REGISTER_SETTING(*this, DisableLLVMForBlockStages);
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index fcb32b1c62..bd70221220 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -123,6 +123,7 @@ struct TDqSettings {
NCommon::TConfSetting<bool, false> _SkipRevisionCheck;
NCommon::TConfSetting<bool, false> UseBlockReader;
NCommon::TConfSetting<ESpillingEngine, false> SpillingEngine;
+ NCommon::TConfSetting<bool, false> DisableLLVMForBlockStages;
// This options will be passed to executor_actor and worker_actor
template <typename TProtoConfig>
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index bf404a4410..0719bb3b92 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -457,6 +457,13 @@ namespace NYql::NDqs {
taskDesc.MutableMeta()->PackFrom(taskMeta);
taskDesc.SetStageId(stageId);
+ if (Settings->DisableLLVMForBlockStages.Get().GetOrElse(true)) {
+ auto& stage = TasksGraph.GetStageInfo(task.StageId).Meta.Stage;
+ auto settings = TDqStageSettings::Parse(stage);
+ if (settings.BlockStatus.Defined() && settings.BlockStatus == TDqStageSettings::EBlockStatus::Full) {
+ taskDesc.SetUseLlvm(false);
+ }
+ }
plan.emplace_back(std::move(taskDesc));
}
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index cb43eab7a9..e62280543f 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -261,6 +261,95 @@ private:
TDqStatePtr State_;
};
+TExprNode::TPtr DqMarkBlockStage(const TDqPhyStage& stage, TExprContext& ctx) {
+ using NDq::TDqStageSettings;
+ TDqStageSettings settings = NDq::TDqStageSettings::Parse(stage);
+ if (settings.BlockStatus.Defined()) {
+ return stage.Ptr();
+ }
+
+ TExprNode::TPtr root = stage.Program().Body().Ptr();
+
+ // scalar channel as output
+ if (root->IsCallable("FromFlow")) {
+ root = root->HeadPtr();
+ }
+ if (root->IsCallable("WideFromBlocks")) {
+ root = root->HeadPtr();
+ }
+
+ const TTypeAnnotationNode* nodeType = root->GetTypeAnn();
+ YQL_ENSURE(nodeType);
+ auto blockStatus = IsWideSequenceBlockType(*nodeType) ? TDqStageSettings::EBlockStatus::Full : TDqStageSettings::EBlockStatus::None;
+ bool stop = false;
+
+ VisitExpr(root, [&](const TExprNode::TPtr& node) {
+ if (stop || node->IsLambda() || node->IsArgument()) {
+ return false;
+ }
+
+ if (node->IsCallable("WideToBlocks") && node->Head().IsCallable("ToFlow") && node->Head().Head().IsArgument()) {
+ // scalar channel as input
+ return false;
+ }
+
+ const TTypeAnnotationNode* nodeType = node->GetTypeAnn();
+ YQL_ENSURE(nodeType);
+
+ if (nodeType->GetKind() != ETypeAnnotationKind::Stream && nodeType->GetKind() != ETypeAnnotationKind::Flow) {
+ return false;
+ }
+
+ const bool isBlock = IsWideSequenceBlockType(*nodeType);
+ if (blockStatus == TDqStageSettings::EBlockStatus::Full && !isBlock ||
+ blockStatus == TDqStageSettings::EBlockStatus::None && isBlock)
+ {
+ blockStatus = TDqStageSettings::EBlockStatus::Partial;
+ }
+
+ if (blockStatus == TDqStageSettings::EBlockStatus::Partial) {
+ stop = true;
+ return false;
+ }
+
+ return true;
+ });
+
+ YQL_CLOG(INFO, CoreDq) << "Setting block status for stage #" << settings.LogicalId << " = " << ToString(blockStatus);
+ return Build<TDqPhyStage>(ctx, stage.Pos())
+ .InitFrom(stage)
+ .Settings(settings.SetBlockStatus(blockStatus).BuildNode(ctx, stage.Settings().Pos()))
+ .Done().Ptr();
+}
+
+struct TDqsFinalPipelineConfigurator : public IPipelineConfigurator {
+public:
+ TDqsFinalPipelineConfigurator() = default;
+private:
+ void AfterCreate(TTransformationPipeline*) const final {}
+
+ void AfterTypeAnnotation(TTransformationPipeline*) const final {}
+
+ void AfterOptimize(TTransformationPipeline* pipeline) const final {
+ auto typeCtx = pipeline->GetTypeAnnotationContext();
+ pipeline->Add(CreateFunctorTransformer(
+ [typeCtx](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ TOptimizeExprSettings optSettings{typeCtx.Get()};
+ optSettings.VisitLambdas = false;
+ return OptimizeExpr(input, output, [](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
+ TExprBase expr{node};
+ if (auto stage = expr.Maybe<TDqPhyStage>()) {
+ return DqMarkBlockStage(stage.Cast(), ctx);
+ }
+ return node;
+ }, ctx, optSettings);
+ }
+ ),
+ "DqAfterPeephole",
+ TIssuesIds::DEFAULT_ERROR);
+ }
+};
+
class TDqExecTransformer: public TExecTransformerBase, TCounters
{
public:
@@ -1750,8 +1839,10 @@ private:
IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) const {
TDqsPipelineConfigurator peepholeConfig(State);
+ TDqsFinalPipelineConfigurator finalPeepholeConfg;
TPeepholeSettings peepholeSettings;
peepholeSettings.CommonConfig = &peepholeConfig;
+ peepholeSettings.FinalConfig = &finalPeepholeConfg;
bool hasNonDeterministicFunctions;
auto status = PeepHoleOptimizeNode(input, output, ctx, *State->TypeCtx, nullptr, hasNonDeterministicFunctions, peepholeSettings);
if (status.Level != TStatus::Ok) {