about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author ziganshinmr <ziganshinmr@yandex-team.com> 2025-02-17 17:06:23 +0300
committer ziganshinmr <ziganshinmr@yandex-team.com> 2025-02-17 17:27:48 +0300
commit 62f206670154d466db83d498ab9075d3cad7bd43 (patch)
tree aca353ca30ddca98f3c5773fb94ec0fadbe95598
parent abe9abd80331e160103849c9bed168bcf9f67361 (diff)
download ydb-62f206670154d466db83d498ab9075d3cad7bd43.tar.gz
YT block input for table content
commit_hash:6ece06798fc8cef2e4d8e62cf5b9634b3162aa45
-rw-r--r-- yt/yql/providers/yt/codec/yt_codec.h 5
-rw-r--r-- yt/yql/providers/yt/codec/yt_codec_io.cpp 2
-rw-r--r-- yt/yql/providers/yt/common/yql_yt_settings.cpp 1
-rw-r--r-- yt/yql/providers/yt/common/yql_yt_settings.h 1
-rw-r--r-- yt/yql/providers/yt/comp_nodes/ya.make.inc 2
-rw-r--r-- yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp 73
-rw-r--r-- yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h 18
-rw-r--r-- yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.cpp 50
-rw-r--r-- yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.h 43
-rw-r--r-- yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp 9
-rw-r--r-- yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h 2
-rw-r--r-- yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp 1
-rw-r--r-- yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json 9
-rw-r--r-- yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp 3
-rw-r--r-- yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp 50
-rw-r--r-- yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp 6
-rw-r--r-- yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp 42
-rw-r--r-- yt/yql/providers/yt/job/yql_job_factory.cpp 4
-rw-r--r-- yt/yql/providers/yt/provider/yql_yt_block_input.cpp 45
-rw-r--r-- yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp 67
-rw-r--r-- yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp 18
-rw-r--r-- yt/yql/providers/yt/provider/yql_yt_block_io_utils.h 1
-rw-r--r-- yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp 40
-rw-r--r-- yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp 47
-rw-r--r-- yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp 59
25 files changed, 569 insertions, 29 deletions
diff --git a/yt/yql/providers/yt/codec/yt_codec.h b/yt/yql/providers/yt/codec/yt_codec.h
index f7ea0e0b78..4e6ef543a5 100644
--- a/yt/yql/providers/yt/codec/yt_codec.h
+++ b/yt/yql/providers/yt/codec/yt_codec.h
@@ -133,6 +133,10 @@ public:
UseBlockOutput_ = true;
}
+ // Marks these specs as describing a table-content read.
+ // The batch decoding code in yt_codec_io.cpp consults IsTableContent_ to accept
+ // batches that have no "$row_index" column.
+ void SetIsTableContent() {
+ IsTableContent_ = true;
+ }
+
void SetTableOffsets(const TVector<ui64>& offsets);
void Clear();
@@ -148,6 +152,7 @@ public:
bool UseSkiff_ = false;
bool UseBlockInput_ = false;
bool UseBlockOutput_ = false;
+ bool IsTableContent_ = false;
TString OptLLVM_;
TSystemFields SystemFields_;
diff --git a/yt/yql/providers/yt/codec/yt_codec_io.cpp b/yt/yql/providers/yt/codec/yt_codec_io.cpp
index 82b1a3e2f8..459602d9cf 100644
--- a/yt/yql/providers/yt/codec/yt_codec_io.cpp
+++ b/yt/yql/providers/yt/codec/yt_codec_io.cpp
@@ -1536,7 +1536,7 @@ public:
YQL_ENSURE(inputFields.size() == ColumnConverters_.size());
auto rowIndices = batch->GetColumnByName("$row_index");
- YQL_ENSURE(rowIndices || decoder.Dynamic);
+ YQL_ENSURE(rowIndices || decoder.Dynamic || Specs_.IsTableContent_);
arrow::compute::ExecContext execContext(Pool_);
std::vector<arrow::Datum> convertedBatch;
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp
index aee7b92e65..03443e6409 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.cpp
+++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp
@@ -524,6 +524,7 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx)
REGISTER_SETTING(*this, MaxColumnGroups);
REGISTER_SETTING(*this, ExtendedStatsMaxChunkCount);
REGISTER_SETTING(*this, JobBlockInput);
+ REGISTER_SETTING(*this, JobBlockTableContent);
REGISTER_SETTING(*this, JobBlockOutput).Parser([](const TString& v) { return FromString<EBlockOutputMode>(v); });
REGISTER_SETTING(*this, _EnableYtDqProcessWriteConstraints);
REGISTER_SETTING(*this, CompactForDistinct);
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h
index 85cd91402a..88b12afff2 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.h
+++ b/yt/yql/providers/yt/common/yql_yt_settings.h
@@ -294,6 +294,7 @@ struct TYtSettings {
NCommon::TConfSetting<ui16, false> MaxColumnGroups;
NCommon::TConfSetting<ui64, false> ExtendedStatsMaxChunkCount;
NCommon::TConfSetting<bool, false> JobBlockInput;
+ NCommon::TConfSetting<bool, false> JobBlockTableContent;
NCommon::TConfSetting<TSet<TString>, false> JobBlockInputSupportedTypes;
NCommon::TConfSetting<TSet<NUdf::EDataSlot>, false> JobBlockInputSupportedDataTypes;
NCommon::TConfSetting<EBlockOutputMode, false> JobBlockOutput;
diff --git a/yt/yql/providers/yt/comp_nodes/ya.make.inc b/yt/yql/providers/yt/comp_nodes/ya.make.inc
index b0a54d85a4..2e5ae696d4 100644
--- a/yt/yql/providers/yt/comp_nodes/ya.make.inc
+++ b/yt/yql/providers/yt/comp_nodes/ya.make.inc
@@ -4,11 +4,13 @@ INCLUDE(${ARCADIA_ROOT}/yql/essentials/minikql/invoke_builtins/header.ya.make.in
SET(ORIG_SRC_DIR ${ARCADIA_ROOT}/yt/yql/providers/yt/comp_nodes)
SET(ORIG_SOURCES
+ yql_mkql_file_block_stream.cpp
yql_mkql_file_input_state.cpp
yql_mkql_file_list.cpp
yql_mkql_input_stream.cpp
yql_mkql_input.cpp
yql_mkql_output.cpp
+ yql_mkql_block_table_content.cpp
yql_mkql_table_content.cpp
yql_mkql_table.cpp
yql_mkql_ungrouping_list.cpp
diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp
new file mode 100644
index 0000000000..d935da2004
--- /dev/null
+++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp
@@ -0,0 +1,73 @@
+#include "yql_mkql_block_table_content.h"
+#include "yql_mkql_file_block_stream.h"
+
+#include <yql/essentials/minikql/computation/mkql_computation_node_impl.h>
+#include <yql/essentials/minikql/mkql_node_cast.h>
+#include <yql/essentials/minikql/defs.h>
+
+#include <yql/essentials/public/udf/udf_value.h>
+
+#include <util/generic/vector.h>
+#include <util/generic/string.h>
+#include <util/generic/size_literals.h>
+
+namespace NYql {
+
+using namespace NKikimr;
+using namespace NKikimr::NMiniKQL;
+
+// Computation node for block table content. It owns the decoding specs and the list of
+// local files, and creates a TFileWideBlockStreamValue on every calculation.
+class TYtBlockTableContentWrapper : public TMutableComputationNode<TYtBlockTableContentWrapper> {
+    using TBaseComputation = TMutableComputationNode<TYtBlockTableContentWrapper>;
+
+public:
+    // files            - paths of the files to read; moved into the node.
+    // inputSpec        - spec string handed to TMkqlIOSpecs::Init.
+    // origStructType   - row struct type handed to TMkqlIOSpecs::Init.
+    // decompress       - forwarded to the stream value unchanged.
+    // expectedRowCount - optional row count the stream validates once the input is exhausted.
+    TYtBlockTableContentWrapper(
+        TComputationMutables& mutables,
+        NCommon::TCodecContext& codecCtx,
+        TVector<TString>&& files,
+        const TString& inputSpec,
+        TStructType* origStructType,
+        bool decompress,
+        std::optional<ui64> expectedRowCount)
+        : TBaseComputation(mutables)
+        , Files_(std::move(files))
+        , Decompress_(decompress)
+        , ExpectedRowCount_(expectedRowCount)
+    {
+        // Both flags are raised before Init is called.
+        Spec_.SetUseBlockInput();
+        Spec_.SetIsTableContent();
+        Spec_.Init(codecCtx, inputSpec, {}, {}, origStructType, {}, TString());
+    }
+
+    // Creates a fresh stream value over Files_; the node itself keeps no reading state.
+    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+        // Passed as the stream's blockCount / blockSize constructor arguments.
+        constexpr size_t readBlockCount = 4;
+        constexpr size_t readBlockSize = 1_MB;
+
+        return ctx.HolderFactory.Create<TFileWideBlockStreamValue>(
+            Spec_, ctx.HolderFactory, Files_, Decompress_, readBlockCount, readBlockSize, ExpectedRowCount_);
+    }
+
+private:
+    // The node registers no dependencies.
+    void RegisterDependencies() const final {}
+
+    TMkqlIOSpecs Spec_;
+    TVector<TString> Files_;
+    const bool Decompress_;
+    const std::optional<ui64> ExpectedRowCount_;
+};
+
+// Builds a TYtBlockTableContentWrapper from a callable with six inputs:
+//   0 - unique id (string), used as the common file-name stem;
+//   1 - original row struct type;
+//   2 - number of tables (ui32), i.e. number of files to read;
+//   3 - input spec (string);
+//   4 - decompress flag (bool);
+//   5 - length tuple: empty, or exactly one ui64 with the expected row count.
+// File i is named pathPrefix + uniqueId + '_' + i.
+IComputationNode* WrapYtBlockTableContent(NCommon::TCodecContext& codecCtx,
+    TComputationMutables& mutables, TCallable& callable, TStringBuf pathPrefix)
+{
+    MKQL_ENSURE(callable.GetInputsCount() == 6, "Expected 6 arguments");
+
+    const TString fileStem(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef());
+    auto* const rowStructType = AS_TYPE(TStructType, AS_VALUE(TTypeType, callable.GetInput(1)));
+    const ui32 fileCount = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui32>();
+    const TString specText(AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().AsStringRef());
+    const bool compressed = AS_VALUE(TDataLiteral, callable.GetInput(4))->AsValue().Get<bool>();
+
+    // An empty tuple means "row count unknown"; anything longer than one element is rejected.
+    std::optional<ui64> expectedRows;
+    TTupleLiteral* const rowCountTuple = AS_VALUE(TTupleLiteral, callable.GetInput(5));
+    if (const auto tupleSize = rowCountTuple->GetValuesCount(); tupleSize != 0) {
+        MKQL_ENSURE(tupleSize == 1, "Expect 1 element in the length tuple");
+        expectedRows = AS_VALUE(TDataLiteral, rowCountTuple->GetValue(0))->AsValue().Get<ui64>();
+    }
+
+    TVector<TString> blockFiles;
+    for (ui32 fileNo = 0; fileNo != fileCount; ++fileNo) {
+        blockFiles.push_back(TStringBuilder() << pathPrefix << fileStem << '_' << fileNo);
+    }
+
+    return new TYtBlockTableContentWrapper(
+        mutables, codecCtx, std::move(blockFiles), specText, rowStructType, compressed, expectedRows);
+}
+
+} // NYql
diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h
new file mode 100644
index 0000000000..444603406e
--- /dev/null
+++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <yql/essentials/providers/common/codec/yql_codec.h>
+
+#include <yql/essentials/minikql/mkql_node.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node.h>
+
+#include <util/generic/string.h>
+#include <util/generic/strbuf.h>
+
+namespace NYql {
+
+// Creates the computation node that reads table content as blocks from local files.
+// The implementation in yql_mkql_block_table_content.cpp requires `callable` to have
+// exactly six inputs: unique id, original struct type, tables count, input spec,
+// decompress flag and a length tuple (empty or holding one ui64).
+// `pathPrefix` is prepended to every generated file name; the job factory passes an empty prefix.
+NKikimr::NMiniKQL::IComputationNode* WrapYtBlockTableContent(
+ NYql::NCommon::TCodecContext& codecCtx,
+ NKikimr::NMiniKQL::TComputationMutables& mutables,
+ NKikimr::NMiniKQL::TCallable& callable, TStringBuf pathPrefix);
+
+} // NYql
diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.cpp
new file mode 100644
index 0000000000..3253c3e1b1
--- /dev/null
+++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.cpp
@@ -0,0 +1,50 @@
+#include "yql_mkql_file_block_stream.h"
+#include "yql_mkql_file_input_state.h"
+
+namespace NYql {
+
+using namespace NKikimr::NMiniKQL;
+
+// Stores the reading parameters and immediately opens the input state over the given files.
+// `spec` and `holderFactory` are kept by reference; `filePaths` is copied.
+TFileWideBlockStreamValue::TFileWideBlockStreamValue(
+    TMemoryUsageInfo* memInfo,
+    const TMkqlIOSpecs& spec,
+    const THolderFactory& holderFactory,
+    const TVector<TString>& filePaths,
+    bool decompress,
+    size_t blockCount,
+    size_t blockSize,
+    std::optional<ui64> expectedRowCount
+)
+    : TComputationValue(memInfo)
+    , Spec_(spec)
+    , HolderFactory_(holderFactory)
+    , FilePaths_(filePaths)
+    , Decompress_(decompress)
+    , BlockCount_(blockCount)
+    , BlockSize_(blockSize)
+    , ExpectedRowCount_(expectedRowCount)
+    // State_ is declared after every member it is built from, so they are all initialized here.
+    , State_(MakeHolder<TFileInputState>(
+          Spec_, HolderFactory_, MakeMkqlFileInputs(FilePaths_, Decompress_), BlockCount_, BlockSize_))
+{
+}
+
+// Copies the next `width` values from the input state into `output`.
+// Returns Finish once the state is no longer valid; if an expected row count was supplied,
+// it must equal the state's record index at that point.
+NUdf::EFetchStatus TFileWideBlockStreamValue::WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
+    // The very first call does not advance the state.
+    // NOTE(review): presumably TFileInputState is already positioned on its first item
+    // after construction - confirm.
+    if (AtStart_) {
+        AtStart_ = false;
+    } else {
+        State_->Next();
+    }
+
+    if (State_->IsValid()) {
+        auto elements = State_->GetCurrent().GetElements();
+        for (ui32 column = 0; column < width; ++column, ++output) {
+            if (output) {
+                *output = elements[column];
+            }
+        }
+        return NUdf::EFetchStatus::Ok;
+    }
+
+    MKQL_ENSURE(!ExpectedRowCount_ || *ExpectedRowCount_ == State_->GetRecordIndex(), "Invalid file row count");
+    return NUdf::EFetchStatus::Finish;
+}
+
+}
diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.h b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.h
new file mode 100644
index 0000000000..f38771c24f
--- /dev/null
+++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include "yql_mkql_file_input_state.h"
+
+#include <yt/yql/providers/yt/codec/yt_codec.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node.h>
+
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+#include <util/generic/string.h>
+
+namespace NYql {
+
+// Stream value that reads wide blocks from a set of local files through TFileInputState.
+//
+// Spec_ and HolderFactory_ are stored as references, so the objects passed to the
+// constructor must outlive this value.
+class TFileWideBlockStreamValue : public NKikimr::NMiniKQL::TComputationValue<TFileWideBlockStreamValue> {
+public:
+    // filePaths        - files to read; copied.
+    // decompress       - forwarded to MakeMkqlFileInputs.
+    // blockCount/Size  - forwarded to TFileInputState.
+    // expectedRowCount - when set, checked against the state's record index at end of input.
+    TFileWideBlockStreamValue(
+        NKikimr::NMiniKQL::TMemoryUsageInfo* memInfo,
+        const TMkqlIOSpecs& spec,
+        const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+        const TVector<TString>& filePaths,
+        bool decompress,
+        size_t blockCount,
+        size_t blockSize,
+        std::optional<ui64> expectedRowCount
+    );
+
+private:
+    // Replaces the base-class virtual. `override` lets the compiler reject a signature
+    // drift that would otherwise silently stop this method from being called.
+    NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) override;
+
+private:
+    const TMkqlIOSpecs& Spec_;
+    const NKikimr::NMiniKQL::THolderFactory& HolderFactory_;
+    const TVector<TString> FilePaths_;
+    const bool Decompress_;
+    const size_t BlockCount_;
+    const size_t BlockSize_;
+    const std::optional<ui64> ExpectedRowCount_;
+
+    // True until the first WideFetch call, which must not advance the state.
+    bool AtStart_ = true;
+    // Declared last: it is constructed from the members above.
+    THolder<TFileInputState> State_;
+};
+
+}
diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp
index 46b2067d5e..d814246f76 100644
--- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp
+++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp
@@ -1,5 +1,6 @@
#include "yql_mkql_file_input_state.h"
+#include <yql/essentials/minikql/computation/mkql_block_impl.h>
#include <yql/essentials/utils/yql_panic.h>
#include <util/system/fs.h>
@@ -55,7 +56,13 @@ bool TFileInputState::NextValue() {
}
MkqlReader_.Next();
- ++CurrentRecord_;
+ if (Spec_->UseBlockInput_) {
+ auto blockCountValue = CurrentValue_.GetElement(Spec_->Inputs[CurrentInput_]->StructSize);
+ CurrentRecord_ += GetBlockCount(blockCountValue);
+ } else {
+ ++CurrentRecord_;
+ }
+
return true;
}
}
diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h
index 64db03232d..1762e56d2f 100644
--- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h
+++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h
@@ -35,7 +35,6 @@ public:
OnNextBlockCallback_ = std::move(cb);
}
-protected:
virtual bool IsValid() const override {
return Valid_;
}
@@ -50,6 +49,7 @@ protected:
virtual TString DebugInfo() const override;
+protected:
void Finish() {
MkqlReader_.Finish();
}
diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp
index 4dc5e57243..8914cac7bd 100644
--- a/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp
+++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp
@@ -33,6 +33,7 @@ public:
if (useSkiff) {
Spec_.SetUseSkiff(optLLVM);
}
+ Spec_.SetIsTableContent();
Spec_.Init(codecCtx, inputSpec, {}, {}, AS_TYPE(TListType, listType)->GetItemType(), {}, TString());
}
diff --git a/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json b/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json
index d7ff47f544..d881c9f565 100644
--- a/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json
+++ b/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json
@@ -204,6 +204,15 @@
]
},
{
+ "Name": "TYtBlockTableContent",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "YtBlockTableContent"},
+ "Children": [
+ {"Index": 0, "Name": "Input", "Type": "TExprBase"},
+ {"Index": 1, "Name": "Settings", "Type": "TCoNameValueTupleList"}
+ ]
+ },
+ {
"Name": "TYtLength",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "YtLength"},
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp
index 78af8c3d70..36f936b17a 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp
@@ -129,6 +129,9 @@ public:
, Length_(std::move(length))
{
Spec_.Init(codecCtx, inputSpecs, groups, tableNames, itemType, auxColumns, NYT::TNode());
+ if constexpr (TableContent) {
+ Spec_.SetIsTableContent();
+ }
if (!rowOffsets.empty()) {
Spec_.SetTableOffsets(rowOffsets);
}
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
index 2d805612f9..89525fcbc5 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
@@ -596,6 +596,56 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) {
return values;
});
+ compiler.OverrideCallable(TYtBlockTableContent::CallableName(),
+ [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
+ TYtBlockTableContent tableContent(&node);
+
+ auto origItemStructType = (
+ tableContent.Input().Maybe<TYtOutput>()
+ ? tableContent.Input().Ref().GetTypeAnn()
+ : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back()
+ )->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
+
+ TMaybe<ui64> itemsCount;
+ if (auto setting = NYql::GetSetting(tableContent.Settings().Ref(), EYtSettingType::ItemsCount)) {
+ itemsCount = FromString<ui64>(setting->Child(1)->Content());
+ }
+ const auto itemType = NCommon::BuildType(node, *origItemStructType, ctx.ProgramBuilder);
+ TRuntimeNode values;
+ if (auto maybeRead = tableContent.Input().Maybe<TYtReadTable>()) {
+ auto read = maybeRead.Cast();
+
+ const bool hasRangesOrSampling = AnyOf(read.Input(), [](const TYtSection& s) {
+ return NYql::HasSetting(s.Settings().Ref(), EYtSettingType::Sample)
+ || AnyOf(s.Paths(), [](const TYtPath& p) { return !p.Ranges().Maybe<TCoVoid>(); });
+ });
+ if (hasRangesOrSampling) {
+ itemsCount.Clear();
+ }
+
+ const bool forceKeyColumns = HasRangesWithKeyColumns(read.Input().Ref());
+ values = BuildTableContentCall(
+ TYtTableContent::CallableName(),
+ itemType,
+ read.DataSource().Cluster().Value(), read.Input().Ref(), itemsCount, ctx, true, THashSet<TString>{"num", "index"}, forceKeyColumns);
+ values = ApplyPathRangesAndSampling(values, itemType, read.Input().Ref(), ctx);
+ } else {
+ auto output = tableContent.Input().Cast<TYtOutput>();
+ values = BuildTableContentCall(
+ TYtTableContent::CallableName(),
+ itemType,
+ GetOutputOp(output).DataSink().Cluster().Value(), output.Ref(), itemsCount, ctx, true);
+ }
+
+ return ctx.ProgramBuilder.WideToBlocks(ctx.ProgramBuilder.FromFlow(ctx.ProgramBuilder.ExpandMap(ctx.ProgramBuilder.ToFlow(values), [&](TRuntimeNode item) -> TRuntimeNode::TList {
+ TRuntimeNode::TList result;
+ for (auto& origItem : origItemStructType->GetItems()) {
+ result.push_back(ctx.ProgramBuilder.Member(item, origItem->GetName()));
+ }
+ return result;
+ })));
+ });
+
compiler.AddCallable({TYtSort::CallableName(), TYtCopy::CallableName(), TYtMerge::CallableName()},
[](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp
index 6feb3542dc..cc9981ccf1 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp
@@ -6,6 +6,7 @@
#include <yql/essentials/core/yql_opt_utils.h>
#include <yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
#include <yt/yql/providers/yt/comp_nodes/yql_mkql_output.h>
+#include <yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h>
#include <yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.h>
#include <yql/essentials/providers/common/mkql/yql_provider_mkql.h>
@@ -74,6 +75,11 @@ NKikimr::NMiniKQL::TComputationNodeFactory GetGatewayNodeFactory(TCodecContext*
return WrapYtTableContent(*codecCtx, ctx.Mutables, callable, "OFF" /* no LLVM for local exec */, filePrefix);
}
+ if (callable.GetType()->GetName() == "YtBlockTableContentJob") {
+ YQL_ENSURE(codecCtx);
+ return WrapYtBlockTableContent(*codecCtx, ctx.Mutables, callable, filePrefix);
+ }
+
if (!exprContextObject) {
exprContextObject = ctx.Mutables.CurValueIndex++;
}
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp
index d66943f868..f08309f4b9 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp
@@ -74,14 +74,19 @@ TGatewayTransformer::TGatewayTransformer(const TExecContextBase& execCtx, TYtSet
TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) {
auto name = internName.Str();
const bool small = name.SkipPrefix("Small");
- if (name == TYtTableContent::CallableName()) {
+ if (name == TYtTableContent::CallableName() || name == TYtBlockTableContent::CallableName()) {
+ bool useBlocks = (name == TYtBlockTableContent::CallableName());
*TableContentFlag_ = true;
*RemoteExecutionFlag_ = *RemoteExecutionFlag_ || !small;
if (EPhase::Content == Phase_ || EPhase::All == Phase_) {
- return [&](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {
- YQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
+ return [&, name, useBlocks](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {
+ if (useBlocks) {
+ YQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");
+ } else {
+ YQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
+ }
const TString cluster(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef());
const TString& server = ExecCtx_.Clusters_->GetServer(cluster);
@@ -90,7 +95,7 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) {
auto tx = entry->Tx;
auto deliveryMode = ForceLocalTableContent_ ? ETableContentDeliveryMode::File : Settings_->TableContentDeliveryMode.Get(cluster).GetOrElse(ETableContentDeliveryMode::Native);
- bool useSkiff = Settings_->TableContentUseSkiff.Get(cluster).GetOrElse(DEFAULT_USE_SKIFF);
+ bool useSkiff = !useBlocks && Settings_->TableContentUseSkiff.Get(cluster).GetOrElse(DEFAULT_USE_SKIFF);
const bool ensureOldTypesOnly = !useSkiff;
const ui64 maxChunksForNativeDelivery = Settings_->TableContentMaxChunksForNativeDelivery.Get().GetOrElse(1000ul);
TString contentTmpFolder = ForceLocalTableContent_ ? TString() : Settings_->TableContentTmpFolder.Get(cluster).GetOrElse(TString());
@@ -116,6 +121,7 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) {
THashMap<TString, ui32> structColumns;
if (useSkiff) {
+ YQL_ENSURE(!useBlocks);
auto itemType = AS_TYPE(TListType, callable.GetType()->GetReturnType())->GetItemType();
TStructType* itemTypeStruct = AS_TYPE(TStructType, itemType);
if (itemTypeStruct->GetMembersCount() == 0) {
@@ -151,7 +157,10 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) {
refName = res.first->second;
}
tablesNode.Add(refName);
- if (useSkiff) {
+ if (useBlocks) {
+ NYT::TNode formatNode("arrow");
+ formats.push_back(formatNode);
+ } else if (useSkiff) {
formats.push_back(SingleTableSpecToInputSkiff(specNode, structColumns, false, false, false));
} else {
if (ensureOldTypesOnly && specNode.HasKey(YqlRowSpecAttribute)) {
@@ -304,15 +313,24 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) {
}
TCallableBuilder call(env,
- TStringBuilder() << TYtTableContent::CallableName() << TStringBuf("Job"),
+ TStringBuilder() << name << TStringBuf("Job"),
callable.GetType()->GetReturnType());
+ if (useBlocks) {
+ call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(uniqueId));
+ call.Add(callable.GetInput(4)); // orig struct type
+ call.Add(PgmBuilder_.NewDataLiteral(tableList->GetItemsCount()));
+ call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(NYT::NodeToYsonString(specNode)));
+ call.Add(PgmBuilder_.NewDataLiteral(ETableContentDeliveryMode::File == deliveryMode)); // use compression
+ call.Add(callable.GetInput(3)); // length
+ } else {
+ call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(uniqueId));
+ call.Add(PgmBuilder_.NewDataLiteral(tableList->GetItemsCount()));
+ call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(NYT::NodeToYsonString(specNode)));
+ call.Add(PgmBuilder_.NewDataLiteral(useSkiff));
+ call.Add(PgmBuilder_.NewDataLiteral(ETableContentDeliveryMode::File == deliveryMode)); // use compression
+ call.Add(callable.GetInput(3)); // length
+ }
- call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(uniqueId));
- call.Add(PgmBuilder_.NewDataLiteral(tableList->GetItemsCount()));
- call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(NYT::NodeToYsonString(specNode)));
- call.Add(PgmBuilder_.NewDataLiteral(useSkiff));
- call.Add(PgmBuilder_.NewDataLiteral(ETableContentDeliveryMode::File == deliveryMode)); // use compression
- call.Add(callable.GetInput(3)); // length
return TRuntimeNode(call.Build(), false);
};
}
diff --git a/yt/yql/providers/yt/job/yql_job_factory.cpp b/yt/yql/providers/yt/job/yql_job_factory.cpp
index 6ff4a02606..6b7cd4fd69 100644
--- a/yt/yql/providers/yt/job/yql_job_factory.cpp
+++ b/yt/yql/providers/yt/job/yql_job_factory.cpp
@@ -3,6 +3,7 @@
#include <yt/yql/providers/yt/comp_nodes/yql_mkql_input.h>
#include <yt/yql/providers/yt/comp_nodes/yql_mkql_output.h>
#include <yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.h>
+#include <yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h>
#include <yql/essentials/providers/common/comp_nodes/yql_factory.h>
#include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
#include <yql/essentials/parser/pg_wrapper/interface/comp_factory.h>
@@ -24,6 +25,9 @@ TComputationNodeFactory GetJobFactory(NYql::NCommon::TCodecContext& codecCtx, co
if (name == "TableContent") {
return WrapYtTableContent(codecCtx, ctx.Mutables, callable, optLLVM, {} /*empty pathPrefix inside job*/);
}
+ if (name == "BlockTableContent") {
+ return WrapYtBlockTableContent(codecCtx, ctx.Mutables, callable, {} /*empty pathPrefix inside job*/);
+ }
if (name == "Input") {
YQL_ENSURE(reader);
YQL_ENSURE(specs);
diff --git a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp
index f92f152c31..bda09645b4 100644
--- a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp
@@ -23,6 +23,7 @@ public:
{
#define HNDL(name) "YtBlockInput-"#name, Hndl(&TYtBlockInputTransformer::name)
AddHandler(0, &TYtMap::Match, HNDL(TryTransformMap));
+ AddHandler(0, &TYtTableContent::Match, HNDL(TryTransformTableContent));
#undef HNDL
}
@@ -71,6 +72,50 @@ private:
return EnsureWideFlowType(map.Mapper().Args().Arg(0).Ref(), ctx);
}
+ // Rewrites a YtTableContent that carries the BlockInputReady setting into
+ // ForwardList(NarrowMap(ToFlow(WideFromBlocks(YtBlockTableContent(input, settings))), lambda)).
+ // The node is returned unchanged when the setting is absent or when it has more than one parent.
+ TMaybeNode<TExprBase> TryTransformTableContent(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
+ auto tableContent = node.Cast<TYtTableContent>();
+ if (!NYql::HasSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady)) {
+ return tableContent;
+ }
+
+ // NOTE(review): the reason for skipping nodes with several parents is not visible here - confirm.
+ const TParentsMap* parentsMap = getParents();
+ if (auto it = parentsMap->find(tableContent.Raw()); it != parentsMap->end() && it->second.size() > 1) {
+ return tableContent;
+ }
+
+ YQL_CLOG(INFO, ProviderYt) << "Rewrite YtTableContent with block input";
+
+ // One lambda argument per struct field; AsStruct reassembles them into the original row struct.
+ auto inputStructType = GetSeqItemType(tableContent.Ref().GetTypeAnn())->Cast<TStructExprType>();
+ auto asStructBuilder = Build<TCoAsStruct>(ctx, tableContent.Pos());
+ TExprNode::TListType narrowMapArgs;
+ for (auto& item : inputStructType->GetItems()) {
+ auto arg = ctx.NewArgument(tableContent.Pos(), item->GetName());
+ asStructBuilder.Add<TCoNameValueTuple>()
+ .Name().Build(item->GetName())
+ .Value(arg)
+ .Build();
+ narrowMapArgs.push_back(std::move(arg));
+ }
+
+ // The block callable receives the original settings without BlockInputReady.
+ auto settings = RemoveSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady, ctx);
+ return Build<TCoForwardList>(ctx, tableContent.Pos())
+ .Stream<TCoNarrowMap>()
+ .Input<TCoToFlow>()
+ .Input<TCoWideFromBlocks>()
+ .Input<TYtBlockTableContent>()
+ .Input(tableContent.Input())
+ .Settings(settings)
+ .Build()
+ .Build()
+ .Build()
+ .Lambda()
+ .Args(narrowMapArgs)
+ .Body(asStructBuilder.Done())
+ .Build()
+ .Build()
+ .Done();
+ }
+
private:
const TYtState::TPtr State_;
};
diff --git a/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp b/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp
index 2493b340f2..3a1e05d9aa 100644
--- a/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp
@@ -23,6 +23,7 @@ public:
#define HNDL(name) "YtBlockIOFilter-"#name, Hndl(&YtBlockIOFilterTransformer::name)
AddHandler(0, &TYtMap::Match, HNDL(HandleMapInput));
AddHandler(0, &TYtMap::Match, HNDL(HandleMapOutput));
+ AddHandler(0, &TYtTableContent::Match, HNDL(HandleTableContent));
#undef HNDL
}
@@ -112,11 +113,12 @@ private:
}
}
+ auto wideFlowLimit = State_->Configuration->WideFlowLimit.Get().GetOrElse(DEFAULT_WIDE_FLOW_LIMIT);
auto supportedTypes = State_->Configuration->JobBlockInputSupportedTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_TYPES);
auto supportedDataTypes = State_->Configuration->JobBlockInputSupportedDataTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_DATA_TYPES);
auto lambdaInputType = map.Mapper().Args().Arg(0).Ref().GetTypeAnn();
- if (!CheckBlockIOSupportedTypes(*lambdaInputType, supportedTypes, supportedDataTypes, [](const TString&) {})) {
+ if (!CheckBlockIOSupportedTypes(*lambdaInputType, supportedTypes, supportedDataTypes, [](const TString&) {}, wideFlowLimit)) {
return false;
}
@@ -128,11 +130,12 @@ private:
return false;
}
+ auto wideFlowLimit = State_->Configuration->WideFlowLimit.Get().GetOrElse(DEFAULT_WIDE_FLOW_LIMIT);
auto supportedTypes = State_->Configuration->JobBlockOutputSupportedTypes.Get().GetOrElse(DEFAULT_BLOCK_OUTPUT_SUPPORTED_TYPES);
auto supportedDataTypes = State_->Configuration->JobBlockOutputSupportedDataTypes.Get().GetOrElse(DEFAULT_BLOCK_OUTPUT_SUPPORTED_DATA_TYPES);
auto lambdaOutputType = map.Mapper().Ref().GetTypeAnn();
- if (!CheckBlockIOSupportedTypes(*lambdaOutputType, supportedTypes, supportedDataTypes, [](const TString&) {}, false)) {
+ if (!CheckBlockIOSupportedTypes(*lambdaOutputType, supportedTypes, supportedDataTypes, [](const TString&) {}, wideFlowLimit, false)) {
return false;
}
@@ -150,6 +153,66 @@ private:
}
}
+    // Adds the BlockInputReady setting to a YtTableContent that qualifies for block input.
+    //
+    // The node is returned unchanged when it already carries the setting, when
+    // JobBlockTableContent (defaulting to Types->UseBlocks) is off, or when
+    // CanUseBlockInputForTableContent rejects it. Because an already-marked node returns
+    // early, the setting can never be present below; the former "remove the setting"
+    // branch was unreachable and has been dropped.
+    TMaybeNode<TExprBase> HandleTableContent(TExprBase node, TExprContext& ctx) const {
+        auto tableContent = node.Cast<TYtTableContent>();
+        if (NYql::HasSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady)) {
+            return tableContent;
+        }
+
+        if (!State_->Configuration->JobBlockTableContent.Get().GetOrElse(Types->UseBlocks)) {
+            return tableContent;
+        }
+
+        if (!CanUseBlockInputForTableContent(tableContent)) {
+            return tableContent;
+        }
+
+        auto settings = AddSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady, TExprNode::TPtr(), ctx);
+        return Build<TYtTableContent>(ctx, node.Pos())
+            .InitFrom(tableContent)
+            .Settings(settings)
+            .Done();
+    }
+
+    // Decides whether a YtTableContent may be read with block input.
+    //
+    // A YtReadTable input must have at most one section, and every table in that section
+    // must pass IsYtTableSuitableForArrowInput. A YtOutput input is checked through its
+    // output table. Any other input kind is a hard error. Finally the row type must pass
+    // CheckBlockIOSupportedTypes with the configured supported types and wide flow limit.
+    bool CanUseBlockInputForTableContent(const TYtTableContent& tableContent) const {
+        // Rejection reasons are not reported from here.
+        const auto ignoreReason = [](const TString&) {};
+
+        if (auto maybeRead = tableContent.Input().Maybe<TYtReadTable>()) {
+            auto sections = maybeRead.Cast().Input();
+            if (sections.Size() > 1) {
+                return false;
+            }
+            for (auto path : sections.Item(0).Paths()) {
+                if (!IsYtTableSuitableForArrowInput(path.Table(), ignoreReason)) {
+                    return false;
+                }
+            }
+        } else if (auto maybeOutput = tableContent.Input().Maybe<TYtOutput>()) {
+            if (!IsYtTableSuitableForArrowInput(GetOutTable(maybeOutput.Cast()), ignoreReason)) {
+                return false;
+            }
+        } else {
+            YQL_ENSURE(false, "Expected " << TYtReadTable::CallableName() << " or " << TYtOutput::CallableName());
+        }
+
+        auto flowLimit = State_->Configuration->WideFlowLimit.Get().GetOrElse(DEFAULT_WIDE_FLOW_LIMIT);
+        auto allowedTypes = State_->Configuration->JobBlockInputSupportedTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_TYPES);
+        auto allowedDataTypes = State_->Configuration->JobBlockInputSupportedDataTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_DATA_TYPES);
+
+        auto rowType = tableContent.Ref().GetTypeAnn();
+        return CheckBlockIOSupportedTypes(*rowType, allowedTypes, allowedDataTypes, ignoreReason, flowLimit);
+    }
+
private:
const TYtState::TPtr State_;
const THolder<IGraphTransformer> Finalizer_;
diff --git a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp
index 0f1041f4bb..639fa14ca0 100644
--- a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp
@@ -1,6 +1,7 @@
#include "yql_yt_block_io_utils.h"
#include <yql/essentials/core/yql_opt_utils.h>
+#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
namespace NYql {
@@ -9,21 +10,20 @@ bool CheckBlockIOSupportedTypes(
const TSet<TString>& supportedTypes,
const TSet<NUdf::EDataSlot>& supportedDataTypes,
std::function<void(const TString&)> unsupportedTypeHandler,
+ size_t wideFlowLimit,
bool allowNestedOptionals
) {
- auto itemType = type.Cast<TFlowExprType>()->GetItemType();
- if (itemType->GetKind() == ETypeAnnotationKind::Multi) {
- auto& itemTypes = itemType->Cast<TMultiExprType>()->GetItems();
- if (itemTypes.empty()) {
- return false;
- }
+ auto& itemType = GetSeqItemType(type);
+ if (itemType.GetKind() == ETypeAnnotationKind::Multi) {
+ auto& itemTypes = itemType.Cast<TMultiExprType>()->GetItems();
if (!CheckSupportedTypes(itemTypes, supportedTypes, supportedDataTypes, std::move(unsupportedTypeHandler), allowNestedOptionals)) {
return false;
}
- } else if (itemType->GetKind() == ETypeAnnotationKind::Struct) {
- auto& items = itemType->Cast<TStructExprType>()->GetItems();
- if (items.empty()) {
+ } else if (itemType.GetKind() == ETypeAnnotationKind::Struct) {
+ auto& items = itemType.Cast<TStructExprType>()->GetItems();
+ if (items.empty() || items.size() > wideFlowLimit) {
+ unsupportedTypeHandler("fields count doesn't satisfy wide flow requirements");
return false;
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h
index 58e94c9443..dc72518fe6 100644
--- a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h
+++ b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h
@@ -9,6 +9,7 @@ bool CheckBlockIOSupportedTypes(
const TSet<TString>& supportedTypes,
const TSet<NUdf::EDataSlot>& supportedDataTypes,
std::function<void(const TString&)> unsupportedTypeHandler,
+ size_t wideFlowLimit,
bool allowNestedOptionals = true
);
diff --git a/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp b/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp
index 8eada7e706..fcad1669af 100644
--- a/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp
@@ -27,6 +27,7 @@ public:
AddHandler({TYtSection::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleSection));
AddHandler({TYtReadTable::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleReadTable));
AddHandler({TYtTableContent::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleTableContent));
+ AddHandler({TYtBlockTableContent::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleBlockTableContent));
AddHandler({TYtIsKeySwitch::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleDefault));
AddHandler({TYqlRowSpec::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleDefault));
@@ -188,6 +189,45 @@ public:
return TStatus::Ok;
}
+ // Derives constraints for a YtBlockTableContent node from the constraints of its input.
+ //
+ // The first component of every constraint path is replaced by the index (rendered
+ // as a string) of that field in the input's row struct. Empty, Sorted, Chopped,
+ // Unique and Distinct constraints are carried over; any other constraint kind
+ // trips YQL_ENSURE.
+ TStatus HandleBlockTableContent(TExprBase input, TExprContext& ctx) {
+     const auto blockContent = input.Cast<TYtBlockTableContent>();
+     const auto contentInput = blockContent.Input();
+
+     // For a YtOutput the annotation is the list type itself; otherwise the list
+     // type is the last item of the input's tuple type.
+     const TTypeAnnotationNode* listType = nullptr;
+     if (contentInput.Maybe<TYtOutput>()) {
+         listType = contentInput.Ref().GetTypeAnn();
+     } else {
+         listType = contentInput.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back();
+     }
+     const auto rowType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
+
+     // Maps a path rooted at a struct field name to the same path rooted at the field index.
+     auto toIndexedPath = [&](TPartOfConstraintBase::TPathType path) -> std::vector<TPartOfConstraintBase::TPathType> {
+         YQL_ENSURE(!path.empty());
+
+         const auto position = rowType->FindItem(path[0]);
+         YQL_ENSURE(position.Defined());
+
+         path[0] = ctx.GetIndexAsString(*position);
+         return { path };
+     };
+
+     TConstraintSet renamed;
+     for (auto constraint : contentInput.Ref().GetAllConstraints()) {
+         if (dynamic_cast<const TEmptyConstraintNode*>(constraint)) {
+             renamed.AddConstraint(ctx.MakeConstraint<TEmptyConstraintNode>());
+             continue;
+         }
+         if (auto sorted = dynamic_cast<const TSortedConstraintNode*>(constraint)) {
+             renamed.AddConstraint(sorted->RenameFields(ctx, toIndexedPath));
+             continue;
+         }
+         if (auto chopped = dynamic_cast<const TChoppedConstraintNode*>(constraint)) {
+             renamed.AddConstraint(chopped->RenameFields(ctx, toIndexedPath));
+             continue;
+         }
+         if (auto unique = dynamic_cast<const TUniqueConstraintNode*>(constraint)) {
+             renamed.AddConstraint(unique->RenameFields(ctx, toIndexedPath));
+             continue;
+         }
+         if (auto distinct = dynamic_cast<const TDistinctConstraintNode*>(constraint)) {
+             renamed.AddConstraint(distinct->RenameFields(ctx, toIndexedPath));
+             continue;
+         }
+         YQL_ENSURE(false, "unexpected constraint");
+     }
+
+     input.Ptr()->SetConstraints(renamed);
+     return TStatus::Ok;
+ }
+
TStatus HandleDefault(TExprBase input, TExprContext& /*ctx*/) {
return UpdateAllChildLambdasConstraints(input.Ref());
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp b/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp
index 4f3ffa5444..3447fbf35b 100644
--- a/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp
@@ -46,6 +46,7 @@ public:
AddHandler({TYtReadTable::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleReadTable));
AddHandler({TYtReadTableScheme::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleReadTableScheme));
AddHandler({TYtTableContent::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleTableContent));
+ AddHandler({TYtBlockTableContent::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleBlockTableContent));
AddHandler({TYtLength::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleLength));
AddHandler({TCoConfigure::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleConfigure));
AddHandler({TYtConfigure::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleYtConfigure));
@@ -846,7 +847,8 @@ public:
return TStatus::Error;
}
- const EYtSettingTypes allowed = EYtSettingType::MemUsage | EYtSettingType::ItemsCount | EYtSettingType::RowFactor | EYtSettingType::Split | EYtSettingType::Small;
+ const EYtSettingTypes allowed = EYtSettingType::MemUsage | EYtSettingType::ItemsCount | EYtSettingType::RowFactor
+ | EYtSettingType::Split | EYtSettingType::Small | EYtSettingType::BlockInputReady;
if (!ValidateSettings(tableContent.Settings().Ref(), allowed, ctx)) {
return TStatus::Error;
}
@@ -858,6 +860,49 @@ public:
if (auto columnOrder = State_->Types->LookupColumnOrder(tableContent.Input().Ref())) {
return State_->Types->SetColumnOrder(input.Ref(), *columnOrder, ctx);
}
+
+ return TStatus::Ok;
+ }
+
+ // Type annotation for YtBlockTableContent.
+ //
+ // Validates that the node has two arguments, that its input is a YtReadTable or
+ // a YtOutput, and that the settings are a tuple holding only the allowed settings.
+ // The resulting type is a stream of a multi type: one block type per field of the
+ // input row struct, followed by a scalar Uint64 item. The input's column order,
+ // when known, is propagated to this node.
+ TStatus HandleBlockTableContent(TExprBase input, TExprContext& ctx) {
+     if (!EnsureArgsCount(input.Ref(), 2, ctx)) {
+         return TStatus::Error;
+     }
+
+     const auto blockContent = input.Cast<TYtBlockTableContent>();
+     const auto contentInput = blockContent.Input();
+
+     const bool isReadTable = contentInput.Ref().IsCallable(TYtReadTable::CallableName());
+     const bool isOutput = contentInput.Ref().IsCallable(TYtOutput::CallableName());
+     if (!(isReadTable || isOutput)) {
+         ctx.AddError(TIssue(ctx.GetPosition(contentInput.Pos()), TStringBuilder()
+             << "Expected " << TYtReadTable::CallableName() << " or " << TYtOutput::CallableName()));
+         return TStatus::Error;
+     }
+
+     if (!EnsureTuple(blockContent.Settings().MutableRef(), ctx)) {
+         return TStatus::Error;
+     }
+
+     const EYtSettingTypes allowed = EYtSettingType::MemUsage
+         | EYtSettingType::ItemsCount
+         | EYtSettingType::RowFactor
+         | EYtSettingType::Split
+         | EYtSettingType::Small;
+     if (!ValidateSettings(blockContent.Settings().Ref(), allowed, ctx)) {
+         return TStatus::Error;
+     }
+
+     // For a YtOutput the annotation is the list type itself; otherwise the list
+     // type is the last item of the input's tuple type.
+     const TTypeAnnotationNode* listType = nullptr;
+     if (contentInput.Maybe<TYtOutput>()) {
+         listType = contentInput.Ref().GetTypeAnn();
+     } else {
+         listType = contentInput.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back();
+     }
+     const auto rowType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
+
+     TTypeAnnotationNode::TListType wideItems;
+     for (const auto& field : rowType->GetItems()) {
+         wideItems.push_back(ctx.MakeType<TBlockExprType>(field->GetItemType()));
+     }
+     // Trailing scalar Uint64 item (presumably the block length column; confirm against the block stream contract).
+     const auto trailingScalarType = ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64));
+     wideItems.push_back(trailingScalarType);
+
+     const auto wideType = ctx.MakeType<TMultiExprType>(wideItems);
+     input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(wideType));
+
+     if (auto columnOrder = State_->Types->LookupColumnOrder(contentInput.Ref())) {
+         return State_->Types->SetColumnOrder(input.Ref(), *columnOrder, ctx);
+     }
+
return TStatus::Ok;
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp b/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp
index 3ad3f683d9..1b51ea6baa 100644
--- a/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp
@@ -49,6 +49,8 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName,
TType* const tupleTypeTables = ctx.ProgramBuilder.NewTupleType({strType, boolType, strType, ui64Type, ui64Type, boolType, ui32Type});
TType* const listTypeGroup = ctx.ProgramBuilder.NewListType(tupleTypeTables);
+ bool useBlocks = callName.EndsWith(TYtBlockTableContent::CallableName());
+
const TExprNode* settings = nullptr;
TMaybe<TSampleParams> sampling;
TVector<TRuntimeNode> groups;
@@ -227,9 +229,23 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName,
samplingTupleItems.push_back(ctx.ProgramBuilder.NewDataLiteral(isSystemSampling));
}
- auto outListType = ctx.ProgramBuilder.NewListType(outItemType);
+ TType* outType = nullptr;
+ if (useBlocks) {
+ auto structType = AS_TYPE(TStructType, outItemType);
+
+ std::vector<TType*> outputItems;
+ outputItems.reserve(structType->GetMembersCount());
+ for (size_t i = 0; i < structType->GetMembersCount(); i++) {
+ outputItems.push_back(ctx.ProgramBuilder.NewBlockType(structType->GetMemberType(i), TBlockType::EShape::Many));
+ }
+ outputItems.push_back(ctx.ProgramBuilder.NewBlockType(ctx.ProgramBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar));
+ outType = ctx.ProgramBuilder.NewStreamType(ctx.ProgramBuilder.NewMultiType(outputItems));
+
+ } else {
+ outType = ctx.ProgramBuilder.NewListType(outItemType);
+ }
- TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), callName, outListType);
+ TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), callName, outType);
call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(clusterName)); // cluster name
call.Add(ctx.ProgramBuilder.NewList(listTypeGroup, groups));
@@ -241,6 +257,10 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName,
call.Add(ctx.ProgramBuilder.NewEmptyTuple());
}
+ if (useBlocks) {
+ call.Add(TRuntimeNode(outItemType, true));
+ }
+
auto res = TRuntimeNode(call.Build(), false);
if (settings) {
@@ -479,6 +499,41 @@ void RegisterYtMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) {
}
});
+ // MKQL compiler for YtBlockTableContent.
+ // A node carrying the Empty constraint compiles to an empty iterator of the node's
+ // own type. Otherwise a table content call is built from the input's row struct
+ // type; the callable name gets the "Small" prefix when the Small setting is present,
+ // and the ItemsCount setting, when present, is passed along.
+ compiler.AddCallable(TYtBlockTableContent::CallableName(),
+     [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
+         TYtBlockTableContent blockContent(&node);
+         if (node.GetConstraint<TEmptyConstraintNode>()) {
+             const auto emptyStreamType = ctx.BuildType(node, *node.GetTypeAnn());
+             return ctx.ProgramBuilder.EmptyIterator(emptyStreamType);
+         }
+
+         auto contentInput = blockContent.Input();
+
+         // For a YtOutput the annotation is the list type itself; otherwise the list
+         // type is the last item of the input's tuple type.
+         const TTypeAnnotationNode* listType = nullptr;
+         if (contentInput.Maybe<TYtOutput>()) {
+             listType = contentInput.Ref().GetTypeAnn();
+         } else {
+             listType = contentInput.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back();
+         }
+         auto rowStructType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
+
+         TMaybe<ui64> itemsCount;
+         if (auto countSetting = NYql::GetSetting(blockContent.Settings().Ref(), EYtSettingType::ItemsCount)) {
+             itemsCount = FromString<ui64>(countSetting->Child(1)->Content());
+         }
+
+         TString callName = ToString(TYtBlockTableContent::CallableName());
+         if (NYql::HasSetting(blockContent.Settings().Ref(), EYtSettingType::Small)) {
+             callName.prepend("Small");
+         }
+
+         // The runtime item type is the same for both kinds of input, so build it once.
+         auto rowType = ctx.BuildType(node, *rowStructType);
+
+         if (auto maybeRead = contentInput.Maybe<TYtReadTable>()) {
+             auto read = maybeRead.Cast();
+             return BuildTableContentCall(callName, rowType,
+                 read.DataSource().Cluster().Value(), read.Input().Ref(), itemsCount, ctx, true);
+         }
+
+         auto output = contentInput.Cast<TYtOutput>();
+         return BuildTableContentCall(callName, rowType,
+             GetOutputOp(output).DataSink().Cluster().Value(), output.Ref(), itemsCount, ctx, true);
+     });
+
compiler.AddCallable({TYtTablePath::CallableName(), TYtTableRecord::CallableName(), TYtTableIndex::CallableName(), TYtIsKeySwitch::CallableName(), TYtRowNumber::CallableName()},
[](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
auto dataSlot = node.GetTypeAnn()->Cast<TDataExprType>()->GetSlot();