diff options
author | vladluk <vladluk@yandex-team.com> | 2024-11-27 18:46:11 +0300 |
---|---|---|
committer | vladluk <vladluk@yandex-team.com> | 2024-11-27 18:58:35 +0300 |
commit | 62c911c4023eaeaa30616cec70b8ec1401f73f04 (patch) | |
tree | f7b4758ca6130381091e85a7cce7aa845c668da9 /yql/essentials | |
parent | fca381ca1d4e2d8a3fa5395ff4bb81d3b1e72447 (diff) | |
download | ydb-62c911c4023eaeaa30616cec70b8ec1401f73f04.tar.gz |
Distribute values across spilling buckets more evenly
Distribute values more evenly between buckets by applying an extra hash. Also make sure in-memory buckets are processed before spilled buckets when extracting data.
Also add the missing bucket counter increments.
commit_hash:501f4324fdf632eadcf221ccb8ae648238583cc0
Diffstat (limited to 'yql/essentials')
6 files changed, 116 insertions, 23 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp index 55fd5800b8..af9e1d4ce0 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp @@ -16,6 +16,9 @@ #include <util/string/cast.h> + +#include <contrib/libs/xxhash/xxhash.h> + namespace NKikimr { namespace NMiniKQL { @@ -487,8 +490,7 @@ public: return isNew ? ETasteResult::Init : ETasteResult::Update; } - auto hash = Hasher(ViewForKeyAndState.data()); - auto bucketId = hash % SpilledBucketCount; + auto bucketId = ChooseBucket(ViewForKeyAndState.data()); auto& bucket = SpilledBuckets[bucketId]; if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) { @@ -532,7 +534,14 @@ public: return value; } + private: + ui64 ChooseBucket(const NUdf::TUnboxedValuePod *const key) { + auto provided_hash = Hasher(key); + XXH64_hash_t bucket = XXH64(&provided_hash, sizeof(provided_hash), 0) % SpilledBucketCount; + return bucket; + } + EUpdateResult FlushSpillingBuffersAndWait() { UpdateSpillingBuckets(); @@ -595,14 +604,17 @@ private: SplitStateSpillingBucket = -1; } while (const auto keyAndState = static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Extract())) { - auto hash = Hasher(keyAndState); //Hasher uses only key for hashing - auto bucketId = hash % SpilledBucketCount; + auto bucketId = ChooseBucket(keyAndState); // This uses only key for hashing auto& bucket = SpilledBuckets[bucketId]; bucket.LineCount++; if (bucket.BucketState != TSpilledBucket::EBucketState::InMemory) { - bucket.BucketState = TSpilledBucket::EBucketState::SpillingState; + if (bucket.BucketState != TSpilledBucket::EBucketState::SpillingState) { + bucket.BucketState = TSpilledBucket::EBucketState::SpillingState; + SpillingBucketsCount++; + } + bucket.AsyncWriteOperation = bucket.SpilledState->WriteWideItem({keyAndState, KeyAndStateType->GetElementsCount()}); for (size_t i = 0; i < 
KeyAndStateType->GetElementsCount(); ++i) { //releasing values stored in unsafe TUnboxedValue buffer @@ -631,10 +643,11 @@ private: ui32 bucketNumToSpill = GetLargestInMemoryBucketNumber(); SplitStateSpillingBucket = bucketNumToSpill; - InMemoryBucketsCount--; auto& bucket = SpilledBuckets[bucketNumToSpill]; bucket.BucketState = TSpilledBucket::EBucketState::SpillingState; + SpillingBucketsCount++; + InMemoryBucketsCount--; while (const auto keyAndState = static_cast<NUdf::TUnboxedValue*>(bucket.InMemoryProcessingState->Extract())) { bucket.AsyncWriteOperation = bucket.SpilledState->WriteWideItem({keyAndState, KeyAndStateType->GetElementsCount()}); @@ -664,6 +677,7 @@ private: bucket.InMemoryProcessingState->ReadMore<false>(); bucket.BucketState = TSpilledBucket::EBucketState::SpillingData; + SpillingBucketsCount--; } } @@ -848,6 +862,12 @@ private: YQL_LOG(INFO) << "switching Memory mode to ProcessSpilled"; MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error"); MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error"); + + std::sort(SpilledBuckets.begin(), SpilledBuckets.end(), [](const TSpilledBucket& lhs, const TSpilledBucket& rhs) { + bool lhs_in_memory = lhs.BucketState == TSpilledBucket::EBucketState::InMemory; + bool rhs_in_memory = rhs.BucketState == TSpilledBucket::EBucketState::InMemory; + return lhs_in_memory > rhs_in_memory; + }); break; } diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp index 8faf061d3b..55cb6babbf 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp @@ -1396,6 +1396,63 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto fetchStatus = streamVal.Fetch(item); UNIT_ASSERT_EQUAL(fetchStatus, NUdf::EFetchStatus::Finish); } + + Y_UNIT_TEST_LLVM(TestSpillingBucketsDistribution) { + const size_t expectedBucketsCount 
= 128; + const size_t sampleSize = 8 * 128; + + TSetup<LLVM, true> setup; + + std::vector<std::pair<ui64, ui64>> samples(sampleSize); + std::generate(samples.begin(), samples.end(), [key = (ui64)1] () mutable -> std::pair<ui64, ui64> { + key += 64; + return {key, 1}; + }); + + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<ui64>::Id), pb.NewDataType(NUdf::TDataType<ui64>::Id)})); + const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build(); + + const auto pgmReturn = pb.FromFlow(pb.NarrowMap(pb.WideLastCombinerWithSpilling(pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return { pb.Nth(item, 0U), pb.Nth(item, 1U) }; }), + [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; }, + [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.back()}; }, + [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.AggrAdd(state.front(), items.back())}; }, + [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {keys.front(), state.front()}; }), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); } + )); + + const auto spillerFactory = std::make_shared<TMockSpillerFactory>(); + const auto graph = setup.BuildGraph(pgmReturn, {list}); + graph->GetContext().SpillerFactory = spillerFactory; + + NUdf::TUnboxedValue* items = nullptr; + graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(samples.size(), items)); + for (const auto& sample : samples) { + NUdf::TUnboxedValue* pair = nullptr; + *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair); + pair[0] = NUdf::TUnboxedValuePod(sample.first); + pair[1] = NUdf::TUnboxedValuePod(sample.second); + } + + const auto& 
value = graph->GetValue(); + + NUdf::TUnboxedValue item; + while (value.Fetch(item) != NUdf::EFetchStatus::Finish) { + ; + } + + UNIT_ASSERT_EQUAL_C(spillerFactory->GetCreatedSpillers().size(), 1, "WideLastCombiner expected to create one spiller "); + const auto wideCombinerSpiller = std::dynamic_pointer_cast<TMockSpiller>(spillerFactory->GetCreatedSpillers()[0]); + UNIT_ASSERT_C(wideCombinerSpiller != nullptr, "MockSpillerFactory expected to create only MockSpillers"); + + auto flushedBucketsSizes = wideCombinerSpiller->GetPutSizes(); + UNIT_ASSERT_EQUAL_C(flushedBucketsSizes.size(), expectedBucketsCount, "Spiller doesn't Put expected number of buckets"); + + auto anyEmpty = std::any_of(flushedBucketsSizes.begin(), flushedBucketsSizes.end(), [](size_t size) { return size == 0; }); + UNIT_ASSERT_C(!anyEmpty, "Spiller flushed empty bucket"); + } } Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerPerfTest) { diff --git a/yql/essentials/minikql/comp_nodes/ya.make.inc b/yql/essentials/minikql/comp_nodes/ya.make.inc index 3c1531eac7..518f5f3c43 100644 --- a/yql/essentials/minikql/comp_nodes/ya.make.inc +++ b/yql/essentials/minikql/comp_nodes/ya.make.inc @@ -143,7 +143,7 @@ COPY( AUTO FROM ${ORIG_SRC_DIR} ${ORIG_SOURCES} - OUTPUT_INCLUDES + OUTPUT_INCLUDES ${BINDIR}/yql/essentials/minikql/computation/mkql_computation_node_codegen.h ${BINDIR}/yql/essentials/minikql/computation/mkql_block_impl_codegen.h ${BINDIR}/yql/essentials/minikql/computation/mkql_llvm_base.h @@ -154,6 +154,7 @@ COPY( PEERDIR( contrib/libs/apache/arrow + contrib/libs/xxhash yql/essentials/types/binary_json yql/essentials/minikql yql/essentials/minikql/arrow diff --git a/yql/essentials/minikql/computation/mkql_spiller_adapter.h b/yql/essentials/minikql/computation/mkql_spiller_adapter.h index 462a2a7c5e..8ddcfe46be 100644 --- a/yql/essentials/minikql/computation/mkql_spiller_adapter.h +++ b/yql/essentials/minikql/computation/mkql_spiller_adapter.h @@ -28,12 +28,12 @@ public: /// In this case a caller must 
wait operation completion and call StoreCompleted. /// Design note: not using Subscribe on a Future here to avoid possible race condition std::optional<NThreading::TFuture<ISpiller::TKey>> WriteWideItem(const TArrayRef<NUdf::TUnboxedValuePod>& wideItem) { - Packer.AddWideItem(wideItem.data(), wideItem.size()); - if(Packer.PackedSizeEstimate() > SizeLimit) { - return Spiller->Put(std::move(Packer.Finish())); - } else { + Packer.AddWideItem(wideItem.data(), wideItem.size()); + if (Packer.PackedSizeEstimate() > SizeLimit) { + return Spiller->Put(std::move(Packer.Finish())); + } else { return std::nullopt; - } + } } std::optional<NThreading::TFuture<ISpiller::TKey>> FinishWriting() { diff --git a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h index c053b2c52e..4b0b2ed24a 100644 --- a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h +++ b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h @@ -12,8 +12,17 @@ public: } ISpiller::TPtr CreateSpiller() override { - return CreateMockSpiller(); + auto new_spiller = CreateMockSpiller(); + Spillers_.push_back(new_spiller); + return new_spiller; } + + const std::vector<ISpiller::TPtr>& GetCreatedSpillers() const { + return Spillers_; + } + +private: + std::vector<ISpiller::TPtr> Spillers_; }; } // namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/computation/mock_spiller_ut.h b/yql/essentials/minikql/computation/mock_spiller_ut.h index 42846eab1f..715018f3e0 100644 --- a/yql/essentials/minikql/computation/mock_spiller_ut.h +++ b/yql/essentials/minikql/computation/mock_spiller_ut.h @@ -11,22 +11,23 @@ namespace NKikimr::NMiniKQL { class TMockSpiller: public ISpiller{ public: TMockSpiller() - : NextKey(0) + : NextKey_(0) {} NThreading::TFuture<TKey> Put(NYql::TChunkedBuffer&& blob) override { auto promise = NThreading::NewPromise<ISpiller::TKey>(); - auto key = NextKey; - Storage[key] = std::move(blob); - NextKey++; + 
auto key = NextKey_; + Storage_[key] = std::move(blob); + PutSizes_.push_back(Storage_[key].Size()); + NextKey_++; promise.SetValue(key); return promise.GetFuture();; } NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Get(TKey key) override { auto promise = NThreading::NewPromise<std::optional<NYql::TChunkedBuffer>>(); - if (auto it = Storage.find(key); it != Storage.end()) { + if (auto it = Storage_.find(key); it != Storage_.end()) { promise.SetValue(it->second); } else { promise.SetValue(std::nullopt); @@ -37,9 +38,9 @@ public: NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Extract(TKey key) override { auto promise = NThreading::NewPromise<std::optional<NYql::TChunkedBuffer>>(); - if (auto it = Storage.find(key); it != Storage.end()) { + if (auto it = Storage_.find(key); it != Storage_.end()) { promise.SetValue(std::move(it->second)); - Storage.erase(it); + Storage_.erase(it); } else { promise.SetValue(std::nullopt); } @@ -49,12 +50,17 @@ public: NThreading::TFuture<void> Delete(TKey key) override { auto promise = NThreading::NewPromise<void>(); promise.SetValue(); - Storage.erase(key); + Storage_.erase(key); return promise.GetFuture(); } + + const std::vector<size_t>& GetPutSizes() const { + return PutSizes_; + } private: - ISpiller::TKey NextKey; - std::unordered_map<ISpiller::TKey, NYql::TChunkedBuffer> Storage; + ISpiller::TKey NextKey_; + std::unordered_map<ISpiller::TKey, NYql::TChunkedBuffer> Storage_; + std::vector<size_t> PutSizes_; }; inline ISpiller::TPtr CreateMockSpiller() { return std::make_shared<TMockSpiller>(); |