aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-08-21 11:21:28 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-08-21 13:10:08 +0300
commit439b2883e8cdd59e53080d3b9dbd834685af591c (patch)
treef3762a8663e40283f1ac6af8293003ae40c4b555
parent17c28a1edb9012b519f5397ad24512fa206da9f1 (diff)
downloadydb-439b2883e8cdd59e53080d3b9dbd834685af591c.tar.gz
KIKIMR-19091: correct blobs construction for restore and simple merge. provide settings into splitter
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract/abstract.h23
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction.cpp24
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction.h3
-rw-r--r--ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.h14
-rw-r--r--ydb/core/tx/columnshard/engines/changes/split_compaction.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.cpp77
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.h62
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.cpp17
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.h7
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp16
-rw-r--r--ydb/core/tx/columnshard/engines/portions/with_blobs.cpp143
-rw-r--r--ydb/core/tx/columnshard/engines/portions/with_blobs.h140
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h5
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.h1
-rw-r--r--ydb/core/tx/columnshard/splitter/batch_slice.cpp44
-rw-r--r--ydb/core/tx/columnshard/splitter/batch_slice.h46
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.cpp6
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.h60
-rw-r--r--ydb/core/tx/columnshard/splitter/rb_splitter.cpp36
-rw-r--r--ydb/core/tx/columnshard/splitter/rb_splitter.h3
-rw-r--r--ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp15
23 files changed, 502 insertions, 249 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
index c39f6ce0178..122c4a126dc 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
@@ -27,6 +27,7 @@ class TBackgroundActivity;
namespace NKikimr::NOlap {
class TColumnEngineForLogs;
class TVersionedIndex;
+class TPortionInfoWithBlobs;
struct TCompactionLimits {
static constexpr const ui64 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant
@@ -55,6 +56,13 @@ struct TCompactionLimits {
i64 GranuleSizeForOverloadPrevent = WARNING_OVERLOAD_GRANULE_SIZE;
i64 GranuleIndexedPortionsSizeLimit = WARNING_INSERTED_PORTIONS_SIZE;
ui32 GranuleIndexedPortionsCountLimit = WARNING_INSERTED_PORTIONS_COUNT;
+
+ TSplitSettings GetSplitSettings() const {
+ return TSplitSettings()
+ .SetMinBlobSize(0.5 * std::min<ui64>(MAX_BLOB_SIZE, GranuleSizeForOverloadPrevent))
+ .SetMaxBlobSize(std::min<ui64>(MAX_BLOB_SIZE, GranuleSizeForOverloadPrevent))
+ .SetMaxPortionSize(0.5 * GranuleSizeForOverloadPrevent);
+ }
};
struct TPortionEvictionFeatures {
@@ -252,21 +260,6 @@ public:
}
return sameBlobRanges;
}
-
- /// Returns blob-ranges grouped by blob-id.
- static THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GroupedBlobRanges(const std::vector<std::pair<TPortionInfoWithBlobs, TPortionEvictionFeatures>>& portions) {
- Y_VERIFY(portions.size());
-
- THashMap<TUnifiedBlobId, std::vector<TBlobRange>> sameBlobRanges;
- for (const auto& [portionInfo, _] : portions) {
- Y_VERIFY(!portionInfo.GetPortionInfo().Empty());
-
- for (const auto& rec : portionInfo.GetPortionInfo().Records) {
- sameBlobRanges[rec.BlobRange.BlobId].push_back(rec.BlobRange);
- }
- }
- return sameBlobRanges;
- }
};
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
index dcdc6bee7c3..bc21ea014dd 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
@@ -68,7 +68,6 @@ bool TCompactColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TAp
const ui64 granule = portionInfo.GetGranule();
const ui64 portion = portionInfo.GetPortion();
- // In case of race with eviction portion could become evicted
const TPortionInfo& oldInfo = self.GetGranulePtrVerified(granule)->GetPortionVerified(portion);
auto& granuleStart = self.Granules[granule]->Record.Mark;
@@ -80,7 +79,6 @@ bool TCompactColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TAp
self.ColumnsTable->Write(context.DB, portionInfo, record);
}
}
- // Move portions in granules (zero-copy switch + append into new granules)
for (auto& [info, dstGranule] : PortionsToMove) {
const auto& portionInfo = info;
@@ -140,9 +138,10 @@ void TCompactColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self,
}
THashMap<ui64, ui64> TCompactColumnEngineChanges::TmpToNewGranules(TFinalizationContext& context, THashMap<ui64, std::pair<ui64, TMark>>& newGranules) const {
+ Y_VERIFY(SrcGranule || TmpGranuleIds.empty());
THashMap<ui64, ui64> granuleRemap;
for (const auto& [mark, counter] : TmpGranuleIds) {
- if (mark == SrcGranule.Mark) {
+ if (mark == SrcGranule->Mark) {
Y_VERIFY(!counter);
granuleRemap[counter] = GranuleMeta->GetGranuleId();
} else {
@@ -200,7 +199,8 @@ ui64 TCompactColumnEngineChanges::SetTmpGranule(ui64 pathId, const TMark& mark)
}
TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const TCompactionSrcGranule& srcGranule)
- : Limits(limits)
+ : TBase(limits.GetSplitSettings())
+ , Limits(limits)
, GranuleMeta(granule)
, SrcGranule(srcGranule)
{
@@ -216,6 +216,22 @@ TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits
Y_VERIFY(SwitchedPortions.size());
}
+TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions)
+ : TBase(limits.GetSplitSettings())
+ , Limits(limits)
+ , GranuleMeta(granule)
+{
+// Y_VERIFY(GranuleMeta);
+
+ SwitchedPortions.reserve(portions.size());
+ for (const auto& [_, portionInfo] : portions) {
+ Y_VERIFY(portionInfo->IsActive());
+ SwitchedPortions.push_back(*portionInfo);
+ Y_VERIFY(!GranuleMeta || portionInfo->GetGranule() == GranuleMeta->GetGranuleId());
+ }
+ Y_VERIFY(SwitchedPortions.size());
+}
+
TCompactColumnEngineChanges::~TCompactColumnEngineChanges() {
Y_VERIFY_DEBUG(!NActors::TlsActivationContext || !NeedGranuleStatusProvide);
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h
index 856de3af642..7148eb92990 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.h
@@ -15,7 +15,7 @@ private:
protected:
const TCompactionLimits Limits;
std::shared_ptr<TGranuleMeta> GranuleMeta;
- TCompactionSrcGranule SrcGranule;
+ std::optional<TCompactionSrcGranule> SrcGranule;
virtual void DoStart(NColumnShard::TColumnShard& self) override;
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
@@ -42,6 +42,7 @@ public:
virtual THashSet<ui64> GetTouchedGranules() const override;
+ TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions);
TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const TCompactionSrcGranule& srcGranule);
~TCompactColumnEngineChanges();
diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
index cbb153eefef..8a38ef827de 100644
--- a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
@@ -183,10 +183,11 @@ void TInGranuleCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TC
}
void TInGranuleCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
+ Y_VERIFY(SrcGranule);
TBase::DoStart(self);
auto& g = *GranuleMeta;
self.CSCounters.OnInternalCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount());
- Y_VERIFY(InitInGranuleMerge(SrcGranule.Mark, SwitchedPortions, Limits, MergeBorders).Ok());
+ Y_VERIFY(InitInGranuleMerge(SrcGranule->Mark, SwitchedPortions, Limits, MergeBorders).Ok());
}
NColumnShard::ECumulativeCounters TInGranuleCompactColumnEngineChanges::GetCounterIndex(const bool isSuccess) const {
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index cfdaad8484e..9426d81cab9 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -44,10 +44,9 @@ bool TInsertColumnEngineChanges::AddPathIfNotExists(ui64 pathId) {
return false;
}
- Y_VERIFY(FirstGranuleId && ReservedGranuleIds);
+ Y_VERIFY(FirstGranuleId);
ui64 granule = FirstGranuleId;
++FirstGranuleId;
- --ReservedGranuleIds;
NewGranules.emplace(granule, std::make_pair(pathId, DefaultMark));
PathToGranule[pathId].emplace_back(DefaultMark, granule);
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h
index 5fa9d713f26..058a2d9c2e5 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.h
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.h
@@ -11,6 +11,7 @@ private:
using TBase = TChangesWithAppend;
std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
const TIndexInfo& indexInfo, const TInsertedData& inserted) const;
+ std::vector<NOlap::TInsertedData> DataToIndex;
protected:
virtual void DoStart(NColumnShard::TColumnShard& self) override;
@@ -23,14 +24,17 @@ protected:
public:
const TMark DefaultMark;
THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, granule}
- std::vector<NOlap::TInsertedData> DataToIndex;
- ui32 ReservedGranuleIds{ 0 };
THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs;
public:
- TInsertColumnEngineChanges(const TMark& defaultMark)
- : DefaultMark(defaultMark)
- {
+ TInsertColumnEngineChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& dataToIndex, const TSplitSettings& splitSettings)
+ : TBase(splitSettings)
+ , DataToIndex(std::move(dataToIndex))
+ , DefaultMark(defaultMark) {
+
+ }
+ const std::vector<NOlap::TInsertedData>& GetDataToIndex() const {
+ return DataToIndex;
}
virtual THashSet<ui64> GetTouchedGranules() const override {
diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
index d9579a7f83a..779b608f3c0 100644
--- a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
@@ -5,8 +5,9 @@
namespace NKikimr::NOlap {
TConclusionStatus TSplitCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
+ Y_VERIFY(SrcGranule);
const ui64 pathId = GranuleMeta->GetPathId();
- const TMark ts0 = SrcGranule.Mark;
+ const TMark ts0 = SrcGranule->Mark;
std::vector<TPortionInfo>& portions = SwitchedPortions;
std::vector<std::pair<TMark, ui64>> tsIds;
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
index 073823a663e..3120ad55beb 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
@@ -11,15 +11,23 @@ void TTTLColumnEngineChanges::DoDebugString(TStringOutput& out) const {
TBase::DoDebugString(out);
if (ui32 evicted = PortionsToEvict.size()) {
out << "evict " << evicted << " portions";
- for (auto& [portionInfo, evictionFeatures] : PortionsToEvict) {
- out << portionInfo << " (to " << evictionFeatures.TargetTierName << ")";
+ for (auto& info : PortionsToEvict) {
+ out << info.GetActualPortionInfo() << " (to " << info.GetFeatures().TargetTierName << ")";
}
out << "; ";
}
}
THashMap<NKikimr::NOlap::TUnifiedBlobId, std::vector<NKikimr::NOlap::TBlobRange>> TTTLColumnEngineChanges::GetGroupedBlobRanges() const {
- return GroupedBlobRanges(PortionsToEvict);
+ Y_VERIFY(PortionsToEvict.size());
+
+ THashMap<TUnifiedBlobId, std::vector<TBlobRange>> sameBlobRanges;
+ for (auto&& p : PortionsToEvict) {
+ for (const auto& rec : p.GetPortionInfo().Records) {
+ sameBlobRanges[rec.BlobRange.BlobId].push_back(rec.BlobRange);
+ }
+ }
+ return sameBlobRanges;
}
bool TTTLColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) {
@@ -27,8 +35,8 @@ bool TTTLColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyC
return false;
}
- for (auto& [portionInfoWithBlobs, _] : PortionsToEvict) {
- auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
+ for (auto& info : PortionsToEvict) {
+ auto& portionInfo = info.GetActualPortionInfo();
const ui64 granule = portionInfo.GetGranule();
const ui64 portion = portionInfo.GetPortion();
if (!self.IsPortionExists(granule, portion)) {
@@ -56,8 +64,9 @@ void TTTLColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWr
THashSet<TUnifiedBlobId> protectedBlobs;
self.IncCounter(NColumnShard::COUNTER_EVICTION_PORTIONS_WRITTEN, PortionsToEvict.size());
- for (auto& [portionInfoWithBlobs, evictionFeatures] : PortionsToEvict) {
- auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
+ for (const auto& info : PortionsToEvict) {
+ const auto& portionInfo = info.GetPortionWithBlobs().GetPortionInfo();
+ const auto& evictionFeatures = info.GetFeatures();
// Mark exported blobs
if (evictionFeatures.NeedExport) {
auto& tierName = portionInfo.GetMeta().GetTierName();
@@ -125,8 +134,8 @@ void TTTLColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWr
void TTTLColumnEngineChanges::DoCompile(TFinalizationContext& context) {
TBase::DoCompile(context);
- for (auto& [portionInfo, _] : PortionsToEvict) {
- portionInfo.GetPortionInfo().UpdateRecordsMeta(TPortionMeta::EProduced::EVICTED);
+ for (auto& info : PortionsToEvict) {
+ info.GetPortionWithBlobs().GetPortionInfo().UpdateRecordsMeta(TPortionMeta::EProduced::EVICTED);
}
}
@@ -157,10 +166,10 @@ void TTTLColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& s
self.IncCounter(NColumnShard::COUNTER_EVICTION_BYTES_WRITTEN, context.BytesWritten);
}
-bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfoWithBlobs& portionInfoWithBlobs, TPortionEvictionFeatures& evictFeatures,
- const THashMap<TBlobRange, TString>& srcBlobs, std::vector<TColumnRecord>& evictedRecords,
+bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionForEviction& info, const THashMap<TBlobRange, TString>& srcBlobs, std::vector<TColumnRecord>& evictedRecords,
TConstructionContext& context) const {
- TPortionInfo& portionInfo = portionInfoWithBlobs.GetPortionInfo();
+ const TPortionInfo& portionInfo = info.GetPortionInfo();
+ auto& evictFeatures = info.GetFeatures();
Y_VERIFY(portionInfo.GetMeta().GetTierName() != evictFeatures.TargetTierName);
auto* tiering = Tiering.FindPtr(evictFeatures.PathId);
@@ -168,8 +177,9 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfoWithBlobs& portio
auto compression = tiering->GetCompression(evictFeatures.TargetTierName);
if (!compression) {
// Noting to recompress. We have no other kinds of evictions yet.
- portionInfo.MutableMeta().SetTierName(evictFeatures.TargetTierName);
evictFeatures.DataChanges = false;
+ info.SetPortionWithBlobs(TPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs));
+ info.GetPortionWithBlobs().GetPortionInfo().MutableMeta().SetTierName(evictFeatures.TargetTierName);
return true;
}
@@ -184,37 +194,18 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfoWithBlobs& portio
TSaverContext saverContext;
saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalCompression(compression);
- portionInfo.Records.clear();
- for (auto& rec : undo.Records) {
- auto pos = resultSchema->GetFieldIndex(rec.ColumnId);
- Y_VERIFY(pos >= 0);
- auto field = resultSchema->GetFieldByIndex(pos);
-
- TString blob;
- std::shared_ptr<arrow::RecordBatch> rb;
- {
- auto it = srcBlobs.find(rec.BlobRange);
- Y_VERIFY(it != srcBlobs.end());
- rb = NArrow::TStatusValidator::GetValid(resultSchema->GetColumnLoader(rec.ColumnId)->Apply(it->second));
- auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext);
- blob = columnSaver.Apply(rb);
- }
- Y_VERIFY(rb->num_columns() == 1);
- if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
- return false;
- }
- if (portionInfoWithBlobs.GetBlobs().empty() || portionInfoWithBlobs.GetBlobs().back().GetSize() + blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
- portionInfoWithBlobs.StartBlob(0).AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, blob, rb->column(0)), blobSchema->GetIndexInfo());
- } else {
- portionInfoWithBlobs.GetBlobs().back().AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, blob, rb->column(0)), blobSchema->GetIndexInfo());
- }
+ auto withBlobs = TPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs);
+ withBlobs.GetPortionInfo().MutableMeta().SetTierName(evictFeatures.TargetTierName);
+ std::optional<TPortionInfoWithBlobs> actualPortion = withBlobs.ChangeSaver(resultSchema, saverContext);
+ if (!actualPortion) {
+ return false;
}
+ info.SetPortionWithBlobs(std::move(*actualPortion));
for (auto& rec : undo.Records) {
evictedRecords.emplace_back(std::move(rec));
}
- portionInfoWithBlobs.GetPortionInfo().AddMetadata(*resultSchema, batch, evictFeatures.TargetTierName);
return true;
}
@@ -226,13 +217,13 @@ NKikimr::TConclusionStatus TTTLColumnEngineChanges::DoConstructBlobs(TConstructi
auto baseResult = TBase::DoConstructBlobs(context);
Y_VERIFY(baseResult.Ok());
- std::vector<std::pair<TPortionInfoWithBlobs, TPortionEvictionFeatures>> evicted;
+ std::vector<TPortionForEviction> evicted;
evicted.reserve(PortionsToEvict.size());
- for (auto& [portionInfo, evictFeatures] : PortionsToEvict) {
- if (UpdateEvictedPortion(portionInfo, evictFeatures, Blobs, EvictedRecords, context)) {
- Y_VERIFY(portionInfo.GetPortionInfo().GetMeta().GetTierName() == evictFeatures.TargetTierName);
- evicted.emplace_back(std::move(portionInfo), evictFeatures);
+ for (auto&& info : PortionsToEvict) {
+ if (UpdateEvictedPortion(info, Blobs, EvictedRecords, context)) {
+ Y_VERIFY(info.GetPortionWithBlobs().GetPortionInfo().GetMeta().GetTierName() == info.GetFeatures().TargetTierName);
+ evicted.emplace_back(std::move(info));
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h
index d8689df4c97..fa9f7675a5b 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.h
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.h
@@ -11,10 +11,56 @@ private:
THashMap<TString, TPathIdBlobs> ExportTierBlobs;
ui64 ExportNo = 0;
- bool UpdateEvictedPortion(TPortionInfoWithBlobs& portionInfo,
- TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs,
+ class TPortionForEviction {
+ private:
+ TPortionInfo PortionInfo;
+ TPortionEvictionFeatures Features;
+ std::optional<TPortionInfoWithBlobs> PortionWithBlobs;
+ public:
+ TPortionForEviction(const TPortionInfo& portion, TPortionEvictionFeatures&& features)
+ : PortionInfo(portion)
+ , Features(std::move(features))
+ {
+
+ }
+
+ TPortionEvictionFeatures& GetFeatures() {
+ return Features;
+ }
+
+ const TPortionEvictionFeatures& GetFeatures() const {
+ return Features;
+ }
+
+ const TPortionInfo& GetPortionInfo() const {
+ Y_VERIFY(!PortionWithBlobs);
+ return PortionInfo;
+ }
+
+ void SetPortionWithBlobs(TPortionInfoWithBlobs&& data) {
+ Y_VERIFY(!PortionWithBlobs);
+ PortionWithBlobs = std::move(data);
+ }
+
+ TPortionInfoWithBlobs& GetPortionWithBlobs() {
+ Y_VERIFY(PortionWithBlobs);
+ return *PortionWithBlobs;
+ }
+
+ const TPortionInfoWithBlobs& GetPortionWithBlobs() const {
+ Y_VERIFY(PortionWithBlobs);
+ return *PortionWithBlobs;
+ }
+
+ const TPortionInfo& GetActualPortionInfo() const {
+ return PortionWithBlobs ? PortionWithBlobs->GetPortionInfo() : PortionInfo;
+ }
+ };
+
+ bool UpdateEvictedPortion(TPortionForEviction& info, const THashMap<TBlobRange, TString>& srcBlobs,
std::vector<TColumnRecord>& evictedRecords, TConstructionContext& context) const;
- std::vector<std::pair<TPortionInfoWithBlobs, TPortionEvictionFeatures>> PortionsToEvict; // {portion, TPortionEvictionFeatures}
+
+ std::vector<TPortionForEviction> PortionsToEvict; // {portion, TPortionEvictionFeatures}
protected:
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
@@ -32,8 +78,8 @@ public:
}
virtual THashSet<ui64> GetTouchedGranules() const override {
auto result = TBase::GetTouchedGranules();
- for (const auto& [portionInfo, _] : PortionsToEvict) {
- result.emplace(portionInfo.GetPortionInfo().GetGranule());
+ for (auto&& info : PortionsToEvict) {
+ result.emplace(info.GetPortionInfo().GetGranule());
}
return result;
}
@@ -45,7 +91,7 @@ public:
void AddPortionToEvict(const TPortionInfo& info, TPortionEvictionFeatures&& features) {
Y_VERIFY(!info.Empty());
Y_VERIFY(info.IsActive());
- PortionsToEvict.emplace_back(TPortionInfoWithBlobs(info, 0), std::move(features));
+ PortionsToEvict.emplace_back(info, std::move(features));
}
ui32 GetPortionsToEvictCount() const {
@@ -57,11 +103,11 @@ public:
}
virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) override {
Y_VERIFY(index < PortionsToEvict.size());
- return &PortionsToEvict[index].first;
+ return &PortionsToEvict[index].GetPortionWithBlobs();
}
virtual bool NeedWritePortion(const ui32 index) const override {
Y_VERIFY(index < PortionsToEvict.size());
- return PortionsToEvict[index].second.DataChanges;
+ return PortionsToEvict[index].GetFeatures().DataChanges;
}
virtual TString TypeString() const override {
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index 71a61f013d7..62d05cff2b0 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -98,24 +98,13 @@ std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(
stats = granuleMeta->BuildSerializationStats(resultSchema);
}
auto schema = std::make_shared<TDefaultSchemaDetails>(resultSchema, saverContext, std::move(stats));
- TRBSplitLimiter limiter(context.Counters.SplitterCounters, schema, batch);
+ TRBSplitLimiter limiter(context.Counters.SplitterCounters, schema, batch, SplitSettings);
std::vector<std::vector<TOrderedColumnChunk>> portionBlobs;
std::shared_ptr<arrow::RecordBatch> portionBatch;
while (limiter.Next(portionBlobs, portionBatch)) {
- TPortionInfo portionInfo(granule, 0, snapshot);
- portionInfo.AddMetadata(*resultSchema, portionBatch, tierName);
-
- TPortionInfoWithBlobs infoWithBlob(std::move(portionInfo), portionBlobs.size());
- std::map<ui32, ui32> chunkIds;
- THashMap<TBlobRange, TString> srcBlobs;
- for (auto& blob : portionBlobs) {
- auto& blobInfo = infoWithBlob.StartBlob(blob.size());
- for (auto&& chunk : blob) {
- const TString data = chunk.GetData();
- srcBlobs.emplace(blobInfo.AddChunk(infoWithBlob, std::move(chunk), resultSchema->GetIndexInfo()).BlobRange, data);
- }
- }
+ TPortionInfoWithBlobs infoWithBlob = TPortionInfoWithBlobs::BuildByBlobs(portionBlobs, portionBatch, granule, snapshot, resultSchema->GetIndexInfo());
+ infoWithBlob.GetPortionInfo().AddMetadata(*resultSchema, portionBatch, tierName);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portion_appended", infoWithBlob.GetPortionInfo().DebugString());
out.emplace_back(std::move(infoWithBlob));
}
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h
index e05e261a767..1189289effe 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.h
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h
@@ -9,6 +9,7 @@ namespace NKikimr::NOlap {
class TChangesWithAppend: public TColumnEngineChanges {
private:
THashMap<ui64, NOlap::TTiering> TieringInfo;
+ TSplitSettings SplitSettings;
protected:
virtual void DoDebugString(TStringOutput& out) const override;
virtual void DoCompile(TFinalizationContext& context) override;
@@ -24,6 +25,12 @@ protected:
const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context) const;
public:
+ TChangesWithAppend(const TSplitSettings& splitSettings)
+ : SplitSettings(splitSettings)
+ {
+
+ }
+
virtual THashSet<ui64> GetTouchedGranules() const override {
return {};
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 5f3efa477bb..ddd910bcf6b 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -41,13 +41,6 @@ std::shared_ptr<NKikimr::NOlap::TTTLColumnEngineChanges> TColumnEngineForLogs::T
return std::make_shared<TTTLColumnEngineChanges>();
}
-std::shared_ptr<NKikimr::NOlap::TInsertColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildInsertChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot) {
- auto changes = std::make_shared<TInsertColumnEngineChanges>(defaultMark);
- changes->DataToIndex = std::move(blobsToIndex);
- changes->InitSnapshot = initSnapshot;
- return changes;
-}
-
TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, limits))
, TabletId(tabletId)
@@ -275,9 +268,9 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept {
Y_VERIFY(dataToIndex.size());
- auto changes = TChangesConstructor::BuildInsertChanges(DefaultMark(), std::move(dataToIndex), LastSnapshot);
+ auto changes = std::make_shared<TInsertColumnEngineChanges>(DefaultMark(), std::move(dataToIndex), TSplitSettings());
ui32 reserveGranules = 0;
- for (const auto& data : changes->DataToIndex) {
+ for (const auto& data : changes->GetDataToIndex()) {
const ui64 pathId = data.PathId;
if (changes->PathToGranule.contains(pathId)) {
@@ -302,7 +295,6 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st
if (reserveGranules) {
changes->FirstGranuleId = LastGranule + 1;
- changes->ReservedGranuleIds = reserveGranules;
LastGranule += reserveGranules;
}
@@ -330,7 +322,7 @@ std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompacti
std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot,
THashSet<ui64>& pathsToDrop, ui32 maxRecords) noexcept {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size());
- auto changes = TChangesConstructor::BuildCleanupChanges(snapshot);
+ auto changes = std::make_shared<TCleanupColumnEngineChanges>();
ui32 affectedRecords = 0;
// Add all portions from dropped paths
@@ -544,7 +536,7 @@ std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl(const TH
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size())
("internal", EvictionsController.MutableNextCheckInstantForTierings().size())
;
- auto changes = TChangesConstructor::BuildTtlChanges();
+ auto changes = std::make_shared<TTTLColumnEngineChanges>();
TTieringProcessContext context(maxEvictBytes, changes, busyGranules);
bool hasExternalChanges = false;
diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
index 56d228ce938..3cb47f5aa6f 100644
--- a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
@@ -1,24 +1,151 @@
#include "with_blobs.h"
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
namespace NKikimr::NOlap {
+void TPortionInfoWithBlobs::TBlobInfo::RestoreChunk(const TPortionInfoWithBlobs& owner, TSimpleOrderedColumnChunk&& chunk) {
+ Y_VERIFY(!ResultBlob);
+ Size += chunk.GetData().size();
+ auto address = chunk.GetChunkAddress();
+ Y_VERIFY(owner.GetPortionInfo().GetRecordPointer(address));
+ Y_VERIFY(ChunksOrdered.empty() || ChunksOrdered.back()->GetOffset() < chunk.GetOffset());
+ auto dataInsert = Chunks.emplace(address, std::move(chunk));
+ Y_VERIFY(dataInsert.second);
+ ChunksOrdered.emplace_back(&dataInsert.first->second);
+}
+
const TColumnRecord& TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk, const TIndexInfo& info) {
Y_VERIFY(!ResultBlob);
- const ui32 columnId = chunk.GetColumnId();
- TColumnRecord rec(TChunkAddress(columnId, owner.ColumnChunkIds[columnId]++), chunk.GetColumn(), info);
- rec.BlobRange.Offset = Size;
+ auto rec = TColumnRecord(chunk.GetChunkAddress(), chunk.GetColumn(), info);
+
+ Y_VERIFY(chunk.GetOffset() == Size);
+ rec.BlobRange.Offset = chunk.GetOffset();
rec.BlobRange.Size = chunk.GetData().size();
- auto& result = owner.PortionInfo.AppendOneChunkColumn(std::move(rec));
Size += chunk.GetData().size();
- Chunks.emplace_back(std::move(chunk));
+
+ auto dataInsert = Chunks.emplace(rec.GetAddress(), std::move(chunk));
+ Y_VERIFY(dataInsert.second);
+ ChunksOrdered.emplace_back(&dataInsert.first->second);
+ auto& result = owner.PortionInfo.AppendOneChunkColumn(std::move(rec));
return result;
}
void TPortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) {
- Y_VERIFY(StartRecordsIndex + Chunks.size() <= owner.PortionInfo.Records.size());
- for (ui32 idx = 0; idx < Chunks.size(); ++idx) {
- owner.PortionInfo.Records[StartRecordsIndex + idx].BlobRange.BlobId = blobId;
+ auto it = owner.PortionInfo.Records.begin();
+ for (auto&& i : Chunks) {
+ bool found = false;
+ for (; it != owner.PortionInfo.Records.end(); ++it) {
+ if (it->ColumnId == i.first.GetColumnId() && it->Chunk == i.first.GetChunk()) {
+ it->BlobRange.BlobId = blobId;
+ found = true;
+ break;
+ }
+ }
+ AFL_VERIFY(found)("address", i.second.DebugString());
+ }
+}
+
+std::shared_ptr<arrow::RecordBatch> TPortionInfoWithBlobs::GetBatch(const ISnapshotSchema& data, const ISnapshotSchema& result) const {
+ if (!CachedBatch) {
+ THashMap<TBlobRange, TString> blobs;
+ for (auto&& i : PortionInfo.Records) {
+ blobs[i.BlobRange] = GetBlobByRangeVerified(i.ColumnId, i.Chunk);
+ Y_VERIFY(blobs[i.BlobRange].size() == i.BlobRange.Size);
+ }
+ CachedBatch = PortionInfo.AssembleInBatch(data, result, blobs);
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(*CachedBatch, result.GetIndexInfo().GetReplaceKey()));
+ }
+ return *CachedBatch;
+}
+
+NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::RestorePortion(const TPortionInfo& portion, const THashMap<TBlobRange, TString>& blobs) {
+ TPortionInfoWithBlobs result(portion);
+ const auto pred = [](const TColumnRecord& l, const TColumnRecord& r) {
+ return l.GetAddress() < r.GetAddress();
+ };
+ std::sort(result.PortionInfo.Records.begin(), result.PortionInfo.Records.end(), pred);
+
+ THashMap<TUnifiedBlobId, std::vector<const TColumnRecord*>> recordsByBlob;
+ for (auto&& c : result.PortionInfo.Records) {
+ auto& blobRecords = recordsByBlob[c.BlobRange.BlobId];
+ blobRecords.emplace_back(&c);
+ }
+
+ const auto predOffset = [](const TColumnRecord* l, const TColumnRecord* r) {
+ return l->BlobRange.Offset < r->BlobRange.Offset;
+ };
+
+ for (auto&& i : recordsByBlob) {
+ std::sort(i.second.begin(), i.second.end(), predOffset);
+ auto builder = result.StartBlob();
+ for (auto&& d : i.second) {
+ auto itBlob = blobs.find(d->BlobRange);
+ Y_VERIFY(itBlob != blobs.end());
+ builder.RestoreChunk(TSimpleOrderedColumnChunk(d->GetAddress(), d->BlobRange.Offset, itBlob->second));
+ }
}
+ return result;
+}
+
+std::vector<NKikimr::NOlap::TPortionInfoWithBlobs> TPortionInfoWithBlobs::RestorePortions(const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs) {
+ std::vector<TPortionInfoWithBlobs> result;
+ for (auto&& i : portions) {
+ result.emplace_back(RestorePortion(i, blobs));
+ }
+ return result;
+}
+
+NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::vector<std::vector<TOrderedColumnChunk>>& chunksByBlobs,
+ std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot, const TIndexInfo& info)
+{
+ TPortionInfoWithBlobs result(TPortionInfo(granule, 0, snapshot), batch);
+ for (auto& blob : chunksByBlobs) {
+ auto blobInfo = result.StartBlob();
+ for (auto&& chunk : blob) {
+ blobInfo.AddChunk(std::move(chunk), info);
+ }
+ }
+
+ const auto pred = [](const TColumnRecord& l, const TColumnRecord& r) {
+ return l.GetAddress() < r.GetAddress();
+ };
+ std::sort(result.GetPortionInfo().Records.begin(), result.GetPortionInfo().Records.end(), pred);
+ return result;
+}
+
+std::optional<NKikimr::NOlap::TPortionInfoWithBlobs> TPortionInfoWithBlobs::ChangeSaver(ISnapshotSchema::TPtr currentSchema, const TSaverContext& saverContext) const {
+ TPortionInfoWithBlobs result(PortionInfo, CachedBatch);
+ result.PortionInfo.Records.clear();
+ std::optional<TPortionInfoWithBlobs::TBlobInfo::TBuilder> bBuilder;
+ ui64 offset = 0;
+ for (auto& rec : PortionInfo.Records) {
+ auto field = currentSchema->GetFieldByColumnIdVerified(rec.ColumnId);
+
+ const TString blobOriginal = GetBlobByRangeVerified(rec.ColumnId, rec.Chunk);
+ {
+ auto rb = NArrow::TStatusValidator::GetValid(currentSchema->GetColumnLoader(rec.ColumnId)->Apply(blobOriginal));
+ auto columnSaver = currentSchema->GetColumnSaver(rec.ColumnId, saverContext);
+ const TString newBlob = columnSaver.Apply(rb);
+ if (newBlob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
+ return {};
+ }
+ if (!bBuilder || result.GetBlobs().back().GetSize() + newBlob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
+ bBuilder = result.StartBlob();
+ offset = 0;
+ }
+ Y_VERIFY(rb);
+ Y_VERIFY(rb->num_columns() == 1);
+
+ bBuilder->AddChunk(TOrderedColumnChunk(rec.GetAddress(), offset, newBlob, rb->column(0)), currentSchema->GetIndexInfo());
+ offset += newBlob.size();
+ }
+ }
+ const auto pred = [](const TColumnRecord& l, const TColumnRecord& r) {
+ return l.GetAddress() < r.GetAddress();
+ };
+ std::sort(result.PortionInfo.Records.begin(), result.PortionInfo.Records.end(), pred);
+
+ return result;
}
}
diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.h b/ydb/core/tx/columnshard/engines/portions/with_blobs.h
index 80fb9cbb88d..da7f3ffa7ad 100644
--- a/ydb/core/tx/columnshard/engines/portions/with_blobs.h
+++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.h
@@ -10,97 +10,118 @@ class TPortionInfoWithBlobs {
public:
class TBlobInfo {
private:
+ using TBlobChunks = std::map<TChunkAddress, TSimpleOrderedColumnChunk>;
YDB_READONLY(ui64, Size, 0);
- YDB_READONLY_DEF(std::vector<TOrderedColumnChunk>, Chunks);
- const ui32 StartRecordsIndex;
+ YDB_READONLY_DEF(TBlobChunks, Chunks);
+ std::vector<const TSimpleOrderedColumnChunk*> ChunksOrdered;
mutable std::optional<TString> ResultBlob;
+ const TColumnRecord& AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk, const TIndexInfo& info);
+ void RestoreChunk(const TPortionInfoWithBlobs& owner, TSimpleOrderedColumnChunk&& chunk);
public:
- explicit TBlobInfo(const ui32 predictedCount, TPortionInfoWithBlobs& owner)
- : StartRecordsIndex(owner.GetPortionInfo().Records.size())
- {
- Chunks.reserve(predictedCount);
- }
+ class TBuilder {
+ private:
+ TBlobInfo* OwnerBlob;
+ TPortionInfoWithBlobs* OwnerPortion;
+ public:
+ TBuilder(TBlobInfo& blob, TPortionInfoWithBlobs& portion)
+ : OwnerBlob(&blob)
+ , OwnerPortion(&portion)
+ {
+
+ }
+ const TColumnRecord& AddChunk(TOrderedColumnChunk&& chunk, const TIndexInfo& info) {
+ return OwnerBlob->AddChunk(*OwnerPortion, std::move(chunk), info);
+ }
+ void RestoreChunk(TSimpleOrderedColumnChunk&& chunk) {
+ OwnerBlob->RestoreChunk(*OwnerPortion, std::move(chunk));
+ }
+ };
const TString& GetBlob() const {
if (!ResultBlob) {
TString result;
result.reserve(Size);
- for (auto&& i : Chunks) {
- result.append(i.GetData());
+ for (auto&& i : ChunksOrdered) {
+ result.append(i->GetData());
}
ResultBlob = std::move(result);
}
return *ResultBlob;
}
- const TColumnRecord& AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk, const TIndexInfo& info);
-
void RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId);
};
private:
- std::map<ui32, ui32> ColumnChunkIds;
TPortionInfo PortionInfo;
YDB_READONLY_DEF(std::vector<TBlobInfo>, Blobs);
mutable std::optional<std::shared_ptr<arrow::RecordBatch>> CachedBatch;
+
+ explicit TPortionInfoWithBlobs(TPortionInfo&& portionInfo, std::optional<std::shared_ptr<arrow::RecordBatch>> batch = {})
+ : PortionInfo(std::move(portionInfo))
+ , CachedBatch(batch) {
+ }
+
+ explicit TPortionInfoWithBlobs(const TPortionInfo& portionInfo, std::optional<std::shared_ptr<arrow::RecordBatch>> batch = {})
+ : PortionInfo(portionInfo)
+ , CachedBatch(batch) {
+ }
+
+ void SetPortionInfo(const TPortionInfo& portionInfo) {
+ PortionInfo = portionInfo;
+ }
+
+ TBlobInfo::TBuilder StartBlob() {
+ Blobs.emplace_back(TBlobInfo());
+ return TBlobInfo::TBuilder(Blobs.back(), *this);
+ }
+
public:
- std::shared_ptr<arrow::RecordBatch> GetBatch(const ISnapshotSchema& data, const ISnapshotSchema& result) const {
- if (!CachedBatch) {
- THashMap<ui32, ui32> chunkIds;
- THashMap<std::pair<ui32, ui32>, TString> blobsByColumnChunk;
- for (auto&& b : Blobs) {
- for (auto&& i : b.GetChunks()) {
- Y_VERIFY(blobsByColumnChunk.emplace(std::make_pair(i.GetColumnId(), chunkIds[i.GetColumnId()]++), i.GetData()).second);
- }
- }
- THashMap<TBlobRange, TString> blobs;
- for (auto&& i : PortionInfo.Records) {
- auto it = blobsByColumnChunk.find(std::make_pair(i.ColumnId, i.Chunk));
- Y_VERIFY(it != blobsByColumnChunk.end());
- blobs[i.BlobRange] = it->second;
- }
- CachedBatch = PortionInfo.AssembleInBatch(data, result, blobs);
- }
- return *CachedBatch;
+ static std::vector<TPortionInfoWithBlobs> RestorePortions(const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs);
+ static TPortionInfoWithBlobs RestorePortion(const TPortionInfo& portions, const THashMap<TBlobRange, TString>& blobs);
+
+ std::shared_ptr<arrow::RecordBatch> GetBatch(const ISnapshotSchema& data, const ISnapshotSchema& result) const;
+
+ ui64 GetSize() const {
+ return PortionInfo.BlobsBytes();
}
+ static TPortionInfoWithBlobs BuildByBlobs(std::vector<std::vector<TOrderedColumnChunk>>& chunksByBlobs, std::shared_ptr<arrow::RecordBatch> batch,
+ const ui64 granule, const TSnapshot& snapshot, const TIndexInfo& info);
+
+ std::optional<TPortionInfoWithBlobs> ChangeSaver(ISnapshotSchema::TPtr currentSchema, const TSaverContext& saverContext) const;
+
const TString& GetBlobByRangeVerified(const ui32 columnId, const ui32 chunkId) const {
- ui32 columnChunk = 0;
for (auto&& b : Blobs) {
- for (auto&& i : b.GetChunks()) {
- if (i.GetColumnId() == columnId) {
- if (columnChunk == chunkId) {
- return i.GetData();
- }
- ++columnChunk;
- }
+ auto it = b.GetChunks().find(TChunkAddress(columnId, chunkId));
+ if (it == b.GetChunks().end()) {
+ continue;
+ } else {
+ return it->second.GetData();
}
}
Y_VERIFY(false);
}
ui64 GetBlobFullSizeVerified(const ui32 columnId, const ui32 chunkId) const {
- ui32 columnChunk = 0;
for (auto&& b : Blobs) {
- for (auto&& i : b.GetChunks()) {
- if (i.GetColumnId() == columnId) {
- if (columnChunk == chunkId) {
- return b.GetSize();
- }
- ++columnChunk;
- }
+ auto it = b.GetChunks().find(TChunkAddress(columnId, chunkId));
+ if (it == b.GetChunks().end()) {
+ continue;
+ } else {
+ return b.GetSize();
}
}
Y_VERIFY(false);
}
- TString DebugString() const {
- return TStringBuilder() << PortionInfo.DebugString() << "blobs_count=" << Blobs.size() << ";";
- }
-
std::vector<TBlobInfo>& GetBlobs() {
return Blobs;
}
+ TString DebugString() const {
+ return TStringBuilder() << PortionInfo.DebugString() << ";blobs_count=" << Blobs.size() << ";";
+ }
+
const TPortionInfo& GetPortionInfo() const {
return PortionInfo;
}
@@ -109,25 +130,6 @@ public:
return PortionInfo;
}
- void SetPortionInfo(const TPortionInfo& portionInfo) {
- PortionInfo = portionInfo;
- }
-
- explicit TPortionInfoWithBlobs(TPortionInfo&& portionInfo, const ui32 predictedBlobsCount)
- : PortionInfo(portionInfo) {
- Blobs.reserve(predictedBlobsCount);
- }
-
- explicit TPortionInfoWithBlobs(const TPortionInfo& portionInfo, const ui32 predictedBlobsCount)
- : PortionInfo(portionInfo) {
- Blobs.reserve(predictedBlobsCount);
- }
-
- TBlobInfo& StartBlob(const ui32 blobChunksCount) {
- Blobs.emplace_back(TBlobInfo(blobChunksCount, *this));
- return Blobs.back();
- }
-
friend IOutputStream& operator << (IOutputStream& out, const TPortionInfoWithBlobs& info) {
out << info.DebugString();
return out;
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
index 2ca1d4a0f28..b109f8ee5f3 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
@@ -38,6 +38,11 @@ public:
virtual int GetFieldIndex(const ui32 columnId) const = 0;
std::shared_ptr<arrow::Field> GetFieldByIndex(const int index) const;
std::shared_ptr<arrow::Field> GetFieldByColumnId(const ui32 columnId) const;
+ std::shared_ptr<arrow::Field> GetFieldByColumnIdVerified(const ui32 columnId) const {
+ auto result = GetFieldByColumnId(columnId);
+ Y_VERIFY(result);
+ return result;
+ }
TString DebugString() const {
return DoDebugString();
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index 20f0e21a7ec..62b034283fa 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -6,7 +6,6 @@
#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/sys_view/common/schema.h>
-#include <ydb/core/tablet_flat/flat_dbase_scheme.h>
#include <ydb/core/tx/columnshard/common/scalars.h>
#include <ydb/core/formats/arrow/dictionary/object.h>
#include <ydb/core/formats/arrow/serializer/abstract.h>
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.cpp b/ydb/core/tx/columnshard/splitter/batch_slice.cpp
index bcbead4500e..2caadd2e212 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.cpp
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.cpp
@@ -13,16 +13,16 @@ bool TBatchSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) {
}
}
std::vector<TSplittedBlob> result;
- Y_VERIFY(TSplitSettings::MaxBlobSize >= 2 * TSplitSettings::MinBlobSize);
+ Y_VERIFY(Settings.GetMaxBlobSize() >= 2 * Settings.GetMinBlobSize());
while (chunksInProgress.size()) {
- ui64 fullSize = 0;
+ i64 fullSize = 0;
for (auto&& i : chunksInProgress) {
fullSize += i.GetSize();
}
- if (fullSize < TSplitSettings::MaxBlobSize) {
+ if (fullSize < Settings.GetMaxBlobSize()) {
result.emplace_back(TSplittedBlob());
for (auto&& i : chunksInProgress) {
- Y_VERIFY(result.back().Take(i));
+ result.back().Take(i);
}
chunksInProgress.clear();
break;
@@ -30,37 +30,40 @@ bool TBatchSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) {
bool hasNoSplitChanges = true;
while (hasNoSplitChanges) {
hasNoSplitChanges = false;
- ui64 partSize = 0;
+ i64 partSize = 0;
for (ui32 i = 0; i < chunksInProgress.size(); ++i) {
- const ui64 nextPartSize = partSize + chunksInProgress[i].GetSize();
- const ui64 nextOtherSize = fullSize - nextPartSize;
- const ui64 otherSize = fullSize - partSize;
- if (nextPartSize >= TSplitSettings::MaxBlobSize || nextOtherSize < TSplitSettings::MinBlobSize) {
- Y_VERIFY(otherSize >= TSplitSettings::MinBlobSize);
- Y_VERIFY(partSize < TSplitSettings::MaxBlobSize);
- if (partSize >= TSplitSettings::MinBlobSize) {
+ const i64 nextPartSize = partSize + chunksInProgress[i].GetSize();
+ const i64 nextOtherSize = fullSize - nextPartSize;
+ const i64 otherSize = fullSize - partSize;
+ if (nextPartSize >= Settings.GetMaxBlobSize() || nextOtherSize < Settings.GetMinBlobSize()) {
+ Y_VERIFY(otherSize >= Settings.GetMinBlobSize());
+ Y_VERIFY(partSize < Settings.GetMaxBlobSize());
+ if (partSize >= Settings.GetMinBlobSize()) {
result.emplace_back(TSplittedBlob());
for (ui32 chunk = 0; chunk < i; ++chunk) {
- Y_VERIFY(result.back().Take(chunksInProgress[chunk]));
+ result.back().Take(chunksInProgress[chunk]);
}
+ Counters->BySizeSplitter.OnCorrectSerialized(result.back().GetSize());
chunksInProgress.erase(chunksInProgress.begin(), chunksInProgress.begin() + i);
hasNoSplitChanges = true;
} else {
- Y_VERIFY(chunksInProgress[i].GetSize() > TSplitSettings::MinBlobSize - partSize);
- Y_VERIFY(otherSize - (TSplitSettings::MinBlobSize - partSize) >= TSplitSettings::MinBlobSize);
- chunksInProgress[i].AddSplit(TSplitSettings::MinBlobSize - partSize);
+ Y_VERIFY(chunksInProgress[i].GetSize() > Settings.GetMinBlobSize() - partSize);
+ Y_VERIFY(otherSize - (Settings.GetMinBlobSize() - partSize) >= Settings.GetMinBlobSize());
+ chunksInProgress[i].AddSplit(Settings.GetMinBlobSize() - partSize);
+ Counters->BySizeSplitter.OnTrashSerialized(chunksInProgress[i].GetSize());
std::vector<TSplittedColumnChunk> newChunks = chunksInProgress[i].InternalSplit(Schema->GetColumnSaver(chunksInProgress[i].GetColumnId()), Counters);
chunksInProgress.erase(chunksInProgress.begin() + i);
chunksInProgress.insert(chunksInProgress.begin() + i, newChunks.begin(), newChunks.end());
TSplittedBlob newBlob;
for (ui32 chunk = 0; chunk <= i; ++chunk) {
- Y_VERIFY(newBlob.Take(chunksInProgress[chunk]));
+ newBlob.Take(chunksInProgress[chunk]);
}
- if (newBlob.GetSize() < TSplitSettings::MaxBlobSize) {
+ if (newBlob.GetSize() < Settings.GetMaxBlobSize()) {
chunksInProgress.erase(chunksInProgress.begin(), chunksInProgress.begin() + i + 1);
result.emplace_back(std::move(newBlob));
+ Counters->BySizeSplitter.OnCorrectSerialized(result.back().GetSize());
}
}
break;
@@ -73,10 +76,11 @@ bool TBatchSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) {
return true;
}
-TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters)
+TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings)
: Schema(schema)
, Batch(batch)
, Counters(counters)
+ , Settings(settings)
{
Y_VERIFY(batch);
RecordsCount = batch->num_rows();
@@ -93,7 +97,7 @@ TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch>
auto stats = schema->GetColumnSerializationStats(c.GetColumnId());
TSimpleSplitter splitter(columnSaver, Counters);
splitter.SetStats(stats);
- c.SetBlobs(splitter.Split(i, c.GetField(), TSplitSettings::MaxBlobSize));
+ c.SetBlobs(splitter.Split(i, c.GetField(), Settings.GetMaxBlobSize()));
Size += c.GetSize();
++idx;
}
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h
index da3c5db1f2b..2098a7e9342 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.h
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.h
@@ -20,6 +20,42 @@ public:
virtual std::optional<TColumnSerializationStat> GetBatchSerializationStats(const std::shared_ptr<arrow::RecordBatch>& rb) const = 0;
};
+template <class TContainer>
+class TArrayView {
+private:
+ typename TContainer::iterator Begin;
+ typename TContainer::iterator End;
+public:
+ TArrayView(typename TContainer::iterator itBegin, typename TContainer::iterator itEnd)
+ : Begin(itBegin)
+ , End(itEnd) {
+
+ }
+
+ typename TContainer::iterator begin() {
+ return Begin;
+ }
+
+ typename TContainer::iterator end() {
+ return End;
+ }
+
+ typename TContainer::value_type& front() {
+ return *Begin;
+ }
+
+ typename TContainer::value_type& operator[](const size_t index) {
+ return *(Begin + index);
+ }
+
+ size_t size() {
+ return End - Begin;
+ }
+};
+
+template <class TObject>
+using TVectorView = TArrayView<std::vector<TObject>>;
+
class TDefaultSchemaDetails: public ISchemaDetailInfo {
private:
ISnapshotSchema::TPtr Schema;
@@ -55,8 +91,16 @@ private:
ISchemaDetailInfo::TPtr Schema;
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch);
std::shared_ptr<NColumnShard::TSplitterCounters> Counters;
+ TSplitSettings Settings;
public:
- TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters);
+ explicit TBatchSerializedSlice(TVectorView<TBatchSerializedSlice>&& objects) {
+ Y_VERIFY(objects.size());
+ std::swap(*this, objects.front());
+ for (ui32 i = 1; i < objects.size(); ++i) {
+ MergeSlice(std::move(objects[i]));
+ }
+ }
+ TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings);
void MergeSlice(TBatchSerializedSlice&& slice);
diff --git a/ydb/core/tx/columnshard/splitter/chunks.cpp b/ydb/core/tx/columnshard/splitter/chunks.cpp
index fb6f7f66196..5f4bae55b8a 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.cpp
+++ b/ydb/core/tx/columnshard/splitter/chunks.cpp
@@ -14,8 +14,12 @@ std::vector<TSplittedColumnChunk> TSplittedColumnChunk::InternalSplit(const TCol
return newChunks;
}
+TString TSimpleOrderedColumnChunk::DebugString() const {
+ return TStringBuilder() << "address=" << ChunkAddress.DebugString() << ";data_size=" << Data.size() << ";";
+}
+
TString TOrderedColumnChunk::DebugString() const {
- return TStringBuilder() << "column_id=" << ColumnId << ";data_size=" << Data.size() << ";records_count=" << Column->length() << ";data=" << NArrow::DebugJson(Column, 3, 3) << ";";
+ return TStringBuilder() << TBase::DebugString() << "records_count=" << Column->length() << ";data=" << NArrow::DebugJson(Column, 3, 3) << ";";
}
}
diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h
index 921e13229d8..126f4c9e518 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.h
+++ b/ydb/core/tx/columnshard/splitter/chunks.h
@@ -1,6 +1,7 @@
#pragma once
#include "simple.h"
#include <ydb/core/tx/columnshard/counters/splitter.h>
+#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
@@ -8,11 +9,16 @@
namespace NKikimr::NOlap {
class TSplitSettings {
+private:
+ static const inline i64 DefaultMaxBlobSize = 8 * 1024 * 1024;
+ static const inline i64 DefaultMinBlobSize = 4 * 1024 * 1024;
+ static const inline i64 DefaultMinRecordsCount = 10000;
+ static const inline i64 DefaultMaxPortionSize = 4 * DefaultMaxBlobSize;
+ YDB_ACCESSOR(i64, MaxBlobSize, DefaultMaxBlobSize);
+ YDB_ACCESSOR(i64, MinBlobSize, DefaultMinBlobSize);
+ YDB_ACCESSOR(i64, MinRecordsCount, DefaultMinRecordsCount);
+ YDB_ACCESSOR(i64, MaxPortionSize, DefaultMaxPortionSize);
public:
- static const inline i64 MaxBlobSize = 8 * 1024 * 1024;
- static const inline i64 MaxBlobSizeWithGap = 7 * 1024 * 1024;
- static const inline i64 MinBlobSize = 4 * 1024 * 1024;
- static const inline i64 MinRecordsCount = 10000;
};
class TSplittedColumn;
@@ -28,7 +34,7 @@ public:
}
std::vector<TSplittedColumnChunk> InternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters);
- ui64 GetSize() const {
+ i64 GetSize() const {
return Data.GetSerializedChunk().size();
}
@@ -87,26 +93,45 @@ public:
class TSplittedBlob {
private:
- YDB_READONLY(ui64, Size, 0);
+ YDB_READONLY(i64, Size, 0);
YDB_READONLY_DEF(std::vector<TSplittedColumnChunk>, Chunks);
public:
- bool Take(const TSplittedColumnChunk& chunk) {
- if (Size + chunk.GetSize() < TSplitSettings::MaxBlobSize) {
- Chunks.emplace_back(chunk);
- Size += chunk.GetSize();
- return true;
- }
- return false;
+ void Take(const TSplittedColumnChunk& chunk) {
+ Chunks.emplace_back(chunk);
+ Size += chunk.GetSize();
}
bool operator<(const TSplittedBlob& item) const {
return Size > item.Size;
}
};
-class TOrderedColumnChunk {
+class TSimpleOrderedColumnChunk {
private:
- YDB_READONLY(ui32, ColumnId, 0);
+ TChunkAddress ChunkAddress;
YDB_READONLY_DEF(TString, Data);
+ YDB_READONLY(ui64, Offset, 0);
+public:
+ ui64 GetSize() const {
+ return Data.size();
+ }
+
+ const TChunkAddress& GetChunkAddress() const {
+ return ChunkAddress;
+ }
+
+ TSimpleOrderedColumnChunk(const TChunkAddress& chunkAddress, const ui64 offset, const TString& data)
+ : ChunkAddress(chunkAddress)
+ , Data(std::move(data))
+ , Offset(offset) {
+
+ }
+
+ TString DebugString() const;
+};
+
+class TOrderedColumnChunk: public TSimpleOrderedColumnChunk {
+private:
+ using TBase = TSimpleOrderedColumnChunk;
std::shared_ptr<arrow::Array> Column;
public:
std::shared_ptr<arrow::Array> GetColumn() const {
@@ -117,9 +142,8 @@ public:
return Column->length();
}
- TOrderedColumnChunk(const ui32 columnId, const TString& data, std::shared_ptr<arrow::Array> column)
- : ColumnId(columnId)
- , Data(std::move(data))
+ TOrderedColumnChunk(const TChunkAddress& chunkAddress, const ui64 offset, const TString& data, std::shared_ptr<arrow::Array> column)
+ : TBase(chunkAddress, offset, data)
, Column(column)
{
Y_VERIFY(Column);
diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
index 7892859ff47..537d80a5ab5 100644
--- a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
+++ b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
@@ -13,22 +13,22 @@ public:
}
template <class TObject>
- std::vector<ui32> Split(const std::vector<TObject>& objects) {
+ std::vector<TVectorView<TObject>> Split(std::vector<TObject>& objects) {
ui64 fullSize = 0;
for (auto&& i : objects) {
fullSize += i.GetSize();
}
if (fullSize <= BottomLimit) {
- return {(ui32)objects.size()};
+ return {TVectorView<TObject>(objects.begin(), objects.end())};
}
ui64 currentSize = 0;
ui64 currentStart = 0;
- std::vector<ui32> result;
+ std::vector<TVectorView<TObject>> result;
for (ui32 i = 0; i < objects.size(); ++i) {
const ui64 nextSize = currentSize + objects[i].GetSize();
const ui64 nextOtherSize = fullSize - nextSize;
if ((nextSize >= BottomLimit && nextOtherSize >= BottomLimit) || (i + 1 == objects.size())) {
- result.emplace_back(i - currentStart + 1);
+ result.emplace_back(TVectorView<TObject>(objects.begin() + currentStart, objects.begin() + i + 1));
currentSize = 0;
currentStart = i + 1;
} else {
@@ -40,33 +40,32 @@ public:
};
TRBSplitLimiter::TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters, ISchemaDetailInfo::TPtr schemaInfo,
- const std::shared_ptr<arrow::RecordBatch> batch)
+ const std::shared_ptr<arrow::RecordBatch> batch, const TSplitSettings& settings)
: Counters(counters)
, Batch(batch)
+ , Settings(settings)
{
Y_VERIFY(Batch->num_rows());
std::vector<TBatchSerializedSlice> slices;
auto stats = schemaInfo->GetBatchSerializationStats(Batch);
- ui32 recordsCount = TSplitSettings::MinRecordsCount;
+ ui32 recordsCount = Settings.GetMinRecordsCount();
if (stats) {
- const ui32 recordsCountForMinSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), TSplitSettings::MinBlobSize).value_or(recordsCount);
- recordsCount = std::max(recordsCount, recordsCountForMinSize);
+ const ui32 recordsCountForMinSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), Settings.GetMinBlobSize()).value_or(recordsCount);
+ const ui32 recordsCountForMaxPortionSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), Settings.GetMaxPortionSize()).value_or(recordsCount);
+ recordsCount = std::min(recordsCountForMaxPortionSize, std::max(recordsCount, recordsCountForMinSize));
}
auto linearSplitInfo = TSimpleSplitter::GetOptimalLinearSplitting(Batch->num_rows(), recordsCount);
for (auto it = linearSplitInfo.StartIterator(); it.IsValid(); it.Next()) {
std::shared_ptr<arrow::RecordBatch> current = batch->Slice(it.GetPosition(), it.GetCurrentPackSize());
- TBatchSerializedSlice slice(current, schemaInfo, Counters);
+ TBatchSerializedSlice slice(current, schemaInfo, Counters, settings);
slices.emplace_back(std::move(slice));
}
- const std::vector<ui32> chunks = TSimilarSlicer(TSplitSettings::MinBlobSize).Split(slices);
+ auto chunks = TSimilarSlicer(Settings.GetMinBlobSize()).Split(slices);
ui32 chunkStartPosition = 0;
- for (auto&& i : chunks) {
- Slices.emplace_back(std::move(slices[chunkStartPosition]));
- for (ui32 pos = chunkStartPosition + 1; pos < chunkStartPosition + i; ++pos) {
- Slices.back().MergeSlice(std::move(slices[pos]));
- }
- chunkStartPosition += i;
+ for (auto&& spanObjects : chunks) {
+ Slices.emplace_back(TBatchSerializedSlice(std::move(spanObjects)));
+ chunkStartPosition += spanObjects.size();
}
Y_VERIFY(chunkStartPosition == slices.size());
ui32 recordsCountCheck = 0;
@@ -83,6 +82,7 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<TOrderedColumnChunk>>& portio
std::vector<TSplittedBlob> blobs;
Slices.front().GroupBlobs(blobs);
std::vector<std::vector<TOrderedColumnChunk>> result;
+ std::map<ui32, ui32> columnChunks;
for (auto&& i : blobs) {
if (blobs.size() == 1) {
Counters->MonoBlobs.OnBlobData(i.GetSize());
@@ -90,8 +90,10 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<TOrderedColumnChunk>>& portio
Counters->SplittedBlobs.OnBlobData(i.GetSize());
}
std::vector<TOrderedColumnChunk> chunksForBlob;
+ ui64 offset = 0;
for (auto&& c : i.GetChunks()) {
- chunksForBlob.emplace_back(c.GetColumnId(), c.GetData().GetSerializedChunk(), c.GetData().GetColumn());
+ chunksForBlob.emplace_back(TChunkAddress(c.GetColumnId(), columnChunks[c.GetColumnId()]++), offset, c.GetData().GetSerializedChunk(), c.GetData().GetColumn());
+ offset += c.GetSize();
}
result.emplace_back(std::move(chunksForBlob));
}
diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.h b/ydb/core/tx/columnshard/splitter/rb_splitter.h
index c1aca97cc31..8f5ae022274 100644
--- a/ydb/core/tx/columnshard/splitter/rb_splitter.h
+++ b/ydb/core/tx/columnshard/splitter/rb_splitter.h
@@ -15,9 +15,10 @@ private:
std::deque<TBatchSerializedSlice> Slices;
std::shared_ptr<NColumnShard::TSplitterCounters> Counters;
std::shared_ptr<arrow::RecordBatch> Batch;
+ TSplitSettings Settings;
public:
TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters,
- ISchemaDetailInfo::TPtr schemaInfo, const std::shared_ptr<arrow::RecordBatch> batch);
+ ISchemaDetailInfo::TPtr schemaInfo, const std::shared_ptr<arrow::RecordBatch> batch, const TSplitSettings& settings);
bool Next(std::vector<std::vector<TOrderedColumnChunk>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch);
};
diff --git a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
index cf2956872a7..29bbfe307dd 100644
--- a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
+++ b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
@@ -61,10 +61,10 @@ Y_UNIT_TEST_SUITE(Splitter) {
void Execute(std::shared_ptr<arrow::RecordBatch> batch) {
NKikimr::NColumnShard::TIndexationCounters counters("test");
- NKikimr::NOlap::TRBSplitLimiter limiter(counters.SplitterCounters, Schema, batch);
+ NKikimr::NOlap::TRBSplitLimiter limiter(counters.SplitterCounters, Schema, batch, NKikimr::NOlap::TSplitSettings());
std::vector<std::vector<NKikimr::NOlap::TOrderedColumnChunk>> chunksForBlob;
std::map<std::string, std::vector<std::shared_ptr<arrow::RecordBatch>>> restoredBatch;
- std::vector<ui64> blobsSize;
+ std::vector<i64> blobsSize;
bool hasMultiSplit = false;
ui32 blobsCount = 0;
ui32 slicesCount = 0;
@@ -81,10 +81,11 @@ Y_UNIT_TEST_SUITE(Splitter) {
std::set<ui32> blobColumnChunks;
for (auto&& i : chunks) {
++chunksCount;
- recordsCountByColumn[i.GetColumnId()] += i.GetRecordsCount();
- restoredBatch[Schema->GetColumnName(i.GetColumnId())].emplace_back(*Schema->GetColumnLoader(i.GetColumnId()).Apply(i.GetData()));
+ const ui32 columnId = i.GetChunkAddress().GetColumnId();
+ recordsCountByColumn[columnId] += i.GetRecordsCount();
+ restoredBatch[Schema->GetColumnName(columnId)].emplace_back(*Schema->GetColumnLoader(columnId).Apply(i.GetData()));
blobSize += i.GetData().size();
- if (i.GetRecordsCount() != NKikimr::NOlap::TSplitSettings::MinRecordsCount && !blobColumnChunks.emplace(i.GetColumnId()).second) {
+ if (i.GetRecordsCount() != NKikimr::NOlap::TSplitSettings().GetMinRecordsCount() && !blobColumnChunks.emplace(columnId).second) {
hasMultiSplit = true;
}
sb << "(" << i.DebugString() << ")";
@@ -110,8 +111,8 @@ Y_UNIT_TEST_SUITE(Splitter) {
}
Y_VERIFY(hasMultiSplit == HasMultiSplit);
for (auto&& i : blobsSize) {
- Y_VERIFY(i < NKikimr::NOlap::TSplitSettings::MaxBlobSize);
- Y_VERIFY(i + 10000 >= NKikimr::NOlap::TSplitSettings::MinBlobSize || blobsSize.size() == 1);
+ Y_VERIFY(i < NKikimr::NOlap::TSplitSettings().GetMaxBlobSize());
+ Y_VERIFY(i + 10000 >= NKikimr::NOlap::TSplitSettings().GetMinBlobSize() || blobsSize.size() == 1);
}
Y_VERIFY(restoredBatch.size() == (ui32)batch->num_columns());
for (auto&& i : batch->schema()->fields()) {