aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-08-11 07:49:19 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-08-11 08:19:30 +0300
commit8ae39e1bd0ae849c99563799c3a1eaad2360018d (patch)
tree63639a6b62c774ca1e4aa812b5999faeba6b529e
parentd51343356504193b7cc73d7f5586aac2b220ec89 (diff)
downloadydb-8ae39e1bd0ae849c99563799c3a1eaad2360018d.tar.gz
KIKIMR-18932: counters and stats usage for splitting
-rw-r--r--ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/counters/common/owner.cpp4
-rw-r--r--ydb/core/tx/columnshard/counters/common/owner.h14
-rw-r--r--ydb/core/tx/columnshard/counters/indexation.cpp8
-rw-r--r--ydb/core/tx/columnshard/counters/indexation.h10
-rw-r--r--ydb/core/tx/columnshard/counters/splitter.cpp14
-rw-r--r--ydb/core/tx/columnshard/counters/splitter.h79
-rw-r--r--ydb/core/tx/columnshard/counters/ya.make1
-rw-r--r--ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/splitter/batch_slice.cpp22
-rw-r--r--ydb/core/tx/columnshard/splitter/batch_slice.h16
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.cpp4
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.h13
-rw-r--r--ydb/core/tx/columnshard/splitter/rb_splitter.cpp31
-rw-r--r--ydb/core/tx/columnshard/splitter/rb_splitter.h5
-rw-r--r--ydb/core/tx/columnshard/splitter/simple.cpp29
-rw-r--r--ydb/core/tx/columnshard/splitter/simple.h99
-rw-r--r--ydb/core/tx/columnshard/splitter/splitter.cpp8
-rw-r--r--ydb/core/tx/columnshard/splitter/stats.cpp21
-rw-r--r--ydb/core/tx/columnshard/splitter/stats.h95
-rw-r--r--ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp9
-rw-r--r--ydb/core/tx/columnshard/splitter/ya.make1
28 files changed, 422 insertions, 69 deletions
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt
index 71ef940f82..b67a55fc72 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt
@@ -24,4 +24,5 @@ target_sources(tx-columnshard-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/columnshard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/splitter.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt
index edfdee32c7..24d27e4043 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt
@@ -25,4 +25,5 @@ target_sources(tx-columnshard-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/columnshard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/splitter.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt
index edfdee32c7..24d27e4043 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.linux-x86_64.txt
@@ -25,4 +25,5 @@ target_sources(tx-columnshard-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/columnshard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/splitter.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt
index 71ef940f82..b67a55fc72 100644
--- a/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/CMakeLists.windows-x86_64.txt
@@ -24,4 +24,5 @@ target_sources(tx-columnshard-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/columnshard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/splitter.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/common/owner.cpp b/ydb/core/tx/columnshard/counters/common/owner.cpp
index 69ea6bfd61..d5d4d5b28b 100644
--- a/ydb/core/tx/columnshard/counters/common/owner.cpp
+++ b/ydb/core/tx/columnshard/counters/common/owner.cpp
@@ -50,4 +50,8 @@ void TCommonCountersOwner::DeepSubGroup(const TString& id, const TString& value)
SubGroup = SubGroup->GetSubgroup(id, value);
}
+// Replaces SubGroup with its child subgroup keyed by ("component", componentName).
+void TCommonCountersOwner::DeepSubGroup(const TString& componentName) {
+    auto componentGroup = SubGroup->GetSubgroup("component", componentName);
+    SubGroup = componentGroup;
+}
+
}
diff --git a/ydb/core/tx/columnshard/counters/common/owner.h b/ydb/core/tx/columnshard/counters/common/owner.h
index bf5e3b7525..a2f0b5273a 100644
--- a/ydb/core/tx/columnshard/counters/common/owner.h
+++ b/ydb/core/tx/columnshard/counters/common/owner.h
@@ -16,6 +16,19 @@ private:
protected:
std::shared_ptr<TValueAggregationAgent> GetValueAutoAggregations(const TString& name) const;
std::shared_ptr<TValueAggregationClient> GetValueAutoAggregationsClient(const TString& name) const;
+
+    // Creates an owner whose SubGroup and ModuleId are copied from `sameAs`.
+    TCommonCountersOwner(const TCommonCountersOwner& sameAs)
+        : SubGroup(sameAs.SubGroup)
+        , ModuleId(sameAs.ModuleId)
+    {
+    }
+
+    // Copies SubGroup and ModuleId from `sameAs`, then descends into the
+    // ("component", componentName) subgroup.
+    TCommonCountersOwner(const TCommonCountersOwner& sameAs, const TString& componentName)
+        : SubGroup(sameAs.SubGroup), ModuleId(sameAs.ModuleId)
+    {
+        DeepSubGroup("component", componentName);
+    }
public:
const TString& GetModuleId() const {
return ModuleId;
@@ -25,6 +38,7 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr GetValue(const TString& name) const;
NMonitoring::TDynamicCounters::TCounterPtr GetDeriviative(const TString& name) const;
void DeepSubGroup(const TString& id, const TString& value);
+ void DeepSubGroup(const TString& componentName);
NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const;
TCommonCountersOwner(const TString& module, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals = nullptr);
diff --git a/ydb/core/tx/columnshard/counters/indexation.cpp b/ydb/core/tx/columnshard/counters/indexation.cpp
index 194bdf0bb5..cbe62ae082 100644
--- a/ydb/core/tx/columnshard/counters/indexation.cpp
+++ b/ydb/core/tx/columnshard/counters/indexation.cpp
@@ -21,12 +21,6 @@ TIndexationCounters::TIndexationCounters(const TString& module)
MovedPortions = TBase::GetDeriviative("Moved/Portions");
MovedPortionBytes = TBase::GetDeriviative("Moved/Bytes");
- TrashDataSerializationBytes = TBase::GetDeriviative("TrashDataSerialization/Bytes");
- TrashDataSerialization = TBase::GetDeriviative("TrashDataSerialization/Count");
- TrashDataSerializationHistogramBytes = TBase::GetHistogram("TrashDataSerialization/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1024));
- CorrectDataSerializationBytes = TBase::GetDeriviative("CorrectDataSerialization/Bytes");
- CorrectDataSerialization = TBase::GetDeriviative("CorrectDataSerialization/Count");
-
CompactionDuration = TBase::GetHistogram("CompactionDuration", NMonitoring::ExponentialHistogram(18, 2, 20));
HistogramCompactionInputBytes = TBase::GetHistogram("CompactionInput/Bytes", NMonitoring::ExponentialHistogram(18, 2, 1024));
CompactionInputBytes = TBase::GetDeriviative("CompactionInput/Bytes");
@@ -39,6 +33,8 @@ TIndexationCounters::TIndexationCounters(const TString& module)
TooSmallBlob = TBase::GetDeriviative("TooSmallBlob/Count");
TooSmallBlobFinish = TBase::GetDeriviative("TooSmallBlobFinish/Count");
TooSmallBlobStart = TBase::GetDeriviative("TooSmallBlobStart/Count");
+
+ SplitterCounters = std::make_shared<TSplitterCounters>(*this);
}
}
diff --git a/ydb/core/tx/columnshard/counters/indexation.h b/ydb/core/tx/columnshard/counters/indexation.h
index ac449a93b2..4b926d2609 100644
--- a/ydb/core/tx/columnshard/counters/indexation.h
+++ b/ydb/core/tx/columnshard/counters/indexation.h
@@ -1,6 +1,8 @@
#pragma once
-#include <library/cpp/monlib/dynamic_counters/counters.h>
#include "common/owner.h"
+#include "splitter.h"
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
namespace NKikimr::NColumnShard {
@@ -24,11 +26,7 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr MovedPortions;
NMonitoring::TDynamicCounters::TCounterPtr MovedPortionBytes;
- NMonitoring::TDynamicCounters::TCounterPtr TrashDataSerializationBytes;
- NMonitoring::TDynamicCounters::TCounterPtr TrashDataSerialization;
- NMonitoring::THistogramPtr TrashDataSerializationHistogramBytes;
- NMonitoring::TDynamicCounters::TCounterPtr CorrectDataSerializationBytes;
- NMonitoring::TDynamicCounters::TCounterPtr CorrectDataSerialization;
+ std::shared_ptr<TSplitterCounters> SplitterCounters;
NMonitoring::THistogramPtr SplittedPortionLargestColumnSize;
NMonitoring::THistogramPtr SimpleSplitPortionLargestColumnSize;
diff --git a/ydb/core/tx/columnshard/counters/splitter.cpp b/ydb/core/tx/columnshard/counters/splitter.cpp
new file mode 100644
index 0000000000..6510d37d83
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters/splitter.cpp
@@ -0,0 +1,14 @@
+#include "splitter.h"
+
+namespace NKikimr::NColumnShard {
+
+// Builds every splitter counter family from `owner`.
+// The base part descends into the "splitter" component; the member families
+// are constructed from `owner` itself with their own type label.
+// NOTE(review): the members receive `owner`, not `*this`, so they do not pass
+// through the "splitter" component descent - confirm this is intended.
+TSplitterCounters::TSplitterCounters(const TCommonCountersOwner& owner)
+    : TBase(owner, "splitter"),
+      SimpleSplitter(owner, "simple"),
+      BySizeSplitter(owner, "by_size"),
+      SplittedBlobs(owner, "splitted"),
+      MonoBlobs(owner, "mono")
+{}
+
+}
diff --git a/ydb/core/tx/columnshard/counters/splitter.h b/ydb/core/tx/columnshard/counters/splitter.h
new file mode 100644
index 0000000000..cbde8265a2
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters/splitter.h
@@ -0,0 +1,79 @@
+#pragma once
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include "common/owner.h"
+
+namespace NKikimr::NColumnShard {
+
+// Serialization counters for a single splitting strategy, placed under the
+// ("splitter_type", splitterType) subgroup of the given owner.
+// "Trash" and "Correct" serializations are each tracked as an event count,
+// a byte total and a histogram of byte sizes.
+class TSplitterCaseCounters: public TCommonCountersOwner {
+private:
+    using TBase = TCommonCountersOwner;
+    using TCounterPtr = NMonitoring::TDynamicCounters::TCounterPtr;
+
+    TCounterPtr TrashDataSerializationBytes;
+    TCounterPtr TrashDataSerialization;
+    NMonitoring::THistogramPtr TrashDataSerializationHistogramBytes;
+
+    TCounterPtr CorrectDataSerializationBytes;
+    TCounterPtr CorrectDataSerialization;
+    NMonitoring::THistogramPtr CorrectDataSerializationHistogramBytes;
+
+    // Collector shared by both byte-size histograms.
+    static NMonitoring::IHistogramCollectorPtr MakeBytesCollector() {
+        return NMonitoring::ExponentialHistogram(15, 2, 1024);
+    }
+public:
+    TSplitterCaseCounters(const TCommonCountersOwner& owner, const TString& splitterType)
+        : TBase(owner)
+    {
+        DeepSubGroup("splitter_type", splitterType);
+
+        TrashDataSerializationBytes = TBase::GetDeriviative("TrashDataSerialization/Bytes");
+        TrashDataSerialization = TBase::GetDeriviative("TrashDataSerialization/Count");
+        TrashDataSerializationHistogramBytes = TBase::GetHistogram("TrashDataSerialization/Bytes", MakeBytesCollector());
+
+        CorrectDataSerializationBytes = TBase::GetDeriviative("CorrectDataSerialization/Bytes");
+        CorrectDataSerialization = TBase::GetDeriviative("CorrectDataSerialization/Count");
+        CorrectDataSerializationHistogramBytes = TBase::GetHistogram("CorrectDataSerialization/Bytes", MakeBytesCollector());
+    }
+
+    // Records one "trash" serialization of `bytes` bytes.
+    void OnTrashSerialized(const ui64 bytes) const {
+        TrashDataSerializationHistogramBytes->Collect(bytes);
+        TrashDataSerialization->Add(1);
+        TrashDataSerializationBytes->Add(bytes);
+    }
+
+    // Records one "correct" serialization of `bytes` bytes.
+    void OnCorrectSerialized(const ui64 bytes) const {
+        CorrectDataSerializationHistogramBytes->Collect(bytes);
+        CorrectDataSerialization->Add(1);
+        CorrectDataSerializationBytes->Add(bytes);
+    }
+};
+
+// Counters describing the blobs produced by splitting, placed under the
+// ("blobs_type", blobsType) subgroup of the given owner: number of blobs,
+// their total size and a histogram of individual blob sizes.
+class TBlobResultCounters: public TCommonCountersOwner {
+private:
+    using TBase = TCommonCountersOwner;
+    NMonitoring::TDynamicCounters::TCounterPtr BlobsCount;
+    NMonitoring::TDynamicCounters::TCounterPtr BlobsBytes;
+    NMonitoring::THistogramPtr BlobsBytesHistogram;
+public:
+    TBlobResultCounters(const TCommonCountersOwner& owner, const TString& blobsType)
+        : TBase(owner) {
+        DeepSubGroup("blobs_type", blobsType);
+
+        // The signal name must match what the member accumulates: OnBlobData adds 1
+        // to BlobsCount and the blob size to BlobsBytes, so the count goes to
+        // ".../Count" and the byte total to ".../Bytes".
+        BlobsCount = TBase::GetDeriviative("DataSerialization/Count");
+        BlobsBytes = TBase::GetDeriviative("DataSerialization/Bytes");
+        BlobsBytesHistogram = TBase::GetHistogram("DataSerialization/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1024));
+    }
+
+    // Records one produced blob of `size` bytes.
+    void OnBlobData(const ui64 size) const {
+        BlobsCount->Add(1);
+        BlobsBytes->Add(size);
+        BlobsBytesHistogram->Collect(size);
+    }
+
+};
+
+// Bundle of all splitter-related counter families.
+class TSplitterCounters: public TCommonCountersOwner {
+private:
+    using TBase = TCommonCountersOwner;
+public:
+    // Serialization counters, one family per splitting strategy.
+    const TSplitterCaseCounters SimpleSplitter;
+    const TSplitterCaseCounters BySizeSplitter;
+
+    // Result blob counters, one family per blobs type.
+    const TBlobResultCounters SplittedBlobs;
+    const TBlobResultCounters MonoBlobs;
+
+    TSplitterCounters(const TCommonCountersOwner& owner);
+};
+
+}
diff --git a/ydb/core/tx/columnshard/counters/ya.make b/ydb/core/tx/columnshard/counters/ya.make
index 3a333cef8d..aed1f68de2 100644
--- a/ydb/core/tx/columnshard/counters/ya.make
+++ b/ydb/core/tx/columnshard/counters/ya.make
@@ -8,6 +8,7 @@ SRCS(
columnshard.cpp
insert_table.cpp
common_data.cpp
+ splitter.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt
index edc599e27d..9c28205a55 100644
--- a/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt
@@ -21,4 +21,5 @@ target_sources(tx-columnshard-splitter PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/chunks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/simple.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/stats.cpp
)
diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt
index 8afbcdfed6..575faf46d9 100644
--- a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt
@@ -22,4 +22,5 @@ target_sources(tx-columnshard-splitter PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/chunks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/simple.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/stats.cpp
)
diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt
index 8afbcdfed6..575faf46d9 100644
--- a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt
@@ -22,4 +22,5 @@ target_sources(tx-columnshard-splitter PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/chunks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/simple.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/stats.cpp
)
diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt
index edc599e27d..9c28205a55 100644
--- a/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt
@@ -21,4 +21,5 @@ target_sources(tx-columnshard-splitter PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/chunks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/simple.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/stats.cpp
)
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.cpp b/ydb/core/tx/columnshard/splitter/batch_slice.cpp
index 6f06b70a2e..bcbead4500 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.cpp
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.cpp
@@ -49,20 +49,18 @@ bool TBatchSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) {
Y_VERIFY(chunksInProgress[i].GetSize() > TSplitSettings::MinBlobSize - partSize);
Y_VERIFY(otherSize - (TSplitSettings::MinBlobSize - partSize) >= TSplitSettings::MinBlobSize);
chunksInProgress[i].AddSplit(TSplitSettings::MinBlobSize - partSize);
- }
- if (!hasNoSplitChanges) {
- std::vector<TSplittedColumnChunk> newChunks = chunksInProgress[i].InternalSplit(Schema->GetColumnSaver(chunksInProgress[i].GetColumnId()));
+
+ std::vector<TSplittedColumnChunk> newChunks = chunksInProgress[i].InternalSplit(Schema->GetColumnSaver(chunksInProgress[i].GetColumnId()), Counters);
chunksInProgress.erase(chunksInProgress.begin() + i);
chunksInProgress.insert(chunksInProgress.begin() + i, newChunks.begin(), newChunks.end());
- result.emplace_back(TSplittedBlob());
+ TSplittedBlob newBlob;
for (ui32 chunk = 0; chunk <= i; ++chunk) {
- Y_VERIFY(result.back().Take(chunksInProgress[chunk]));
+ Y_VERIFY(newBlob.Take(chunksInProgress[chunk]));
}
- if (result.back().GetSize() < TSplitSettings::MaxBlobSize) {
+ if (newBlob.GetSize() < TSplitSettings::MaxBlobSize) {
chunksInProgress.erase(chunksInProgress.begin(), chunksInProgress.begin() + i + 1);
- } else {
- result.pop_back();
+ result.emplace_back(std::move(newBlob));
}
}
break;
@@ -75,9 +73,10 @@ bool TBatchSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) {
return true;
}
-TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema)
+TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters)
: Schema(schema)
, Batch(batch)
+ , Counters(counters)
{
Y_VERIFY(batch);
RecordsCount = batch->num_rows();
@@ -91,7 +90,10 @@ TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch>
for (auto&& i : batch->columns()) {
auto& c = Columns[idx];
auto columnSaver = schema->GetColumnSaver(c.GetColumnId());
- c.SetBlobs(TSimpleSplitter(columnSaver).Split(i, c.GetField(), TSplitSettings::MaxBlobSize));
+ auto stats = schema->GetColumnSerializationStats(c.GetColumnId());
+ TSimpleSplitter splitter(columnSaver, Counters);
+ splitter.SetStats(stats);
+ c.SetBlobs(splitter.Split(i, c.GetField(), TSplitSettings::MaxBlobSize));
Size += c.GetSize();
++idx;
}
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h
index a76b067dda..da3c5db1f2 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.h
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.h
@@ -1,5 +1,6 @@
#pragma once
#include "chunks.h"
+#include "stats.h"
#include <ydb/core/tx/columnshard/counters/indexation.h>
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
@@ -15,19 +16,29 @@ public:
virtual ~ISchemaDetailInfo() = default;
virtual ui32 GetColumnId(const std::string& fieldName) const = 0;
virtual TColumnSaver GetColumnSaver(const ui32 columnId) const = 0;
+ virtual std::optional<TColumnSerializationStat> GetColumnSerializationStats(const ui32 columnId) const = 0;
+ virtual std::optional<TColumnSerializationStat> GetBatchSerializationStats(const std::shared_ptr<arrow::RecordBatch>& rb) const = 0;
};
class TDefaultSchemaDetails: public ISchemaDetailInfo {
private:
ISnapshotSchema::TPtr Schema;
const TSaverContext Context;
+ TSerializationStats Stats;
public:
- TDefaultSchemaDetails(ISnapshotSchema::TPtr schema, const TSaverContext& context)
+ TDefaultSchemaDetails(ISnapshotSchema::TPtr schema, const TSaverContext& context, TSerializationStats&& stats)
: Schema(schema)
, Context(context)
+ , Stats(std::move(stats))
{
}
+    // Looks up the serialization statistics for `columnId` in Stats.
+    virtual std::optional<TColumnSerializationStat> GetColumnSerializationStats(const ui32 columnId) const override {
+        std::optional<TColumnSerializationStat> columnStat = Stats.GetColumnInfo(columnId);
+        return columnStat;
+    }
+    // Asks Stats for the serialization statistics that apply to the record batch `rb`.
+    virtual std::optional<TColumnSerializationStat> GetBatchSerializationStats(const std::shared_ptr<arrow::RecordBatch>& rb) const override {
+        std::optional<TColumnSerializationStat> batchStat = Stats.GetStatsForRecordBatch(rb);
+        return batchStat;
+    }
virtual ui32 GetColumnId(const std::string& fieldName) const override {
return Schema->GetColumnId(fieldName);
}
@@ -43,8 +54,9 @@ private:
YDB_READONLY(ui32, RecordsCount, 0);
ISchemaDetailInfo::TPtr Schema;
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch);
+ std::shared_ptr<NColumnShard::TSplitterCounters> Counters;
public:
- TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema);
+ TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters);
void MergeSlice(TBatchSerializedSlice&& slice);
diff --git a/ydb/core/tx/columnshard/splitter/chunks.cpp b/ydb/core/tx/columnshard/splitter/chunks.cpp
index 3fc0ce7f70..d5390412f8 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.cpp
+++ b/ydb/core/tx/columnshard/splitter/chunks.cpp
@@ -2,8 +2,8 @@
namespace NKikimr::NOlap {
-std::vector<TSplittedColumnChunk> TSplittedColumnChunk::InternalSplit(const TColumnSaver& saver) {
- auto chunks = TSimpleSplitter(saver).SplitBySizes(Data.GetSlicedBatch(), Data.GetSerializedChunk(), SplitSizes);
+std::vector<TSplittedColumnChunk> TSplittedColumnChunk::InternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters) {
+ auto chunks = TSimpleSplitter(saver, counters).SplitBySizes(Data.GetSlicedBatch(), Data.GetSerializedChunk(), SplitSizes);
Y_VERIFY(chunks.size() == SplitSizes.size() + 1);
std::vector<TSplittedColumnChunk> newChunks;
for (auto&& i : chunks) {
diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h
index 994b124494..af5b28bacd 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.h
+++ b/ydb/core/tx/columnshard/splitter/chunks.h
@@ -1,6 +1,6 @@
#pragma once
#include "simple.h"
-#include <ydb/core/tx/columnshard/counters/indexation.h>
+#include <ydb/core/tx/columnshard/counters/splitter.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
@@ -9,10 +9,10 @@ namespace NKikimr::NOlap {
class TSplitSettings {
public:
- static const inline ui64 MaxBlobSize = 8 * 1024 * 1024;
- static const inline ui64 MaxBlobSizeWithGap = 7 * 1024 * 1024;
- static const inline ui64 MinBlobSize = 4 * 1024 * 1024;
- static const inline ui64 MinRecordsCount = 10000;
+ static const inline i64 MaxBlobSize = 8 * 1024 * 1024;
+ static const inline i64 MaxBlobSizeWithGap = 7 * 1024 * 1024;
+ static const inline i64 MinBlobSize = 4 * 1024 * 1024;
+ static const inline i64 MinRecordsCount = 10000;
};
class TSplittedColumn;
@@ -26,7 +26,7 @@ public:
void AddSplit(const ui64 size) {
SplitSizes.emplace_back(size);
}
- std::vector<TSplittedColumnChunk> InternalSplit(const TColumnSaver& saver);
+ std::vector<TSplittedColumnChunk> InternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters);
ui64 GetSize() const {
return Data.GetSerializedChunk().size();
@@ -66,6 +66,7 @@ public:
void SetBlobs(const std::vector<TSaverSplittedChunk>& data) {
Y_VERIFY(Chunks.empty());
for (auto&& i : data) {
+ Y_VERIFY(i.IsCompatibleColumn(Field));
Size += i.GetSerializedChunk().size();
Chunks.emplace_back(ColumnId, i);
}
diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
index 76d1f51af8..a5210a72f2 100644
--- a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
+++ b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
@@ -39,25 +39,25 @@ public:
}
};
-TRBSplitLimiter::TRBSplitLimiter(const TGranuleMeta* /*granuleMeta*/, const NColumnShard::TIndexationCounters& counters, ISchemaDetailInfo::TPtr schemaInfo,
+TRBSplitLimiter::TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters, ISchemaDetailInfo::TPtr schemaInfo,
const std::shared_ptr<arrow::RecordBatch> batch)
: Counters(counters)
- , Batch(batch) {
+ , Batch(batch)
+{
Y_VERIFY(Batch->num_rows());
std::vector<TBatchSerializedSlice> slices;
- const ui32 countPacks = std::max<ui32>(1, (ui32)round(1.0 * Batch->num_rows() / TSplitSettings::MinRecordsCount));
- const ui32 stepPack = Batch->num_rows() / countPacks;
- ui32 position = 0;
- for (ui32 i = 0; i < countPacks; ++i) {
- const ui32 packSize = (i + 1 == countPacks) ? Batch->num_rows() - position : stepPack;
- std::shared_ptr<arrow::RecordBatch> current;
- current = batch->Slice(position, packSize);
- position += current->num_rows();
- TBatchSerializedSlice slice(current, schemaInfo);
+ auto stats = schemaInfo->GetBatchSerializationStats(Batch);
+ ui32 recordsCount = TSplitSettings::MinRecordsCount;
+ if (stats) {
+ const ui32 recordsCountForMinSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), TSplitSettings::MinBlobSize).value_or(recordsCount);
+ recordsCount = std::max(recordsCount, recordsCountForMinSize);
+ }
+ auto linearSplitInfo = TSimpleSplitter::GetOptimalLinearSplitting(Batch->num_rows(), recordsCount);
+ for (auto it = linearSplitInfo.StartIterator(); it.IsValid(); it.Next()) {
+ std::shared_ptr<arrow::RecordBatch> current = batch->Slice(it.GetPosition(), it.GetCurrentPackSize());
+ TBatchSerializedSlice slice(current, schemaInfo, Counters);
slices.emplace_back(std::move(slice));
}
- Y_VERIFY(position == batch->num_rows());
- Y_VERIFY(slices.size() == countPacks);
const std::vector<ui32> chunks = TSimilarSlicer(TSplitSettings::MinBlobSize).Split(slices);
ui32 chunkStartPosition = 0;
@@ -84,6 +84,11 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<TOrderedColumnChunk>>& portio
Slices.front().GroupBlobs(blobs);
std::vector<std::vector<TOrderedColumnChunk>> result;
for (auto&& i : blobs) {
+ if (blobs.size() == 1) {
+ Counters->MonoBlobs.OnBlobData(i.GetSize());
+ } else {
+ Counters->SplittedBlobs.OnBlobData(i.GetSize());
+ }
std::vector<TOrderedColumnChunk> chunksForBlob;
for (auto&& c : i.GetChunks()) {
chunksForBlob.emplace_back(c.GetColumnId(), c.GetData().GetRecordsCount(), c.GetData().GetSerializedChunk());
diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.h b/ydb/core/tx/columnshard/splitter/rb_splitter.h
index f76e7d20f0..c1aca97cc3 100644
--- a/ydb/core/tx/columnshard/splitter/rb_splitter.h
+++ b/ydb/core/tx/columnshard/splitter/rb_splitter.h
@@ -1,5 +1,6 @@
#pragma once
#include "batch_slice.h"
+#include "stats.h"
#include <ydb/core/tx/columnshard/counters/indexation.h>
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
@@ -12,10 +13,10 @@ namespace NKikimr::NOlap {
class TRBSplitLimiter {
private:
std::deque<TBatchSerializedSlice> Slices;
- const NColumnShard::TIndexationCounters Counters;
+ std::shared_ptr<NColumnShard::TSplitterCounters> Counters;
std::shared_ptr<arrow::RecordBatch> Batch;
public:
- TRBSplitLimiter(const TGranuleMeta* granuleMeta, const NColumnShard::TIndexationCounters& counters,
+ TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters,
ISchemaDetailInfo::TPtr schemaInfo, const std::shared_ptr<arrow::RecordBatch> batch);
bool Next(std::vector<std::vector<TOrderedColumnChunk>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch);
diff --git a/ydb/core/tx/columnshard/splitter/simple.cpp b/ydb/core/tx/columnshard/splitter/simple.cpp
index e12746c6f8..98f704b151 100644
--- a/ydb/core/tx/columnshard/splitter/simple.cpp
+++ b/ydb/core/tx/columnshard/splitter/simple.cpp
@@ -2,48 +2,50 @@
namespace NKikimr::NOlap {
-std::vector<NKikimr::NOlap::TSaverSplittedChunk> TSimpleSplitter::Split(std::shared_ptr<arrow::Array> data, std::shared_ptr<arrow::Field> field, const ui32 maxBlobSize) const {
+std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(std::shared_ptr<arrow::Array> data, std::shared_ptr<arrow::Field> field, const ui32 maxBlobSize) const {
auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{field});
auto batch = arrow::RecordBatch::Make(schema, data->length(), {data});
return Split(batch, maxBlobSize);
}
-std::vector<NKikimr::NOlap::TSaverSplittedChunk> TSimpleSplitter::Split(std::shared_ptr<arrow::RecordBatch> data, const ui32 maxBlobSize) const {
+std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(std::shared_ptr<arrow::RecordBatch> data, const ui32 maxBlobSize) const {
Y_VERIFY(data->num_columns() == 1);
- ui32 splitFactor = 1;
+ ui64 splitFactor = Stats ? Stats->PredictOptimalSplitFactor(data->num_rows(), maxBlobSize).value_or(1) : 1;
while (true) {
Y_VERIFY(splitFactor < 100);
std::vector<TSaverSplittedChunk> result;
result.reserve(splitFactor);
bool isCorrect = true;
+ ui64 serializedDataBytes = 0;
if (splitFactor == 1) {
TString blob = ColumnSaver.Apply(data);
isCorrect = blob.size() < maxBlobSize;
+ serializedDataBytes += blob.size();
result.emplace_back(TSaverSplittedChunk(data, std::move(blob)));
} else {
- const ui32 sliceSize = data->num_rows() / splitFactor;
- Y_VERIFY(sliceSize);
- ui32 idx = 0;
- for (ui32 i = 0; i < data->num_rows();) {
- const ui32 sliceCurrentSize = (++idx == splitFactor) ? data->num_rows() - i : sliceSize;
- Y_VERIFY(sliceCurrentSize);
- auto slice = data->Slice(i, sliceCurrentSize);
+ TLinearSplitInfo linearSplitting = TSimpleSplitter::GetLinearSplittingByMax(data->num_rows(), data->num_rows() / splitFactor);
+ for (auto it = linearSplitting.StartIterator(); it.IsValid(); it.Next()) {
+ auto slice = data->Slice(it.GetPosition(), it.GetCurrentPackSize());
result.emplace_back(slice, ColumnSaver.Apply(slice));
+ serializedDataBytes += result.back().GetSerializedChunk().size();
if (result.back().GetSerializedChunk().size() >= maxBlobSize) {
isCorrect = false;
+ Y_VERIFY(!linearSplitting.IsMinimalGranularity());
break;
}
- i += slice->num_rows();
}
}
if (isCorrect) {
+ Counters->SimpleSplitter.OnCorrectSerialized(serializedDataBytes);
return result;
+ } else {
+ Counters->SimpleSplitter.OnTrashSerialized(serializedDataBytes);
}
++splitFactor;
}
}
-std::vector<NKikimr::NOlap::TSaverSplittedChunk> TSimpleSplitter::SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const {
+std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const {
std::vector<TSaverSplittedChunk> result;
ui64 position = 0;
for (auto&& i : recordsCount) {
@@ -55,7 +57,8 @@ std::vector<NKikimr::NOlap::TSaverSplittedChunk> TSimpleSplitter::SplitByRecords
return result;
}
-std::vector<NKikimr::NOlap::TSaverSplittedChunk> TSimpleSplitter::SplitBySizes(std::shared_ptr<arrow::RecordBatch> data, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt) const {
+std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitBySizes(std::shared_ptr<arrow::RecordBatch> data, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt) const {
+ Counters->BySizeSplitter.OnTrashSerialized(dataSerialization.size());
auto splitPartSizesLocal = splitPartSizesExt;
Y_VERIFY(data);
{
diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h
index ba76e0c5aa..54fba342be 100644
--- a/ydb/core/tx/columnshard/splitter/simple.h
+++ b/ydb/core/tx/columnshard/splitter/simple.h
@@ -1,6 +1,8 @@
#pragma once
#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/counters/splitter.h>
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
+#include "stats.h"
namespace NKikimr::NOlap {
@@ -20,18 +22,113 @@ public:
, SerializedChunk(std::move(serializedChunk)) {
}
+
+    /// Tells whether this chunk holds exactly one column whose field equals `f`.
+    /// A chunk without a sliced batch is never compatible.
+    bool IsCompatibleColumn(const std::shared_ptr<arrow::Field>& f) const {
+        const bool hasSingleColumn = SlicedBatch && SlicedBatch->num_columns() == 1;
+        // The schema is only inspected once we know there is exactly one field in it.
+        return hasSingleColumn && SlicedBatch->schema()->fields().front()->Equals(f);
+    }
+};
+
+/// Describes a split of ObjectsCount consecutive objects into PacksCount packs:
+/// every pack except the last holds PackSize objects, the last one takes the remainder.
+class TLinearSplitInfo {
+private:
+    YDB_READONLY(ui64, PacksCount, 0);
+    YDB_READONLY(ui64, PackSize, 0);
+    YDB_READONLY(ui64, ObjectsCount, 0);
+public:
+    TLinearSplitInfo(const ui64 packsCount, const ui64 packSize, const ui64 objectsCount)
+        : PacksCount(packsCount)
+        , PackSize(packSize)
+        , ObjectsCount(objectsCount)
+    {
+        Y_VERIFY(objectsCount >= packsCount);
+        Y_VERIFY(PackSize);
+        Y_VERIFY(PacksCount);
+    }
+
+    /// True when the regular pack size is a single object.
+    bool IsMinimalGranularity() const {
+        return PackSize == 1;
+    }
+
+    /// Walks the packs in order, exposing the start position and size of the current pack.
+    class TIterator {
+    private:
+        const TLinearSplitInfo& Owner;
+        YDB_READONLY(ui64, Position, 0);
+        YDB_READONLY(ui64, CurrentPackSize, 0);
+        ui64 PackIdx = 0;
+
+        /// Recomputes CurrentPackSize for the pack at PackIdx.
+        void InitPack() {
+            const bool isLastPack = (PackIdx + 1 == Owner.GetPacksCount());
+            if (isLastPack) {
+                // The last pack absorbs everything that is left after the regular packs.
+                CurrentPackSize = Owner.GetObjectsCount() - Position;
+            } else {
+                CurrentPackSize = Owner.GetPackSize();
+            }
+        }
+    public:
+        explicit TIterator(const TLinearSplitInfo& owner)
+            : Owner(owner)
+        {
+            InitPack();
+        }
+
+        /// True while the iterator points at a pack; at the end both the position and
+        /// the pack index must have reached their totals simultaneously.
+        bool IsValid() const {
+            const bool hasObjectsLeft = Position < Owner.GetObjectsCount();
+            const bool hasPacksLeft = PackIdx < Owner.GetPacksCount();
+            if (hasObjectsLeft && hasPacksLeft) {
+                return true;
+            }
+            Y_VERIFY(Position == Owner.GetObjectsCount() && PackIdx == Owner.GetPacksCount());
+            return false;
+        }
+
+        /// Advances to the next pack; returns whether the iterator is still valid.
+        bool Next() {
+            Y_VERIFY(IsValid());
+            Position += CurrentPackSize;
+            ++PackIdx;
+            InitPack();
+            return IsValid();
+        }
+    };
+
+    /// Returns an iterator positioned at the first pack.
+    TIterator StartIterator() const {
+        return TIterator(*this);
+    }
 };
class TSimpleSplitter {
private:
TColumnSaver ColumnSaver;
+ YDB_ACCESSOR_DEF(std::optional<TColumnSerializationStat>, Stats);
+ std::shared_ptr<NColumnShard::TSplitterCounters> Counters;
public:
- TSimpleSplitter(const TColumnSaver& columnSaver)
+    /// Builds a splitter that serializes with `columnSaver` and reports to `counters`.
+    /// `counters` is a by-value sink parameter, so it is moved into the member instead of
+    /// being copied a second time (a shared_ptr copy costs an atomic refcount update).
+    explicit TSimpleSplitter(const TColumnSaver& columnSaver, std::shared_ptr<NColumnShard::TSplitterCounters> counters)
 : ColumnSaver(columnSaver)
+        , Counters(std::move(counters))
 {
 }
+    /// Chooses between two candidate splits of `objectsCount` objects - one with the pack count
+    /// rounded down, one with it rounded up - and returns the one whose regular pack size is
+    /// closer to `optimalPackSizeExt` (a zero hint is treated as 1). On a tie the rounded-up
+    /// pack count wins.
+    static TLinearSplitInfo GetOptimalLinearSplitting(const ui64 objectsCount, const i64 optimalPackSizeExt) {
+        const i64 targetPackSize = (optimalPackSizeExt == 0) ? 1 : optimalPackSizeExt;
+        const double packsRatio = 1.0 * objectsCount / targetPackSize;
+        const ui32 packsRoundedDown = std::max<ui32>(1, (ui32)floor(packsRatio));
+        const ui32 packsRoundedUp = std::max<ui32>(1, (ui32)ceil(packsRatio));
+        const ui32 sizeForRoundedUp = objectsCount / packsRoundedUp;
+        const ui32 sizeForRoundedDown = objectsCount / packsRoundedDown;
+        const auto missForRoundedUp = std::abs(targetPackSize - sizeForRoundedUp);
+        const auto missForRoundedDown = std::abs(targetPackSize - sizeForRoundedDown);
+        if (missForRoundedUp > missForRoundedDown) {
+            return TLinearSplitInfo(packsRoundedDown, sizeForRoundedDown, objectsCount);
+        }
+        return TLinearSplitInfo(packsRoundedUp, sizeForRoundedUp, objectsCount);
+    }
+
+ // Builds a linear split from a pack-size hint: the pack count is
+ // floor(objectsCount / maxPackSize) but never less than 1, and the regular pack size is
+ // objectsCount divided (integer division) by that count. A zero maxPackSizeExt is treated as 1.
+ // NOTE(review): because the pack count is rounded down, the resulting pack size appears to be
+ // not smaller than maxPackSize whenever objectsCount >= maxPackSize - confirm that callers
+ // treat maxPackSizeExt as a hint rather than a hard upper bound.
+ static TLinearSplitInfo GetLinearSplittingByMax(const ui64 objectsCount, const ui64 maxPackSizeExt) {
+ const ui64 maxPackSize = maxPackSizeExt ? maxPackSizeExt : 1;
+ const ui32 countPacksMax = std::max<ui32>(1, (ui32)floor(1.0 * objectsCount / maxPackSize));
+ const ui32 stepPackMin = objectsCount / countPacksMax;
+ return TLinearSplitInfo(countPacksMax, stepPackMin, objectsCount);
+ }
+
std::vector<TSaverSplittedChunk> Split(std::shared_ptr<arrow::Array> data, std::shared_ptr<arrow::Field> field, const ui32 maxBlobSize) const;
std::vector<TSaverSplittedChunk> Split(std::shared_ptr<arrow::RecordBatch> data, const ui32 maxBlobSize) const;
std::vector<TSaverSplittedChunk> SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const;
diff --git a/ydb/core/tx/columnshard/splitter/splitter.cpp b/ydb/core/tx/columnshard/splitter/splitter.cpp
index eb22d088b6..273300a099 100644
--- a/ydb/core/tx/columnshard/splitter/splitter.cpp
+++ b/ydb/core/tx/columnshard/splitter/splitter.cpp
@@ -59,18 +59,12 @@ bool TSplitLimiter::Next(std::vector<TString>& portionBlobs, std::shared_ptr<arr
auto array = currentBatch->GetColumnByName(columnName);
Y_VERIFY(array);
auto columnSaver = Schema->GetColumnSaver(columnSummary.GetColumnId(), saverContext);
- TString blob = TPortionInfo::SerializeColumn(array, field, columnSaver);
+ TString blob = columnSaver.Apply(array, field);
if (blob.size() >= TCompactionLimits::MAX_BLOB_SIZE) {
- Counters.TrashDataSerializationBytes->Add(blob.size());
- Counters.TrashDataSerialization->Add(1);
- Counters.TrashDataSerializationHistogramBytes->Collect(blob.size());
const double kffNew = 1.0 * ExpectedBlobSize / blob.size() * ReduceCorrectionKff;
CurrentStepRecordsCount = currentBatch->num_rows() * kffNew;
Y_VERIFY(CurrentStepRecordsCount);
break;
- } else {
- Counters.CorrectDataSerializationBytes->Add(blob.size());
- Counters.CorrectDataSerialization->Add(1);
}
portionBlobs[idx] = std::move(blob);
diff --git a/ydb/core/tx/columnshard/splitter/stats.cpp b/ydb/core/tx/columnshard/splitter/stats.cpp
new file mode 100644
index 0000000000..1f2f835dce
--- /dev/null
+++ b/ydb/core/tx/columnshard/splitter/stats.cpp
@@ -0,0 +1,21 @@
+#include "stats.h"
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
+
+namespace NKikimr::NOlap {
+
+/// Accumulates the registered stats of every field in the schema of `rb`.
+/// Returns an empty optional as soon as a field without registered stats is met,
+/// and also when the schema has no fields at all.
+std::optional<NKikimr::NOlap::TColumnSerializationStat> TSerializationStats::GetStatsForRecordBatch(const std::shared_ptr<arrow::RecordBatch>& rb) const {
+    std::optional<TColumnSerializationStat> total;
+    for (const auto& field : rb->schema()->fields()) {
+        const auto fieldStat = GetColumnInfo(field->name());
+        if (!fieldStat) {
+            return {};
+        }
+        if (total) {
+            total->Add(*fieldStat);
+        } else {
+            // The first column seeds the accumulator.
+            total = *fieldStat;
+        }
+    }
+    return total;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/splitter/stats.h b/ydb/core/tx/columnshard/splitter/stats.h
new file mode 100644
index 0000000000..38d1f12b8d
--- /dev/null
+++ b/ydb/core/tx/columnshard/splitter/stats.h
@@ -0,0 +1,95 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+
+#include <library/cpp/actors/core/log.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+#include <optional>
+#include <map>
+#include <deque>
+#include <string>
+#include <memory>
+
+namespace NKikimr::NOlap {
+
+/// Serialization statistics of a column: the number of serialized bytes observed for a given
+/// number of records. All predictions scale this sample linearly.
+class TColumnSerializationStat {
+private:
+    YDB_READONLY(ui64, SerializedBytes, 0);
+    YDB_READONLY(ui64, RecordsCount, 0);
+
+    /// Serialized size predicted for `recordsCount` records. Scaling is done in floating point
+    /// so that the bytes-per-record ratio is not truncated before the multiplication.
+    ui64 PredictSerializedSize(const ui64 recordsCount) const {
+        return 1.0 * recordsCount / RecordsCount * SerializedBytes;
+    }
+public:
+    /// `recordsCount` must be non-zero (verified).
+    TColumnSerializationStat(const ui64 bytes, const ui64 recordsCount)
+        : SerializedBytes(bytes)
+        , RecordsCount(recordsCount)
+    {
+        Y_VERIFY(RecordsCount);
+    }
+
+    /// Returns the stat rescaled to `recordsCount` records (which must be non-zero, as the
+    /// constructor verifies it).
+    TColumnSerializationStat RecalcForRecordsCount(const ui64 recordsCount) const {
+        // Was `SerializedBytes / RecordsCount * recordsCount`: the integer division dropped the
+        // fractional bytes-per-record first, giving 0 whenever SerializedBytes < RecordsCount.
+        return TColumnSerializationStat(PredictSerializedSize(recordsCount), recordsCount);
+    }
+
+    /// Adds the bytes of another column measured over the same number of records (verified).
+    void Add(const TColumnSerializationStat& item) {
+        SerializedBytes += item.SerializedBytes;
+        AFL_VERIFY(RecordsCount == item.RecordsCount)("self_count", RecordsCount)("new_count", item.RecordsCount);
+    }
+
+    /// Predicts how many records fit into one pack of `blobSize` bytes when `recordsCount`
+    /// records are to be serialized. Returns `recordsCount` when everything is predicted to fit.
+    /// The result may be 0 when a single record is predicted to be larger than `blobSize`.
+    std::optional<ui64> PredictOptimalPackRecordsCount(const ui64 recordsCount, const ui64 blobSize) const {
+        if (!RecordsCount) {
+            return {};
+        }
+        const ui64 fullSize = PredictSerializedSize(recordsCount);
+        // A zero prediction (only reachable here with blobSize == 0) would make the ratio below 0/0.
+        if (fullSize < blobSize || !fullSize) {
+            return recordsCount;
+        }
+        // The pack must shrink by the factor the data overshoots the blob: blobSize / fullSize.
+        // The previous `fullSize / blobSize` ratio was inverted and returned >= recordsCount.
+        return static_cast<ui64>(std::floor(1.0 * blobSize / fullSize * recordsCount));
+    }
+
+    /// Predicts into how many parts `recordsCount` records should be split for a `blobSize`
+    /// limit: 1 when everything is predicted to fit, otherwise floor(predicted size / blobSize).
+    std::optional<ui64> PredictOptimalSplitFactor(const ui64 recordsCount, const ui64 blobSize) const {
+        if (!RecordsCount) {
+            return {};
+        }
+        const ui64 fullSize = 1.0 * recordsCount / RecordsCount * SerializedBytes;
+        if (fullSize < blobSize) {
+            return 1;
+        } else {
+            return std::floor(1.0 * fullSize / blobSize);
+        }
+    }
+};
+
+/// Registry of per-column serialization stats, addressable by column id or by field name.
+class TSerializationStats {
+private:
+    std::deque<TColumnSerializationStat> ColumnStat;
+    // Both lookup maps store positions in ColumnStat instead of raw pointers into it:
+    // with pointers, a compiler-generated copy of this object kept pointing into the
+    // deque of the object it was copied from.
+    std::map<ui32, size_t> StatsByColumnId;
+    std::map<std::string, size_t> StatsByColumnName;
+public:
+    /// Registers `info` under both keys. A column id or field name that is already registered
+    /// keeps its first stat, since std::map::emplace does not overwrite.
+    void AddStat(const ui32 columnId, const std::string& fieldName, const TColumnSerializationStat& info) {
+        const size_t position = ColumnStat.size();
+        ColumnStat.emplace_back(info);
+        StatsByColumnId.emplace(columnId, position);
+        StatsByColumnName.emplace(fieldName, position);
+    }
+
+    /// Returns a copy of the stat registered for `columnId`, or an empty optional.
+    std::optional<TColumnSerializationStat> GetColumnInfo(const ui32 columnId) const {
+        auto it = StatsByColumnId.find(columnId);
+        if (it == StatsByColumnId.end()) {
+            return {};
+        } else {
+            return ColumnStat[it->second];
+        }
+    }
+
+    /// Returns a copy of the stat registered for `columnName`, or an empty optional.
+    std::optional<TColumnSerializationStat> GetColumnInfo(const std::string& columnName) const {
+        auto it = StatsByColumnName.find(columnName);
+        if (it == StatsByColumnName.end()) {
+            return {};
+        } else {
+            return ColumnStat[it->second];
+        }
+    }
+
+    std::optional<TColumnSerializationStat> GetStatsForRecordBatch(const std::shared_ptr<arrow::RecordBatch>& rb) const;
+};
+
+}
diff --git a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
index 280fa41144..cf2956872a 100644
--- a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
+++ b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
@@ -19,6 +19,13 @@ Y_UNIT_TEST_SUITE(Splitter) {
return NKikimr::NOlap::TColumnSaver(nullptr, std::make_shared<NKikimr::NArrow::NSerialization::TFullDataSerializer>(arrow::ipc::IpcWriteOptions::Defaults()));
}
+    /// Test stub: no per-column serialization stats are provided.
+    virtual std::optional<NKikimr::NOlap::TColumnSerializationStat> GetColumnSerializationStats(const ui32 /*columnId*/) const override {
+        return std::nullopt;
+    }
+    /// Test stub: no per-batch serialization stats are provided.
+    virtual std::optional<NKikimr::NOlap::TColumnSerializationStat> GetBatchSerializationStats(const std::shared_ptr<arrow::RecordBatch>& /*rb*/) const override {
+        return std::nullopt;
+    }
+
NKikimr::NOlap::TColumnLoader GetColumnLoader(const ui32 columnId) const {
arrow::FieldVector v = {std::make_shared<arrow::Field>(GetColumnName(columnId), std::make_shared<arrow::StringType>())};
auto schema = std::make_shared<arrow::Schema>(v);
@@ -54,7 +61,7 @@ Y_UNIT_TEST_SUITE(Splitter) {
void Execute(std::shared_ptr<arrow::RecordBatch> batch) {
NKikimr::NColumnShard::TIndexationCounters counters("test");
- NKikimr::NOlap::TRBSplitLimiter limiter(nullptr, counters, Schema, batch);
+ NKikimr::NOlap::TRBSplitLimiter limiter(counters.SplitterCounters, Schema, batch);
std::vector<std::vector<NKikimr::NOlap::TOrderedColumnChunk>> chunksForBlob;
std::map<std::string, std::vector<std::shared_ptr<arrow::RecordBatch>>> restoredBatch;
std::vector<ui64> blobsSize;
diff --git a/ydb/core/tx/columnshard/splitter/ya.make b/ydb/core/tx/columnshard/splitter/ya.make
index 4d4ca24b75..29e68d508e 100644
--- a/ydb/core/tx/columnshard/splitter/ya.make
+++ b/ydb/core/tx/columnshard/splitter/ya.make
@@ -6,6 +6,7 @@ SRCS(
chunks.cpp
simple.cpp
rb_splitter.cpp
+ stats.cpp
)
PEERDIR(