aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials
diff options
context:
space:
mode:
authorvladluk <vladluk@yandex-team.com>2024-11-27 18:46:11 +0300
committervladluk <vladluk@yandex-team.com>2024-11-27 18:58:35 +0300
commit62c911c4023eaeaa30616cec70b8ec1401f73f04 (patch)
treef7b4758ca6130381091e85a7cce7aa845c668da9 /yql/essentials
parentfca381ca1d4e2d8a3fa5395ff4bb81d3b1e72447 (diff)
downloadydb-62c911c4023eaeaa30616cec70b8ec1401f73f04.tar.gz
Distribute value in spilling buckets more evenly
Distribute values more evenly between buckets using extra hash. And make so in-memory buckets processed before spilled buckets when extracting data. Also add missing buckets counters increments. commit_hash:501f4324fdf632eadcf221ccb8ae648238583cc0
Diffstat (limited to 'yql/essentials')
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp32
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp57
-rw-r--r--yql/essentials/minikql/comp_nodes/ya.make.inc3
-rw-r--r--yql/essentials/minikql/computation/mkql_spiller_adapter.h10
-rw-r--r--yql/essentials/minikql/computation/mock_spiller_factory_ut.h11
-rw-r--r--yql/essentials/minikql/computation/mock_spiller_ut.h26
6 files changed, 116 insertions, 23 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
index 55fd5800b8..af9e1d4ce0 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -16,6 +16,9 @@
#include <util/string/cast.h>
+
+#include <contrib/libs/xxhash/xxhash.h>
+
namespace NKikimr {
namespace NMiniKQL {
@@ -487,8 +490,7 @@ public:
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
- auto hash = Hasher(ViewForKeyAndState.data());
- auto bucketId = hash % SpilledBucketCount;
+ auto bucketId = ChooseBucket(ViewForKeyAndState.data());
auto& bucket = SpilledBuckets[bucketId];
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) {
@@ -532,7 +534,14 @@ public:
return value;
}
+
private:
+ ui64 ChooseBucket(const NUdf::TUnboxedValuePod *const key) {
+ auto provided_hash = Hasher(key);
+ XXH64_hash_t bucket = XXH64(&provided_hash, sizeof(provided_hash), 0) % SpilledBucketCount;
+ return bucket;
+ }
+
EUpdateResult FlushSpillingBuffersAndWait() {
UpdateSpillingBuckets();
@@ -595,14 +604,17 @@ private:
SplitStateSpillingBucket = -1;
}
while (const auto keyAndState = static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Extract())) {
- auto hash = Hasher(keyAndState); //Hasher uses only key for hashing
- auto bucketId = hash % SpilledBucketCount;
+ auto bucketId = ChooseBucket(keyAndState); // This uses only key for hashing
auto& bucket = SpilledBuckets[bucketId];
bucket.LineCount++;
if (bucket.BucketState != TSpilledBucket::EBucketState::InMemory) {
- bucket.BucketState = TSpilledBucket::EBucketState::SpillingState;
+ if (bucket.BucketState != TSpilledBucket::EBucketState::SpillingState) {
+ bucket.BucketState = TSpilledBucket::EBucketState::SpillingState;
+ SpillingBucketsCount++;
+ }
+
bucket.AsyncWriteOperation = bucket.SpilledState->WriteWideItem({keyAndState, KeyAndStateType->GetElementsCount()});
for (size_t i = 0; i < KeyAndStateType->GetElementsCount(); ++i) {
//releasing values stored in unsafe TUnboxedValue buffer
@@ -631,10 +643,11 @@ private:
ui32 bucketNumToSpill = GetLargestInMemoryBucketNumber();
SplitStateSpillingBucket = bucketNumToSpill;
- InMemoryBucketsCount--;
auto& bucket = SpilledBuckets[bucketNumToSpill];
bucket.BucketState = TSpilledBucket::EBucketState::SpillingState;
+ SpillingBucketsCount++;
+ InMemoryBucketsCount--;
while (const auto keyAndState = static_cast<NUdf::TUnboxedValue*>(bucket.InMemoryProcessingState->Extract())) {
bucket.AsyncWriteOperation = bucket.SpilledState->WriteWideItem({keyAndState, KeyAndStateType->GetElementsCount()});
@@ -664,6 +677,7 @@ private:
bucket.InMemoryProcessingState->ReadMore<false>();
bucket.BucketState = TSpilledBucket::EBucketState::SpillingData;
+ SpillingBucketsCount--;
}
}
@@ -848,6 +862,12 @@ private:
YQL_LOG(INFO) << "switching Memory mode to ProcessSpilled";
MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error");
MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error");
+
+ std::sort(SpilledBuckets.begin(), SpilledBuckets.end(), [](const TSpilledBucket& lhs, const TSpilledBucket& rhs) {
+ bool lhs_in_memory = lhs.BucketState == TSpilledBucket::EBucketState::InMemory;
+ bool rhs_in_memory = rhs.BucketState == TSpilledBucket::EBucketState::InMemory;
+ return lhs_in_memory > rhs_in_memory;
+ });
break;
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
index 8faf061d3b..55cb6babbf 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
@@ -1396,6 +1396,63 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
const auto fetchStatus = streamVal.Fetch(item);
UNIT_ASSERT_EQUAL(fetchStatus, NUdf::EFetchStatus::Finish);
}
+
+ Y_UNIT_TEST_LLVM(TestSpillingBucketsDistribution) {
+ const size_t expectedBucketsCount = 128;
+ const size_t sampleSize = 8 * 128;
+
+ TSetup<LLVM, true> setup;
+
+ std::vector<std::pair<ui64, ui64>> samples(sampleSize);
+ std::generate(samples.begin(), samples.end(), [key = (ui64)1] () mutable -> std::pair<ui64, ui64> {
+ key += 64;
+ return {key, 1};
+ });
+
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<ui64>::Id), pb.NewDataType(NUdf::TDataType<ui64>::Id)}));
+ const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
+
+ const auto pgmReturn = pb.FromFlow(pb.NarrowMap(pb.WideLastCombinerWithSpilling(pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return { pb.Nth(item, 0U), pb.Nth(item, 1U) }; }),
+ [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
+ [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.back()}; },
+ [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.AggrAdd(state.front(), items.back())}; },
+ [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {keys.front(), state.front()}; }),
+ [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); }
+ ));
+
+ const auto spillerFactory = std::make_shared<TMockSpillerFactory>();
+ const auto graph = setup.BuildGraph(pgmReturn, {list});
+ graph->GetContext().SpillerFactory = spillerFactory;
+
+ NUdf::TUnboxedValue* items = nullptr;
+ graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(samples.size(), items));
+ for (const auto& sample : samples) {
+ NUdf::TUnboxedValue* pair = nullptr;
+ *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
+ pair[0] = NUdf::TUnboxedValuePod(sample.first);
+ pair[1] = NUdf::TUnboxedValuePod(sample.second);
+ }
+
+ const auto& value = graph->GetValue();
+
+ NUdf::TUnboxedValue item;
+ while (value.Fetch(item) != NUdf::EFetchStatus::Finish) {
+ ;
+ }
+
+ UNIT_ASSERT_EQUAL_C(spillerFactory->GetCreatedSpillers().size(), 1, "WideLastCombiner expected to create one spiller ");
+ const auto wideCombinerSpiller = std::dynamic_pointer_cast<TMockSpiller>(spillerFactory->GetCreatedSpillers()[0]);
+ UNIT_ASSERT_C(wideCombinerSpiller != nullptr, "MockSpillerFactory expected to create only MockSpillers");
+
+ auto flushedBucketsSizes = wideCombinerSpiller->GetPutSizes();
+ UNIT_ASSERT_EQUAL_C(flushedBucketsSizes.size(), expectedBucketsCount, "Spiller doesn't Put expected number of buckets");
+
+ auto anyEmpty = std::any_of(flushedBucketsSizes.begin(), flushedBucketsSizes.end(), [](size_t size) { return size == 0; });
+ UNIT_ASSERT_C(!anyEmpty, "Spiller flushed empty bucket");
+ }
}
Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerPerfTest) {
diff --git a/yql/essentials/minikql/comp_nodes/ya.make.inc b/yql/essentials/minikql/comp_nodes/ya.make.inc
index 3c1531eac7..518f5f3c43 100644
--- a/yql/essentials/minikql/comp_nodes/ya.make.inc
+++ b/yql/essentials/minikql/comp_nodes/ya.make.inc
@@ -143,7 +143,7 @@ COPY(
AUTO
FROM ${ORIG_SRC_DIR}
${ORIG_SOURCES}
- OUTPUT_INCLUDES
+ OUTPUT_INCLUDES
${BINDIR}/yql/essentials/minikql/computation/mkql_computation_node_codegen.h
${BINDIR}/yql/essentials/minikql/computation/mkql_block_impl_codegen.h
${BINDIR}/yql/essentials/minikql/computation/mkql_llvm_base.h
@@ -154,6 +154,7 @@ COPY(
PEERDIR(
contrib/libs/apache/arrow
+ contrib/libs/xxhash
yql/essentials/types/binary_json
yql/essentials/minikql
yql/essentials/minikql/arrow
diff --git a/yql/essentials/minikql/computation/mkql_spiller_adapter.h b/yql/essentials/minikql/computation/mkql_spiller_adapter.h
index 462a2a7c5e..8ddcfe46be 100644
--- a/yql/essentials/minikql/computation/mkql_spiller_adapter.h
+++ b/yql/essentials/minikql/computation/mkql_spiller_adapter.h
@@ -28,12 +28,12 @@ public:
/// In this case a caller must wait operation completion and call StoreCompleted.
/// Design note: not using Subscribe on a Future here to avoid possible race condition
std::optional<NThreading::TFuture<ISpiller::TKey>> WriteWideItem(const TArrayRef<NUdf::TUnboxedValuePod>& wideItem) {
- Packer.AddWideItem(wideItem.data(), wideItem.size());
- if(Packer.PackedSizeEstimate() > SizeLimit) {
- return Spiller->Put(std::move(Packer.Finish()));
- } else {
+ Packer.AddWideItem(wideItem.data(), wideItem.size());
+ if (Packer.PackedSizeEstimate() > SizeLimit) {
+ return Spiller->Put(std::move(Packer.Finish()));
+ } else {
return std::nullopt;
- }
+ }
}
std::optional<NThreading::TFuture<ISpiller::TKey>> FinishWriting() {
diff --git a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h
index c053b2c52e..4b0b2ed24a 100644
--- a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h
+++ b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h
@@ -12,8 +12,17 @@ public:
}
ISpiller::TPtr CreateSpiller() override {
- return CreateMockSpiller();
+ auto new_spiller = CreateMockSpiller();
+ Spillers_.push_back(new_spiller);
+ return new_spiller;
}
+
+ const std::vector<ISpiller::TPtr>& GetCreatedSpillers() const {
+ return Spillers_;
+ }
+
+private:
+ std::vector<ISpiller::TPtr> Spillers_;
};
} // namespace NKikimr::NMiniKQL
diff --git a/yql/essentials/minikql/computation/mock_spiller_ut.h b/yql/essentials/minikql/computation/mock_spiller_ut.h
index 42846eab1f..715018f3e0 100644
--- a/yql/essentials/minikql/computation/mock_spiller_ut.h
+++ b/yql/essentials/minikql/computation/mock_spiller_ut.h
@@ -11,22 +11,23 @@ namespace NKikimr::NMiniKQL {
class TMockSpiller: public ISpiller{
public:
TMockSpiller()
- : NextKey(0)
+ : NextKey_(0)
{}
NThreading::TFuture<TKey> Put(NYql::TChunkedBuffer&& blob) override {
auto promise = NThreading::NewPromise<ISpiller::TKey>();
- auto key = NextKey;
- Storage[key] = std::move(blob);
- NextKey++;
+ auto key = NextKey_;
+ Storage_[key] = std::move(blob);
+ PutSizes_.push_back(Storage_[key].Size());
+ NextKey_++;
promise.SetValue(key);
return promise.GetFuture();;
}
NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Get(TKey key) override {
auto promise = NThreading::NewPromise<std::optional<NYql::TChunkedBuffer>>();
- if (auto it = Storage.find(key); it != Storage.end()) {
+ if (auto it = Storage_.find(key); it != Storage_.end()) {
promise.SetValue(it->second);
} else {
promise.SetValue(std::nullopt);
@@ -37,9 +38,9 @@ public:
NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Extract(TKey key) override {
auto promise = NThreading::NewPromise<std::optional<NYql::TChunkedBuffer>>();
- if (auto it = Storage.find(key); it != Storage.end()) {
+ if (auto it = Storage_.find(key); it != Storage_.end()) {
promise.SetValue(std::move(it->second));
- Storage.erase(it);
+ Storage_.erase(it);
} else {
promise.SetValue(std::nullopt);
}
@@ -49,12 +50,17 @@ public:
NThreading::TFuture<void> Delete(TKey key) override {
auto promise = NThreading::NewPromise<void>();
promise.SetValue();
- Storage.erase(key);
+ Storage_.erase(key);
return promise.GetFuture();
}
+
+ const std::vector<size_t>& GetPutSizes() const {
+ return PutSizes_;
+ }
private:
- ISpiller::TKey NextKey;
- std::unordered_map<ISpiller::TKey, NYql::TChunkedBuffer> Storage;
+ ISpiller::TKey NextKey_;
+ std::unordered_map<ISpiller::TKey, NYql::TChunkedBuffer> Storage_;
+ std::vector<size_t> PutSizes_;
};
inline ISpiller::TPtr CreateMockSpiller() {
return std::make_shared<TMockSpiller>();