aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com>2024-01-19 17:24:45 +0300
committerGitHub <noreply@github.com>2024-01-19 17:24:45 +0300
commit189588cdbed614b3bf079e253dd4b563f7fbb6fb (patch)
tree6550a32ffeb05550c8ffbb8de692c63b593cbc8c
parenta246d2e79fb3c2b44e72930f3444ad17755dda65 (diff)
downloadydb-189588cdbed614b3bf079e253dd4b563f7fbb6fb.tar.gz
use splitter for abstract data chunks. not columns only (#1147)
* use splitter for abstract data chunks. not columns only * fix build * fix build * fix build * fix build * fix test for portions->bs->tier->bs (in this case we have to rewrite blobId) * fix * optional useless * fix
-rw-r--r--ydb/core/tx/columnshard/blob.h4
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.cpp25
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h28
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract/abstract.h1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/cleanup.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h11
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h13
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.cpp45
-rw-r--r--ydb/core/tx/columnshard/engines/portions/column_record.h14
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp17
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.h17
-rw-r--r--ydb/core/tx/columnshard/engines/portions/with_blobs.cpp54
-rw-r--r--ydb/core/tx/columnshard/engines/portions/with_blobs.h26
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/column_features.h17
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.h12
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp4
-rw-r--r--ydb/core/tx/columnshard/splitter/batch_slice.cpp87
-rw-r--r--ydb/core/tx/columnshard/splitter/batch_slice.h36
-rw-r--r--ydb/core/tx/columnshard/splitter/blob_info.cpp3
-rw-r--r--ydb/core/tx/columnshard/splitter/blob_info.h5
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.cpp11
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.h197
-rw-r--r--ydb/core/tx/columnshard/splitter/column_info.h79
-rw-r--r--ydb/core/tx/columnshard/splitter/rb_splitter.cpp5
-rw-r--r--ydb/core/tx/columnshard/splitter/rb_splitter.h2
-rw-r--r--ydb/core/tx/columnshard/splitter/simple.cpp10
-rw-r--r--ydb/core/tx/columnshard/splitter/simple.h8
-rw-r--r--ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp8
-rw-r--r--ydb/library/accessor/validator.cpp1
-rw-r--r--ydb/library/accessor/validator.h12
-rw-r--r--ydb/library/accessor/ya.make2
38 files changed, 494 insertions, 311 deletions
diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h
index ea3790266d..0c3cb27bdb 100644
--- a/ydb/core/tx/columnshard/blob.h
+++ b/ydb/core/tx/columnshard/blob.h
@@ -288,6 +288,10 @@ struct TBlobRange {
return BlobId;
}
+ bool IsValid() const {
+ return BlobId.IsValid() && Size && Offset + Size <= BlobId.BlobSize();
+ }
+
ui32 GetBlobSize() const {
return Size;
}
diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp
index 8c3feaa4cf..a0bd1c9266 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/columnshard_schema.cpp
@@ -2,31 +2,6 @@
namespace NKikimr::NColumnShard {
-bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback) {
- auto rowset = db.Table<IndexColumns>().Prefix(0).Select();
- if (!rowset.IsReady()) {
- return false;
- }
-
- while (!rowset.EndOfSet()) {
- NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty();
- portion.SetPathId(rowset.GetValue<IndexColumns::PathId>());
- portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>());
- portion.SetPortion(rowset.GetValue<IndexColumns::Portion>());
- portion.SetDeprecatedGranuleId(rowset.GetValue<IndexColumns::Granule>());
-
- NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, dsGroupSelector);
-
- portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>());
-
- callback(portion, chunkLoadContext);
-
- if (!rowset.Next())
- return false;
- }
- return true;
-}
-
bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, const TInstant& /*loadTime*/) {
auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select();
if (!rowset.IsReady()) {
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index b2317147a4..36d7695ffe 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -508,34 +508,6 @@ struct Schema : NIceDb::Schema {
NOlap::TInsertTableAccessor& insertTable,
const TInstant& loadTime);
- // IndexColumns activities
-
- static void IndexColumns_Write(NIceDb::TNiceDb& db, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
- auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk);
- auto rowProto = row.GetMeta().SerializeToProto();
- if (proto) {
- *rowProto.MutablePortionMeta() = std::move(*proto);
- }
- db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId,
- portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update(
- NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()),
- NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()),
- NIceDb::TUpdate<IndexColumns::Blob>(row.SerializedBlobId()),
- NIceDb::TUpdate<IndexColumns::Metadata>(rowProto.SerializeAsString()),
- NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset),
- NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size),
- NIceDb::TUpdate<IndexColumns::PathId>(portion.GetPathId())
- );
- }
-
- static void IndexColumns_Erase(NIceDb::TNiceDb& db, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
- db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId,
- portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Delete();
- }
-
- static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector,
- const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback);
-
// IndexCounters
static void IndexCounters_Write(NIceDb::TNiceDb& db, ui32 counterId, ui64 value) {
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
index 6a99477510..8a5dacc48f 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
@@ -2,7 +2,6 @@
#include "settings.h"
#include <ydb/core/tx/columnshard/blobs_action/abstract/action.h>
#include <ydb/core/tx/columnshard/counters/indexation.h>
-#include <ydb/core/tx/columnshard/engines/columns_table.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
index ef91baef13..e3eacf5d40 100644
--- a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
@@ -39,9 +39,7 @@ bool TCleanupColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TAp
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.DebugString());
continue;
}
- for (auto& record : portionInfo.Records) {
- context.DB.EraseColumn(portionInfo, record);
- }
+ portionInfo.RemoveFromDatabase(context.DB);
}
return true;
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp
index 0f54e06a93..cdb81296cf 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp
@@ -63,7 +63,7 @@ bool TPortionColumnCursor::NextChunk() {
ChunkRecordIndexStartPosition += CurrentChunkRecordsCount;
CurrentBlobChunk = BlobChunks[ChunkIdx];
CurrentColumnChunk = ColumnChunks[ChunkIdx];
- CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCount();
+ CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
return true;
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
index 5224ec5d6b..7503d42132 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
@@ -10,13 +10,13 @@ namespace NKikimr::NOlap::NCompaction {
class TPortionColumnCursor {
private:
- std::vector<IPortionColumnChunk::TPtr> BlobChunks;
+ std::vector<std::shared_ptr<IPortionDataChunk>> BlobChunks;
std::vector<const TColumnRecord*> ColumnChunks;
std::optional<ui32> RecordIndexStart;
YDB_READONLY(ui32, RecordIndexFinish, 0);
ui32 ChunkRecordIndexStartPosition = 0;
ui32 ChunkIdx = 0;
- IPortionColumnChunk::TPtr CurrentBlobChunk;
+ std::shared_ptr<IPortionDataChunk> CurrentBlobChunk;
const TColumnRecord* CurrentColumnChunk = nullptr;
ui32 CurrentChunkRecordsCount = 0;
std::shared_ptr<arrow::Array> CurrentArray;
@@ -38,19 +38,18 @@ public:
bool Fetch(TMergedColumn& column);
- TPortionColumnCursor(const std::vector<IPortionColumnChunk::TPtr>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader>& loader, const ui64 portionId)
+ TPortionColumnCursor(const std::vector<std::shared_ptr<IPortionDataChunk>>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader>& loader, const ui64 portionId)
: BlobChunks(columnChunks)
, ColumnChunks(records)
, ColumnLoader(loader)
- , PortionId(portionId)
- {
+ , PortionId(portionId) {
AFL_VERIFY(ColumnLoader);
Y_UNUSED(PortionId);
Y_ABORT_UNLESS(BlobChunks.size());
Y_ABORT_UNLESS(ColumnChunks.size() == BlobChunks.size());
CurrentBlobChunk = BlobChunks.front();
CurrentColumnChunk = ColumnChunks.front();
- CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCount();
+ CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
}
};
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp
index 8d31f9d06f..7b1dbdb361 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp
@@ -5,16 +5,15 @@
namespace NKikimr::NOlap::NCompaction {
-std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> TChunkPreparation::DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const {
+std::vector<std::shared_ptr<IPortionDataChunk>> TChunkPreparation::DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const {
auto loader = SchemaInfo->GetColumnLoaderVerified(Record.ColumnId);
auto rb = NArrow::TStatusValidator::GetValid(loader->Apply(Data));
auto chunks = TSimpleSplitter(saver, counters).SplitBySizes(rb, Data, splitSizes);
- std::vector<IPortionColumnChunk::TPtr> newChunks;
+ std::vector<std::shared_ptr<IPortionDataChunk>> newChunks;
for (auto&& i : chunks) {
Y_ABORT_UNLESS(i.GetSlicedBatch()->num_columns() == 1);
- newChunks.emplace_back(std::make_shared<TChunkPreparation>(
- saver.Apply(i.GetSlicedBatch()), i.GetSlicedBatch()->column(0), ColumnId, SchemaInfo));
+ newChunks.emplace_back(std::make_shared<TChunkPreparation>(saver.Apply(i.GetSlicedBatch()), i.GetSlicedBatch()->column(0), GetColumnId(), SchemaInfo));
}
return newChunks;
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h
index b5a42f52dc..aa3a15ddce 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h
@@ -19,11 +19,11 @@ private:
std::shared_ptr<arrow::Scalar> First;
std::shared_ptr<arrow::Scalar> Last;
protected:
- virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const override;
+ virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const override;
virtual const TString& DoGetData() const override {
return Data;
}
- virtual ui32 DoGetRecordsCount() const override {
+ virtual ui32 DoGetRecordsCountImpl() const override {
return Record.GetMeta().GetNumRowsVerified();
}
virtual TString DoDebugString() const override {
@@ -70,14 +70,15 @@ private:
const ui32 RecordsCount;
TString Data;
protected:
- virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& /*saver*/, std::shared_ptr<NColumnShard::TSplitterCounters> /*counters*/, const std::vector<ui64>& /*splitSizes*/) const override {
+ virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& /*saver*/, const std::shared_ptr<NColumnShard::TSplitterCounters>& /*counters*/,
+ const std::vector<ui64>& /*splitSizes*/) const override {
AFL_VERIFY(false);
return {};
}
virtual const TString& DoGetData() const override {
return Data;
}
- virtual ui32 DoGetRecordsCount() const override {
+ virtual ui32 DoGetRecordsCountImpl() const override {
return RecordsCount;
}
virtual TString DoDebugString() const override {
@@ -107,7 +108,7 @@ public:
class TColumnPortionResult {
protected:
- std::vector<std::shared_ptr<IPortionColumnChunk>> Chunks;
+ std::vector<std::shared_ptr<IPortionDataChunk>> Chunks;
ui64 CurrentPortionRecords = 0;
const ui32 ColumnId;
ui64 PackedSize = 0;
@@ -121,7 +122,7 @@ public:
}
- const std::vector<std::shared_ptr<IPortionColumnChunk>>& GetChunks() const {
+ const std::vector<std::shared_ptr<IPortionDataChunk>>& GetChunks() const {
return Chunks;
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp
index 18937cc641..ed03a8fcf5 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp
@@ -29,7 +29,7 @@ void TMergedColumn::AppendSlice(const std::shared_ptr<arrow::Array>& data, const
}
}
-std::vector<NKikimr::NOlap::NCompaction::TColumnPortionResult> TMergedColumn::BuildResult() {
+std::vector<TColumnPortionResult> TMergedColumn::BuildResult() {
std::vector<TColumnPortionResult> result;
if (Portions.size()) {
Portions.back().FlushBuffer();
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index ff67fcbba7..f492aacbd4 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -102,7 +102,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc
auto dataSchema = context.SchemaVersions.GetSchema(p.GetPortionInfo().GetMinSnapshot());
auto loader = dataSchema->GetColumnLoaderOptional(columnId);
std::vector<const TColumnRecord*> records;
- std::vector<IPortionColumnChunk::TPtr> chunks;
+ std::vector<std::shared_ptr<IPortionDataChunk>> chunks;
if (!p.ExtractColumnChunks(columnId, records, chunks)) {
AFL_VERIFY(!loader);
records = {nullptr};
@@ -177,7 +177,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc
std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultSchema, SaverContext, stats));
for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) {
- std::map<ui32, std::vector<IPortionColumnChunk::TPtr>> portionColumns;
+ std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> portionColumns;
for (auto&& p : columnChunks) {
portionColumns.emplace(p.first, p.second[i].GetChunks());
}
@@ -190,7 +190,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc
for (auto&& i : packs) {
TGeneralSerializedSlice slice(std::move(i));
auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
- std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs();
+ std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunksByBlobs = slice.GroupChunksByBlobs();
AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetPathId(), resultSchema->GetSnapshot(), SaverContext.GetStorageOperator()));
NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index 0003e91cf6..36b099471f 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -67,18 +67,14 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
self.UpsertPortion(portionInfo, &oldInfo);
- for (auto& record : portionInfo.Records) {
- context.DB.WriteColumn(portionInfo, record);
- }
+ portionInfo.SaveToDatabase(context.DB);
}
for (auto& portionInfoWithBlobs : AppendedPortions) {
auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
Y_ABORT_UNLESS(!portionInfo.Empty());
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
self.UpsertPortion(portionInfo);
- for (auto& record : portionInfo.Records) {
- context.DB.WriteColumn(portionInfo, record);
- }
+ portionInfo.SaveToDatabase(context.DB);
}
}
@@ -115,7 +111,7 @@ std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(cons
auto schema = std::make_shared<TDefaultSchemaDetails>(resultSchema, SaverContext, stats);
TRBSplitLimiter limiter(context.Counters.SplitterCounters, schema, batch, SplitSettings);
- std::vector<std::vector<IPortionColumnChunk::TPtr>> chunkByBlobs;
+ std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunkByBlobs;
std::shared_ptr<arrow::RecordBatch> portionBatch;
while (limiter.Next(chunkByBlobs, portionBatch)) {
TPortionInfoWithBlobs infoWithBlob = TPortionInfoWithBlobs::BuildByBlobs(chunkByBlobs, nullptr, granule, snapshot, SaverContext.GetStorageOperator());
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 98cab6bef1..c608915528 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -169,7 +169,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) {
bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
TSnapshot lastSnapshot(0, 0);
const TIndexInfo* currentIndexInfo = nullptr;
- auto result = db.LoadColumns([&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) {
+ if (!db.LoadColumns([&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) {
if (!currentIndexInfo || lastSnapshot != portion.GetMinSnapshot()) {
currentIndexInfo = &VersionedIndex.GetSchema(portion.GetMinSnapshot())->GetIndexInfo();
lastSnapshot = portion.GetMinSnapshot();
@@ -178,11 +178,14 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
// Locate granule and append the record.
TColumnRecord rec(loadContext, *currentIndexInfo);
GetGranulePtrVerified(portion.GetPathId())->AddColumnRecord(*currentIndexInfo, portion, rec, loadContext.GetPortionMeta());
- });
+ })) {
+ return false;
+ }
+
for (auto&& i : Tables) {
i.second->OnAfterPortionsLoad();
}
- return result;
+ return true;
}
bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
index 57a7abb585..f3446b1bde 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
@@ -42,17 +42,56 @@ bool TDbWrapper::Load(TInsertTableAccessor& insertTable,
void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
NIceDb::TNiceDb db(Database);
- NColumnShard::Schema::IndexColumns_Write(db, portion, row);
+ auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk);
+ auto rowProto = row.GetMeta().SerializeToProto();
+ if (proto) {
+ *rowProto.MutablePortionMeta() = std::move(*proto);
+ }
+ using IndexColumns = NColumnShard::Schema::IndexColumns;
+ db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId,
+ portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update(
+ NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()),
+ NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()),
+ NIceDb::TUpdate<IndexColumns::Blob>(row.SerializedBlobId()),
+ NIceDb::TUpdate<IndexColumns::Metadata>(rowProto.SerializeAsString()),
+ NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset),
+ NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size),
+ NIceDb::TUpdate<IndexColumns::PathId>(portion.GetPathId())
+ );
}
void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
NIceDb::TNiceDb db(Database);
- NColumnShard::Schema::IndexColumns_Erase(db, portion, row);
+ using IndexColumns = NColumnShard::Schema::IndexColumns;
+ db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId,
+ portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Delete();
}
bool TDbWrapper::LoadColumns(const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) {
NIceDb::TNiceDb db(Database);
- return NColumnShard::Schema::IndexColumns_Load(db, DsGroupSelector, callback);
+ using IndexColumns = NColumnShard::Schema::IndexColumns;
+ auto rowset = db.Table<IndexColumns>().Prefix(0).Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ while (!rowset.EndOfSet()) {
+ NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty();
+ portion.SetPathId(rowset.GetValue<IndexColumns::PathId>());
+ portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>());
+ portion.SetPortion(rowset.GetValue<IndexColumns::Portion>());
+ portion.SetDeprecatedGranuleId(rowset.GetValue<IndexColumns::Granule>());
+
+ NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, DsGroupSelector);
+
+ portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>());
+
+ callback(portion, chunkLoadContext);
+
+ if (!rowset.Next())
+ return false;
+ }
+ return true;
}
void TDbWrapper::WriteCounter(ui32 counterId, ui64 value) {
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h
index b04174b59a..688ecdb3c2 100644
--- a/ydb/core/tx/columnshard/engines/portions/column_record.h
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.h
@@ -57,6 +57,12 @@ public:
ui16 Chunk = 0;
TBlobRange BlobRange;
+
+ void RegisterBlobId(const TUnifiedBlobId& blobId) {
+// AFL_VERIFY(!BlobRange.BlobId.GetTabletId())("original", BlobRange.BlobId.ToStringNew())("new", blobId.ToStringNew());
+ BlobRange.BlobId = blobId;
+ }
+
TColumnRecord(const TChunkAddress& address, const TBlobRange& range, TChunkMeta&& meta)
: Meta(std::move(meta))
, ColumnId(address.GetColumnId())
@@ -155,10 +161,11 @@ protected:
virtual const TString& DoGetData() const override {
return Data;
}
- virtual ui32 DoGetRecordsCount() const override {
+ virtual ui32 DoGetRecordsCountImpl() const override {
return ColumnRecord.GetMeta().GetNumRowsVerified();
}
- virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& /*saver*/, std::shared_ptr<NColumnShard::TSplitterCounters> /*counters*/, const std::vector<ui64>& /*splitSizes*/) const override {
+ virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& /*saver*/, const std::shared_ptr<NColumnShard::TSplitterCounters>& /*counters*/,
+ const std::vector<ui64>& /*splitSizes*/) const override {
Y_ABORT_UNLESS(false);
return {};
}
@@ -173,10 +180,9 @@ protected:
}
public:
TSimpleOrderedColumnChunk(const TColumnRecord& cRecord, const TString& data)
- : TBase(cRecord.ColumnId)
+ : TBase(cRecord.ColumnId, cRecord.Chunk)
, ColumnRecord(cRecord)
, Data(data) {
- ChunkIdx = cRecord.Chunk;
}
};
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index 1d7c0d290d..d2fdb926ed 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -1,5 +1,6 @@
#include "portion_info.h"
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/tx/columnshard/engines/db_wrapper.h>
#include <ydb/core/formats/arrow/arrow_filter.h>
#include <util/system/tls.h>
#include <ydb/core/formats/arrow/size_calcer.h>
@@ -103,16 +104,16 @@ ui64 TPortionInfo::GetRawBytes(const std::vector<ui32>& columnIds) const {
return sum;
}
-ui64 TPortionInfo::GetRawBytes(const std::set<ui32>& columnIds) const {
+ui64 TPortionInfo::GetRawBytes(const std::set<ui32>& entityIds) const {
ui64 sum = 0;
const ui32 numRows = NumRows();
for (auto&& i : TIndexInfo::GetSpecialColumnIds()) {
- if (columnIds.contains(i)) {
+ if (entityIds.contains(i)) {
sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i);
}
}
for (auto&& r : Records) {
- if (columnIds.contains(r.ColumnId)) {
+ if (entityIds.contains(r.ColumnId)) {
sum += r.GetMeta().GetRawBytesVerified();
}
}
@@ -206,6 +207,16 @@ bool TPortionInfo::IsEqualWithSnapshots(const TPortionInfo& item) const {
&& Portion == item.Portion && RemoveSnapshot == item.RemoveSnapshot;
}
+void TPortionInfo::RemoveFromDatabase(IDbWrapper& db) const {
+ for (auto& record : Records) {
+ db.EraseColumn(*this, record);
+ }
+}
+void TPortionInfo::SaveToDatabase(IDbWrapper& db) const {
+ for (auto& record : Records) {
+ db.WriteColumn(*this, record);
+ }
+}
std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const {
Y_ABORT_UNLESS(!Blobs.empty());
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index b182e74dbf..a53c673489 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -11,6 +11,7 @@
namespace NKikimr::NOlap {
struct TIndexInfo;
+class IDbWrapper;
class TPortionInfo {
private:
@@ -35,6 +36,22 @@ public:
return PathId;
}
+ void RegisterBlobId(const TChunkAddress& address, const TUnifiedBlobId& blobId) {
+ bool found = false;
+ for (auto it = Records.begin(); it != Records.end(); ++it) {
+ if (it->ColumnId == address.GetEntityId() && it->Chunk == address.GetChunkIdx()) {
+ it->RegisterBlobId(blobId);
+ found = true;
+ break;
+ }
+ }
+ AFL_VERIFY(found)("address", address.DebugString());
+ }
+
+ void RemoveFromDatabase(IDbWrapper& db) const;
+
+ void SaveToDatabase(IDbWrapper& db) const;
+
bool OlderThen(const TPortionInfo& info) const {
return RecordSnapshotMin() < info.RecordSnapshotMin();
}
diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
index 90e6a8270f..9d8adcac99 100644
--- a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
@@ -4,17 +4,18 @@
namespace NKikimr::NOlap {
-void TPortionInfoWithBlobs::TBlobInfo::RestoreChunk(const TPortionInfoWithBlobs& owner, const IPortionColumnChunk::TPtr& chunk) {
+void TPortionInfoWithBlobs::TBlobInfo::RestoreChunk(const TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk) {
Y_ABORT_UNLESS(!ResultBlob);
const TString& data = chunk->GetData();
Size += data.size();
- auto address = TChunkAddress(chunk->GetColumnId(), chunk->GetChunkIdx());
+ auto address = chunk->GetChunkAddress();
Y_ABORT_UNLESS(owner.GetPortionInfo().GetRecordPointer(address));
Y_ABORT_UNLESS(Chunks.emplace(address, chunk).second);
ChunksOrdered.emplace_back(chunk);
}
-const TColumnRecord& TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, const IPortionColumnChunk::TPtr& chunk) {
+void TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk) {
+ AFL_VERIFY(chunk);
Y_ABORT_UNLESS(!ResultBlob);
TBlobRange bRange;
const TString& data = chunk->GetData();
@@ -22,34 +23,23 @@ const TColumnRecord& TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWith
bRange.Offset = Size;
bRange.Size = data.size();
- TColumnRecord rec(TChunkAddress(chunk->GetColumnId(), chunk->GetChunkIdx()), bRange, chunk->BuildSimpleChunkMeta());
-
Size += data.size();
- Y_ABORT_UNLESS(Chunks.emplace(rec.GetAddress(), chunk).second);
+ Y_ABORT_UNLESS(Chunks.emplace(chunk->GetChunkAddress(), chunk).second);
ChunksOrdered.emplace_back(chunk);
- auto& result = owner.PortionInfo.AppendOneChunkColumn(std::move(rec));
- return result;
+
+ chunk->AddIntoPortion(bRange, owner.PortionInfo);
}
void TPortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) {
- auto it = owner.PortionInfo.Records.begin();
for (auto&& i : Chunks) {
- bool found = false;
- for (; it != owner.PortionInfo.Records.end(); ++it) {
- if (it->ColumnId == i.first.GetColumnId() && it->Chunk == i.first.GetChunk()) {
- it->BlobRange.BlobId = blobId;
- found = true;
- break;
- }
- }
- AFL_VERIFY(found)("address", i.second->DebugString());
+ owner.PortionInfo.RegisterBlobId(i.first, blobId);
}
}
-void TPortionInfoWithBlobs::TBlobInfo::ExtractColumnChunks(const ui32 columnId, std::map<TChunkAddress, IPortionColumnChunk::TPtr>& resultMap) {
- const auto pred = [this, &resultMap, columnId](const IPortionColumnChunk::TPtr& chunk) {
- if (chunk->GetColumnId() == columnId) {
+void TPortionInfoWithBlobs::TBlobInfo::ExtractEntityChunks(const ui32 entityId, std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>& resultMap) {
+ const auto pred = [this, &resultMap, entityId](const std::shared_ptr<IPortionDataChunk>& chunk) {
+ if (chunk->GetEntityId() == entityId) {
resultMap.emplace(chunk->GetChunkAddress(), chunk);
Chunks.erase(chunk->GetChunkAddress());
return true;
@@ -130,9 +120,8 @@ std::vector<NKikimr::NOlap::TPortionInfoWithBlobs> TPortionInfoWithBlobs::Restor
return result;
}
-NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::vector<std::vector<IPortionColumnChunk::TPtr>>& chunksByBlobs,
- std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot, const std::shared_ptr<NOlap::IBlobsStorageOperator>& bStorageOperator)
-{
+NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>>& chunksByBlobs, std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule,
+ const TSnapshot& snapshot, const std::shared_ptr<NOlap::IBlobsStorageOperator>& bStorageOperator) {
TPortionInfoWithBlobs result(TPortionInfo(granule, 0, snapshot, bStorageOperator), batch);
for (auto& blob : chunksByBlobs) {
auto blobInfo = result.StartBlob();
@@ -180,16 +169,16 @@ std::optional<NKikimr::NOlap::TPortionInfoWithBlobs> TPortionInfoWithBlobs::Chan
return result;
}
-std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> TPortionInfoWithBlobs::GetColumnChunks(const ui32 columnId) const {
- std::map<TChunkAddress, IPortionColumnChunk::TPtr> sortedChunks;
+std::vector<std::shared_ptr<IPortionDataChunk>> TPortionInfoWithBlobs::GetEntityChunks(const ui32 entityId) const {
+ std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>> sortedChunks;
for (auto&& b : GetBlobs()) {
for (auto&& i : b.GetChunks()) {
- if (i.second->GetColumnId() == columnId) {
+ if (i.second->GetEntityId() == entityId) {
sortedChunks.emplace(i.first, i.second);
}
}
}
- std::vector<IPortionColumnChunk::TPtr> result;
+ std::vector<std::shared_ptr<IPortionDataChunk>> result;
for (auto&& i : sortedChunks) {
AFL_VERIFY(i.second->GetChunkIdx() == result.size())("idx", i.second->GetChunkIdx())("size", result.size());
result.emplace_back(i.second);
@@ -197,16 +186,16 @@ std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> TPortionInfoWithBlobs::Ge
return result;
}
-bool TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<IPortionColumnChunk::TPtr>& chunks) {
+bool TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks) {
records = GetPortionInfo().GetColumnChunksPointers(columnId);
if (records.empty()) {
return false;
}
- std::map<TChunkAddress, IPortionColumnChunk::TPtr> chunksMap;
+ std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>> chunksMap;
for (auto&& i : Blobs) {
- i.ExtractColumnChunks(columnId, chunksMap);
+ i.ExtractEntityChunks(columnId, chunksMap);
}
- std::vector<IPortionColumnChunk::TPtr> chunksLocal;
+ std::vector<std::shared_ptr<IPortionDataChunk>> chunksLocal;
for (auto&& i : chunksMap) {
Y_ABORT_UNLESS(i.first.GetColumnId() == columnId);
Y_ABORT_UNLESS(i.first.GetChunk() == chunksLocal.size());
@@ -215,5 +204,4 @@ bool TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector
std::swap(chunksLocal, chunks);
return true;
}
-
}
diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.h b/ydb/core/tx/columnshard/engines/portions/with_blobs.h
index c7fa398811..1a7a6b7192 100644
--- a/ydb/core/tx/columnshard/engines/portions/with_blobs.h
+++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.h
@@ -10,28 +10,29 @@ class TPortionInfoWithBlobs {
public:
class TBlobInfo {
private:
- using TBlobChunks = std::map<TChunkAddress, IPortionColumnChunk::TPtr>;
+ using TBlobChunks = std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>;
YDB_READONLY(ui64, Size, 0);
YDB_READONLY_DEF(TBlobChunks, Chunks);
- std::vector<IPortionColumnChunk::TPtr> ChunksOrdered;
+ std::vector<std::shared_ptr<IPortionDataChunk>> ChunksOrdered;
mutable std::optional<TString> ResultBlob;
- const TColumnRecord& AddChunk(TPortionInfoWithBlobs& owner, const IPortionColumnChunk::TPtr& chunk);
- void RestoreChunk(const TPortionInfoWithBlobs& owner, const IPortionColumnChunk::TPtr& chunk);
+ void AddChunk(TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk);
+ void RestoreChunk(const TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk);
+
public:
class TBuilder {
private:
TBlobInfo* OwnerBlob;
TPortionInfoWithBlobs* OwnerPortion;
+
public:
TBuilder(TBlobInfo& blob, TPortionInfoWithBlobs& portion)
: OwnerBlob(&blob)
, OwnerPortion(&portion) {
-
}
- const TColumnRecord& AddChunk(const IPortionColumnChunk::TPtr& chunk) {
+ void AddChunk(const std::shared_ptr<IPortionDataChunk>& chunk) {
return OwnerBlob->AddChunk(*OwnerPortion, chunk);
}
- void RestoreChunk(const IPortionColumnChunk::TPtr& chunk) {
+ void RestoreChunk(const std::shared_ptr<IPortionColumnChunk>& chunk) {
OwnerBlob->RestoreChunk(*OwnerPortion, chunk);
}
};
@@ -49,8 +50,7 @@ public:
}
void RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId);
- void ExtractColumnChunks(const ui32 columnId, std::map<TChunkAddress, IPortionColumnChunk::TPtr>& resultMap);
-
+ void ExtractEntityChunks(const ui32 entityId, std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>& resultMap);
};
private:
TPortionInfo PortionInfo;
@@ -82,16 +82,16 @@ public:
std::shared_ptr<arrow::RecordBatch> GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames = {}) const;
- std::vector<IPortionColumnChunk::TPtr> GetColumnChunks(const ui32 columnId) const;
+ std::vector<std::shared_ptr<IPortionDataChunk>> GetEntityChunks(const ui32 entityId) const;
- bool ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<IPortionColumnChunk::TPtr>& chunks);
+ bool ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks);
ui64 GetSize() const {
return PortionInfo.BlobsBytes();
}
- static TPortionInfoWithBlobs BuildByBlobs(std::vector<std::vector<IPortionColumnChunk::TPtr>>& chunksByBlobs, std::shared_ptr<arrow::RecordBatch> batch,
- const ui64 granule, const TSnapshot& snapshot, const std::shared_ptr<NOlap::IBlobsStorageOperator>& bStorageOperator);
+ static TPortionInfoWithBlobs BuildByBlobs(std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>>& chunksByBlobs, std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot,
+ const std::shared_ptr<NOlap::IBlobsStorageOperator>& bStorageOperator);
std::optional<TPortionInfoWithBlobs> ChangeSaver(ISnapshotSchema::TPtr currentSchema, const TSaverContext& saverContext) const;
diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.h b/ydb/core/tx/columnshard/engines/scheme/column_features.h
index f12b146cad..11415ecc20 100644
--- a/ydb/core/tx/columnshard/engines/scheme/column_features.h
+++ b/ydb/core/tx/columnshard/engines/scheme/column_features.h
@@ -7,6 +7,7 @@
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h>
+#include <ydb/core/formats/arrow/common/validation.h>
namespace NKikimr::NOlap {
@@ -19,8 +20,7 @@ private:
public:
TSaverContext(const std::shared_ptr<IBlobsStorageOperator>& storageOperator, const std::shared_ptr<IStoragesManager>& storagesManager)
: StorageOperator(storageOperator)
- , StoragesManager(storagesManager)
- {
+ , StoragesManager(storagesManager) {
}
@@ -86,8 +86,7 @@ public:
: Transformer(transformer)
, Deserializer(deserializer)
, ExpectedSchema(expectedSchema)
- , ColumnId(columnId)
- {
+ , ColumnId(columnId) {
Y_ABORT_UNLESS(ExpectedSchema);
auto fieldsCountStr = ::ToString(ExpectedSchema->num_fields());
Y_ABORT_UNLESS(ExpectedSchema->num_fields() == 1, "%s", fieldsCountStr.data());
@@ -118,6 +117,16 @@ public:
return columnArray;
}
}
+
+ std::shared_ptr<arrow::RecordBatch> ApplyVerified(const TString& data) const {
+ return NArrow::TStatusValidator::GetValid(Apply(data));
+ }
+
+ std::shared_ptr<arrow::Array> ApplyVerifiedColumn(const TString& data) const {
+ auto rb = ApplyVerified(data);
+ AFL_VERIFY(rb->num_columns() == 1)("schema", rb->schema()->ToString());
+ return rb->column(0);
+ }
};
struct TIndexInfo;
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
index 82eeb52686..9da8c6f3d3 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
@@ -22,9 +22,8 @@ static std::vector<TString> NamesOnly(const std::vector<TNameTypeInfo>& columns)
return out;
}
-TIndexInfo::TIndexInfo(const TString& name, ui32 id)
+TIndexInfo::TIndexInfo(const TString& name)
: NTable::TScheme::TTableSchema()
- , Id(id)
, Name(name)
{}
@@ -326,6 +325,12 @@ TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId, const TSaverContext
}
}
+std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoaderVerified(const ui32 columnId) const {
+ auto result = GetColumnLoaderOptional(columnId);
+ AFL_VERIFY(result);
+ return result;
+}
+
std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoaderOptional(const ui32 columnId) const {
auto it = ColumnFeatures.find(columnId);
if (it == ColumnFeatures.end()) {
@@ -451,7 +456,7 @@ std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& table
}
std::optional<TIndexInfo> TIndexInfo::BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
- TIndexInfo result("", 0);
+ TIndexInfo result("");
if (!result.DeserializeFromProto(schema)) {
return std::nullopt;
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index b73bf480c1..2446df2d2a 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -34,7 +34,7 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema {
private:
THashMap<ui32, TColumnFeatures> ColumnFeatures;
THashMap<ui32, std::shared_ptr<arrow::Field>> ArrowColumnByColumnIdCache;
- TIndexInfo(const TString& name, ui32 id);
+ TIndexInfo(const TString& name);
bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
TColumnFeatures& GetOrCreateColumnFeatures(const ui32 columnId) const;
void BuildSchemaWithSpecials();
@@ -54,7 +54,6 @@ public:
TString DebugString() const {
TStringBuilder sb;
sb << "("
- << "id=" << Id << ";"
<< "version=" << Version << ";"
<< "name=" << Name << ";"
<< ")";
@@ -92,17 +91,12 @@ public:
public:
static TIndexInfo BuildDefault() {
- TIndexInfo result("dummy", 0);
+ TIndexInfo result("dummy");
return result;
}
static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
- /// Returns id of the index.
- ui32 GetId() const noexcept {
- return Id;
- }
-
static const std::vector<std::string>& SnapshotColumnNames() {
static std::vector<std::string> result = {SPEC_COL_PLAN_STEP, SPEC_COL_TX_ID};
return result;
@@ -114,6 +108,7 @@ public:
std::shared_ptr<arrow::Schema> GetColumnsSchema(const std::set<ui32>& columnIds) const;
TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const;
std::shared_ptr<TColumnLoader> GetColumnLoaderOptional(const ui32 columnId) const;
+ std::shared_ptr<TColumnLoader> GetColumnLoaderVerified(const ui32 columnId) const;
/// Returns an id of the column located by name. The name should exists in the schema.
ui32 GetColumnId(const std::string& name) const;
@@ -206,7 +201,6 @@ public:
bool CheckCompatible(const TIndexInfo& other) const;
private:
- ui32 Id;
ui64 Version = 0;
TString Name;
std::shared_ptr<arrow::Schema> Schema;
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index bab5202a9a..aac2d2bdd1 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -56,12 +56,12 @@ void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TPortionIn
if (it == Portions.end()) {
Y_ABORT_UNLESS(portion.Records.empty());
auto portionNew = std::make_shared<TPortionInfo>(portion);
- portionNew->AddRecord(indexInfo, rec, portionMeta);
it = Portions.emplace(portion.GetPortion(), portionNew).first;
} else {
AFL_VERIFY(it->second->IsEqualWithSnapshots(portion))("self", it->second->DebugString())("item", portion.DebugString());
- it->second->AddRecord(indexInfo, rec, portionMeta);
}
+ it->second->AddRecord(indexInfo, rec, portionMeta);
+
if (portionMeta) {
it->second->InitOperator(Owner->GetStoragesManager()->InitializePortionOperator(*it->second), false);
}
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.cpp b/ydb/core/tx/columnshard/splitter/batch_slice.cpp
index 06ce8cebd5..47dfe990eb 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.cpp
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.cpp
@@ -1,12 +1,13 @@
#include "batch_slice.h"
#include "simple.h"
+#include <ydb/library/accessor/validator.h>
namespace NKikimr::NOlap {
bool TGeneralSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) {
- std::vector<IPortionColumnChunk::TPtr> chunksInProgress;
- std::sort(Columns.begin(), Columns.end());
- for (auto&& i : Columns) {
+ std::vector<std::shared_ptr<IPortionDataChunk>> chunksInProgress;
+ std::sort(Data.begin(), Data.end());
+ for (auto&& i : Data) {
for (auto&& p : i.GetChunks()) {
chunksInProgress.emplace_back(p);
}
@@ -49,12 +50,12 @@ bool TGeneralSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) {
Y_ABORT_UNLESS((i64)chunksInProgress[i]->GetPackedSize() > Settings.GetMinBlobSize() - partSize);
Y_ABORT_UNLESS(otherSize - (Settings.GetMinBlobSize() - partSize) >= Settings.GetMinBlobSize());
- std::vector<IPortionColumnChunk::TPtr> newChunks;
- const bool splittable = chunksInProgress[i]->GetRecordsCount() > 1;
+ std::vector<std::shared_ptr<IPortionDataChunk>> newChunks;
+ const bool splittable = chunksInProgress[i]->IsSplittable();
if (splittable) {
Counters->BySizeSplitter.OnTrashSerialized(chunksInProgress[i]->GetPackedSize());
const std::vector<ui64> sizes = {(ui64)(Settings.GetMinBlobSize() - partSize)};
- newChunks = chunksInProgress[i]->InternalSplit(Schema->GetColumnSaver(chunksInProgress[i]->GetColumnId()), Counters, sizes);
+ newChunks = chunksInProgress[i]->InternalSplit(Schema->GetColumnSaver(chunksInProgress[i]->GetEntityId()), Counters, sizes);
chunksInProgress.erase(chunksInProgress.begin() + i);
chunksInProgress.insert(chunksInProgress.begin() + i, newChunks.begin(), newChunks.end());
}
@@ -81,10 +82,10 @@ bool TGeneralSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) {
ui32 currentChunkIdx = 0;
for (auto&& i : result) {
for (auto&& c : i.GetChunks()) {
- Y_ABORT_UNLESS(c->GetColumnId());
- if (!lastColumnId || *lastColumnId != c->GetColumnId()) {
- Y_ABORT_UNLESS(columnIds.emplace(c->GetColumnId()).second);
- lastColumnId = c->GetColumnId();
+ Y_ABORT_UNLESS(c->GetEntityId());
+ if (!lastColumnId || *lastColumnId != c->GetEntityId()) {
+ Y_ABORT_UNLESS(columnIds.emplace(c->GetEntityId()).second);
+ lastColumnId = c->GetEntityId();
currentChunkIdx = 0;
}
c->SetChunkIdx(currentChunkIdx++);
@@ -94,60 +95,58 @@ bool TGeneralSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) {
return true;
}
-TGeneralSerializedSlice::TGeneralSerializedSlice(const std::map<ui32, std::vector<IPortionColumnChunk::TPtr>>& data,
- ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings)
+TGeneralSerializedSlice::TGeneralSerializedSlice(const std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters,
+ const TSplitSettings& settings)
: Schema(schema)
, Counters(counters)
, Settings(settings) {
-
std::optional<ui32> recordsCount;
- for (auto&& [columnId, chunks] : data) {
- auto f = schema->GetField(columnId);
- TSplittedColumn column(f, columnId);
- column.AddChunks(chunks);
- if (!recordsCount) {
- recordsCount = column.GetRecordsCount();
- } else {
- AFL_VERIFY(*recordsCount == column.GetRecordsCount())("records_count", *recordsCount)("column", column.GetRecordsCount());
+ for (auto&& [entityId, chunks] : data) {
+ TSplittedEntity entity(entityId);
+ entity.SetChunks(chunks);
+ if (!!entity.GetRecordsCount()) {
+ if (!recordsCount) {
+ recordsCount = entity.GetRecordsCount();
+ } else {
+ AFL_VERIFY(*recordsCount == entity.GetRecordsCount())("records_count", *recordsCount)("column", entity.GetRecordsCount());
+ }
}
- Size += column.GetSize();
- Columns.emplace_back(std::move(column));
+ Size += entity.GetSize();
+ Data.emplace_back(std::move(entity));
}
Y_ABORT_UNLESS(recordsCount);
RecordsCount = *recordsCount;
}
-TGeneralSerializedSlice::TGeneralSerializedSlice(ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings)
- : Schema(schema)
+TGeneralSerializedSlice::TGeneralSerializedSlice(const ui32 recordsCount, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings)
+ : RecordsCount(recordsCount)
+ , Schema(schema)
, Counters(counters)
, Settings(settings)
{
}
-TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings)
- : TBase(schema, counters, settings)
+TBatchSerializedSlice::TBatchSerializedSlice(const std::shared_ptr<arrow::RecordBatch>& batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings)
+ : TBase(TValidator::CheckNotNull(batch)->num_rows(), schema, counters, settings)
, Batch(batch)
{
Y_ABORT_UNLESS(batch);
- RecordsCount = batch->num_rows();
- Columns.reserve(batch->num_columns());
+ Data.reserve(batch->num_columns());
for (auto&& i : batch->schema()->fields()) {
- TSplittedColumn c(i, schema->GetColumnId(i->name()));
- Columns.emplace_back(std::move(c));
+ TSplittedEntity c(schema->GetColumnId(i->name()));
+ Data.emplace_back(std::move(c));
}
ui32 idx = 0;
for (auto&& i : batch->columns()) {
- auto& c = Columns[idx];
- auto columnSaver = schema->GetColumnSaver(c.GetColumnId());
- auto stats = schema->GetColumnSerializationStats(c.GetColumnId());
+ auto& c = Data[idx];
+ auto columnSaver = schema->GetColumnSaver(c.GetEntityId());
+ auto stats = schema->GetColumnSerializationStats(c.GetEntityId());
TSimpleSplitter splitter(columnSaver, Counters);
- if (stats) {
- splitter.SetStats(*stats);
- }
- std::vector<IPortionColumnChunk::TPtr> chunks;
- for (auto&& i : splitter.Split(i, c.GetField(), Settings.GetMaxBlobSize())) {
- chunks.emplace_back(std::make_shared<TSplittedColumnChunk>(c.GetColumnId(), i, Schema));
+ splitter.SetStats(stats);
+ std::vector<std::shared_ptr<IPortionDataChunk>> chunks;
+ for (auto&& i : splitter.Split(i, Schema->GetField(c.GetEntityId()), Settings.GetMaxBlobSize())) {
+ chunks.emplace_back(std::make_shared<TSplittedColumnChunk>(c.GetEntityId(), i, Schema));
}
c.SetChunks(chunks);
Size += c.GetSize();
@@ -156,11 +155,11 @@ TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch>
}
void TGeneralSerializedSlice::MergeSlice(TGeneralSerializedSlice&& slice) {
- Y_ABORT_UNLESS(Columns.size() == slice.Columns.size());
+ Y_ABORT_UNLESS(Data.size() == slice.Data.size());
RecordsCount += slice.GetRecordsCount();
- for (ui32 i = 0; i < Columns.size(); ++i) {
- Size += slice.Columns[i].GetSize();
- Columns[i].Merge(std::move(slice.Columns[i]));
+ for (ui32 i = 0; i < Data.size(); ++i) {
+ Size += slice.Data[i].GetSize();
+ Data[i].Merge(std::move(slice.Data[i]));
}
}
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h
index feda743b26..d86db7ae8c 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.h
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.h
@@ -87,32 +87,34 @@ public:
};
class TGeneralSerializedSlice {
+private:
+ YDB_READONLY(ui32, RecordsCount, 0);
protected:
- std::vector<TSplittedColumn> Columns;
+ std::vector<TSplittedEntity> Data;
ui64 Size = 0;
- ui32 RecordsCount = 0;
ISchemaDetailInfo::TPtr Schema;
std::shared_ptr<NColumnShard::TSplitterCounters> Counters;
TSplitSettings Settings;
TGeneralSerializedSlice() = default;
- const TSplittedColumn& GetColumnVerified(const std::string& fieldName) const {
- for (auto&& i : Columns) {
- if (i.GetField()->name() == fieldName) {
+ const TSplittedEntity& GetEntityDataVerified(const ui32& entityId) const {
+ for (auto&& i : Data) {
+ if (i.GetEntityId() == entityId) {
return i;
}
}
Y_ABORT_UNLESS(false);
- return Columns.front();
+ return Data.front();
}
+
public:
std::shared_ptr<arrow::RecordBatch> GetFirstLastPKBatch(const std::shared_ptr<arrow::Schema>& pkSchema) const {
std::vector<std::shared_ptr<arrow::Array>> pkColumns;
for (auto&& i : pkSchema->fields()) {
auto aBuilder = NArrow::MakeBuilder(i);
- const TSplittedColumn& splittedColumn = GetColumnVerified(i->name());
- NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedColumn.GetFirstScalar()));
- NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedColumn.GetLastScalar()));
+ const TSplittedEntity& splittedEntity = GetEntityDataVerified(Schema->GetColumnId(i->name()));
+ NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedEntity.GetFirstScalar()));
+ NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedEntity.GetLastScalar()));
pkColumns.emplace_back(NArrow::TStatusValidator::GetValid(aBuilder->Finish()));
}
return arrow::RecordBatch::Make(pkSchema, 2, pkColumns);
@@ -121,12 +123,9 @@ public:
ui64 GetSize() const {
return Size;
}
- ui32 GetRecordsCount() const {
- return RecordsCount;
- }
- std::vector<std::vector<IPortionColumnChunk::TPtr>> GroupChunksByBlobs() {
- std::vector<std::vector<IPortionColumnChunk::TPtr>> result;
+ std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> GroupChunksByBlobs() {
+ std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> result;
std::vector<TSplittedBlob> blobs;
GroupBlobs(blobs);
for (auto&& i : blobs) {
@@ -142,8 +141,8 @@ public:
MergeSlice(std::move(objects[i]));
}
}
- TGeneralSerializedSlice(const std::map<ui32, std::vector<IPortionColumnChunk::TPtr>>& data, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings);
- TGeneralSerializedSlice(ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings);
+ TGeneralSerializedSlice(const std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings);
+ TGeneralSerializedSlice(const ui32 recordsCount, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings);
void MergeSlice(TGeneralSerializedSlice&& slice);
@@ -159,10 +158,9 @@ private:
using TBase = TGeneralSerializedSlice;
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch);
public:
- TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings);
+ TBatchSerializedSlice(const std::shared_ptr<arrow::RecordBatch>& batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings);
- explicit TBatchSerializedSlice(TVectorView<TBatchSerializedSlice>&& objects)
- {
+ explicit TBatchSerializedSlice(TVectorView<TBatchSerializedSlice>&& objects) {
Y_ABORT_UNLESS(objects.size());
std::swap(*this, objects.front());
for (ui32 i = 1; i < objects.size(); ++i) {
diff --git a/ydb/core/tx/columnshard/splitter/blob_info.cpp b/ydb/core/tx/columnshard/splitter/blob_info.cpp
index 4d0f4ef715..01a6e1a000 100644
--- a/ydb/core/tx/columnshard/splitter/blob_info.cpp
+++ b/ydb/core/tx/columnshard/splitter/blob_info.cpp
@@ -2,9 +2,8 @@
namespace NKikimr::NOlap {
-void TSplittedBlob::Take(const IPortionColumnChunk::TPtr& chunk) {
+void TSplittedBlob::Take(const std::shared_ptr<IPortionDataChunk>& chunk) {
Chunks.emplace_back(chunk);
Size += chunk->GetPackedSize();
}
-
}
diff --git a/ydb/core/tx/columnshard/splitter/blob_info.h b/ydb/core/tx/columnshard/splitter/blob_info.h
index 027f388d06..a4515f9f78 100644
--- a/ydb/core/tx/columnshard/splitter/blob_info.h
+++ b/ydb/core/tx/columnshard/splitter/blob_info.h
@@ -7,9 +7,10 @@ namespace NKikimr::NOlap {
class TSplittedBlob {
private:
YDB_READONLY(i64, Size, 0);
- YDB_READONLY_DEF(std::vector<IPortionColumnChunk::TPtr>, Chunks);
+ YDB_READONLY_DEF(std::vector<std::shared_ptr<IPortionDataChunk>>, Chunks);
+
public:
- void Take(const IPortionColumnChunk::TPtr& chunk);
+ void Take(const std::shared_ptr<IPortionDataChunk>& chunk);
bool operator<(const TSplittedBlob& item) const {
return Size > item.Size;
}
diff --git a/ydb/core/tx/columnshard/splitter/chunks.cpp b/ydb/core/tx/columnshard/splitter/chunks.cpp
index e4e6193345..0bf0b70fe6 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.cpp
+++ b/ydb/core/tx/columnshard/splitter/chunks.cpp
@@ -1,8 +1,10 @@
#include "chunks.h"
+#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
namespace NKikimr::NOlap {
-std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> IPortionColumnChunk::InternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const {
+std::vector<std::shared_ptr<IPortionDataChunk>> IPortionColumnChunk::DoInternalSplit(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const {
ui64 sumSize = 0;
for (auto&& i : splitSizes) {
sumSize += i;
@@ -13,7 +15,7 @@ std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> IPortionColumnChunk::Inte
} else {
Y_ABORT_UNLESS(GetRecordsCount() >= splitSizes.size());
}
- auto result = DoInternalSplit(saver, counters, splitSizes);
+ auto result = DoInternalSplitImpl(saver, counters, splitSizes);
if (sumSize == GetPackedSize()) {
Y_ABORT_UNLESS(result.size() == splitSizes.size());
} else {
@@ -22,4 +24,9 @@ std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> IPortionColumnChunk::Inte
return result;
}
+void IPortionColumnChunk::DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const {
+ TColumnRecord rec(GetChunkAddress(), bRange, BuildSimpleChunkMeta());
+ portionInfo.AppendOneChunkColumn(std::move(rec));
+}
+
}
diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h
index 36ca66c58e..1623bd2914 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.h
+++ b/ydb/core/tx/columnshard/splitter/chunks.h
@@ -1,38 +1,75 @@
#pragma once
#include "chunk_meta.h"
-#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
+
#include <ydb/core/tx/columnshard/counters/splitter.h>
#include <ydb/core/tx/columnshard/engines/portions/common.h>
+#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
namespace NKikimr::NOlap {
-class IPortionColumnChunk {
-public:
- using TPtr = std::shared_ptr<IPortionColumnChunk>;
+class IPortionDataChunk {
+private:
+ YDB_READONLY(ui32, EntityId, 0);
+
+ std::optional<ui32> ChunkIdx;
+
protected:
- ui32 ColumnId = 0;
- ui16 ChunkIdx = 0;
- virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const = 0;
- virtual ui64 DoGetPackedSize() const {
+ ui64 DoGetPackedSize() const {
return GetData().size();
}
virtual const TString& DoGetData() const = 0;
- virtual ui32 DoGetRecordsCount() const = 0;
virtual TString DoDebugString() const = 0;
- virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const = 0;
+ virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplit(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const = 0;
+ virtual bool DoIsSplittable() const = 0;
+ virtual std::optional<ui32> DoGetRecordsCount() const = 0;
virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const = 0;
virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const = 0;
-
+ virtual void DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const = 0;
public:
- IPortionColumnChunk(const ui32 columnId)
- : ColumnId(columnId)
- {
+ IPortionDataChunk(const ui32 entityId, const std::optional<ui16>& chunkIdx = {})
+ : EntityId(entityId)
+ , ChunkIdx(chunkIdx) {
+ }
+ virtual ~IPortionDataChunk() = default;
+
+ TString DebugString() const {
+ return DoDebugString();
}
- virtual ~IPortionColumnChunk() = default;
- TChunkAddress GetChunkAddress() const {
- return TChunkAddress(ColumnId, ChunkIdx);
+ const TString& GetData() const {
+ return DoGetData();
+ }
+
+ ui64 GetPackedSize() const {
+ return DoGetPackedSize();
+ }
+
+ std::optional<ui32> GetRecordsCount() const {
+ return DoGetRecordsCount();
+ }
+
+ ui32 GetRecordsCountVerified() const {
+ auto result = DoGetRecordsCount();
+ AFL_VERIFY(result);
+ return *result;
+ }
+
+ std::vector<std::shared_ptr<IPortionDataChunk>> InternalSplit(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const {
+ return DoInternalSplit(saver, counters, splitSizes);
+ }
+
+ bool IsSplittable() const {
+ return DoIsSplittable();
+ }
+
+ ui16 GetChunkIdx() const {
+ AFL_VERIFY(!!ChunkIdx);
+ return *ChunkIdx;
+ }
+
+ void SetChunkIdx(const ui16 value) {
+ ChunkIdx = value;
}
std::shared_ptr<arrow::Scalar> GetFirstScalar() const {
@@ -46,38 +83,136 @@ public:
return result;
}
+ TChunkAddress GetChunkAddress() const {
+ return TChunkAddress(GetEntityId(), GetChunkIdx());
+ }
+
+ void AddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const {
+ return DoAddIntoPortion(bRange, portionInfo);
+ }
+};
+
+class IPortionColumnChunk : public IPortionDataChunk {
+private:
+ using TBase = IPortionDataChunk;
+
+protected:
+ virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const = 0;
+ virtual ui32 DoGetRecordsCountImpl() const = 0;
+ virtual std::optional<ui32> DoGetRecordsCount() const override final {
+ return DoGetRecordsCountImpl();
+ }
+
+ virtual void DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const override;
+
+ virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const = 0;
+ virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplit(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const override;
+ virtual bool DoIsSplittable() const override {
+ return GetRecordsCount() > 1;
+ }
+
+public:
+ IPortionColumnChunk(const ui32 entityId, const std::optional<ui16>& chunkIdx = {})
+ : TBase(entityId, chunkIdx) {
+ }
+ virtual ~IPortionColumnChunk() = default;
+
TSimpleChunkMeta BuildSimpleChunkMeta() const {
return DoBuildSimpleChunkMeta();
}
ui32 GetColumnId() const {
- return ColumnId;
+ return GetEntityId();
}
+};
- ui16 GetChunkIdx() const {
- return ChunkIdx;
+class TChunkedColumnReader {
+private:
+ std::vector<std::shared_ptr<IPortionDataChunk>> Chunks;
+ std::shared_ptr<TColumnLoader> Loader;
+
+ std::shared_ptr<arrow::Array> CurrentChunk;
+ ui32 CurrentChunkIndex = 0;
+ ui32 CurrentRecordIndex = 0;
+public:
+ TChunkedColumnReader(const std::vector<std::shared_ptr<IPortionDataChunk>>& chunks, const std::shared_ptr<TColumnLoader>& loader)
+ : Chunks(chunks)
+ , Loader(loader)
+ {
+ if (Chunks.size()) {
+ CurrentChunk = Loader->ApplyVerifiedColumn(Chunks.front()->GetData());
+ }
}
- void SetChunkIdx(const ui16 value) {
- ChunkIdx = value;
+ const std::shared_ptr<arrow::Array>& GetCurrentChunk() const {
+ return CurrentChunk;
}
- TString DebugString() const {
- return DoDebugString();
+ ui32 GetCurrentRecordIndex() const {
+ return CurrentRecordIndex;
}
- ui32 GetRecordsCount() const {
- return DoGetRecordsCount();
+ bool IsCorrect() const {
+ return !!CurrentChunk;
}
- const TString& GetData() const {
- return DoGetData();
+ bool ReadNext() {
+ AFL_VERIFY(!!CurrentChunk);
+ if (++CurrentRecordIndex < CurrentChunk->length()) {
+ return true;
+ }
+ while (++CurrentChunkIndex < Chunks.size()) {
+ CurrentChunk = Loader->ApplyVerifiedColumn(Chunks[CurrentChunkIndex]->GetData());
+ CurrentRecordIndex = 0;
+ if (CurrentRecordIndex < CurrentChunk->length()) {
+ return true;
+ }
+ }
+ CurrentChunk = nullptr;
+ return false;
}
+};
- ui64 GetPackedSize() const {
- return DoGetPackedSize();
+class TChunkedBatchReader {
+private:
+ std::vector<TChunkedColumnReader> Columns;
+ bool IsCorrectFlag = true;
+public:
+ TChunkedBatchReader(const std::vector<TChunkedColumnReader>& columnReaders)
+ : Columns(columnReaders)
+ {
+ AFL_VERIFY(Columns.size());
+ for (auto&& i : Columns) {
+ AFL_VERIFY(i.IsCorrect());
+ }
+ }
+
+ bool IsCorrect() const {
+ return IsCorrectFlag;
+ }
+
+ bool ReadNext() {
+ std::optional<bool> result;
+ for (auto&& i : Columns) {
+ if (!result) {
+ result = i.ReadNext();
+ } else {
+ AFL_VERIFY(*result == i.ReadNext());
+ }
+ }
+ if (!*result) {
+ IsCorrectFlag = false;
+ }
+ return *result;
+ }
+
+ std::vector<TChunkedColumnReader>::const_iterator begin() const {
+ return Columns.begin();
}
- std::vector<IPortionColumnChunk::TPtr> InternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const;
+ std::vector<TChunkedColumnReader>::const_iterator end() const {
+ return Columns.end();
+ }
};
+
}
diff --git a/ydb/core/tx/columnshard/splitter/column_info.h b/ydb/core/tx/columnshard/splitter/column_info.h
index d2c76e9535..ff2f39190f 100644
--- a/ydb/core/tx/columnshard/splitter/column_info.h
+++ b/ydb/core/tx/columnshard/splitter/column_info.h
@@ -7,14 +7,31 @@
namespace NKikimr::NOlap {
-class TSplittedColumn {
+class TSplittedEntity {
private:
+ YDB_READONLY(ui32, EntityId, 0);
YDB_READONLY(ui64, Size, 0);
- YDB_READONLY(ui32, ColumnId, 0);
- YDB_READONLY(ui32, RecordsCount, 0);
- YDB_READONLY_DEF(std::shared_ptr<arrow::Field>, Field);
- YDB_READONLY_DEF(std::vector<IPortionColumnChunk::TPtr>, Chunks);
+ YDB_READONLY_DEF(std::vector<std::shared_ptr<IPortionDataChunk>>, Chunks);
+ YDB_READONLY_DEF(std::optional<ui32>, RecordsCount);
+
+protected:
+ template <class T>
+ const T& GetChunkAs(const ui32 idx) const {
+ AFL_VERIFY(idx < Chunks.size());
+ auto result = std::dynamic_pointer_cast<T>(Chunks[idx]);
+ AFL_VERIFY(result);
+ return *result;
+ }
+
public:
+ TSplittedEntity(const ui32 entityId)
+ : EntityId(entityId) {
+ AFL_VERIFY(EntityId);
+ }
+
+ bool operator<(const TSplittedEntity& item) const {
+ return Size > item.Size;
+ }
std::shared_ptr<arrow::Scalar> GetFirstScalar() const {
Y_ABORT_UNLESS(Chunks.size());
@@ -26,46 +43,40 @@ public:
return Chunks.back()->GetLastScalar();
}
- void Merge(TSplittedColumn&& c) {
+ void Merge(TSplittedEntity&& c) {
Size += c.Size;
- Y_ABORT_UNLESS(Field->name() == c.Field->name());
- Y_DEBUG_ABORT_UNLESS(Field->Equals(c.Field));
- Y_ABORT_UNLESS(ColumnId == c.ColumnId);
- Y_ABORT_UNLESS(ColumnId);
- RecordsCount += c.RecordsCount;
+ AFL_VERIFY(!!RecordsCount == !!c.RecordsCount);
+ if (RecordsCount) {
+ *RecordsCount += *c.RecordsCount;
+ }
+ AFL_VERIFY(EntityId == c.EntityId)("self", EntityId)("c", c.EntityId);
+ Y_ABORT_UNLESS(c.EntityId);
for (auto&& i : c.Chunks) {
Chunks.emplace_back(std::move(i));
}
}
- void SetChunks(const std::vector<IPortionColumnChunk::TPtr>& data) {
+ void SetChunks(const std::vector<std::shared_ptr<IPortionDataChunk>>& data) {
Y_ABORT_UNLESS(Chunks.empty());
+ std::optional<bool> hasRecords;
for (auto&& i : data) {
- Y_ABORT_UNLESS(i->GetColumnId() == ColumnId);
- Size += i->GetPackedSize();
- RecordsCount += i->GetRecordsCount();
- Chunks.emplace_back(i);
- }
- }
-
- void AddChunks(const std::vector<IPortionColumnChunk::TPtr>& data) {
- for (auto&& i : data) {
- Y_ABORT_UNLESS(i->GetColumnId() == ColumnId);
+ Y_ABORT_UNLESS(i->GetEntityId() == EntityId);
Size += i->GetPackedSize();
- RecordsCount += i->GetRecordsCount();
Chunks.emplace_back(i);
+ auto rc = i->GetRecordsCount();
+ if (!hasRecords) {
+ hasRecords = !!rc;
+ }
+ AFL_VERIFY(*hasRecords == !!rc);
+ if (!rc) {
+ continue;
+ }
+ if (!RecordsCount) {
+ RecordsCount = rc;
+ } else {
+ *RecordsCount += *rc;
+ }
}
}
-
- TSplittedColumn(std::shared_ptr<arrow::Field> f, const ui32 id)
- : ColumnId(id)
- , Field(f)
- {
-
- }
-
- bool operator<(const TSplittedColumn& item) const {
- return Size > item.Size;
- }
};
}
diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
index f60a0d9e91..5fae273e44 100644
--- a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
+++ b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
@@ -39,13 +39,13 @@ TRBSplitLimiter::TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters
Y_ABORT_UNLESS(recordsCountCheck == batch->num_rows());
}
-bool TRBSplitLimiter::Next(std::vector<std::vector<IPortionColumnChunk::TPtr>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch) {
+bool TRBSplitLimiter::Next(std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch) {
if (!Slices.size()) {
return false;
}
std::vector<TSplittedBlob> blobs;
Slices.front().GroupBlobs(blobs);
- std::vector<std::vector<IPortionColumnChunk::TPtr>> result;
+ std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> result;
std::map<ui32, ui32> columnChunks;
for (auto&& i : blobs) {
if (blobs.size() == 1) {
@@ -60,5 +60,4 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<IPortionColumnChunk::TPtr>>&
Slices.pop_front();
return true;
}
-
}
diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.h b/ydb/core/tx/columnshard/splitter/rb_splitter.h
index 5053f2eb1d..0b0356b86a 100644
--- a/ydb/core/tx/columnshard/splitter/rb_splitter.h
+++ b/ydb/core/tx/columnshard/splitter/rb_splitter.h
@@ -56,7 +56,7 @@ public:
TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters,
ISchemaDetailInfo::TPtr schemaInfo, const std::shared_ptr<arrow::RecordBatch> batch, const TSplitSettings& settings);
- bool Next(std::vector<std::vector<IPortionColumnChunk::TPtr>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch);
+ bool Next(std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch);
};
}
diff --git a/ydb/core/tx/columnshard/splitter/simple.cpp b/ydb/core/tx/columnshard/splitter/simple.cpp
index 690309c190..7a155eb315 100644
--- a/ydb/core/tx/columnshard/splitter/simple.cpp
+++ b/ydb/core/tx/columnshard/splitter/simple.cpp
@@ -6,11 +6,11 @@
namespace NKikimr::NOlap {
-std::vector<IPortionColumnChunk::TPtr> TSplittedColumnChunk::DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const {
+std::vector<std::shared_ptr<IPortionDataChunk>> TSplittedColumnChunk::DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const {
auto chunks = TSimpleSplitter(saver, counters).SplitBySizes(Data.GetSlicedBatch(), Data.GetSerializedChunk(), splitSizes);
- std::vector<IPortionColumnChunk::TPtr> newChunks;
+ std::vector<std::shared_ptr<IPortionDataChunk>> newChunks;
for (auto&& i : chunks) {
- newChunks.emplace_back(std::make_shared<TSplittedColumnChunk>(ColumnId, i, SchemaInfo));
+ newChunks.emplace_back(std::make_shared<TSplittedColumnChunk>(GetColumnId(), i, SchemaInfo));
}
return newChunks;
}
@@ -19,7 +19,9 @@ TString TSplittedColumnChunk::DoDebugString() const {
return TStringBuilder() << "records_count=" << GetRecordsCount() << ";data=" << NArrow::DebugJson(Data.GetSlicedBatch(), 3, 3) << ";";
}
-std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<arrow::Array>& data, std::shared_ptr<arrow::Field> field, const ui32 maxBlobSize) const {
+std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<arrow::Array>& data, const std::shared_ptr<arrow::Field>& field, const ui32 maxBlobSize) const {
+ AFL_VERIFY(data);
+ AFL_VERIFY(field);
auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{field});
auto batch = arrow::RecordBatch::Make(schema, data->length(), {data});
return Split(batch, maxBlobSize);
diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h
index 1878e72c6a..d83df17125 100644
--- a/ydb/core/tx/columnshard/splitter/simple.h
+++ b/ydb/core/tx/columnshard/splitter/simple.h
@@ -140,7 +140,7 @@ public:
return TLinearSplitInfo(countPacksMax, stepPackMin, objectsCount);
}
- std::vector<TSaverSplittedChunk> Split(const std::shared_ptr<arrow::Array>& data, std::shared_ptr<arrow::Field> field, const ui32 maxBlobSize) const;
+ std::vector<TSaverSplittedChunk> Split(const std::shared_ptr<arrow::Array>& data, const std::shared_ptr<arrow::Field>& field, const ui32 maxBlobSize) const;
std::vector<TSaverSplittedChunk> Split(const std::shared_ptr<arrow::RecordBatch>& data, const ui32 maxBlobSize) const;
std::vector<TSaverSplittedChunk> SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const;
std::vector<TSaverSplittedChunk> SplitBySizes(std::shared_ptr<arrow::RecordBatch> data, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt) const;
@@ -152,18 +152,18 @@ private:
TSaverSplittedChunk Data;
ISchemaDetailInfo::TPtr SchemaInfo;
protected:
- virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const override;
+ virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const override;
virtual const TString& DoGetData() const override {
return Data.GetSerializedChunk();
}
- virtual ui32 DoGetRecordsCount() const override {
+ virtual ui32 DoGetRecordsCountImpl() const override {
return Data.GetRecordsCount();
}
virtual TString DoDebugString() const override;
virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const override {
- return TSimpleChunkMeta(Data.GetColumn(), SchemaInfo->NeedMinMaxForColumn(ColumnId), SchemaInfo->IsSortedColumn(ColumnId));
+ return TSimpleChunkMeta(Data.GetColumn(), SchemaInfo->NeedMinMaxForColumn(GetColumnId()), SchemaInfo->IsSortedColumn(GetColumnId()));
}
virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const override {
diff --git a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
index 4b17b8a410..a8face4b53 100644
--- a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
+++ b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
@@ -74,7 +74,7 @@ Y_UNIT_TEST_SUITE(Splitter) {
void Execute(std::shared_ptr<arrow::RecordBatch> batch) {
NKikimr::NColumnShard::TIndexationCounters counters("test");
NKikimr::NOlap::TRBSplitLimiter limiter(counters.SplitterCounters, Schema, batch, NKikimr::NOlap::TSplitSettings());
- std::vector<std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr>> chunksForBlob;
+ std::vector<std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>>> chunksForBlob;
std::map<std::string, std::vector<std::shared_ptr<arrow::RecordBatch>>> restoredBatch;
std::vector<i64> blobsSize;
bool hasMultiSplit = false;
@@ -91,10 +91,12 @@ Y_UNIT_TEST_SUITE(Splitter) {
ui64 blobSize = 0;
sb << "[";
std::set<ui32> blobColumnChunks;
- for (auto&& i : chunks) {
+ for (auto&& iData : chunks) {
+ auto i = dynamic_pointer_cast<NKikimr::NOlap::IPortionColumnChunk>(iData);
+ AFL_VERIFY(i);
++chunksCount;
const ui32 columnId = i->GetColumnId();
- recordsCountByColumn[columnId] += i->GetRecordsCount();
+ recordsCountByColumn[columnId] += i->GetRecordsCountVerified();
restoredBatch[Schema->GetColumnName(columnId)].emplace_back(*Schema->GetColumnLoader(columnId).Apply(i->GetData()));
blobSize += i->GetData().size();
if (i->GetRecordsCount() != NKikimr::NOlap::TSplitSettings().GetMinRecordsCount() && !blobColumnChunks.emplace(columnId).second) {
diff --git a/ydb/library/accessor/validator.cpp b/ydb/library/accessor/validator.cpp
new file mode 100644
index 0000000000..34e22a943b
--- /dev/null
+++ b/ydb/library/accessor/validator.cpp
@@ -0,0 +1 @@
+#include "validator.h"
diff --git a/ydb/library/accessor/validator.h b/ydb/library/accessor/validator.h
new file mode 100644
index 0000000000..df81586f87
--- /dev/null
+++ b/ydb/library/accessor/validator.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <ydb/library/actors/core/log.h>
+
+class TValidator {
+public:
+ template <class T>
+ static const T& CheckNotNull(const T& object) {
+ AFL_VERIFY(!!object);
+ return object;
+ }
+}; \ No newline at end of file
diff --git a/ydb/library/accessor/ya.make b/ydb/library/accessor/ya.make
index 7b9629f0b3..430948d7ca 100644
--- a/ydb/library/accessor/ya.make
+++ b/ydb/library/accessor/ya.make
@@ -1,10 +1,12 @@
LIBRARY()
PEERDIR(
+ ydb/library/actors/core
)
SRCS(
accessor.cpp
+ validator.cpp
)
END()