diff options
author | aidarsamer <aidarsamer@ydb.tech> | 2023-02-15 11:17:46 +0300 |
---|---|---|
committer | aidarsamer <aidarsamer@ydb.tech> | 2023-02-15 11:17:46 +0300 |
commit | 4f56624382f2f901b71fc5fdec8318455dce767f (patch) | |
tree | 0ec6e56f66b8d5a57db0e92fab2c9b375c432514 | |
parent | 33e10c4ea8c8e1e34e64cc6419d825d2d5ed6020 (diff) | |
download | ydb-4f56624382f2f901b71fc5fdec8318455dce767f.tar.gz |
Add block reads for column shard tables.
Add block reads for column shard tables
27 files changed, 712 insertions, 209 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index 0ba8e1d6ea..d3213fe8d4 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -34,6 +34,10 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput return WrapKqpScanWideReadTableRanges(callable, ctx, *computeCtx); } + if (name == "KqpBlockReadTableRanges"sv) { + return WrapKqpScanBlockReadTableRanges(callable, ctx, *computeCtx); + } + // only for _pure_ compute actors! if (name == "KqpEnsure"sv) { return WrapKqpEnsure(callable, ctx); diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 0d80e75017..4a8ca2587d 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -245,14 +245,14 @@ private: case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: case NKikimrTxDataShard::EScanDataFormat::CELLVEC: { if (!msg.Rows.empty()) { - bytes = ScanData->AddRows(msg.Rows, {}, TaskRunner->GetHolderFactory()); + bytes = ScanData->AddData(msg.Rows, {}, TaskRunner->GetHolderFactory()); rowsCount = msg.Rows.size(); } break; } case NKikimrTxDataShard::EScanDataFormat::ARROW: { if(msg.ArrowBatch != nullptr) { - bytes = ScanData->AddRows(*msg.ArrowBatch, {}, TaskRunner->GetHolderFactory()); + bytes = ScanData->AddData(*msg.ArrowBatch, {}, TaskRunner->GetHolderFactory()); rowsCount = msg.ArrowBatch->num_rows(); } break; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index aa40b41e95..f3098274db 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -506,13 +506,13 @@ private: case NKikimrTxDataShard::EScanDataFormat::CELLVEC: case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: if (!msg.Rows.empty()) { - bytes += ScanData->AddRows(msg.Rows, state->TabletId, TaskRunner->GetHolderFactory()); + bytes += ScanData->AddData(msg.Rows, state->TabletId, TaskRunner->GetHolderFactory()); rowsCount += msg.Rows.size(); } break; case NKikimrTxDataShard::EScanDataFormat::ARROW: - if (!!msg.ArrowBatch) { - bytes += ScanData->AddRows(*msg.ArrowBatch, state->TabletId, TaskRunner->GetHolderFactory()); + if (msg.ArrowBatch) { + bytes += ScanData->AddData(*msg.ArrowBatch, state->TabletId, TaskRunner->GetHolderFactory()); rowsCount += msg.ArrowBatch->num_rows(); } break; diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 128d66d7b9..e20216160c 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -36,6 +36,28 @@ using namespace NYql::NDq; namespace { +NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::EReadType ReadTypeToProto(const TTaskMeta::TReadInfo::EReadType& type) { + switch (type) { + case TTaskMeta::TReadInfo::EReadType::Rows: + return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::ROWS; + case TTaskMeta::TReadInfo::EReadType::Blocks: + return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::BLOCKS; + } + + YQL_ENSURE(false, "Invalid read type in task meta."); +} + +TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpReadOlapRanges::EReadType& type) { + switch (type) { + case NKqpProto::TKqpPhyOpReadOlapRanges::ROWS: + return TTaskMeta::TReadInfo::EReadType::Rows; + case NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS: + return TTaskMeta::TReadInfo::EReadType::Blocks; + default: + YQL_ENSURE(false, "Invalid read type from TKqpPhyOpReadOlapRanges protobuf."); + } +} + class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan> { using TBase = TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan>; @@ -45,7 +67,7 @@ public: } TKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, ui32 executerDelayToRetryMs) : TBase(std::move(request), database, userToken, counters, executerDelayToRetryMs, TWilsonKqp::ScanExecuter, "ScanExecuter") @@ -153,6 +175,7 @@ private: taskMeta.ReadInfo.ItemsLimit = itemsLimit; taskMeta.ReadInfo.Reverse = reverse; taskMeta.ReadInfo.Sorted = sorted; + taskMeta.ReadInfo.ReadType = TTaskMeta::TReadInfo::EReadType::Rows; if (resultType) { YQL_ENSURE(resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct @@ -179,8 +202,8 @@ private: return; } + taskMeta.ReadInfo.ReadType = ReadTypeFromProto(readOlapRange->GetReadType()); taskMeta.ReadInfo.OlapProgram.Program = readOlapRange->GetOlapProgram(); - for (auto& name: readOlapRange->GetOlapProgramParameterNames()) { taskMeta.ReadInfo.OlapProgram.ParameterNames.insert(name); } @@ -573,6 +596,7 @@ private: protoTaskMeta.SetReverse(task.Meta.ReadInfo.Reverse); protoTaskMeta.SetItemsLimit(task.Meta.ReadInfo.ItemsLimit); protoTaskMeta.SetSorted(task.Meta.ReadInfo.Sorted); + protoTaskMeta.SetReadType(ReadTypeToProto(task.Meta.ReadInfo.ReadType)); for (auto columnType : task.Meta.ReadInfo.ResultColumnsTypes) { auto* protoResultColumn = protoTaskMeta.AddResultColumns(); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 860d88bda4..b6bcc4c7d3 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -128,9 +128,14 @@ struct TTaskMeta { }; struct TReadInfo { + enum class EReadType { + Rows, + Blocks + }; ui64 ItemsLimit = 0; bool Reverse = false; bool Sorted = false; + EReadType ReadType = EReadType::Rows; TKqpOlapProgram OlapProgram; TVector<NScheme::TTypeInfo> ResultColumnsTypes; }; diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 5a3d736117..65f09ff9ff 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -133,6 +133,11 @@ "Match": {"Type": "Callable", "Name": "KqpWideReadOlapTableRanges"} }, { + "Name": "TKqpBlockReadOlapTableRanges", + "Base": "TKqpReadOlapTableRangesBase", + "Match": {"Type": "Callable", "Name": "KqpBlockReadOlapTableRanges"} + }, + { "Name": "TKqlLookupTableBase", "Base": "TCallable", "Match": {"Type": "CallableBase"}, diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index ae4907c9b8..d5ad48d7fa 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -143,6 +143,24 @@ const TFlowExprType* GetWideRowsType(TExprContext& ctx, const TStructExprType* r return ctx.MakeType<TFlowExprType>(wideRowType); } +const TFlowExprType* GetBlockRowsType(TExprContext& ctx, const TStructExprType* rowType) { + YQL_ENSURE(rowType); + + const auto& columns = rowType->GetItems(); + + TTypeAnnotationNode::TListType items; + items.reserve(columns.size()); + + for (const auto& column: columns) { + items.push_back(ctx.MakeType<TBlockExprType>(column->GetItemType())); + } + // Last item is height of block + items.push_back(ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64))); + + auto blockRowType = ctx.MakeType<TMultiExprType>(items); + return ctx.MakeType<TFlowExprType>(blockRowType); +} + bool CalcKeyColumnsCount(TExprContext& ctx, const TPositionHandle pos, const TStructExprType& structType, const TKikimrTableDescription& tableDesc, const TKikimrTableMetadata& metadata, ui32& keyColumnsCount) { @@ -348,6 +366,8 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx, node->SetTypeAnn(ctx.MakeType<TFlowExprType>(processRowType)); } else if (TKqpWideReadOlapTableRanges::Match(node.Get())) { node->SetTypeAnn(GetWideRowsType(ctx, processRowType->Cast<TStructExprType>())); + } else if (TKqpBlockReadOlapTableRanges::Match(node.Get())) { + node->SetTypeAnn(GetBlockRowsType(ctx, processRowType->Cast<TStructExprType>())); } else { YQL_ENSURE(false, "Unexpected ReadOlapTable callable." << node->Content()); } diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp index 61fd228853..e15be81baa 100644 --- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp +++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp @@ -132,7 +132,7 @@ protected: } TMaybeNode<TExprBase> BuildWideReadTable(TExprBase node, TExprContext& ctx) { - TExprBase output = KqpBuildWideReadTable(node, ctx); + TExprBase output = KqpBuildWideReadTable(node, ctx, TypesCtx); DumpAppliedRule("BuildWideReadTable", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole_rules.h b/ydb/core/kqp/opt/peephole/kqp_opt_peephole_rules.h index 489a2afe1e..f69e39756f 100644 --- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole_rules.h +++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole_rules.h @@ -11,6 +11,6 @@ namespace NKikimr::NKqp::NOpt { -NYql::NNodes::TExprBase KqpBuildWideReadTable(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx); +NYql::NNodes::TExprBase KqpBuildWideReadTable(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx, NYql::TTypeAnnotationContext& typesCtx); } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp index 75d76930bd..41d26d3e30 100644 --- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp +++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp @@ -9,7 +9,7 @@ using namespace NYql; using namespace NYql::NDq; using namespace NYql::NNodes; -TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx) { +TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { if (!node.Maybe<TKqpReadTable>() && !node.Maybe<TKqpReadOlapTableRanges>() && !node.Maybe<TKqpReadTableRanges>()) @@ -58,14 +58,27 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx) { } else if (auto maybeRead = node.Maybe<TKqpReadOlapTableRanges>()) { auto read = maybeRead.Cast(); - wideRead = Build<TKqpWideReadOlapTableRanges>(ctx, node.Pos()) - .Table(read.Table()) - .Ranges(read.Ranges()) - .Columns(read.Columns()) - .Settings(read.Settings()) - .ExplainPrompt(read.ExplainPrompt()) - .Process(read.Process()) - .Done(); + if (typesCtx.UseBlocks) { + wideRead = Build<TCoWideFromBlocks>(ctx, node.Pos()) + .Input<TKqpBlockReadOlapTableRanges>() + .Table(read.Table()) + .Ranges(read.Ranges()) + .Columns(read.Columns()) + .Settings(read.Settings()) + .ExplainPrompt(read.ExplainPrompt()) + .Process(read.Process()) + .Build() + .Done(); + } else { + wideRead = Build<TKqpWideReadOlapTableRanges>(ctx, node.Pos()) + .Table(read.Table()) + .Ranges(read.Ranges()) + .Columns(read.Columns()) + .Settings(read.Settings()) + .ExplainPrompt(read.ExplainPrompt()) + .Process(read.Process()) + .Done(); + } } else { YQL_ENSURE(false, "Unknown read table operation: " << node.Ptr()->Content()); } diff --git a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp index 3aeb51828c..662c2c0baa 100644 --- a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp @@ -253,6 +253,7 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext& ); }); + // TODO: Rewrite to DqSource https://st.yandex-team.ru/KIKIMR-17161 compiler->AddCallable(TKqpWideReadOlapTableRanges::CallableName(), [&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) { TKqpWideReadOlapTableRanges readTable(&node); @@ -283,6 +284,37 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext& return result; }); + // TODO: Rewrite to DqSource https://st.yandex-team.ru/KIKIMR-17161 + compiler->AddCallable(TKqpBlockReadOlapTableRanges::CallableName(), + [&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) { + TKqpBlockReadOlapTableRanges readTable(&node); + + const auto& tableMeta = ctx.GetTableMeta(readTable.Table()); + ValidateRangesType(readTable.Ranges().Ref().GetTypeAnn(), tableMeta); + + TKqpKeyRanges ranges = MakeComputedKeyRanges(readTable, ctx, buildCtx); + + // Return type depends on the process program, so it is built explicitly. + TStringStream errorStream; + auto returnType = NCommon::BuildType(*readTable.Ref().GetTypeAnn(), ctx.PgmBuilder(), errorStream); + YQL_ENSURE(returnType, "Failed to build type: " << errorStream.Str()); + + // Process program for OLAP read is not present in MKQL, it is passed in range description + // in physical plan directly to executer. Read callables in MKQL only used to associate + // input stream of the graph with the external scans, so it doesn't make much sense to pass + // the process program through callable. + // We anyway move to explicit sources as external nodes in KQP program, so all the information + // about read settings will be passed in a side channel, not the program. + auto result = ctx.PgmBuilder().KqpBlockReadTableRanges( + MakeTableId(readTable.Table()), + ranges, + GetKqpColumns(tableMeta, readTable.Columns(), true), + returnType + ); + + return result; + }); + compiler->AddCallable(TKqpLookupTable::CallableName(), [&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) { TKqpLookupTable lookupTable(&node); diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 026c1edeaa..4a25ca0abe 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -523,7 +523,7 @@ public: [](const TItemExprType* first, const TItemExprType* second) { return first->GetName() < second->GetName(); }); - inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(), + inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(), [](const TItemExprType* first, const TItemExprType* second) { return first->GetName() == second->GetName(); }), @@ -638,6 +638,20 @@ private: auto miniKqlResultType = GetMKqlResultType(readTableRanges.Process().Ref().GetTypeAnn()); FillOlapProgram(readTableRanges.Process(), miniKqlResultType, *tableMeta, *tableOp.MutableReadOlapRange()); FillResultType(miniKqlResultType, *tableOp.MutableReadOlapRange()); + } else if (auto maybeReadBlockTableRanges = node.Maybe<TKqpBlockReadOlapTableRanges>()) { + auto readTableRanges = maybeReadBlockTableRanges.Cast(); + auto tableMeta = TablesData->ExistingTable(Cluster, readTableRanges.Table().Path()).Metadata; + YQL_ENSURE(tableMeta); + + auto& tableOp = *stageProto.AddTableOps(); + FillTablesMap(readTableRanges.Table(), readTableRanges.Columns(), tablesMap); + FillTableId(readTableRanges.Table(), *tableOp.MutableTable()); + FillColumns(readTableRanges.Columns(), *tableMeta, tableOp, true); + FillReadRanges(readTableRanges, *tableMeta, *tableOp.MutableReadOlapRange()); + auto miniKqlResultType = GetMKqlResultType(readTableRanges.Process().Ref().GetTypeAnn()); + FillOlapProgram(readTableRanges.Process(), miniKqlResultType, *tableMeta, *tableOp.MutableReadOlapRange()); + FillResultType(miniKqlResultType, *tableOp.MutableReadOlapRange()); + tableOp.MutableReadOlapRange()->SetReadType(NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS); } else if (node.Maybe<TCoSort>()) { hasSort = true; } else if (node.Maybe<TCoFilterBase>()) { diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp index 758b0efb69..78a230e7d3 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.cpp +++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp @@ -134,6 +134,16 @@ TType* MakeWideFlowType(TProgramBuilder& builder, TStructType* rowType) { return builder.NewFlowType(builder.NewTupleType(tupleItems)); } +TType* MakeBlockType(TProgramBuilder& builder, TStructType* rowType) { + std::vector<TType*> tupleItems; + tupleItems.reserve(rowType->GetMembersCount()); + for (ui32 i = 0; i < rowType->GetMembersCount(); ++i) { + tupleItems.push_back(rowType->GetMemberType(i)); + } + + return builder.NewBlockType(builder.NewTupleType(tupleItems), TBlockType::EShape::Many); +} + } // namespace TKqpProgramBuilder::TKqpProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry) @@ -203,6 +213,30 @@ TRuntimeNode TKqpProgramBuilder::KqpWideReadTableRanges(const TTableId& tableId, return TRuntimeNode(builder.Build(), false); } +TRuntimeNode TKqpProgramBuilder::KqpBlockReadTableRanges(const TTableId& tableId, const TKqpKeyRanges& ranges, + const TArrayRef<TKqpTableColumn>& columns, TType* returnType) +{ + if (returnType == nullptr) { + auto rowType = GetRowType(*this, columns); + auto structType = AS_TYPE(TStructType, rowType); + returnType = MakeBlockType(*this, structType); + } else { + MKQL_ENSURE_S(returnType); + MKQL_ENSURE_S(returnType->IsFlow()); + const auto itemType = AS_TYPE(TFlowType, returnType)->GetItemType(); + MKQL_ENSURE_S(itemType->IsTuple()); + } + + TCallableBuilder builder(Env, __func__, returnType); + builder.Add(BuildTableIdLiteral(tableId, *this)); + builder.Add(BuildKeyRangesNode(*this, ranges)); + builder.Add(BuildColumnTags(*this, columns)); + builder.Add(ranges.ItemsLimit); + builder.Add(NewDataLiteral(ranges.Reverse)); + + return TRuntimeNode(builder.Build(), false); +} + TRuntimeNode TKqpProgramBuilder::KqpLookupTable(const TTableId& tableId, const TRuntimeNode& lookupKeys, const TArrayRef<TKqpTableColumn>& keyColumns, const TArrayRef<TKqpTableColumn>& columns) { diff --git a/ydb/core/kqp/runtime/kqp_program_builder.h b/ydb/core/kqp/runtime/kqp_program_builder.h index 997ce00ac3..a26938b8db 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.h +++ b/ydb/core/kqp/runtime/kqp_program_builder.h @@ -55,6 +55,9 @@ public: TRuntimeNode KqpWideReadTableRanges(const TTableId& tableId, const TKqpKeyRanges& range, const TArrayRef<TKqpTableColumn>& columns, TType* returnType); + TRuntimeNode KqpBlockReadTableRanges(const TTableId& tableId, const TKqpKeyRanges& range, + const TArrayRef<TKqpTableColumn>& columns, TType* returnType); + TRuntimeNode KqpLookupTable(const TTableId& tableId, const TRuntimeNode& lookupKeys, const TArrayRef<TKqpTableColumn>& keyColumns, const TArrayRef<TKqpTableColumn>& columns); diff --git a/ydb/core/kqp/runtime/kqp_read_table.cpp b/ydb/core/kqp/runtime/kqp_read_table.cpp index 4d10e53665..d86b28999a 100644 --- a/ydb/core/kqp/runtime/kqp_read_table.cpp +++ b/ydb/core/kqp/runtime/kqp_read_table.cpp @@ -364,6 +364,115 @@ private: TParseReadTableRangesResult ParseResult; }; +class TKqpScanBlockReadTableWrapperBase : public TStatelessWideFlowCodegeneratorNode<TKqpScanBlockReadTableWrapperBase> { + using TBase = TStatelessWideFlowCodegeneratorNode<TKqpScanBlockReadTableWrapperBase>; +public: + TKqpScanBlockReadTableWrapperBase(TKqpScanComputeContext& computeCtx, std::vector<EValueRepresentation>&& representations) + : TBase(this) + , ComputeCtx(computeCtx) + , Representations(std::move(representations)) {} + + EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const { + Y_UNUSED(ctx); + + if (!TableReader) { + TableReader = ComputeCtx.ReadTable(GetCallableId()); + } + + return TableReader->Next(output); + } + +#ifndef MKQL_DISABLE_CODEGEN + ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, BasicBlock*& block) const { + auto& context = ctx.Codegen->GetContext(); + const auto size = Representations.size(); + + Block.resize(size); + + const auto valueType = Type::getInt128Ty(context); + const auto valuePtrType = PointerType::getUnqual(valueType); + const auto valuesPtr = CastInst::Create(Instruction::IntToPtr, + ConstantInt::get(Type::getInt64Ty(context), uintptr_t(Block.data())), + valuePtrType, "values", &ctx.Func->getEntryBlock().back()); + + ICodegeneratorInlineWideNode::TGettersList getters(size); + const auto indexType = Type::getInt32Ty(context); + for (auto i = 0U; i < size; ++i) { + getters[i] = [i, valueType, valuesPtr, indexType] (const TCodegenContext&, BasicBlock*& block) { + const auto loadPtr = GetElementPtrInst::Create(valueType, valuesPtr, + {ConstantInt::get(indexType, i)}, + (TString("loadPtr_") += ToString(i)).c_str(), + block); + return new LoadInst(loadPtr, "load", block); + }; + } + + const auto fieldsType = ArrayType::get(valuePtrType, size); + const auto fields = new AllocaInst(fieldsType, 0U, "fields", &ctx.Func->getEntryBlock().back()); + + Value* init = UndefValue::get(fieldsType); + for (auto i = 0U; i < size; ++i) { + const auto pointer = GetElementPtrInst::Create(valueType, valuesPtr, + {ConstantInt::get(indexType, i)}, + (TString("ptr_") += ToString(i)).c_str(), + &ctx.Func->getEntryBlock().back()); + + init = InsertValueInst::Create(init, pointer, {i}, (TString("insert_") += ToString(i)).c_str(), + &ctx.Func->getEntryBlock().back()); + } + + new StoreInst(init, fields, &ctx.Func->getEntryBlock().back()); + + const auto ptrType = PointerType::getUnqual(StructType::get(context)); + const auto func = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TKqpScanBlockReadTableWrapperBase::DoCalculate)); + const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block); + const auto funcType = FunctionType::get(Type::getInt32Ty(context), { self->getType(), ctx.Ctx->getType(), fields->getType() }, false); + const auto funcPtr = CastInst::Create(Instruction::IntToPtr, func, PointerType::getUnqual(funcType), "fetch_func", block); + const auto result = CallInst::Create(funcPtr, { self, ctx.Ctx, fields }, "fetch", block); + + return {result, std::move(getters)}; + } +#endif + + virtual ui32 GetCallableId() const = 0; + virtual ui32 GetAllColumnsSize() const = 0; + +private: + TKqpScanComputeContext& ComputeCtx; + // Mutable is bad for computation pattern cache. + // Probably this hack is necessary for LLVM. Need to review + mutable TIntrusivePtr<IKqpTableReader> TableReader; + const std::vector<EValueRepresentation> Representations; + mutable std::vector<NUdf::TUnboxedValue> Block; +}; + +class TKqpScanBlockReadTableRangesWrapper : public TKqpScanBlockReadTableWrapperBase { +public: + TKqpScanBlockReadTableRangesWrapper(TKqpScanComputeContext& computeCtx, const TParseReadTableRangesResult& parseResult, + IComputationNode* rangesNode, std::vector<EValueRepresentation>&& representations) + : TKqpScanBlockReadTableWrapperBase(computeCtx, std::move(representations)) + , RangesNode(rangesNode) + , ParseResult(parseResult) + {} + +private: + ui32 GetCallableId() const { + return ParseResult.CallableId; + } + + ui32 GetAllColumnsSize() const { + return ParseResult.Columns.size() + ParseResult.SystemColumns.size(); + } + + void RegisterDependencies() const { + this->FlowDependsOn(RangesNode); + } + +private: + IComputationNode* RangesNode; + TParseReadTableRangesResult ParseResult; +}; + } // namespace IComputationNode* WrapKqpScanWideReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx, @@ -411,5 +520,27 @@ IComputationNode* WrapKqpScanWideReadTable(TCallable& callable, const TComputati return new TKqpScanWideReadTableWrapper(computeCtx, parseResult, fromNode, toNode, std::move(representations)); } +IComputationNode* WrapKqpScanBlockReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx, + TKqpScanComputeContext& computeCtx) +{ + std::vector<EValueRepresentation> representations; + + auto parseResult = ParseWideReadTableRanges(callable); + auto rangesNode = LocateNode(ctx.NodeLocator, *parseResult.Ranges); + + const auto type = callable.GetType()->GetReturnType(); + const auto returnItemType = type->IsFlow() ? + AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType(): + AS_TYPE(TStreamType, callable.GetType()->GetReturnType())->GetItemType(); + + const auto tupleType = AS_TYPE(TTupleType, returnItemType); + + representations.reserve(tupleType->GetElementsCount()); + for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) + representations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i))); + + return new TKqpScanBlockReadTableRangesWrapper(computeCtx, parseResult, rangesNode, std::move(representations)); +} + } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_read_table.h b/ydb/core/kqp/runtime/kqp_read_table.h index 010751df3d..1701e61079 100644 --- a/ydb/core/kqp/runtime/kqp_read_table.h +++ b/ydb/core/kqp/runtime/kqp_read_table.h @@ -46,6 +46,9 @@ IComputationNode* WrapKqpScanWideReadTableRanges(TCallable& callable, const TCom IComputationNode* WrapKqpScanWideReadTable(TCallable& callable, const TComputationNodeFactoryContext& ctx, TKqpScanComputeContext& computeCtx); +IComputationNode* WrapKqpScanBlockReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx, + TKqpScanComputeContext& computeCtx); + } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp index 8205487ad4..2eb9913030 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp @@ -6,6 +6,7 @@ #include <ydb/library/yql/minikql/mkql_string_util.h> #include <ydb/library/yql/parser/pg_wrapper/interface/pack.h> +#include <ydb/library/yql/public/udf/arrow/util.h> #include <ydb/library/yql/utils/yql_panic.h> namespace NKikimr { @@ -118,9 +119,20 @@ NUdf::TUnboxedValue MakeUnboxedValue(arrow::Array* column, ui32 row) { auto array = reinterpret_cast<TArrayType*>(column); return NUdf::TUnboxedValuePod(static_cast<TValueType>(array->Value(row))); } + +TKqpScanComputeContext::TScanData::EReadType ReadTypeFromProto(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::EReadType& type) { + switch (type) { + case NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::ROWS: + return TKqpScanComputeContext::TScanData::EReadType::Rows; + case NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::BLOCKS: + return TKqpScanComputeContext::TScanData::EReadType::Blocks; + default: + YQL_ENSURE(false, "Invalid read type from TScanTaskMeta protobuf."); + } +} } // namespace -ui32 TKqpScanComputeContext::TScanData::RowBatch::FillUnboxedCells(NUdf::TUnboxedValue* const* result) { +ui32 TKqpScanComputeContext::TScanData::TRowBatchReader::TRowBatch::FillUnboxedCells(NUdf::TUnboxedValue* const* result) { ui32 resultColumnsCount = 0; if (ColumnsCount) { auto* data = &Cells[CurrentRow * CellsCountForRow]; @@ -135,6 +147,13 @@ ui32 TKqpScanComputeContext::TScanData::RowBatch::FillUnboxedCells(NUdf::TUnboxe return resultColumnsCount; } +ui32 TKqpScanComputeContext::TScanData::TBlockBatchReader::TBlockBatch::FillBlockValues(NUdf::TUnboxedValue* const* result) { + for (ui32 i = 0; i < ColumnsCount; ++i) { + *result[i] = std::move(BatchValues[i]); + } + return ColumnsCount; +} + namespace { class TDefaultStatAccumulator { @@ -409,7 +428,11 @@ std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& val return {sizes.AllocatedBytes, sizes.DataBytes}; } -ui32 TKqpScanComputeContext::TScanData::FillUnboxedCells(NUdf::TUnboxedValue* const* result) { +ui32 TKqpScanComputeContext::TScanData::FillDataValues(NUdf::TUnboxedValue* const* result) { + return BatchReader->FillDataValues(result); +} + +ui32 TKqpScanComputeContext::TScanData::TRowBatchReader::FillDataValues(NUdf::TUnboxedValue* const* result) { YQL_ENSURE(!RowBatches.empty()); auto& batch = RowBatches.front(); const ui32 resultColumnsCount = batch.FillUnboxedCells(result); @@ -422,15 +445,22 @@ ui32 TKqpScanComputeContext::TScanData::FillUnboxedCells(NUdf::TUnboxedValue* co return resultColumnsCount; } +ui32 TKqpScanComputeContext::TScanData::TBlockBatchReader::FillDataValues(NUdf::TUnboxedValue* const* result) { + YQL_ENSURE(!BlockBatches.empty()); + auto& batch = BlockBatches.front(); + const ui32 resultColumnsCount = batch.FillBlockValues(result); + BlockBatches.pop(); + + StoredBytes -= batch.BytesForRecordEstimation(); + YQL_ENSURE(BlockBatches.empty() == (StoredBytes < 1), "StoredBytes miscalculated!"); + return resultColumnsCount; +} + TKqpScanComputeContext::TScanData::TScanData(const TTableId& tableId, const TTableRange& range, - const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys, - const TSmallVec<TColumn>& resultColumns) + const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, const TSmallVec<TColumn>& resultColumns) : TableId(tableId) , Range(range) - , SkipNullKeys(skipNullKeys) - , Columns(columns) - , SystemColumns(systemColumns) - , ResultColumns(resultColumns) + , BatchReader(new TRowBatchReader(columns, systemColumns, resultColumns)) {} TKqpScanComputeContext::TScanData::TScanData(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, @@ -441,75 +471,39 @@ TKqpScanComputeContext::TScanData::TScanData(const NKikimrTxDataShard::TKqpTrans tableMeta.GetSysViewInfo(), tableMeta.GetSchemaVersion()); TablePath = meta.GetTable().GetTablePath(); - std::copy(meta.GetSkipNullKeys().begin(), meta.GetSkipNullKeys().end(), std::back_inserter(SkipNullKeys)); - - Columns.reserve(meta.GetColumns().size()); - for (const auto& column : meta.GetColumns()) { - NMiniKQL::TKqpScanComputeContext::TColumn c; - c.Tag = column.GetId(); - auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(column.GetType(), - column.HasTypeInfo() ? &column.GetTypeInfo() : nullptr); - c.Type = typeInfoMod.TypeInfo; - c.TypeMod = typeInfoMod.TypeMod; - - if (!IsSystemColumn(c.Tag)) { - Columns.emplace_back(std::move(c)); - } else { - SystemColumns.emplace_back(std::move(c)); - } - } - - if (meta.GetResultColumns().empty() && !meta.HasOlapProgram()) { - // Currently we define ResultColumns just for Olap tables in TKqpQueryCompiler - ResultColumns = Columns; - } else { - ResultColumns.reserve(meta.GetResultColumns().size()); - for (const auto& resColumn : meta.GetResultColumns()) { - NMiniKQL::TKqpScanComputeContext::TColumn c; - c.Tag = resColumn.GetId(); - auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(resColumn.GetType(), - resColumn.HasTypeInfo() ? &resColumn.GetTypeInfo() : nullptr); - c.Type = typeInfoMod.TypeInfo; - c.TypeMod = typeInfoMod.TypeMod; - - if (!IsSystemColumn(c.Tag)) { - ResultColumns.emplace_back(std::move(c)); - } else { - SystemColumns.emplace_back(std::move(c)); - } - } + switch(ReadTypeFromProto(meta.GetReadType())) { + case TKqpScanComputeContext::TScanData::EReadType::Rows: + BatchReader.reset(new TRowBatchReader(meta)); + break; + case TKqpScanComputeContext::TScanData::EReadType::Blocks: + BatchReader.reset(new TBlockBatchReader(meta)); + break; } if (statsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC) { BasicStats = std::make_unique<TBasicStats>(); } + if (Y_UNLIKELY(statsMode >= NYql::NDqProto::DQ_STATS_MODE_PROFILE)) { ProfileStats = std::make_unique<TProfileStats>(); } } - -ui64 TKqpScanComputeContext::TScanData::AddRows(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) { - if (Finished || batch.empty()) { - return 0; - } - +TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(const TVector<TOwnedCellVec>& batch, + TMaybe<ui64> shardId, const THolderFactory& holderFactory) +{ TBytesStatistics stats; - - TVector<ui64> bytesList; - bytesList.reserve(batch.size()); - TUnboxedValueVector cells; - if (!ColumnsCount()) { + if (TotalColumnsCount == 0u) { cells.resize(batch.size(), holderFactory.GetEmptyContainer()); stats.AddStatistics({ sizeof(ui64) * batch.size(), sizeof(ui64) * batch.size() }); } else { - cells.resize(batch.size() * ColumnsCount()); + cells.resize(batch.size() * TotalColumnsCount); for (size_t rowIndex = 0; rowIndex < batch.size(); ++rowIndex) { auto& row = batch[rowIndex]; - auto* vectorStart = &cells.data()[rowIndex * ColumnsCount()]; + auto* vectorStart = &cells.data()[rowIndex * TotalColumnsCount]; for (ui32 i = 0; i < ResultColumns.size(); ++i) { vectorStart[i] = GetCellValue(row[i], ResultColumns[i].Type); } @@ -519,10 +513,26 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const TVector<TOwnedCellVec>& ba } } if (cells.size()) { - RowBatches.emplace(RowBatch(ColumnsCount(), batch.size(), std::move(cells), shardId, stats.AllocatedBytes)); + RowBatches.emplace(TRowBatch(TotalColumnsCount, batch.size(), std::move(cells), stats.AllocatedBytes)); } - StoredBytes += stats.AllocatedBytes; + + return stats; +} + +TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(const TVector<TOwnedCellVec>& /*batch*/, + TMaybe<ui64> /*shardId*/, const THolderFactory& /*holderFactory*/) +{ + Y_VERIFY(false, "Batch of TOwnedCellVec should never be called for BlockBatchReader!"); + return TBytesStatistics(); +} + +ui64 TKqpScanComputeContext::TScanData::AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) { + if (Finished || batch.empty()) { + return 0; + } + + TBytesStatistics stats = BatchReader->AddData(batch, shardId, holderFactory); if (BasicStats) { BasicStats->Rows += batch.size(); BasicStats->Bytes += stats.DataBytes; @@ -531,32 +541,27 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const TVector<TOwnedCellVec>& ba return stats.AllocatedBytes; } -ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, +TBytesStatistics TKqpScanComputeContext::TScanData::TRowBatchReader::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) { - // RecordBatch hasn't empty method so check the number of rows - if (Finished || batch.num_rows() == 0) { - return 0; - } - TBytesStatistics stats; TUnboxedValueVector cells; - if (!ColumnsCount()) { + if (TotalColumnsCount == 0u) { cells.resize(batch.num_rows(), holderFactory.GetEmptyContainer()); stats.AddStatistics({ sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows() }); } else { - cells.resize(batch.num_rows() * ColumnsCount()); + cells.resize(batch.num_rows() * TotalColumnsCount); for (size_t columnIndex = 0; columnIndex < ResultColumns.size(); ++columnIndex) { stats.AddStatistics( - WriteColumnValuesFromArrow(cells.data(), batch, columnIndex, ColumnsCount(), ResultColumns[columnIndex].Type) + WriteColumnValuesFromArrow(cells.data(), batch, columnIndex, TotalColumnsCount, ResultColumns[columnIndex].Type) ); } if (!SystemColumns.empty()) { for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) { - FillSystemColumns(&cells[rowIndex * ColumnsCount() + ResultColumns.size()], shardId, SystemColumns); + FillSystemColumns(&cells[rowIndex * TotalColumnsCount + ResultColumns.size()], shardId, SystemColumns); } stats.AllocatedBytes += batch.num_rows() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue); @@ -564,10 +569,55 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch, } if (cells.size()) { - RowBatches.emplace(RowBatch(ColumnsCount(), batch.num_rows(), std::move(cells), shardId, stats.AllocatedBytes)); + RowBatches.emplace(TRowBatch(TotalColumnsCount, batch.num_rows(), std::move(cells), stats.AllocatedBytes)); + } + + StoredBytes += stats.AllocatedBytes; + + return stats; +} + +TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> /*shardId*/, + const THolderFactory& holderFactory) +{ + TBytesStatistics stats; + auto totalColsCount = TotalColumnsCount + 1; + TUnboxedValueVector batchValues; + batchValues.resize(totalColsCount); + + for (int i = 0; i < batch.num_columns(); ++i) { + batchValues[i] = holderFactory.CreateArrowBlock(arrow::Datum(batch.column_data(i))); } + ui64 batchByteSize = NYql::NUdf::GetSizeOfArrowBatchInBytes(batch); + stats.AddStatistics({batchByteSize, batchByteSize}); + + // !!! TODO !!! + // if (!SystemColumns.empty()) { + // for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) { + // FillSystemColumns(&cells[rowIndex * ColumnsCount() + ResultColumns.size()], shardId, SystemColumns); + // } + + // stats.AllocatedBytes += batch.num_rows() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue); + // } + batchValues[totalColsCount - 1] = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch.num_rows()))); + stats.AddStatistics({ sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows() }); + + BlockBatches.emplace(TBlockBatch(totalColsCount, batch.num_rows(), std::move(batchValues), stats.AllocatedBytes)); StoredBytes += stats.AllocatedBytes; + + return stats; +} + +ui64 TKqpScanComputeContext::TScanData::AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, + const THolderFactory& holderFactory) +{ + // RecordBatch hasn't empty method so check the number of rows + if (Finished || batch.num_rows() == 0) { + return 0; + } + + TBytesStatistics stats = BatchReader->AddData(batch, shardId, holderFactory); if (BasicStats) { BasicStats->Rows += batch.num_rows(); BasicStats->Bytes += stats.DataBytes; @@ -576,6 +626,53 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch, return stats.AllocatedBytes; } +TKqpScanComputeContext::TScanData::IDataBatchReader::IDataBatchReader(const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, + const TSmallVec<TColumn>& resultColumns) + : Columns(columns) + , SystemColumns(systemColumns) + , ResultColumns(resultColumns) + , TotalColumnsCount(resultColumns.size() + systemColumns.size()) +{} + +TKqpScanComputeContext::TScanData::IDataBatchReader::IDataBatchReader(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta) { + Columns.reserve(meta.GetColumns().size()); + for (const auto& column : meta.GetColumns()) { + NMiniKQL::TKqpScanComputeContext::TColumn c; + c.Tag = column.GetId(); + auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(column.GetType(), + column.HasTypeInfo() ? &column.GetTypeInfo() : nullptr); + c.Type = typeInfoMod.TypeInfo; + c.TypeMod = typeInfoMod.TypeMod; + + if (!IsSystemColumn(c.Tag)) { + Columns.emplace_back(std::move(c)); + } else { + SystemColumns.emplace_back(std::move(c)); + } + } + + if (meta.GetResultColumns().empty() && !meta.HasOlapProgram()) { + // Currently we define ResultColumns just for Olap tables in TKqpQueryCompiler + ResultColumns = Columns; + } else { + ResultColumns.reserve(meta.GetResultColumns().size()); + for (const auto& resColumn : meta.GetResultColumns()) { + NMiniKQL::TKqpScanComputeContext::TColumn c; + c.Tag = resColumn.GetId(); + auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(resColumn.GetType(), + resColumn.HasTypeInfo() ? &resColumn.GetTypeInfo() : nullptr); + c.Type = typeInfoMod.TypeInfo; + c.TypeMod = typeInfoMod.TypeMod; + + if (!IsSystemColumn(c.Tag)) { + ResultColumns.emplace_back(std::move(c)); + } + } + } + + TotalColumnsCount = ResultColumns.size() + SystemColumns.size(); +} + void TKqpScanComputeContext::AddTableScan(ui32, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, NYql::NDqProto::EDqStatsMode statsMode) { @@ -635,7 +732,7 @@ public: return EFetchResult::Yield; } - ScanData.FillUnboxedCells(result); + ScanData.FillDataValues(result); return EFetchResult::One; } diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h index 34fbcdcd61..12fe203051 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.h +++ b/ydb/core/kqp/runtime/kqp_scan_data.h @@ -31,9 +31,7 @@ struct TBytesStatistics { TBytesStatistics(const ui64 allocated, const ui64 data) : AllocatedBytes(allocated) , DataBytes(data) - { - - } + {} TBytesStatistics operator+(const TBytesStatistics& item) const { return TBytesStatistics(AllocatedBytes + item.AllocatedBytes, DataBytes + item.DataBytes); @@ -83,45 +81,27 @@ public: public: TScanData(TScanData&&) = default; // needed to create TMap<ui32, TScanData> Scans TScanData(const TTableId& tableId, const TTableRange& range, const TSmallVec<TColumn>& columns, - const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys, - const TSmallVec<TColumn>& resultColumns); - - ui32 ColumnsCount() const { - return ResultColumns.size() + SystemColumns.size(); - } + const TSmallVec<TColumn>& systemColumns, const TSmallVec<TColumn>& resultColumns); - ui32 FillUnboxedCells(NUdf::TUnboxedValue* const* result); + ui32 FillDataValues(NUdf::TUnboxedValue* const* result); TScanData(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, NYql::NDqProto::EDqStatsMode statsMode); - ~TScanData() { - Y_VERIFY_DEBUG_S(RowBatches.empty(), "Buffer in TScanData was not cleared, data is leaking. " - << "Queue of UnboxedValues must be emptied under allocator using Clear() method, but has " - << RowBatches.size() << " elements!"); - } + ~TScanData() = default; const TSmallVec<TColumn>& GetColumns() const { - return Columns; - } - - const TSmallVec<TColumn>& GetSystemColumns() const { - return SystemColumns; - } - - const TSmallVec<TColumn>& GetResultColumns() const { - return ResultColumns; + return BatchReader->GetColumns(); } - ui64 AddRows(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory); - - ui64 AddRows(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory); + ui64 AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory); + ui64 AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory); bool IsEmpty() const { - return RowBatches.empty(); + return BatchReader->IsEmpty(); } ui64 GetStoredBytes() const { - return StoredBytes; + return BatchReader->GetStoredBytes(); } void Finish() { @@ -133,8 +113,7 @@ public: } void Clear() { - TQueue<RowBatch> newQueue; - std::swap(newQueue, RowBatches); + BatchReader->Clear(); } public: @@ -142,7 +121,6 @@ public: TTableId TableId; TString TablePath; TSerializedTableRange Range; - TSmallVec<bool> SkipNullKeys; // shared with actor via TableReader TIntrusivePtr<IKqpTableReader> TableReader; @@ -163,53 +141,168 @@ public: TDuration ScanWaitTime; // IScan waiting data time }; + enum class EReadType { + Rows, + Blocks + }; + std::unique_ptr<TBasicStats> BasicStats; std::unique_ptr<TProfileStats> ProfileStats; private: - class RowBatch { - private: - const ui32 CellsCountForRow; - const ui32 ColumnsCount; - const ui32 RowsCount; - TUnboxedValueVector Cells; - ui64 CurrentRow = 0; - const ui64 AllocatedBytes; + class IDataBatchReader { + public: + IDataBatchReader(const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, + const TSmallVec<TColumn>& resultColumns); + + IDataBatchReader(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta); + + virtual ~IDataBatchReader() = default; + + const TSmallVec<TColumn>& GetColumns() const { + return Columns; + } + + ui64 GetStoredBytes() const { + return StoredBytes; + } + + virtual TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) = 0; + virtual TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) = 0; + virtual ui32 FillDataValues(NUdf::TUnboxedValue* const* result) = 0; + virtual void Clear() = 0; + virtual bool IsEmpty() const = 0; + protected: + TSmallVec<TColumn> Columns; + TSmallVec<TColumn> SystemColumns; + TSmallVec<TColumn> ResultColumns; + ui32 TotalColumnsCount; + double StoredBytes = 0; + }; + + class TRowBatchReader : public IDataBatchReader { public: - TMaybe<ui64> ShardId; - - explicit RowBatch(const ui32 columnsCount, const ui32 rowsCount, TUnboxedValueVector&& cells, TMaybe<ui64> shardId, const ui64 allocatedBytes) - : CellsCountForRow(columnsCount ? columnsCount : 1) - , ColumnsCount(columnsCount) - , RowsCount(rowsCount) - , Cells(std::move(cells)) - , AllocatedBytes(allocatedBytes) - , ShardId(shardId) - { - Y_VERIFY(AllocatedBytes); - Y_VERIFY(RowsCount); + TRowBatchReader(const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, + const TSmallVec<TColumn>& resultColumns) + : IDataBatchReader(columns, systemColumns, resultColumns) + {} + + TRowBatchReader(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta) + : IDataBatchReader(meta) + {} + + ~TRowBatchReader() { + Y_VERIFY_DEBUG_S(RowBatches.empty(), "Buffer in TRowBatchReader was not cleared, data is leaking. " + << "Queue of UnboxedValues must be emptied under allocator using Clear() method, but has " + << RowBatches.size() << " elements!"); + } + + TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override; + TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override; + ui32 FillDataValues(NUdf::TUnboxedValue* const* result) override; + + void Clear() override { + TQueue<TRowBatch> newQueue; + std::swap(newQueue, RowBatches); } - double BytesForRecordEstimation() { - return 1.0 * AllocatedBytes / RowsCount; + bool IsEmpty() const override { + return RowBatches.empty(); } + private: + class TRowBatch { + private: + const ui32 CellsCountForRow; + const ui32 ColumnsCount; + const ui32 RowsCount; + TUnboxedValueVector Cells; + ui64 CurrentRow = 0; + const ui64 AllocatedBytes; + public: + + explicit TRowBatch(const ui32 columnsCount, const ui32 rowsCount, TUnboxedValueVector&& cells, const ui64 allocatedBytes) + : CellsCountForRow(columnsCount ? columnsCount : 1) + , ColumnsCount(columnsCount) + , RowsCount(rowsCount) + , Cells(std::move(cells)) + , AllocatedBytes(allocatedBytes) + { + Y_VERIFY(AllocatedBytes); + Y_VERIFY(RowsCount); + } + + double BytesForRecordEstimation() { + return 1.0 * AllocatedBytes / RowsCount; + } + + bool IsFinished() { + return CurrentRow * CellsCountForRow == Cells.size(); + } + + ui32 FillUnboxedCells(NUdf::TUnboxedValue* const* result); + }; + + TQueue<TRowBatch> RowBatches; + }; - const NUdf::TUnboxedValue* GetCurrentData() const { - return Cells.data() + CurrentRow * CellsCountForRow; + class TBlockBatchReader : public IDataBatchReader { + public: + TBlockBatchReader(const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, + const TSmallVec<TColumn>& resultColumns) + : IDataBatchReader(columns, systemColumns, resultColumns) + {} + + TBlockBatchReader(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta) + : IDataBatchReader(meta) + {} + + ~TBlockBatchReader() { + Y_VERIFY_DEBUG_S(BlockBatches.empty(), "Buffer in TBlockBatchReader was not cleared, data is leaking. " + << "Queue of UnboxedValues must be emptied under allocator using Clear() method, but has " + << BlockBatches.size() << " elements!"); } - bool IsFinished() { - return CurrentRow * CellsCountForRow == Cells.size(); + TBytesStatistics AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override; + TBytesStatistics AddData(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) override; + ui32 FillDataValues(NUdf::TUnboxedValue* const* result) override; + + void Clear() override { + TQueue<TBlockBatch> newQueue; + std::swap(newQueue, BlockBatches); } - ui32 FillUnboxedCells(NUdf::TUnboxedValue* const* result); + bool IsEmpty() const override { + return BlockBatches.empty(); + } + private: + class TBlockBatch { + private: + const ui32 ColumnsCount; + const ui32 RowsCount; + TUnboxedValueVector BatchValues; + const ui64 AllocatedBytes; + public: + explicit TBlockBatch(const ui32 columnsCount, const ui32 rowsCount, TUnboxedValueVector&& value, const ui64 allocatedBytes) + : ColumnsCount(columnsCount) + , RowsCount(rowsCount) + , BatchValues(std::move(value)) + , AllocatedBytes(allocatedBytes) + { + Y_VERIFY(AllocatedBytes); + Y_VERIFY(RowsCount); + } + + double BytesForRecordEstimation() { + return 1.0 * AllocatedBytes; + } + + ui32 FillBlockValues(NUdf::TUnboxedValue* const* result); + }; + + TQueue<TBlockBatch> BlockBatches; }; - TSmallVec<TColumn> Columns; - TSmallVec<TColumn> SystemColumns; - TSmallVec<TColumn> ResultColumns; - TQueue<RowBatch> RowBatches; - double StoredBytes = 0; + std::unique_ptr<IDataBatchReader> BatchReader; bool Finished = false; }; diff --git a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp index 90eab85f74..451e9160ba 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp @@ -247,9 +247,9 @@ Y_UNIT_TEST_SUITE(TKqpScanData) { TMemoryUsageInfo memInfo(""); THolderFactory factory(alloc.Ref(), memInfo); - TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, {}, rows.front().Columns()); + TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, rows.front().Columns()); - scanData.AddRows(*batch, {}, factory); + scanData.AddData(*batch, {}, factory); std::vector<NUdf::TUnboxedValue> container; container.resize(20); @@ -258,7 +258,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) { containerPtr.emplace_back(&i); } for (auto& row: rows) { - scanData.FillUnboxedCells(containerPtr.data()); + scanData.FillDataValues(containerPtr.data()); UNIT_ASSERT_EQUAL(container[0 ].Get<bool >(), row.Bool ); UNIT_ASSERT_EQUAL(container[1 ].Get<i8 >(), row.Int8 ); UNIT_ASSERT_EQUAL(container[2 ].Get<i16 >(), row.Int16 ); @@ -303,9 +303,9 @@ Y_UNIT_TEST_SUITE(TKqpScanData) { .Type = TTypeInfo(NTypeIds::Int8) }; resultCols.push_back(resCol); - TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, {}, resultCols); + TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, resultCols); - scanData.AddRows(*batch, {}, factory); + scanData.AddData(*batch, {}, factory); std::vector<NUdf::TUnboxedValue> container; container.resize(1); @@ -315,7 +315,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) { } for (auto& row: rows) { - scanData.FillUnboxedCells(containerPtr.data()); + scanData.FillDataValues(containerPtr.data()); UNIT_ASSERT_EQUAL(container[0].Get<i8>(), row.Int8); } @@ -329,9 +329,9 @@ Y_UNIT_TEST_SUITE(TKqpScanData) { TMemoryUsageInfo memInfo(""); THolderFactory factory(alloc.Ref(), memInfo); - TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {}, {}); + TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {}); TVector<TOwnedCellVec> emptyBatch(1000); - auto bytes = scanData.AddRows(emptyBatch, {}, factory); + auto bytes = scanData.AddData(emptyBatch, {}, factory); UNIT_ASSERT(bytes > 0); std::vector<NUdf::TUnboxedValue*> containerPtr; @@ -339,7 +339,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) { for (const auto& row: emptyBatch) { Y_UNUSED(row); UNIT_ASSERT(!scanData.IsEmpty()); - UNIT_ASSERT(scanData.FillUnboxedCells(containerPtr.data()) == 0); + UNIT_ASSERT(scanData.FillDataValues(containerPtr.data()) == 0); } UNIT_ASSERT(scanData.IsEmpty()); } @@ -348,18 +348,18 @@ Y_UNIT_TEST_SUITE(TKqpScanData) { NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__); TMemoryUsageInfo memInfo(""); THolderFactory factory(alloc.Ref(), memInfo); - TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {}, {}); + TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {}); TVector<TDataRow> rows = TestRows(); std::shared_ptr<arrow::RecordBatch> anotherEmptyBatch = VectorToBatch(rows, rows.front().MakeArrowSchema()); - auto bytes = scanData.AddRows(*anotherEmptyBatch, {}, factory); + auto bytes = scanData.AddData(*anotherEmptyBatch, {}, factory); UNIT_ASSERT(bytes > 0); std::vector<NUdf::TUnboxedValue*> containerPtr; for (const auto& row: rows) { Y_UNUSED(row); UNIT_ASSERT(!scanData.IsEmpty()); - UNIT_ASSERT(scanData.FillUnboxedCells(containerPtr.data()) == 0); + UNIT_ASSERT(scanData.FillDataValues(containerPtr.data()) == 0); } UNIT_ASSERT(scanData.IsEmpty()); } diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index a31e49c817..48cba779ba 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -148,6 +148,10 @@ message TKqpPhyOpLookup { } message TKqpPhyOpReadOlapRanges { + enum EReadType { + ROWS = 0; + BLOCKS = 1; + } // Parameter come here from computation stage. It has type Tuple(List(Tuple(RangeBegin, RangeEnd)))) // Where RangeBegin and RangeEnd are Tuple(KeyColumns, Inclusive) // Where KeyColumns is values of start/end of range for corresponding key column in table @@ -170,6 +174,9 @@ message TKqpPhyOpReadOlapRanges { // Stores type of read result from Column Shard Ydb.Type ResultType = 7; + + // Type of read result: unboxed values or Arrow blocks of data + EReadType ReadType = 13; } message TKqpPhyOpReadRanges { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 5d886a2b1a..7b5a154b52 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -212,6 +212,11 @@ message TKqpTransaction { repeated NKikimrTx.TKeyRange KeyRanges = 2; } + enum EReadType { + ROWS = 0; + BLOCKS = 1; + } + optional TTableMeta Table = 1; repeated TColumnMeta Columns = 2; repeated uint32 KeyColumnTypes = 3; // for debug logs only @@ -225,6 +230,8 @@ message TKqpTransaction { optional NKikimrSSA.TOlapProgram OlapProgram = 10; // Currently only for OLAP tables optional bool Sorted = 11; repeated TColumnMeta ResultColumns = 12; + // Type of read result: unboxed values or Arrow blocks of data + optional EReadType ReadType = 14; } optional EKqpTransactionType Type = 1; diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index 63ab5553e1..b369ac9db3 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -59,6 +59,7 @@ struct TKqpScanComputationMap { TKqpScanComputationMap() { Map["KqpWideReadTable"] = &WrapKqpScanWideReadTable; Map["KqpWideReadTableRanges"] = &WrapKqpScanWideReadTableRanges; + Map["KqpBlockReadTableRanges"] = &WrapKqpScanBlockReadTableRanges; } THashMap<TString, TCallableScanBuilderFunc> Map; diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index 44182d0962..caef313ca9 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -2331,6 +2331,11 @@ {"Index": 1, "Name": "KeySelectorLambda", "Type": "TCoLambda"}, {"Index": 2, "Name": "ListHandlerLambda", "Type": "TCoLambda"} ] + }, + { + "Name": "TCoWideFromBlocks", + "Base": "TCoInputBase", + "Match": {"Type": "Callable", "Name": "WideFromBlocks"} } ] } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp index 372f9fd934..d0049d8c3b 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp @@ -453,6 +453,7 @@ private: if (name == "Switch" || // KIKIMR-16457 name == "KqpWideReadTable" || name == "KqpWideReadTableRanges" || + name == "KqpBlockReadTableRanges" || name == "KqpLookupTable" || name == "KqpReadTable" || name == "RangeMultiply" || diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 6ea4959222..e28cfe15fd 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -47,6 +47,7 @@ #include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h> #include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> +#include <ydb/library/yql/public/udf/arrow/util.h> #include <ydb/library/yql/utils/yql_panic.h> #include <ydb/library/yql/providers/s3/common/util.h> @@ -674,35 +675,6 @@ private: std::function<void()> WaitForFutureResolve; }; -ui64 GetSizeOfData(const arrow::ArrayData& data) { - ui64 size = sizeof(data); - size += data.buffers.size() * sizeof(void*); - size += data.child_data.size() * sizeof(void*); - for (const auto& b : data.buffers) { - if (b) { - size += b->size(); - } - } - - for (const auto& c : data.child_data) { - if (c) { - size += GetSizeOfData(*c); - } - } - - return size; -} - -ui64 GetSizeOfBatch(const arrow::RecordBatch& batch) { - ui64 size = sizeof(batch); - size += batch.num_columns() * sizeof(void*); - for (int i = 0; i < batch.num_columns(); ++i) { - size += GetSizeOfData(*batch.column_data(i)); - } - - return size; -} - class TS3ReadCoroImpl : public TActorCoroImpl { friend class TS3StreamReadActor; private: @@ -728,15 +700,15 @@ private: static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv; public: - TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, - const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, - const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader, + TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, + const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, + const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader, const TS3ReadActorFactoryConfig& readActorFactoryCfg, const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps) - : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), - TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), + : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), + TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader), DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps) {} @@ -913,7 +885,7 @@ private: } else { buffer = std::make_unique<TReadBufferFromStream>(this); } - + const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression)); YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression."); auto& readBuffer = decompress ? *decompress : *buffer; @@ -949,7 +921,7 @@ private: otherEvents.push_back(event->ReleaseBase()); event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>(); } - + for (auto &e: otherEvents) { Send(SelfActorId, e.Release()); } @@ -987,7 +959,7 @@ private: } fileDesc.Cookie = result.Cookie; - TArrowParquetBatchReader reader(std::move(fileDesc), + TArrowParquetBatchReader reader(std::move(fileDesc), ArrowReader, result.NumRowGroups, std::move(columnIndices), @@ -1002,10 +974,10 @@ private: } else { buffer = std::make_unique<TReadBufferFromStream>(this); } - + const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression)); YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression."); - + auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings)); TBlockReader reader(std::move(stream)); ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal); @@ -1082,7 +1054,7 @@ private: Send(SelfActorId, e.Release()); } }; - + for (;;) { T batch; @@ -1348,7 +1320,7 @@ private: } ui64 GetBlockSize(const TReadyBlock& block) const { - return ReadSpec->Arrow ? GetSizeOfBatch(*block.Batch) : block.Block.bytes(); + return ReadSpec->Arrow ? NUdf::GetSizeOfArrowBatchInBytes(*block.Batch) : block.Block.bytes(); } void ReportMemoryUsage() const { @@ -1525,7 +1497,7 @@ private: void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) { YQL_ENSURE(ReadSpec->Arrow); IngressBytes += next->Get()->IngressDelta; - auto size = GetSizeOfBatch(*next->Get()->Batch); + auto size = NUdf::GetSizeOfArrowBatchInBytes(*next->Get()->Batch); QueueTotalDataSize += size; if (Counters) { QueueBlockCount->Inc(); diff --git a/ydb/library/yql/public/udf/arrow/util.cpp b/ydb/library/yql/public/udf/arrow/util.cpp index f87b9cebdb..aff82a8024 100644 --- a/ydb/library/yql/public/udf/arrow/util.cpp +++ b/ydb/library/yql/public/udf/arrow/util.cpp @@ -4,6 +4,7 @@ #include <arrow/array/array_base.h> #include <arrow/chunked_array.h> +#include <arrow/record_batch.h> namespace NYql { namespace NUdf { @@ -67,6 +68,25 @@ private: arrow::MemoryPool* Pool; }; +ui64 GetSizeOfArrayDataInBytes(const arrow::ArrayData& data) { + ui64 size = sizeof(data); + size += data.buffers.size() * sizeof(void*); + size += data.child_data.size() * sizeof(void*); + for (const auto& b : data.buffers) { + if (b) { + size += b->size(); + } + } + + for (const auto& c : data.child_data) { + if (c) { + size += GetSizeOfArrayDataInBytes(*c); + } + } + + return size; +} + } // namespace std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) { @@ -150,5 +170,15 @@ std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBuffer(size_t size, arr return result; } +ui64 GetSizeOfArrowBatchInBytes(const arrow::RecordBatch& batch) { + ui64 size = sizeof(batch); + size += batch.num_columns() * sizeof(void*); + for (int i = 0; i < batch.num_columns(); ++i) { + size += GetSizeOfArrayDataInBytes(*batch.column_data(i)); + } + + return size; +} + } } diff --git a/ydb/library/yql/public/udf/arrow/util.h b/ydb/library/yql/public/udf/arrow/util.h index cf83be5ced..1ed4cf8172 100644 --- a/ydb/library/yql/public/udf/arrow/util.h +++ b/ydb/library/yql/public/udf/arrow/util.h @@ -32,6 +32,8 @@ inline bool IsNull(const arrow::ArrayData& data, size_t index) { /// \brief same as arrow::AllocateResizableBuffer, but allows to control zero padding std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBuffer(size_t size, arrow::MemoryPool* pool, bool zeroPad = false); +ui64 GetSizeOfArrowBatchInBytes(const arrow::RecordBatch& batch); + // similar to arrow::TypedBufferBuilder, but: // 1) with UnsafeAdvance() method // 2) shrinkToFit = false |