diff options
| author | nfrmtk <[email protected]> | 2025-09-29 12:59:43 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-09-29 14:59:43 +0500 |
| commit | b7238df9db415b0f4b41bee2d3587a2065a1d086 (patch) | |
| tree | 886da7ede159c205f3c1aba85b391b79d4339679 | |
| parent | cd6071c39304f5b251d92fe16195b8e0a29a2513 (diff) | |
join build side table interface (#25892)
| -rw-r--r-- | ydb/library/yql/dq/comp_nodes/dq_hash_combine.cpp | 43 | ||||
| -rw-r--r-- | ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h | 50 | ||||
| -rw-r--r-- | ydb/library/yql/dq/comp_nodes/type_utils.h | 61 |
3 files changed, 98 insertions, 56 deletions
diff --git a/ydb/library/yql/dq/comp_nodes/dq_hash_combine.cpp b/ydb/library/yql/dq/comp_nodes/dq_hash_combine.cpp index 7a69d9deeb3..d7614aa883e 100644 --- a/ydb/library/yql/dq/comp_nodes/dq_hash_combine.cpp +++ b/ydb/library/yql/dq/comp_nodes/dq_hash_combine.cpp @@ -25,49 +25,6 @@ using NUdf::TUnboxedValuePod; namespace { -struct TWideUnboxedEqual -{ - TWideUnboxedEqual(const TKeyTypes& types) - : Types(types) - { - } - - bool operator()(const NUdf::TUnboxedValuePod* left, const NUdf::TUnboxedValuePod* right) const { - for (ui32 i = 0U; i < Types.size(); ++i) - if (CompareValues(Types[i].first, true, Types[i].second, left[i], right[i])) - return false; - return true; - } - - const TKeyTypes& Types; -}; - -struct TWideUnboxedHasher -{ - TWideUnboxedHasher(const TKeyTypes& types) - : Types(types) - { - } - - NUdf::THashType operator()(const NUdf::TUnboxedValuePod* values) const { - if (Types.size() == 1U) - if (const auto v = *values) - return NUdf::GetValueHash(Types.front().first, v); - else - return HashOfNull; - - NUdf::THashType hash = 0ULL; - for (const auto& type : Types) { - if (const auto v = *values++) - hash = CombineHashes(hash, NUdf::GetValueHash(type.first, v)); - else - hash = CombineHashes(hash, HashOfNull); - } - return hash; - } - - const TKeyTypes& Types; -}; using TEqualsPtr = bool(*)(const NUdf::TUnboxedValuePod*, const NUdf::TUnboxedValuePod*); using THashPtr = NUdf::THashType(*)(const NUdf::TUnboxedValuePod*); diff --git a/ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h b/ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h new file mode 100644 index 00000000000..97652e32e89 --- /dev/null +++ b/ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h @@ -0,0 +1,50 @@ +#pragma once +#include "type_utils.h" +#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h> + +namespace NKikimr::NMiniKQL::NJoinTable { + +using TTuple = NYql::NUdf::TUnboxedValue*; + +class TStdJoinTable { + public: + TStdJoinTable(int tupleSize, NKikimr::NMiniKQL::TWideUnboxedEqual eq, NKikimr::NMiniKQL::TWideUnboxedHasher hash) + : TupleSize(tupleSize), BuiltTable(1, hash, eq) + {} + + void Add(std::span<NYql::NUdf::TUnboxedValue> tuple) { + Y_ABORT_UNLESS(BuiltTable.empty(), "JoinTable is built already"); + Y_ABORT_UNLESS(std::ssize(tuple) == TupleSize, "tuple size promise vs actual mismatch"); + for (int idx = 0; idx < TupleSize; ++idx) { + Tuples.push_back(tuple[idx]); + } + } + + void Build() { + Y_ABORT_UNLESS(BuiltTable.empty(), "JoinTable is built already"); + for (int index = 0; index < std::ssize(Tuples); index += TupleSize) { + TTuple thisTuple = &Tuples[index]; + auto [it, ok] = BuiltTable.emplace(thisTuple, std::vector{thisTuple}); + if (!ok) { + it->second.emplace_back(thisTuple); + } + } + } + + void Lookup(TTuple key, std::function<void(TTuple)> produce) const { + Y_ABORT_IF(BuiltTable.empty(), "call Build first"); + auto it = BuiltTable.find(key); + if (it != BuiltTable.end()) { + std::ranges::for_each(it->second, produce); + } + } + + private: + const int TupleSize; + std::vector<NYql::NUdf::TUnboxedValue> Tuples; + std::unordered_map<TTuple, std::vector<TTuple>, NKikimr::NMiniKQL::TWideUnboxedHasher, + NKikimr::NMiniKQL::TWideUnboxedEqual> + BuiltTable; +}; + +} // namespace NKikimr::NMiniKQL::NJoinTable
\ No newline at end of file diff --git a/ydb/library/yql/dq/comp_nodes/type_utils.h b/ydb/library/yql/dq/comp_nodes/type_utils.h index dbf4de4cc87..5709637a1ce 100644 --- a/ydb/library/yql/dq/comp_nodes/type_utils.h +++ b/ydb/library/yql/dq/comp_nodes/type_utils.h @@ -1,15 +1,56 @@ #pragma once -#include <yql/essentials/minikql/mkql_program_builder.h> -#include <yql/essentials/minikql/mkql_node.h> +#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h> #include <yql/essentials/minikql/defs.h> +#include <yql/essentials/minikql/mkql_node.h> +#include <yql/essentials/minikql/mkql_program_builder.h> -#include <vector> #include <algorithm> +#include <vector> namespace NKikimr { namespace NMiniKQL { +struct TWideUnboxedEqual { + TWideUnboxedEqual(const TKeyTypes& types) + : Types(types) + {} + + bool operator()(const NUdf::TUnboxedValuePod* left, const NUdf::TUnboxedValuePod* right) const { + for (ui32 i = 0U; i < Types.size(); ++i) + if (CompareValues(Types[i].first, true, Types[i].second, left[i], right[i])) + return false; + return true; + } + + const TKeyTypes& Types; +}; + +struct TWideUnboxedHasher { + TWideUnboxedHasher(const TKeyTypes& types) + : Types(types) + {} + + NUdf::THashType operator()(const NUdf::TUnboxedValuePod* values) const { + if (Types.size() == 1U) + if (const auto v = *values) + return NUdf::GetValueHash(Types.front().first, v); + else + return HashOfNull; + + NUdf::THashType hash = 0ULL; + for (const auto& type : Types) { + if (const auto v = *values++) + hash = CombineHashes(hash, NUdf::GetValueHash(type.first, v)); + else + hash = CombineHashes(hash, HashOfNull); + } + return hash; + } + + const TKeyTypes& Types; +}; + inline bool UnwrapBlockTypes(const TArrayRef<TType* const>& typeComponents, std::vector<TType*>& result) { bool hasBlock = false; @@ -31,15 +72,9 @@ inline bool UnwrapBlockTypes(const TArrayRef<TType* const>& typeComponents, std: inline void WrapArrayBlockTypes(std::vector<TType*>& types, const TProgramBuilder& pb) { - std::transform( - types.begin(), - types.end(), - types.begin(), - [&](TType* type) { - return pb.NewBlockType(type, TBlockType::EShape::Many); - } - ); + std::transform(types.begin(), types.end(), types.begin(), + [&](TType* type) { return pb.NewBlockType(type, TBlockType::EShape::Many); }); } -} -}
\ No newline at end of file +} // namespace NMiniKQL +} // namespace NKikimr
\ No newline at end of file |
