diff options
author | ivanmorozov333 <ivanmorozov@hotmail.com> | 2024-04-13 21:50:28 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-13 21:50:28 +0300 |
commit | b70197fd28f146cc962605de5585ececcf9cc7bf (patch) | |
tree | c0b25475af3d0913c931015e41fd5f88fb2bec9b | |
parent | 0fd0d2a08bdae35d02f9bd71ef6a077841401394 (diff) | |
download | ydb-b70197fd28f146cc962605de5585ececcf9cc7bf.tar.gz |
split portion-with-blobs class on read and write parts (#3714)
20 files changed, 255 insertions, 180 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h index 5e38a8601f1..339face80d3 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h @@ -11,7 +11,7 @@ #include <ydb/core/tx/columnshard/data_locks/manager/manager.h> #include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> -#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h> +#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h> #include <ydb/core/tx/columnshard/resource_subscriber/task.h> #include <ydb/core/tx/columnshard/splitter/settings.h> @@ -35,7 +35,6 @@ class TColumnShard; namespace NKikimr::NOlap { class TColumnEngineForLogs; class TVersionedIndex; -class TPortionInfoWithBlobs; class TPortionEvictionFeatures { private: @@ -270,7 +269,7 @@ public: void Start(NColumnShard::TColumnShard& self); virtual ui32 GetWritePortionsCount() const = 0; - virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) = 0; + virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) = 0; virtual bool NeedWritePortion(const ui32 index) const = 0; void WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context); diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h index f4addb74f04..71a48e78be6 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h +++ b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h @@ -42,7 +42,7 @@ public: virtual ui32 GetWritePortionsCount() const override { return 0; } - virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override { + virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override { return nullptr; } virtual bool NeedWritePortion(const ui32 /*index*/) const override { diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h b/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h index 5f5b7a9bd00..f39d33f5871 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h +++ b/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h @@ -40,7 +40,7 @@ public: virtual ui32 GetWritePortionsCount() const override { return 0; } - virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override { + virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override { return nullptr; } virtual bool NeedWritePortion(const ui32 /*index*/) const override { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h index 7503d421323..493cd6268f6 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h @@ -3,7 +3,6 @@ #include <ydb/core/tx/columnshard/splitter/chunks.h> #include <ydb/core/tx/columnshard/engines/portions/column_record.h> #include <ydb/core/tx/columnshard/engines/scheme/column_features.h> -#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> namespace NKikimr::NOlap::NCompaction { diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 14dad1e76bb..706a0924c5c 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -9,8 +9,9 @@ #include <ydb/core/formats/arrow/simple_builder/array.h> #include <ydb/core/formats/arrow/simple_builder/filler.h> +#include <ydb/core/tx/columnshard/engines/portions/read_with_blobs.h> +#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h> #include <ydb/core/tx/columnshard/columnshard_impl.h> -#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h> #include <ydb/core/tx/columnshard/engines/storage/chunks/null_column.h> #include <ydb/core/tx/columnshard/splitter/batch_slice.h> #include <ydb/core/tx/columnshard/splitter/settings.h> @@ -18,7 +19,7 @@ namespace NKikimr::NOlap::NCompaction { -void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches(TConstructionContext& context, std::vector<TPortionInfoWithBlobs>&& portions) noexcept { +void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches(TConstructionContext& context, std::vector<TReadPortionInfoWithBlobs>&& portions) noexcept { std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults; auto resultSchema = context.SchemaVersions.GetLastSchema(); { @@ -43,7 +44,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches(TCon } } -void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector<TPortionInfoWithBlobs>&& portions) noexcept { +void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector<TReadPortionInfoWithBlobs>&& portions) noexcept { static const TString portionIdFieldName = "$$__portion_id"; static const TString portionRecordIndexFieldName = "$$__portion_record_idx"; static const std::shared_ptr<arrow::Field> portionIdField = std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>()); @@ -194,7 +195,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc for (auto&& i : packs) { TGeneralSerializedSlice slice(std::move(i)); auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); - AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), nullptr, GranuleMeta->GetPathId(), + AppendedPortions.emplace_back(TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), GranuleMeta->GetPathId(), resultSchema->GetVersion(), resultSchema->GetSnapshot(), SaverContext.GetStoragesManager())); AppendedPortions.back().FillStatistics(resultSchema->GetIndexInfo()); NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); @@ -226,7 +227,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc NChanges::TGeneralCompactionCounters::OnRepackPortions(portionsCount, portionsSize); { - std::vector<TPortionInfoWithBlobs> portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs, context.SchemaVersions, SaverContext.GetStoragesManager()); + std::vector<TReadPortionInfoWithBlobs> portions = TReadPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs, context.SchemaVersions); if (!HasAppData() || AppDataVerified().ColumnShardConfig.GetUseChunkedMergeOnCompaction()) { BuildAppendedPortionsByChunks(context, std::move(portions)); } else { diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h index df583585d9c..69ce5ed1e8e 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h @@ -1,6 +1,7 @@ #pragma once #include "compaction.h" #include <ydb/core/formats/arrow/reader/position.h> +#include <ydb/core/tx/columnshard/engines/portions/read_with_blobs.h> namespace NKikimr::NOlap::NCompaction { @@ -9,8 +10,8 @@ private: using TBase = TCompactColumnEngineChanges; virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; std::map<NArrow::NMerger::TSortableBatchPosition, bool> CheckPoints; - void BuildAppendedPortionsByFullBatches(TConstructionContext& context, std::vector<TPortionInfoWithBlobs>&& portions) noexcept; - void BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector<TPortionInfoWithBlobs>&& portions) noexcept; + void BuildAppendedPortionsByFullBatches(TConstructionContext& context, std::vector<TReadPortionInfoWithBlobs>&& portions) noexcept; + void BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector<TReadPortionInfoWithBlobs>&& portions) noexcept; protected: virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override; virtual TPortionMeta::EProduced GetResultProducedClass() const override { diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index 1caea149059..c2107a3515d 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -1,4 +1,5 @@ #include "ttl.h" +#include <ydb/core/tx/columnshard/engines/portions/read_with_blobs.h> #include <ydb/core/tx/columnshard/columnshard_impl.h> #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> #include <ydb/core/tx/columnshard/columnshard_schema.h> @@ -45,7 +46,7 @@ void TTTLColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChan } } -std::optional<TPortionInfoWithBlobs> TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs, +std::optional<TWritePortionInfoWithBlobs> TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs, TConstructionContext& context) const { const TPortionInfo& portionInfo = info.GetPortionInfo(); @@ -53,11 +54,9 @@ std::optional<TPortionInfoWithBlobs> TTTLColumnEngineChanges::UpdateEvictedPorti auto blobSchema = portionInfo.GetSchema(context.SchemaVersions); Y_ABORT_UNLESS(portionInfo.GetMeta().GetTierName() != evictFeatures.GetTargetTierName() || blobSchema->GetVersion() < evictFeatures.GetTargetScheme()->GetVersion()); - auto portionWithBlobs = TPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs, blobSchema->GetIndexInfo(), SaverContext.GetStoragesManager()); - TPortionInfoWithBlobs result = TPortionInfoWithBlobs::SyncPortion( + auto portionWithBlobs = TReadPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs, blobSchema->GetIndexInfo()); + std::optional<TWritePortionInfoWithBlobs> result = TReadPortionInfoWithBlobs::SyncPortion( std::move(portionWithBlobs), blobSchema, evictFeatures.GetTargetScheme(), evictFeatures.GetTargetTierName(), SaverContext.GetStoragesManager(), context.Counters.SplitterCounters); - - result.GetPortionInfo().MutableMeta().SetTierName(evictFeatures.GetTargetTierName()); return std::move(result); } diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h index c9cb1a989d9..27d76b750dc 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.h +++ b/ydb/core/tx/columnshard/engines/changes/ttl.h @@ -40,7 +40,7 @@ private: } }; - std::optional<TPortionInfoWithBlobs> UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs, + std::optional<TWritePortionInfoWithBlobs> UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs, TConstructionContext& context) const; std::vector<TPortionForEviction> PortionsToEvict; diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 11f99a70266..ba99045e723 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -16,7 +16,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true)); portionInfo.SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), true); } - const auto predRemoveDroppedTable = [self](const TPortionInfoWithBlobs& item) { + const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobs& item) { auto& portionInfo = item.GetPortionInfo(); if (!!self && (!self->TablesManager.HasTable(portionInfo.GetPathId()) || self->TablesManager.GetTable(portionInfo.GetPathId()).IsDropped())) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_inserted_data")("reason", "table_removed")("path_id", portionInfo.GetPathId()); @@ -98,7 +98,7 @@ void TChangesWithAppend::DoCompile(TFinalizationContext& context) { } } -std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch, +std::vector<TWritePortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch, const ui64 pathId, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context, const std::optional<NArrow::NSerialization::TSerializerContainer>& overrideSaver) const { Y_ABORT_UNLESS(batch->num_rows()); @@ -112,7 +112,7 @@ std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(cons if (overrideSaver) { schema->SetOverrideSerializer(*overrideSaver); } - std::vector<TPortionInfoWithBlobs> out; + std::vector<TWritePortionInfoWithBlobs> out; { std::vector<TBatchSerializedSlice> pages = TBatchSerializedSlice::BuildSimpleSlices(batch, NSplitter::TSplitSettings(), context.Counters.SplitterCounters, schema); std::vector<TGeneralSerializedSlice> generalPages; @@ -130,7 +130,7 @@ std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(cons for (auto&& i : packs) { TGeneralSerializedSlice slice(std::move(i)); auto b = batch->Slice(recordIdx, slice.GetRecordsCount()); - out.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), nullptr, pathId, resultSchema->GetVersion(), snapshot, SaverContext.GetStoragesManager())); + out.emplace_back(TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), pathId, resultSchema->GetVersion(), snapshot, SaverContext.GetStoragesManager())); out.back().FillStatistics(resultSchema->GetIndexInfo()); NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index ebac536d011..1a99a11135f 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -16,7 +16,7 @@ protected: virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override; virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; virtual void DoStart(NColumnShard::TColumnShard& self) override; - std::vector<TPortionInfoWithBlobs> MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, + std::vector<TWritePortionInfoWithBlobs> MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context, const std::optional<NArrow::NSerialization::TSerializerContainer>& overrideSaver) const; virtual void DoDebugString(TStringOutput& out) const override { @@ -45,11 +45,11 @@ public: } THashMap<TPortionAddress, TPortionInfo> PortionsToRemove; - std::vector<TPortionInfoWithBlobs> AppendedPortions; + std::vector<TWritePortionInfoWithBlobs> AppendedPortions; virtual ui32 GetWritePortionsCount() const override { return AppendedPortions.size(); } - virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) override { + virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) override { Y_ABORT_UNLESS(index < AppendedPortions.size()); return &AppendedPortions[index]; } diff --git a/ydb/core/tx/columnshard/engines/portions/base_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/base_with_blobs.cpp new file mode 100644 index 00000000000..ffb62297253 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/base_with_blobs.cpp @@ -0,0 +1,5 @@ +#include "base_with_blobs.h" + +namespace NKikimr::NOlap { + +} diff --git a/ydb/core/tx/columnshard/engines/portions/base_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/base_with_blobs.h new file mode 100644 index 00000000000..8d290f96896 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/base_with_blobs.h @@ -0,0 +1,9 @@ +#pragma once + +namespace NKikimr::NOlap { + +class TBasePortionInfoWithBlobs { +public: +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp index 7a29d2779c7..09b98597378 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp @@ -1,4 +1,5 @@ -#include "with_blobs.h" +#include "read_with_blobs.h" +#include "write_with_blobs.h" #include <ydb/core/tx/columnshard/engines/scheme/index_info.h> #include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h> #include <ydb/core/tx/columnshard/engines/column_engine.h> @@ -8,57 +9,19 @@ namespace NKikimr::NOlap { -void TPortionInfoWithBlobs::TBlobInfo::RestoreChunk(const TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk) { - Y_ABORT_UNLESS(!ResultBlob); - const TString& data = chunk->GetData(); - Size += data.size(); +void TReadPortionInfoWithBlobs::RestoreChunk(const std::shared_ptr<IPortionDataChunk>& chunk) { auto address = chunk->GetChunkAddressVerified(); - AFL_VERIFY(owner.GetPortionInfo().HasEntityAddress(address))("address", address.DebugString()); + AFL_VERIFY(GetPortionInfo().HasEntityAddress(address))("address", address.DebugString()); AFL_VERIFY(Chunks.emplace(address, chunk).second)("address", address.DebugString()); - ChunksOrdered.emplace_back(chunk); } -void TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk) { - AFL_VERIFY(chunk); - Y_ABORT_UNLESS(!ResultBlob); - const TString& data = chunk->GetData(); - - TBlobRangeLink16 bRange(Size, data.size()); - Size += data.size(); - - Y_ABORT_UNLESS(Chunks.emplace(chunk->GetChunkAddressVerified(), chunk).second); - ChunksOrdered.emplace_back(chunk); - - chunk->AddIntoPortionBeforeBlob(bRange, owner.PortionInfo); -} - -void TPortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) { - const TBlobRangeLink16::TLinkId idx = owner.PortionInfo.RegisterBlobId(blobId); - for (auto&& i : Chunks) { - owner.PortionInfo.RegisterBlobIdx(i.first, idx); - } -} - -void TPortionInfoWithBlobs::TBlobInfo::ExtractEntityChunks(const ui32 entityId, std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>& resultMap) { - const auto pred = [this, &resultMap, entityId](const std::shared_ptr<IPortionDataChunk>& chunk) { - if (chunk->GetEntityId() == entityId) { - resultMap.emplace(chunk->GetChunkAddressVerified(), chunk); - Chunks.erase(chunk->GetChunkAddressVerified()); - return true; - } else { - return false; - } - }; - ChunksOrdered.erase(std::remove_if(ChunksOrdered.begin(), ChunksOrdered.end(), pred), ChunksOrdered.end()); -} - -std::shared_ptr<arrow::RecordBatch> TPortionInfoWithBlobs::GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames) const { +std::shared_ptr<arrow::RecordBatch> TReadPortionInfoWithBlobs::GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames) const { Y_ABORT_UNLESS(data); if (columnNames.empty()) { if (!CachedBatch) { THashMap<TChunkAddress, TString> blobs; for (auto&& i : PortionInfo.Records) { - blobs[i.GetAddress()] = GetBlobByRangeVerified(i.ColumnId, i.Chunk); + blobs[i.GetAddress()] = GetBlobByAddressVerified(i.ColumnId, i.Chunk); Y_ABORT_UNLESS(blobs[i.GetAddress()].size() == i.BlobRange.Size); } CachedBatch = PortionInfo.AssembleInBatch(*data, result, blobs); @@ -77,67 +40,41 @@ std::shared_ptr<arrow::RecordBatch> TPortionInfoWithBlobs::GetBatch(const ISnaps auto filteredSchema = std::make_shared<TFilteredSnapshotSchema>(data, columnNames); THashMap<TChunkAddress, TString> blobs; for (auto&& i : PortionInfo.Records) { - blobs[i.GetAddress()] = GetBlobByRangeVerified(i.ColumnId, i.Chunk); + blobs[i.GetAddress()] = GetBlobByAddressVerified(i.ColumnId, i.Chunk); Y_ABORT_UNLESS(blobs[i.GetAddress()].size() == i.BlobRange.Size); } return PortionInfo.AssembleInBatch(*data, *filteredSchema, blobs); } } -NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::RestorePortion(const TPortionInfo& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo, const std::shared_ptr<IStoragesManager>& operators) { - TPortionInfoWithBlobs result(portion); +NKikimr::NOlap::TReadPortionInfoWithBlobs TReadPortionInfoWithBlobs::RestorePortion(const TPortionInfo& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo) { + TReadPortionInfoWithBlobs result(portion); THashMap<TString, THashMap<TUnifiedBlobId, std::vector<std::shared_ptr<IPortionDataChunk>>>> records = result.PortionInfo.RestoreEntityChunks(blobs, indexInfo); for (auto&& [storageId, recordsByBlob] : records) { - auto storage = operators->GetOperatorVerified(storageId); for (auto&& i : recordsByBlob) { - auto builder = result.StartBlob(storage); for (auto&& d : i.second) { - builder.RestoreChunk(d); + result.RestoreChunk(d); } } } return result; } -std::vector<NKikimr::NOlap::TPortionInfoWithBlobs> TPortionInfoWithBlobs::RestorePortions(const std::vector<TPortionInfo>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, - const TVersionedIndex& tables, const std::shared_ptr<IStoragesManager>& operators) { - std::vector<TPortionInfoWithBlobs> result; +std::vector<NKikimr::NOlap::TReadPortionInfoWithBlobs> TReadPortionInfoWithBlobs::RestorePortions(const std::vector<TPortionInfo>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, + const TVersionedIndex& tables) { + std::vector<TReadPortionInfoWithBlobs> result; for (auto&& i : portions) { const auto schema = i.GetSchema(tables); - result.emplace_back(RestorePortion(i, blobs, schema->GetIndexInfo(), operators)); + result.emplace_back(RestorePortion(i, blobs, schema->GetIndexInfo())); } return result; } -NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, - std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators) -{ - TPortionInfoWithBlobs result = BuildByBlobs(std::move(chunks), TPortionInfo(granule, 0, schemaVersion, snapshot), operators); - result.InitBatchCached(batch); - return result; -} - -TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion, - const std::shared_ptr<IStoragesManager>& operators) { - TPortionInfoWithBlobs result(basePortion.CopyBeforeChunksRebuild()); - for (auto&& blob : chunks) { - auto storage = operators->GetOperatorVerified(blob.GetGroupName()); - auto blobInfo = result.StartBlob(storage); - for (auto&& chunk : blob.GetChunks()) { - blobInfo.AddChunk(chunk); - } - } - result.GetPortionInfo().ReorderChunks(); - return result; -} - -std::vector<std::shared_ptr<IPortionDataChunk>> TPortionInfoWithBlobs::GetEntityChunks(const ui32 entityId) const { +std::vector<std::shared_ptr<IPortionDataChunk>> TReadPortionInfoWithBlobs::GetEntityChunks(const ui32 entityId) const { std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>> sortedChunks; - for (auto&& b : GetBlobs()) { - for (auto&& i : b.GetChunks()) { - if (i.second->GetEntityId() == entityId) { - sortedChunks.emplace(i.first, i.second); - } + for (auto&& i : Chunks) { + if (i.second->GetEntityId() == entityId) { + sortedChunks.emplace(i.first, i.second); } } std::vector<std::shared_ptr<IPortionDataChunk>> result; @@ -148,18 +85,23 @@ std::vector<std::shared_ptr<IPortionDataChunk>> TPortionInfoWithBlobs::GetEntity return result; } -bool TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks) { - records = GetPortionInfo().GetColumnChunksPointers(columnId); +bool TReadPortionInfoWithBlobs::ExtractColumnChunks(const ui32 entityId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks) { + records = GetPortionInfo().GetColumnChunksPointers(entityId); if (records.empty()) { return false; } std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>> chunksMap; - for (auto&& i : Blobs) { - i.ExtractEntityChunks(columnId, chunksMap); + for (auto it = Chunks.begin(); it != Chunks.end();) { + if (it->first.GetEntityId() == entityId) { + chunksMap.emplace(it->first, std::move(it->second)); + it = Chunks.erase(it); + } else { + ++it; + } } std::vector<std::shared_ptr<IPortionDataChunk>> chunksLocal; for (auto&& i : chunksMap) { - Y_ABORT_UNLESS(i.first.GetColumnId() == columnId); + Y_ABORT_UNLESS(i.first.GetColumnId() == entityId); Y_ABORT_UNLESS(i.first.GetChunk() == chunksLocal.size()); chunksLocal.emplace_back(i.second); } @@ -167,23 +109,12 @@ bool TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector return true; } -void TPortionInfoWithBlobs::FillStatistics(const TIndexInfo& index) { - NStatistics::TPortionStorage storage; - for (auto&& i : index.GetStatisticsByName()) { - THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> data; - for (auto&& entityId : i.second->GetEntityIds()) { - data.emplace(entityId, GetEntityChunks(entityId)); - } - i.second->FillStatisticsData(data, storage, index); - } - PortionInfo.SetStatisticsStorage(std::move(storage)); -} - -TPortionInfoWithBlobs TPortionInfoWithBlobs::SyncPortion(TPortionInfoWithBlobs&& source, +std::optional<TWritePortionInfoWithBlobs> TReadPortionInfoWithBlobs::SyncPortion(TReadPortionInfoWithBlobs&& source, const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr<IStoragesManager>& storages, std::shared_ptr<NColumnShard::TSplitterCounters> counters) { if (from->GetVersion() == to->GetVersion() && targetTier == source.GetPortionInfo().GetTierNameDef(IStoragesManager::DefaultStorageId)) { - return std::move(source); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "we don't need sync portion"); + return {}; } NYDBTest::TControllers::GetColumnShardController()->OnPortionActualization(source.PortionInfo); auto pages = source.PortionInfo.BuildPages(); @@ -192,10 +123,10 @@ TPortionInfoWithBlobs TPortionInfoWithBlobs::SyncPortion(TPortionInfoWithBlobs&& pageSizes.emplace_back(p.GetRecordsCount()); } THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> columnChunks; - for (auto&& i : source.Blobs) { - for (auto&& c : i.GetChunks()) { - columnChunks[c.first.GetColumnId()].emplace_back(c.second); - } + for (auto&& c : source.Chunks) { + auto& chunks = columnChunks[c.first.GetColumnId()]; + AFL_VERIFY(c.first.GetChunkIdx() == chunks.size()); + chunks.emplace_back(c.second); } THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> entityChunksNew; @@ -220,7 +151,7 @@ TPortionInfoWithBlobs TPortionInfoWithBlobs::SyncPortion(TPortionInfoWithBlobs&& auto schemaTo = std::make_shared<TDefaultSchemaDetails>(to, std::make_shared<TSerializationStats>()); TGeneralSerializedSlice slice(entityChunksNew, schemaTo, counters); const NSplitter::TEntityGroups groups = to->GetIndexInfo().GetEntityGroupsByStorageId(targetTier, *storages); - TPortionInfoWithBlobs result = TPortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), source.PortionInfo, storages); + TWritePortionInfoWithBlobs result = TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), source.PortionInfo, storages); result.GetPortionInfo().SetMinSnapshotDeprecated(to->GetSnapshot()); result.GetPortionInfo().SetSchemaVersion(to->GetVersion()); result.GetPortionInfo().MutableMeta().SetTierName(targetTier); @@ -234,8 +165,14 @@ TPortionInfoWithBlobs TPortionInfoWithBlobs::SyncPortion(TPortionInfoWithBlobs&& i.second->FillStatisticsData(entityChunksNew, storage, to->GetIndexInfo()); } } - result.PortionInfo.MutableMeta().ResetStatisticsStorage(std::move(storage)); + result.MutablePortionInfo().MutableMeta().ResetStatisticsStorage(std::move(storage)); return result; } +const TString& TReadPortionInfoWithBlobs::GetBlobByAddressVerified(const ui32 columnId, const ui32 chunkId) const { + auto it = Chunks.find(TChunkAddress(columnId, chunkId)); + AFL_VERIFY(it != Chunks.end())("column_id", columnId)("chunk_idx", chunkId); + return it->second->GetData(); +} + } diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h new file mode 100644 index 00000000000..2c8553f9884 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h @@ -0,0 +1,69 @@ +#pragma once +#include "base_with_blobs.h" +#include "portion_info.h" +#include <ydb/core/tx/columnshard/blob.h> +#include <ydb/core/tx/columnshard/splitter/blob_info.h> +#include <ydb/core/tx/columnshard/splitter/chunks.h> +#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.h> +#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.h> + +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NOlap { + +class TVersionedIndex; +class TWritePortionInfoWithBlobs; + +class TReadPortionInfoWithBlobs: public TBasePortionInfoWithBlobs { +private: + using TBlobChunks = std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>; + YDB_READONLY_DEF(TBlobChunks, Chunks); + void RestoreChunk(const std::shared_ptr<IPortionDataChunk>& chunk); + + TPortionInfo PortionInfo; + mutable std::optional<std::shared_ptr<arrow::RecordBatch>> CachedBatch; + + explicit TReadPortionInfoWithBlobs(TPortionInfo&& portionInfo) + : PortionInfo(std::move(portionInfo)) { + } + + explicit TReadPortionInfoWithBlobs(const TPortionInfo& portionInfo) + : PortionInfo(portionInfo) { + } + + const TString& GetBlobByAddressVerified(const ui32 columnId, const ui32 chunkId) const; + +public: + static std::vector<TReadPortionInfoWithBlobs> RestorePortions(const std::vector<TPortionInfo>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, + const TVersionedIndex& tables); + static TReadPortionInfoWithBlobs RestorePortion(const TPortionInfo& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs, + const TIndexInfo& indexInfo); + + std::shared_ptr<arrow::RecordBatch> GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames = {}) const; + static std::optional<TWritePortionInfoWithBlobs> SyncPortion(TReadPortionInfoWithBlobs&& source, + const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr<IStoragesManager>& storages, + std::shared_ptr<NColumnShard::TSplitterCounters> counters); + + std::vector<std::shared_ptr<IPortionDataChunk>> GetEntityChunks(const ui32 entityId) const; + + bool ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks); + + TString DebugString() const { + return TStringBuilder() << PortionInfo.DebugString() << ";"; + } + + const TPortionInfo& GetPortionInfo() const { + return PortionInfo; + } + + TPortionInfo& GetPortionInfo() { + return PortionInfo; + } + + friend IOutputStream& operator << (IOutputStream& out, const TReadPortionInfoWithBlobs& info) { + out << info.DebugString(); + return out; + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp new file mode 100644 index 00000000000..99929745437 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp @@ -0,0 +1,81 @@ +#include "write_with_blobs.h" +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h> +#include <ydb/core/tx/columnshard/engines/column_engine.h> +#include <ydb/core/tx/columnshard/blobs_reader/task.h> +#include <ydb/core/tx/columnshard/splitter/batch_slice.h> +#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> + +namespace NKikimr::NOlap { + +void TWritePortionInfoWithBlobs::TBlobInfo::AddChunk(TWritePortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk) { + AFL_VERIFY(chunk); + Y_ABORT_UNLESS(!ResultBlob); + const TString& data = chunk->GetData(); + + TBlobRangeLink16 bRange(Size, data.size()); + Size += data.size(); + + Y_ABORT_UNLESS(Chunks.emplace(chunk->GetChunkAddressVerified(), chunk).second); + ChunksOrdered.emplace_back(chunk); + + chunk->AddIntoPortionBeforeBlob(bRange, owner.PortionInfo); +} + +void TWritePortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TWritePortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) { + const TBlobRangeLink16::TLinkId idx = owner.PortionInfo.RegisterBlobId(blobId); + for (auto&& i : Chunks) { + owner.PortionInfo.RegisterBlobIdx(i.first, idx); + } +} + +TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, + const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators) +{ + return BuildByBlobs(std::move(chunks), TPortionInfo(granule, 0, schemaVersion, snapshot), operators); +} + +TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion, + const std::shared_ptr<IStoragesManager>& operators) { + TWritePortionInfoWithBlobs result(basePortion.CopyBeforeChunksRebuild()); + for (auto&& blob : chunks) { + auto storage = operators->GetOperatorVerified(blob.GetGroupName()); + auto blobInfo = result.StartBlob(storage); + for (auto&& chunk : blob.GetChunks()) { + blobInfo.AddChunk(chunk); + } + } + result.GetPortionInfo().ReorderChunks(); + return result; +} + +std::vector<std::shared_ptr<IPortionDataChunk>> TWritePortionInfoWithBlobs::GetEntityChunks(const ui32 entityId) const { + std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>> sortedChunks; + for (auto&& b : GetBlobs()) { + for (auto&& i : b.GetChunks()) { + if (i.second->GetEntityId() == entityId) { + sortedChunks.emplace(i.first, i.second); + } + } + } + std::vector<std::shared_ptr<IPortionDataChunk>> result; + for (auto&& i : sortedChunks) { + AFL_VERIFY(i.second->GetChunkIdxVerified() == result.size())("idx", i.second->GetChunkIdxVerified())("size", result.size()); + result.emplace_back(i.second); + } + return result; +} + +void TWritePortionInfoWithBlobs::FillStatistics(const TIndexInfo& index) { + NStatistics::TPortionStorage storage; + for (auto&& i : index.GetStatisticsByName()) { + THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> data; + for (auto&& entityId : i.second->GetEntityIds()) { + data.emplace(entityId, GetEntityChunks(entityId)); + } + i.second->FillStatisticsData(data, storage, index); + } + PortionInfo.SetStatisticsStorage(std::move(storage)); +} + +} diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.h b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h index 83e2dc68fda..c84d97bd06c 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h @@ -1,4 +1,5 @@ #pragma once +#include "base_with_blobs.h" #include "portion_info.h" #include <ydb/core/tx/columnshard/blob.h> #include <ydb/core/tx/columnshard/splitter/blob_info.h> @@ -10,9 +11,7 @@ namespace NKikimr::NOlap { -class TVersionedIndex; - -class TPortionInfoWithBlobs { +class TWritePortionInfoWithBlobs: public TBasePortionInfoWithBlobs { public: class TBlobInfo { private: @@ -22,8 +21,7 @@ public: YDB_READONLY_DEF(std::shared_ptr<IBlobsStorageOperator>, Operator); std::vector<std::shared_ptr<IPortionDataChunk>> ChunksOrdered; mutable std::optional<TString> ResultBlob; - void AddChunk(TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk); - void RestoreChunk(const TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk); + void AddChunk(TWritePortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk); public: TBlobInfo(const std::shared_ptr<IBlobsStorageOperator>& bOperator) @@ -35,9 +33,9 @@ public: class TBuilder { private: TBlobInfo* OwnerBlob; - TPortionInfoWithBlobs* OwnerPortion; + TWritePortionInfoWithBlobs* OwnerPortion; public: - TBuilder(TBlobInfo& blob, TPortionInfoWithBlobs& portion) + TBuilder(TBlobInfo& blob, TWritePortionInfoWithBlobs& portion) : OwnerBlob(&blob) , OwnerPortion(&portion) { } @@ -48,9 +46,6 @@ public: void AddChunk(const std::shared_ptr<IPortionDataChunk>& chunk) { return OwnerBlob->AddChunk(*OwnerPortion, chunk); } - void RestoreChunk(const std::shared_ptr<IPortionDataChunk>& chunk) { - OwnerBlob->RestoreChunk(*OwnerPortion, chunk); - } }; const TString& GetBlob() const { @@ -65,26 +60,18 @@ public: return *ResultBlob; } - void RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId); - void ExtractEntityChunks(const ui32 entityId, std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>& resultMap); + void RegisterBlobId(TWritePortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId); }; private: TPortionInfo PortionInfo; YDB_READONLY_DEF(std::vector<TBlobInfo>, Blobs); - mutable std::optional<std::shared_ptr<arrow::RecordBatch>> CachedBatch; - explicit TPortionInfoWithBlobs(TPortionInfo&& portionInfo, std::optional<std::shared_ptr<arrow::RecordBatch>> batch = {}) - : PortionInfo(std::move(portionInfo)) - , CachedBatch(batch) { + explicit TWritePortionInfoWithBlobs(TPortionInfo&& portionInfo) + : PortionInfo(std::move(portionInfo)) { } - explicit TPortionInfoWithBlobs(const TPortionInfo& portionInfo, std::optional<std::shared_ptr<arrow::RecordBatch>> batch = {}) - : PortionInfo(portionInfo) - , CachedBatch(batch) { - } - - void SetPortionInfo(const TPortionInfo& portionInfo) { - PortionInfo = portionInfo; + explicit TWritePortionInfoWithBlobs(const TPortionInfo& portionInfo) + : PortionInfo(portionInfo) { } TBlobInfo::TBuilder StartBlob(const std::shared_ptr<IBlobsStorageOperator>& bOperator) { @@ -93,33 +80,18 @@ private: } public: - void InitBatchCached(const std::shared_ptr<arrow::RecordBatch>& batch) { - if (!batch) { - return; - } - CachedBatch = batch; + TPortionInfo& MutablePortionInfo() { + return PortionInfo; } - static std::vector<TPortionInfoWithBlobs> RestorePortions(const std::vector<TPortionInfo>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, - const TVersionedIndex& tables, const std::shared_ptr<IStoragesManager>& operators); - static TPortionInfoWithBlobs RestorePortion(const TPortionInfo& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs, - const TIndexInfo& indexInfo, const std::shared_ptr<IStoragesManager>& operators); - - std::shared_ptr<arrow::RecordBatch> GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames = {}) const; - static TPortionInfoWithBlobs SyncPortion(TPortionInfoWithBlobs&& source, - const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr<IStoragesManager>& storages, - std::shared_ptr<NColumnShard::TSplitterCounters> counters); - std::vector<std::shared_ptr<IPortionDataChunk>> GetEntityChunks(const ui32 entityId) const; - bool ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks); - void FillStatistics(const TIndexInfo& index); - static TPortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks, - std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators); + static TWritePortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks, + const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators); - static TPortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion, + static TWritePortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion, const std::shared_ptr<IStoragesManager>& operators); const TString& GetBlobByRangeVerified(const ui32 columnId, const ui32 chunkId) const { @@ -162,7 +134,7 @@ public: return PortionInfo; } - friend IOutputStream& operator << (IOutputStream& out, const TPortionInfoWithBlobs& info) { + friend IOutputStream& operator << (IOutputStream& out, const TWritePortionInfoWithBlobs& info) { out << info.DebugString(); return out; } diff --git a/ydb/core/tx/columnshard/engines/portions/ya.make b/ydb/core/tx/columnshard/engines/portions/ya.make index 96254fce429..7143d9fc5ea 100644 --- a/ydb/core/tx/columnshard/engines/portions/ya.make +++ b/ydb/core/tx/columnshard/engines/portions/ya.make @@ -3,7 +3,9 @@ LIBRARY() SRCS( portion_info.cpp column_record.cpp - with_blobs.cpp + base_with_blobs.cpp + read_with_blobs.cpp + write_with_blobs.cpp meta.cpp common.cpp index_chunk.cpp diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 6cae83e17ca..641b8e604b1 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -8,6 +8,7 @@ #include <ydb/core/tx/columnshard/columnshard_ut_common.h> #include <ydb/core/tx/columnshard/engines/changes/compaction.h> +#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h> #include <ydb/core/tx/columnshard/blobs_action/bs/storage.h> #include <ydb/core/tx/columnshard/hooks/testing/controller.h> #include <ydb/core/tx/columnshard/data_sharing/manager/shared_blobs.h> @@ -264,7 +265,7 @@ TString MakeTestBlob(i64 start = 0, i64 end = 100) { return NArrow::SerializeBatchNoCompression(batch); } -void AddIdsToBlobs(std::vector<TPortionInfoWithBlobs>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, ui32& step) { +void AddIdsToBlobs(std::vector<TWritePortionInfoWithBlobs>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, ui32& step) { for (auto& portion : portions) { for (auto& rec : portion.GetPortionInfo().Records) { rec.BlobRange.BlobIdx = portion.GetPortionInfo().RegisterBlobId(MakeUnifiedBlobId(++step, portion.GetBlobFullSizeVerified(rec.ColumnId, rec.Chunk))); diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp index 2c4a9e50f73..c2d0a1488b1 100644 --- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp @@ -3,6 +3,7 @@ #include <ydb/core/tx/columnshard/defs.h> #include <ydb/core/tx/columnshard/blob.h> #include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h> +#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h> #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> namespace NKikimr::NOlap { @@ -18,7 +19,7 @@ TCompactedWriteController::TCompactedWriteController(const TActorId& dstActor, T } auto* pInfo = changes.GetWritePortionInfo(i); Y_ABORT_UNLESS(pInfo); - TPortionInfoWithBlobs& portionWithBlobs = *pInfo; + TWritePortionInfoWithBlobs& portionWithBlobs = *pInfo; for (auto&& b : portionWithBlobs.GetBlobs()) { auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.GetBlob(), changes.MutableBlobsAction().GetWriting(b.GetOperator()->GetStorageId()))); b.RegisterBlobId(portionWithBlobs, task.GetBlobId()); diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h index 3f409598227..d18d0c688ab 100644 --- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h @@ -5,7 +5,6 @@ #include <ydb/core/tx/columnshard/columnshard.h> #include <ydb/core/tx/columnshard/columnshard_private_events.h> -#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h> #include <ydb/core/tx/columnshard/blobs_action/abstract/action.h> namespace NKikimr::NOlap { |