aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-08-24 11:03:00 +0300
committervvvv <vvvv@ydb.tech>2023-08-24 11:25:29 +0300
commitaad1dfc4cceb262430a03a283d9d1d1b52a82257 (patch)
treeb37b628111796781fde05c70010eb035966737ab
parent2d9b1587926286b1a7f1b3e52f2fcdf8f96461e9 (diff)
downloadydb-aad1dfc4cceb262430a03a283d9d1d1b52a82257.tar.gz
Prefetch in resize & batch API for robin hood hash table
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp299
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp13
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp5
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h105
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ya.make.inc4
-rw-r--r--ydb/library/yql/utils/prefetch.h13
10 files changed, 333 insertions, 110 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt
index cee4fa2c28..db7bf3a22f 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt
@@ -9,6 +9,7 @@
add_library(minikql-comp_nodes-llvm)
target_compile_options(minikql-comp_nodes-llvm PRIVATE
+ -mprfchw # NOTE(review): presumably lets the compiler emit PREFETCHW for the write-prefetch helper added in utils/prefetch.h by this commit — confirm
-DUSE_CURRENT_UDF_ABI_VERSION
$<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything>
)
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt
index cd0ff9089f..18a9e65c47 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt
@@ -9,6 +9,7 @@
add_library(minikql-comp_nodes-llvm)
target_compile_options(minikql-comp_nodes-llvm PRIVATE
+ -mprfchw # NOTE(review): -mprfchw looks like an x86-specific option (PREFETCHW); this is the linux-aarch64 list — confirm the aarch64 toolchain accepts it
-DUSE_CURRENT_UDF_ABI_VERSION
$<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything>
)
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt
index cd0ff9089f..18a9e65c47 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt
@@ -9,6 +9,7 @@
add_library(minikql-comp_nodes-llvm)
target_compile_options(minikql-comp_nodes-llvm PRIVATE
+ -mprfchw # NOTE(review): presumably lets the compiler emit PREFETCHW for the write-prefetch helper added in utils/prefetch.h by this commit — confirm
-DUSE_CURRENT_UDF_ABI_VERSION
$<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything>
)
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt
index cee4fa2c28..db7bf3a22f 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt
@@ -9,6 +9,7 @@
add_library(minikql-comp_nodes-llvm)
target_compile_options(minikql-comp_nodes-llvm PRIVATE
+ -mprfchw # NOTE(review): added unconditionally, while the generator expression below special-cases MSVC — confirm MSVC builds accept this GCC/Clang-style flag
-DUSE_CURRENT_UDF_ABI_VERSION
$<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything>
)
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index a78da69025..0a590e7eb3 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -15,6 +15,8 @@
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h>
+#include <ydb/library/yql/utils/prefetch.h>
+
#include <arrow/scalar.h>
#include <arrow/array/array_primitive.h>
#include <arrow/array/builder_primitive.h>
@@ -32,12 +34,14 @@ constexpr bool InlineAggState = false;
#ifdef USE_STD_UNORDERED
template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>, typename TAllocator = std::allocator<char>, typename TSettings = void>
class TDynamicHashMapImpl {
+public:
using TMapType = std::unordered_map<TKey, std::vector<char>, THash, TEqual>;
using const_iterator = typename TMapType::const_iterator;
using iterator = typename TMapType::iterator;
-public:
- TDynamicHashMapImpl(size_t stateSize)
+
+ TDynamicHashMapImpl(size_t stateSize, const THash& hasher, const TEqual& equal)
: StateSize_(stateSize)
+ , Map_(0, hasher, equal)
{}
ui64 GetSize() const {
@@ -70,6 +74,15 @@ public:
return res.first;
}
+ template <typename TSink>
+ void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { // Batch counterpart of Insert() for the std::unordered_map-backed variant: a plain per-key loop, no prefetching.
+ for (size_t index = 0; index < batchRequest.size(); ++index) {
+ bool isNew; // left uninitialized here; Insert() is expected to set it (out-parameter)
+ auto iter = Insert(batchRequest[index].GetKey(), isNew);
+ sink(index, iter, isNew); // reports (position in batchRequest, iterator, whether the key was new)
+ }
+ }
+
const TKey& GetKey(const_iterator it) const {
return it->first;
}
@@ -92,10 +105,15 @@ private:
template <typename TKey, typename TPayload, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>, typename TAllocator = std::allocator<char>, typename TSettings = void>
class TFixedHashMapImpl {
+public:
using TMapType = std::unordered_map<TKey, TPayload, THash, TEqual>;
using const_iterator = typename TMapType::const_iterator;
using iterator = typename TMapType::iterator;
-public:
+
+ TFixedHashMapImpl(const THash& hasher, const TEqual& equal)
+ : Map_(0, hasher, equal)
+ {}
+
ui64 GetSize() const {
return Map_.size();
}
@@ -122,6 +140,15 @@ public:
return res.first;
}
+ template <typename TSink>
+ void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { // Same batch interface on top of std::unordered_map: keys are inserted one by one, in request order.
+ for (size_t index = 0; index < batchRequest.size(); ++index) {
+ bool isNew; // left uninitialized here; Insert() is expected to set it (out-parameter)
+ auto iter = Insert(batchRequest[index].GetKey(), isNew);
+ sink(index, iter, isNew); // reports (position in batchRequest, iterator, whether the key was new)
+ }
+ }
+
const TKey& GetKey(const_iterator it) const {
return it->first;
}
@@ -143,10 +170,15 @@ private:
template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>, typename TAllocator = std::allocator<char>, typename TSettings = void>
class THashSetImpl {
+public:
using TSetType = std::unordered_set<TKey, THash, TEqual>;
using const_iterator = typename TSetType::const_iterator;
using iterator = typename TSetType::iterator;
-public:
+
+ THashSetImpl(const THash& hasher, const TEqual& equal)
+ : Set_(0, hasher, equal)
+ {}
+
ui64 GetSize() const {
return Set_.size();
}
@@ -173,6 +205,15 @@ public:
return res.first;
}
+ template <typename TSink>
+ void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { // Same batch interface on top of std::unordered_set: keys are inserted one by one, in request order.
+ for (size_t index = 0; index < batchRequest.size(); ++index) {
+ bool isNew; // left uninitialized here; Insert() is expected to set it (out-parameter)
+ auto iter = Insert(batchRequest[index].GetKey(), isNew);
+ sink(index, iter, isNew); // reports (position in batchRequest, iterator, whether the key was new)
+ }
+ }
+
void CheckGrow() {
}
@@ -219,10 +260,11 @@ struct TKey16 {
};
class TSSOKey {
-private:
+public:
static constexpr size_t SSO_Length = 15;
static_assert(SSO_Length < 128); // should fit into 7 bits
+private:
struct TExternal {
ui64 Length_;
const char* Ptr_;
@@ -234,6 +276,10 @@ private:
};
public:
+ TSSOKey(const TSSOKey& other) { // Copies the key as raw bytes through the char-array view of the union, whichever representation is stored.
+ memcpy(U.A, other.U.A, SSO_Length + 1); // SSO_Length + 1 is the size of U.A (and of the whole key, per the static_assert after the class)
+ }
+
static bool CanBeInplace(TStringBuf data) {
return data.Size() + 1 <= sizeof(TSSOKey);
}
@@ -278,9 +324,12 @@ private:
union {
TExternal E;
TInplace I;
+ char A[SSO_Length + 1];
} U;
};
+static_assert(sizeof(TSSOKey) == TSSOKey::SSO_Length + 1);
+
}
}
}
@@ -725,8 +774,93 @@ public:
keysDatum.emplace_back(TArrowBlock::From(s.Values_[Keys_[i].Index]).GetDatum());
}
- TOutputBuffer out;
- out.Resize(sizeof(TKey));
+ std::array<TOutputBuffer, PrefetchBatchSize> out;
+ for (ui32 i = 0; i < PrefetchBatchSize; ++i) {
+ out[i].Resize(sizeof(TKey));
+ }
+
+ std::array<TRobinHoodBatchRequestItem<TKey>, PrefetchBatchSize> insertBatch;
+ std::array<ui64, PrefetchBatchSize> insertBatchRows;
+ std::array<char*, PrefetchBatchSize> insertBatchPayloads;
+ std::array<bool, PrefetchBatchSize> insertBatchIsNew;
+ ui32 insertBatchLen = 0;
+
+ auto processInsertBatch = [&]() { // Flushes the first insertBatchLen buffered rows: builds the keys, batch-inserts them, then updates aggregation state.
+ for (ui32 i = 0; i < insertBatchLen; ++i) {
+ auto& r = insertBatch[i];
+ TStringBuf str = out[i].Finish(); // encoded key bytes of batch slot i (each slot has its own TOutputBuffer)
+ TKey key = MakeKey<TKey>(str, KeyLength_);
+ r.ConstructKey(key); // copy the key into the request item's storage
+ }
+
+ if constexpr (UseSet) {
+ s.HashSet_->BatchInsert({insertBatch.data(), insertBatchLen},[&](size_t index, typename TState::TSetImpl::iterator iter, bool isNew) {
+ Y_UNUSED(index);
+ if (isNew) {
+ if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) {
+ MoveKeyToArena(s.HashSet_->GetKey(iter), s.Arena_, KeyLength_); // presumably re-homes key bytes that still point into out[] into the arena — confirm
+ }
+ }
+ });
+ } else {
+ using THashTable = std::conditional_t<InlineAggState, typename TState::TDynMapImpl, typename TState::TFixedMapImpl>;
+ THashTable* hash; // selects the map that matches InlineAggState
+ if constexpr (!InlineAggState) {
+ hash = s.HashFixedMap_.get();
+ } else {
+ hash = s.HashMap_.get();
+ }
+
+ hash->BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t index, typename THashTable::iterator iter, bool isNew) {
+ if (isNew) {
+ if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) {
+ MoveKeyToArena(hash->GetKey(iter), s.Arena_, KeyLength_); // presumably re-homes key bytes that still point into out[] into the arena — confirm
+ }
+ }
+
+ if constexpr (UseArena) {
+ // prefetch payloads only; Insert() for these rows runs in the second pass below
+ auto payload = hash->GetPayload(iter);
+ char* ptr;
+ if (isNew) {
+ ptr = (char*)s.Arena_.Alloc(s.TotalStateSize_);
+ *(char**)payload = ptr; // the table cell stores a pointer to the arena-allocated state
+ } else {
+ ptr = *(char**)payload;
+ }
+
+ insertBatchIsNew[index] = isNew; // remembered per batch slot for the second pass
+ insertBatchPayloads[index] = ptr;
+ NYql::PrefetchForWrite(ptr);
+ } else {
+ // no arena indirection: the payload in the cell is the state itself, so update it right away
+ auto payload = (char*)hash->GetPayload(iter);
+ auto row = insertBatchRows[index];
+ ui32 streamIndex = 0;
+ if constexpr (Many) {
+ streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row];
+ }
+
+ Insert(row, payload, isNew, streamIndex, output, s);
+ }
+ });
+
+ if constexpr (UseArena) { // second pass: apply the rows using the state pointers recorded (and prefetched) in the sink above
+ for (ui32 i = 0; i < insertBatchLen; ++i) {
+ auto row = insertBatchRows[i];
+ ui32 streamIndex = 0;
+ if constexpr (Many) {
+ streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row];
+ }
+
+ bool isNew = insertBatchIsNew[i];
+ char* payload = insertBatchPayloads[i];
+ Insert(row, payload, isNew, streamIndex, output, s);
+ }
+ }
+ }
+ };
+
for (ui64 row = 0; row < batchLength; ++row) {
if constexpr (UseFilter) {
if (filterBitmap && !filterBitmap[row]) {
@@ -734,36 +868,21 @@ public:
}
}
- out.Rewind();
// encode key
+ out[insertBatchLen].Rewind();
for (ui32 i = 0; i < keysDatum.size(); ++i) {
- s.Readers_[i]->SaveItem(*keysDatum[i].array(), row, out);
+ s.Readers_[i]->SaveItem(*keysDatum[i].array(), row, out[insertBatchLen]);
}
- auto str = out.Finish();
- TKey key = MakeKey<TKey>(str, KeyLength_);
- if constexpr (UseSet) {
- bool isNew;
- auto iter = s.HashSet_->Insert(key, isNew);
- if (isNew) {
- if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) {
- MoveKeyToArena(s.HashSet_->GetKey(iter), s.Arena_, KeyLength_);
- }
-
- s.HashSet_->CheckGrow();
- }
- } else {
- ui32 streamIndex = 0;
- if constexpr (Many) {
- streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row];
- }
- if (!InlineAggState) {
- Insert(*s.HashFixedMap_, key, row, streamIndex, output, s);
- } else {
- Insert(*s.HashMap_, key, row, streamIndex, output, s);
- }
+ insertBatchRows[insertBatchLen] = row;
+ ++insertBatchLen;
+ if (insertBatchLen == PrefetchBatchSize) {
+ processInsertBatch();
+ insertBatchLen = 0;
}
}
+
+ processInsertBatch();
} else {
if (!s.HasValues_) {
s.IsFinished_ = true;
@@ -853,11 +972,11 @@ private:
ui32 TotalStateSize_ = 0;
size_t OutputBlockSize_ = 0;
std::unique_ptr<TDynMapImpl> HashMap_;
- typename TDynMapImpl::iterator HashMapIt_;
+ typename TDynMapImpl::const_iterator HashMapIt_;
std::unique_ptr<TSetImpl> HashSet_;
- typename TSetImpl::iterator HashSetIt_;
+ typename TSetImpl::const_iterator HashSetIt_;
std::unique_ptr<TFixedMapImpl> HashFixedMap_;
- typename TFixedMapImpl::iterator HashFixedMapIt_;
+ typename TFixedMapImpl::const_iterator HashFixedMapIt_;
TPagedArena Arena_;
TState(TMemoryUsageInfo* memInfo, ui32 keyLength, size_t width, std::optional<ui32>, const TVector<TAggParams<TAggregator>>& params,
@@ -960,21 +1079,10 @@ private:
return *static_cast<TState*>(state.AsBoxed().Get());
}
- template <typename THash>
- void Insert(THash& hash, const TKey& key, ui64 row, ui32 currentStreamIndex, NUdf::TUnboxedValue*const* output, TState& s) const {
- bool isNew;
- auto iter = hash.Insert(key, isNew);
- char* payload = (char*)hash.GetMutablePayload(iter);
- char* ptr;
+ void Insert(ui64 row, char* payload, bool isNew, ui32 currentStreamIndex, NUdf::TUnboxedValue*const* output, TState& s) const {
+ char* ptr = payload;
if (isNew) {
- if constexpr (UseArena) {
- ptr = (char*)s.Arena_.Alloc(s.TotalStateSize_);
- *(char**)payload = ptr;
- } else {
- ptr = payload;
- }
-
if constexpr (Many) {
static_assert(Finalize);
MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index");
@@ -999,19 +1107,7 @@ private:
ptr += s.Aggs_[i]->StateSize;
}
}
-
- if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) {
- MoveKeyToArena(hash.GetKey(iter), s.Arena_, KeyLength_);
- }
-
- hash.CheckGrow();
} else {
- if constexpr (UseArena) {
- ptr = *(char**)payload;
- } else {
- ptr = payload;
- }
-
if constexpr (Many) {
static_assert(Finalize);
MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index");
@@ -1045,49 +1141,82 @@ private:
}
template <typename THash>
- bool Iterate(THash& hash, typename THash::iterator& iter, NUdf::TUnboxedValue*const* output, TState& s) const {
+ bool Iterate(THash& hash, typename THash::const_iterator& iter, NUdf::TUnboxedValue*const* output, TState& s) const {
MKQL_ENSURE(s.WritingOutput_, "Supposed to be called at the end");
+ std::array<typename THash::const_iterator, PrefetchBatchSize> iters;
+ ui32 itersLen = 0;
+ auto iterateBatch = [&]() { // Emits the entries buffered in iters[0..itersLen): keys go to s.Builders_, aggregate states to s.AggBuilders_.
+ for (ui32 i = 0; i < itersLen; ++i) {
+ auto iter = iters[i]; // NOTE: this local shadows the outer `iter` reference parameter inside the lambda
+ const TKey& key = hash.GetKey(iter);
+ auto payload = (char*)hash.GetPayload(iter);
+ char* ptr;
+ if constexpr (UseArena) {
+ ptr = *(char**)payload; // with UseArena the cell holds a pointer to the state
+ } else {
+ ptr = payload;
+ }
+
+ TInputBuffer in(GetKeyView<TKey>(key, KeyLength_));
+ for (auto& kb : s.Builders_) {
+ kb->Add(in);
+ }
+
+ if constexpr (Many) {
+ for (ui32 i = 0; i < Streams_.size(); ++i) { // inner `i` shadows the batch index of the enclosing loop
+ MKQL_ENSURE(ptr[i], "Missing partial aggregation state");
+ }
+
+ ptr += Streams_.size(); // skip the per-stream marker bytes checked above
+ }
+
+ for (size_t i = 0; i < s.Aggs_.size(); ++i) {
+ if (output[Keys_.size() + i]) {
+ s.AggBuilders_[i]->Add(ptr);
+ s.Aggs_[i]->DestroyState(ptr); // the state is destroyed right after it is added to the builder
+ }
+
+ ptr += s.Aggs_[i]->StateSize;
+ }
+ }
+ };
+
for (; iter != hash.End(); hash.Advance(iter)) {
if (!hash.IsValid(iter)) {
continue;
}
if (s.OutputBlockSize_ == MaxBlockLen_) {
+ iterateBatch();
return false;
}
- const TKey& key = hash.GetKey(iter);
- auto payload = (char*)hash.GetMutablePayload(iter);
- char* ptr;
- if constexpr (UseArena) {
- ptr = *(char**)payload;
- } else {
- ptr = payload;
- }
-
- TInputBuffer in(GetKeyView<TKey>(key, KeyLength_));
- for (auto& kb : s.Builders_) {
- kb->Add(in);
+ if (itersLen == iters.size()) {
+ iterateBatch();
+ itersLen = 0;
}
- if constexpr (Many) {
- for (ui32 i = 0; i < Streams_.size(); ++i) {
- MKQL_ENSURE(ptr[i], "Missing partial aggregation state");
- }
-
- ptr += Streams_.size();
+ iters[itersLen] = iter;
+ ++itersLen;
+ s.OutputBlockSize_++;
+ if constexpr (UseArena) {
+ auto payload = (char*)hash.GetPayload(iter);
+ auto ptr = *(char**)payload;
+ NYql::PrefetchForWrite(ptr);
}
- for (size_t i = 0; i < s.Aggs_.size(); ++i) {
- if (output[Keys_.size() + i]) {
- s.AggBuilders_[i]->Add(ptr);
- s.Aggs_[i]->DestroyState(ptr);
+ if constexpr (std::is_same<TKey, TSSOKey>::value) {
+ const auto& key = hash.GetKey(iter);
+ if (!key.IsInplace()) {
+ NYql::PrefetchForRead(key.AsView().Data());
}
-
- ptr += s.Aggs_[i]->StateSize;
+ } else if constexpr (std::is_same<TKey, TExternalFixedSizeKey>::value) {
+ const auto& key = hash.GetKey(iter);
+ NYql::PrefetchForRead(key.Data);
}
- s.OutputBlockSize_++;
}
+
+ iterateBatch();
return true;
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
index 6a5d358dac..136c3efc52 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
@@ -2,7 +2,7 @@
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
-#include <arrow/array/builder_primitive.h>
+#include <ydb/library/yql/minikql/computation/mkql_block_builder.h>
namespace NKikimr {
namespace NMiniKQL {
@@ -16,25 +16,22 @@ struct TState {
class TColumnBuilder : public IAggColumnBuilder {
public:
TColumnBuilder(ui64 size, TComputationContext& ctx)
- : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool)
+ : Builder_(TTypeInfoHelper(), arrow::uint64(), ctx.ArrowMemoryPool, size)
, Ctx_(ctx)
{
- ARROW_OK(Builder_.Reserve(size));
}
void Add(const void* state) final {
auto typedState = static_cast<const TState*>(state);
- Builder_.UnsafeAppend(typedState->Count_);
+ Builder_.Add(TBlockItem(typedState->Count_));
}
NUdf::TUnboxedValue Build() final {
- std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(Builder_.FinishInternal(&result));
- return Ctx_.HolderFactory.CreateArrowBlock(result);
+ return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true));
}
private:
- arrow::UInt64Builder Builder_;
+ NYql::NUdf::TFixedSizeArrayBuilder<ui64, false> Builder_;
TComputationContext& Ctx_;
};
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
index d6e449cc92..e91762c2b6 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
@@ -77,11 +77,12 @@ public:
void Add(const void* state) final {
auto typedState = static_cast<const TAvgState*>(state);
+ auto tupleBuilder = static_cast<NUdf::TTupleArrayBuilder<true>*>(Builder_.get());
if (typedState->Count_) {
TBlockItem tupleItems[] = { TBlockItem(typedState->Sum_), TBlockItem(typedState->Count_)} ;
- Builder_->Add(TBlockItem(tupleItems));
+ tupleBuilder->Add(TBlockItem(tupleItems));
} else {
- Builder_->Add(TBlockItem());
+ tupleBuilder->Add(TBlockItem());
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h
index 23699b1c53..ae3e9e4fa5 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h
@@ -3,6 +3,9 @@
#include <util/system/types.h>
#include <util/generic/yexception.h>
#include <vector>
+#include <span>
+
+#include <ydb/library/yql/utils/prefetch.h>
#include <util/digest/city.h>
#include <util/generic/scope.h>
@@ -15,6 +18,26 @@ struct TRobinHoodDefaultSettings {
static constexpr bool CacheHash = !std::is_arithmetic<TKey>::value;
};
+template <typename TKey>
+struct TRobinHoodBatchRequestItem { // One slot of a BatchInsert request: the caller-provided key plus per-item scratch fields.
+ // input
+ alignas(TKey) char KeyStorage[sizeof(TKey)]; // raw, TKey-aligned storage; the key object is created by ConstructKey()
+
+ const TKey& GetKey() const { // only meaningful after ConstructKey() has been called on this item
+ return *reinterpret_cast<const TKey*>(KeyStorage);
+ }
+
+ void ConstructKey(const TKey& key) { // copy-constructs the key in place; no matching destructor call is visible in this diff
+ new (KeyStorage) TKey(key);
+ }
+
+ // intermediate data
+ ui64 Hash; // written by the hash table (BatchInsert / Grow), not by the caller
+ char* InitialIterator; // cell where probing starts, as computed by MakeIterator()
+};
+
+constexpr ui32 PrefetchBatchSize = 64; // size of the fixed batch arrays used by Grow() and by BatchInsert callers
+
//TODO: only POD key & payloads are now supported
template <typename TKey, typename TEqual, typename THash, typename TAllocator, typename TDeriv, bool CacheHash>
class TRobinHoodHashBase {
@@ -75,7 +98,9 @@ protected:
public:
// returns iterator
Y_FORCE_INLINE char* Insert(TKey key, bool& isNew) {
- auto ret = InsertImpl(key, HashLocal(key), isNew, Capacity, Data, DataEnd);
+ auto hash = HashLocal(key);
+ auto ptr = MakeIterator(hash, Data, Capacity);
+ auto ret = InsertImpl(key, hash, isNew, Data, DataEnd, ptr);
Size += isNew ? 1 : 0;
return ret;
}
@@ -87,6 +112,28 @@ public:
}
}
+ template <typename TSink>
+ Y_NO_INLINE void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { // Two-pass batch insert: hash and prefetch every target cell first, then perform the inserts.
+ while (2 * (Size + batchRequest.size()) >= Capacity) { // grow up front, sized as if every key were new, so Grow() is not called from the loops below and InitialIterator stays valid
+ Grow();
+ }
+
+ for (size_t i = 0; i < batchRequest.size(); ++i) { // pass 1: compute hash + home cell and issue a write prefetch for it
+ auto& r = batchRequest[i];
+ r.Hash = HashLocal(r.GetKey());
+ r.InitialIterator = MakeIterator(r.Hash, Data, Capacity);
+ NYql::PrefetchForWrite(r.InitialIterator);
+ }
+
+ for (size_t i = 0; i < batchRequest.size(); ++i) { // pass 2: insert starting from the prefetched cell
+ auto& r = batchRequest[i];
+ bool isNew;
+ auto iter = InsertImpl(r.GetKey(), r.Hash, isNew, Data, DataEnd, r.InitialIterator);
+ Size += isNew ? 1 : 0;
+ sink(i, iter, isNew); // i is the position of the item inside batchRequest
+ }
+ }
+
ui64 GetCapacity() const {
return Capacity;
}
@@ -124,11 +171,11 @@ public:
return DataEnd;
}
- void Advance(char*& ptr) {
+ void Advance(char*& ptr) const {
ptr += AsDeriv().GetCellSize();
}
- void Advance(const char*& ptr) {
+ void Advance(const char*& ptr) const {
ptr += AsDeriv().GetCellSize();
}
@@ -161,12 +208,20 @@ public:
}
private:
- Y_FORCE_INLINE char* InsertImpl(TKey key, const ui64 hash, bool& isNew, ui64 capacity, char* data, char* dataEnd) {
- isNew = false;
- TPSLStorage psl(hash);
- // https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
+ struct TInternalBatchRequestItem : TRobinHoodBatchRequestItem<TKey> { // Batch item used by Grow()/CopyBatch() when moving entries to the new table.
+ char* OriginalIterator; // cell of the entry in the old table; its payload is copied from here
+ };
+
+ Y_FORCE_INLINE char* MakeIterator(const ui64 hash, char* data, ui64 capacity) { // Maps a hash to the address of its home cell inside the table [data, data + capacity cells).
+ // https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
ui64 bucket = ((SelfHash ^ hash) * 11400714819323198485llu) & (capacity - 1);
char* ptr = data + AsDeriv().GetCellSize() * bucket;
+ return ptr; // NOTE(review): the `& (capacity - 1)` mask assumes capacity is a power of two — confirm
+ }
+
+ Y_FORCE_INLINE char* InsertImpl(TKey key, const ui64 hash, bool& isNew, char* data, char* dataEnd, char* ptr) {
+ isNew = false;
+ TPSLStorage psl(hash);
char* returnPtr;
typename TDeriv::TPayloadStore tmpPayload;
for (;;) {
@@ -226,7 +281,7 @@ private:
}
}
- void Grow() {
+ Y_NO_INLINE void Grow() {
ui64 growFactor;
if (Capacity < 100'000) {
growFactor = 8;
@@ -242,28 +297,48 @@ private:
Allocator.deallocate(newData, newDataEnd - newData);
};
+ std::array<TInternalBatchRequestItem, PrefetchBatchSize> batch;
+ ui32 batchLen = 0;
for (auto iter = Begin(); iter != End(); Advance(iter)) {
if (GetPSL(iter).Distance < 0) {
continue;
}
- bool isNew;
- auto& key = GetKey(iter);
- char* newIter = nullptr;
+ if (batchLen == batch.size()) {
+ CopyBatch({batch.data(), batchLen}, newData, newDataEnd);
+ batchLen = 0;
+ }
+
+ auto& r = batch[batchLen++];
+ r.ConstructKey(GetKey(iter));
+ r.OriginalIterator = iter;
+
if constexpr (CacheHash) {
- newIter = InsertImpl(key, GetPSL(iter).Hash, isNew, newCapacity, newData, newDataEnd);
+ r.Hash = GetPSL(iter).Hash;
} else {
- newIter = InsertImpl(key, HashLocal(key), isNew, newCapacity, newData, newDataEnd);
+ r.Hash = HashLocal(r.GetKey());
}
- Y_ASSERT(isNew);
- AsDeriv().CopyPayload(GetMutablePayload(newIter), GetPayload(iter));
+
+ r.InitialIterator = MakeIterator(r.Hash, newData, newCapacity);
+ NYql::PrefetchForWrite(r.InitialIterator);
}
+ CopyBatch({batch.data(), batchLen}, newData, newDataEnd);
+
Capacity = newCapacity;
std::swap(Data, newData);
std::swap(DataEnd, newDataEnd);
}
+ Y_NO_INLINE void CopyBatch(std::span<TInternalBatchRequestItem> batch, char* newData, char* newDataEnd) { // Called from Grow(): inserts a batch of existing entries into the new table and copies their payloads.
+ for (auto& r : batch) {
+ bool isNew;
+ auto iter = InsertImpl(r.GetKey(), r.Hash, isNew, newData, newDataEnd, r.InitialIterator); // InitialIterator was computed (and prefetched) against the new table in Grow()
+ Y_ASSERT(isNew); // every key comes from the old table, so it is expected to be absent from the new one
+ AsDeriv().CopyPayload(GetMutablePayload(iter), GetPayload(r.OriginalIterator)); // payload is read from the entry's cell in the old table
+ }
+ }
+
void AdvancePointer(char*& ptr, char* begin, char* end) const {
ptr += AsDeriv().GetCellSize();
ptr = (ptr == end) ? begin : ptr;
diff --git a/ydb/library/yql/minikql/comp_nodes/ya.make.inc b/ydb/library/yql/minikql/comp_nodes/ya.make.inc
index 18a5d2ef28..1a84c6e9ba 100644
--- a/ydb/library/yql/minikql/comp_nodes/ya.make.inc
+++ b/ydb/library/yql/minikql/comp_nodes/ya.make.inc
@@ -138,6 +138,10 @@ PEERDIR(
library/cpp/actors/core
)
+CFLAGS(
+ -mprfchw # NOTE(review): unconditional; -mprfchw looks x86-specific — confirm non-x86 targets (e.g. the linux-aarch64 CMake list touched by this commit) accept it
+)
+
YQL_LAST_ABI_VERSION()
PROVIDES(mkql_comp_nodes)
diff --git a/ydb/library/yql/utils/prefetch.h b/ydb/library/yql/utils/prefetch.h
new file mode 100644
index 0000000000..50eea7df5c
--- /dev/null
+++ b/ydb/library/yql/utils/prefetch.h
@@ -0,0 +1,13 @@
+#pragma once
+
+namespace NYql {
+
+inline void PrefetchForRead(const void* ptr) { // Hints that *ptr is about to be read; thin wrapper over the compiler builtin.
+ __builtin_prefetch(ptr, 0, 3); // arguments: rw = 0 (read), locality = 3
+}
+
+inline void PrefetchForWrite(void* ptr) { // Hints that *ptr is about to be written; thin wrapper over the compiler builtin.
+ __builtin_prefetch(ptr, 1, 3); // arguments: rw = 1 (write), locality = 3
+}
+
+} // namespace NYql