aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraidarsamer <aidarsamer@ydb.tech>2023-02-15 11:17:46 +0300
committeraidarsamer <aidarsamer@ydb.tech>2023-02-15 11:17:46 +0300
commit4f56624382f2f901b71fc5fdec8318455dce767f (patch)
tree0ec6e56f66b8d5a57db0e92fab2c9b375c432514
parent33e10c4ea8c8e1e34e64cc6419d825d2d5ed6020 (diff)
downloadydb-4f56624382f2f901b71fc5fdec8318455dce767f.tar.gz
Add block reads for column shard tables.
Add block reads for column shard tables
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.cpp4
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp4
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp6
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp28
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h5
-rw-r--r--ydb/core/kqp/expr_nodes/kqp_expr_nodes.json5
-rw-r--r--ydb/core/kqp/host/kqp_type_ann.cpp20
-rw-r--r--ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp2
-rw-r--r--ydb/core/kqp/opt/peephole/kqp_opt_peephole_rules.h2
-rw-r--r--ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp31
-rw-r--r--ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp32
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp16
-rw-r--r--ydb/core/kqp/runtime/kqp_program_builder.cpp34
-rw-r--r--ydb/core/kqp/runtime/kqp_program_builder.h3
-rw-r--r--ydb/core/kqp/runtime/kqp_read_table.cpp131
-rw-r--r--ydb/core/kqp/runtime/kqp_read_table.h3
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.cpp241
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.h221
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data_ut.cpp24
-rw-r--r--ydb/core/protos/kqp_physical.proto7
-rw-r--r--ydb/core/protos/tx_datashard.proto7
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp1
-rw-r--r--ydb/library/yql/core/expr_nodes/yql_expr_nodes.json5
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp1
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp56
-rw-r--r--ydb/library/yql/public/udf/arrow/util.cpp30
-rw-r--r--ydb/library/yql/public/udf/arrow/util.h2
27 files changed, 712 insertions, 209 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index 0ba8e1d6ea..d3213fe8d4 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -34,6 +34,10 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
return WrapKqpScanWideReadTableRanges(callable, ctx, *computeCtx);
}
+ if (name == "KqpBlockReadTableRanges"sv) {
+ return WrapKqpScanBlockReadTableRanges(callable, ctx, *computeCtx);
+ }
+
// only for _pure_ compute actors!
if (name == "KqpEnsure"sv) {
return WrapKqpEnsure(callable, ctx);
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 0d80e75017..4a8ca2587d 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -245,14 +245,14 @@ private:
case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED:
case NKikimrTxDataShard::EScanDataFormat::CELLVEC: {
if (!msg.Rows.empty()) {
- bytes = ScanData->AddRows(msg.Rows, {}, TaskRunner->GetHolderFactory());
+ bytes = ScanData->AddData(msg.Rows, {}, TaskRunner->GetHolderFactory());
rowsCount = msg.Rows.size();
}
break;
}
case NKikimrTxDataShard::EScanDataFormat::ARROW: {
if(msg.ArrowBatch != nullptr) {
- bytes = ScanData->AddRows(*msg.ArrowBatch, {}, TaskRunner->GetHolderFactory());
+ bytes = ScanData->AddData(*msg.ArrowBatch, {}, TaskRunner->GetHolderFactory());
rowsCount = msg.ArrowBatch->num_rows();
}
break;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index aa40b41e95..f3098274db 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -506,13 +506,13 @@ private:
case NKikimrTxDataShard::EScanDataFormat::CELLVEC:
case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED:
if (!msg.Rows.empty()) {
- bytes += ScanData->AddRows(msg.Rows, state->TabletId, TaskRunner->GetHolderFactory());
+ bytes += ScanData->AddData(msg.Rows, state->TabletId, TaskRunner->GetHolderFactory());
rowsCount += msg.Rows.size();
}
break;
case NKikimrTxDataShard::EScanDataFormat::ARROW:
- if (!!msg.ArrowBatch) {
- bytes += ScanData->AddRows(*msg.ArrowBatch, state->TabletId, TaskRunner->GetHolderFactory());
+ if (msg.ArrowBatch) {
+ bytes += ScanData->AddData(*msg.ArrowBatch, state->TabletId, TaskRunner->GetHolderFactory());
rowsCount += msg.ArrowBatch->num_rows();
}
break;
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 128d66d7b9..e20216160c 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -36,6 +36,28 @@ using namespace NYql::NDq;
namespace {
+NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::EReadType ReadTypeToProto(const TTaskMeta::TReadInfo::EReadType& type) {
+ switch (type) {
+ case TTaskMeta::TReadInfo::EReadType::Rows:
+ return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::ROWS;
+ case TTaskMeta::TReadInfo::EReadType::Blocks:
+ return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::BLOCKS;
+ }
+
+ YQL_ENSURE(false, "Invalid read type in task meta.");
+}
+
+TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpReadOlapRanges::EReadType& type) {
+ switch (type) {
+ case NKqpProto::TKqpPhyOpReadOlapRanges::ROWS:
+ return TTaskMeta::TReadInfo::EReadType::Rows;
+ case NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS:
+ return TTaskMeta::TReadInfo::EReadType::Blocks;
+ default:
+ YQL_ENSURE(false, "Invalid read type from TKqpPhyOpReadOlapRanges protobuf.");
+ }
+}
+
class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan> {
using TBase = TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan>;
@@ -45,7 +67,7 @@ public:
}
TKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
ui32 executerDelayToRetryMs)
: TBase(std::move(request), database, userToken, counters, executerDelayToRetryMs, TWilsonKqp::ScanExecuter, "ScanExecuter")
@@ -153,6 +175,7 @@ private:
taskMeta.ReadInfo.ItemsLimit = itemsLimit;
taskMeta.ReadInfo.Reverse = reverse;
taskMeta.ReadInfo.Sorted = sorted;
+ taskMeta.ReadInfo.ReadType = TTaskMeta::TReadInfo::EReadType::Rows;
if (resultType) {
YQL_ENSURE(resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct
@@ -179,8 +202,8 @@ private:
return;
}
+ taskMeta.ReadInfo.ReadType = ReadTypeFromProto(readOlapRange->GetReadType());
taskMeta.ReadInfo.OlapProgram.Program = readOlapRange->GetOlapProgram();
-
for (auto& name: readOlapRange->GetOlapProgramParameterNames()) {
taskMeta.ReadInfo.OlapProgram.ParameterNames.insert(name);
}
@@ -573,6 +596,7 @@ private:
protoTaskMeta.SetReverse(task.Meta.ReadInfo.Reverse);
protoTaskMeta.SetItemsLimit(task.Meta.ReadInfo.ItemsLimit);
protoTaskMeta.SetSorted(task.Meta.ReadInfo.Sorted);
+ protoTaskMeta.SetReadType(ReadTypeToProto(task.Meta.ReadInfo.ReadType));
for (auto columnType : task.Meta.ReadInfo.ResultColumnsTypes) {
auto* protoResultColumn = protoTaskMeta.AddResultColumns();
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index 860d88bda4..b6bcc4c7d3 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -128,9 +128,14 @@ struct TTaskMeta {
};
struct TReadInfo {
+ enum class EReadType {
+ Rows,
+ Blocks
+ };
ui64 ItemsLimit = 0;
bool Reverse = false;
bool Sorted = false;
+ EReadType ReadType = EReadType::Rows;
TKqpOlapProgram OlapProgram;
TVector<NScheme::TTypeInfo> ResultColumnsTypes;
};
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
index 5a3d736117..65f09ff9ff 100644
--- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
+++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
@@ -133,6 +133,11 @@
"Match": {"Type": "Callable", "Name": "KqpWideReadOlapTableRanges"}
},
{
+ "Name": "TKqpBlockReadOlapTableRanges",
+ "Base": "TKqpReadOlapTableRangesBase",
+ "Match": {"Type": "Callable", "Name": "KqpBlockReadOlapTableRanges"}
+ },
+ {
"Name": "TKqlLookupTableBase",
"Base": "TCallable",
"Match": {"Type": "CallableBase"},
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index ae4907c9b8..d5ad48d7fa 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -143,6 +143,24 @@ const TFlowExprType* GetWideRowsType(TExprContext& ctx, const TStructExprType* r
return ctx.MakeType<TFlowExprType>(wideRowType);
}
+const TFlowExprType* GetBlockRowsType(TExprContext& ctx, const TStructExprType* rowType) {
+ YQL_ENSURE(rowType);
+
+ const auto& columns = rowType->GetItems();
+
+ TTypeAnnotationNode::TListType items;
+ items.reserve(columns.size());
+
+ for (const auto& column: columns) {
+ items.push_back(ctx.MakeType<TBlockExprType>(column->GetItemType()));
+ }
+ // Last item is height of block
+ items.push_back(ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)));
+
+ auto blockRowType = ctx.MakeType<TMultiExprType>(items);
+ return ctx.MakeType<TFlowExprType>(blockRowType);
+}
+
bool CalcKeyColumnsCount(TExprContext& ctx, const TPositionHandle pos, const TStructExprType& structType,
const TKikimrTableDescription& tableDesc, const TKikimrTableMetadata& metadata, ui32& keyColumnsCount)
{
@@ -348,6 +366,8 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
node->SetTypeAnn(ctx.MakeType<TFlowExprType>(processRowType));
} else if (TKqpWideReadOlapTableRanges::Match(node.Get())) {
node->SetTypeAnn(GetWideRowsType(ctx, processRowType->Cast<TStructExprType>()));
+ } else if (TKqpBlockReadOlapTableRanges::Match(node.Get())) {
+ node->SetTypeAnn(GetBlockRowsType(ctx, processRowType->Cast<TStructExprType>()));
} else {
YQL_ENSURE(false, "Unexpected ReadOlapTable callable." << node->Content());
}
diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
index 61fd228853..e15be81baa 100644
--- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
+++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
@@ -132,7 +132,7 @@ protected:
}
TMaybeNode<TExprBase> BuildWideReadTable(TExprBase node, TExprContext& ctx) {
- TExprBase output = KqpBuildWideReadTable(node, ctx);
+ TExprBase output = KqpBuildWideReadTable(node, ctx, TypesCtx);
DumpAppliedRule("BuildWideReadTable", node.Ptr(), output.Ptr(), ctx);
return output;
}
diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole_rules.h b/ydb/core/kqp/opt/peephole/kqp_opt_peephole_rules.h
index 489a2afe1e..f69e39756f 100644
--- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole_rules.h
+++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole_rules.h
@@ -11,6 +11,6 @@
namespace NKikimr::NKqp::NOpt {
-NYql::NNodes::TExprBase KqpBuildWideReadTable(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx);
+NYql::NNodes::TExprBase KqpBuildWideReadTable(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx, NYql::TTypeAnnotationContext& typesCtx);
} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp
index 75d76930bd..41d26d3e30 100644
--- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp
+++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp
@@ -9,7 +9,7 @@ using namespace NYql;
using namespace NYql::NDq;
using namespace NYql::NNodes;
-TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx) {
+TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
if (!node.Maybe<TKqpReadTable>() &&
!node.Maybe<TKqpReadOlapTableRanges>() &&
!node.Maybe<TKqpReadTableRanges>())
@@ -58,14 +58,27 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx) {
} else if (auto maybeRead = node.Maybe<TKqpReadOlapTableRanges>()) {
auto read = maybeRead.Cast();
- wideRead = Build<TKqpWideReadOlapTableRanges>(ctx, node.Pos())
- .Table(read.Table())
- .Ranges(read.Ranges())
- .Columns(read.Columns())
- .Settings(read.Settings())
- .ExplainPrompt(read.ExplainPrompt())
- .Process(read.Process())
- .Done();
+ if (typesCtx.UseBlocks) {
+ wideRead = Build<TCoWideFromBlocks>(ctx, node.Pos())
+ .Input<TKqpBlockReadOlapTableRanges>()
+ .Table(read.Table())
+ .Ranges(read.Ranges())
+ .Columns(read.Columns())
+ .Settings(read.Settings())
+ .ExplainPrompt(read.ExplainPrompt())
+ .Process(read.Process())
+ .Build()
+ .Done();
+ } else {
+ wideRead = Build<TKqpWideReadOlapTableRanges>(ctx, node.Pos())
+ .Table(read.Table())
+ .Ranges(read.Ranges())
+ .Columns(read.Columns())
+ .Settings(read.Settings())
+ .ExplainPrompt(read.ExplainPrompt())
+ .Process(read.Process())
+ .Done();
+ }
} else {
YQL_ENSURE(false, "Unknown read table operation: " << node.Ptr()->Content());
}
diff --git a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
index 3aeb51828c..662c2c0baa 100644
--- a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
@@ -253,6 +253,7 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext&
);
});
+ // TODO: Rewrite to DqSource https://st.yandex-team.ru/KIKIMR-17161
compiler->AddCallable(TKqpWideReadOlapTableRanges::CallableName(),
[&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) {
TKqpWideReadOlapTableRanges readTable(&node);
@@ -283,6 +284,37 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext&
return result;
});
+ // TODO: Rewrite to DqSource https://st.yandex-team.ru/KIKIMR-17161
+ compiler->AddCallable(TKqpBlockReadOlapTableRanges::CallableName(),
+ [&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) {
+ TKqpBlockReadOlapTableRanges readTable(&node);
+
+ const auto& tableMeta = ctx.GetTableMeta(readTable.Table());
+ ValidateRangesType(readTable.Ranges().Ref().GetTypeAnn(), tableMeta);
+
+ TKqpKeyRanges ranges = MakeComputedKeyRanges(readTable, ctx, buildCtx);
+
+ // Return type depends on the process program, so it is built explicitly.
+ TStringStream errorStream;
+ auto returnType = NCommon::BuildType(*readTable.Ref().GetTypeAnn(), ctx.PgmBuilder(), errorStream);
+ YQL_ENSURE(returnType, "Failed to build type: " << errorStream.Str());
+
+ // Process program for OLAP read is not present in MKQL, it is passed in range description
+ // in physical plan directly to executer. Read callables in MKQL only used to associate
+ // input stream of the graph with the external scans, so it doesn't make much sense to pass
+ // the process program through callable.
+ // We anyway move to explicit sources as external nodes in KQP program, so all the information
+ // about read settings will be passed in a side channel, not the program.
+ auto result = ctx.PgmBuilder().KqpBlockReadTableRanges(
+ MakeTableId(readTable.Table()),
+ ranges,
+ GetKqpColumns(tableMeta, readTable.Columns(), true),
+ returnType
+ );
+
+ return result;
+ });
+
compiler->AddCallable(TKqpLookupTable::CallableName(),
[&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) {
TKqpLookupTable lookupTable(&node);
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 026c1edeaa..4a25ca0abe 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -523,7 +523,7 @@ public:
[](const TItemExprType* first, const TItemExprType* second) {
return first->GetName() < second->GetName();
});
- inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(),
+ inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(),
[](const TItemExprType* first, const TItemExprType* second) {
return first->GetName() == second->GetName();
}),
@@ -638,6 +638,20 @@ private:
auto miniKqlResultType = GetMKqlResultType(readTableRanges.Process().Ref().GetTypeAnn());
FillOlapProgram(readTableRanges.Process(), miniKqlResultType, *tableMeta, *tableOp.MutableReadOlapRange());
FillResultType(miniKqlResultType, *tableOp.MutableReadOlapRange());
+ } else if (auto maybeReadBlockTableRanges = node.Maybe<TKqpBlockReadOlapTableRanges>()) {
+ auto readTableRanges = maybeReadBlockTableRanges.Cast();
+ auto tableMeta = TablesData->ExistingTable(Cluster, readTableRanges.Table().Path()).Metadata;
+ YQL_ENSURE(tableMeta);
+
+ auto& tableOp = *stageProto.AddTableOps();
+ FillTablesMap(readTableRanges.Table(), readTableRanges.Columns(), tablesMap);
+ FillTableId(readTableRanges.Table(), *tableOp.MutableTable());
+ FillColumns(readTableRanges.Columns(), *tableMeta, tableOp, true);
+ FillReadRanges(readTableRanges, *tableMeta, *tableOp.MutableReadOlapRange());
+ auto miniKqlResultType = GetMKqlResultType(readTableRanges.Process().Ref().GetTypeAnn());
+ FillOlapProgram(readTableRanges.Process(), miniKqlResultType, *tableMeta, *tableOp.MutableReadOlapRange());
+ FillResultType(miniKqlResultType, *tableOp.MutableReadOlapRange());
+ tableOp.MutableReadOlapRange()->SetReadType(NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS);
} else if (node.Maybe<TCoSort>()) {
hasSort = true;
} else if (node.Maybe<TCoFilterBase>()) {
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp
index 758b0efb69..78a230e7d3 100644
--- a/ydb/core/kqp/runtime/kqp_program_builder.cpp
+++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp
@@ -134,6 +134,16 @@ TType* MakeWideFlowType(TProgramBuilder& builder, TStructType* rowType) {
return builder.NewFlowType(builder.NewTupleType(tupleItems));
}
+TType* MakeBlockType(TProgramBuilder& builder, TStructType* rowType) {
+ std::vector<TType*> tupleItems;
+ tupleItems.reserve(rowType->GetMembersCount());
+ for (ui32 i = 0; i < rowType->GetMembersCount(); ++i) {
+ tupleItems.push_back(rowType->GetMemberType(i));
+ }
+
+ return builder.NewBlockType(builder.NewTupleType(tupleItems), TBlockType::EShape::Many);
+}
+
} // namespace
TKqpProgramBuilder::TKqpProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry)
@@ -203,6 +213,30 @@ TRuntimeNode TKqpProgramBuilder::KqpWideReadTableRanges(const TTableId& tableId,
return TRuntimeNode(builder.Build(), false);
}
+TRuntimeNode TKqpProgramBuilder::KqpBlockReadTableRanges(const TTableId& tableId, const TKqpKeyRanges& ranges,
+ const TArrayRef<TKqpTableColumn>& columns, TType* returnType)
+{
+ if (returnType == nullptr) {
+ auto rowType = GetRowType(*this, columns);
+ auto structType = AS_TYPE(TStructType, rowType);
+ returnType = MakeBlockType(*this, structType);
+ } else {
+ MKQL_ENSURE_S(returnType);
+ MKQL_ENSURE_S(returnType->IsFlow());
+ const auto itemType = AS_TYPE(TFlowType, returnType)->GetItemType();
+ MKQL_ENSURE_S(itemType->IsTuple());
+ }
+
+ TCallableBuilder builder(Env, __func__, returnType);
+ builder.Add(BuildTableIdLiteral(tableId, *this));
+ builder.Add(BuildKeyRangesNode(*this, ranges));
+ builder.Add(BuildColumnTags(*this, columns));
+ builder.Add(ranges.ItemsLimit);
+ builder.Add(NewDataLiteral(ranges.Reverse));
+
+ return TRuntimeNode(builder.Build(), false);
+}
+
TRuntimeNode TKqpProgramBuilder::KqpLookupTable(const TTableId& tableId, const TRuntimeNode& lookupKeys,
const TArrayRef<TKqpTableColumn>& keyColumns, const TArrayRef<TKqpTableColumn>& columns)
{
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.h b/ydb/core/kqp/runtime/kqp_program_builder.h
index 997ce00ac3..a26938b8db 100644
--- a/ydb/core/kqp/runtime/kqp_program_builder.h
+++ b/ydb/core/kqp/runtime/kqp_program_builder.h
@@ -55,6 +55,9 @@ public:
TRuntimeNode KqpWideReadTableRanges(const TTableId& tableId, const TKqpKeyRanges& range,
const TArrayRef<TKqpTableColumn>& columns, TType* returnType);
+ TRuntimeNode KqpBlockReadTableRanges(const TTableId& tableId, const TKqpKeyRanges& range,
+ const TArrayRef<TKqpTableColumn>& columns, TType* returnType);
+
TRuntimeNode KqpLookupTable(const TTableId& tableId, const TRuntimeNode& lookupKeys,
const TArrayRef<TKqpTableColumn>& keyColumns, const TArrayRef<TKqpTableColumn>& columns);
diff --git a/ydb/core/kqp/runtime/kqp_read_table.cpp b/ydb/core/kqp/runtime/kqp_read_table.cpp
index 4d10e53665..d86b28999a 100644
--- a/ydb/core/kqp/runtime/kqp_read_table.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_table.cpp
@@ -364,6 +364,115 @@ private:
TParseReadTableRangesResult ParseResult;
};
+class TKqpScanBlockReadTableWrapperBase : public TStatelessWideFlowCodegeneratorNode<TKqpScanBlockReadTableWrapperBase> {
+ using TBase = TStatelessWideFlowCodegeneratorNode<TKqpScanBlockReadTableWrapperBase>;
+public:
+ TKqpScanBlockReadTableWrapperBase(TKqpScanComputeContext& computeCtx, std::vector<EValueRepresentation>&& representations)
+ : TBase(this)
+ , ComputeCtx(computeCtx)
+ , Representations(std::move(representations)) {}
+
+ EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const {
+ Y_UNUSED(ctx);
+
+ if (!TableReader) {
+ TableReader = ComputeCtx.ReadTable(GetCallableId());
+ }
+
+ return TableReader->Next(output);
+ }
+
+#ifndef MKQL_DISABLE_CODEGEN
+ ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, BasicBlock*& block) const {
+ auto& context = ctx.Codegen->GetContext();
+ const auto size = Representations.size();
+
+ Block.resize(size);
+
+ const auto valueType = Type::getInt128Ty(context);
+ const auto valuePtrType = PointerType::getUnqual(valueType);
+ const auto valuesPtr = CastInst::Create(Instruction::IntToPtr,
+ ConstantInt::get(Type::getInt64Ty(context), uintptr_t(Block.data())),
+ valuePtrType, "values", &ctx.Func->getEntryBlock().back());
+
+ ICodegeneratorInlineWideNode::TGettersList getters(size);
+ const auto indexType = Type::getInt32Ty(context);
+ for (auto i = 0U; i < size; ++i) {
+ getters[i] = [i, valueType, valuesPtr, indexType] (const TCodegenContext&, BasicBlock*& block) {
+ const auto loadPtr = GetElementPtrInst::Create(valueType, valuesPtr,
+ {ConstantInt::get(indexType, i)},
+ (TString("loadPtr_") += ToString(i)).c_str(),
+ block);
+ return new LoadInst(loadPtr, "load", block);
+ };
+ }
+
+ const auto fieldsType = ArrayType::get(valuePtrType, size);
+ const auto fields = new AllocaInst(fieldsType, 0U, "fields", &ctx.Func->getEntryBlock().back());
+
+ Value* init = UndefValue::get(fieldsType);
+ for (auto i = 0U; i < size; ++i) {
+ const auto pointer = GetElementPtrInst::Create(valueType, valuesPtr,
+ {ConstantInt::get(indexType, i)},
+ (TString("ptr_") += ToString(i)).c_str(),
+ &ctx.Func->getEntryBlock().back());
+
+ init = InsertValueInst::Create(init, pointer, {i}, (TString("insert_") += ToString(i)).c_str(),
+ &ctx.Func->getEntryBlock().back());
+ }
+
+ new StoreInst(init, fields, &ctx.Func->getEntryBlock().back());
+
+ const auto ptrType = PointerType::getUnqual(StructType::get(context));
+ const auto func = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TKqpScanBlockReadTableWrapperBase::DoCalculate));
+ const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
+ const auto funcType = FunctionType::get(Type::getInt32Ty(context), { self->getType(), ctx.Ctx->getType(), fields->getType() }, false);
+ const auto funcPtr = CastInst::Create(Instruction::IntToPtr, func, PointerType::getUnqual(funcType), "fetch_func", block);
+ const auto result = CallInst::Create(funcPtr, { self, ctx.Ctx, fields }, "fetch", block);
+
+ return {result, std::move(getters)};
+ }
+#endif
+
+ virtual ui32 GetCallableId() const = 0;
+ virtual ui32 GetAllColumnsSize() const = 0;
+
+private:
+ TKqpScanComputeContext& ComputeCtx;
+ // Mutable is bad for computation pattern cache.
+ // Probably this hack is necessary for LLVM. Need to review
+ mutable TIntrusivePtr<IKqpTableReader> TableReader;
+ const std::vector<EValueRepresentation> Representations;
+ mutable std::vector<NUdf::TUnboxedValue> Block;
+};
+
+class TKqpScanBlockReadTableRangesWrapper : public TKqpScanBlockReadTableWrapperBase {
+public:
+ TKqpScanBlockReadTableRangesWrapper(TKqpScanComputeContext& computeCtx, const TParseReadTableRangesResult& parseResult,
+ IComputationNode* rangesNode, std::vector<EValueRepresentation>&& representations)
+ : TKqpScanBlockReadTableWrapperBase(computeCtx, std::move(representations))
+ , RangesNode(rangesNode)
+ , ParseResult(parseResult)
+ {}
+
+private:
+ ui32 GetCallableId() const {
+ return ParseResult.CallableId;
+ }
+
+ ui32 GetAllColumnsSize() const {
+ return ParseResult.Columns.size() + ParseResult.SystemColumns.size();
+ }
+
+ void RegisterDependencies() const {
+ this->FlowDependsOn(RangesNode);
+ }
+
+private:
+ IComputationNode* RangesNode;
+ TParseReadTableRangesResult ParseResult;
+};
+
} // namespace
IComputationNode* WrapKqpScanWideReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx,
@@ -411,5 +520,27 @@ IComputationNode* WrapKqpScanWideReadTable(TCallable& callable, const TComputati
return new TKqpScanWideReadTableWrapper(computeCtx, parseResult, fromNode, toNode, std::move(representations));
}
+IComputationNode* WrapKqpScanBlockReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx,
+ TKqpScanComputeContext& computeCtx)
+{
+ std::vector<EValueRepresentation> representations;
+
+ auto parseResult = ParseWideReadTableRanges(callable);
+ auto rangesNode = LocateNode(ctx.NodeLocator, *parseResult.Ranges);
+
+ const auto type = callable.GetType()->GetReturnType();
+ const auto returnItemType = type->IsFlow() ?
+ AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType():
+ AS_TYPE(TStreamType, callable.GetType()->GetReturnType())->GetItemType();
+
+ const auto tupleType = AS_TYPE(TTupleType, returnItemType);
+
+ representations.reserve(tupleType->GetElementsCount());
+ for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i)
+ representations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i)));
+
+ return new TKqpScanBlockReadTableRangesWrapper(computeCtx, parseResult, rangesNode, std::move(representations));
+}
+
} // namespace NMiniKQL
} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_read_table.h b/ydb/core/kqp/runtime/kqp_read_table.h
index 010751df3d..1701e61079 100644
--- a/ydb/core/kqp/runtime/kqp_read_table.h
+++ b/ydb/core/kqp/runtime/kqp_read_table.h
@@ -46,6 +46,9 @@ IComputationNode* WrapKqpScanWideReadTableRanges(TCallable& callable, const TCom
IComputationNode* WrapKqpScanWideReadTable(TCallable& callable, const TComputationNodeFactoryContext& ctx,
TKqpScanComputeContext& computeCtx);
+IComputationNode* WrapKqpScanBlockReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx,
+ TKqpScanComputeContext& computeCtx);
+
} // namespace NMiniKQL
} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp
index 8205487ad4..2eb9913030 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp
@@ -6,6 +6,7 @@
#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/parser/pg_wrapper/interface/pack.h>
+#include <ydb/library/yql/public/udf/arrow/util.h>
#include <ydb/library/yql/utils/yql_panic.h>
namespace NKikimr {
@@ -118,9 +119,20 @@ NUdf::TUnboxedValue MakeUnboxedValue(arrow::Array* column, ui32 row) {
auto array = reinterpret_cast<TArrayType*>(column);
return NUdf::TUnboxedValuePod(static_cast<TValueType>(array->Value(row)));
}
+
+TKqpScanComputeContext::TScanData::EReadType ReadTypeFromProto(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::EReadType& type) {
+ switch (type) {
+ case NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::ROWS:
+ return TKqpScanComputeContext::TScanData::EReadType::Rows;
+ case NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::BLOCKS:
+ return TKqpScanComputeContext::TScanData::EReadType::Blocks;
+ default:
+ YQL_ENSURE(false, "Invalid read type from TScanTaskMeta protobuf.");
+ }
+}
} // namespace
-ui32 TKqpScanComputeContext::TScanData::RowBatch::FillUnboxedCells(NUdf::TUnboxedValue* const* result) {
+ui32 TKqpScanComputeContext::TScanData::TRowBatchReader::TRowBatch::FillUnboxedCells(NUdf::TUnboxedValue* const* result) {
ui32 resultColumnsCount = 0;
if (ColumnsCount) {
auto* data = &Cells[CurrentRow * CellsCountForRow];
@@ -135,6 +147,13 @@ ui32 TKqpScanComputeContext::TScanData::RowBatch::FillUnboxedCells(NUdf::TUnboxe
return resultColumnsCount;
}
+ui32 TKqpScanComputeContext::TScanData::TBlockBatchReader::TBlockBatch::FillBlockValues(NUdf::TUnboxedValue* const* result) {
+ for (ui32 i = 0; i < ColumnsCount; ++i) {
+ *result[i] = std::move(BatchValues[i]);
+ }
+ return ColumnsCount;
+}
+
namespace {
class TDefaultStatAccumulator {
@@ -409,7 +428,11 @@ std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& val
return {sizes.AllocatedBytes, sizes.DataBytes};
}
-ui32 TKqpScanComputeContext::TScanData::FillUnboxedCells(NUdf::TUnboxedValue* const* result) {
+ui32 TKqpScanComputeContext::TScanData::FillDataValues(NUdf::TUnboxedValue* const* result) {
+ return BatchReader->FillDataValues(result);
+}
+
+ui32 TKqpScanComputeContext::TScanData::TRowBatchReader::FillDataValues(NUdf::TUnboxedValue* const* result) {
YQL_ENSURE(!RowBatches.empty());
auto& batch = RowBatches.front();
const ui32 resultColumnsCount = batch.FillUnboxedCells(result);
@@ -422,15 +445,22 @@ ui32 TKqpScanComputeContext::TScanData::FillUnboxedCells(NUdf::TUnboxedValue* co
return resultColumnsCount;
}
+ui32 TKqpScanComputeContext::TScanData::TBlockBatchReader::FillDataValues(NUdf::TUnboxedValue* const* result) {
+ YQL_ENSURE(!BlockBatches.empty());
+ auto& batch = BlockBatches.front();
+ const ui32 resultColumnsCount = batch.FillBlockValues(result);
+ BlockBatches.pop();
+
+ StoredBytes -= batch.BytesForRecordEstimation();
+ YQL_ENSURE(BlockBatches.empty() == (StoredBytes < 1), "StoredBytes miscalculated!");
+ return resultColumnsCount;
+}
+
TKqpScanComputeContext::TScanData::TScanData(const TTableId& tableId, const TTableRange& range,
- const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys,
- const TSmallVec<TColumn>& resultColumns)
+ const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, const TSmallVec<TColumn>& resultColumns)
: TableId(tableId)
, Range(range)
- , SkipNullKeys(skipNullKeys)
- , Columns(columns)
- , SystemColumns(systemColumns)
- , ResultColumns(resultColumns)
+ , BatchReader(new TRowBatchReader(columns, systemColumns, resultColumns))
{}
TKqpScanComputeContext::TScanData::TScanData(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
@@ -441,75 +471,39 @@ TKqpScanComputeContext::TScanData::TScanData(const NKikimrTxDataShard::TKqpTrans
tableMeta.GetSysViewInfo(), tableMeta.GetSchemaVersion());
TablePath = meta.GetTable().GetTablePath();
- std::copy(meta.GetSkipNullKeys().begin(), meta.GetSkipNullKeys().end(), std::back_inserter(SkipNullKeys));
-
- Columns.reserve(meta.GetColumns().size());
- for (const auto& column : meta.GetColumns()) {
- NMiniKQL::TKqpScanComputeContext::TColumn c;
- c.Tag = column.GetId();
- auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(column.GetType(),
- column.HasTypeInfo() ? &column.GetTypeInfo() : nullptr);
- c.Type = typeInfoMod.TypeInfo;
- c.TypeMod = typeInfoMod.TypeMod;
-
- if (!IsSystemColumn(c.Tag)) {
- Columns.emplace_back(std::move(c));
- } else {
- SystemColumns.emplace_back(std::move(c));
- }
- }
-
- if (meta.GetResultColumns().empty() && !meta.HasOlapProgram()) {
- // Currently we define ResultColumns just for Olap tables in TKqpQueryCompiler
- ResultColumns = Columns;
- } else {
- ResultColumns.reserve(meta.GetResultColumns().size());
- for (const auto& resColumn : meta.GetResultColumns()) {
- NMiniKQL::TKqpScanComputeContext::TColumn c;
- c.Tag = resColumn.GetId();
- auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(resColumn.GetType(),
- resColumn.HasTypeInfo() ? &resColumn.GetTypeInfo() : nullptr);
- c.Type = typeInfoMod.TypeInfo;
- c.TypeMod = typeInfoMod.TypeMod;
-
- if (!IsSystemColumn(c.Tag)) {
- ResultColumns.emplace_back(std::move(c));
- } else {
- SystemColumns.emplace_back(std::move(c));
- }
- }
+ switch(ReadTypeFromProto(meta.GetReadType())) {
+ case TKqpScanComputeContext::TScanData::EReadType::Rows:
+ BatchReader.reset(new TRowBatchReader(meta));
+ break;
+ case TKqpScanComputeContext::TScanData::EReadType::Blocks:
+ BatchReader.reset(new TBlockBatchReader(meta));
+ break;
}
if (statsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC) {
BasicStats = std::make_unique<TBasicStats>();
}
+
if (Y_UNLIKELY(statsMode >= NYql::NDqProto::DQ_STATS_MODE_PROFILE)) {
ProfileStats = std::make_unique<TProfileStats>();
}
}
-
-ui64 TKqpScanComputeContext::TScanData::AddRows(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) {
- if (Finished || batch.empty()) {
- return 0;
- }
-
+TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(const TVector<TOwnedCellVec>& batch,
+ TMaybe<ui64> shardId, const THolderFactory& holderFactory)
+{
TBytesStatistics stats;
-
- TVector<ui64> bytesList;
- bytesList.reserve(batch.size());
-
TUnboxedValueVector cells;
- if (!ColumnsCount()) {
+ if (TotalColumnsCount == 0u) {
cells.resize(batch.size(), holderFactory.GetEmptyContainer());
stats.AddStatistics({ sizeof(ui64) * batch.size(), sizeof(ui64) * batch.size() });
} else {
- cells.resize(batch.size() * ColumnsCount());
+ cells.resize(batch.size() * TotalColumnsCount);
for (size_t rowIndex = 0; rowIndex < batch.size(); ++rowIndex) {
auto& row = batch[rowIndex];
- auto* vectorStart = &cells.data()[rowIndex * ColumnsCount()];
+ auto* vectorStart = &cells.data()[rowIndex * TotalColumnsCount];
for (ui32 i = 0; i < ResultColumns.size(); ++i) {
vectorStart[i] = GetCellValue(row[i], ResultColumns[i].Type);
}
@@ -519,10 +513,26 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const TVector<TOwnedCellVec>& ba
}
}
if (cells.size()) {
- RowBatches.emplace(RowBatch(ColumnsCount(), batch.size(), std::move(cells), shardId, stats.AllocatedBytes));
+ RowBatches.emplace(TRowBatch(TotalColumnsCount, batch.size(), std::move(cells), stats.AllocatedBytes));
}
-
StoredBytes += stats.AllocatedBytes;
+
+ return stats;
+}
+
+TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(const TVector<TOwnedCellVec>& /*batch*/,
+ TMaybe<ui64> /*shardId*/, const THolderFactory& /*holderFactory*/)
+{
+ Y_VERIFY(false, "Batch of TOwnedCellVec should never be called for BlockBatchReader!");
+ return TBytesStatistics();
+}
+
+ui64 TKqpScanComputeContext::TScanData::AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) {
+ if (Finished || batch.empty()) {
+ return 0;
+ }
+
+ TBytesStatistics stats = BatchReader->AddData(batch, shardId, holderFactory);
if (BasicStats) {
BasicStats->Rows += batch.size();
BasicStats->Bytes += stats.DataBytes;
@@ -531,32 +541,27 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const TVector<TOwnedCellVec>& ba
return stats.AllocatedBytes;
}
-ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch, TMaybe<ui64> shardId,
+TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId,
const THolderFactory& holderFactory)
{
- // RecordBatch hasn't empty method so check the number of rows
- if (Finished || batch.num_rows() == 0) {
- return 0;
- }
-
TBytesStatistics stats;
TUnboxedValueVector cells;
- if (!ColumnsCount()) {
+ if (TotalColumnsCount == 0u) {
cells.resize(batch.num_rows(), holderFactory.GetEmptyContainer());
stats.AddStatistics({ sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows() });
} else {
- cells.resize(batch.num_rows() * ColumnsCount());
+ cells.resize(batch.num_rows() * TotalColumnsCount);
for (size_t columnIndex = 0; columnIndex < ResultColumns.size(); ++columnIndex) {
stats.AddStatistics(
- WriteColumnValuesFromArrow(cells.data(), batch, columnIndex, ColumnsCount(), ResultColumns[columnIndex].Type)
+ WriteColumnValuesFromArrow(cells.data(), batch, columnIndex, TotalColumnsCount, ResultColumns[columnIndex].Type)
);
}
if (!SystemColumns.empty()) {
for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
- FillSystemColumns(&cells[rowIndex * ColumnsCount() + ResultColumns.size()], shardId, SystemColumns);
+ FillSystemColumns(&cells[rowIndex * TotalColumnsCount + ResultColumns.size()], shardId, SystemColumns);
}
stats.AllocatedBytes += batch.num_rows() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue);
@@ -564,10 +569,55 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch,
}
if (cells.size()) {
- RowBatches.emplace(RowBatch(ColumnsCount(), batch.num_rows(), std::move(cells), shardId, stats.AllocatedBytes));
+ RowBatches.emplace(TRowBatch(TotalColumnsCount, batch.num_rows(), std::move(cells), stats.AllocatedBytes));
+ }
+
+ StoredBytes += stats.AllocatedBytes;
+
+ return stats;
+}
+
+TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> /*shardId*/,
+ const THolderFactory& holderFactory)
+{
+ TBytesStatistics stats;
+ auto totalColsCount = TotalColumnsCount + 1;
+ TUnboxedValueVector batchValues;
+ batchValues.resize(totalColsCount);
+
+ for (int i = 0; i < batch.num_columns(); ++i) {
+ batchValues[i] = holderFactory.CreateArrowBlock(arrow::Datum(batch.column_data(i)));
}
+ ui64 batchByteSize = NYql::NUdf::GetSizeOfArrowBatchInBytes(batch);
+ stats.AddStatistics({batchByteSize, batchByteSize});
+
+ // !!! TODO !!!
+ // if (!SystemColumns.empty()) {
+ // for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
+ // FillSystemColumns(&cells[rowIndex * ColumnsCount() + ResultColumns.size()], shardId, SystemColumns);
+ // }
+
+ // stats.AllocatedBytes += batch.num_rows() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue);
+ // }
+ batchValues[totalColsCount - 1] = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch.num_rows())));
+ stats.AddStatistics({ sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows() });
+
+ BlockBatches.emplace(TBlockBatch(totalColsCount, batch.num_rows(), std::move(batchValues), stats.AllocatedBytes));
StoredBytes += stats.AllocatedBytes;
+
+ return stats;
+}
+
+ui64 TKqpScanComputeContext::TScanData::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId,
+ const THolderFactory& holderFactory)
+{
+ // RecordBatch hasn't empty method so check the number of rows
+ if (Finished || batch.num_rows() == 0) {
+ return 0;
+ }
+
+ TBytesStatistics stats = BatchReader->AddData(batch, shardId, holderFactory);
if (BasicStats) {
BasicStats->Rows += batch.num_rows();
BasicStats->Bytes += stats.DataBytes;
@@ -576,6 +626,53 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch,
return stats.AllocatedBytes;
}
+TKqpScanComputeContext::TScanData::IDataBatchReader::IDataBatchReader(const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns,
+ const TSmallVec<TColumn>& resultColumns)
+ : Columns(columns)
+ , SystemColumns(systemColumns)
+ , ResultColumns(resultColumns)
+ , TotalColumnsCount(resultColumns.size() + systemColumns.size())
+{}
+
+TKqpScanComputeContext::TScanData::IDataBatchReader::IDataBatchReader(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta) {
+ Columns.reserve(meta.GetColumns().size());
+ for (const auto& column : meta.GetColumns()) {
+ NMiniKQL::TKqpScanComputeContext::TColumn c;
+ c.Tag = column.GetId();
+ auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(column.GetType(),
+ column.HasTypeInfo() ? &column.GetTypeInfo() : nullptr);
+ c.Type = typeInfoMod.TypeInfo;
+ c.TypeMod = typeInfoMod.TypeMod;
+
+ if (!IsSystemColumn(c.Tag)) {
+ Columns.emplace_back(std::move(c));
+ } else {
+ SystemColumns.emplace_back(std::move(c));
+ }
+ }
+
+ if (meta.GetResultColumns().empty() && !meta.HasOlapProgram()) {
+ // Currently we define ResultColumns just for Olap tables in TKqpQueryCompiler
+ ResultColumns = Columns;
+ } else {
+ ResultColumns.reserve(meta.GetResultColumns().size());
+ for (const auto& resColumn : meta.GetResultColumns()) {
+ NMiniKQL::TKqpScanComputeContext::TColumn c;
+ c.Tag = resColumn.GetId();
+ auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(resColumn.GetType(),
+ resColumn.HasTypeInfo() ? &resColumn.GetTypeInfo() : nullptr);
+ c.Type = typeInfoMod.TypeInfo;
+ c.TypeMod = typeInfoMod.TypeMod;
+
+ if (!IsSystemColumn(c.Tag)) {
+ ResultColumns.emplace_back(std::move(c));
+ }
+ }
+ }
+
+ TotalColumnsCount = ResultColumns.size() + SystemColumns.size();
+}
+
void TKqpScanComputeContext::AddTableScan(ui32, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
NYql::NDqProto::EDqStatsMode statsMode)
{
@@ -635,7 +732,7 @@ public:
return EFetchResult::Yield;
}
- ScanData.FillUnboxedCells(result);
+ ScanData.FillDataValues(result);
return EFetchResult::One;
}
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h
index 34fbcdcd61..12fe203051 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.h
+++ b/ydb/core/kqp/runtime/kqp_scan_data.h
@@ -31,9 +31,7 @@ struct TBytesStatistics {
TBytesStatistics(const ui64 allocated, const ui64 data)
: AllocatedBytes(allocated)
, DataBytes(data)
- {
-
- }
+ {}
TBytesStatistics operator+(const TBytesStatistics& item) const {
return TBytesStatistics(AllocatedBytes + item.AllocatedBytes, DataBytes + item.DataBytes);
@@ -83,45 +81,27 @@ public:
public:
TScanData(TScanData&&) = default; // needed to create TMap<ui32, TScanData> Scans
TScanData(const TTableId& tableId, const TTableRange& range, const TSmallVec<TColumn>& columns,
- const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys,
- const TSmallVec<TColumn>& resultColumns);
-
- ui32 ColumnsCount() const {
- return ResultColumns.size() + SystemColumns.size();
- }
+ const TSmallVec<TColumn>& systemColumns, const TSmallVec<TColumn>& resultColumns);
- ui32 FillUnboxedCells(NUdf::TUnboxedValue* const* result);
+ ui32 FillDataValues(NUdf::TUnboxedValue* const* result);
TScanData(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, NYql::NDqProto::EDqStatsMode statsMode);
- ~TScanData() {
- Y_VERIFY_DEBUG_S(RowBatches.empty(), "Buffer in TScanData was not cleared, data is leaking. "
- << "Queue of UnboxedValues must be emptied under allocator using Clear() method, but has "
- << RowBatches.size() << " elements!");
- }
+ ~TScanData() = default;
const TSmallVec<TColumn>& GetColumns() const {
- return Columns;
- }
-
- const TSmallVec<TColumn>& GetSystemColumns() const {
- return SystemColumns;
- }
-
- const TSmallVec<TColumn>& GetResultColumns() const {
- return ResultColumns;
+ return BatchReader->GetColumns();
}
- ui64 AddRows(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
-
- ui64 AddRows(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
+ ui64 AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
+ ui64 AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
bool IsEmpty() const {
- return RowBatches.empty();
+ return BatchReader->IsEmpty();
}
ui64 GetStoredBytes() const {
- return StoredBytes;
+ return BatchReader->GetStoredBytes();
}
void Finish() {
@@ -133,8 +113,7 @@ public:
}
void Clear() {
- TQueue<RowBatch> newQueue;
- std::swap(newQueue, RowBatches);
+ BatchReader->Clear();
}
public:
@@ -142,7 +121,6 @@ public:
TTableId TableId;
TString TablePath;
TSerializedTableRange Range;
- TSmallVec<bool> SkipNullKeys;
// shared with actor via TableReader
TIntrusivePtr<IKqpTableReader> TableReader;
@@ -163,53 +141,168 @@ public:
TDuration ScanWaitTime; // IScan waiting data time
};
+ enum class EReadType {
+ Rows,
+ Blocks
+ };
+
std::unique_ptr<TBasicStats> BasicStats;
std::unique_ptr<TProfileStats> ProfileStats;
private:
- class RowBatch {
- private:
- const ui32 CellsCountForRow;
- const ui32 ColumnsCount;
- const ui32 RowsCount;
- TUnboxedValueVector Cells;
- ui64 CurrentRow = 0;
- const ui64 AllocatedBytes;
+ class IDataBatchReader {
+ public:
+ IDataBatchReader(const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns,
+ const TSmallVec<TColumn>& resultColumns);
+
+ IDataBatchReader(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta);
+
+ virtual ~IDataBatchReader() = default;
+
+ const TSmallVec<TColumn>& GetColumns() const {
+ return Columns;
+ }
+
+ ui64 GetStoredBytes() const {
+ return StoredBytes;
+ }
+
+ virtual TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) = 0;
+ virtual TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) = 0;
+ virtual ui32 FillDataValues(NUdf::TUnboxedValue* const* result) = 0;
+ virtual void Clear() = 0;
+ virtual bool IsEmpty() const = 0;
+ protected:
+ TSmallVec<TColumn> Columns;
+ TSmallVec<TColumn> SystemColumns;
+ TSmallVec<TColumn> ResultColumns;
+ ui32 TotalColumnsCount;
+ double StoredBytes = 0;
+ };
+
+ class TRowBatchReader : public IDataBatchReader {
public:
- TMaybe<ui64> ShardId;
-
- explicit RowBatch(const ui32 columnsCount, const ui32 rowsCount, TUnboxedValueVector&& cells, TMaybe<ui64> shardId, const ui64 allocatedBytes)
- : CellsCountForRow(columnsCount ? columnsCount : 1)
- , ColumnsCount(columnsCount)
- , RowsCount(rowsCount)
- , Cells(std::move(cells))
- , AllocatedBytes(allocatedBytes)
- , ShardId(shardId)
- {
- Y_VERIFY(AllocatedBytes);
- Y_VERIFY(RowsCount);
+ TRowBatchReader(const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns,
+ const TSmallVec<TColumn>& resultColumns)
+ : IDataBatchReader(columns, systemColumns, resultColumns)
+ {}
+
+ TRowBatchReader(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta)
+ : IDataBatchReader(meta)
+ {}
+
+ ~TRowBatchReader() {
+ Y_VERIFY_DEBUG_S(RowBatches.empty(), "Buffer in TRowBatchReader was not cleared, data is leaking. "
+ << "Queue of UnboxedValues must be emptied under allocator using Clear() method, but has "
+ << RowBatches.size() << " elements!");
+ }
+
+ TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
+ TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
+ ui32 FillDataValues(NUdf::TUnboxedValue* const* result) override;
+
+ void Clear() override {
+ TQueue<TRowBatch> newQueue;
+ std::swap(newQueue, RowBatches);
}
- double BytesForRecordEstimation() {
- return 1.0 * AllocatedBytes / RowsCount;
+ bool IsEmpty() const override {
+ return RowBatches.empty();
}
+ private:
+ class TRowBatch {
+ private:
+ const ui32 CellsCountForRow;
+ const ui32 ColumnsCount;
+ const ui32 RowsCount;
+ TUnboxedValueVector Cells;
+ ui64 CurrentRow = 0;
+ const ui64 AllocatedBytes;
+ public:
+
+ explicit TRowBatch(const ui32 columnsCount, const ui32 rowsCount, TUnboxedValueVector&& cells, const ui64 allocatedBytes)
+ : CellsCountForRow(columnsCount ? columnsCount : 1)
+ , ColumnsCount(columnsCount)
+ , RowsCount(rowsCount)
+ , Cells(std::move(cells))
+ , AllocatedBytes(allocatedBytes)
+ {
+ Y_VERIFY(AllocatedBytes);
+ Y_VERIFY(RowsCount);
+ }
+
+ double BytesForRecordEstimation() {
+ return 1.0 * AllocatedBytes / RowsCount;
+ }
+
+ bool IsFinished() {
+ return CurrentRow * CellsCountForRow == Cells.size();
+ }
+
+ ui32 FillUnboxedCells(NUdf::TUnboxedValue* const* result);
+ };
+
+ TQueue<TRowBatch> RowBatches;
+ };
- const NUdf::TUnboxedValue* GetCurrentData() const {
- return Cells.data() + CurrentRow * CellsCountForRow;
+ class TBlockBatchReader : public IDataBatchReader {
+ public:
+ TBlockBatchReader(const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns,
+ const TSmallVec<TColumn>& resultColumns)
+ : IDataBatchReader(columns, systemColumns, resultColumns)
+ {}
+
+ TBlockBatchReader(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta)
+ : IDataBatchReader(meta)
+ {}
+
+ ~TBlockBatchReader() {
+ Y_VERIFY_DEBUG_S(BlockBatches.empty(), "Buffer in TBlockBatchReader was not cleared, data is leaking. "
+ << "Queue of UnboxedValues must be emptied under allocator using Clear() method, but has "
+ << BlockBatches.size() << " elements!");
}
- bool IsFinished() {
- return CurrentRow * CellsCountForRow == Cells.size();
+ TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
+ TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override;
+ ui32 FillDataValues(NUdf::TUnboxedValue* const* result) override;
+
+ void Clear() override {
+ TQueue<TBlockBatch> newQueue;
+ std::swap(newQueue, BlockBatches);
}
- ui32 FillUnboxedCells(NUdf::TUnboxedValue* const* result);
+ bool IsEmpty() const override {
+ return BlockBatches.empty();
+ }
+ private:
+ class TBlockBatch {
+ private:
+ const ui32 ColumnsCount;
+ const ui32 RowsCount;
+ TUnboxedValueVector BatchValues;
+ const ui64 AllocatedBytes;
+ public:
+ explicit TBlockBatch(const ui32 columnsCount, const ui32 rowsCount, TUnboxedValueVector&& value, const ui64 allocatedBytes)
+ : ColumnsCount(columnsCount)
+ , RowsCount(rowsCount)
+ , BatchValues(std::move(value))
+ , AllocatedBytes(allocatedBytes)
+ {
+ Y_VERIFY(AllocatedBytes);
+ Y_VERIFY(RowsCount);
+ }
+
+ double BytesForRecordEstimation() {
+ return 1.0 * AllocatedBytes;
+ }
+
+ ui32 FillBlockValues(NUdf::TUnboxedValue* const* result);
+ };
+
+ TQueue<TBlockBatch> BlockBatches;
};
- TSmallVec<TColumn> Columns;
- TSmallVec<TColumn> SystemColumns;
- TSmallVec<TColumn> ResultColumns;
- TQueue<RowBatch> RowBatches;
- double StoredBytes = 0;
+ std::unique_ptr<IDataBatchReader> BatchReader;
bool Finished = false;
};
diff --git a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
index 90eab85f74..451e9160ba 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
@@ -247,9 +247,9 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
TMemoryUsageInfo memInfo("");
THolderFactory factory(alloc.Ref(), memInfo);
- TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, {}, rows.front().Columns());
+ TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, rows.front().Columns());
- scanData.AddRows(*batch, {}, factory);
+ scanData.AddData(*batch, {}, factory);
std::vector<NUdf::TUnboxedValue> container;
container.resize(20);
@@ -258,7 +258,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
containerPtr.emplace_back(&i);
}
for (auto& row: rows) {
- scanData.FillUnboxedCells(containerPtr.data());
+ scanData.FillDataValues(containerPtr.data());
UNIT_ASSERT_EQUAL(container[0 ].Get<bool >(), row.Bool );
UNIT_ASSERT_EQUAL(container[1 ].Get<i8 >(), row.Int8 );
UNIT_ASSERT_EQUAL(container[2 ].Get<i16 >(), row.Int16 );
@@ -303,9 +303,9 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
.Type = TTypeInfo(NTypeIds::Int8)
};
resultCols.push_back(resCol);
- TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, {}, resultCols);
+ TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, resultCols);
- scanData.AddRows(*batch, {}, factory);
+ scanData.AddData(*batch, {}, factory);
std::vector<NUdf::TUnboxedValue> container;
container.resize(1);
@@ -315,7 +315,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
}
for (auto& row: rows) {
- scanData.FillUnboxedCells(containerPtr.data());
+ scanData.FillDataValues(containerPtr.data());
UNIT_ASSERT_EQUAL(container[0].Get<i8>(), row.Int8);
}
@@ -329,9 +329,9 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
TMemoryUsageInfo memInfo("");
THolderFactory factory(alloc.Ref(), memInfo);
- TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {}, {});
+ TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {});
TVector<TOwnedCellVec> emptyBatch(1000);
- auto bytes = scanData.AddRows(emptyBatch, {}, factory);
+ auto bytes = scanData.AddData(emptyBatch, {}, factory);
UNIT_ASSERT(bytes > 0);
std::vector<NUdf::TUnboxedValue*> containerPtr;
@@ -339,7 +339,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
for (const auto& row: emptyBatch) {
Y_UNUSED(row);
UNIT_ASSERT(!scanData.IsEmpty());
- UNIT_ASSERT(scanData.FillUnboxedCells(containerPtr.data()) == 0);
+ UNIT_ASSERT(scanData.FillDataValues(containerPtr.data()) == 0);
}
UNIT_ASSERT(scanData.IsEmpty());
}
@@ -348,18 +348,18 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__);
TMemoryUsageInfo memInfo("");
THolderFactory factory(alloc.Ref(), memInfo);
- TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {}, {});
+ TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {});
TVector<TDataRow> rows = TestRows();
std::shared_ptr<arrow::RecordBatch> anotherEmptyBatch = VectorToBatch(rows, rows.front().MakeArrowSchema());
- auto bytes = scanData.AddRows(*anotherEmptyBatch, {}, factory);
+ auto bytes = scanData.AddData(*anotherEmptyBatch, {}, factory);
UNIT_ASSERT(bytes > 0);
std::vector<NUdf::TUnboxedValue*> containerPtr;
for (const auto& row: rows) {
Y_UNUSED(row);
UNIT_ASSERT(!scanData.IsEmpty());
- UNIT_ASSERT(scanData.FillUnboxedCells(containerPtr.data()) == 0);
+ UNIT_ASSERT(scanData.FillDataValues(containerPtr.data()) == 0);
}
UNIT_ASSERT(scanData.IsEmpty());
}
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index a31e49c817..48cba779ba 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -148,6 +148,10 @@ message TKqpPhyOpLookup {
}
message TKqpPhyOpReadOlapRanges {
+ enum EReadType {
+ ROWS = 0;
+ BLOCKS = 1;
+ }
// Parameter come here from computation stage. It has type Tuple(List(Tuple(RangeBegin, RangeEnd))))
// Where RangeBegin and RangeEnd are Tuple(KeyColumns, Inclusive)
// Where KeyColumns is values of start/end of range for corresponding key column in table
@@ -170,6 +174,9 @@ message TKqpPhyOpReadOlapRanges {
// Stores type of read result from Column Shard
Ydb.Type ResultType = 7;
+
+ // Type of read result: unboxed values or Arrow blocks of data
+ EReadType ReadType = 13;
}
message TKqpPhyOpReadRanges {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 5d886a2b1a..7b5a154b52 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -212,6 +212,11 @@ message TKqpTransaction {
repeated NKikimrTx.TKeyRange KeyRanges = 2;
}
+ enum EReadType {
+ ROWS = 0;
+ BLOCKS = 1;
+ }
+
optional TTableMeta Table = 1;
repeated TColumnMeta Columns = 2;
repeated uint32 KeyColumnTypes = 3; // for debug logs only
@@ -225,6 +230,8 @@ message TKqpTransaction {
optional NKikimrSSA.TOlapProgram OlapProgram = 10; // Currently only for OLAP tables
optional bool Sorted = 11;
repeated TColumnMeta ResultColumns = 12;
+ // Type of read result: unboxed values or Arrow blocks of data
+ optional EReadType ReadType = 14;
}
optional EKqpTransactionType Type = 1;
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index 63ab5553e1..b369ac9db3 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -59,6 +59,7 @@ struct TKqpScanComputationMap {
TKqpScanComputationMap() {
Map["KqpWideReadTable"] = &WrapKqpScanWideReadTable;
Map["KqpWideReadTableRanges"] = &WrapKqpScanWideReadTableRanges;
+ Map["KqpBlockReadTableRanges"] = &WrapKqpScanBlockReadTableRanges;
}
THashMap<TString, TCallableScanBuilderFunc> Map;
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index 44182d0962..caef313ca9 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -2331,6 +2331,11 @@
{"Index": 1, "Name": "KeySelectorLambda", "Type": "TCoLambda"},
{"Index": 2, "Name": "ListHandlerLambda", "Type": "TCoLambda"}
]
+ },
+ {
+ "Name": "TCoWideFromBlocks",
+ "Base": "TCoInputBase",
+ "Match": {"Type": "Callable", "Name": "WideFromBlocks"}
}
]
}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
index 372f9fd934..d0049d8c3b 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
@@ -453,6 +453,7 @@ private:
if (name == "Switch" || // KIKIMR-16457
name == "KqpWideReadTable" ||
name == "KqpWideReadTableRanges" ||
+ name == "KqpBlockReadTableRanges" ||
name == "KqpLookupTable" ||
name == "KqpReadTable" ||
name == "RangeMultiply" ||
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 6ea4959222..e28cfe15fd 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -47,6 +47,7 @@
#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
+#include <ydb/library/yql/public/udf/arrow/util.h>
#include <ydb/library/yql/utils/yql_panic.h>
#include <ydb/library/yql/providers/s3/common/util.h>
@@ -674,35 +675,6 @@ private:
std::function<void()> WaitForFutureResolve;
};
-ui64 GetSizeOfData(const arrow::ArrayData& data) {
- ui64 size = sizeof(data);
- size += data.buffers.size() * sizeof(void*);
- size += data.child_data.size() * sizeof(void*);
- for (const auto& b : data.buffers) {
- if (b) {
- size += b->size();
- }
- }
-
- for (const auto& c : data.child_data) {
- if (c) {
- size += GetSizeOfData(*c);
- }
- }
-
- return size;
-}
-
-ui64 GetSizeOfBatch(const arrow::RecordBatch& batch) {
- ui64 size = sizeof(batch);
- size += batch.num_columns() * sizeof(void*);
- for (int i = 0; i < batch.num_columns(); ++i) {
- size += GetSizeOfData(*batch.column_data(i));
- }
-
- return size;
-}
-
class TS3ReadCoroImpl : public TActorCoroImpl {
friend class TS3StreamReadActor;
private:
@@ -728,15 +700,15 @@ private:
static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv;
public:
- TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
- const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
- const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader,
+ TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
+ const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
+ const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize,
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps)
- : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
- TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
+ : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
+ TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader),
DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps)
{}
@@ -913,7 +885,7 @@ private:
} else {
buffer = std::make_unique<TReadBufferFromStream>(this);
}
-
+
const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression));
YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression.");
auto& readBuffer = decompress ? *decompress : *buffer;
@@ -949,7 +921,7 @@ private:
otherEvents.push_back(event->ReleaseBase());
event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
}
-
+
for (auto &e: otherEvents) {
Send(SelfActorId, e.Release());
}
@@ -987,7 +959,7 @@ private:
}
fileDesc.Cookie = result.Cookie;
- TArrowParquetBatchReader reader(std::move(fileDesc),
+ TArrowParquetBatchReader reader(std::move(fileDesc),
ArrowReader,
result.NumRowGroups,
std::move(columnIndices),
@@ -1002,10 +974,10 @@ private:
} else {
buffer = std::make_unique<TReadBufferFromStream>(this);
}
-
+
const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression));
YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression.");
-
+
auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings));
TBlockReader reader(std::move(stream));
ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal);
@@ -1082,7 +1054,7 @@ private:
Send(SelfActorId, e.Release());
}
};
-
+
for (;;) {
T batch;
@@ -1348,7 +1320,7 @@ private:
}
ui64 GetBlockSize(const TReadyBlock& block) const {
- return ReadSpec->Arrow ? GetSizeOfBatch(*block.Batch) : block.Block.bytes();
+ return ReadSpec->Arrow ? NUdf::GetSizeOfArrowBatchInBytes(*block.Batch) : block.Block.bytes();
}
void ReportMemoryUsage() const {
@@ -1525,7 +1497,7 @@ private:
void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) {
YQL_ENSURE(ReadSpec->Arrow);
IngressBytes += next->Get()->IngressDelta;
- auto size = GetSizeOfBatch(*next->Get()->Batch);
+ auto size = NUdf::GetSizeOfArrowBatchInBytes(*next->Get()->Batch);
QueueTotalDataSize += size;
if (Counters) {
QueueBlockCount->Inc();
diff --git a/ydb/library/yql/public/udf/arrow/util.cpp b/ydb/library/yql/public/udf/arrow/util.cpp
index f87b9cebdb..aff82a8024 100644
--- a/ydb/library/yql/public/udf/arrow/util.cpp
+++ b/ydb/library/yql/public/udf/arrow/util.cpp
@@ -4,6 +4,7 @@
#include <arrow/array/array_base.h>
#include <arrow/chunked_array.h>
+#include <arrow/record_batch.h>
namespace NYql {
namespace NUdf {
@@ -67,6 +68,25 @@ private:
arrow::MemoryPool* Pool;
};
+ui64 GetSizeOfArrayDataInBytes(const arrow::ArrayData& data) {
+ ui64 size = sizeof(data);
+ size += data.buffers.size() * sizeof(void*);
+ size += data.child_data.size() * sizeof(void*);
+ for (const auto& b : data.buffers) {
+ if (b) {
+ size += b->size();
+ }
+ }
+
+ for (const auto& c : data.child_data) {
+ if (c) {
+ size += GetSizeOfArrayDataInBytes(*c);
+ }
+ }
+
+ return size;
+}
+
} // namespace
std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) {
@@ -150,5 +170,15 @@ std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBuffer(size_t size, arr
return result;
}
+ui64 GetSizeOfArrowBatchInBytes(const arrow::RecordBatch& batch) {
+ ui64 size = sizeof(batch);
+ size += batch.num_columns() * sizeof(void*);
+ for (int i = 0; i < batch.num_columns(); ++i) {
+ size += GetSizeOfArrayDataInBytes(*batch.column_data(i));
+ }
+
+ return size;
+}
+
}
}
diff --git a/ydb/library/yql/public/udf/arrow/util.h b/ydb/library/yql/public/udf/arrow/util.h
index cf83be5ced..1ed4cf8172 100644
--- a/ydb/library/yql/public/udf/arrow/util.h
+++ b/ydb/library/yql/public/udf/arrow/util.h
@@ -32,6 +32,8 @@ inline bool IsNull(const arrow::ArrayData& data, size_t index) {
/// \brief same as arrow::AllocateResizableBuffer, but allows to control zero padding
std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBuffer(size_t size, arrow::MemoryPool* pool, bool zeroPad = false);
+ui64 GetSizeOfArrowBatchInBytes(const arrow::RecordBatch& batch);
+
// similar to arrow::TypedBufferBuilder, but:
// 1) with UnsafeAdvance() method
// 2) shrinkToFit = false