diff options
author | vvvv <vvvv@ydb.tech> | 2023-08-24 11:03:00 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-08-24 11:25:29 +0300 |
commit | aad1dfc4cceb262430a03a283d9d1d1b52a82257 (patch) | |
tree | b37b628111796781fde05c70010eb035966737ab | |
parent | 2d9b1587926286b1a7f1b3e52f2fcdf8f96461e9 (diff) | |
download | ydb-aad1dfc4cceb262430a03a283d9d1d1b52a82257.tar.gz |
Prefetch in resize & batch API for robin hood hash table
10 files changed, 333 insertions, 110 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt index cee4fa2c28..db7bf3a22f 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt @@ -9,6 +9,7 @@ add_library(minikql-comp_nodes-llvm) target_compile_options(minikql-comp_nodes-llvm PRIVATE + -mprfchw -DUSE_CURRENT_UDF_ABI_VERSION $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything> ) diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt index cd0ff9089f..18a9e65c47 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt @@ -9,6 +9,7 @@ add_library(minikql-comp_nodes-llvm) target_compile_options(minikql-comp_nodes-llvm PRIVATE + -mprfchw -DUSE_CURRENT_UDF_ABI_VERSION $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything> ) diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt index cd0ff9089f..18a9e65c47 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt @@ -9,6 +9,7 @@ add_library(minikql-comp_nodes-llvm) target_compile_options(minikql-comp_nodes-llvm PRIVATE + -mprfchw -DUSE_CURRENT_UDF_ABI_VERSION $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything> ) diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt index cee4fa2c28..db7bf3a22f 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt @@ -9,6 +9,7 @@ add_library(minikql-comp_nodes-llvm) target_compile_options(minikql-comp_nodes-llvm PRIVATE + -mprfchw -DUSE_CURRENT_UDF_ABI_VERSION $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything> ) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp index a78da69025..0a590e7eb3 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp @@ -15,6 +15,8 @@ #include <ydb/library/yql/minikql/arrow/arrow_util.h> #include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h> +#include <ydb/library/yql/utils/prefetch.h> + #include <arrow/scalar.h> #include <arrow/array/array_primitive.h> #include <arrow/array/builder_primitive.h> @@ -32,12 +34,14 @@ constexpr bool InlineAggState = false; #ifdef USE_STD_UNORDERED template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>, typename TAllocator = std::allocator<char>, typename TSettings = void> class TDynamicHashMapImpl { +public: using TMapType = std::unordered_map<TKey, std::vector<char>, THash, TEqual>; using const_iterator = typename TMapType::const_iterator; using iterator = typename TMapType::iterator; -public: - TDynamicHashMapImpl(size_t stateSize) + + TDynamicHashMapImpl(size_t stateSize, const THash& hasher, const TEqual& equal) : StateSize_(stateSize) + , Map_(0, hasher, equal) {} ui64 GetSize() const { @@ -70,6 +74,15 @@ public: return res.first; } + template <typename TSink> + void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { + for (size_t index = 0; index < batchRequest.size(); ++index) { + bool isNew; + auto iter = Insert(batchRequest[index].GetKey(), isNew); + sink(index, iter, isNew); + } + } + const TKey& GetKey(const_iterator it) const { return it->first; } @@ -92,10 +105,15 @@ private: template <typename TKey, typename TPayload, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>, typename TAllocator = std::allocator<char>, typename TSettings = void> class TFixedHashMapImpl { +public: using TMapType = std::unordered_map<TKey, TPayload, THash, TEqual>; using const_iterator = typename TMapType::const_iterator; using iterator = typename TMapType::iterator; -public: + + TFixedHashMapImpl(const THash& hasher, const TEqual& equal) + : Map_(0, hasher, equal) + {} + ui64 GetSize() const { return Map_.size(); } @@ -122,6 +140,15 @@ public: return res.first; } + template <typename TSink> + void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { + for (size_t index = 0; index < batchRequest.size(); ++index) { + bool isNew; + auto iter = Insert(batchRequest[index].GetKey(), isNew); + sink(index, iter, isNew); + } + } + const TKey& GetKey(const_iterator it) const { return it->first; } @@ -143,10 +170,15 @@ private: template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>, typename TAllocator = std::allocator<char>, typename TSettings = void> class THashSetImpl { +public: using TSetType = std::unordered_set<TKey, THash, TEqual>; using const_iterator = typename TSetType::const_iterator; using iterator = typename TSetType::iterator; -public: + + THashSetImpl(const THash& hasher, const TEqual& equal) + : Set_(0, hasher, equal) + {} + ui64 GetSize() const { return Set_.size(); } @@ -173,6 +205,15 @@ public: return res.first; } + template <typename TSink> + void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { + for (size_t index = 0; index < batchRequest.size(); ++index) { + bool isNew; + auto iter = Insert(batchRequest[index].GetKey(), isNew); + sink(index, iter, isNew); + } + } + void CheckGrow() { } @@ -219,10 +260,11 @@ struct TKey16 { }; class TSSOKey { -private: +public: static constexpr size_t SSO_Length = 15; static_assert(SSO_Length < 128); // should fit into 7 bits +private: struct TExternal { ui64 Length_; const char* Ptr_; @@ -234,6 +276,10 @@ private: }; public: + TSSOKey(const TSSOKey& other) { + memcpy(U.A, other.U.A, SSO_Length + 1); + } + static bool CanBeInplace(TStringBuf data) { return data.Size() + 1 <= sizeof(TSSOKey); } @@ -278,9 +324,12 @@ private: union { TExternal E; TInplace I; + char A[SSO_Length + 1]; } U; }; +static_assert(sizeof(TSSOKey) == TSSOKey::SSO_Length + 1); + } } } @@ -725,8 +774,93 @@ public: keysDatum.emplace_back(TArrowBlock::From(s.Values_[Keys_[i].Index]).GetDatum()); } - TOutputBuffer out; - out.Resize(sizeof(TKey)); + std::array<TOutputBuffer, PrefetchBatchSize> out; + for (ui32 i = 0; i < PrefetchBatchSize; ++i) { + out[i].Resize(sizeof(TKey)); + } + + std::array<TRobinHoodBatchRequestItem<TKey>, PrefetchBatchSize> insertBatch; + std::array<ui64, PrefetchBatchSize> insertBatchRows; + std::array<char*, PrefetchBatchSize> insertBatchPayloads; + std::array<bool, PrefetchBatchSize> insertBatchIsNew; + ui32 insertBatchLen = 0; + + auto processInsertBatch = [&]() { + for (ui32 i = 0; i < insertBatchLen; ++i) { + auto& r = insertBatch[i]; + TStringBuf str = out[i].Finish(); + TKey key = MakeKey<TKey>(str, KeyLength_); + r.ConstructKey(key); + } + + if constexpr (UseSet) { + s.HashSet_->BatchInsert({insertBatch.data(), insertBatchLen},[&](size_t index, typename TState::TSetImpl::iterator iter, bool isNew) { + Y_UNUSED(index); + if (isNew) { + if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) { + MoveKeyToArena(s.HashSet_->GetKey(iter), s.Arena_, KeyLength_); + } + } + }); + } else { + using THashTable = std::conditional_t<InlineAggState, typename TState::TDynMapImpl, typename TState::TFixedMapImpl>; + THashTable* hash; + if constexpr (!InlineAggState) { + hash = s.HashFixedMap_.get(); + } else { + hash = s.HashMap_.get(); + } + + hash->BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t index, typename THashTable::iterator iter, bool isNew) { + if (isNew) { + if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) { + MoveKeyToArena(hash->GetKey(iter), s.Arena_, KeyLength_); + } + } + + if constexpr (UseArena) { + // prefetch payloads only + auto payload = hash->GetPayload(iter); + char* ptr; + if (isNew) { + ptr = (char*)s.Arena_.Alloc(s.TotalStateSize_); + *(char**)payload = ptr; + } else { + ptr = *(char**)payload; + } + + insertBatchIsNew[index] = isNew; + insertBatchPayloads[index] = ptr; + NYql::PrefetchForWrite(ptr); + } else { + // process insert + auto payload = (char*)hash->GetPayload(iter); + auto row = insertBatchRows[index]; + ui32 streamIndex = 0; + if constexpr (Many) { + streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row]; + } + + Insert(row, payload, isNew, streamIndex, output, s); + } + }); + + if constexpr (UseArena) { + for (ui32 i = 0; i < insertBatchLen; ++i) { + auto row = insertBatchRows[i]; + ui32 streamIndex = 0; + if constexpr (Many) { + streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row]; + } + + bool isNew = insertBatchIsNew[i]; + char* payload = insertBatchPayloads[i]; + Insert(row, payload, isNew, streamIndex, output, s); + } + } + } + }; + for (ui64 row = 0; row < batchLength; ++row) { if constexpr (UseFilter) { if (filterBitmap && !filterBitmap[row]) { @@ -734,36 +868,21 @@ public: } } - out.Rewind(); // encode key + out[insertBatchLen].Rewind(); for (ui32 i = 0; i < keysDatum.size(); ++i) { - s.Readers_[i]->SaveItem(*keysDatum[i].array(), row, out); + s.Readers_[i]->SaveItem(*keysDatum[i].array(), row, out[insertBatchLen]); } - auto str = out.Finish(); - TKey key = MakeKey<TKey>(str, KeyLength_); - if constexpr (UseSet) { - bool isNew; - auto iter = s.HashSet_->Insert(key, isNew); - if (isNew) { - if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) { - MoveKeyToArena(s.HashSet_->GetKey(iter), s.Arena_, KeyLength_); - } - - s.HashSet_->CheckGrow(); - } - } else { - ui32 streamIndex = 0; - if constexpr (Many) { - streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row]; - } - if (!InlineAggState) { - Insert(*s.HashFixedMap_, key, row, streamIndex, output, s); - } else { - Insert(*s.HashMap_, key, row, streamIndex, output, s); - } + insertBatchRows[insertBatchLen] = row; + ++insertBatchLen; + if (insertBatchLen == PrefetchBatchSize) { + processInsertBatch(); + insertBatchLen = 0; } } + + processInsertBatch(); } else { if (!s.HasValues_) { s.IsFinished_ = true; @@ -853,11 +972,11 @@ private: ui32 TotalStateSize_ = 0; size_t OutputBlockSize_ = 0; std::unique_ptr<TDynMapImpl> HashMap_; - typename TDynMapImpl::iterator HashMapIt_; + typename TDynMapImpl::const_iterator HashMapIt_; std::unique_ptr<TSetImpl> HashSet_; - typename TSetImpl::iterator HashSetIt_; + typename TSetImpl::const_iterator HashSetIt_; std::unique_ptr<TFixedMapImpl> HashFixedMap_; - typename TFixedMapImpl::iterator HashFixedMapIt_; + typename TFixedMapImpl::const_iterator HashFixedMapIt_; TPagedArena Arena_; TState(TMemoryUsageInfo* memInfo, ui32 keyLength, size_t width, std::optional<ui32>, const TVector<TAggParams<TAggregator>>& params, @@ -960,21 +1079,10 @@ private: return *static_cast<TState*>(state.AsBoxed().Get()); } - template <typename THash> - void Insert(THash& hash, const TKey& key, ui64 row, ui32 currentStreamIndex, NUdf::TUnboxedValue*const* output, TState& s) const { - bool isNew; - auto iter = hash.Insert(key, isNew); - char* payload = (char*)hash.GetMutablePayload(iter); - char* ptr; + void Insert(ui64 row, char* payload, bool isNew, ui32 currentStreamIndex, NUdf::TUnboxedValue*const* output, TState& s) const { + char* ptr = payload; if (isNew) { - if constexpr (UseArena) { - ptr = (char*)s.Arena_.Alloc(s.TotalStateSize_); - *(char**)payload = ptr; - } else { - ptr = payload; - } - if constexpr (Many) { static_assert(Finalize); MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index"); @@ -999,19 +1107,7 @@ private: ptr += s.Aggs_[i]->StateSize; } } - - if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) { - MoveKeyToArena(hash.GetKey(iter), s.Arena_, KeyLength_); - } - - hash.CheckGrow(); } else { - if constexpr (UseArena) { - ptr = *(char**)payload; - } else { - ptr = payload; - } - if constexpr (Many) { static_assert(Finalize); MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index"); @@ -1045,49 +1141,82 @@ private: } template <typename THash> - bool Iterate(THash& hash, typename THash::iterator& iter, NUdf::TUnboxedValue*const* output, TState& s) const { + bool Iterate(THash& hash, typename THash::const_iterator& iter, NUdf::TUnboxedValue*const* output, TState& s) const { MKQL_ENSURE(s.WritingOutput_, "Supposed to be called at the end"); + std::array<typename THash::const_iterator, PrefetchBatchSize> iters; + ui32 itersLen = 0; + auto iterateBatch = [&]() { + for (ui32 i = 0; i < itersLen; ++i) { + auto iter = iters[i]; + const TKey& key = hash.GetKey(iter); + auto payload = (char*)hash.GetPayload(iter); + char* ptr; + if constexpr (UseArena) { + ptr = *(char**)payload; + } else { + ptr = payload; + } + + TInputBuffer in(GetKeyView<TKey>(key, KeyLength_)); + for (auto& kb : s.Builders_) { + kb->Add(in); + } + + if constexpr (Many) { + for (ui32 i = 0; i < Streams_.size(); ++i) { + MKQL_ENSURE(ptr[i], "Missing partial aggregation state"); + } + + ptr += Streams_.size(); + } + + for (size_t i = 0; i < s.Aggs_.size(); ++i) { + if (output[Keys_.size() + i]) { + s.AggBuilders_[i]->Add(ptr); + s.Aggs_[i]->DestroyState(ptr); + } + + ptr += s.Aggs_[i]->StateSize; + } + } + }; + for (; iter != hash.End(); hash.Advance(iter)) { if (!hash.IsValid(iter)) { continue; } if (s.OutputBlockSize_ == MaxBlockLen_) { + iterateBatch(); return false; } - const TKey& key = hash.GetKey(iter); - auto payload = (char*)hash.GetMutablePayload(iter); - char* ptr; - if constexpr (UseArena) { - ptr = *(char**)payload; - } else { - ptr = payload; - } - - TInputBuffer in(GetKeyView<TKey>(key, KeyLength_)); - for (auto& kb : s.Builders_) { - kb->Add(in); + if (itersLen == iters.size()) { + iterateBatch(); + itersLen = 0; } - if constexpr (Many) { - for (ui32 i = 0; i < Streams_.size(); ++i) { - MKQL_ENSURE(ptr[i], "Missing partial aggregation state"); - } - - ptr += Streams_.size(); + iters[itersLen] = iter; + ++itersLen; + s.OutputBlockSize_++; + if constexpr (UseArena) { + auto payload = (char*)hash.GetPayload(iter); + auto ptr = *(char**)payload; + NYql::PrefetchForWrite(ptr); } - for (size_t i = 0; i < s.Aggs_.size(); ++i) { - if (output[Keys_.size() + i]) { - s.AggBuilders_[i]->Add(ptr); - s.Aggs_[i]->DestroyState(ptr); + if constexpr (std::is_same<TKey, TSSOKey>::value) { + const auto& key = hash.GetKey(iter); + if (!key.IsInplace()) { + NYql::PrefetchForRead(key.AsView().Data()); } - - ptr += s.Aggs_[i]->StateSize; + } else if constexpr (std::is_same<TKey, TExternalFixedSizeKey>::value) { + const auto& key = hash.GetKey(iter); + NYql::PrefetchForRead(key.Data); } - s.OutputBlockSize_++; } + + iterateBatch(); return true; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp index 6a5d358dac..136c3efc52 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp @@ -2,7 +2,7 @@ #include <ydb/library/yql/minikql/arrow/arrow_defs.h> -#include <arrow/array/builder_primitive.h> +#include <ydb/library/yql/minikql/computation/mkql_block_builder.h> namespace NKikimr { namespace NMiniKQL { @@ -16,25 +16,22 @@ struct TState { class TColumnBuilder : public IAggColumnBuilder { public: TColumnBuilder(ui64 size, TComputationContext& ctx) - : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool) + : Builder_(TTypeInfoHelper(), arrow::uint64(), ctx.ArrowMemoryPool, size) , Ctx_(ctx) { - ARROW_OK(Builder_.Reserve(size)); } void Add(const void* state) final { auto typedState = static_cast<const TState*>(state); - Builder_.UnsafeAppend(typedState->Count_); + Builder_.Add(TBlockItem(typedState->Count_)); } NUdf::TUnboxedValue Build() final { - std::shared_ptr<arrow::ArrayData> result; - ARROW_OK(Builder_.FinishInternal(&result)); - return Ctx_.HolderFactory.CreateArrowBlock(result); + return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true)); } private: - arrow::UInt64Builder Builder_; + NYql::NUdf::TFixedSizeArrayBuilder<ui64, false> Builder_; TComputationContext& Ctx_; }; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp index d6e449cc92..e91762c2b6 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp @@ -77,11 +77,12 @@ public: void Add(const void* state) final { auto typedState = static_cast<const TAvgState*>(state); + auto tupleBuilder = static_cast<NUdf::TTupleArrayBuilder<true>*>(Builder_.get()); if (typedState->Count_) { TBlockItem tupleItems[] = { TBlockItem(typedState->Sum_), TBlockItem(typedState->Count_)} ; - Builder_->Add(TBlockItem(tupleItems)); + tupleBuilder->Add(TBlockItem(tupleItems)); } else { - Builder_->Add(TBlockItem()); + tupleBuilder->Add(TBlockItem()); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h index 23699b1c53..ae3e9e4fa5 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h @@ -3,6 +3,9 @@ #include <util/system/types.h> #include <util/generic/yexception.h> #include <vector> +#include <span> + +#include <ydb/library/yql/utils/prefetch.h> #include <util/digest/city.h> #include <util/generic/scope.h> @@ -15,6 +18,26 @@ struct TRobinHoodDefaultSettings { static constexpr bool CacheHash = !std::is_arithmetic<TKey>::value; }; +template <typename TKey> +struct TRobinHoodBatchRequestItem { + // input + alignas(TKey) char KeyStorage[sizeof(TKey)]; + + const TKey& GetKey() const { + return *reinterpret_cast<const TKey*>(KeyStorage); + } + + void ConstructKey(const TKey& key) { + new (KeyStorage) TKey(key); + } + + // intermediate data + ui64 Hash; + char* InitialIterator; +}; + +constexpr ui32 PrefetchBatchSize = 64; + //TODO: only POD key & payloads are now supported template <typename TKey, typename TEqual, typename THash, typename TAllocator, typename TDeriv, bool CacheHash> class TRobinHoodHashBase { @@ -75,7 +98,9 @@ protected: public: // returns iterator Y_FORCE_INLINE char* Insert(TKey key, bool& isNew) { - auto ret = InsertImpl(key, HashLocal(key), isNew, Capacity, Data, DataEnd); + auto hash = HashLocal(key); + auto ptr = MakeIterator(hash, Data, Capacity); + auto ret = InsertImpl(key, hash, isNew, Data, DataEnd, ptr); Size += isNew ? 1 : 0; return ret; } @@ -87,6 +112,28 @@ public: } } + template <typename TSink> + Y_NO_INLINE void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { + while (2 * (Size + batchRequest.size()) >= Capacity) { + Grow(); + } + + for (size_t i = 0; i < batchRequest.size(); ++i) { + auto& r = batchRequest[i]; + r.Hash = HashLocal(r.GetKey()); + r.InitialIterator = MakeIterator(r.Hash, Data, Capacity); + NYql::PrefetchForWrite(r.InitialIterator); + } + + for (size_t i = 0; i < batchRequest.size(); ++i) { + auto& r = batchRequest[i]; + bool isNew; + auto iter = InsertImpl(r.GetKey(), r.Hash, isNew, Data, DataEnd, r.InitialIterator); + Size += isNew ? 1 : 0; + sink(i, iter, isNew); + } + } + ui64 GetCapacity() const { return Capacity; } @@ -124,11 +171,11 @@ public: return DataEnd; } - void Advance(char*& ptr) { + void Advance(char*& ptr) const { ptr += AsDeriv().GetCellSize(); } - void Advance(const char*& ptr) { + void Advance(const char*& ptr) const { ptr += AsDeriv().GetCellSize(); } @@ -161,12 +208,20 @@ public: } private: - Y_FORCE_INLINE char* InsertImpl(TKey key, const ui64 hash, bool& isNew, ui64 capacity, char* data, char* dataEnd) { - isNew = false; - TPSLStorage psl(hash); - // https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/ + struct TInternalBatchRequestItem : TRobinHoodBatchRequestItem<TKey> { + char* OriginalIterator; + }; + + Y_FORCE_INLINE char* MakeIterator(const ui64 hash, char* data, ui64 capacity) { + // https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/ ui64 bucket = ((SelfHash ^ hash) * 11400714819323198485llu) & (capacity - 1); char* ptr = data + AsDeriv().GetCellSize() * bucket; + return ptr; + } + + Y_FORCE_INLINE char* InsertImpl(TKey key, const ui64 hash, bool& isNew, char* data, char* dataEnd, char* ptr) { + isNew = false; + TPSLStorage psl(hash); char* returnPtr; typename TDeriv::TPayloadStore tmpPayload; for (;;) { @@ -226,7 +281,7 @@ private: } } - void Grow() { + Y_NO_INLINE void Grow() { ui64 growFactor; if (Capacity < 100'000) { growFactor = 8; @@ -242,28 +297,48 @@ private: Allocator.deallocate(newData, newDataEnd - newData); }; + std::array<TInternalBatchRequestItem, PrefetchBatchSize> batch; + ui32 batchLen = 0; for (auto iter = Begin(); iter != End(); Advance(iter)) { if (GetPSL(iter).Distance < 0) { continue; } - bool isNew; - auto& key = GetKey(iter); - char* newIter = nullptr; + if (batchLen == batch.size()) { + CopyBatch({batch.data(), batchLen}, newData, newDataEnd); + batchLen = 0; + } + + auto& r = batch[batchLen++]; + r.ConstructKey(GetKey(iter)); + r.OriginalIterator = iter; + if constexpr (CacheHash) { - newIter = InsertImpl(key, GetPSL(iter).Hash, isNew, newCapacity, newData, newDataEnd); + r.Hash = GetPSL(iter).Hash; } else { - newIter = InsertImpl(key, HashLocal(key), isNew, newCapacity, newData, newDataEnd); + r.Hash = HashLocal(r.GetKey()); } - Y_ASSERT(isNew); - AsDeriv().CopyPayload(GetMutablePayload(newIter), GetPayload(iter)); + + r.InitialIterator = MakeIterator(r.Hash, newData, newCapacity); + NYql::PrefetchForWrite(r.InitialIterator); } + CopyBatch({batch.data(), batchLen}, newData, newDataEnd); + Capacity = newCapacity; std::swap(Data, newData); std::swap(DataEnd, newDataEnd); } + Y_NO_INLINE void CopyBatch(std::span<TInternalBatchRequestItem> batch, char* newData, char* newDataEnd) { + for (auto& r : batch) { + bool isNew; + auto iter = InsertImpl(r.GetKey(), r.Hash, isNew, newData, newDataEnd, r.InitialIterator); + Y_ASSERT(isNew); + AsDeriv().CopyPayload(GetMutablePayload(iter), GetPayload(r.OriginalIterator)); + } + } + void AdvancePointer(char*& ptr, char* begin, char* end) const { ptr += AsDeriv().GetCellSize(); ptr = (ptr == end) ? begin : ptr; diff --git a/ydb/library/yql/minikql/comp_nodes/ya.make.inc b/ydb/library/yql/minikql/comp_nodes/ya.make.inc index 18a5d2ef28..1a84c6e9ba 100644 --- a/ydb/library/yql/minikql/comp_nodes/ya.make.inc +++ b/ydb/library/yql/minikql/comp_nodes/ya.make.inc @@ -138,6 +138,10 @@ PEERDIR( library/cpp/actors/core ) +CFLAGS( + -mprfchw +) + YQL_LAST_ABI_VERSION() PROVIDES(mkql_comp_nodes) diff --git a/ydb/library/yql/utils/prefetch.h b/ydb/library/yql/utils/prefetch.h new file mode 100644 index 0000000000..50eea7df5c --- /dev/null +++ b/ydb/library/yql/utils/prefetch.h @@ -0,0 +1,13 @@ +#pragma once + +namespace NYql { + +inline void PrefetchForRead(const void* ptr) { + __builtin_prefetch(ptr, 0, 3); +} + +inline void PrefetchForWrite(void* ptr) { + __builtin_prefetch(ptr, 1, 3); +} + +} // namespace NYql |