summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author nfrmtk <[email protected]> 2025-09-29 12:59:43 +0300
committer GitHub <[email protected]> 2025-09-29 14:59:43 +0500
commit b7238df9db415b0f4b41bee2d3587a2065a1d086 (patch)
tree 886da7ede159c205f3c1aba85b391b79d4339679
parent cd6071c39304f5b251d92fe16195b8e0a29a2513 (diff)
join build side table interface (#25892)
-rw-r--r-- ydb/library/yql/dq/comp_nodes/dq_hash_combine.cpp 43
-rw-r--r-- ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h 50
-rw-r--r-- ydb/library/yql/dq/comp_nodes/type_utils.h 61
3 files changed, 98 insertions, 56 deletions
diff --git a/ydb/library/yql/dq/comp_nodes/dq_hash_combine.cpp b/ydb/library/yql/dq/comp_nodes/dq_hash_combine.cpp
index 7a69d9deeb3..d7614aa883e 100644
--- a/ydb/library/yql/dq/comp_nodes/dq_hash_combine.cpp
+++ b/ydb/library/yql/dq/comp_nodes/dq_hash_combine.cpp
@@ -25,49 +25,6 @@ using NUdf::TUnboxedValuePod;
namespace {
-struct TWideUnboxedEqual
-{
- TWideUnboxedEqual(const TKeyTypes& types)
- : Types(types)
- {
- }
-
- bool operator()(const NUdf::TUnboxedValuePod* left, const NUdf::TUnboxedValuePod* right) const {
- for (ui32 i = 0U; i < Types.size(); ++i)
- if (CompareValues(Types[i].first, true, Types[i].second, left[i], right[i]))
- return false;
- return true;
- }
-
- const TKeyTypes& Types;
-};
-
-struct TWideUnboxedHasher
-{
- TWideUnboxedHasher(const TKeyTypes& types)
- : Types(types)
- {
- }
-
- NUdf::THashType operator()(const NUdf::TUnboxedValuePod* values) const {
- if (Types.size() == 1U)
- if (const auto v = *values)
- return NUdf::GetValueHash(Types.front().first, v);
- else
- return HashOfNull;
-
- NUdf::THashType hash = 0ULL;
- for (const auto& type : Types) {
- if (const auto v = *values++)
- hash = CombineHashes(hash, NUdf::GetValueHash(type.first, v));
- else
- hash = CombineHashes(hash, HashOfNull);
- }
- return hash;
- }
-
- const TKeyTypes& Types;
-};
using TEqualsPtr = bool(*)(const NUdf::TUnboxedValuePod*, const NUdf::TUnboxedValuePod*);
using THashPtr = NUdf::THashType(*)(const NUdf::TUnboxedValuePod*);
diff --git a/ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h b/ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h
new file mode 100644
index 00000000000..97652e32e89
--- /dev/null
+++ b/ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h
@@ -0,0 +1,50 @@
+#pragma once
+#include "type_utils.h"
+#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
+
+namespace NKikimr::NMiniKQL::NJoinTable {
+
+using TTuple = NYql::NUdf::TUnboxedValue*;
+
+class TStdJoinTable {
+ public:
+ TStdJoinTable(int tupleSize, NKikimr::NMiniKQL::TWideUnboxedEqual eq, NKikimr::NMiniKQL::TWideUnboxedHasher hash)
+ : TupleSize(tupleSize), BuiltTable(1, hash, eq)
+ {}
+
+ void Add(std::span<NYql::NUdf::TUnboxedValue> tuple) {
+ Y_ABORT_UNLESS(BuiltTable.empty(), "JoinTable is built already");
+ Y_ABORT_UNLESS(std::ssize(tuple) == TupleSize, "tuple size promise vs actual mismatch");
+ for (int idx = 0; idx < TupleSize; ++idx) {
+ Tuples.push_back(tuple[idx]);
+ }
+ }
+
+ void Build() {
+ Y_ABORT_UNLESS(BuiltTable.empty(), "JoinTable is built already");
+ for (int index = 0; index < std::ssize(Tuples); index += TupleSize) {
+ TTuple thisTuple = &Tuples[index];
+ auto [it, ok] = BuiltTable.emplace(thisTuple, std::vector{thisTuple});
+ if (!ok) {
+ it->second.emplace_back(thisTuple);
+ }
+ }
+ }
+
+ void Lookup(TTuple key, std::function<void(TTuple)> produce) const {
+ Y_ABORT_IF(BuiltTable.empty(), "call Build first");
+ auto it = BuiltTable.find(key);
+ if (it != BuiltTable.end()) {
+ std::ranges::for_each(it->second, produce);
+ }
+ }
+
+ private:
+ const int TupleSize;
+ std::vector<NYql::NUdf::TUnboxedValue> Tuples;
+ std::unordered_map<TTuple, std::vector<TTuple>, NKikimr::NMiniKQL::TWideUnboxedHasher,
+ NKikimr::NMiniKQL::TWideUnboxedEqual>
+ BuiltTable;
+};
+
+} // namespace NKikimr::NMiniKQL::NJoinTable
\ No newline at end of file
diff --git a/ydb/library/yql/dq/comp_nodes/type_utils.h b/ydb/library/yql/dq/comp_nodes/type_utils.h
index dbf4de4cc87..5709637a1ce 100644
--- a/ydb/library/yql/dq/comp_nodes/type_utils.h
+++ b/ydb/library/yql/dq/comp_nodes/type_utils.h
@@ -1,15 +1,56 @@
#pragma once
-#include <yql/essentials/minikql/mkql_program_builder.h>
-#include <yql/essentials/minikql/mkql_node.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <yql/essentials/minikql/defs.h>
+#include <yql/essentials/minikql/mkql_node.h>
+#include <yql/essentials/minikql/mkql_program_builder.h>
-#include <vector>
#include <algorithm>
+#include <vector>
namespace NKikimr {
namespace NMiniKQL {
+struct TWideUnboxedEqual {
+ TWideUnboxedEqual(const TKeyTypes& types)
+ : Types(types)
+ {}
+
+ bool operator()(const NUdf::TUnboxedValuePod* left, const NUdf::TUnboxedValuePod* right) const {
+ for (ui32 i = 0U; i < Types.size(); ++i)
+ if (CompareValues(Types[i].first, true, Types[i].second, left[i], right[i]))
+ return false;
+ return true;
+ }
+
+ const TKeyTypes& Types;
+};
+
+struct TWideUnboxedHasher {
+ TWideUnboxedHasher(const TKeyTypes& types)
+ : Types(types)
+ {}
+
+ NUdf::THashType operator()(const NUdf::TUnboxedValuePod* values) const {
+ if (Types.size() == 1U)
+ if (const auto v = *values)
+ return NUdf::GetValueHash(Types.front().first, v);
+ else
+ return HashOfNull;
+
+ NUdf::THashType hash = 0ULL;
+ for (const auto& type : Types) {
+ if (const auto v = *values++)
+ hash = CombineHashes(hash, NUdf::GetValueHash(type.first, v));
+ else
+ hash = CombineHashes(hash, HashOfNull);
+ }
+ return hash;
+ }
+
+ const TKeyTypes& Types;
+};
+
inline bool UnwrapBlockTypes(const TArrayRef<TType* const>& typeComponents, std::vector<TType*>& result)
{
bool hasBlock = false;
@@ -31,15 +72,9 @@ inline bool UnwrapBlockTypes(const TArrayRef<TType* const>& typeComponents, std:
inline void WrapArrayBlockTypes(std::vector<TType*>& types, const TProgramBuilder& pb)
{
- std::transform(
- types.begin(),
- types.end(),
- types.begin(),
- [&](TType* type) {
- return pb.NewBlockType(type, TBlockType::EShape::Many);
- }
- );
+ std::transform(types.begin(), types.end(), types.begin(),
+ [&](TType* type) { return pb.NewBlockType(type, TBlockType::EShape::Many); });
}
-}
-}
\ No newline at end of file
+} // namespace NMiniKQL
+} // namespace NKikimr
\ No newline at end of file