summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Filitov Mikhail <[email protected]> 2026-06-24 22:17:46 +0200
committer GitHub <[email protected]> 2026-06-24 22:17:46 +0200
commit 9fdba3ab2c78cefdb7a52fe874f16dfdd3bbca91 (patch)
tree cecaa2ae8db37a601d5183b28133beeec71e6b5b
parent a5dc8f4abfb19d5ea64d287dfd203bfcc2c95726 (diff)
[BHJ] support scalar datums (#44406)
Co-authored-by: Copilot Autofix powered by AI <[email protected]>
-rw-r--r-- ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp 31
-rw-r--r-- ydb/library/yql/dq/comp_nodes/hash_join_utils/block_layout_converter.cpp 3
-rw-r--r-- ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp 33
-rw-r--r-- ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.cpp 53
-rw-r--r-- ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.h 2
5 files changed, 121 insertions, 1 deletions
diff --git a/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp b/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp
index 7764c38b780..78c7ea6a99f 100644
--- a/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp
+++ b/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp
@@ -41,6 +41,8 @@ class TBlockPackedTupleSource : public NNonCopyable::TMoveOnly {
const TDqBlockJoinContext* meta,
TSides<std::unique_ptr<IBlockLayoutConverter>>& converters, ESide side)
: Side_(side)
+ , Meta_(meta)
+ , ArrowPool_(&ctx.ArrowMemoryPool)
, Stream_(stream.SelectSide(side))
, StreamValues_(Stream_->GetValue(ctx))
, Buff_(ctx.MutableValues.get() + meta->TempStateIndes.SelectSide(side), meta->InputTypes.SelectSide(side).size())
@@ -77,6 +79,7 @@ class TBlockPackedTupleSource : public NNonCopyable::TMoveOnly {
}
columns = std::move(permuted);
}
+ NormalizeScalarColumns(columns);
IBlockLayoutConverter::TPackResult result;
ArrowBlockToInternalConverter_->Pack(columns, result);
return One{std::move(result)};
@@ -91,8 +94,34 @@ class TBlockPackedTupleSource : public NNonCopyable::TMoveOnly {
return arrow;
}
+    // Rewrites `columns` in place so that it holds no scalar datums: every
+    // scalar entry is replaced by the result of MakeArrayFromScalar for the
+    // current block length. Array entries are left untouched, and the call is
+    // a no-op when no entry is a scalar.
+    //
+    // The block length is read from Buff_[UserDataCols()]. A scalar column in
+    // a block of length zero is rejected via MKQL_ENSURE.
+    //
+    // NOTE(review): the item type is looked up by position in
+    // Meta_->InputTypes for this side, while the caller may have permuted
+    // `columns` just before this call -- confirm both use the same order.
+    void NormalizeScalarColumns(TVector<arrow::Datum>& columns) {
+        // Locate the first scalar; everything before it is already an array.
+        size_t firstScalar = 0;
+        while (firstScalar < columns.size() && !columns[firstScalar].is_scalar()) {
+            ++firstScalar;
+        }
+        if (firstScalar == columns.size()) {
+            return;
+        }
+
+        const ui64 rowCount = GetBlockCount(Buff_[UserDataCols()]);
+        MKQL_ENSURE(rowCount > 0, "Got a scalar column in a zero-length block");
+
+        const auto& sideTypes = Meta_->InputTypes.SelectSide(Side_);
+        for (size_t idx = firstScalar; idx < columns.size(); ++idx) {
+            if (!columns[idx].is_scalar()) {
+                continue;
+            }
+            TType* elementType = sideTypes[idx]->GetItemType();
+            columns[idx] = MakeArrayFromScalar(*columns[idx].scalar(), rowCount, elementType, *ArrowPool_);
+        }
+    }
+
bool Finished_ = false;
- [[maybe_unused]]ESide Side_;
+ ESide Side_;
+ const TDqBlockJoinContext* Meta_;
+ arrow::MemoryPool* ArrowPool_;
IComputationNode* Stream_;
NYql::NUdf::TUnboxedValue StreamValues_;
std::span<NYql::NUdf::TUnboxedValue> Buff_;
diff --git a/ydb/library/yql/dq/comp_nodes/hash_join_utils/block_layout_converter.cpp b/ydb/library/yql/dq/comp_nodes/hash_join_utils/block_layout_converter.cpp
index ca8ef32d24f..f8513e68763 100644
--- a/ydb/library/yql/dq/comp_nodes/hash_join_utils/block_layout_converter.cpp
+++ b/ydb/library/yql/dq/comp_nodes/hash_join_utils/block_layout_converter.cpp
@@ -655,6 +655,9 @@ class TBlockLayoutConverter : public IBlockLayoutConverter {
for (size_t i = 0; i < columns.size(); ++i) {
const auto& column = columns[i];
+ MKQL_ENSURE(column.is_array(),
+ Sprintf("Column %zu is not an array "
+ "(scalar columns must be materialized)", i));
auto data = Extractors_[i]->GetColumnsDataConst(column.array());
columnsData.insert(columnsData.end(), data.begin(), data.end());
diff --git a/ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp b/ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp
index 95109e8395b..be76fe5a029 100644
--- a/ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp
+++ b/ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp
@@ -53,6 +53,8 @@ struct TJoinTestData {
std::optional<ui64> JoinMemoryConstraint = std::nullopt;
int BlockSize = 128;
bool SliceBlocks = false;
+ TVector<int> ScalarizeLeftColumns;
+ TVector<int> ScalarizeRightColumns;
TBlockHashJoinSettings JoinSettings;
};
@@ -938,6 +940,31 @@ TJoinTestData OutputBufferBoundedTestData() {
return td;
}
+// Test data for an inner join on ui64 keys where left column 1 (the values
+// column) is requested to be passed as a scalar. All left values are the same
+// (100), so replacing that column by a single scalar does not change the rows.
+// Keys 2..5 are present on both sides and form the expected result.
+TJoinTestData ScalarPayloadInnerJoinTestData() {
+    TJoinTestData data;
+    data.Kind = EJoinKind::Inner;
+    data.ScalarizeLeftColumns = {1};
+
+    auto& setup = *data.Setup;
+
+    // Left input: constant payload.
+    TVector<ui64> lhsKeys = {1, 2, 3, 4, 5};
+    TVector<ui64> lhsPayload = {100, 100, 100, 100, 100};
+    data.Left = ConvertVectorsToTuples(setup, lhsKeys, lhsPayload);
+
+    // Right input: ordinary array payload.
+    TVector<ui64> rhsKeys = {2, 3, 4, 5, 6};
+    TVector<ui64> rhsPayload = {20, 30, 40, 50, 60};
+    data.Right = ConvertVectorsToTuples(setup, rhsKeys, rhsPayload);
+
+    // Expected output: left key, left value, right key, right value.
+    TVector<ui64> wantLhsKeys = {2, 3, 4, 5};
+    TVector<ui64> wantLhsPayload = {100, 100, 100, 100};
+    TVector<ui64> wantRhsKeys = {2, 3, 4, 5};
+    TVector<ui64> wantRhsPayload = {20, 30, 40, 50};
+    data.Result =
+        ConvertVectorsToTuples(setup, wantLhsKeys, wantLhsPayload, wantRhsKeys, wantRhsPayload);
+
+    return data;
+}
+
TJoinDescription MakeJoinDescription(TJoinTestData& td) {
FilterRenamesForSemiAndOnlyJoins(td);
TJoinDescription descr;
@@ -953,6 +980,8 @@ TJoinDescription MakeJoinDescription(TJoinTestData& td) {
descr.RightSource.ValuesList = td.Right.Value;
descr.BlockSize = td.BlockSize;
descr.SliceBlocks = td.SliceBlocks;
+ descr.ScalarizeLeftColumns = td.ScalarizeLeftColumns;
+ descr.ScalarizeRightColumns = td.ScalarizeRightColumns;
return descr;
}
@@ -1120,6 +1149,10 @@ Y_UNIT_TEST_SUITE(TDqHashJoinBasicTest) {
Test(SwappedKeyColumnsLeftSemiTestData(), true);
}
+ Y_UNIT_TEST(TestBlockJoinScalarColumn) { // inner join whose left column 1 is marked for scalarization
+ Test(ScalarPayloadInnerJoinTestData(), true); // same Test(..., true) call form as the neighbouring block-join tests
+ }
+
Y_UNIT_TEST(TestBlockSpilling) {
Test(SpillingTestData(), true);
}
diff --git a/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.cpp b/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.cpp
index e1379faea70..2b40dcc1f9e 100644
--- a/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.cpp
+++ b/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.cpp
@@ -9,6 +9,8 @@
#include <arrow/array/util.h>
#include <arrow/scalar.h>
+#include <algorithm>
+
namespace NKikimr::NMiniKQL {
namespace {
@@ -129,6 +131,49 @@ NUdf::TUnboxedValuePod SliceBlockList(
return holderFactory.CreateDirectListHolder(std::move(newList));
}
+// Builds a new list from `blockList` in which the columns named in
+// `scalarColumns` are stored as arrow scalars instead of arrow arrays.
+//
+// Each list item is read as `width` column blocks followed by a block-count
+// scalar at index `width`. For a selected column only element 0 of the array
+// is kept, so the output matches the input only when that column is constant
+// within the block. Selected columns must hold array datums
+// (Y_ABORT_UNLESS otherwise).
+//
+// Blocks with a zero block count are copied unchanged. Indices in
+// `scalarColumns` outside [0, width) are ignored.
+NUdf::TUnboxedValuePod ScalarizeBlockList(
+    const THolderFactory& holderFactory,
+    NUdf::TUnboxedValuePod blockList,
+    size_t width,
+    const TVector<int>& scalarColumns)
+{
+    // The column selection does not depend on the block, so resolve it once
+    // instead of searching `scalarColumns` for every column of every block.
+    TVector<bool> scalarize(width, false);
+    for (const int column : scalarColumns) {
+        if (column >= 0 && static_cast<size_t>(column) < width) {
+            scalarize[static_cast<size_t>(column)] = true;
+        }
+    }
+
+    NUdf::TUnboxedValue iterator = blockList.GetListIterator();
+    NUdf::TUnboxedValue current;
+
+    TDefaultListRepresentation newList;
+
+    while (iterator.Next(current)) {
+        auto blockCountUV = current.GetElement(width);
+        const ui64 blockCount =
+            TArrowBlock::From(blockCountUV).GetDatum()
+                .scalar_as<arrow::UInt64Scalar>().value;
+
+        NUdf::TUnboxedValue* items = nullptr;
+        auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items);
+
+        for (size_t i = 0; i < width; ++i) {
+            auto colValue = current.GetElement(i);
+
+            // An empty block has no element 0 to take, so it keeps its array.
+            if (scalarize[i] && blockCount > 0) {
+                const auto& datum = TArrowBlock::From(colValue).GetDatum();
+                Y_ABORT_UNLESS(datum.is_array());
+                auto arr = arrow::MakeArray(datum.array());
+                auto scalar = ARROW_RESULT(arr->GetScalar(0));
+                items[i] = holderFactory.CreateArrowBlock(arrow::Datum(std::move(scalar)));
+            } else {
+                items[i] = std::move(colValue);
+            }
+        }
+
+        // The block count is carried over as is: it still states the row count.
+        items[width] = MakeBlockCount(holderFactory, blockCount);
+        newList = newList.Append(std::move(tuple));
+    }
+
+    return holderFactory.CreateDirectListHolder(std::move(newList));
+}
+
void SetEntryPointValues(IComputationGraph& g, NYql::NUdf::TUnboxedValue left, NYql::NUdf::TUnboxedValue right) {
TComputationContext& ctx = g.GetContext();
g.GetEntryPoint(0, false)->SetValue(ctx, std::move(left));
@@ -228,6 +273,14 @@ THolder<IComputationGraph> ConstructJoinGraphStream(EJoinKind joinKind, ETestedJ
leftBlocks = SliceBlockList(ctx.HolderFactory, leftBlocks, descr.LeftSource.ColumnTypes.size());
rightBlocks = SliceBlockList(ctx.HolderFactory, rightBlocks, descr.RightSource.ColumnTypes.size());
}
+ if (!descr.ScalarizeLeftColumns.empty()) {
+ leftBlocks = ScalarizeBlockList(ctx.HolderFactory, leftBlocks, descr.LeftSource.ColumnTypes.size(),
+ descr.ScalarizeLeftColumns);
+ }
+ if (!descr.ScalarizeRightColumns.empty()) {
+ rightBlocks = ScalarizeBlockList(ctx.HolderFactory, rightBlocks, descr.RightSource.ColumnTypes.size(),
+ descr.ScalarizeRightColumns);
+ }
SetEntryPointValues(*graph, leftBlocks, rightBlocks);
return graph;
};
diff --git a/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.h b/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.h
index d69fef62925..b510893d2dd 100644
--- a/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.h
+++ b/ydb/library/yql/dq/comp_nodes/ut/join_perf/construct_join_graph.h
@@ -20,6 +20,8 @@ struct TJoinDescription {
std::optional<TDqUserRenames> CustomRenames;
int BlockSize = 128;
bool SliceBlocks = false;
+ TVector<int> ScalarizeLeftColumns;
+ TVector<int> ScalarizeRightColumns;
};
bool IsBlockJoin(ETestedJoinAlgo algo);