diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-21 11:21:28 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-21 13:10:08 +0300 |
commit | 439b2883e8cdd59e53080d3b9dbd834685af591c (patch) | |
tree | f3762a8663e40283f1ac6af8293003ae40c4b555 | |
parent | 17c28a1edb9012b519f5397ad24512fa206da9f1 (diff) | |
download | ydb-439b2883e8cdd59e53080d3b9dbd834685af591c.tar.gz |
KIKIMR-19091: correct blobs construction for restore and simple merge. provide settings into splitter
23 files changed, 502 insertions, 249 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h index c39f6ce0178..122c4a126dc 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h @@ -27,6 +27,7 @@ class TBackgroundActivity; namespace NKikimr::NOlap { class TColumnEngineForLogs; class TVersionedIndex; +class TPortionInfoWithBlobs; struct TCompactionLimits { static constexpr const ui64 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant @@ -55,6 +56,13 @@ struct TCompactionLimits { i64 GranuleSizeForOverloadPrevent = WARNING_OVERLOAD_GRANULE_SIZE; i64 GranuleIndexedPortionsSizeLimit = WARNING_INSERTED_PORTIONS_SIZE; ui32 GranuleIndexedPortionsCountLimit = WARNING_INSERTED_PORTIONS_COUNT; + + TSplitSettings GetSplitSettings() const { + return TSplitSettings() + .SetMinBlobSize(0.5 * std::min<ui64>(MAX_BLOB_SIZE, GranuleSizeForOverloadPrevent)) + .SetMaxBlobSize(std::min<ui64>(MAX_BLOB_SIZE, GranuleSizeForOverloadPrevent)) + .SetMaxPortionSize(0.5 * GranuleSizeForOverloadPrevent); + } }; struct TPortionEvictionFeatures { @@ -252,21 +260,6 @@ public: } return sameBlobRanges; } - - /// Returns blob-ranges grouped by blob-id. - static THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GroupedBlobRanges(const std::vector<std::pair<TPortionInfoWithBlobs, TPortionEvictionFeatures>>& portions) { - Y_VERIFY(portions.size()); - - THashMap<TUnifiedBlobId, std::vector<TBlobRange>> sameBlobRanges; - for (const auto& [portionInfo, _] : portions) { - Y_VERIFY(!portionInfo.GetPortionInfo().Empty()); - - for (const auto& rec : portionInfo.GetPortionInfo().Records) { - sameBlobRanges[rec.BlobRange.BlobId].push_back(rec.BlobRange); - } - } - return sameBlobRanges; - } }; } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index dcdc6bee7c3..bc21ea014dd 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -68,7 +68,6 @@ bool TCompactColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TAp const ui64 granule = portionInfo.GetGranule(); const ui64 portion = portionInfo.GetPortion(); - // In case of race with eviction portion could become evicted const TPortionInfo& oldInfo = self.GetGranulePtrVerified(granule)->GetPortionVerified(portion); auto& granuleStart = self.Granules[granule]->Record.Mark; @@ -80,7 +79,6 @@ bool TCompactColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TAp self.ColumnsTable->Write(context.DB, portionInfo, record); } } - // Move portions in granules (zero-copy switch + append into new granules) for (auto& [info, dstGranule] : PortionsToMove) { const auto& portionInfo = info; @@ -140,9 +138,10 @@ void TCompactColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, } THashMap<ui64, ui64> TCompactColumnEngineChanges::TmpToNewGranules(TFinalizationContext& context, THashMap<ui64, std::pair<ui64, TMark>>& newGranules) const { + Y_VERIFY(SrcGranule || TmpGranuleIds.empty()); THashMap<ui64, ui64> granuleRemap; for (const auto& [mark, counter] : TmpGranuleIds) { - if (mark == SrcGranule.Mark) { + if (mark == SrcGranule->Mark) { Y_VERIFY(!counter); granuleRemap[counter] = GranuleMeta->GetGranuleId(); } else { @@ -200,7 +199,8 @@ ui64 TCompactColumnEngineChanges::SetTmpGranule(ui64 pathId, const TMark& mark) } TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const TCompactionSrcGranule& srcGranule) - : Limits(limits) + : TBase(limits.GetSplitSettings()) + , Limits(limits) , GranuleMeta(granule) , SrcGranule(srcGranule) { @@ -216,6 +216,22 @@ TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits Y_VERIFY(SwitchedPortions.size()); } +TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions) + : TBase(limits.GetSplitSettings()) + , Limits(limits) + , GranuleMeta(granule) +{ +// Y_VERIFY(GranuleMeta); + + SwitchedPortions.reserve(portions.size()); + for (const auto& [_, portionInfo] : portions) { + Y_VERIFY(portionInfo->IsActive()); + SwitchedPortions.push_back(*portionInfo); + Y_VERIFY(!GranuleMeta || portionInfo->GetGranule() == GranuleMeta->GetGranuleId()); + } + Y_VERIFY(SwitchedPortions.size()); +} + TCompactColumnEngineChanges::~TCompactColumnEngineChanges() { Y_VERIFY_DEBUG(!NActors::TlsActivationContext || !NeedGranuleStatusProvide); } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h index 856de3af642..7148eb92990 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction.h @@ -15,7 +15,7 @@ private: protected: const TCompactionLimits Limits; std::shared_ptr<TGranuleMeta> GranuleMeta; - TCompactionSrcGranule SrcGranule; + std::optional<TCompactionSrcGranule> SrcGranule; virtual void DoStart(NColumnShard::TColumnShard& self) override; virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; @@ -42,6 +42,7 @@ public: virtual THashSet<ui64> GetTouchedGranules() const override; + TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions); TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const TCompactionSrcGranule& srcGranule); ~TCompactColumnEngineChanges(); diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp index cbb153eefef..8a38ef827de 100644 --- a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp @@ -183,10 +183,11 @@ void TInGranuleCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TC } void TInGranuleCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { + Y_VERIFY(SrcGranule); TBase::DoStart(self); auto& g = *GranuleMeta; self.CSCounters.OnInternalCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount()); - Y_VERIFY(InitInGranuleMerge(SrcGranule.Mark, SwitchedPortions, Limits, MergeBorders).Ok()); + Y_VERIFY(InitInGranuleMerge(SrcGranule->Mark, SwitchedPortions, Limits, MergeBorders).Ok()); } NColumnShard::ECumulativeCounters TInGranuleCompactColumnEngineChanges::GetCounterIndex(const bool isSuccess) const { diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index cfdaad8484e..9426d81cab9 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -44,10 +44,9 @@ bool TInsertColumnEngineChanges::AddPathIfNotExists(ui64 pathId) { return false; } - Y_VERIFY(FirstGranuleId && ReservedGranuleIds); + Y_VERIFY(FirstGranuleId); ui64 granule = FirstGranuleId; ++FirstGranuleId; - --ReservedGranuleIds; NewGranules.emplace(granule, std::make_pair(pathId, DefaultMark)); PathToGranule[pathId].emplace_back(DefaultMark, granule); diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h index 5fa9d713f26..058a2d9c2e5 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.h +++ b/ydb/core/tx/columnshard/engines/changes/indexation.h @@ -11,6 +11,7 @@ private: using TBase = TChangesWithAppend; std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const TIndexInfo& indexInfo, const TInsertedData& inserted) const; + std::vector<NOlap::TInsertedData> DataToIndex; protected: virtual void DoStart(NColumnShard::TColumnShard& self) override; @@ -23,14 +24,17 @@ protected: public: const TMark DefaultMark; THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, granule} - std::vector<NOlap::TInsertedData> DataToIndex; - ui32 ReservedGranuleIds{ 0 }; THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs; public: - TInsertColumnEngineChanges(const TMark& defaultMark) - : DefaultMark(defaultMark) - { + TInsertColumnEngineChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& dataToIndex, const TSplitSettings& splitSettings) + : TBase(splitSettings) + , DataToIndex(std::move(dataToIndex)) + , DefaultMark(defaultMark) { + + } + const std::vector<NOlap::TInsertedData>& GetDataToIndex() const { + return DataToIndex; } virtual THashSet<ui64> GetTouchedGranules() const override { diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp index d9579a7f83a..779b608f3c0 100644 --- a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp @@ -5,8 +5,9 @@ namespace NKikimr::NOlap { TConclusionStatus TSplitCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { + Y_VERIFY(SrcGranule); const ui64 pathId = GranuleMeta->GetPathId(); - const TMark ts0 = SrcGranule.Mark; + const TMark ts0 = SrcGranule->Mark; std::vector<TPortionInfo>& portions = SwitchedPortions; std::vector<std::pair<TMark, ui64>> tsIds; diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index 073823a663e..3120ad55beb 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -11,15 +11,23 @@ void TTTLColumnEngineChanges::DoDebugString(TStringOutput& out) const { TBase::DoDebugString(out); if (ui32 evicted = PortionsToEvict.size()) { out << "evict " << evicted << " portions"; - for (auto& [portionInfo, evictionFeatures] : PortionsToEvict) { - out << portionInfo << " (to " << evictionFeatures.TargetTierName << ")"; + for (auto& info : PortionsToEvict) { + out << info.GetActualPortionInfo() << " (to " << info.GetFeatures().TargetTierName << ")"; } out << "; "; } } THashMap<NKikimr::NOlap::TUnifiedBlobId, std::vector<NKikimr::NOlap::TBlobRange>> TTTLColumnEngineChanges::GetGroupedBlobRanges() const { - return GroupedBlobRanges(PortionsToEvict); + Y_VERIFY(PortionsToEvict.size()); + + THashMap<TUnifiedBlobId, std::vector<TBlobRange>> sameBlobRanges; + for (auto&& p : PortionsToEvict) { + for (const auto& rec : p.GetPortionInfo().Records) { + sameBlobRanges[rec.BlobRange.BlobId].push_back(rec.BlobRange); + } + } + return sameBlobRanges; } bool TTTLColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) { @@ -27,8 +35,8 @@ bool TTTLColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyC return false; } - for (auto& [portionInfoWithBlobs, _] : PortionsToEvict) { - auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); + for (auto& info : PortionsToEvict) { + auto& portionInfo = info.GetActualPortionInfo(); const ui64 granule = portionInfo.GetGranule(); const ui64 portion = portionInfo.GetPortion(); if (!self.IsPortionExists(granule, portion)) { @@ -56,8 +64,9 @@ void TTTLColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWr THashSet<TUnifiedBlobId> protectedBlobs; self.IncCounter(NColumnShard::COUNTER_EVICTION_PORTIONS_WRITTEN, PortionsToEvict.size()); - for (auto& [portionInfoWithBlobs, evictionFeatures] : PortionsToEvict) { - auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); + for (const auto& info : PortionsToEvict) { + const auto& portionInfo = info.GetPortionWithBlobs().GetPortionInfo(); + const auto& evictionFeatures = info.GetFeatures(); // Mark exported blobs if (evictionFeatures.NeedExport) { auto& tierName = portionInfo.GetMeta().GetTierName(); @@ -125,8 +134,8 @@ void TTTLColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWr void TTTLColumnEngineChanges::DoCompile(TFinalizationContext& context) { TBase::DoCompile(context); - for (auto& [portionInfo, _] : PortionsToEvict) { - portionInfo.GetPortionInfo().UpdateRecordsMeta(TPortionMeta::EProduced::EVICTED); + for (auto& info : PortionsToEvict) { + info.GetPortionWithBlobs().GetPortionInfo().UpdateRecordsMeta(TPortionMeta::EProduced::EVICTED); } } @@ -157,10 +166,10 @@ void TTTLColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& s self.IncCounter(NColumnShard::COUNTER_EVICTION_BYTES_WRITTEN, context.BytesWritten); } -bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfoWithBlobs& portionInfoWithBlobs, TPortionEvictionFeatures& evictFeatures, - const THashMap<TBlobRange, TString>& srcBlobs, std::vector<TColumnRecord>& evictedRecords, +bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionForEviction& info, const THashMap<TBlobRange, TString>& srcBlobs, std::vector<TColumnRecord>& evictedRecords, TConstructionContext& context) const { - TPortionInfo& portionInfo = portionInfoWithBlobs.GetPortionInfo(); + const TPortionInfo& portionInfo = info.GetPortionInfo(); + auto& evictFeatures = info.GetFeatures(); Y_VERIFY(portionInfo.GetMeta().GetTierName() != evictFeatures.TargetTierName); auto* tiering = Tiering.FindPtr(evictFeatures.PathId); @@ -168,8 +177,9 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfoWithBlobs& portio auto compression = tiering->GetCompression(evictFeatures.TargetTierName); if (!compression) { // Noting to recompress. We have no other kinds of evictions yet. - portionInfo.MutableMeta().SetTierName(evictFeatures.TargetTierName); evictFeatures.DataChanges = false; + info.SetPortionWithBlobs(TPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs)); + info.GetPortionWithBlobs().GetPortionInfo().MutableMeta().SetTierName(evictFeatures.TargetTierName); return true; } @@ -184,37 +194,18 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfoWithBlobs& portio TSaverContext saverContext; saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalCompression(compression); - portionInfo.Records.clear(); - for (auto& rec : undo.Records) { - auto pos = resultSchema->GetFieldIndex(rec.ColumnId); - Y_VERIFY(pos >= 0); - auto field = resultSchema->GetFieldByIndex(pos); - - TString blob; - std::shared_ptr<arrow::RecordBatch> rb; - { - auto it = srcBlobs.find(rec.BlobRange); - Y_VERIFY(it != srcBlobs.end()); - rb = NArrow::TStatusValidator::GetValid(resultSchema->GetColumnLoader(rec.ColumnId)->Apply(it->second)); - auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext); - blob = columnSaver.Apply(rb); - } - Y_VERIFY(rb->num_columns() == 1); - if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { - return false; - } - if (portionInfoWithBlobs.GetBlobs().empty() || portionInfoWithBlobs.GetBlobs().back().GetSize() + blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { - portionInfoWithBlobs.StartBlob(0).AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, blob, rb->column(0)), blobSchema->GetIndexInfo()); - } else { - portionInfoWithBlobs.GetBlobs().back().AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, blob, rb->column(0)), blobSchema->GetIndexInfo()); - } + auto withBlobs = TPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs); + withBlobs.GetPortionInfo().MutableMeta().SetTierName(evictFeatures.TargetTierName); + std::optional<TPortionInfoWithBlobs> actualPortion = withBlobs.ChangeSaver(resultSchema, saverContext); + if (!actualPortion) { + return false; } + info.SetPortionWithBlobs(std::move(*actualPortion)); for (auto& rec : undo.Records) { evictedRecords.emplace_back(std::move(rec)); } - portionInfoWithBlobs.GetPortionInfo().AddMetadata(*resultSchema, batch, evictFeatures.TargetTierName); return true; } @@ -226,13 +217,13 @@ NKikimr::TConclusionStatus TTTLColumnEngineChanges::DoConstructBlobs(TConstructi auto baseResult = TBase::DoConstructBlobs(context); Y_VERIFY(baseResult.Ok()); - std::vector<std::pair<TPortionInfoWithBlobs, TPortionEvictionFeatures>> evicted; + std::vector<TPortionForEviction> evicted; evicted.reserve(PortionsToEvict.size()); - for (auto& [portionInfo, evictFeatures] : PortionsToEvict) { - if (UpdateEvictedPortion(portionInfo, evictFeatures, Blobs, EvictedRecords, context)) { - Y_VERIFY(portionInfo.GetPortionInfo().GetMeta().GetTierName() == evictFeatures.TargetTierName); - evicted.emplace_back(std::move(portionInfo), evictFeatures); + for (auto&& info : PortionsToEvict) { + if (UpdateEvictedPortion(info, Blobs, EvictedRecords, context)) { + Y_VERIFY(info.GetPortionWithBlobs().GetPortionInfo().GetMeta().GetTierName() == info.GetFeatures().TargetTierName); + evicted.emplace_back(std::move(info)); } } diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h index d8689df4c97..fa9f7675a5b 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.h +++ b/ydb/core/tx/columnshard/engines/changes/ttl.h @@ -11,10 +11,56 @@ private: THashMap<TString, TPathIdBlobs> ExportTierBlobs; ui64 ExportNo = 0; - bool UpdateEvictedPortion(TPortionInfoWithBlobs& portionInfo, - TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs, + class TPortionForEviction { + private: + TPortionInfo PortionInfo; + TPortionEvictionFeatures Features; + std::optional<TPortionInfoWithBlobs> PortionWithBlobs; + public: + TPortionForEviction(const TPortionInfo& portion, TPortionEvictionFeatures&& features) + : PortionInfo(portion) + , Features(std::move(features)) + { + + } + + TPortionEvictionFeatures& GetFeatures() { + return Features; + } + + const TPortionEvictionFeatures& GetFeatures() const { + return Features; + } + + const TPortionInfo& GetPortionInfo() const { + Y_VERIFY(!PortionWithBlobs); + return PortionInfo; + } + + void SetPortionWithBlobs(TPortionInfoWithBlobs&& data) { + Y_VERIFY(!PortionWithBlobs); + PortionWithBlobs = std::move(data); + } + + TPortionInfoWithBlobs& GetPortionWithBlobs() { + Y_VERIFY(PortionWithBlobs); + return *PortionWithBlobs; + } + + const TPortionInfoWithBlobs& GetPortionWithBlobs() const { + Y_VERIFY(PortionWithBlobs); + return *PortionWithBlobs; + } + + const TPortionInfo& GetActualPortionInfo() const { + return PortionWithBlobs ? PortionWithBlobs->GetPortionInfo() : PortionInfo; + } + }; + + bool UpdateEvictedPortion(TPortionForEviction& info, const THashMap<TBlobRange, TString>& srcBlobs, std::vector<TColumnRecord>& evictedRecords, TConstructionContext& context) const; - std::vector<std::pair<TPortionInfoWithBlobs, TPortionEvictionFeatures>> PortionsToEvict; // {portion, TPortionEvictionFeatures} + + std::vector<TPortionForEviction> PortionsToEvict; // {portion, TPortionEvictionFeatures} protected: virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; @@ -32,8 +78,8 @@ public: } virtual THashSet<ui64> GetTouchedGranules() const override { auto result = TBase::GetTouchedGranules(); - for (const auto& [portionInfo, _] : PortionsToEvict) { - result.emplace(portionInfo.GetPortionInfo().GetGranule()); + for (auto&& info : PortionsToEvict) { + result.emplace(info.GetPortionInfo().GetGranule()); } return result; } @@ -45,7 +91,7 @@ public: void AddPortionToEvict(const TPortionInfo& info, TPortionEvictionFeatures&& features) { Y_VERIFY(!info.Empty()); Y_VERIFY(info.IsActive()); - PortionsToEvict.emplace_back(TPortionInfoWithBlobs(info, 0), std::move(features)); + PortionsToEvict.emplace_back(info, std::move(features)); } ui32 GetPortionsToEvictCount() const { @@ -57,11 +103,11 @@ public: } virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) override { Y_VERIFY(index < PortionsToEvict.size()); - return &PortionsToEvict[index].first; + return &PortionsToEvict[index].GetPortionWithBlobs(); } virtual bool NeedWritePortion(const ui32 index) const override { Y_VERIFY(index < PortionsToEvict.size()); - return PortionsToEvict[index].second.DataChanges; + return PortionsToEvict[index].GetFeatures().DataChanges; } virtual TString TypeString() const override { diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 71a61f013d7..62d05cff2b0 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -98,24 +98,13 @@ std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions( stats = granuleMeta->BuildSerializationStats(resultSchema); } auto schema = std::make_shared<TDefaultSchemaDetails>(resultSchema, saverContext, std::move(stats)); - TRBSplitLimiter limiter(context.Counters.SplitterCounters, schema, batch); + TRBSplitLimiter limiter(context.Counters.SplitterCounters, schema, batch, SplitSettings); std::vector<std::vector<TOrderedColumnChunk>> portionBlobs; std::shared_ptr<arrow::RecordBatch> portionBatch; while (limiter.Next(portionBlobs, portionBatch)) { - TPortionInfo portionInfo(granule, 0, snapshot); - portionInfo.AddMetadata(*resultSchema, portionBatch, tierName); - - TPortionInfoWithBlobs infoWithBlob(std::move(portionInfo), portionBlobs.size()); - std::map<ui32, ui32> chunkIds; - THashMap<TBlobRange, TString> srcBlobs; - for (auto& blob : portionBlobs) { - auto& blobInfo = infoWithBlob.StartBlob(blob.size()); - for (auto&& chunk : blob) { - const TString data = chunk.GetData(); - srcBlobs.emplace(blobInfo.AddChunk(infoWithBlob, std::move(chunk), resultSchema->GetIndexInfo()).BlobRange, data); - } - } + TPortionInfoWithBlobs infoWithBlob = TPortionInfoWithBlobs::BuildByBlobs(portionBlobs, portionBatch, granule, snapshot, resultSchema->GetIndexInfo()); + infoWithBlob.GetPortionInfo().AddMetadata(*resultSchema, portionBatch, tierName); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portion_appended", infoWithBlob.GetPortionInfo().DebugString()); out.emplace_back(std::move(infoWithBlob)); } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index e05e261a767..1189289effe 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -9,6 +9,7 @@ namespace NKikimr::NOlap { class TChangesWithAppend: public TColumnEngineChanges { private: THashMap<ui64, NOlap::TTiering> TieringInfo; + TSplitSettings SplitSettings; protected: virtual void DoDebugString(TStringOutput& out) const override; virtual void DoCompile(TFinalizationContext& context) override; @@ -24,6 +25,12 @@ protected: const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context) const; public: + TChangesWithAppend(const TSplitSettings& splitSettings) + : SplitSettings(splitSettings) + { + + } + virtual THashSet<ui64> GetTouchedGranules() const override { return {}; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 5f3efa477bb..ddd910bcf6b 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -41,13 +41,6 @@ std::shared_ptr<NKikimr::NOlap::TTTLColumnEngineChanges> TColumnEngineForLogs::T return std::make_shared<TTTLColumnEngineChanges>(); } -std::shared_ptr<NKikimr::NOlap::TInsertColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildInsertChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot) { - auto changes = std::make_shared<TInsertColumnEngineChanges>(defaultMark); - changes->DataToIndex = std::move(blobsToIndex); - changes->InitSnapshot = initSnapshot; - return changes; -} - TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits) : GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, limits)) , TabletId(tabletId) @@ -275,9 +268,9 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept { Y_VERIFY(dataToIndex.size()); - auto changes = TChangesConstructor::BuildInsertChanges(DefaultMark(), std::move(dataToIndex), LastSnapshot); + auto changes = std::make_shared<TInsertColumnEngineChanges>(DefaultMark(), std::move(dataToIndex), TSplitSettings()); ui32 reserveGranules = 0; - for (const auto& data : changes->DataToIndex) { + for (const auto& data : changes->GetDataToIndex()) { const ui64 pathId = data.PathId; if (changes->PathToGranule.contains(pathId)) { @@ -302,7 +295,6 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st if (reserveGranules) { changes->FirstGranuleId = LastGranule + 1; - changes->ReservedGranuleIds = reserveGranules; LastGranule += reserveGranules; } @@ -330,7 +322,7 @@ std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompacti std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, ui32 maxRecords) noexcept { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size()); - auto changes = TChangesConstructor::BuildCleanupChanges(snapshot); + auto changes = std::make_shared<TCleanupColumnEngineChanges>(); ui32 affectedRecords = 0; // Add all portions from dropped paths @@ -544,7 +536,7 @@ std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl(const TH AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size()) ("internal", EvictionsController.MutableNextCheckInstantForTierings().size()) ; - auto changes = TChangesConstructor::BuildTtlChanges(); + auto changes = std::make_shared<TTTLColumnEngineChanges>(); TTieringProcessContext context(maxEvictBytes, changes, busyGranules); bool hasExternalChanges = false; diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp index 56d228ce938..3cb47f5aa6f 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp @@ -1,24 +1,151 @@ #include "with_blobs.h" +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> namespace NKikimr::NOlap { +void TPortionInfoWithBlobs::TBlobInfo::RestoreChunk(const TPortionInfoWithBlobs& owner, TSimpleOrderedColumnChunk&& chunk) { + Y_VERIFY(!ResultBlob); + Size += chunk.GetData().size(); + auto address = chunk.GetChunkAddress(); + Y_VERIFY(owner.GetPortionInfo().GetRecordPointer(address)); + Y_VERIFY(ChunksOrdered.empty() || ChunksOrdered.back()->GetOffset() < chunk.GetOffset()); + auto dataInsert = Chunks.emplace(address, std::move(chunk)); + Y_VERIFY(dataInsert.second); + ChunksOrdered.emplace_back(&dataInsert.first->second); +} + const TColumnRecord& TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk, const TIndexInfo& info) { Y_VERIFY(!ResultBlob); - const ui32 columnId = chunk.GetColumnId(); - TColumnRecord rec(TChunkAddress(columnId, owner.ColumnChunkIds[columnId]++), chunk.GetColumn(), info); - rec.BlobRange.Offset = Size; + auto rec = TColumnRecord(chunk.GetChunkAddress(), chunk.GetColumn(), info); + + Y_VERIFY(chunk.GetOffset() == Size); + rec.BlobRange.Offset = chunk.GetOffset(); rec.BlobRange.Size = chunk.GetData().size(); - auto& result = owner.PortionInfo.AppendOneChunkColumn(std::move(rec)); Size += chunk.GetData().size(); - Chunks.emplace_back(std::move(chunk)); + + auto dataInsert = Chunks.emplace(rec.GetAddress(), std::move(chunk)); + Y_VERIFY(dataInsert.second); + ChunksOrdered.emplace_back(&dataInsert.first->second); + auto& result = owner.PortionInfo.AppendOneChunkColumn(std::move(rec)); return result; } void TPortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) { - Y_VERIFY(StartRecordsIndex + Chunks.size() <= owner.PortionInfo.Records.size()); - for (ui32 idx = 0; idx < Chunks.size(); ++idx) { - owner.PortionInfo.Records[StartRecordsIndex + idx].BlobRange.BlobId = blobId; + auto it = owner.PortionInfo.Records.begin(); + for (auto&& i : Chunks) { + bool found = false; + for (; it != owner.PortionInfo.Records.end(); ++it) { + if (it->ColumnId == i.first.GetColumnId() && it->Chunk == i.first.GetChunk()) { + it->BlobRange.BlobId = blobId; + found = true; + break; + } + } + AFL_VERIFY(found)("address", i.second.DebugString()); + } +} + +std::shared_ptr<arrow::RecordBatch> TPortionInfoWithBlobs::GetBatch(const ISnapshotSchema& data, const ISnapshotSchema& result) const { + if (!CachedBatch) { + THashMap<TBlobRange, TString> blobs; + for (auto&& i : PortionInfo.Records) { + blobs[i.BlobRange] = GetBlobByRangeVerified(i.ColumnId, i.Chunk); + Y_VERIFY(blobs[i.BlobRange].size() == i.BlobRange.Size); + } + CachedBatch = PortionInfo.AssembleInBatch(data, result, blobs); + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(*CachedBatch, result.GetIndexInfo().GetReplaceKey())); + } + return *CachedBatch; +} + +NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::RestorePortion(const TPortionInfo& portion, const THashMap<TBlobRange, TString>& blobs) { + TPortionInfoWithBlobs result(portion); + const auto pred = [](const TColumnRecord& l, const TColumnRecord& r) { + return l.GetAddress() < r.GetAddress(); + }; + std::sort(result.PortionInfo.Records.begin(), result.PortionInfo.Records.end(), pred); + + THashMap<TUnifiedBlobId, std::vector<const TColumnRecord*>> recordsByBlob; + for (auto&& c : result.PortionInfo.Records) { + auto& blobRecords = recordsByBlob[c.BlobRange.BlobId]; + blobRecords.emplace_back(&c); + } + + const auto predOffset = [](const TColumnRecord* l, const TColumnRecord* r) { + return l->BlobRange.Offset < r->BlobRange.Offset; + }; + + for (auto&& i : recordsByBlob) { + std::sort(i.second.begin(), i.second.end(), predOffset); + auto builder = result.StartBlob(); + for (auto&& d : i.second) { + auto itBlob = blobs.find(d->BlobRange); + Y_VERIFY(itBlob != blobs.end()); + builder.RestoreChunk(TSimpleOrderedColumnChunk(d->GetAddress(), d->BlobRange.Offset, itBlob->second)); + } } + return result; +} + +std::vector<NKikimr::NOlap::TPortionInfoWithBlobs> TPortionInfoWithBlobs::RestorePortions(const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs) { + std::vector<TPortionInfoWithBlobs> result; + for (auto&& i : portions) { + result.emplace_back(RestorePortion(i, blobs)); + } + return result; +} + +NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::vector<std::vector<TOrderedColumnChunk>>& chunksByBlobs, + std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot, const TIndexInfo& info) +{ + TPortionInfoWithBlobs result(TPortionInfo(granule, 0, snapshot), batch); + for (auto& blob : chunksByBlobs) { + auto blobInfo = result.StartBlob(); + for (auto&& chunk : blob) { + blobInfo.AddChunk(std::move(chunk), info); + } + } + + const auto pred = [](const TColumnRecord& l, const TColumnRecord& r) { + return l.GetAddress() < r.GetAddress(); + }; + std::sort(result.GetPortionInfo().Records.begin(), result.GetPortionInfo().Records.end(), pred); + return result; +} + +std::optional<NKikimr::NOlap::TPortionInfoWithBlobs> TPortionInfoWithBlobs::ChangeSaver(ISnapshotSchema::TPtr currentSchema, const TSaverContext& saverContext) const { + TPortionInfoWithBlobs result(PortionInfo, CachedBatch); + result.PortionInfo.Records.clear(); + std::optional<TPortionInfoWithBlobs::TBlobInfo::TBuilder> bBuilder; + ui64 offset = 0; + for (auto& rec : PortionInfo.Records) { + auto field = currentSchema->GetFieldByColumnIdVerified(rec.ColumnId); + + const TString blobOriginal = GetBlobByRangeVerified(rec.ColumnId, rec.Chunk); + { + auto rb = NArrow::TStatusValidator::GetValid(currentSchema->GetColumnLoader(rec.ColumnId)->Apply(blobOriginal)); + auto columnSaver = currentSchema->GetColumnSaver(rec.ColumnId, saverContext); + const TString newBlob = columnSaver.Apply(rb); + if (newBlob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { + return {}; + } + if (!bBuilder || result.GetBlobs().back().GetSize() + newBlob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { + bBuilder = result.StartBlob(); + offset = 0; + } + Y_VERIFY(rb); + Y_VERIFY(rb->num_columns() == 1); + + bBuilder->AddChunk(TOrderedColumnChunk(rec.GetAddress(), offset, newBlob, rb->column(0)), currentSchema->GetIndexInfo()); + offset += newBlob.size(); + } + } + const auto pred = [](const TColumnRecord& l, const TColumnRecord& r) { + return l.GetAddress() < r.GetAddress(); + }; + std::sort(result.PortionInfo.Records.begin(), result.PortionInfo.Records.end(), pred); + + return result; } } diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.h b/ydb/core/tx/columnshard/engines/portions/with_blobs.h index 80fb9cbb88d..da7f3ffa7ad 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.h @@ -10,97 +10,118 @@ class TPortionInfoWithBlobs { public: class TBlobInfo { private: + using TBlobChunks = std::map<TChunkAddress, TSimpleOrderedColumnChunk>; YDB_READONLY(ui64, Size, 0); - YDB_READONLY_DEF(std::vector<TOrderedColumnChunk>, Chunks); - const ui32 StartRecordsIndex; + YDB_READONLY_DEF(TBlobChunks, Chunks); + std::vector<const TSimpleOrderedColumnChunk*> ChunksOrdered; mutable std::optional<TString> ResultBlob; + const TColumnRecord& AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk, const TIndexInfo& info); + void RestoreChunk(const TPortionInfoWithBlobs& owner, TSimpleOrderedColumnChunk&& chunk); public: - explicit TBlobInfo(const ui32 predictedCount, TPortionInfoWithBlobs& owner) - : StartRecordsIndex(owner.GetPortionInfo().Records.size()) - { - Chunks.reserve(predictedCount); - } + class TBuilder { + private: + TBlobInfo* OwnerBlob; + TPortionInfoWithBlobs* OwnerPortion; + public: + TBuilder(TBlobInfo& blob, TPortionInfoWithBlobs& portion) + : OwnerBlob(&blob) + , OwnerPortion(&portion) + { + + } + const TColumnRecord& AddChunk(TOrderedColumnChunk&& chunk, const TIndexInfo& info) { + return OwnerBlob->AddChunk(*OwnerPortion, std::move(chunk), info); + } + void RestoreChunk(TSimpleOrderedColumnChunk&& chunk) { + OwnerBlob->RestoreChunk(*OwnerPortion, std::move(chunk)); + } + }; const TString& GetBlob() const { if (!ResultBlob) { TString result; result.reserve(Size); - for (auto&& i : Chunks) { - result.append(i.GetData()); + for (auto&& i : ChunksOrdered) { + result.append(i->GetData()); } ResultBlob = std::move(result); } return *ResultBlob; } - const TColumnRecord& AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk, const TIndexInfo& info); - void RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId); }; private: - std::map<ui32, ui32> ColumnChunkIds; TPortionInfo PortionInfo; YDB_READONLY_DEF(std::vector<TBlobInfo>, Blobs); mutable std::optional<std::shared_ptr<arrow::RecordBatch>> CachedBatch; + + explicit TPortionInfoWithBlobs(TPortionInfo&& portionInfo, std::optional<std::shared_ptr<arrow::RecordBatch>> batch = {}) + : PortionInfo(std::move(portionInfo)) + , CachedBatch(batch) { + } + + explicit TPortionInfoWithBlobs(const TPortionInfo& portionInfo, std::optional<std::shared_ptr<arrow::RecordBatch>> batch = {}) + : PortionInfo(portionInfo) + , CachedBatch(batch) { + } + + void SetPortionInfo(const TPortionInfo& portionInfo) { + PortionInfo = portionInfo; + } + + TBlobInfo::TBuilder StartBlob() { + Blobs.emplace_back(TBlobInfo()); + return TBlobInfo::TBuilder(Blobs.back(), *this); + } + public: - std::shared_ptr<arrow::RecordBatch> GetBatch(const ISnapshotSchema& data, const ISnapshotSchema& result) const { - if (!CachedBatch) { - THashMap<ui32, ui32> chunkIds; - THashMap<std::pair<ui32, ui32>, TString> blobsByColumnChunk; - for (auto&& b : Blobs) { - for (auto&& i : b.GetChunks()) { - Y_VERIFY(blobsByColumnChunk.emplace(std::make_pair(i.GetColumnId(), chunkIds[i.GetColumnId()]++), i.GetData()).second); - } - } - THashMap<TBlobRange, TString> blobs; - for (auto&& i : PortionInfo.Records) { - auto it = blobsByColumnChunk.find(std::make_pair(i.ColumnId, i.Chunk)); - Y_VERIFY(it != blobsByColumnChunk.end()); - blobs[i.BlobRange] = it->second; - } - CachedBatch = PortionInfo.AssembleInBatch(data, result, blobs); - } - return *CachedBatch; + static std::vector<TPortionInfoWithBlobs> RestorePortions(const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs); + static TPortionInfoWithBlobs RestorePortion(const TPortionInfo& portions, const THashMap<TBlobRange, TString>& blobs); + + std::shared_ptr<arrow::RecordBatch> GetBatch(const ISnapshotSchema& data, const ISnapshotSchema& result) const; + + ui64 GetSize() const { + return PortionInfo.BlobsBytes(); } + static TPortionInfoWithBlobs BuildByBlobs(std::vector<std::vector<TOrderedColumnChunk>>& chunksByBlobs, std::shared_ptr<arrow::RecordBatch> batch, + const ui64 granule, const TSnapshot& snapshot, const TIndexInfo& info); + + std::optional<TPortionInfoWithBlobs> ChangeSaver(ISnapshotSchema::TPtr currentSchema, const TSaverContext& saverContext) const; + const TString& GetBlobByRangeVerified(const ui32 columnId, const ui32 chunkId) const { - ui32 columnChunk = 0; for (auto&& b : Blobs) { - for (auto&& i : b.GetChunks()) { - if (i.GetColumnId() == columnId) { - if (columnChunk == chunkId) { - return i.GetData(); - } - ++columnChunk; - } + auto it = b.GetChunks().find(TChunkAddress(columnId, chunkId)); + if (it == b.GetChunks().end()) { + continue; + } else { + return it->second.GetData(); } } Y_VERIFY(false); } ui64 GetBlobFullSizeVerified(const ui32 columnId, const ui32 chunkId) const { - ui32 columnChunk = 0; for (auto&& b : Blobs) { - for (auto&& i : b.GetChunks()) { - if (i.GetColumnId() == columnId) { - if (columnChunk == chunkId) { - return b.GetSize(); - } - ++columnChunk; - } + auto it = b.GetChunks().find(TChunkAddress(columnId, chunkId)); + if (it == b.GetChunks().end()) { + continue; + } else { + return b.GetSize(); } } Y_VERIFY(false); } - TString DebugString() const { - return TStringBuilder() << PortionInfo.DebugString() << "blobs_count=" << Blobs.size() << ";"; - } - std::vector<TBlobInfo>& GetBlobs() { return Blobs; } + TString DebugString() const { + return TStringBuilder() << PortionInfo.DebugString() << ";blobs_count=" << Blobs.size() << ";"; + } + const TPortionInfo& GetPortionInfo() const { return PortionInfo; } @@ -109,25 +130,6 @@ public: return PortionInfo; } - void SetPortionInfo(const TPortionInfo& portionInfo) { - PortionInfo = portionInfo; - } - - explicit TPortionInfoWithBlobs(TPortionInfo&& portionInfo, const ui32 predictedBlobsCount) - : PortionInfo(portionInfo) { - Blobs.reserve(predictedBlobsCount); - } - - explicit TPortionInfoWithBlobs(const TPortionInfo& portionInfo, const ui32 predictedBlobsCount) - : PortionInfo(portionInfo) { - Blobs.reserve(predictedBlobsCount); - } - - TBlobInfo& StartBlob(const ui32 blobChunksCount) { - Blobs.emplace_back(TBlobInfo(blobChunksCount, *this)); - return Blobs.back(); - } - friend IOutputStream& operator << (IOutputStream& out, const TPortionInfoWithBlobs& info) { out << info.DebugString(); return out; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h index 2ca1d4a0f28..b109f8ee5f3 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h @@ -38,6 +38,11 @@ public: virtual int GetFieldIndex(const ui32 columnId) const = 0; std::shared_ptr<arrow::Field> GetFieldByIndex(const int index) const; std::shared_ptr<arrow::Field> GetFieldByColumnId(const ui32 columnId) const; + std::shared_ptr<arrow::Field> GetFieldByColumnIdVerified(const ui32 columnId) const { + auto result = GetFieldByColumnId(columnId); + Y_VERIFY(result); + return result; + } TString DebugString() const { return DoDebugString(); diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 20f0e21a7ec..62b034283fa 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -6,7 +6,6 @@ #include <ydb/core/tx/columnshard/common/snapshot.h> #include <ydb/core/sys_view/common/schema.h> -#include <ydb/core/tablet_flat/flat_dbase_scheme.h> #include <ydb/core/tx/columnshard/common/scalars.h> #include <ydb/core/formats/arrow/dictionary/object.h> #include <ydb/core/formats/arrow/serializer/abstract.h> diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.cpp b/ydb/core/tx/columnshard/splitter/batch_slice.cpp index bcbead4500e..2caadd2e212 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.cpp +++ b/ydb/core/tx/columnshard/splitter/batch_slice.cpp @@ -13,16 +13,16 @@ bool TBatchSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) { } } std::vector<TSplittedBlob> result; - Y_VERIFY(TSplitSettings::MaxBlobSize >= 2 * TSplitSettings::MinBlobSize); + Y_VERIFY(Settings.GetMaxBlobSize() >= 2 * Settings.GetMinBlobSize()); while (chunksInProgress.size()) { - ui64 fullSize = 0; + i64 fullSize = 0; for (auto&& i : chunksInProgress) { fullSize += i.GetSize(); } - if (fullSize < TSplitSettings::MaxBlobSize) { + if (fullSize < Settings.GetMaxBlobSize()) { result.emplace_back(TSplittedBlob()); for (auto&& i : chunksInProgress) { - Y_VERIFY(result.back().Take(i)); + result.back().Take(i); } chunksInProgress.clear(); break; @@ -30,37 +30,40 @@ bool TBatchSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) { bool hasNoSplitChanges = true; while (hasNoSplitChanges) { hasNoSplitChanges = false; - ui64 partSize = 0; + i64 partSize = 0; for (ui32 i = 0; i < chunksInProgress.size(); ++i) { - const ui64 nextPartSize = partSize + chunksInProgress[i].GetSize(); - const ui64 nextOtherSize = fullSize - nextPartSize; - const ui64 otherSize = fullSize - partSize; - if (nextPartSize >= TSplitSettings::MaxBlobSize || nextOtherSize < TSplitSettings::MinBlobSize) { - Y_VERIFY(otherSize >= TSplitSettings::MinBlobSize); - Y_VERIFY(partSize < TSplitSettings::MaxBlobSize); - if (partSize >= TSplitSettings::MinBlobSize) { + const i64 nextPartSize = partSize + chunksInProgress[i].GetSize(); + const i64 nextOtherSize = fullSize - nextPartSize; + const i64 otherSize = fullSize - partSize; + if (nextPartSize >= Settings.GetMaxBlobSize() || nextOtherSize < Settings.GetMinBlobSize()) { + Y_VERIFY(otherSize >= Settings.GetMinBlobSize()); + Y_VERIFY(partSize < Settings.GetMaxBlobSize()); + if (partSize >= Settings.GetMinBlobSize()) { result.emplace_back(TSplittedBlob()); for (ui32 chunk = 0; chunk < i; ++chunk) { - Y_VERIFY(result.back().Take(chunksInProgress[chunk])); + result.back().Take(chunksInProgress[chunk]); } + Counters->BySizeSplitter.OnCorrectSerialized(result.back().GetSize()); chunksInProgress.erase(chunksInProgress.begin(), chunksInProgress.begin() + i); hasNoSplitChanges = true; } else { - Y_VERIFY(chunksInProgress[i].GetSize() > TSplitSettings::MinBlobSize - partSize); - Y_VERIFY(otherSize - (TSplitSettings::MinBlobSize - partSize) >= TSplitSettings::MinBlobSize); - chunksInProgress[i].AddSplit(TSplitSettings::MinBlobSize - partSize); + Y_VERIFY(chunksInProgress[i].GetSize() > Settings.GetMinBlobSize() - partSize); + Y_VERIFY(otherSize - (Settings.GetMinBlobSize() - partSize) >= Settings.GetMinBlobSize()); + chunksInProgress[i].AddSplit(Settings.GetMinBlobSize() - partSize); + Counters->BySizeSplitter.OnTrashSerialized(chunksInProgress[i].GetSize()); std::vector<TSplittedColumnChunk> newChunks = chunksInProgress[i].InternalSplit(Schema->GetColumnSaver(chunksInProgress[i].GetColumnId()), Counters); chunksInProgress.erase(chunksInProgress.begin() + i); chunksInProgress.insert(chunksInProgress.begin() + i, newChunks.begin(), newChunks.end()); TSplittedBlob newBlob; for (ui32 chunk = 0; chunk <= i; ++chunk) { - Y_VERIFY(newBlob.Take(chunksInProgress[chunk])); + newBlob.Take(chunksInProgress[chunk]); } - if (newBlob.GetSize() < TSplitSettings::MaxBlobSize) { + if (newBlob.GetSize() < Settings.GetMaxBlobSize()) { chunksInProgress.erase(chunksInProgress.begin(), chunksInProgress.begin() + i + 1); result.emplace_back(std::move(newBlob)); + Counters->BySizeSplitter.OnCorrectSerialized(result.back().GetSize()); } } break; @@ -73,10 +76,11 @@ bool TBatchSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) { return true; } -TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters) +TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings) : Schema(schema) , Batch(batch) , Counters(counters) + , Settings(settings) { Y_VERIFY(batch); RecordsCount = batch->num_rows(); @@ -93,7 +97,7 @@ TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> auto stats = schema->GetColumnSerializationStats(c.GetColumnId()); TSimpleSplitter splitter(columnSaver, Counters); splitter.SetStats(stats); - c.SetBlobs(splitter.Split(i, c.GetField(), TSplitSettings::MaxBlobSize)); + c.SetBlobs(splitter.Split(i, c.GetField(), Settings.GetMaxBlobSize())); Size += c.GetSize(); ++idx; } diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h index da3c5db1f2b..2098a7e9342 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.h +++ b/ydb/core/tx/columnshard/splitter/batch_slice.h @@ -20,6 +20,42 @@ public: virtual std::optional<TColumnSerializationStat> GetBatchSerializationStats(const std::shared_ptr<arrow::RecordBatch>& rb) const = 0; }; +template <class TContainer> +class TArrayView { +private: + typename TContainer::iterator Begin; + typename TContainer::iterator End; +public: + TArrayView(typename TContainer::iterator itBegin, typename TContainer::iterator itEnd) + : Begin(itBegin) + , End(itEnd) { + + } + + typename TContainer::iterator begin() { + return Begin; + } + + typename TContainer::iterator end() { + return End; + } + + typename TContainer::value_type& front() { + return *Begin; + } + + typename TContainer::value_type& operator[](const size_t index) { + return *(Begin + index); + } + + size_t size() { + return End - Begin; + } +}; + +template <class TObject> +using TVectorView = TArrayView<std::vector<TObject>>; + class TDefaultSchemaDetails: public ISchemaDetailInfo { private: ISnapshotSchema::TPtr Schema; @@ -55,8 +91,16 @@ private: ISchemaDetailInfo::TPtr Schema; YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch); std::shared_ptr<NColumnShard::TSplitterCounters> Counters; + TSplitSettings Settings; public: - TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters); + explicit TBatchSerializedSlice(TVectorView<TBatchSerializedSlice>&& objects) { + Y_VERIFY(objects.size()); + std::swap(*this, objects.front()); + for (ui32 i = 1; i < objects.size(); ++i) { + MergeSlice(std::move(objects[i])); + } + } + TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings); void MergeSlice(TBatchSerializedSlice&& slice); diff --git a/ydb/core/tx/columnshard/splitter/chunks.cpp b/ydb/core/tx/columnshard/splitter/chunks.cpp index fb6f7f66196..5f4bae55b8a 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.cpp +++ b/ydb/core/tx/columnshard/splitter/chunks.cpp @@ -14,8 +14,12 @@ std::vector<TSplittedColumnChunk> TSplittedColumnChunk::InternalSplit(const TCol return newChunks; } +TString TSimpleOrderedColumnChunk::DebugString() const { + return TStringBuilder() << "address=" << ChunkAddress.DebugString() << ";data_size=" << Data.size() << ";"; +} + TString TOrderedColumnChunk::DebugString() const { - return TStringBuilder() << "column_id=" << ColumnId << ";data_size=" << Data.size() << ";records_count=" << Column->length() << ";data=" << NArrow::DebugJson(Column, 3, 3) << ";"; + return TStringBuilder() << TBase::DebugString() << "records_count=" << Column->length() << ";data=" << NArrow::DebugJson(Column, 3, 3) << ";"; } } diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h index 921e13229d8..126f4c9e518 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.h +++ b/ydb/core/tx/columnshard/splitter/chunks.h @@ -1,6 +1,7 @@ #pragma once #include "simple.h" #include <ydb/core/tx/columnshard/counters/splitter.h> +#include <ydb/core/tx/columnshard/engines/portions/column_record.h> #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> @@ -8,11 +9,16 @@ namespace NKikimr::NOlap { class TSplitSettings { +private: + static const inline i64 DefaultMaxBlobSize = 8 * 1024 * 1024; + static const inline i64 DefaultMinBlobSize = 4 * 1024 * 1024; + static const inline i64 DefaultMinRecordsCount = 10000; + static const inline i64 DefaultMaxPortionSize = 4 * DefaultMaxBlobSize; + YDB_ACCESSOR(i64, MaxBlobSize, DefaultMaxBlobSize); + YDB_ACCESSOR(i64, MinBlobSize, DefaultMinBlobSize); + YDB_ACCESSOR(i64, MinRecordsCount, DefaultMinRecordsCount); + YDB_ACCESSOR(i64, MaxPortionSize, DefaultMaxPortionSize); public: - static const inline i64 MaxBlobSize = 8 * 1024 * 1024; - static const inline i64 MaxBlobSizeWithGap = 7 * 1024 * 1024; - static const inline i64 MinBlobSize = 4 * 1024 * 1024; - static const inline i64 MinRecordsCount = 10000; }; class TSplittedColumn; @@ -28,7 +34,7 @@ public: } std::vector<TSplittedColumnChunk> InternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters); - ui64 GetSize() const { + i64 GetSize() const { return Data.GetSerializedChunk().size(); } @@ -87,26 +93,45 @@ public: class TSplittedBlob { private: - YDB_READONLY(ui64, Size, 0); + YDB_READONLY(i64, Size, 0); YDB_READONLY_DEF(std::vector<TSplittedColumnChunk>, Chunks); public: - bool Take(const TSplittedColumnChunk& chunk) { - if (Size + chunk.GetSize() < TSplitSettings::MaxBlobSize) { - Chunks.emplace_back(chunk); - Size += chunk.GetSize(); - return true; - } - return false; + void Take(const TSplittedColumnChunk& chunk) { + Chunks.emplace_back(chunk); + Size += chunk.GetSize(); } bool operator<(const TSplittedBlob& item) const { return Size > item.Size; } }; -class TOrderedColumnChunk { +class TSimpleOrderedColumnChunk { private: - YDB_READONLY(ui32, ColumnId, 0); + TChunkAddress ChunkAddress; YDB_READONLY_DEF(TString, Data); + YDB_READONLY(ui64, Offset, 0); +public: + ui64 GetSize() const { + return Data.size(); + } + + const TChunkAddress& GetChunkAddress() const { + return ChunkAddress; + } + + TSimpleOrderedColumnChunk(const TChunkAddress& chunkAddress, const ui64 offset, const TString& data) + : ChunkAddress(chunkAddress) + , Data(std::move(data)) + , Offset(offset) { + + } + + TString DebugString() const; +}; + +class TOrderedColumnChunk: public TSimpleOrderedColumnChunk { +private: + using TBase = TSimpleOrderedColumnChunk; std::shared_ptr<arrow::Array> Column; public: std::shared_ptr<arrow::Array> GetColumn() const { @@ -117,9 +142,8 @@ public: return Column->length(); } - TOrderedColumnChunk(const ui32 columnId, const TString& data, std::shared_ptr<arrow::Array> column) - : ColumnId(columnId) - , Data(std::move(data)) + TOrderedColumnChunk(const TChunkAddress& chunkAddress, const ui64 offset, const TString& data, std::shared_ptr<arrow::Array> column) + : TBase(chunkAddress, offset, data) , Column(column) { Y_VERIFY(Column); diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp index 7892859ff47..537d80a5ab5 100644 --- a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp @@ -13,22 +13,22 @@ public: } template <class TObject> - std::vector<ui32> Split(const std::vector<TObject>& objects) { + std::vector<TVectorView<TObject>> Split(std::vector<TObject>& objects) { ui64 fullSize = 0; for (auto&& i : objects) { fullSize += i.GetSize(); } if (fullSize <= BottomLimit) { - return {(ui32)objects.size()}; + return {TVectorView<TObject>(objects.begin(), objects.end())}; } ui64 currentSize = 0; ui64 currentStart = 0; - std::vector<ui32> result; + std::vector<TVectorView<TObject>> result; for (ui32 i = 0; i < objects.size(); ++i) { const ui64 nextSize = currentSize + objects[i].GetSize(); const ui64 nextOtherSize = fullSize - nextSize; if ((nextSize >= BottomLimit && nextOtherSize >= BottomLimit) || (i + 1 == objects.size())) { - result.emplace_back(i - currentStart + 1); + result.emplace_back(TVectorView<TObject>(objects.begin() + currentStart, objects.begin() + i + 1)); currentSize = 0; currentStart = i + 1; } else { @@ -40,33 +40,32 @@ public: }; TRBSplitLimiter::TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters, ISchemaDetailInfo::TPtr schemaInfo, - const std::shared_ptr<arrow::RecordBatch> batch) + const std::shared_ptr<arrow::RecordBatch> batch, const TSplitSettings& settings) : Counters(counters) , Batch(batch) + , Settings(settings) { Y_VERIFY(Batch->num_rows()); std::vector<TBatchSerializedSlice> slices; auto stats = schemaInfo->GetBatchSerializationStats(Batch); - ui32 recordsCount = TSplitSettings::MinRecordsCount; + ui32 recordsCount = Settings.GetMinRecordsCount(); if (stats) { - const ui32 recordsCountForMinSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), TSplitSettings::MinBlobSize).value_or(recordsCount); - recordsCount = std::max(recordsCount, recordsCountForMinSize); + const ui32 recordsCountForMinSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), Settings.GetMinBlobSize()).value_or(recordsCount); + const ui32 recordsCountForMaxPortionSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), Settings.GetMaxPortionSize()).value_or(recordsCount); + recordsCount = std::min(recordsCountForMaxPortionSize, std::max(recordsCount, recordsCountForMinSize)); } auto linearSplitInfo = TSimpleSplitter::GetOptimalLinearSplitting(Batch->num_rows(), recordsCount); for (auto it = linearSplitInfo.StartIterator(); it.IsValid(); it.Next()) { std::shared_ptr<arrow::RecordBatch> current = batch->Slice(it.GetPosition(), it.GetCurrentPackSize()); - TBatchSerializedSlice slice(current, schemaInfo, Counters); + TBatchSerializedSlice slice(current, schemaInfo, Counters, settings); slices.emplace_back(std::move(slice)); } - const std::vector<ui32> chunks = TSimilarSlicer(TSplitSettings::MinBlobSize).Split(slices); + auto chunks = TSimilarSlicer(Settings.GetMinBlobSize()).Split(slices); ui32 chunkStartPosition = 0; - for (auto&& i : chunks) { - Slices.emplace_back(std::move(slices[chunkStartPosition])); - for (ui32 pos = chunkStartPosition + 1; pos < chunkStartPosition + i; ++pos) { - Slices.back().MergeSlice(std::move(slices[pos])); - } - chunkStartPosition += i; + for (auto&& spanObjects : chunks) { + Slices.emplace_back(TBatchSerializedSlice(std::move(spanObjects))); + chunkStartPosition += spanObjects.size(); } Y_VERIFY(chunkStartPosition == slices.size()); ui32 recordsCountCheck = 0; @@ -83,6 +82,7 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<TOrderedColumnChunk>>& portio std::vector<TSplittedBlob> blobs; Slices.front().GroupBlobs(blobs); std::vector<std::vector<TOrderedColumnChunk>> result; + std::map<ui32, ui32> columnChunks; for (auto&& i : blobs) { if (blobs.size() == 1) { Counters->MonoBlobs.OnBlobData(i.GetSize()); @@ -90,8 +90,10 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<TOrderedColumnChunk>>& portio Counters->SplittedBlobs.OnBlobData(i.GetSize()); } std::vector<TOrderedColumnChunk> chunksForBlob; + ui64 offset = 0; for (auto&& c : i.GetChunks()) { - chunksForBlob.emplace_back(c.GetColumnId(), c.GetData().GetSerializedChunk(), c.GetData().GetColumn()); + chunksForBlob.emplace_back(TChunkAddress(c.GetColumnId(), columnChunks[c.GetColumnId()]++), offset, c.GetData().GetSerializedChunk(), c.GetData().GetColumn()); + offset += c.GetSize(); } result.emplace_back(std::move(chunksForBlob)); } diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.h b/ydb/core/tx/columnshard/splitter/rb_splitter.h index c1aca97cc31..8f5ae022274 100644 --- a/ydb/core/tx/columnshard/splitter/rb_splitter.h +++ b/ydb/core/tx/columnshard/splitter/rb_splitter.h @@ -15,9 +15,10 @@ private: std::deque<TBatchSerializedSlice> Slices; std::shared_ptr<NColumnShard::TSplitterCounters> Counters; std::shared_ptr<arrow::RecordBatch> Batch; + TSplitSettings Settings; public: TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters, - ISchemaDetailInfo::TPtr schemaInfo, const std::shared_ptr<arrow::RecordBatch> batch); + ISchemaDetailInfo::TPtr schemaInfo, const std::shared_ptr<arrow::RecordBatch> batch, const TSplitSettings& settings); bool Next(std::vector<std::vector<TOrderedColumnChunk>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch); }; diff --git a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp index cf2956872a7..29bbfe307dd 100644 --- a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp @@ -61,10 +61,10 @@ Y_UNIT_TEST_SUITE(Splitter) { void Execute(std::shared_ptr<arrow::RecordBatch> batch) { NKikimr::NColumnShard::TIndexationCounters counters("test"); - NKikimr::NOlap::TRBSplitLimiter limiter(counters.SplitterCounters, Schema, batch); + NKikimr::NOlap::TRBSplitLimiter limiter(counters.SplitterCounters, Schema, batch, NKikimr::NOlap::TSplitSettings()); std::vector<std::vector<NKikimr::NOlap::TOrderedColumnChunk>> chunksForBlob; std::map<std::string, std::vector<std::shared_ptr<arrow::RecordBatch>>> restoredBatch; - std::vector<ui64> blobsSize; + std::vector<i64> blobsSize; bool hasMultiSplit = false; ui32 blobsCount = 0; ui32 slicesCount = 0; @@ -81,10 +81,11 @@ Y_UNIT_TEST_SUITE(Splitter) { std::set<ui32> blobColumnChunks; for (auto&& i : chunks) { ++chunksCount; - recordsCountByColumn[i.GetColumnId()] += i.GetRecordsCount(); - restoredBatch[Schema->GetColumnName(i.GetColumnId())].emplace_back(*Schema->GetColumnLoader(i.GetColumnId()).Apply(i.GetData())); + const ui32 columnId = i.GetChunkAddress().GetColumnId(); + recordsCountByColumn[columnId] += i.GetRecordsCount(); + restoredBatch[Schema->GetColumnName(columnId)].emplace_back(*Schema->GetColumnLoader(columnId).Apply(i.GetData())); blobSize += i.GetData().size(); - if (i.GetRecordsCount() != NKikimr::NOlap::TSplitSettings::MinRecordsCount && !blobColumnChunks.emplace(i.GetColumnId()).second) { + if (i.GetRecordsCount() != NKikimr::NOlap::TSplitSettings().GetMinRecordsCount() && !blobColumnChunks.emplace(columnId).second) { hasMultiSplit = true; } sb << "(" << i.DebugString() << ")"; @@ -110,8 +111,8 @@ Y_UNIT_TEST_SUITE(Splitter) { } Y_VERIFY(hasMultiSplit == HasMultiSplit); for (auto&& i : blobsSize) { - Y_VERIFY(i < NKikimr::NOlap::TSplitSettings::MaxBlobSize); - Y_VERIFY(i + 10000 >= NKikimr::NOlap::TSplitSettings::MinBlobSize || blobsSize.size() == 1); + Y_VERIFY(i < NKikimr::NOlap::TSplitSettings().GetMaxBlobSize()); + Y_VERIFY(i + 10000 >= NKikimr::NOlap::TSplitSettings().GetMinBlobSize() || blobsSize.size() == 1); } Y_VERIFY(restoredBatch.size() == (ui32)batch->num_columns()); for (auto&& i : batch->schema()->fields()) { |