author    | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-11 07:49:19 +0300
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-11 08:19:30 +0300
commit    | 8ae39e1bd0ae849c99563799c3a1eaad2360018d (patch)
tree      | 63639a6b62c774ca1e4aa812b5999faeba6b529e
parent    | d51343356504193b7cc73d7f5586aac2b220ec89 (diff)
download  | ydb-8ae39e1bd0ae849c99563799c3a1eaad2360018d.tar.gz
KIKIMR-18932: counters and stats usage for splitting
28 files changed, 422 insertions, 69 deletions
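The core of this change is a new per-case counters hierarchy for the splitter (ydb/core/tx/columnshard/counters/splitter.h): TSplitterCounters groups TSplitterCaseCounters for the simple and by-size splitters plus TBlobResultCounters for mono and splitted blobs, and TIndexationCounters now carries it as SplitterCounters. The following is a minimal usage sketch based only on the interfaces visible in this patch; the helper function, the main() driver, and the byte values are illustrative and not part of the commit, and the snippet only builds inside the ydb source tree.

```cpp
// Hedged sketch of the counters hierarchy introduced by this commit.
// Construction mirrors the splitter unit test: TIndexationCounters("test").
#include <ydb/core/tx/columnshard/counters/indexation.h>

using namespace NKikimr::NColumnShard;

// Illustrative helper (not in the commit): every serialization pass is accounted
// per splitter type, either as "correct" (fits the blob limit) or as "trash"
// (oversized, will be re-split), the same way TSimpleSplitter::Split() does.
void ReportSerializationAttempt(const TSplitterCounters& counters,
                                const ui64 serializedBytes,
                                const bool fitsBlobLimit) {
    if (fitsBlobLimit) {
        counters.SimpleSplitter.OnCorrectSerialized(serializedBytes);
    } else {
        counters.SimpleSplitter.OnTrashSerialized(serializedBytes);
    }
}

int main() {
    // TIndexationCounters now owns a TSplitterCounters instance (see indexation.cpp),
    // registered under the "splitter" component subgroup of the same owner.
    TIndexationCounters indexation("test");
    std::shared_ptr<TSplitterCounters> splitterCounters = indexation.SplitterCounters;

    ReportSerializationAttempt(*splitterCounters, 512 * 1024, true);        // fits a blob
    ReportSerializationAttempt(*splitterCounters, 9 * 1024 * 1024, false);  // oversized attempt

    // Resulting blobs are reported as mono (single blob per portion) or splitted
    // (several blobs), mirroring the branch added to TRBSplitLimiter::Next().
    splitterCounters->MonoBlobs.OnBlobData(6 * 1024 * 1024);
    return 0;
}
```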
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt index 71ef940f82..b67a55fc72 100644 --- a/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt @@ -24,4 +24,5 @@ target_sources(tx-columnshard-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/columnshard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/splitter.cpp ) diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt index edfdee32c7..24d27e4043 100644 --- a/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt @@ -25,4 +25,5 @@ target_sources(tx-columnshard-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/columnshard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/splitter.cpp ) diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt index edfdee32c7..24d27e4043 100644 --- a/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt @@ -25,4 +25,5 @@ target_sources(tx-columnshard-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/columnshard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/splitter.cpp ) diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt index 71ef940f82..b67a55fc72 100644 --- a/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt @@ -24,4 +24,5 @@ target_sources(tx-columnshard-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/columnshard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/splitter.cpp ) diff --git a/ydb/core/tx/columnshard/counters/common/owner.cpp b/ydb/core/tx/columnshard/counters/common/owner.cpp index 69ea6bfd61..d5d4d5b28b 100644 --- a/ydb/core/tx/columnshard/counters/common/owner.cpp +++ b/ydb/core/tx/columnshard/counters/common/owner.cpp @@ -50,4 +50,8 @@ void TCommonCountersOwner::DeepSubGroup(const TString& id, const TString& value) SubGroup = SubGroup->GetSubgroup(id, value); } +void TCommonCountersOwner::DeepSubGroup(const TString& componentName) { + SubGroup = SubGroup->GetSubgroup("component", componentName); +} + } diff --git a/ydb/core/tx/columnshard/counters/common/owner.h b/ydb/core/tx/columnshard/counters/common/owner.h index bf5e3b7525..a2f0b5273a 100644 --- a/ydb/core/tx/columnshard/counters/common/owner.h +++ b/ydb/core/tx/columnshard/counters/common/owner.h @@ -16,6 +16,19 @@ private: protected: std::shared_ptr<TValueAggregationAgent> GetValueAutoAggregations(const TString& name) 
const; std::shared_ptr<TValueAggregationClient> GetValueAutoAggregationsClient(const TString& name) const; + + TCommonCountersOwner(const TCommonCountersOwner& sameAs) + : SubGroup(sameAs.SubGroup) + , ModuleId(sameAs.ModuleId) { + + } + + TCommonCountersOwner(const TCommonCountersOwner& sameAs, const TString& componentName) + : SubGroup(sameAs.SubGroup) + , ModuleId(sameAs.ModuleId) + { + DeepSubGroup("component", componentName); + } public: const TString& GetModuleId() const { return ModuleId; @@ -25,6 +38,7 @@ public: NMonitoring::TDynamicCounters::TCounterPtr GetValue(const TString& name) const; NMonitoring::TDynamicCounters::TCounterPtr GetDeriviative(const TString& name) const; void DeepSubGroup(const TString& id, const TString& value); + void DeepSubGroup(const TString& componentName); NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const; TCommonCountersOwner(const TString& module, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals = nullptr); diff --git a/ydb/core/tx/columnshard/counters/indexation.cpp b/ydb/core/tx/columnshard/counters/indexation.cpp index 194bdf0bb5..cbe62ae082 100644 --- a/ydb/core/tx/columnshard/counters/indexation.cpp +++ b/ydb/core/tx/columnshard/counters/indexation.cpp @@ -21,12 +21,6 @@ TIndexationCounters::TIndexationCounters(const TString& module) MovedPortions = TBase::GetDeriviative("Moved/Portions"); MovedPortionBytes = TBase::GetDeriviative("Moved/Bytes"); - TrashDataSerializationBytes = TBase::GetDeriviative("TrashDataSerialization/Bytes"); - TrashDataSerialization = TBase::GetDeriviative("TrashDataSerialization/Count"); - TrashDataSerializationHistogramBytes = TBase::GetHistogram("TrashDataSerialization/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1024)); - CorrectDataSerializationBytes = TBase::GetDeriviative("CorrectDataSerialization/Bytes"); - CorrectDataSerialization = TBase::GetDeriviative("CorrectDataSerialization/Count"); - CompactionDuration = TBase::GetHistogram("CompactionDuration", NMonitoring::ExponentialHistogram(18, 2, 20)); HistogramCompactionInputBytes = TBase::GetHistogram("CompactionInput/Bytes", NMonitoring::ExponentialHistogram(18, 2, 1024)); CompactionInputBytes = TBase::GetDeriviative("CompactionInput/Bytes"); @@ -39,6 +33,8 @@ TIndexationCounters::TIndexationCounters(const TString& module) TooSmallBlob = TBase::GetDeriviative("TooSmallBlob/Count"); TooSmallBlobFinish = TBase::GetDeriviative("TooSmallBlobFinish/Count"); TooSmallBlobStart = TBase::GetDeriviative("TooSmallBlobStart/Count"); + + SplitterCounters = std::make_shared<TSplitterCounters>(*this); } } diff --git a/ydb/core/tx/columnshard/counters/indexation.h b/ydb/core/tx/columnshard/counters/indexation.h index ac449a93b2..4b926d2609 100644 --- a/ydb/core/tx/columnshard/counters/indexation.h +++ b/ydb/core/tx/columnshard/counters/indexation.h @@ -1,6 +1,8 @@ #pragma once -#include <library/cpp/monlib/dynamic_counters/counters.h> #include "common/owner.h" +#include "splitter.h" + +#include <library/cpp/monlib/dynamic_counters/counters.h> namespace NKikimr::NColumnShard { @@ -24,11 +26,7 @@ public: NMonitoring::TDynamicCounters::TCounterPtr MovedPortions; NMonitoring::TDynamicCounters::TCounterPtr MovedPortionBytes; - NMonitoring::TDynamicCounters::TCounterPtr TrashDataSerializationBytes; - NMonitoring::TDynamicCounters::TCounterPtr TrashDataSerialization; - NMonitoring::THistogramPtr TrashDataSerializationHistogramBytes; - NMonitoring::TDynamicCounters::TCounterPtr CorrectDataSerializationBytes; 
- NMonitoring::TDynamicCounters::TCounterPtr CorrectDataSerialization; + std::shared_ptr<TSplitterCounters> SplitterCounters; NMonitoring::THistogramPtr SplittedPortionLargestColumnSize; NMonitoring::THistogramPtr SimpleSplitPortionLargestColumnSize; diff --git a/ydb/core/tx/columnshard/counters/splitter.cpp b/ydb/core/tx/columnshard/counters/splitter.cpp new file mode 100644 index 0000000000..6510d37d83 --- /dev/null +++ b/ydb/core/tx/columnshard/counters/splitter.cpp @@ -0,0 +1,14 @@ +#include "splitter.h" + +namespace NKikimr::NColumnShard { + +TSplitterCounters::TSplitterCounters(const TCommonCountersOwner& owner) + : TBase(owner, "splitter") + , SimpleSplitter(owner, "simple") + , BySizeSplitter(owner, "by_size") + , SplittedBlobs(owner, "splitted") + , MonoBlobs(owner, "mono") +{ +} + +} diff --git a/ydb/core/tx/columnshard/counters/splitter.h b/ydb/core/tx/columnshard/counters/splitter.h new file mode 100644 index 0000000000..cbde8265a2 --- /dev/null +++ b/ydb/core/tx/columnshard/counters/splitter.h @@ -0,0 +1,79 @@ +#pragma once +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include "common/owner.h" + +namespace NKikimr::NColumnShard { + +class TSplitterCaseCounters: public TCommonCountersOwner { +private: + using TBase = TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr TrashDataSerializationBytes; + NMonitoring::TDynamicCounters::TCounterPtr TrashDataSerialization; + NMonitoring::THistogramPtr TrashDataSerializationHistogramBytes; + + NMonitoring::TDynamicCounters::TCounterPtr CorrectDataSerializationBytes; + NMonitoring::TDynamicCounters::TCounterPtr CorrectDataSerialization; + NMonitoring::THistogramPtr CorrectDataSerializationHistogramBytes; +public: + TSplitterCaseCounters(const TCommonCountersOwner& owner, const TString& splitterType) + : TBase(owner) + { + DeepSubGroup("splitter_type", splitterType); + + TrashDataSerializationBytes = TBase::GetDeriviative("TrashDataSerialization/Bytes"); + TrashDataSerialization = TBase::GetDeriviative("TrashDataSerialization/Count"); + TrashDataSerializationHistogramBytes = TBase::GetHistogram("TrashDataSerialization/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1024)); + CorrectDataSerializationBytes = TBase::GetDeriviative("CorrectDataSerialization/Bytes"); + CorrectDataSerialization = TBase::GetDeriviative("CorrectDataSerialization/Count"); + CorrectDataSerializationHistogramBytes = TBase::GetHistogram("CorrectDataSerialization/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1024)); + } + + void OnTrashSerialized(const ui64 bytes) const { + TrashDataSerializationHistogramBytes->Collect(bytes); + TrashDataSerialization->Add(1); + TrashDataSerializationBytes->Add(bytes); + } + + void OnCorrectSerialized(const ui64 bytes) const { + CorrectDataSerializationHistogramBytes->Collect(bytes); + CorrectDataSerialization->Add(1); + CorrectDataSerializationBytes->Add(bytes); + } +}; + +class TBlobResultCounters: public TCommonCountersOwner { +private: + using TBase = TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr BlobsCount; + NMonitoring::TDynamicCounters::TCounterPtr BlobsBytes; + NMonitoring::THistogramPtr BlobsBytesHistogram; +public: + TBlobResultCounters(const TCommonCountersOwner& owner, const TString& blobsType) + : TBase(owner) { + DeepSubGroup("blobs_type", blobsType); + + BlobsCount = TBase::GetDeriviative("DataSerialization/Bytes"); + BlobsBytes = TBase::GetDeriviative("DataSerialization/Count"); + BlobsBytesHistogram = TBase::GetHistogram("DataSerialization/Bytes", 
NMonitoring::ExponentialHistogram(15, 2, 1024)); + } + + void OnBlobData(const ui64 size) const { + BlobsCount->Add(1); + BlobsBytes->Add(size); + BlobsBytesHistogram->Collect(size); + } + +}; + +class TSplitterCounters: public TCommonCountersOwner { +private: + using TBase = TCommonCountersOwner; +public: + TSplitterCounters(const TCommonCountersOwner& owner); + const TSplitterCaseCounters SimpleSplitter; + const TSplitterCaseCounters BySizeSplitter; + const TBlobResultCounters SplittedBlobs; + const TBlobResultCounters MonoBlobs; +}; + +} diff --git a/ydb/core/tx/columnshard/counters/ya.make b/ydb/core/tx/columnshard/counters/ya.make index 3a333cef8d..aed1f68de2 100644 --- a/ydb/core/tx/columnshard/counters/ya.make +++ b/ydb/core/tx/columnshard/counters/ya.make @@ -8,6 +8,7 @@ SRCS( columnshard.cpp insert_table.cpp common_data.cpp + splitter.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt index edc599e27d..9c28205a55 100644 --- a/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt @@ -21,4 +21,5 @@ target_sources(tx-columnshard-splitter PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/chunks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/simple.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/rb_splitter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/stats.cpp ) diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt index 8afbcdfed6..575faf46d9 100644 --- a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt @@ -22,4 +22,5 @@ target_sources(tx-columnshard-splitter PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/chunks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/simple.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/rb_splitter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/stats.cpp ) diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt index 8afbcdfed6..575faf46d9 100644 --- a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt @@ -22,4 +22,5 @@ target_sources(tx-columnshard-splitter PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/chunks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/simple.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/rb_splitter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/stats.cpp ) diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt index edc599e27d..9c28205a55 100644 --- a/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt @@ -21,4 +21,5 @@ target_sources(tx-columnshard-splitter PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/chunks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/simple.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/rb_splitter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/stats.cpp ) diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.cpp b/ydb/core/tx/columnshard/splitter/batch_slice.cpp index 
6f06b70a2e..bcbead4500 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.cpp +++ b/ydb/core/tx/columnshard/splitter/batch_slice.cpp @@ -49,20 +49,18 @@ bool TBatchSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) { Y_VERIFY(chunksInProgress[i].GetSize() > TSplitSettings::MinBlobSize - partSize); Y_VERIFY(otherSize - (TSplitSettings::MinBlobSize - partSize) >= TSplitSettings::MinBlobSize); chunksInProgress[i].AddSplit(TSplitSettings::MinBlobSize - partSize); - } - if (!hasNoSplitChanges) { - std::vector<TSplittedColumnChunk> newChunks = chunksInProgress[i].InternalSplit(Schema->GetColumnSaver(chunksInProgress[i].GetColumnId())); + + std::vector<TSplittedColumnChunk> newChunks = chunksInProgress[i].InternalSplit(Schema->GetColumnSaver(chunksInProgress[i].GetColumnId()), Counters); chunksInProgress.erase(chunksInProgress.begin() + i); chunksInProgress.insert(chunksInProgress.begin() + i, newChunks.begin(), newChunks.end()); - result.emplace_back(TSplittedBlob()); + TSplittedBlob newBlob; for (ui32 chunk = 0; chunk <= i; ++chunk) { - Y_VERIFY(result.back().Take(chunksInProgress[chunk])); + Y_VERIFY(newBlob.Take(chunksInProgress[chunk])); } - if (result.back().GetSize() < TSplitSettings::MaxBlobSize) { + if (newBlob.GetSize() < TSplitSettings::MaxBlobSize) { chunksInProgress.erase(chunksInProgress.begin(), chunksInProgress.begin() + i + 1); - } else { - result.pop_back(); + result.emplace_back(std::move(newBlob)); } } break; @@ -75,9 +73,10 @@ bool TBatchSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) { return true; } -TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema) +TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters) : Schema(schema) , Batch(batch) + , Counters(counters) { Y_VERIFY(batch); RecordsCount = batch->num_rows(); @@ -91,7 +90,10 @@ TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> for (auto&& i : batch->columns()) { auto& c = Columns[idx]; auto columnSaver = schema->GetColumnSaver(c.GetColumnId()); - c.SetBlobs(TSimpleSplitter(columnSaver).Split(i, c.GetField(), TSplitSettings::MaxBlobSize)); + auto stats = schema->GetColumnSerializationStats(c.GetColumnId()); + TSimpleSplitter splitter(columnSaver, Counters); + splitter.SetStats(stats); + c.SetBlobs(splitter.Split(i, c.GetField(), TSplitSettings::MaxBlobSize)); Size += c.GetSize(); ++idx; } diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h index a76b067dda..da3c5db1f2 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.h +++ b/ydb/core/tx/columnshard/splitter/batch_slice.h @@ -1,5 +1,6 @@ #pragma once #include "chunks.h" +#include "stats.h" #include <ydb/core/tx/columnshard/counters/indexation.h> #include <ydb/core/tx/columnshard/engines/scheme/column_features.h> #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> @@ -15,19 +16,29 @@ public: virtual ~ISchemaDetailInfo() = default; virtual ui32 GetColumnId(const std::string& fieldName) const = 0; virtual TColumnSaver GetColumnSaver(const ui32 columnId) const = 0; + virtual std::optional<TColumnSerializationStat> GetColumnSerializationStats(const ui32 columnId) const = 0; + virtual std::optional<TColumnSerializationStat> GetBatchSerializationStats(const std::shared_ptr<arrow::RecordBatch>& rb) const = 0; }; class TDefaultSchemaDetails: public 
ISchemaDetailInfo { private: ISnapshotSchema::TPtr Schema; const TSaverContext Context; + TSerializationStats Stats; public: - TDefaultSchemaDetails(ISnapshotSchema::TPtr schema, const TSaverContext& context) + TDefaultSchemaDetails(ISnapshotSchema::TPtr schema, const TSaverContext& context, TSerializationStats&& stats) : Schema(schema) , Context(context) + , Stats(std::move(stats)) { } + virtual std::optional<TColumnSerializationStat> GetColumnSerializationStats(const ui32 columnId) const override { + return Stats.GetColumnInfo(columnId); + } + virtual std::optional<TColumnSerializationStat> GetBatchSerializationStats(const std::shared_ptr<arrow::RecordBatch>& rb) const override { + return Stats.GetStatsForRecordBatch(rb); + } virtual ui32 GetColumnId(const std::string& fieldName) const override { return Schema->GetColumnId(fieldName); } @@ -43,8 +54,9 @@ private: YDB_READONLY(ui32, RecordsCount, 0); ISchemaDetailInfo::TPtr Schema; YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch); + std::shared_ptr<NColumnShard::TSplitterCounters> Counters; public: - TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema); + TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters); void MergeSlice(TBatchSerializedSlice&& slice); diff --git a/ydb/core/tx/columnshard/splitter/chunks.cpp b/ydb/core/tx/columnshard/splitter/chunks.cpp index 3fc0ce7f70..d5390412f8 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.cpp +++ b/ydb/core/tx/columnshard/splitter/chunks.cpp @@ -2,8 +2,8 @@ namespace NKikimr::NOlap { -std::vector<TSplittedColumnChunk> TSplittedColumnChunk::InternalSplit(const TColumnSaver& saver) { - auto chunks = TSimpleSplitter(saver).SplitBySizes(Data.GetSlicedBatch(), Data.GetSerializedChunk(), SplitSizes); +std::vector<TSplittedColumnChunk> TSplittedColumnChunk::InternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters) { + auto chunks = TSimpleSplitter(saver, counters).SplitBySizes(Data.GetSlicedBatch(), Data.GetSerializedChunk(), SplitSizes); Y_VERIFY(chunks.size() == SplitSizes.size() + 1); std::vector<TSplittedColumnChunk> newChunks; for (auto&& i : chunks) { diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h index 994b124494..af5b28bacd 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.h +++ b/ydb/core/tx/columnshard/splitter/chunks.h @@ -1,6 +1,6 @@ #pragma once #include "simple.h" -#include <ydb/core/tx/columnshard/counters/indexation.h> +#include <ydb/core/tx/columnshard/counters/splitter.h> #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> @@ -9,10 +9,10 @@ namespace NKikimr::NOlap { class TSplitSettings { public: - static const inline ui64 MaxBlobSize = 8 * 1024 * 1024; - static const inline ui64 MaxBlobSizeWithGap = 7 * 1024 * 1024; - static const inline ui64 MinBlobSize = 4 * 1024 * 1024; - static const inline ui64 MinRecordsCount = 10000; + static const inline i64 MaxBlobSize = 8 * 1024 * 1024; + static const inline i64 MaxBlobSizeWithGap = 7 * 1024 * 1024; + static const inline i64 MinBlobSize = 4 * 1024 * 1024; + static const inline i64 MinRecordsCount = 10000; }; class TSplittedColumn; @@ -26,7 +26,7 @@ public: void AddSplit(const ui64 size) { SplitSizes.emplace_back(size); } - std::vector<TSplittedColumnChunk> InternalSplit(const TColumnSaver& saver); + 
std::vector<TSplittedColumnChunk> InternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters); ui64 GetSize() const { return Data.GetSerializedChunk().size(); @@ -66,6 +66,7 @@ public: void SetBlobs(const std::vector<TSaverSplittedChunk>& data) { Y_VERIFY(Chunks.empty()); for (auto&& i : data) { + Y_VERIFY(i.IsCompatibleColumn(Field)); Size += i.GetSerializedChunk().size(); Chunks.emplace_back(ColumnId, i); } diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp index 76d1f51af8..a5210a72f2 100644 --- a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp @@ -39,25 +39,25 @@ public: } }; -TRBSplitLimiter::TRBSplitLimiter(const TGranuleMeta* /*granuleMeta*/, const NColumnShard::TIndexationCounters& counters, ISchemaDetailInfo::TPtr schemaInfo, +TRBSplitLimiter::TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters, ISchemaDetailInfo::TPtr schemaInfo, const std::shared_ptr<arrow::RecordBatch> batch) : Counters(counters) - , Batch(batch) { + , Batch(batch) +{ Y_VERIFY(Batch->num_rows()); std::vector<TBatchSerializedSlice> slices; - const ui32 countPacks = std::max<ui32>(1, (ui32)round(1.0 * Batch->num_rows() / TSplitSettings::MinRecordsCount)); - const ui32 stepPack = Batch->num_rows() / countPacks; - ui32 position = 0; - for (ui32 i = 0; i < countPacks; ++i) { - const ui32 packSize = (i + 1 == countPacks) ? Batch->num_rows() - position : stepPack; - std::shared_ptr<arrow::RecordBatch> current; - current = batch->Slice(position, packSize); - position += current->num_rows(); - TBatchSerializedSlice slice(current, schemaInfo); + auto stats = schemaInfo->GetBatchSerializationStats(Batch); + ui32 recordsCount = TSplitSettings::MinRecordsCount; + if (stats) { + const ui32 recordsCountForMinSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), TSplitSettings::MinBlobSize).value_or(recordsCount); + recordsCount = std::max(recordsCount, recordsCountForMinSize); + } + auto linearSplitInfo = TSimpleSplitter::GetOptimalLinearSplitting(Batch->num_rows(), recordsCount); + for (auto it = linearSplitInfo.StartIterator(); it.IsValid(); it.Next()) { + std::shared_ptr<arrow::RecordBatch> current = batch->Slice(it.GetPosition(), it.GetCurrentPackSize()); + TBatchSerializedSlice slice(current, schemaInfo, Counters); slices.emplace_back(std::move(slice)); } - Y_VERIFY(position == batch->num_rows()); - Y_VERIFY(slices.size() == countPacks); const std::vector<ui32> chunks = TSimilarSlicer(TSplitSettings::MinBlobSize).Split(slices); ui32 chunkStartPosition = 0; @@ -84,6 +84,11 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<TOrderedColumnChunk>>& portio Slices.front().GroupBlobs(blobs); std::vector<std::vector<TOrderedColumnChunk>> result; for (auto&& i : blobs) { + if (blobs.size() == 1) { + Counters->MonoBlobs.OnBlobData(i.GetSize()); + } else { + Counters->SplittedBlobs.OnBlobData(i.GetSize()); + } std::vector<TOrderedColumnChunk> chunksForBlob; for (auto&& c : i.GetChunks()) { chunksForBlob.emplace_back(c.GetColumnId(), c.GetData().GetRecordsCount(), c.GetData().GetSerializedChunk()); diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.h b/ydb/core/tx/columnshard/splitter/rb_splitter.h index f76e7d20f0..c1aca97cc3 100644 --- a/ydb/core/tx/columnshard/splitter/rb_splitter.h +++ b/ydb/core/tx/columnshard/splitter/rb_splitter.h @@ -1,5 +1,6 @@ #pragma once #include "batch_slice.h" +#include "stats.h" #include 
<ydb/core/tx/columnshard/counters/indexation.h> #include <ydb/core/tx/columnshard/engines/scheme/column_features.h> #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> @@ -12,10 +13,10 @@ namespace NKikimr::NOlap { class TRBSplitLimiter { private: std::deque<TBatchSerializedSlice> Slices; - const NColumnShard::TIndexationCounters Counters; + std::shared_ptr<NColumnShard::TSplitterCounters> Counters; std::shared_ptr<arrow::RecordBatch> Batch; public: - TRBSplitLimiter(const TGranuleMeta* granuleMeta, const NColumnShard::TIndexationCounters& counters, + TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters, ISchemaDetailInfo::TPtr schemaInfo, const std::shared_ptr<arrow::RecordBatch> batch); bool Next(std::vector<std::vector<TOrderedColumnChunk>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch); diff --git a/ydb/core/tx/columnshard/splitter/simple.cpp b/ydb/core/tx/columnshard/splitter/simple.cpp index e12746c6f8..98f704b151 100644 --- a/ydb/core/tx/columnshard/splitter/simple.cpp +++ b/ydb/core/tx/columnshard/splitter/simple.cpp @@ -2,48 +2,50 @@ namespace NKikimr::NOlap { -std::vector<NKikimr::NOlap::TSaverSplittedChunk> TSimpleSplitter::Split(std::shared_ptr<arrow::Array> data, std::shared_ptr<arrow::Field> field, const ui32 maxBlobSize) const { +std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(std::shared_ptr<arrow::Array> data, std::shared_ptr<arrow::Field> field, const ui32 maxBlobSize) const { auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{field}); auto batch = arrow::RecordBatch::Make(schema, data->length(), {data}); return Split(batch, maxBlobSize); } -std::vector<NKikimr::NOlap::TSaverSplittedChunk> TSimpleSplitter::Split(std::shared_ptr<arrow::RecordBatch> data, const ui32 maxBlobSize) const { +std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(std::shared_ptr<arrow::RecordBatch> data, const ui32 maxBlobSize) const { Y_VERIFY(data->num_columns() == 1); - ui32 splitFactor = 1; + ui64 splitFactor = Stats ? Stats->PredictOptimalSplitFactor(data->num_rows(), maxBlobSize).value_or(1) : 1; while (true) { Y_VERIFY(splitFactor < 100); std::vector<TSaverSplittedChunk> result; result.reserve(splitFactor); bool isCorrect = true; + ui64 serializedDataBytes = 0; if (splitFactor == 1) { TString blob = ColumnSaver.Apply(data); isCorrect = blob.size() < maxBlobSize; + serializedDataBytes += blob.size(); result.emplace_back(TSaverSplittedChunk(data, std::move(blob))); } else { - const ui32 sliceSize = data->num_rows() / splitFactor; - Y_VERIFY(sliceSize); - ui32 idx = 0; - for (ui32 i = 0; i < data->num_rows();) { - const ui32 sliceCurrentSize = (++idx == splitFactor) ? 
data->num_rows() - i : sliceSize; - Y_VERIFY(sliceCurrentSize); - auto slice = data->Slice(i, sliceCurrentSize); + TLinearSplitInfo linearSplitting = TSimpleSplitter::GetLinearSplittingByMax(data->num_rows(), data->num_rows() / splitFactor); + for (auto it = linearSplitting.StartIterator(); it.IsValid(); it.Next()) { + auto slice = data->Slice(it.GetPosition(), it.GetCurrentPackSize()); result.emplace_back(slice, ColumnSaver.Apply(slice)); + serializedDataBytes += result.back().GetSerializedChunk().size(); if (result.back().GetSerializedChunk().size() >= maxBlobSize) { isCorrect = false; + Y_VERIFY(!linearSplitting.IsMinimalGranularity()); break; } - i += slice->num_rows(); } } if (isCorrect) { + Counters->SimpleSplitter.OnCorrectSerialized(serializedDataBytes); return result; + } else { + Counters->SimpleSplitter.OnTrashSerialized(serializedDataBytes); } ++splitFactor; } } -std::vector<NKikimr::NOlap::TSaverSplittedChunk> TSimpleSplitter::SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const { +std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const { std::vector<TSaverSplittedChunk> result; ui64 position = 0; for (auto&& i : recordsCount) { @@ -55,7 +57,8 @@ std::vector<NKikimr::NOlap::TSaverSplittedChunk> TSimpleSplitter::SplitByRecords return result; } -std::vector<NKikimr::NOlap::TSaverSplittedChunk> TSimpleSplitter::SplitBySizes(std::shared_ptr<arrow::RecordBatch> data, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt) const { +std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitBySizes(std::shared_ptr<arrow::RecordBatch> data, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt) const { + Counters->BySizeSplitter.OnTrashSerialized(dataSerialization.size()); auto splitPartSizesLocal = splitPartSizesExt; Y_VERIFY(data); { diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h index ba76e0c5aa..54fba342be 100644 --- a/ydb/core/tx/columnshard/splitter/simple.h +++ b/ydb/core/tx/columnshard/splitter/simple.h @@ -1,6 +1,8 @@ #pragma once #include <ydb/library/accessor/accessor.h> +#include <ydb/core/tx/columnshard/counters/splitter.h> #include <ydb/core/tx/columnshard/engines/scheme/column_features.h> +#include "stats.h" namespace NKikimr::NOlap { @@ -20,18 +22,113 @@ public: , SerializedChunk(std::move(serializedChunk)) { } + + bool IsCompatibleColumn(const std::shared_ptr<arrow::Field>& f) const { + if (!SlicedBatch) { + return false; + } + if (SlicedBatch->num_columns() != 1) { + return false; + } + if (!SlicedBatch->schema()->fields().front()->Equals(f)) { + return false; + } + return true; + } +}; + +class TLinearSplitInfo { +private: + YDB_READONLY(ui64, PacksCount, 0); + YDB_READONLY(ui64, PackSize, 0); + YDB_READONLY(ui64, ObjectsCount, 0); +public: + bool IsMinimalGranularity() const { + return PackSize == 1; + } + + TLinearSplitInfo(const ui64 packsCount, const ui64 packSize, const ui64 objectsCount) + : PacksCount(packsCount) + , PackSize(packSize) + , ObjectsCount(objectsCount) + { + Y_VERIFY(objectsCount >= packsCount); + Y_VERIFY(PackSize); + Y_VERIFY(PacksCount); + } + + class TIterator { + private: + const TLinearSplitInfo& Owner; + YDB_READONLY(ui64, Position, 0); + YDB_READONLY(ui64, CurrentPackSize, 0); + ui64 PackIdx = 0; + void InitPack() { + CurrentPackSize = (PackIdx + 1 == Owner.GetPacksCount()) ? 
Owner.ObjectsCount - Position : Owner.GetPackSize(); + } + public: + explicit TIterator(const TLinearSplitInfo& owner) + : Owner(owner) + { + InitPack(); + } + + bool IsValid() const { + if (Position < Owner.GetObjectsCount() && PackIdx < Owner.GetPacksCount()) { + return true; + } else { + Y_VERIFY(Position == Owner.GetObjectsCount() && PackIdx == Owner.GetPacksCount()); + return false; + } + } + + bool Next() { + Y_VERIFY(IsValid()); + Position += CurrentPackSize; + ++PackIdx; + InitPack(); + return IsValid(); + } + }; + + TIterator StartIterator() const { + return TIterator(*this); + } }; class TSimpleSplitter { private: TColumnSaver ColumnSaver; + YDB_ACCESSOR_DEF(std::optional<TColumnSerializationStat>, Stats); + std::shared_ptr<NColumnShard::TSplitterCounters> Counters; public: - TSimpleSplitter(const TColumnSaver& columnSaver) + explicit TSimpleSplitter(const TColumnSaver& columnSaver, std::shared_ptr<NColumnShard::TSplitterCounters> counters) : ColumnSaver(columnSaver) + , Counters(counters) { } + static TLinearSplitInfo GetOptimalLinearSplitting(const ui64 objectsCount, const i64 optimalPackSizeExt) { + const i64 optimalPackSize = optimalPackSizeExt ? optimalPackSizeExt : 1; + const ui32 countPacksMax = std::max<ui32>(1, (ui32)floor(1.0 * objectsCount / optimalPackSize)); + const ui32 countPacksMin = std::max<ui32>(1, (ui32)ceil(1.0 * objectsCount / optimalPackSize)); + const ui32 stepPackMax = objectsCount / countPacksMin; + const ui32 stepPackMin = objectsCount / countPacksMax; + if (std::abs(optimalPackSize - stepPackMax) > std::abs(optimalPackSize - stepPackMin)) { + return TLinearSplitInfo(countPacksMax, stepPackMin, objectsCount); + } else { + return TLinearSplitInfo(countPacksMin, stepPackMax, objectsCount); + } + } + + static TLinearSplitInfo GetLinearSplittingByMax(const ui64 objectsCount, const ui64 maxPackSizeExt) { + const ui64 maxPackSize = maxPackSizeExt ? 
maxPackSizeExt : 1; + const ui32 countPacksMax = std::max<ui32>(1, (ui32)floor(1.0 * objectsCount / maxPackSize)); + const ui32 stepPackMin = objectsCount / countPacksMax; + return TLinearSplitInfo(countPacksMax, stepPackMin, objectsCount); + } + std::vector<TSaverSplittedChunk> Split(std::shared_ptr<arrow::Array> data, std::shared_ptr<arrow::Field> field, const ui32 maxBlobSize) const; std::vector<TSaverSplittedChunk> Split(std::shared_ptr<arrow::RecordBatch> data, const ui32 maxBlobSize) const; std::vector<TSaverSplittedChunk> SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const; diff --git a/ydb/core/tx/columnshard/splitter/splitter.cpp b/ydb/core/tx/columnshard/splitter/splitter.cpp index eb22d088b6..273300a099 100644 --- a/ydb/core/tx/columnshard/splitter/splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/splitter.cpp @@ -59,18 +59,12 @@ bool TSplitLimiter::Next(std::vector<TString>& portionBlobs, std::shared_ptr<arr auto array = currentBatch->GetColumnByName(columnName); Y_VERIFY(array); auto columnSaver = Schema->GetColumnSaver(columnSummary.GetColumnId(), saverContext); - TString blob = TPortionInfo::SerializeColumn(array, field, columnSaver); + TString blob = columnSaver.Apply(array, field); if (blob.size() >= TCompactionLimits::MAX_BLOB_SIZE) { - Counters.TrashDataSerializationBytes->Add(blob.size()); - Counters.TrashDataSerialization->Add(1); - Counters.TrashDataSerializationHistogramBytes->Collect(blob.size()); const double kffNew = 1.0 * ExpectedBlobSize / blob.size() * ReduceCorrectionKff; CurrentStepRecordsCount = currentBatch->num_rows() * kffNew; Y_VERIFY(CurrentStepRecordsCount); break; - } else { - Counters.CorrectDataSerializationBytes->Add(blob.size()); - Counters.CorrectDataSerialization->Add(1); } portionBlobs[idx] = std::move(blob); diff --git a/ydb/core/tx/columnshard/splitter/stats.cpp b/ydb/core/tx/columnshard/splitter/stats.cpp new file mode 100644 index 0000000000..1f2f835dce --- /dev/null +++ b/ydb/core/tx/columnshard/splitter/stats.cpp @@ -0,0 +1,21 @@ +#include "stats.h" +#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> + +namespace NKikimr::NOlap { + +std::optional<NKikimr::NOlap::TColumnSerializationStat> TSerializationStats::GetStatsForRecordBatch(const std::shared_ptr<arrow::RecordBatch>& rb) const { + std::optional<TColumnSerializationStat> result; + for (auto&& i : rb->schema()->fields()) { + auto columnInfo = GetColumnInfo(i->name()); + if (!columnInfo) { + return {}; + } else if (!result) { + result = *columnInfo; + } else { + result->Add(*columnInfo); + } + } + return result; +} + +} diff --git a/ydb/core/tx/columnshard/splitter/stats.h b/ydb/core/tx/columnshard/splitter/stats.h new file mode 100644 index 0000000000..38d1f12b8d --- /dev/null +++ b/ydb/core/tx/columnshard/splitter/stats.h @@ -0,0 +1,95 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> + +#include <library/cpp/actors/core/log.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> + +#include <optional> +#include <map> +#include <deque> +#include <string> +#include <memory> + +namespace NKikimr::NOlap { + +class TColumnSerializationStat { +private: + YDB_READONLY(ui64, SerializedBytes, 0); + YDB_READONLY(ui64, RecordsCount, 0); +public: + TColumnSerializationStat(const ui64 bytes, const ui64 recordsCount) + : SerializedBytes(bytes) + , RecordsCount(recordsCount) + { + Y_VERIFY(RecordsCount); + } + + TColumnSerializationStat RecalcForRecordsCount(const ui64 recordsCount) const { + return 
TColumnSerializationStat(SerializedBytes / RecordsCount * recordsCount, recordsCount); + } + + void Add(const TColumnSerializationStat& item) { + SerializedBytes += item.SerializedBytes; + AFL_VERIFY(RecordsCount == item.RecordsCount)("self_count", RecordsCount)("new_count", item.RecordsCount); + } + + std::optional<ui64> PredictOptimalPackRecordsCount(const ui64 recordsCount, const ui64 blobSize) const { + if (!RecordsCount) { + return {}; + } + const ui64 fullSize = 1.0 * recordsCount / RecordsCount * SerializedBytes; + if (fullSize < blobSize) { + return recordsCount; + } else { + return std::floor(1.0 * fullSize / blobSize * recordsCount); + } + } + + std::optional<ui64> PredictOptimalSplitFactor(const ui64 recordsCount, const ui64 blobSize) const { + if (!RecordsCount) { + return {}; + } + const ui64 fullSize = 1.0 * recordsCount / RecordsCount * SerializedBytes; + if (fullSize < blobSize) { + return 1; + } else { + return std::floor(1.0 * fullSize / blobSize); + } + } +}; + +class TSerializationStats { +private: + std::deque<TColumnSerializationStat> ColumnStat; + std::map<ui32, TColumnSerializationStat*> StatsByColumnId; + std::map<std::string, TColumnSerializationStat*> StatsByColumnName; +public: + void AddStat(const ui32 columnId, const std::string& fieldName, const TColumnSerializationStat& info) { + ColumnStat.emplace_back(info); + StatsByColumnId.emplace(columnId, &ColumnStat.back()); + StatsByColumnName.emplace(fieldName, &ColumnStat.back()); + } + + std::optional<TColumnSerializationStat> GetColumnInfo(const ui32 columnId) const { + auto it = StatsByColumnId.find(columnId); + if (it == StatsByColumnId.end()) { + return {}; + } else { + return *it->second; + } + } + + std::optional<TColumnSerializationStat> GetColumnInfo(const std::string& columnName) const { + auto it = StatsByColumnName.find(columnName); + if (it == StatsByColumnName.end()) { + return {}; + } else { + return *it->second; + } + } + + std::optional<TColumnSerializationStat> GetStatsForRecordBatch(const std::shared_ptr<arrow::RecordBatch>& rb) const; +}; + +} diff --git a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp index 280fa41144..cf2956872a 100644 --- a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp @@ -19,6 +19,13 @@ Y_UNIT_TEST_SUITE(Splitter) { return NKikimr::NOlap::TColumnSaver(nullptr, std::make_shared<NKikimr::NArrow::NSerialization::TFullDataSerializer>(arrow::ipc::IpcWriteOptions::Defaults())); } + virtual std::optional<NKikimr::NOlap::TColumnSerializationStat> GetColumnSerializationStats(const ui32 /*columnId*/) const override { + return {}; + } + virtual std::optional<NKikimr::NOlap::TColumnSerializationStat> GetBatchSerializationStats(const std::shared_ptr<arrow::RecordBatch>& /*rb*/) const override { + return {}; + } + NKikimr::NOlap::TColumnLoader GetColumnLoader(const ui32 columnId) const { arrow::FieldVector v = {std::make_shared<arrow::Field>(GetColumnName(columnId), std::make_shared<arrow::StringType>())}; auto schema = std::make_shared<arrow::Schema>(v); @@ -54,7 +61,7 @@ Y_UNIT_TEST_SUITE(Splitter) { void Execute(std::shared_ptr<arrow::RecordBatch> batch) { NKikimr::NColumnShard::TIndexationCounters counters("test"); - NKikimr::NOlap::TRBSplitLimiter limiter(nullptr, counters, Schema, batch); + NKikimr::NOlap::TRBSplitLimiter limiter(counters.SplitterCounters, Schema, batch); std::vector<std::vector<NKikimr::NOlap::TOrderedColumnChunk>> chunksForBlob; 
std::map<std::string, std::vector<std::shared_ptr<arrow::RecordBatch>>> restoredBatch; std::vector<ui64> blobsSize; diff --git a/ydb/core/tx/columnshard/splitter/ya.make b/ydb/core/tx/columnshard/splitter/ya.make index 4d4ca24b75..29e68d508e 100644 --- a/ydb/core/tx/columnshard/splitter/ya.make +++ b/ydb/core/tx/columnshard/splitter/ya.make @@ -6,6 +6,7 @@ SRCS( chunks.cpp simple.cpp rb_splitter.cpp + stats.cpp ) PEERDIR( |
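The batch slicing itself now goes through the TLinearSplitInfo layout returned by the new TSimpleSplitter::GetOptimalLinearSplitting() / GetLinearSplittingByMax() helpers, replacing the hand-rolled position/step loops in rb_splitter.cpp and simple.cpp. Below is a small iteration sketch, not part of the commit: the record counts are made up for illustration, and it only builds inside the ydb tree (output goes through the Arcadia util streams).

```cpp
// Hedged sketch: walking the pack layout produced by the new linear splitting
// helper from ydb/core/tx/columnshard/splitter/simple.h.
#include <ydb/core/tx/columnshard/splitter/simple.h>
#include <util/stream/output.h>

int main() {
    using namespace NKikimr::NOlap;

    // 25000 records with an optimal pack of ~10000 records (illustrative numbers):
    // the helper picks the pack count whose step is closest to the requested
    // optimum, and the iterator assigns any remainder to the last pack.
    TLinearSplitInfo info = TSimpleSplitter::GetOptimalLinearSplitting(25000, 10000);
    for (auto it = info.StartIterator(); it.IsValid(); it.Next()) {
        Cout << "pack at position " << it.GetPosition()
             << " with " << it.GetCurrentPackSize() << " records" << Endl;
    }
    return 0;
}
```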