diff options
| author | Filitov Mikhail <[email protected]> | 2026-06-24 22:17:46 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-06-24 22:17:46 +0200 |
| commit | 9fdba3ab2c78cefdb7a52fe874f16dfdd3bbca91 (patch) | |
| tree | cecaa2ae8db37a601d5183b28133beeec71e6b5b | |
| parent | a5dc8f4abfb19d5ea64d287dfd203bfcc2c95726 (diff) | |
[BHJ] support scalar datums (#44406)
Co-authored-by: Copilot Autofix powered by AI <[email protected]>
5 files changed, 121 insertions, 1 deletions
diff --git a/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp b/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp index 7764c38b780..78c7ea6a99f 100644 --- a/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp +++ b/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp @@ -41,6 +41,8 @@ class TBlockPackedTupleSource : public NNonCopyable::TMoveOnly { const TDqBlockJoinContext* meta, TSides<std::unique_ptr<IBlockLayoutConverter>>& converters, ESide side) : Side_(side) + , Meta_(meta) + , ArrowPool_(&ctx.ArrowMemoryPool) , Stream_(stream.SelectSide(side)) , StreamValues_(Stream_->GetValue(ctx)) , Buff_(ctx.MutableValues.get() + meta->TempStateIndes.SelectSide(side), meta->InputTypes.SelectSide(side).size()) @@ -77,6 +79,7 @@ class TBlockPackedTupleSource : public NNonCopyable::TMoveOnly { } columns = std::move(permuted); } + NormalizeScalarColumns(columns); IBlockLayoutConverter::TPackResult result; ArrowBlockToInternalConverter_->Pack(columns, result); return One{std::move(result)}; @@ -91,8 +94,34 @@ class TBlockPackedTupleSource : public NNonCopyable::TMoveOnly { return arrow; } + void NormalizeScalarColumns(TVector<arrow::Datum>& columns) { + bool hasScalar = false; + for (const auto& column : columns) { + if (column.is_scalar()) { + hasScalar = true; + break; + } + } + if (!hasScalar) { + return; + } + + const ui64 blockLen = GetBlockCount(Buff_[UserDataCols()]); + MKQL_ENSURE(blockLen > 0, "Got a scalar column in a zero-length block"); + + const auto& inputTypes = Meta_->InputTypes.SelectSide(Side_); + for (size_t j = 0; j < columns.size(); ++j) { + if (columns[j].is_scalar()) { + TType* itemType = inputTypes[j]->GetItemType(); + columns[j] = MakeArrayFromScalar(*columns[j].scalar(), blockLen, itemType, *ArrowPool_); + } + } + } + bool Finished_ = false; - [[maybe_unused]]ESide Side_; + ESide Side_; + const TDqBlockJoinContext* Meta_; + arrow::MemoryPool* ArrowPool_; IComputationNode* Stream_; NYql::NUdf::TUnboxedValue StreamValues_; std::span<NYql::NUdf::TUnboxedValue> Buff_; diff --git a/ydb/library/yql/dq/comp_nodes/hash_join_utils/block_layout_converter.cpp b/ydb/library/yql/dq/comp_nodes/hash_join_utils/block_layout_converter.cpp index ca8ef32d24f..f8513e68763 100644 --- a/ydb/library/yql/dq/comp_nodes/hash_join_utils/block_layout_converter.cpp +++ b/ydb/library/yql/dq/comp_nodes/hash_join_utils/block_layout_converter.cpp @@ -655,6 +655,9 @@ class TBlockLayoutConverter : public IBlockLayoutConverter { for (size_t i = 0; i < columns.size(); ++i) { const auto& column = columns[i]; + MKQL_ENSURE(column.is_array(), + Sprintf("Column %zu is not an array " + "(scalar columns must be materialized)", i)); auto data = Extractors_[i]->GetColumnsDataConst(column.array()); columnsData.insert(columnsData.end(), data.begin(), data.end()); diff --git a/ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp b/ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp index 95109e8395b..be76fe5a029 100644 --- a/ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp +++ b/ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp @@ -53,6 +53,8 @@ struct TJoinTestData { std::optional<ui64> JoinMemoryConstraint = std::nullopt; int BlockSize = 128; bool SliceBlocks = false; + TVector<int> ScalarizeLeftColumns; + TVector<int> ScalarizeRightColumns; TBlockHashJoinSettings JoinSettings; }; @@ -938,6 +940,31 @@ TJoinTestData OutputBufferBoundedTestData() { return td; } +TJoinTestData ScalarPayloadInnerJoinTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + + TVector<ui64> leftKeys = {1, 2, 3, 4, 5}; + TVector<ui64> leftValues = {100, 100, 100, 100, 100}; + + TVector<ui64> rightKeys = {2, 3, 4, 5, 6}; + TVector<ui64> rightValues = {20, 30, 40, 50, 60}; + + TVector<ui64> expectedKeysLeft = {2, 3, 4, 5}; + TVector<ui64> expectedValuesLeft = {100, 100, 100, 100}; + TVector<ui64> expectedKeysRight = {2, 3, 4, 5}; + TVector<ui64> expectedValuesRight = {20, 30, 40, 50}; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = + ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + + td.Kind = EJoinKind::Inner; + td.ScalarizeLeftColumns = {1}; + return td; +} + TJoinDescription MakeJoinDescription(TJoinTestData& td) { FilterRenamesForSemiAndOnlyJoins(td); TJoinDescription descr; @@ -953,6 +980,8 @@ TJoinDescription MakeJoinDescription(TJoinTestData& td) { descr.RightSource.ValuesList = td.Right.Value; descr.BlockSize = td.BlockSize; descr.SliceBlocks = td.SliceBlocks; + descr.ScalarizeLeftColumns = td.ScalarizeLeftColumns; + descr.ScalarizeRightColumns = td.ScalarizeRightColumns; return descr; } @@ -1120,6 +1149,10 @@ Y_UNIT_TEST_SUITE(TDqHashJoinBasicTest) { Test(SwappedKeyColumnsLeftSemiTestData(), true); } + Y_UNIT_TEST(TestBlockJoinScalarColumn) { + Test(ScalarPayloadInnerJoinTestData(), true); + } + Y_UNIT_TEST(TestBlockSpilling) { Test(SpillingTestData(), true); } diff --git a/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.cpp b/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.cpp index e1379faea70..2b40dcc1f9e 100644 --- a/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.cpp +++ b/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.cpp @@ -9,6 +9,8 @@ #include <arrow/array/util.h> #include <arrow/scalar.h> +#include <algorithm> + namespace NKikimr::NMiniKQL { namespace { @@ -129,6 +131,49 @@ NUdf::TUnboxedValuePod SliceBlockList( return holderFactory.CreateDirectListHolder(std::move(newList)); } +NUdf::TUnboxedValuePod ScalarizeBlockList( + const THolderFactory& holderFactory, + NUdf::TUnboxedValuePod blockList, + size_t width, + const TVector<int>& scalarColumns) +{ + NUdf::TUnboxedValue iterator = blockList.GetListIterator(); + NUdf::TUnboxedValue current; + + TDefaultListRepresentation newList; + + while (iterator.Next(current)) { + auto blockCountUV = current.GetElement(width); + ui64 blockCount = + TArrowBlock::From(blockCountUV).GetDatum() + .scalar_as<arrow::UInt64Scalar>().value; + + NUdf::TUnboxedValue* items = nullptr; + auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items); + + for (size_t i = 0; i < width; i++) { + auto colValue = current.GetElement(i); + const bool scalarize = + std::find(scalarColumns.begin(), scalarColumns.end(), static_cast<int>(i)) != scalarColumns.end(); + + if (scalarize && blockCount > 0) { + const auto& datum = TArrowBlock::From(colValue).GetDatum(); + Y_ABORT_UNLESS(datum.is_array()); + auto arr = arrow::MakeArray(datum.array()); + auto scalar = ARROW_RESULT(arr->GetScalar(0)); + items[i] = holderFactory.CreateArrowBlock(arrow::Datum(std::move(scalar))); + } else { + items[i] = std::move(colValue); + } + } + + items[width] = MakeBlockCount(holderFactory, blockCount); + newList = newList.Append(std::move(tuple)); + } + + return holderFactory.CreateDirectListHolder(std::move(newList)); +} + void SetEntryPointValues(IComputationGraph& g, NYql::NUdf::TUnboxedValue left, NYql::NUdf::TUnboxedValue right) { TComputationContext& ctx = g.GetContext(); g.GetEntryPoint(0, false)->SetValue(ctx, std::move(left)); @@ -228,6 +273,14 @@ THolder<IComputationGraph> ConstructJoinGraphStream(EJoinKind joinKind, ETestedJ leftBlocks = SliceBlockList(ctx.HolderFactory, leftBlocks, descr.LeftSource.ColumnTypes.size()); rightBlocks = SliceBlockList(ctx.HolderFactory, rightBlocks, descr.RightSource.ColumnTypes.size()); } + if (!descr.ScalarizeLeftColumns.empty()) { + leftBlocks = ScalarizeBlockList(ctx.HolderFactory, leftBlocks, descr.LeftSource.ColumnTypes.size(), + descr.ScalarizeLeftColumns); + } + if (!descr.ScalarizeRightColumns.empty()) { + rightBlocks = ScalarizeBlockList(ctx.HolderFactory, rightBlocks, descr.RightSource.ColumnTypes.size(), + descr.ScalarizeRightColumns); + } SetEntryPointValues(*graph, leftBlocks, rightBlocks); return graph; }; diff --git a/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.h b/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.h index d69fef62925..b510893d2dd 100644 --- a/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.h +++ b/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.h @@ -20,6 +20,8 @@ struct TJoinDescription { std::optional<TDqUserRenames> CustomRenames; int BlockSize = 128; bool SliceBlocks = false; + TVector<int> ScalarizeLeftColumns; + TVector<int> ScalarizeRightColumns; }; bool IsBlockJoin(ETestedJoinAlgo algo); |
