diff options
author | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-02-17 17:06:23 +0300 |
---|---|---|
committer | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-02-17 17:27:48 +0300 |
commit | 62f206670154d466db83d498ab9075d3cad7bd43 (patch) | |
tree | aca353ca30ddca98f3c5773fb94ec0fadbe95598 | |
parent | abe9abd80331e160103849c9bed168bcf9f67361 (diff) | |
download | ydb-62f206670154d466db83d498ab9075d3cad7bd43.tar.gz |
YT block input for table content
commit_hash:6ece06798fc8cef2e4d8e62cf5b9634b3162aa45
25 files changed, 569 insertions, 29 deletions
diff --git a/yt/yql/providers/yt/codec/yt_codec.h b/yt/yql/providers/yt/codec/yt_codec.h index f7ea0e0b78..4e6ef543a5 100644 --- a/yt/yql/providers/yt/codec/yt_codec.h +++ b/yt/yql/providers/yt/codec/yt_codec.h @@ -133,6 +133,10 @@ public: UseBlockOutput_ = true; } + void SetIsTableContent() { + IsTableContent_ = true; + } + void SetTableOffsets(const TVector<ui64>& offsets); void Clear(); @@ -148,6 +152,7 @@ public: bool UseSkiff_ = false; bool UseBlockInput_ = false; bool UseBlockOutput_ = false; + bool IsTableContent_ = false; TString OptLLVM_; TSystemFields SystemFields_; diff --git a/yt/yql/providers/yt/codec/yt_codec_io.cpp b/yt/yql/providers/yt/codec/yt_codec_io.cpp index 82b1a3e2f8..459602d9cf 100644 --- a/yt/yql/providers/yt/codec/yt_codec_io.cpp +++ b/yt/yql/providers/yt/codec/yt_codec_io.cpp @@ -1536,7 +1536,7 @@ public: YQL_ENSURE(inputFields.size() == ColumnConverters_.size()); auto rowIndices = batch->GetColumnByName("$row_index"); - YQL_ENSURE(rowIndices || decoder.Dynamic); + YQL_ENSURE(rowIndices || decoder.Dynamic || Specs_.IsTableContent_); arrow::compute::ExecContext execContext(Pool_); std::vector<arrow::Datum> convertedBatch; diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp index aee7b92e65..03443e6409 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.cpp +++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp @@ -524,6 +524,7 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx) REGISTER_SETTING(*this, MaxColumnGroups); REGISTER_SETTING(*this, ExtendedStatsMaxChunkCount); REGISTER_SETTING(*this, JobBlockInput); + REGISTER_SETTING(*this, JobBlockTableContent); REGISTER_SETTING(*this, JobBlockOutput).Parser([](const TString& v) { return FromString<EBlockOutputMode>(v); }); REGISTER_SETTING(*this, _EnableYtDqProcessWriteConstraints); REGISTER_SETTING(*this, CompactForDistinct); diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h index 85cd91402a..88b12afff2 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.h +++ b/yt/yql/providers/yt/common/yql_yt_settings.h @@ -294,6 +294,7 @@ struct TYtSettings { NCommon::TConfSetting<ui16, false> MaxColumnGroups; NCommon::TConfSetting<ui64, false> ExtendedStatsMaxChunkCount; NCommon::TConfSetting<bool, false> JobBlockInput; + NCommon::TConfSetting<bool, false> JobBlockTableContent; NCommon::TConfSetting<TSet<TString>, false> JobBlockInputSupportedTypes; NCommon::TConfSetting<TSet<NUdf::EDataSlot>, false> JobBlockInputSupportedDataTypes; NCommon::TConfSetting<EBlockOutputMode, false> JobBlockOutput; diff --git a/yt/yql/providers/yt/comp_nodes/ya.make.inc b/yt/yql/providers/yt/comp_nodes/ya.make.inc index b0a54d85a4..2e5ae696d4 100644 --- a/yt/yql/providers/yt/comp_nodes/ya.make.inc +++ b/yt/yql/providers/yt/comp_nodes/ya.make.inc @@ -4,11 +4,13 @@ INCLUDE(${ARCADIA_ROOT}/yql/essentials/minikql/invoke_builtins/header.ya.make.in SET(ORIG_SRC_DIR ${ARCADIA_ROOT}/yt/yql/providers/yt/comp_nodes) SET(ORIG_SOURCES + yql_mkql_file_block_stream.cpp yql_mkql_file_input_state.cpp yql_mkql_file_list.cpp yql_mkql_input_stream.cpp yql_mkql_input.cpp yql_mkql_output.cpp + yql_mkql_block_table_content.cpp yql_mkql_table_content.cpp yql_mkql_table.cpp yql_mkql_ungrouping_list.cpp diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp new file mode 100644 index 0000000000..d935da2004 --- /dev/null +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp @@ -0,0 +1,73 @@ +#include "yql_mkql_block_table_content.h" +#include "yql_mkql_file_block_stream.h" + +#include <yql/essentials/minikql/computation/mkql_computation_node_impl.h> +#include <yql/essentials/minikql/mkql_node_cast.h> +#include <yql/essentials/minikql/defs.h> + +#include <yql/essentials/public/udf/udf_value.h> + +#include <util/generic/vector.h> +#include <util/generic/string.h> +#include <util/generic/size_literals.h> + +namespace NYql { + +using namespace NKikimr; +using namespace NKikimr::NMiniKQL; + +class TYtBlockTableContentWrapper : public TMutableComputationNode<TYtBlockTableContentWrapper> { + typedef TMutableComputationNode<TYtBlockTableContentWrapper> TBaseComputation; +public: + TYtBlockTableContentWrapper(TComputationMutables& mutables, NCommon::TCodecContext& codecCtx, + TVector<TString>&& files, const TString& inputSpec, TStructType* origStructType, bool decompress, std::optional<ui64> expectedRowCount) + : TBaseComputation(mutables) + , Files_(std::move(files)) + , Decompress_(decompress) + , ExpectedRowCount_(std::move(expectedRowCount)) + { + Spec_.SetUseBlockInput(); + Spec_.SetIsTableContent(); + Spec_.Init(codecCtx, inputSpec, {}, {}, origStructType, {}, TString()); + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TFileWideBlockStreamValue>(Spec_, ctx.HolderFactory, Files_, Decompress_, 4, 1_MB, ExpectedRowCount_); + } + +private: + void RegisterDependencies() const final {} + + TMkqlIOSpecs Spec_; + TVector<TString> Files_; + const bool Decompress_; + const std::optional<ui64> ExpectedRowCount_; +}; + +IComputationNode* WrapYtBlockTableContent(NCommon::TCodecContext& codecCtx, + TComputationMutables& mutables, TCallable& callable, TStringBuf pathPrefix) +{ + MKQL_ENSURE(callable.GetInputsCount() == 6, "Expected 6 arguments"); + TString uniqueId(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef()); + auto origStructType = AS_TYPE(TStructType, AS_VALUE(TTypeType, callable.GetInput(1))); + const ui32 tablesCount = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui32>(); + TString inputSpec(AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().AsStringRef()); + const bool decompress = AS_VALUE(TDataLiteral, callable.GetInput(4))->AsValue().Get<bool>(); + + std::optional<ui64> length; + TTupleLiteral* lengthTuple = AS_VALUE(TTupleLiteral, callable.GetInput(5)); + if (lengthTuple->GetValuesCount() > 0) { + MKQL_ENSURE(lengthTuple->GetValuesCount() == 1, "Expect 1 element in the length tuple"); + length = AS_VALUE(TDataLiteral, lengthTuple->GetValue(0))->AsValue().Get<ui64>(); + } + + TVector<TString> files; + for (ui32 index = 0; index < tablesCount; ++index) { + files.push_back(TStringBuilder() << pathPrefix << uniqueId << '_' << index); + } + + return new TYtBlockTableContentWrapper(mutables, codecCtx, std::move(files), inputSpec, + origStructType, decompress, length); +} + +} // NYql diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h new file mode 100644 index 0000000000..444603406e --- /dev/null +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h @@ -0,0 +1,18 @@ +#pragma once + +#include <yql/essentials/providers/common/codec/yql_codec.h> + +#include <yql/essentials/minikql/mkql_node.h> +#include <yql/essentials/minikql/computation/mkql_computation_node.h> + +#include <util/generic/string.h> +#include <util/generic/strbuf.h> + +namespace NYql { + +NKikimr::NMiniKQL::IComputationNode* WrapYtBlockTableContent( + NYql::NCommon::TCodecContext& codecCtx, + NKikimr::NMiniKQL::TComputationMutables& mutables, + NKikimr::NMiniKQL::TCallable& callable, TStringBuf pathPrefix); + +} // NYql diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.cpp new file mode 100644 index 0000000000..3253c3e1b1 --- /dev/null +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.cpp @@ -0,0 +1,50 @@ +#include "yql_mkql_file_block_stream.h" +#include "yql_mkql_file_input_state.h" + +namespace NYql { + +using namespace NKikimr::NMiniKQL; + +TFileWideBlockStreamValue::TFileWideBlockStreamValue( + TMemoryUsageInfo* memInfo, + const TMkqlIOSpecs& spec, + const THolderFactory& holderFactory, + const TVector<TString>& filePaths, + bool decompress, + size_t blockCount, + size_t blockSize, + std::optional<ui64> expectedRowCount +) + : TComputationValue(memInfo) + , Spec_(spec) + , HolderFactory_(holderFactory) + , FilePaths_(filePaths) + , Decompress_(decompress) + , BlockCount_(blockCount) + , BlockSize_(blockSize) + , ExpectedRowCount_(expectedRowCount) +{ + State_ = MakeHolder<TFileInputState>(Spec_, HolderFactory_, MakeMkqlFileInputs(FilePaths_, Decompress_), BlockCount_, BlockSize_); +} + +NUdf::EFetchStatus TFileWideBlockStreamValue::WideFetch(NUdf::TUnboxedValue* output, ui32 width) { + if (!AtStart_) { + State_->Next(); + } + AtStart_ = false; + if (!State_->IsValid()) { + MKQL_ENSURE(!ExpectedRowCount_ || *ExpectedRowCount_ == State_->GetRecordIndex(), "Invalid file row count"); + return NUdf::EFetchStatus::Finish; + } + + auto elements = State_->GetCurrent().GetElements(); + for (ui32 i = 0; i < width; i++) { + if (auto out = output++) { + *out = elements[i]; + } + } + + return NUdf::EFetchStatus::Ok; +} + +} diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.h b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.h new file mode 100644 index 0000000000..f38771c24f --- /dev/null +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.h @@ -0,0 +1,43 @@ +#pragma once + +#include "yql_mkql_file_input_state.h" + +#include <yt/yql/providers/yt/codec/yt_codec.h> +#include <yql/essentials/minikql/computation/mkql_computation_node.h> + +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/generic/string.h> + +namespace NYql { + +class TFileWideBlockStreamValue : public NKikimr::NMiniKQL::TComputationValue<TFileWideBlockStreamValue> { +public: + TFileWideBlockStreamValue( + NKikimr::NMiniKQL::TMemoryUsageInfo* memInfo, + const TMkqlIOSpecs& spec, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const TVector<TString>& filePaths, + bool decompress, + size_t blockCount, + size_t blockSize, + std::optional<ui64> expectedRowCount + ); + +private: + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width); + +private: + const TMkqlIOSpecs& Spec_; + const NKikimr::NMiniKQL::THolderFactory& HolderFactory_; + const TVector<TString> FilePaths_; + const bool Decompress_; + const size_t BlockCount_; + const size_t BlockSize_; + const std::optional<ui64> ExpectedRowCount_; + + bool AtStart_ = true; + THolder<TFileInputState> State_; +}; + +} diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp index 46b2067d5e..d814246f76 100644 --- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp @@ -1,5 +1,6 @@ #include "yql_mkql_file_input_state.h" +#include <yql/essentials/minikql/computation/mkql_block_impl.h> #include <yql/essentials/utils/yql_panic.h> #include <util/system/fs.h> @@ -55,7 +56,13 @@ bool TFileInputState::NextValue() { } MkqlReader_.Next(); - ++CurrentRecord_; + if (Spec_->UseBlockInput_) { + auto blockCountValue = CurrentValue_.GetElement(Spec_->Inputs[CurrentInput_]->StructSize); + CurrentRecord_ += GetBlockCount(blockCountValue); + } else { + ++CurrentRecord_; + } + return true; } } diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h index 64db03232d..1762e56d2f 100644 --- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h @@ -35,7 +35,6 @@ public: OnNextBlockCallback_ = std::move(cb); } -protected: virtual bool IsValid() const override { return Valid_; } @@ -50,6 +49,7 @@ protected: virtual TString DebugInfo() const override; +protected: void Finish() { MkqlReader_.Finish(); } diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp index 4dc5e57243..8914cac7bd 100644 --- a/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp @@ -33,6 +33,7 @@ public: if (useSkiff) { Spec_.SetUseSkiff(optLLVM); } + Spec_.SetIsTableContent(); Spec_.Init(codecCtx, inputSpec, {}, {}, AS_TYPE(TListType, listType)->GetItemType(), {}, TString()); } diff --git a/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json b/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json index d7ff47f544..d881c9f565 100644 --- a/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json +++ b/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json @@ -204,6 +204,15 @@ ] }, { + "Name": "TYtBlockTableContent", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "YtBlockTableContent"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"}, + {"Index": 1, "Name": "Settings", "Type": "TCoNameValueTupleList"} + ] + }, + { "Name": "TYtLength", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "YtLength"}, diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp index 78af8c3d70..36f936b17a 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp @@ -129,6 +129,9 @@ public: , Length_(std::move(length)) { Spec_.Init(codecCtx, inputSpecs, groups, tableNames, itemType, auxColumns, NYT::TNode()); + if constexpr (TableContent) { + Spec_.SetIsTableContent(); + } if (!rowOffsets.empty()) { Spec_.SetTableOffsets(rowOffsets); } diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp index 2d805612f9..89525fcbc5 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp @@ -596,6 +596,56 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { return values; }); + compiler.OverrideCallable(TYtBlockTableContent::CallableName(), + [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { + TYtBlockTableContent tableContent(&node); + + auto origItemStructType = ( + tableContent.Input().Maybe<TYtOutput>() + ? tableContent.Input().Ref().GetTypeAnn() + : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back() + )->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + + TMaybe<ui64> itemsCount; + if (auto setting = NYql::GetSetting(tableContent.Settings().Ref(), EYtSettingType::ItemsCount)) { + itemsCount = FromString<ui64>(setting->Child(1)->Content()); + } + const auto itemType = NCommon::BuildType(node, *origItemStructType, ctx.ProgramBuilder); + TRuntimeNode values; + if (auto maybeRead = tableContent.Input().Maybe<TYtReadTable>()) { + auto read = maybeRead.Cast(); + + const bool hasRangesOrSampling = AnyOf(read.Input(), [](const TYtSection& s) { + return NYql::HasSetting(s.Settings().Ref(), EYtSettingType::Sample) + || AnyOf(s.Paths(), [](const TYtPath& p) { return !p.Ranges().Maybe<TCoVoid>(); }); + }); + if (hasRangesOrSampling) { + itemsCount.Clear(); + } + + const bool forceKeyColumns = HasRangesWithKeyColumns(read.Input().Ref()); + values = BuildTableContentCall( + TYtTableContent::CallableName(), + itemType, + read.DataSource().Cluster().Value(), read.Input().Ref(), itemsCount, ctx, true, THashSet<TString>{"num", "index"}, forceKeyColumns); + values = ApplyPathRangesAndSampling(values, itemType, read.Input().Ref(), ctx); + } else { + auto output = tableContent.Input().Cast<TYtOutput>(); + values = BuildTableContentCall( + TYtTableContent::CallableName(), + itemType, + GetOutputOp(output).DataSink().Cluster().Value(), output.Ref(), itemsCount, ctx, true); + } + + return ctx.ProgramBuilder.WideToBlocks(ctx.ProgramBuilder.FromFlow(ctx.ProgramBuilder.ExpandMap(ctx.ProgramBuilder.ToFlow(values), [&](TRuntimeNode item) -> TRuntimeNode::TList { + TRuntimeNode::TList result; + for (auto& origItem : origItemStructType->GetItems()) { + result.push_back(ctx.ProgramBuilder.Member(item, origItem->GetName())); + } + return result; + }))); + }); + compiler.AddCallable({TYtSort::CallableName(), TYtCopy::CallableName(), TYtMerge::CallableName()}, [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp index 6feb3542dc..cc9981ccf1 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp @@ -6,6 +6,7 @@ #include <yql/essentials/core/yql_opt_utils.h> #include <yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> #include <yt/yql/providers/yt/comp_nodes/yql_mkql_output.h> +#include <yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h> #include <yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.h> #include <yql/essentials/providers/common/mkql/yql_provider_mkql.h> @@ -74,6 +75,11 @@ NKikimr::NMiniKQL::TComputationNodeFactory GetGatewayNodeFactory(TCodecContext* return WrapYtTableContent(*codecCtx, ctx.Mutables, callable, "OFF" /* no LLVM for local exec */, filePrefix); } + if (callable.GetType()->GetName() == "YtBlockTableContentJob") { + YQL_ENSURE(codecCtx); + return WrapYtBlockTableContent(*codecCtx, ctx.Mutables, callable, filePrefix); + } + if (!exprContextObject) { exprContextObject = ctx.Mutables.CurValueIndex++; } diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp index d66943f868..f08309f4b9 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp @@ -74,14 +74,19 @@ TGatewayTransformer::TGatewayTransformer(const TExecContextBase& execCtx, TYtSet TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { auto name = internName.Str(); const bool small = name.SkipPrefix("Small"); - if (name == TYtTableContent::CallableName()) { + if (name == TYtTableContent::CallableName() || name == TYtBlockTableContent::CallableName()) { + bool useBlocks = (name == TYtBlockTableContent::CallableName()); *TableContentFlag_ = true; *RemoteExecutionFlag_ = *RemoteExecutionFlag_ || !small; if (EPhase::Content == Phase_ || EPhase::All == Phase_) { - return [&](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) { - YQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); + return [&, name, useBlocks](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) { + if (useBlocks) { + YQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args"); + } else { + YQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); + } const TString cluster(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef()); const TString& server = ExecCtx_.Clusters_->GetServer(cluster); @@ -90,7 +95,7 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { auto tx = entry->Tx; auto deliveryMode = ForceLocalTableContent_ ? ETableContentDeliveryMode::File : Settings_->TableContentDeliveryMode.Get(cluster).GetOrElse(ETableContentDeliveryMode::Native); - bool useSkiff = Settings_->TableContentUseSkiff.Get(cluster).GetOrElse(DEFAULT_USE_SKIFF); + bool useSkiff = !useBlocks && Settings_->TableContentUseSkiff.Get(cluster).GetOrElse(DEFAULT_USE_SKIFF); const bool ensureOldTypesOnly = !useSkiff; const ui64 maxChunksForNativeDelivery = Settings_->TableContentMaxChunksForNativeDelivery.Get().GetOrElse(1000ul); TString contentTmpFolder = ForceLocalTableContent_ ? TString() : Settings_->TableContentTmpFolder.Get(cluster).GetOrElse(TString()); @@ -116,6 +121,7 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { THashMap<TString, ui32> structColumns; if (useSkiff) { + YQL_ENSURE(!useBlocks); auto itemType = AS_TYPE(TListType, callable.GetType()->GetReturnType())->GetItemType(); TStructType* itemTypeStruct = AS_TYPE(TStructType, itemType); if (itemTypeStruct->GetMembersCount() == 0) { @@ -151,7 +157,10 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { refName = res.first->second; } tablesNode.Add(refName); - if (useSkiff) { + if (useBlocks) { + NYT::TNode formatNode("arrow"); + formats.push_back(formatNode); + } else if (useSkiff) { formats.push_back(SingleTableSpecToInputSkiff(specNode, structColumns, false, false, false)); } else { if (ensureOldTypesOnly && specNode.HasKey(YqlRowSpecAttribute)) { @@ -304,15 +313,24 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { } TCallableBuilder call(env, - TStringBuilder() << TYtTableContent::CallableName() << TStringBuf("Job"), + TStringBuilder() << name << TStringBuf("Job"), callable.GetType()->GetReturnType()); + if (useBlocks) { + call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(uniqueId)); + call.Add(callable.GetInput(4)); // orig struct type + call.Add(PgmBuilder_.NewDataLiteral(tableList->GetItemsCount())); + call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(NYT::NodeToYsonString(specNode))); + call.Add(PgmBuilder_.NewDataLiteral(ETableContentDeliveryMode::File == deliveryMode)); // use compression + call.Add(callable.GetInput(3)); // length + } else { + call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(uniqueId)); + call.Add(PgmBuilder_.NewDataLiteral(tableList->GetItemsCount())); + call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(NYT::NodeToYsonString(specNode))); + call.Add(PgmBuilder_.NewDataLiteral(useSkiff)); + call.Add(PgmBuilder_.NewDataLiteral(ETableContentDeliveryMode::File == deliveryMode)); // use compression + call.Add(callable.GetInput(3)); // length + } - call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(uniqueId)); - call.Add(PgmBuilder_.NewDataLiteral(tableList->GetItemsCount())); - call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(NYT::NodeToYsonString(specNode))); - call.Add(PgmBuilder_.NewDataLiteral(useSkiff)); - call.Add(PgmBuilder_.NewDataLiteral(ETableContentDeliveryMode::File == deliveryMode)); // use compression - call.Add(callable.GetInput(3)); // length return TRuntimeNode(call.Build(), false); }; } diff --git a/yt/yql/providers/yt/job/yql_job_factory.cpp b/yt/yql/providers/yt/job/yql_job_factory.cpp index 6ff4a02606..6b7cd4fd69 100644 --- a/yt/yql/providers/yt/job/yql_job_factory.cpp +++ b/yt/yql/providers/yt/job/yql_job_factory.cpp @@ -3,6 +3,7 @@ #include <yt/yql/providers/yt/comp_nodes/yql_mkql_input.h> #include <yt/yql/providers/yt/comp_nodes/yql_mkql_output.h> #include <yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.h> +#include <yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h> #include <yql/essentials/providers/common/comp_nodes/yql_factory.h> #include <yql/essentials/minikql/comp_nodes/mkql_factories.h> #include <yql/essentials/parser/pg_wrapper/interface/comp_factory.h> @@ -24,6 +25,9 @@ TComputationNodeFactory GetJobFactory(NYql::NCommon::TCodecContext& codecCtx, co if (name == "TableContent") { return WrapYtTableContent(codecCtx, ctx.Mutables, callable, optLLVM, {} /*empty pathPrefix inside job*/); } + if (name == "BlockTableContent") { + return WrapYtBlockTableContent(codecCtx, ctx.Mutables, callable, {} /*empty pathPrefix inside job*/); + } if (name == "Input") { YQL_ENSURE(reader); YQL_ENSURE(specs); diff --git a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp index f92f152c31..bda09645b4 100644 --- a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp @@ -23,6 +23,7 @@ public: { #define HNDL(name) "YtBlockInput-"#name, Hndl(&TYtBlockInputTransformer::name) AddHandler(0, &TYtMap::Match, HNDL(TryTransformMap)); + AddHandler(0, &TYtTableContent::Match, HNDL(TryTransformTableContent)); #undef HNDL } @@ -71,6 +72,50 @@ private: return EnsureWideFlowType(map.Mapper().Args().Arg(0).Ref(), ctx); } + TMaybeNode<TExprBase> TryTransformTableContent(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { + auto tableContent = node.Cast<TYtTableContent>(); + if (!NYql::HasSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady)) { + return tableContent; + } + + const TParentsMap* parentsMap = getParents(); + if (auto it = parentsMap->find(tableContent.Raw()); it != parentsMap->end() && it->second.size() > 1) { + return tableContent; + } + + YQL_CLOG(INFO, ProviderYt) << "Rewrite YtTableContent with block input"; + + auto inputStructType = GetSeqItemType(tableContent.Ref().GetTypeAnn())->Cast<TStructExprType>(); + auto asStructBuilder = Build<TCoAsStruct>(ctx, tableContent.Pos()); + TExprNode::TListType narrowMapArgs; + for (auto& item : inputStructType->GetItems()) { + auto arg = ctx.NewArgument(tableContent.Pos(), item->GetName()); + asStructBuilder.Add<TCoNameValueTuple>() + .Name().Build(item->GetName()) + .Value(arg) + .Build(); + narrowMapArgs.push_back(std::move(arg)); + } + + auto settings = RemoveSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady, ctx); + return Build<TCoForwardList>(ctx, tableContent.Pos()) + .Stream<TCoNarrowMap>() + .Input<TCoToFlow>() + .Input<TCoWideFromBlocks>() + .Input<TYtBlockTableContent>() + .Input(tableContent.Input()) + .Settings(settings) + .Build() + .Build() + .Build() + .Lambda() + .Args(narrowMapArgs) + .Body(asStructBuilder.Done()) + .Build() + .Build() + .Done(); + } + private: const TYtState::TPtr State_; }; diff --git a/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp b/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp index 2493b340f2..3a1e05d9aa 100644 --- a/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp @@ -23,6 +23,7 @@ public: #define HNDL(name) "YtBlockIOFilter-"#name, Hndl(&YtBlockIOFilterTransformer::name) AddHandler(0, &TYtMap::Match, HNDL(HandleMapInput)); AddHandler(0, &TYtMap::Match, HNDL(HandleMapOutput)); + AddHandler(0, &TYtTableContent::Match, HNDL(HandleTableContent)); #undef HNDL } @@ -112,11 +113,12 @@ private: } } + auto wideFlowLimit = State_->Configuration->WideFlowLimit.Get().GetOrElse(DEFAULT_WIDE_FLOW_LIMIT); auto supportedTypes = State_->Configuration->JobBlockInputSupportedTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_TYPES); auto supportedDataTypes = State_->Configuration->JobBlockInputSupportedDataTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_DATA_TYPES); auto lambdaInputType = map.Mapper().Args().Arg(0).Ref().GetTypeAnn(); - if (!CheckBlockIOSupportedTypes(*lambdaInputType, supportedTypes, supportedDataTypes, [](const TString&) {})) { + if (!CheckBlockIOSupportedTypes(*lambdaInputType, supportedTypes, supportedDataTypes, [](const TString&) {}, wideFlowLimit)) { return false; } @@ -128,11 +130,12 @@ private: return false; } + auto wideFlowLimit = State_->Configuration->WideFlowLimit.Get().GetOrElse(DEFAULT_WIDE_FLOW_LIMIT); auto supportedTypes = State_->Configuration->JobBlockOutputSupportedTypes.Get().GetOrElse(DEFAULT_BLOCK_OUTPUT_SUPPORTED_TYPES); auto supportedDataTypes = State_->Configuration->JobBlockOutputSupportedDataTypes.Get().GetOrElse(DEFAULT_BLOCK_OUTPUT_SUPPORTED_DATA_TYPES); auto lambdaOutputType = map.Mapper().Ref().GetTypeAnn(); - if (!CheckBlockIOSupportedTypes(*lambdaOutputType, supportedTypes, supportedDataTypes, [](const TString&) {}, false)) { + if (!CheckBlockIOSupportedTypes(*lambdaOutputType, supportedTypes, supportedDataTypes, [](const TString&) {}, wideFlowLimit, false)) { return false; } @@ -150,6 +153,66 @@ private: } } + TMaybeNode<TExprBase> HandleTableContent(TExprBase node, TExprContext& ctx) const { + auto tableContent = node.Cast<TYtTableContent>(); + if (NYql::HasSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady)) { + return tableContent; + } + + if (!State_->Configuration->JobBlockTableContent.Get().GetOrElse(Types->UseBlocks)) { + return tableContent; + } + + auto settings = tableContent.Settings().Ptr(); + bool canUseBlockInput = CanUseBlockInputForTableContent(tableContent); + bool hasSetting = HasSetting(*settings, EYtSettingType::BlockInputReady); + if (canUseBlockInput && !hasSetting) { + settings = AddSetting(*settings, EYtSettingType::BlockInputReady, TExprNode::TPtr(), ctx); + } else if (!canUseBlockInput && hasSetting) { + settings = RemoveSetting(*settings, EYtSettingType::BlockInputReady, ctx); + } else { + return tableContent; + } + return Build<TYtTableContent>(ctx, node.Pos()) + .InitFrom(tableContent) + .Settings(settings) + .Done(); + } + + bool CanUseBlockInputForTableContent(const TYtTableContent& tableContent) const { + if (auto readTable = tableContent.Input().Maybe<TYtReadTable>()) { + if (readTable.Cast().Input().Size() > 1) { + return false; + } + + for (auto path : readTable.Cast().Input().Item(0).Paths()) { + if (!IsYtTableSuitableForArrowInput(path.Table(), [](const TString&) {})) { + return false; + } + } + + } else if (auto output = tableContent.Input().Maybe<TYtOutput>()) { + auto outTable = GetOutTable(output.Cast()); + if (!IsYtTableSuitableForArrowInput(outTable, [](const TString&) {})) { + return false; + } + + } else { + YQL_ENSURE(false, "Expected " << TYtReadTable::CallableName() << " or " << TYtOutput::CallableName()); + } + + auto wideFlowLimit = State_->Configuration->WideFlowLimit.Get().GetOrElse(DEFAULT_WIDE_FLOW_LIMIT); + auto supportedTypes = State_->Configuration->JobBlockInputSupportedTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_TYPES); + auto supportedDataTypes = State_->Configuration->JobBlockInputSupportedDataTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_DATA_TYPES); + + auto inputType = tableContent.Ref().GetTypeAnn(); + if (!CheckBlockIOSupportedTypes(*inputType, supportedTypes, supportedDataTypes, [](const TString&) {}, wideFlowLimit)) { + return false; + } + + return true; + } + private: const TYtState::TPtr State_; const THolder<IGraphTransformer> Finalizer_; diff --git a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp index 0f1041f4bb..639fa14ca0 100644 --- a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp @@ -1,6 +1,7 @@ #include "yql_yt_block_io_utils.h" #include <yql/essentials/core/yql_opt_utils.h> +#include <yt/yql/providers/yt/provider/yql_yt_provider.h> namespace NYql { @@ -9,21 +10,20 @@ bool CheckBlockIOSupportedTypes( const TSet<TString>& supportedTypes, const TSet<NUdf::EDataSlot>& supportedDataTypes, std::function<void(const TString&)> unsupportedTypeHandler, + size_t wideFlowLimit, bool allowNestedOptionals ) { - auto itemType = type.Cast<TFlowExprType>()->GetItemType(); - if (itemType->GetKind() == ETypeAnnotationKind::Multi) { - auto& itemTypes = itemType->Cast<TMultiExprType>()->GetItems(); - if (itemTypes.empty()) { - return false; - } + auto& itemType = GetSeqItemType(type); + if (itemType.GetKind() == ETypeAnnotationKind::Multi) { + auto& itemTypes = itemType.Cast<TMultiExprType>()->GetItems(); if (!CheckSupportedTypes(itemTypes, supportedTypes, supportedDataTypes, std::move(unsupportedTypeHandler), allowNestedOptionals)) { return false; } - } else if (itemType->GetKind() == ETypeAnnotationKind::Struct) { - auto& items = itemType->Cast<TStructExprType>()->GetItems(); - if (items.empty()) { + } else if (itemType.GetKind() == ETypeAnnotationKind::Struct) { + auto& items = itemType.Cast<TStructExprType>()->GetItems(); + if (items.empty() || items.size() > wideFlowLimit) { + unsupportedTypeHandler("fields count doesn't satisfy wide flow requirements"); return false; } diff --git a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h index 58e94c9443..dc72518fe6 100644 --- a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h +++ b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h @@ -9,6 +9,7 @@ bool CheckBlockIOSupportedTypes( const TSet<TString>& supportedTypes, const TSet<NUdf::EDataSlot>& supportedDataTypes, std::function<void(const TString&)> unsupportedTypeHandler, + size_t wideFlowLimit, bool allowNestedOptionals = true ); diff --git a/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp b/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp index 8eada7e706..fcad1669af 100644 --- a/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp @@ -27,6 +27,7 @@ public: AddHandler({TYtSection::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleSection)); AddHandler({TYtReadTable::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleReadTable)); AddHandler({TYtTableContent::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleTableContent)); + AddHandler({TYtBlockTableContent::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleBlockTableContent)); AddHandler({TYtIsKeySwitch::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleDefault)); AddHandler({TYqlRowSpec::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleDefault)); @@ -188,6 +189,45 @@ public: return TStatus::Ok; } + TStatus HandleBlockTableContent(TExprBase input, TExprContext& ctx) { + TYtBlockTableContent tableContent = input.Cast<TYtBlockTableContent>(); + + auto listType = tableContent.Input().Maybe<TYtOutput>() + ? tableContent.Input().Ref().GetTypeAnn() + : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back(); + auto itemStructType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + + auto pathRename = [&](TPartOfConstraintBase::TPathType path) -> std::vector<TPartOfConstraintBase::TPathType> { + YQL_ENSURE(!path.empty()); + + auto fieldIndex = itemStructType->FindItem(path[0]); + YQL_ENSURE(fieldIndex.Defined()); + + path[0] = ctx.GetIndexAsString(*fieldIndex); + return { path }; + }; + + TConstraintSet wideConstraints; + for (auto constraint : tableContent.Input().Ref().GetAllConstraints()) { + if (auto empty = dynamic_cast<const TEmptyConstraintNode*>(constraint)) { + wideConstraints.AddConstraint(ctx.MakeConstraint<TEmptyConstraintNode>()); + } else if (auto sorted = dynamic_cast<const TSortedConstraintNode*>(constraint)) { + wideConstraints.AddConstraint(sorted->RenameFields(ctx, pathRename)); + } else if (auto chopped = dynamic_cast<const TChoppedConstraintNode*>(constraint)) { + wideConstraints.AddConstraint(chopped->RenameFields(ctx, pathRename)); + } else if (auto unique = dynamic_cast<const TUniqueConstraintNode*>(constraint)) { + wideConstraints.AddConstraint(unique->RenameFields(ctx, pathRename)); + } else if (auto distinct = dynamic_cast<const TDistinctConstraintNode*>(constraint)) { + wideConstraints.AddConstraint(distinct->RenameFields(ctx, pathRename)); + } else { + YQL_ENSURE(false, "unexpected constraint"); + } + } + + input.Ptr()->SetConstraints(wideConstraints); + return TStatus::Ok; + } + TStatus HandleDefault(TExprBase input, TExprContext& /*ctx*/) { return UpdateAllChildLambdasConstraints(input.Ref()); } diff --git a/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp b/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp index 4f3ffa5444..3447fbf35b 100644 --- a/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp @@ -46,6 +46,7 @@ public: AddHandler({TYtReadTable::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleReadTable)); AddHandler({TYtReadTableScheme::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleReadTableScheme)); AddHandler({TYtTableContent::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleTableContent)); + AddHandler({TYtBlockTableContent::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleBlockTableContent)); AddHandler({TYtLength::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleLength)); AddHandler({TCoConfigure::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleConfigure)); AddHandler({TYtConfigure::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleYtConfigure)); @@ -846,7 +847,8 @@ public: return TStatus::Error; } - const EYtSettingTypes allowed = EYtSettingType::MemUsage | EYtSettingType::ItemsCount | EYtSettingType::RowFactor | EYtSettingType::Split | EYtSettingType::Small; + const EYtSettingTypes allowed = EYtSettingType::MemUsage | EYtSettingType::ItemsCount | EYtSettingType::RowFactor + | EYtSettingType::Split | EYtSettingType::Small | EYtSettingType::BlockInputReady; if (!ValidateSettings(tableContent.Settings().Ref(), allowed, ctx)) { return TStatus::Error; } @@ -858,6 +860,49 @@ public: if (auto columnOrder = State_->Types->LookupColumnOrder(tableContent.Input().Ref())) { return State_->Types->SetColumnOrder(input.Ref(), *columnOrder, ctx); } + + return TStatus::Ok; + } + + TStatus HandleBlockTableContent(TExprBase input, TExprContext& ctx) { + if (!EnsureArgsCount(input.Ref(), 2, ctx)) { + return TStatus::Error; + } + + const auto tableContent = input.Cast<TYtBlockTableContent>(); + + if (!tableContent.Input().Ref().IsCallable(TYtReadTable::CallableName()) + && !tableContent.Input().Ref().IsCallable(TYtOutput::CallableName())) { + ctx.AddError(TIssue(ctx.GetPosition(tableContent.Input().Pos()), TStringBuilder() + << "Expected " << TYtReadTable::CallableName() << " or " << TYtOutput::CallableName())); + return TStatus::Error; + } + + if (!EnsureTuple(tableContent.Settings().MutableRef(), ctx)) { + return TStatus::Error; + } + + const EYtSettingTypes allowed = EYtSettingType::MemUsage | EYtSettingType::ItemsCount | EYtSettingType::RowFactor | EYtSettingType::Split | EYtSettingType::Small; + if (!ValidateSettings(tableContent.Settings().Ref(), allowed, ctx)) { + return TStatus::Error; + } + + auto listType = tableContent.Input().Maybe<TYtOutput>() + ? tableContent.Input().Ref().GetTypeAnn() + : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back(); + auto itemStructType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + + TTypeAnnotationNode::TListType multiTypeItems; + for (auto& item: itemStructType->GetItems()) { + multiTypeItems.emplace_back(ctx.MakeType<TBlockExprType>(item->GetItemType())); + } + multiTypeItems.push_back(ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64))); + input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TMultiExprType>(multiTypeItems))); + + if (auto columnOrder = State_->Types->LookupColumnOrder(tableContent.Input().Ref())) { + return State_->Types->SetColumnOrder(input.Ref(), *columnOrder, ctx); + } + return TStatus::Ok; } diff --git a/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp b/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp index 3ad3f683d9..1b51ea6baa 100644 --- a/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp @@ -49,6 +49,8 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName, TType* const tupleTypeTables = ctx.ProgramBuilder.NewTupleType({strType, boolType, strType, ui64Type, ui64Type, boolType, ui32Type}); TType* const listTypeGroup = ctx.ProgramBuilder.NewListType(tupleTypeTables); + bool useBlocks = callName.EndsWith(TYtBlockTableContent::CallableName()); + const TExprNode* settings = nullptr; TMaybe<TSampleParams> sampling; TVector<TRuntimeNode> groups; @@ -227,9 +229,23 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName, samplingTupleItems.push_back(ctx.ProgramBuilder.NewDataLiteral(isSystemSampling)); } - auto outListType = ctx.ProgramBuilder.NewListType(outItemType); + TType* outType = nullptr; + if (useBlocks) { + auto structType = AS_TYPE(TStructType, outItemType); + + std::vector<TType*> outputItems; + outputItems.reserve(structType->GetMembersCount()); + for (size_t i = 0; i < structType->GetMembersCount(); i++) { + outputItems.push_back(ctx.ProgramBuilder.NewBlockType(structType->GetMemberType(i), TBlockType::EShape::Many)); + } + outputItems.push_back(ctx.ProgramBuilder.NewBlockType(ctx.ProgramBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); + outType = ctx.ProgramBuilder.NewStreamType(ctx.ProgramBuilder.NewMultiType(outputItems)); + + } else { + outType = ctx.ProgramBuilder.NewListType(outItemType); + } - TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), callName, outListType); + TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), callName, outType); call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(clusterName)); // cluster name call.Add(ctx.ProgramBuilder.NewList(listTypeGroup, groups)); @@ -241,6 +257,10 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName, call.Add(ctx.ProgramBuilder.NewEmptyTuple()); } + if (useBlocks) { + call.Add(TRuntimeNode(outItemType, true)); + } + auto res = TRuntimeNode(call.Build(), false); if (settings) { @@ -479,6 +499,41 @@ void RegisterYtMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { } }); + compiler.AddCallable(TYtBlockTableContent::CallableName(), + [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { + TYtBlockTableContent tableContent(&node); + if (node.GetConstraint<TEmptyConstraintNode>()) { + const auto streamType = ctx.BuildType(node, *node.GetTypeAnn()); + return ctx.ProgramBuilder.EmptyIterator(streamType); + } + + auto origItemStructType = ( + tableContent.Input().Maybe<TYtOutput>() + ? tableContent.Input().Ref().GetTypeAnn() + : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back() + )->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + + TMaybe<ui64> itemsCount; + TString name = ToString(TYtBlockTableContent::CallableName()); + if (auto setting = NYql::GetSetting(tableContent.Settings().Ref(), EYtSettingType::ItemsCount)) { + itemsCount = FromString<ui64>(setting->Child(1)->Content()); + } + if (NYql::HasSetting(tableContent.Settings().Ref(), EYtSettingType::Small)) { + name.prepend("Small"); + } + if (auto maybeRead = tableContent.Input().Maybe<TYtReadTable>()) { + auto read = maybeRead.Cast(); + return BuildTableContentCall(name, + ctx.BuildType(node, *origItemStructType), + read.DataSource().Cluster().Value(), read.Input().Ref(), itemsCount, ctx, true); + } else { + auto output = tableContent.Input().Cast<TYtOutput>(); + return BuildTableContentCall(name, + ctx.BuildType(node, *origItemStructType), + GetOutputOp(output).DataSink().Cluster().Value(), output.Ref(), itemsCount, ctx, true); + } + }); + compiler.AddCallable({TYtTablePath::CallableName(), TYtTableRecord::CallableName(), TYtTableIndex::CallableName(), TYtIsKeySwitch::CallableName(), TYtRowNumber::CallableName()}, [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { auto dataSlot = node.GetTypeAnn()->Cast<TDataExprType>()->GetSlot(); |