aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@hotmail.com>2024-04-13 21:50:28 +0300
committerGitHub <noreply@github.com>2024-04-13 21:50:28 +0300
commitb70197fd28f146cc962605de5585ececcf9cc7bf (patch)
treec0b25475af3d0913c931015e41fd5f88fb2bec9b
parent0fd0d2a08bdae35d02f9bd71ef6a077841401394 (diff)
downloadydb-b70197fd28f146cc962605de5585ececcf9cc7bf.tar.gz
split portion-with-blobs class on read and write parts (#3714)
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract/abstract.h5
-rw-r--r--ydb/core/tx/columnshard/engines/changes/cleanup_portions.h2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/cleanup_tables.h2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.h5
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.h2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.h6
-rw-r--r--ydb/core/tx/columnshard/engines/portions/base_with_blobs.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/portions/base_with_blobs.h9
-rw-r--r--ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp (renamed from ydb/core/tx/columnshard/engines/portions/with_blobs.cpp)149
-rw-r--r--ydb/core/tx/columnshard/engines/portions/read_with_blobs.h69
-rw-r--r--ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp81
-rw-r--r--ydb/core/tx/columnshard/engines/portions/write_with_blobs.h (renamed from ydb/core/tx/columnshard/engines/portions/with_blobs.h)60
-rw-r--r--ydb/core/tx/columnshard/engines/portions/ya.make4
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h1
20 files changed, 255 insertions, 180 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
index 5e38a8601f1..339face80d3 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
@@ -11,7 +11,7 @@
#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
-#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h>
+#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
#include <ydb/core/tx/columnshard/splitter/settings.h>
@@ -35,7 +35,6 @@ class TColumnShard;
namespace NKikimr::NOlap {
class TColumnEngineForLogs;
class TVersionedIndex;
-class TPortionInfoWithBlobs;
class TPortionEvictionFeatures {
private:
@@ -270,7 +269,7 @@ public:
void Start(NColumnShard::TColumnShard& self);
virtual ui32 GetWritePortionsCount() const = 0;
- virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) = 0;
+ virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) = 0;
virtual bool NeedWritePortion(const ui32 index) const = 0;
void WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context);
diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h
index f4addb74f04..71a48e78be6 100644
--- a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h
+++ b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h
@@ -42,7 +42,7 @@ public:
virtual ui32 GetWritePortionsCount() const override {
return 0;
}
- virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override {
+ virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override {
return nullptr;
}
virtual bool NeedWritePortion(const ui32 /*index*/) const override {
diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h b/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h
index 5f5b7a9bd00..f39d33f5871 100644
--- a/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h
+++ b/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h
@@ -40,7 +40,7 @@ public:
virtual ui32 GetWritePortionsCount() const override {
return 0;
}
- virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override {
+ virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override {
return nullptr;
}
virtual bool NeedWritePortion(const ui32 /*index*/) const override {
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
index 7503d421323..493cd6268f6 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
@@ -3,7 +3,6 @@
#include <ydb/core/tx/columnshard/splitter/chunks.h>
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
-#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
namespace NKikimr::NOlap::NCompaction {
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index 14dad1e76bb..706a0924c5c 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -9,8 +9,9 @@
#include <ydb/core/formats/arrow/simple_builder/array.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
+#include <ydb/core/tx/columnshard/engines/portions/read_with_blobs.h>
+#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
-#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h>
#include <ydb/core/tx/columnshard/engines/storage/chunks/null_column.h>
#include <ydb/core/tx/columnshard/splitter/batch_slice.h>
#include <ydb/core/tx/columnshard/splitter/settings.h>
@@ -18,7 +19,7 @@
namespace NKikimr::NOlap::NCompaction {
-void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches(TConstructionContext& context, std::vector<TPortionInfoWithBlobs>&& portions) noexcept {
+void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches(TConstructionContext& context, std::vector<TReadPortionInfoWithBlobs>&& portions) noexcept {
std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults;
auto resultSchema = context.SchemaVersions.GetLastSchema();
{
@@ -43,7 +44,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches(TCon
}
}
-void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector<TPortionInfoWithBlobs>&& portions) noexcept {
+void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector<TReadPortionInfoWithBlobs>&& portions) noexcept {
static const TString portionIdFieldName = "$$__portion_id";
static const TString portionRecordIndexFieldName = "$$__portion_record_idx";
static const std::shared_ptr<arrow::Field> portionIdField = std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>());
@@ -194,7 +195,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc
for (auto&& i : packs) {
TGeneralSerializedSlice slice(std::move(i));
auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
- AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), nullptr, GranuleMeta->GetPathId(),
+ AppendedPortions.emplace_back(TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), GranuleMeta->GetPathId(),
resultSchema->GetVersion(), resultSchema->GetSnapshot(), SaverContext.GetStoragesManager()));
AppendedPortions.back().FillStatistics(resultSchema->GetIndexInfo());
NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
@@ -226,7 +227,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
NChanges::TGeneralCompactionCounters::OnRepackPortions(portionsCount, portionsSize);
{
- std::vector<TPortionInfoWithBlobs> portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs, context.SchemaVersions, SaverContext.GetStoragesManager());
+ std::vector<TReadPortionInfoWithBlobs> portions = TReadPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs, context.SchemaVersions);
if (!HasAppData() || AppDataVerified().ColumnShardConfig.GetUseChunkedMergeOnCompaction()) {
BuildAppendedPortionsByChunks(context, std::move(portions));
} else {
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
index df583585d9c..69ce5ed1e8e 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
@@ -1,6 +1,7 @@
#pragma once
#include "compaction.h"
#include <ydb/core/formats/arrow/reader/position.h>
+#include <ydb/core/tx/columnshard/engines/portions/read_with_blobs.h>
namespace NKikimr::NOlap::NCompaction {
@@ -9,8 +10,8 @@ private:
using TBase = TCompactColumnEngineChanges;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
std::map<NArrow::NMerger::TSortableBatchPosition, bool> CheckPoints;
- void BuildAppendedPortionsByFullBatches(TConstructionContext& context, std::vector<TPortionInfoWithBlobs>&& portions) noexcept;
- void BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector<TPortionInfoWithBlobs>&& portions) noexcept;
+ void BuildAppendedPortionsByFullBatches(TConstructionContext& context, std::vector<TReadPortionInfoWithBlobs>&& portions) noexcept;
+ void BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector<TReadPortionInfoWithBlobs>&& portions) noexcept;
protected:
virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override;
virtual TPortionMeta::EProduced GetResultProducedClass() const override {
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
index 1caea149059..c2107a3515d 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
@@ -1,4 +1,5 @@
#include "ttl.h"
+#include <ydb/core/tx/columnshard/engines/portions/read_with_blobs.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
@@ -45,7 +46,7 @@ void TTTLColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChan
}
}
-std::optional<TPortionInfoWithBlobs> TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs,
+std::optional<TWritePortionInfoWithBlobs> TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs,
TConstructionContext& context) const
{
const TPortionInfo& portionInfo = info.GetPortionInfo();
@@ -53,11 +54,9 @@ std::optional<TPortionInfoWithBlobs> TTTLColumnEngineChanges::UpdateEvictedPorti
auto blobSchema = portionInfo.GetSchema(context.SchemaVersions);
Y_ABORT_UNLESS(portionInfo.GetMeta().GetTierName() != evictFeatures.GetTargetTierName() || blobSchema->GetVersion() < evictFeatures.GetTargetScheme()->GetVersion());
- auto portionWithBlobs = TPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs, blobSchema->GetIndexInfo(), SaverContext.GetStoragesManager());
- TPortionInfoWithBlobs result = TPortionInfoWithBlobs::SyncPortion(
+ auto portionWithBlobs = TReadPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs, blobSchema->GetIndexInfo());
+ std::optional<TWritePortionInfoWithBlobs> result = TReadPortionInfoWithBlobs::SyncPortion(
std::move(portionWithBlobs), blobSchema, evictFeatures.GetTargetScheme(), evictFeatures.GetTargetTierName(), SaverContext.GetStoragesManager(), context.Counters.SplitterCounters);
-
- result.GetPortionInfo().MutableMeta().SetTierName(evictFeatures.GetTargetTierName());
return std::move(result);
}
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h
index c9cb1a989d9..27d76b750dc 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.h
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.h
@@ -40,7 +40,7 @@ private:
}
};
- std::optional<TPortionInfoWithBlobs> UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs,
+ std::optional<TWritePortionInfoWithBlobs> UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs,
TConstructionContext& context) const;
std::vector<TPortionForEviction> PortionsToEvict;
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index 11f99a70266..ba99045e723 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -16,7 +16,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
portionInfo.SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), true);
}
- const auto predRemoveDroppedTable = [self](const TPortionInfoWithBlobs& item) {
+ const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobs& item) {
auto& portionInfo = item.GetPortionInfo();
if (!!self && (!self->TablesManager.HasTable(portionInfo.GetPathId()) || self->TablesManager.GetTable(portionInfo.GetPathId()).IsDropped())) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_inserted_data")("reason", "table_removed")("path_id", portionInfo.GetPathId());
@@ -98,7 +98,7 @@ void TChangesWithAppend::DoCompile(TFinalizationContext& context) {
}
}
-std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch,
+std::vector<TWritePortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch,
const ui64 pathId, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context, const std::optional<NArrow::NSerialization::TSerializerContainer>& overrideSaver) const {
Y_ABORT_UNLESS(batch->num_rows());
@@ -112,7 +112,7 @@ std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(cons
if (overrideSaver) {
schema->SetOverrideSerializer(*overrideSaver);
}
- std::vector<TPortionInfoWithBlobs> out;
+ std::vector<TWritePortionInfoWithBlobs> out;
{
std::vector<TBatchSerializedSlice> pages = TBatchSerializedSlice::BuildSimpleSlices(batch, NSplitter::TSplitSettings(), context.Counters.SplitterCounters, schema);
std::vector<TGeneralSerializedSlice> generalPages;
@@ -130,7 +130,7 @@ std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(cons
for (auto&& i : packs) {
TGeneralSerializedSlice slice(std::move(i));
auto b = batch->Slice(recordIdx, slice.GetRecordsCount());
- out.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), nullptr, pathId, resultSchema->GetVersion(), snapshot, SaverContext.GetStoragesManager()));
+ out.emplace_back(TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), pathId, resultSchema->GetVersion(), snapshot, SaverContext.GetStoragesManager()));
out.back().FillStatistics(resultSchema->GetIndexInfo());
NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h
index ebac536d011..1a99a11135f 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.h
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h
@@ -16,7 +16,7 @@ protected:
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
virtual void DoStart(NColumnShard::TColumnShard& self) override;
- std::vector<TPortionInfoWithBlobs> MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule,
+ std::vector<TWritePortionInfoWithBlobs> MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule,
const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context, const std::optional<NArrow::NSerialization::TSerializerContainer>& overrideSaver) const;
virtual void DoDebugString(TStringOutput& out) const override {
@@ -45,11 +45,11 @@ public:
}
THashMap<TPortionAddress, TPortionInfo> PortionsToRemove;
- std::vector<TPortionInfoWithBlobs> AppendedPortions;
+ std::vector<TWritePortionInfoWithBlobs> AppendedPortions;
virtual ui32 GetWritePortionsCount() const override {
return AppendedPortions.size();
}
- virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) override {
+ virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) override {
Y_ABORT_UNLESS(index < AppendedPortions.size());
return &AppendedPortions[index];
}
diff --git a/ydb/core/tx/columnshard/engines/portions/base_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/base_with_blobs.cpp
new file mode 100644
index 00000000000..ffb62297253
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/base_with_blobs.cpp
@@ -0,0 +1,5 @@
+#include "base_with_blobs.h"
+
+namespace NKikimr::NOlap {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/portions/base_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/base_with_blobs.h
new file mode 100644
index 00000000000..8d290f96896
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/base_with_blobs.h
@@ -0,0 +1,9 @@
+#pragma once
+
+namespace NKikimr::NOlap {
+
+class TBasePortionInfoWithBlobs {
+public:
+};
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp
index 7a29d2779c7..09b98597378 100644
--- a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp
@@ -1,4 +1,5 @@
-#include "with_blobs.h"
+#include "read_with_blobs.h"
+#include "write_with_blobs.h"
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
@@ -8,57 +9,19 @@
namespace NKikimr::NOlap {
-void TPortionInfoWithBlobs::TBlobInfo::RestoreChunk(const TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk) {
- Y_ABORT_UNLESS(!ResultBlob);
- const TString& data = chunk->GetData();
- Size += data.size();
+void TReadPortionInfoWithBlobs::RestoreChunk(const std::shared_ptr<IPortionDataChunk>& chunk) {
auto address = chunk->GetChunkAddressVerified();
- AFL_VERIFY(owner.GetPortionInfo().HasEntityAddress(address))("address", address.DebugString());
+ AFL_VERIFY(GetPortionInfo().HasEntityAddress(address))("address", address.DebugString());
AFL_VERIFY(Chunks.emplace(address, chunk).second)("address", address.DebugString());
- ChunksOrdered.emplace_back(chunk);
}
-void TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk) {
- AFL_VERIFY(chunk);
- Y_ABORT_UNLESS(!ResultBlob);
- const TString& data = chunk->GetData();
-
- TBlobRangeLink16 bRange(Size, data.size());
- Size += data.size();
-
- Y_ABORT_UNLESS(Chunks.emplace(chunk->GetChunkAddressVerified(), chunk).second);
- ChunksOrdered.emplace_back(chunk);
-
- chunk->AddIntoPortionBeforeBlob(bRange, owner.PortionInfo);
-}
-
-void TPortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) {
- const TBlobRangeLink16::TLinkId idx = owner.PortionInfo.RegisterBlobId(blobId);
- for (auto&& i : Chunks) {
- owner.PortionInfo.RegisterBlobIdx(i.first, idx);
- }
-}
-
-void TPortionInfoWithBlobs::TBlobInfo::ExtractEntityChunks(const ui32 entityId, std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>& resultMap) {
- const auto pred = [this, &resultMap, entityId](const std::shared_ptr<IPortionDataChunk>& chunk) {
- if (chunk->GetEntityId() == entityId) {
- resultMap.emplace(chunk->GetChunkAddressVerified(), chunk);
- Chunks.erase(chunk->GetChunkAddressVerified());
- return true;
- } else {
- return false;
- }
- };
- ChunksOrdered.erase(std::remove_if(ChunksOrdered.begin(), ChunksOrdered.end(), pred), ChunksOrdered.end());
-}
-
-std::shared_ptr<arrow::RecordBatch> TPortionInfoWithBlobs::GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames) const {
+std::shared_ptr<arrow::RecordBatch> TReadPortionInfoWithBlobs::GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames) const {
Y_ABORT_UNLESS(data);
if (columnNames.empty()) {
if (!CachedBatch) {
THashMap<TChunkAddress, TString> blobs;
for (auto&& i : PortionInfo.Records) {
- blobs[i.GetAddress()] = GetBlobByRangeVerified(i.ColumnId, i.Chunk);
+ blobs[i.GetAddress()] = GetBlobByAddressVerified(i.ColumnId, i.Chunk);
Y_ABORT_UNLESS(blobs[i.GetAddress()].size() == i.BlobRange.Size);
}
CachedBatch = PortionInfo.AssembleInBatch(*data, result, blobs);
@@ -77,67 +40,41 @@ std::shared_ptr<arrow::RecordBatch> TPortionInfoWithBlobs::GetBatch(const ISnaps
auto filteredSchema = std::make_shared<TFilteredSnapshotSchema>(data, columnNames);
THashMap<TChunkAddress, TString> blobs;
for (auto&& i : PortionInfo.Records) {
- blobs[i.GetAddress()] = GetBlobByRangeVerified(i.ColumnId, i.Chunk);
+ blobs[i.GetAddress()] = GetBlobByAddressVerified(i.ColumnId, i.Chunk);
Y_ABORT_UNLESS(blobs[i.GetAddress()].size() == i.BlobRange.Size);
}
return PortionInfo.AssembleInBatch(*data, *filteredSchema, blobs);
}
}
-NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::RestorePortion(const TPortionInfo& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo, const std::shared_ptr<IStoragesManager>& operators) {
- TPortionInfoWithBlobs result(portion);
+NKikimr::NOlap::TReadPortionInfoWithBlobs TReadPortionInfoWithBlobs::RestorePortion(const TPortionInfo& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo) {
+ TReadPortionInfoWithBlobs result(portion);
THashMap<TString, THashMap<TUnifiedBlobId, std::vector<std::shared_ptr<IPortionDataChunk>>>> records = result.PortionInfo.RestoreEntityChunks(blobs, indexInfo);
for (auto&& [storageId, recordsByBlob] : records) {
- auto storage = operators->GetOperatorVerified(storageId);
for (auto&& i : recordsByBlob) {
- auto builder = result.StartBlob(storage);
for (auto&& d : i.second) {
- builder.RestoreChunk(d);
+ result.RestoreChunk(d);
}
}
}
return result;
}
-std::vector<NKikimr::NOlap::TPortionInfoWithBlobs> TPortionInfoWithBlobs::RestorePortions(const std::vector<TPortionInfo>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs,
- const TVersionedIndex& tables, const std::shared_ptr<IStoragesManager>& operators) {
- std::vector<TPortionInfoWithBlobs> result;
+std::vector<NKikimr::NOlap::TReadPortionInfoWithBlobs> TReadPortionInfoWithBlobs::RestorePortions(const std::vector<TPortionInfo>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs,
+ const TVersionedIndex& tables) {
+ std::vector<TReadPortionInfoWithBlobs> result;
for (auto&& i : portions) {
const auto schema = i.GetSchema(tables);
- result.emplace_back(RestorePortion(i, blobs, schema->GetIndexInfo(), operators));
+ result.emplace_back(RestorePortion(i, blobs, schema->GetIndexInfo()));
}
return result;
}
-NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks,
- std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators)
-{
- TPortionInfoWithBlobs result = BuildByBlobs(std::move(chunks), TPortionInfo(granule, 0, schemaVersion, snapshot), operators);
- result.InitBatchCached(batch);
- return result;
-}
-
-TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion,
- const std::shared_ptr<IStoragesManager>& operators) {
- TPortionInfoWithBlobs result(basePortion.CopyBeforeChunksRebuild());
- for (auto&& blob : chunks) {
- auto storage = operators->GetOperatorVerified(blob.GetGroupName());
- auto blobInfo = result.StartBlob(storage);
- for (auto&& chunk : blob.GetChunks()) {
- blobInfo.AddChunk(chunk);
- }
- }
- result.GetPortionInfo().ReorderChunks();
- return result;
-}
-
-std::vector<std::shared_ptr<IPortionDataChunk>> TPortionInfoWithBlobs::GetEntityChunks(const ui32 entityId) const {
+std::vector<std::shared_ptr<IPortionDataChunk>> TReadPortionInfoWithBlobs::GetEntityChunks(const ui32 entityId) const {
std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>> sortedChunks;
- for (auto&& b : GetBlobs()) {
- for (auto&& i : b.GetChunks()) {
- if (i.second->GetEntityId() == entityId) {
- sortedChunks.emplace(i.first, i.second);
- }
+ for (auto&& i : Chunks) {
+ if (i.second->GetEntityId() == entityId) {
+ sortedChunks.emplace(i.first, i.second);
}
}
std::vector<std::shared_ptr<IPortionDataChunk>> result;
@@ -148,18 +85,23 @@ std::vector<std::shared_ptr<IPortionDataChunk>> TPortionInfoWithBlobs::GetEntity
return result;
}
-bool TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks) {
- records = GetPortionInfo().GetColumnChunksPointers(columnId);
+bool TReadPortionInfoWithBlobs::ExtractColumnChunks(const ui32 entityId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks) {
+ records = GetPortionInfo().GetColumnChunksPointers(entityId);
if (records.empty()) {
return false;
}
std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>> chunksMap;
- for (auto&& i : Blobs) {
- i.ExtractEntityChunks(columnId, chunksMap);
+ for (auto it = Chunks.begin(); it != Chunks.end();) {
+ if (it->first.GetEntityId() == entityId) {
+ chunksMap.emplace(it->first, std::move(it->second));
+ it = Chunks.erase(it);
+ } else {
+ ++it;
+ }
}
std::vector<std::shared_ptr<IPortionDataChunk>> chunksLocal;
for (auto&& i : chunksMap) {
- Y_ABORT_UNLESS(i.first.GetColumnId() == columnId);
+ Y_ABORT_UNLESS(i.first.GetColumnId() == entityId);
Y_ABORT_UNLESS(i.first.GetChunk() == chunksLocal.size());
chunksLocal.emplace_back(i.second);
}
@@ -167,23 +109,12 @@ bool TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector
return true;
}
-void TPortionInfoWithBlobs::FillStatistics(const TIndexInfo& index) {
- NStatistics::TPortionStorage storage;
- for (auto&& i : index.GetStatisticsByName()) {
- THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> data;
- for (auto&& entityId : i.second->GetEntityIds()) {
- data.emplace(entityId, GetEntityChunks(entityId));
- }
- i.second->FillStatisticsData(data, storage, index);
- }
- PortionInfo.SetStatisticsStorage(std::move(storage));
-}
-
-TPortionInfoWithBlobs TPortionInfoWithBlobs::SyncPortion(TPortionInfoWithBlobs&& source,
+std::optional<TWritePortionInfoWithBlobs> TReadPortionInfoWithBlobs::SyncPortion(TReadPortionInfoWithBlobs&& source,
const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr<IStoragesManager>& storages,
std::shared_ptr<NColumnShard::TSplitterCounters> counters) {
if (from->GetVersion() == to->GetVersion() && targetTier == source.GetPortionInfo().GetTierNameDef(IStoragesManager::DefaultStorageId)) {
- return std::move(source);
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "we don't need sync portion");
+ return {};
}
NYDBTest::TControllers::GetColumnShardController()->OnPortionActualization(source.PortionInfo);
auto pages = source.PortionInfo.BuildPages();
@@ -192,10 +123,10 @@ TPortionInfoWithBlobs TPortionInfoWithBlobs::SyncPortion(TPortionInfoWithBlobs&&
pageSizes.emplace_back(p.GetRecordsCount());
}
THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> columnChunks;
- for (auto&& i : source.Blobs) {
- for (auto&& c : i.GetChunks()) {
- columnChunks[c.first.GetColumnId()].emplace_back(c.second);
- }
+ for (auto&& c : source.Chunks) {
+ auto& chunks = columnChunks[c.first.GetColumnId()];
+ AFL_VERIFY(c.first.GetChunkIdx() == chunks.size());
+ chunks.emplace_back(c.second);
}
THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> entityChunksNew;
@@ -220,7 +151,7 @@ TPortionInfoWithBlobs TPortionInfoWithBlobs::SyncPortion(TPortionInfoWithBlobs&&
auto schemaTo = std::make_shared<TDefaultSchemaDetails>(to, std::make_shared<TSerializationStats>());
TGeneralSerializedSlice slice(entityChunksNew, schemaTo, counters);
const NSplitter::TEntityGroups groups = to->GetIndexInfo().GetEntityGroupsByStorageId(targetTier, *storages);
- TPortionInfoWithBlobs result = TPortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), source.PortionInfo, storages);
+ TWritePortionInfoWithBlobs result = TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), source.PortionInfo, storages);
result.GetPortionInfo().SetMinSnapshotDeprecated(to->GetSnapshot());
result.GetPortionInfo().SetSchemaVersion(to->GetVersion());
result.GetPortionInfo().MutableMeta().SetTierName(targetTier);
@@ -234,8 +165,14 @@ TPortionInfoWithBlobs TPortionInfoWithBlobs::SyncPortion(TPortionInfoWithBlobs&&
i.second->FillStatisticsData(entityChunksNew, storage, to->GetIndexInfo());
}
}
- result.PortionInfo.MutableMeta().ResetStatisticsStorage(std::move(storage));
+ result.MutablePortionInfo().MutableMeta().ResetStatisticsStorage(std::move(storage));
return result;
}
+const TString& TReadPortionInfoWithBlobs::GetBlobByAddressVerified(const ui32 columnId, const ui32 chunkId) const {
+ auto it = Chunks.find(TChunkAddress(columnId, chunkId));
+ AFL_VERIFY(it != Chunks.end())("column_id", columnId)("chunk_idx", chunkId);
+ return it->second->GetData();
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h
new file mode 100644
index 00000000000..2c8553f9884
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h
@@ -0,0 +1,69 @@
+#pragma once
+#include "base_with_blobs.h"
+#include "portion_info.h"
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/splitter/blob_info.h>
+#include <ydb/core/tx/columnshard/splitter/chunks.h>
+#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.h>
+#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.h>
+
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NOlap {
+
+class TVersionedIndex;
+class TWritePortionInfoWithBlobs;
+
+class TReadPortionInfoWithBlobs: public TBasePortionInfoWithBlobs {
+private:
+ using TBlobChunks = std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>;
+ YDB_READONLY_DEF(TBlobChunks, Chunks);
+ void RestoreChunk(const std::shared_ptr<IPortionDataChunk>& chunk);
+
+ TPortionInfo PortionInfo;
+ mutable std::optional<std::shared_ptr<arrow::RecordBatch>> CachedBatch;
+
+ explicit TReadPortionInfoWithBlobs(TPortionInfo&& portionInfo)
+ : PortionInfo(std::move(portionInfo)) {
+ }
+
+ explicit TReadPortionInfoWithBlobs(const TPortionInfo& portionInfo)
+ : PortionInfo(portionInfo) {
+ }
+
+ const TString& GetBlobByAddressVerified(const ui32 columnId, const ui32 chunkId) const;
+
+public:
+ static std::vector<TReadPortionInfoWithBlobs> RestorePortions(const std::vector<TPortionInfo>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs,
+ const TVersionedIndex& tables);
+ static TReadPortionInfoWithBlobs RestorePortion(const TPortionInfo& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs,
+ const TIndexInfo& indexInfo);
+
+ std::shared_ptr<arrow::RecordBatch> GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames = {}) const;
+ static std::optional<TWritePortionInfoWithBlobs> SyncPortion(TReadPortionInfoWithBlobs&& source,
+ const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr<IStoragesManager>& storages,
+ std::shared_ptr<NColumnShard::TSplitterCounters> counters);
+
+ std::vector<std::shared_ptr<IPortionDataChunk>> GetEntityChunks(const ui32 entityId) const;
+
+ bool ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks);
+
+ TString DebugString() const {
+ return TStringBuilder() << PortionInfo.DebugString() << ";";
+ }
+
+ const TPortionInfo& GetPortionInfo() const {
+ return PortionInfo;
+ }
+
+ TPortionInfo& GetPortionInfo() {
+ return PortionInfo;
+ }
+
+ friend IOutputStream& operator << (IOutputStream& out, const TReadPortionInfoWithBlobs& info) {
+ out << info.DebugString();
+ return out;
+ }
+};
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp
new file mode 100644
index 00000000000..99929745437
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp
@@ -0,0 +1,81 @@
+#include "write_with_blobs.h"
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
+#include <ydb/core/tx/columnshard/engines/column_engine.h>
+#include <ydb/core/tx/columnshard/blobs_reader/task.h>
+#include <ydb/core/tx/columnshard/splitter/batch_slice.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+
+namespace NKikimr::NOlap {
+
+void TWritePortionInfoWithBlobs::TBlobInfo::AddChunk(TWritePortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk) {
+ AFL_VERIFY(chunk);
+ Y_ABORT_UNLESS(!ResultBlob);
+ const TString& data = chunk->GetData();
+
+ TBlobRangeLink16 bRange(Size, data.size());
+ Size += data.size();
+
+ Y_ABORT_UNLESS(Chunks.emplace(chunk->GetChunkAddressVerified(), chunk).second);
+ ChunksOrdered.emplace_back(chunk);
+
+ chunk->AddIntoPortionBeforeBlob(bRange, owner.PortionInfo);
+}
+
+void TWritePortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TWritePortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) {
+ const TBlobRangeLink16::TLinkId idx = owner.PortionInfo.RegisterBlobId(blobId);
+ for (auto&& i : Chunks) {
+ owner.PortionInfo.RegisterBlobIdx(i.first, idx);
+ }
+}
+
+TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks,
+ const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators)
+{
+ return BuildByBlobs(std::move(chunks), TPortionInfo(granule, 0, schemaVersion, snapshot), operators);
+}
+
+TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion,
+ const std::shared_ptr<IStoragesManager>& operators) {
+ TWritePortionInfoWithBlobs result(basePortion.CopyBeforeChunksRebuild());
+ for (auto&& blob : chunks) {
+ auto storage = operators->GetOperatorVerified(blob.GetGroupName());
+ auto blobInfo = result.StartBlob(storage);
+ for (auto&& chunk : blob.GetChunks()) {
+ blobInfo.AddChunk(chunk);
+ }
+ }
+ result.GetPortionInfo().ReorderChunks();
+ return result;
+}
+
+std::vector<std::shared_ptr<IPortionDataChunk>> TWritePortionInfoWithBlobs::GetEntityChunks(const ui32 entityId) const {
+ std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>> sortedChunks;
+ for (auto&& b : GetBlobs()) {
+ for (auto&& i : b.GetChunks()) {
+ if (i.second->GetEntityId() == entityId) {
+ sortedChunks.emplace(i.first, i.second);
+ }
+ }
+ }
+ std::vector<std::shared_ptr<IPortionDataChunk>> result;
+ for (auto&& i : sortedChunks) {
+ AFL_VERIFY(i.second->GetChunkIdxVerified() == result.size())("idx", i.second->GetChunkIdxVerified())("size", result.size());
+ result.emplace_back(i.second);
+ }
+ return result;
+}
+
+void TWritePortionInfoWithBlobs::FillStatistics(const TIndexInfo& index) {
+ NStatistics::TPortionStorage storage;
+ for (auto&& i : index.GetStatisticsByName()) {
+ THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> data;
+ for (auto&& entityId : i.second->GetEntityIds()) {
+ data.emplace(entityId, GetEntityChunks(entityId));
+ }
+ i.second->FillStatisticsData(data, storage, index);
+ }
+ PortionInfo.SetStatisticsStorage(std::move(storage));
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.h b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h
index 83e2dc68fda..c84d97bd06c 100644
--- a/ydb/core/tx/columnshard/engines/portions/with_blobs.h
+++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h
@@ -1,4 +1,5 @@
#pragma once
+#include "base_with_blobs.h"
#include "portion_info.h"
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/splitter/blob_info.h>
@@ -10,9 +11,7 @@
namespace NKikimr::NOlap {
-class TVersionedIndex;
-
-class TPortionInfoWithBlobs {
+class TWritePortionInfoWithBlobs: public TBasePortionInfoWithBlobs {
public:
class TBlobInfo {
private:
@@ -22,8 +21,7 @@ public:
YDB_READONLY_DEF(std::shared_ptr<IBlobsStorageOperator>, Operator);
std::vector<std::shared_ptr<IPortionDataChunk>> ChunksOrdered;
mutable std::optional<TString> ResultBlob;
- void AddChunk(TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk);
- void RestoreChunk(const TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk);
+ void AddChunk(TWritePortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk);
public:
TBlobInfo(const std::shared_ptr<IBlobsStorageOperator>& bOperator)
@@ -35,9 +33,9 @@ public:
class TBuilder {
private:
TBlobInfo* OwnerBlob;
- TPortionInfoWithBlobs* OwnerPortion;
+ TWritePortionInfoWithBlobs* OwnerPortion;
public:
- TBuilder(TBlobInfo& blob, TPortionInfoWithBlobs& portion)
+ TBuilder(TBlobInfo& blob, TWritePortionInfoWithBlobs& portion)
: OwnerBlob(&blob)
, OwnerPortion(&portion) {
}
@@ -48,9 +46,6 @@ public:
void AddChunk(const std::shared_ptr<IPortionDataChunk>& chunk) {
return OwnerBlob->AddChunk(*OwnerPortion, chunk);
}
- void RestoreChunk(const std::shared_ptr<IPortionDataChunk>& chunk) {
- OwnerBlob->RestoreChunk(*OwnerPortion, chunk);
- }
};
const TString& GetBlob() const {
@@ -65,26 +60,18 @@ public:
return *ResultBlob;
}
- void RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId);
- void ExtractEntityChunks(const ui32 entityId, std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>& resultMap);
+ void RegisterBlobId(TWritePortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId);
};
private:
TPortionInfo PortionInfo;
YDB_READONLY_DEF(std::vector<TBlobInfo>, Blobs);
- mutable std::optional<std::shared_ptr<arrow::RecordBatch>> CachedBatch;
- explicit TPortionInfoWithBlobs(TPortionInfo&& portionInfo, std::optional<std::shared_ptr<arrow::RecordBatch>> batch = {})
- : PortionInfo(std::move(portionInfo))
- , CachedBatch(batch) {
+ explicit TWritePortionInfoWithBlobs(TPortionInfo&& portionInfo)
+ : PortionInfo(std::move(portionInfo)) {
}
- explicit TPortionInfoWithBlobs(const TPortionInfo& portionInfo, std::optional<std::shared_ptr<arrow::RecordBatch>> batch = {})
- : PortionInfo(portionInfo)
- , CachedBatch(batch) {
- }
-
- void SetPortionInfo(const TPortionInfo& portionInfo) {
- PortionInfo = portionInfo;
+ explicit TWritePortionInfoWithBlobs(const TPortionInfo& portionInfo)
+ : PortionInfo(portionInfo) {
}
TBlobInfo::TBuilder StartBlob(const std::shared_ptr<IBlobsStorageOperator>& bOperator) {
@@ -93,33 +80,18 @@ private:
}
public:
- void InitBatchCached(const std::shared_ptr<arrow::RecordBatch>& batch) {
- if (!batch) {
- return;
- }
- CachedBatch = batch;
+ TPortionInfo& MutablePortionInfo() {
+ return PortionInfo;
}
- static std::vector<TPortionInfoWithBlobs> RestorePortions(const std::vector<TPortionInfo>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs,
- const TVersionedIndex& tables, const std::shared_ptr<IStoragesManager>& operators);
- static TPortionInfoWithBlobs RestorePortion(const TPortionInfo& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs,
- const TIndexInfo& indexInfo, const std::shared_ptr<IStoragesManager>& operators);
-
- std::shared_ptr<arrow::RecordBatch> GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames = {}) const;
- static TPortionInfoWithBlobs SyncPortion(TPortionInfoWithBlobs&& source,
- const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr<IStoragesManager>& storages,
- std::shared_ptr<NColumnShard::TSplitterCounters> counters);
-
std::vector<std::shared_ptr<IPortionDataChunk>> GetEntityChunks(const ui32 entityId) const;
- bool ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks);
-
void FillStatistics(const TIndexInfo& index);
- static TPortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks,
- std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators);
+ static TWritePortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks,
+ const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators);
- static TPortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion,
+ static TWritePortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion,
const std::shared_ptr<IStoragesManager>& operators);
const TString& GetBlobByRangeVerified(const ui32 columnId, const ui32 chunkId) const {
@@ -162,7 +134,7 @@ public:
return PortionInfo;
}
- friend IOutputStream& operator << (IOutputStream& out, const TPortionInfoWithBlobs& info) {
+ friend IOutputStream& operator << (IOutputStream& out, const TWritePortionInfoWithBlobs& info) {
out << info.DebugString();
return out;
}
diff --git a/ydb/core/tx/columnshard/engines/portions/ya.make b/ydb/core/tx/columnshard/engines/portions/ya.make
index 96254fce429..7143d9fc5ea 100644
--- a/ydb/core/tx/columnshard/engines/portions/ya.make
+++ b/ydb/core/tx/columnshard/engines/portions/ya.make
@@ -3,7 +3,9 @@ LIBRARY()
SRCS(
portion_info.cpp
column_record.cpp
- with_blobs.cpp
+ base_with_blobs.cpp
+ read_with_blobs.cpp
+ write_with_blobs.cpp
meta.cpp
common.cpp
index_chunk.cpp
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
index 6cae83e17ca..641b8e604b1 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
@@ -8,6 +8,7 @@
#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
+#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h>
#include <ydb/core/tx/columnshard/blobs_action/bs/storage.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/tx/columnshard/data_sharing/manager/shared_blobs.h>
@@ -264,7 +265,7 @@ TString MakeTestBlob(i64 start = 0, i64 end = 100) {
return NArrow::SerializeBatchNoCompression(batch);
}
-void AddIdsToBlobs(std::vector<TPortionInfoWithBlobs>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, ui32& step) {
+void AddIdsToBlobs(std::vector<TWritePortionInfoWithBlobs>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, ui32& step) {
for (auto& portion : portions) {
for (auto& rec : portion.GetPortionInfo().Records) {
rec.BlobRange.BlobIdx = portion.GetPortionInfo().RegisterBlobId(MakeUnifiedBlobId(++step, portion.GetBlobFullSizeVerified(rec.ColumnId, rec.Chunk)));
diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
index 2c4a9e50f73..c2d0a1488b1 100644
--- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/tx/columnshard/defs.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
namespace NKikimr::NOlap {
@@ -18,7 +19,7 @@ TCompactedWriteController::TCompactedWriteController(const TActorId& dstActor, T
}
auto* pInfo = changes.GetWritePortionInfo(i);
Y_ABORT_UNLESS(pInfo);
- TPortionInfoWithBlobs& portionWithBlobs = *pInfo;
+ TWritePortionInfoWithBlobs& portionWithBlobs = *pInfo;
for (auto&& b : portionWithBlobs.GetBlobs()) {
auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.GetBlob(), changes.MutableBlobsAction().GetWriting(b.GetOperator()->GetStorageId())));
b.RegisterBlobId(portionWithBlobs, task.GetBlobId());
diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h
index 3f409598227..d18d0c688ab 100644
--- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h
+++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h
@@ -5,7 +5,6 @@
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
-#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/action.h>
namespace NKikimr::NOlap {