diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-01-19 17:24:45 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-19 17:24:45 +0300 |
commit | 189588cdbed614b3bf079e253dd4b563f7fbb6fb (patch) | |
tree | 6550a32ffeb05550c8ffbb8de692c63b593cbc8c | |
parent | a246d2e79fb3c2b44e72930f3444ad17755dda65 (diff) | |
download | ydb-189588cdbed614b3bf079e253dd4b563f7fbb6fb.tar.gz |
use splitter for abstract data chunks. not columns only (#1147)
* use splitter for abstract data chunks. not columns only
* fix build
* fix build
* fix build
* fix build
* fix test for portions->bs->tier->bs (in this case we have to rewrite blobId)
* fix
* optional useless
* fix
38 files changed, 494 insertions, 311 deletions
diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h index ea3790266d..0c3cb27bdb 100644 --- a/ydb/core/tx/columnshard/blob.h +++ b/ydb/core/tx/columnshard/blob.h @@ -288,6 +288,10 @@ struct TBlobRange { return BlobId; } + bool IsValid() const { + return BlobId.IsValid() && Size && Offset + Size <= BlobId.BlobSize(); + } + ui32 GetBlobSize() const { return Size; } diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp index 8c3feaa4cf..a0bd1c9266 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -2,31 +2,6 @@ namespace NKikimr::NColumnShard { -bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback) { - auto rowset = db.Table<IndexColumns>().Prefix(0).Select(); - if (!rowset.IsReady()) { - return false; - } - - while (!rowset.EndOfSet()) { - NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty(); - portion.SetPathId(rowset.GetValue<IndexColumns::PathId>()); - portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>()); - portion.SetPortion(rowset.GetValue<IndexColumns::Portion>()); - portion.SetDeprecatedGranuleId(rowset.GetValue<IndexColumns::Granule>()); - - NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, dsGroupSelector); - - portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>()); - - callback(portion, chunkLoadContext); - - if (!rowset.Next()) - return false; - } - return true; -} - bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, const TInstant& /*loadTime*/) { auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select(); if (!rowset.IsReady()) { diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index b2317147a4..36d7695ffe 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -508,34 +508,6 @@ struct Schema : NIceDb::Schema { NOlap::TInsertTableAccessor& insertTable, const TInstant& loadTime); - // IndexColumns activities - - static void IndexColumns_Write(NIceDb::TNiceDb& db, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { - auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk); - auto rowProto = row.GetMeta().SerializeToProto(); - if (proto) { - *rowProto.MutablePortionMeta() = std::move(*proto); - } - db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId, - portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update( - NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()), - NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()), - NIceDb::TUpdate<IndexColumns::Blob>(row.SerializedBlobId()), - NIceDb::TUpdate<IndexColumns::Metadata>(rowProto.SerializeAsString()), - NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset), - NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size), - NIceDb::TUpdate<IndexColumns::PathId>(portion.GetPathId()) - ); - } - - static void IndexColumns_Erase(NIceDb::TNiceDb& db, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { - db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId, - portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Delete(); - } - - static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, - const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback); - // IndexCounters static void IndexCounters_Write(NIceDb::TNiceDb& db, ui32 counterId, ui64 value) { diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h index 6a99477510..8a5dacc48f 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h @@ -2,7 +2,6 @@ #include "settings.h" #include <ydb/core/tx/columnshard/blobs_action/abstract/action.h> #include <ydb/core/tx/columnshard/counters/indexation.h> -#include <ydb/core/tx/columnshard/engines/columns_table.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> #include <ydb/core/tx/columnshard/engines/portions/with_blobs.h> #include <ydb/core/tx/columnshard/resource_subscriber/task.h> diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp index ef91baef13..e3eacf5d40 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp +++ b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp @@ -39,9 +39,7 @@ bool TCleanupColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TAp AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.DebugString()); continue; } - for (auto& record : portionInfo.Records) { - context.DB.EraseColumn(portionInfo, record); - } + portionInfo.RemoveFromDatabase(context.DB); } return true; diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp index 0f54e06a93..cdb81296cf 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp @@ -63,7 +63,7 @@ bool TPortionColumnCursor::NextChunk() { ChunkRecordIndexStartPosition += CurrentChunkRecordsCount; CurrentBlobChunk = BlobChunks[ChunkIdx]; CurrentColumnChunk = ColumnChunks[ChunkIdx]; - CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCount(); + CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified(); return true; } } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h index 5224ec5d6b..7503d42132 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h @@ -10,13 +10,13 @@ namespace NKikimr::NOlap::NCompaction { class TPortionColumnCursor { private: - std::vector<IPortionColumnChunk::TPtr> BlobChunks; + std::vector<std::shared_ptr<IPortionDataChunk>> BlobChunks; std::vector<const TColumnRecord*> ColumnChunks; std::optional<ui32> RecordIndexStart; YDB_READONLY(ui32, RecordIndexFinish, 0); ui32 ChunkRecordIndexStartPosition = 0; ui32 ChunkIdx = 0; - IPortionColumnChunk::TPtr CurrentBlobChunk; + std::shared_ptr<IPortionDataChunk> CurrentBlobChunk; const TColumnRecord* CurrentColumnChunk = nullptr; ui32 CurrentChunkRecordsCount = 0; std::shared_ptr<arrow::Array> CurrentArray; @@ -38,19 +38,18 @@ public: bool Fetch(TMergedColumn& column); - TPortionColumnCursor(const std::vector<IPortionColumnChunk::TPtr>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader>& loader, const ui64 portionId) + TPortionColumnCursor(const std::vector<std::shared_ptr<IPortionDataChunk>>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader>& loader, const ui64 portionId) : BlobChunks(columnChunks) , ColumnChunks(records) , ColumnLoader(loader) - , PortionId(portionId) - { + , PortionId(portionId) { AFL_VERIFY(ColumnLoader); Y_UNUSED(PortionId); Y_ABORT_UNLESS(BlobChunks.size()); Y_ABORT_UNLESS(ColumnChunks.size() == BlobChunks.size()); CurrentBlobChunk = BlobChunks.front(); CurrentColumnChunk = ColumnChunks.front(); - CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCount(); + CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified(); } }; diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp index 8d31f9d06f..7b1dbdb361 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp @@ -5,16 +5,15 @@ namespace NKikimr::NOlap::NCompaction { -std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> TChunkPreparation::DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const { +std::vector<std::shared_ptr<IPortionDataChunk>> TChunkPreparation::DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const { auto loader = SchemaInfo->GetColumnLoaderVerified(Record.ColumnId); auto rb = NArrow::TStatusValidator::GetValid(loader->Apply(Data)); auto chunks = TSimpleSplitter(saver, counters).SplitBySizes(rb, Data, splitSizes); - std::vector<IPortionColumnChunk::TPtr> newChunks; + std::vector<std::shared_ptr<IPortionDataChunk>> newChunks; for (auto&& i : chunks) { Y_ABORT_UNLESS(i.GetSlicedBatch()->num_columns() == 1); - newChunks.emplace_back(std::make_shared<TChunkPreparation>( - saver.Apply(i.GetSlicedBatch()), i.GetSlicedBatch()->column(0), ColumnId, SchemaInfo)); + newChunks.emplace_back(std::make_shared<TChunkPreparation>(saver.Apply(i.GetSlicedBatch()), i.GetSlicedBatch()->column(0), GetColumnId(), SchemaInfo)); } return newChunks; } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h index b5a42f52dc..aa3a15ddce 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h @@ -19,11 +19,11 @@ private: std::shared_ptr<arrow::Scalar> First; std::shared_ptr<arrow::Scalar> Last; protected: - virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const override; + virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const override; virtual const TString& DoGetData() const override { return Data; } - virtual ui32 DoGetRecordsCount() const override { + virtual ui32 DoGetRecordsCountImpl() const override { return Record.GetMeta().GetNumRowsVerified(); } virtual TString DoDebugString() const override { @@ -70,14 +70,15 @@ private: const ui32 RecordsCount; TString Data; protected: - virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& /*saver*/, std::shared_ptr<NColumnShard::TSplitterCounters> /*counters*/, const std::vector<ui64>& /*splitSizes*/) const override { + virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& /*saver*/, const std::shared_ptr<NColumnShard::TSplitterCounters>& /*counters*/, + const std::vector<ui64>& /*splitSizes*/) const override { AFL_VERIFY(false); return {}; } virtual const TString& DoGetData() const override { return Data; } - virtual ui32 DoGetRecordsCount() const override { + virtual ui32 DoGetRecordsCountImpl() const override { return RecordsCount; } virtual TString DoDebugString() const override { @@ -107,7 +108,7 @@ public: class TColumnPortionResult { protected: - std::vector<std::shared_ptr<IPortionColumnChunk>> Chunks; + std::vector<std::shared_ptr<IPortionDataChunk>> Chunks; ui64 CurrentPortionRecords = 0; const ui32 ColumnId; ui64 PackedSize = 0; @@ -121,7 +122,7 @@ public: } - const std::vector<std::shared_ptr<IPortionColumnChunk>>& GetChunks() const { + const std::vector<std::shared_ptr<IPortionDataChunk>>& GetChunks() const { return Chunks; } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp index 18937cc641..ed03a8fcf5 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp @@ -29,7 +29,7 @@ void TMergedColumn::AppendSlice(const std::shared_ptr<arrow::Array>& data, const } } -std::vector<NKikimr::NOlap::NCompaction::TColumnPortionResult> TMergedColumn::BuildResult() { +std::vector<TColumnPortionResult> TMergedColumn::BuildResult() { std::vector<TColumnPortionResult> result; if (Portions.size()) { Portions.back().FlushBuffer(); diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index ff67fcbba7..f492aacbd4 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -102,7 +102,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc auto dataSchema = context.SchemaVersions.GetSchema(p.GetPortionInfo().GetMinSnapshot()); auto loader = dataSchema->GetColumnLoaderOptional(columnId); std::vector<const TColumnRecord*> records; - std::vector<IPortionColumnChunk::TPtr> chunks; + std::vector<std::shared_ptr<IPortionDataChunk>> chunks; if (!p.ExtractColumnChunks(columnId, records, chunks)) { AFL_VERIFY(!loader); records = {nullptr}; @@ -177,7 +177,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultSchema, SaverContext, stats)); for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) { - std::map<ui32, std::vector<IPortionColumnChunk::TPtr>> portionColumns; + std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> portionColumns; for (auto&& p : columnChunks) { portionColumns.emplace(p.first, p.second[i].GetChunks()); } @@ -190,7 +190,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc for (auto&& i : packs) { TGeneralSerializedSlice slice(std::move(i)); auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); - std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs(); + std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunksByBlobs = slice.GroupChunksByBlobs(); AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetPathId(), resultSchema->GetSnapshot(), SaverContext.GetStorageOperator())); NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 0003e91cf6..36b099471f 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -67,18 +67,14 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true)); self.UpsertPortion(portionInfo, &oldInfo); - for (auto& record : portionInfo.Records) { - context.DB.WriteColumn(portionInfo, record); - } + portionInfo.SaveToDatabase(context.DB); } for (auto& portionInfoWithBlobs : AppendedPortions) { auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); Y_ABORT_UNLESS(!portionInfo.Empty()); AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true)); self.UpsertPortion(portionInfo); - for (auto& record : portionInfo.Records) { - context.DB.WriteColumn(portionInfo, record); - } + portionInfo.SaveToDatabase(context.DB); } } @@ -115,7 +111,7 @@ std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(cons auto schema = std::make_shared<TDefaultSchemaDetails>(resultSchema, SaverContext, stats); TRBSplitLimiter limiter(context.Counters.SplitterCounters, schema, batch, SplitSettings); - std::vector<std::vector<IPortionColumnChunk::TPtr>> chunkByBlobs; + std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunkByBlobs; std::shared_ptr<arrow::RecordBatch> portionBatch; while (limiter.Next(chunkByBlobs, portionBatch)) { TPortionInfoWithBlobs infoWithBlob = TPortionInfoWithBlobs::BuildByBlobs(chunkByBlobs, nullptr, granule, snapshot, SaverContext.GetStorageOperator()); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 98cab6bef1..c608915528 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -169,7 +169,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) { bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { TSnapshot lastSnapshot(0, 0); const TIndexInfo* currentIndexInfo = nullptr; - auto result = db.LoadColumns([&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) { + if (!db.LoadColumns([&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) { if (!currentIndexInfo || lastSnapshot != portion.GetMinSnapshot()) { currentIndexInfo = &VersionedIndex.GetSchema(portion.GetMinSnapshot())->GetIndexInfo(); lastSnapshot = portion.GetMinSnapshot(); @@ -178,11 +178,14 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { // Locate granule and append the record. TColumnRecord rec(loadContext, *currentIndexInfo); GetGranulePtrVerified(portion.GetPathId())->AddColumnRecord(*currentIndexInfo, portion, rec, loadContext.GetPortionMeta()); - }); + })) { + return false; + } + for (auto&& i : Tables) { i.second->OnAfterPortionsLoad(); } - return result; + return true; } bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index 57a7abb585..f3446b1bde 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -42,17 +42,56 @@ bool TDbWrapper::Load(TInsertTableAccessor& insertTable, void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) { NIceDb::TNiceDb db(Database); - NColumnShard::Schema::IndexColumns_Write(db, portion, row); + auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk); + auto rowProto = row.GetMeta().SerializeToProto(); + if (proto) { + *rowProto.MutablePortionMeta() = std::move(*proto); + } + using IndexColumns = NColumnShard::Schema::IndexColumns; + db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId, + portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update( + NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()), + NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()), + NIceDb::TUpdate<IndexColumns::Blob>(row.SerializedBlobId()), + NIceDb::TUpdate<IndexColumns::Metadata>(rowProto.SerializeAsString()), + NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset), + NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size), + NIceDb::TUpdate<IndexColumns::PathId>(portion.GetPathId()) + ); } void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) { NIceDb::TNiceDb db(Database); - NColumnShard::Schema::IndexColumns_Erase(db, portion, row); + using IndexColumns = NColumnShard::Schema::IndexColumns; + db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId, + portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Delete(); } bool TDbWrapper::LoadColumns(const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) { NIceDb::TNiceDb db(Database); - return NColumnShard::Schema::IndexColumns_Load(db, DsGroupSelector, callback); + using IndexColumns = NColumnShard::Schema::IndexColumns; + auto rowset = db.Table<IndexColumns>().Prefix(0).Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty(); + portion.SetPathId(rowset.GetValue<IndexColumns::PathId>()); + portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>()); + portion.SetPortion(rowset.GetValue<IndexColumns::Portion>()); + portion.SetDeprecatedGranuleId(rowset.GetValue<IndexColumns::Granule>()); + + NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, DsGroupSelector); + + portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>()); + + callback(portion, chunkLoadContext); + + if (!rowset.Next()) + return false; + } + return true; } void TDbWrapper::WriteCounter(ui32 counterId, ui64 value) { diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h index b04174b59a..688ecdb3c2 100644 --- a/ydb/core/tx/columnshard/engines/portions/column_record.h +++ b/ydb/core/tx/columnshard/engines/portions/column_record.h @@ -57,6 +57,12 @@ public: ui16 Chunk = 0; TBlobRange BlobRange; + + void RegisterBlobId(const TUnifiedBlobId& blobId) { +// AFL_VERIFY(!BlobRange.BlobId.GetTabletId())("original", BlobRange.BlobId.ToStringNew())("new", blobId.ToStringNew()); + BlobRange.BlobId = blobId; + } + TColumnRecord(const TChunkAddress& address, const TBlobRange& range, TChunkMeta&& meta) : Meta(std::move(meta)) , ColumnId(address.GetColumnId()) @@ -155,10 +161,11 @@ protected: virtual const TString& DoGetData() const override { return Data; } - virtual ui32 DoGetRecordsCount() const override { + virtual ui32 DoGetRecordsCountImpl() const override { return ColumnRecord.GetMeta().GetNumRowsVerified(); } - virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& /*saver*/, std::shared_ptr<NColumnShard::TSplitterCounters> /*counters*/, const std::vector<ui64>& /*splitSizes*/) const override { + virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& /*saver*/, const std::shared_ptr<NColumnShard::TSplitterCounters>& /*counters*/, + const std::vector<ui64>& /*splitSizes*/) const override { Y_ABORT_UNLESS(false); return {}; } @@ -173,10 +180,9 @@ protected: } public: TSimpleOrderedColumnChunk(const TColumnRecord& cRecord, const TString& data) - : TBase(cRecord.ColumnId) + : TBase(cRecord.ColumnId, cRecord.Chunk) , ColumnRecord(cRecord) , Data(data) { - ChunkIdx = cRecord.Chunk; } }; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 1d7c0d290d..d2fdb926ed 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -1,5 +1,6 @@ #include "portion_info.h" #include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/tx/columnshard/engines/db_wrapper.h> #include <ydb/core/formats/arrow/arrow_filter.h> #include <util/system/tls.h> #include <ydb/core/formats/arrow/size_calcer.h> @@ -103,16 +104,16 @@ ui64 TPortionInfo::GetRawBytes(const std::vector<ui32>& columnIds) const { return sum; } -ui64 TPortionInfo::GetRawBytes(const std::set<ui32>& columnIds) const { +ui64 TPortionInfo::GetRawBytes(const std::set<ui32>& entityIds) const { ui64 sum = 0; const ui32 numRows = NumRows(); for (auto&& i : TIndexInfo::GetSpecialColumnIds()) { - if (columnIds.contains(i)) { + if (entityIds.contains(i)) { sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i); } } for (auto&& r : Records) { - if (columnIds.contains(r.ColumnId)) { + if (entityIds.contains(r.ColumnId)) { sum += r.GetMeta().GetRawBytesVerified(); } } @@ -206,6 +207,16 @@ bool TPortionInfo::IsEqualWithSnapshots(const TPortionInfo& item) const { && Portion == item.Portion && RemoveSnapshot == item.RemoveSnapshot; } +void TPortionInfo::RemoveFromDatabase(IDbWrapper& db) const { + for (auto& record : Records) { + db.EraseColumn(*this, record); + } +} +void TPortionInfo::SaveToDatabase(IDbWrapper& db) const { + for (auto& record : Records) { + db.WriteColumn(*this, record); + } +} std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { Y_ABORT_UNLESS(!Blobs.empty()); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index b182e74dbf..a53c673489 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -11,6 +11,7 @@ namespace NKikimr::NOlap { struct TIndexInfo; +class IDbWrapper; class TPortionInfo { private: @@ -35,6 +36,22 @@ public: return PathId; } + void RegisterBlobId(const TChunkAddress& address, const TUnifiedBlobId& blobId) { + bool found = false; + for (auto it = Records.begin(); it != Records.end(); ++it) { + if (it->ColumnId == address.GetEntityId() && it->Chunk == address.GetChunkIdx()) { + it->RegisterBlobId(blobId); + found = true; + break; + } + } + AFL_VERIFY(found)("address", address.DebugString()); + } + + void RemoveFromDatabase(IDbWrapper& db) const; + + void SaveToDatabase(IDbWrapper& db) const; + bool OlderThen(const TPortionInfo& info) const { return RecordSnapshotMin() < info.RecordSnapshotMin(); } diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp index 90e6a8270f..9d8adcac99 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp @@ -4,17 +4,18 @@ namespace NKikimr::NOlap { -void TPortionInfoWithBlobs::TBlobInfo::RestoreChunk(const TPortionInfoWithBlobs& owner, const IPortionColumnChunk::TPtr& chunk) { +void TPortionInfoWithBlobs::TBlobInfo::RestoreChunk(const TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk) { Y_ABORT_UNLESS(!ResultBlob); const TString& data = chunk->GetData(); Size += data.size(); - auto address = TChunkAddress(chunk->GetColumnId(), chunk->GetChunkIdx()); + auto address = chunk->GetChunkAddress(); Y_ABORT_UNLESS(owner.GetPortionInfo().GetRecordPointer(address)); Y_ABORT_UNLESS(Chunks.emplace(address, chunk).second); ChunksOrdered.emplace_back(chunk); } -const TColumnRecord& TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, const IPortionColumnChunk::TPtr& chunk) { +void TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk) { + AFL_VERIFY(chunk); Y_ABORT_UNLESS(!ResultBlob); TBlobRange bRange; const TString& data = chunk->GetData(); @@ -22,34 +23,23 @@ const TColumnRecord& TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWith bRange.Offset = Size; bRange.Size = data.size(); - TColumnRecord rec(TChunkAddress(chunk->GetColumnId(), chunk->GetChunkIdx()), bRange, chunk->BuildSimpleChunkMeta()); - Size += data.size(); - Y_ABORT_UNLESS(Chunks.emplace(rec.GetAddress(), chunk).second); + Y_ABORT_UNLESS(Chunks.emplace(chunk->GetChunkAddress(), chunk).second); ChunksOrdered.emplace_back(chunk); - auto& result = owner.PortionInfo.AppendOneChunkColumn(std::move(rec)); - return result; + + chunk->AddIntoPortion(bRange, owner.PortionInfo); } void TPortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) { - auto it = owner.PortionInfo.Records.begin(); for (auto&& i : Chunks) { - bool found = false; - for (; it != owner.PortionInfo.Records.end(); ++it) { - if (it->ColumnId == i.first.GetColumnId() && it->Chunk == i.first.GetChunk()) { - it->BlobRange.BlobId = blobId; - found = true; - break; - } - } - AFL_VERIFY(found)("address", i.second->DebugString()); + owner.PortionInfo.RegisterBlobId(i.first, blobId); } } -void TPortionInfoWithBlobs::TBlobInfo::ExtractColumnChunks(const ui32 columnId, std::map<TChunkAddress, IPortionColumnChunk::TPtr>& resultMap) { - const auto pred = [this, &resultMap, columnId](const IPortionColumnChunk::TPtr& chunk) { - if (chunk->GetColumnId() == columnId) { +void TPortionInfoWithBlobs::TBlobInfo::ExtractEntityChunks(const ui32 entityId, std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>& resultMap) { + const auto pred = [this, &resultMap, entityId](const std::shared_ptr<IPortionDataChunk>& chunk) { + if (chunk->GetEntityId() == entityId) { resultMap.emplace(chunk->GetChunkAddress(), chunk); Chunks.erase(chunk->GetChunkAddress()); return true; @@ -130,9 +120,8 @@ std::vector<NKikimr::NOlap::TPortionInfoWithBlobs> TPortionInfoWithBlobs::Restor return result; } -NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::vector<std::vector<IPortionColumnChunk::TPtr>>& chunksByBlobs, - std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot, const std::shared_ptr<NOlap::IBlobsStorageOperator>& bStorageOperator) -{ +NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>>& chunksByBlobs, std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, + const TSnapshot& snapshot, const std::shared_ptr<NOlap::IBlobsStorageOperator>& bStorageOperator) { TPortionInfoWithBlobs result(TPortionInfo(granule, 0, snapshot, bStorageOperator), batch); for (auto& blob : chunksByBlobs) { auto blobInfo = result.StartBlob(); @@ -180,16 +169,16 @@ std::optional<NKikimr::NOlap::TPortionInfoWithBlobs> TPortionInfoWithBlobs::Chan return result; } -std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> TPortionInfoWithBlobs::GetColumnChunks(const ui32 columnId) const { - std::map<TChunkAddress, IPortionColumnChunk::TPtr> sortedChunks; +std::vector<std::shared_ptr<IPortionDataChunk>> TPortionInfoWithBlobs::GetEntityChunks(const ui32 entityId) const { + std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>> sortedChunks; for (auto&& b : GetBlobs()) { for (auto&& i : b.GetChunks()) { - if (i.second->GetColumnId() == columnId) { + if (i.second->GetEntityId() == entityId) { sortedChunks.emplace(i.first, i.second); } } } - std::vector<IPortionColumnChunk::TPtr> result; + std::vector<std::shared_ptr<IPortionDataChunk>> result; for (auto&& i : sortedChunks) { AFL_VERIFY(i.second->GetChunkIdx() == result.size())("idx", i.second->GetChunkIdx())("size", result.size()); result.emplace_back(i.second); @@ -197,16 +186,16 @@ std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> TPortionInfoWithBlobs::Ge return result; } -bool TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<IPortionColumnChunk::TPtr>& chunks) { +bool TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks) { records = GetPortionInfo().GetColumnChunksPointers(columnId); if (records.empty()) { return false; } - std::map<TChunkAddress, IPortionColumnChunk::TPtr> chunksMap; + std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>> chunksMap; for (auto&& i : Blobs) { - i.ExtractColumnChunks(columnId, chunksMap); + i.ExtractEntityChunks(columnId, chunksMap); } - std::vector<IPortionColumnChunk::TPtr> chunksLocal; + std::vector<std::shared_ptr<IPortionDataChunk>> chunksLocal; for (auto&& i : chunksMap) { Y_ABORT_UNLESS(i.first.GetColumnId() == columnId); Y_ABORT_UNLESS(i.first.GetChunk() == chunksLocal.size()); @@ -215,5 +204,4 @@ bool TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector std::swap(chunksLocal, chunks); return true; } - } diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.h b/ydb/core/tx/columnshard/engines/portions/with_blobs.h index c7fa398811..1a7a6b7192 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.h @@ -10,28 +10,29 @@ class TPortionInfoWithBlobs { public: class TBlobInfo { private: - using TBlobChunks = std::map<TChunkAddress, IPortionColumnChunk::TPtr>; + using TBlobChunks = std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>; YDB_READONLY(ui64, Size, 0); YDB_READONLY_DEF(TBlobChunks, Chunks); - std::vector<IPortionColumnChunk::TPtr> ChunksOrdered; + std::vector<std::shared_ptr<IPortionDataChunk>> ChunksOrdered; mutable std::optional<TString> ResultBlob; - const TColumnRecord& AddChunk(TPortionInfoWithBlobs& owner, const IPortionColumnChunk::TPtr& chunk); - void RestoreChunk(const TPortionInfoWithBlobs& owner, const IPortionColumnChunk::TPtr& chunk); + void AddChunk(TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk); + void RestoreChunk(const TPortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk); + public: class TBuilder { private: TBlobInfo* OwnerBlob; TPortionInfoWithBlobs* OwnerPortion; + public: TBuilder(TBlobInfo& blob, TPortionInfoWithBlobs& portion) : OwnerBlob(&blob) , OwnerPortion(&portion) { - } - const TColumnRecord& AddChunk(const IPortionColumnChunk::TPtr& chunk) { + void AddChunk(const std::shared_ptr<IPortionDataChunk>& chunk) { return OwnerBlob->AddChunk(*OwnerPortion, chunk); } - void RestoreChunk(const IPortionColumnChunk::TPtr& chunk) { + void RestoreChunk(const std::shared_ptr<IPortionColumnChunk>& chunk) { OwnerBlob->RestoreChunk(*OwnerPortion, chunk); } }; @@ -49,8 +50,7 @@ public: } void RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId); - void ExtractColumnChunks(const ui32 columnId, std::map<TChunkAddress, IPortionColumnChunk::TPtr>& resultMap); - + void ExtractEntityChunks(const ui32 entityId, std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>>& resultMap); }; private: TPortionInfo PortionInfo; @@ -82,16 +82,16 @@ public: std::shared_ptr<arrow::RecordBatch> GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames = {}) const; - std::vector<IPortionColumnChunk::TPtr> GetColumnChunks(const ui32 columnId) const; + std::vector<std::shared_ptr<IPortionDataChunk>> GetEntityChunks(const ui32 entityId) const; - bool ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<IPortionColumnChunk::TPtr>& chunks); + bool ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<std::shared_ptr<IPortionDataChunk>>& chunks); ui64 GetSize() const { return PortionInfo.BlobsBytes(); } - static TPortionInfoWithBlobs BuildByBlobs(std::vector<std::vector<IPortionColumnChunk::TPtr>>& chunksByBlobs, std::shared_ptr<arrow::RecordBatch> batch, - const ui64 granule, const TSnapshot& snapshot, const std::shared_ptr<NOlap::IBlobsStorageOperator>& bStorageOperator); + static TPortionInfoWithBlobs BuildByBlobs(std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>>& chunksByBlobs, std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot, + const std::shared_ptr<NOlap::IBlobsStorageOperator>& bStorageOperator); std::optional<TPortionInfoWithBlobs> ChangeSaver(ISnapshotSchema::TPtr currentSchema, const TSaverContext& saverContext) const; diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.h b/ydb/core/tx/columnshard/engines/scheme/column_features.h index f12b146cad..11415ecc20 100644 --- a/ydb/core/tx/columnshard/engines/scheme/column_features.h +++ b/ydb/core/tx/columnshard/engines/scheme/column_features.h @@ -7,6 +7,7 @@ #include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h> +#include <ydb/core/formats/arrow/common/validation.h> namespace NKikimr::NOlap { @@ -19,8 +20,7 @@ private: public: TSaverContext(const std::shared_ptr<IBlobsStorageOperator>& storageOperator, const std::shared_ptr<IStoragesManager>& storagesManager) : StorageOperator(storageOperator) - , StoragesManager(storagesManager) - { + , StoragesManager(storagesManager) { } @@ -86,8 +86,7 @@ public: : Transformer(transformer) , Deserializer(deserializer) , ExpectedSchema(expectedSchema) - , ColumnId(columnId) - { + , ColumnId(columnId) { Y_ABORT_UNLESS(ExpectedSchema); auto fieldsCountStr = ::ToString(ExpectedSchema->num_fields()); Y_ABORT_UNLESS(ExpectedSchema->num_fields() == 1, "%s", fieldsCountStr.data()); @@ -118,6 +117,16 @@ public: return columnArray; } } + + std::shared_ptr<arrow::RecordBatch> ApplyVerified(const TString& data) const { + return NArrow::TStatusValidator::GetValid(Apply(data)); + } + + std::shared_ptr<arrow::Array> ApplyVerifiedColumn(const TString& data) const { + auto rb = ApplyVerified(data); + AFL_VERIFY(rb->num_columns() == 1)("schema", rb->schema()->ToString()); + return rb->column(0); + } }; struct TIndexInfo; diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 82eeb52686..9da8c6f3d3 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -22,9 +22,8 @@ static std::vector<TString> NamesOnly(const std::vector<TNameTypeInfo>& columns) return out; } -TIndexInfo::TIndexInfo(const TString& name, ui32 id) +TIndexInfo::TIndexInfo(const TString& name) : NTable::TScheme::TTableSchema() - , Id(id) , Name(name) {} @@ -326,6 +325,12 @@ TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId, const TSaverContext } } +std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoaderVerified(const ui32 columnId) const { + auto result = GetColumnLoaderOptional(columnId); + AFL_VERIFY(result); + return result; +} + std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoaderOptional(const ui32 columnId) const { auto it = ColumnFeatures.find(columnId); if (it == ColumnFeatures.end()) { @@ -451,7 +456,7 @@ std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& table } std::optional<TIndexInfo> TIndexInfo::BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { - TIndexInfo result("", 0); + TIndexInfo result(""); if (!result.DeserializeFromProto(schema)) { return std::nullopt; } diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index b73bf480c1..2446df2d2a 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -34,7 +34,7 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema { private: THashMap<ui32, TColumnFeatures> ColumnFeatures; THashMap<ui32, std::shared_ptr<arrow::Field>> ArrowColumnByColumnIdCache; - TIndexInfo(const TString& name, ui32 id); + TIndexInfo(const TString& name); bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); TColumnFeatures& GetOrCreateColumnFeatures(const ui32 columnId) const; void BuildSchemaWithSpecials(); @@ -54,7 +54,6 @@ public: TString DebugString() const { TStringBuilder sb; sb << "(" - << "id=" << Id << ";" << "version=" << Version << ";" << "name=" << Name << ";" << ")"; @@ -92,17 +91,12 @@ public: public: static TIndexInfo BuildDefault() { - TIndexInfo result("dummy", 0); + TIndexInfo result("dummy"); return result; } static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); - /// Returns id of the index. - ui32 GetId() const noexcept { - return Id; - } - static const std::vector<std::string>& SnapshotColumnNames() { static std::vector<std::string> result = {SPEC_COL_PLAN_STEP, SPEC_COL_TX_ID}; return result; @@ -114,6 +108,7 @@ public: std::shared_ptr<arrow::Schema> GetColumnsSchema(const std::set<ui32>& columnIds) const; TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const; std::shared_ptr<TColumnLoader> GetColumnLoaderOptional(const ui32 columnId) const; + std::shared_ptr<TColumnLoader> GetColumnLoaderVerified(const ui32 columnId) const; /// Returns an id of the column located by name. The name should exists in the schema. ui32 GetColumnId(const std::string& name) const; @@ -206,7 +201,6 @@ public: bool CheckCompatible(const TIndexInfo& other) const; private: - ui32 Id; ui64 Version = 0; TString Name; std::shared_ptr<arrow::Schema> Schema; diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index bab5202a9a..aac2d2bdd1 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -56,12 +56,12 @@ void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TPortionIn if (it == Portions.end()) { Y_ABORT_UNLESS(portion.Records.empty()); auto portionNew = std::make_shared<TPortionInfo>(portion); - portionNew->AddRecord(indexInfo, rec, portionMeta); it = Portions.emplace(portion.GetPortion(), portionNew).first; } else { AFL_VERIFY(it->second->IsEqualWithSnapshots(portion))("self", it->second->DebugString())("item", portion.DebugString()); - it->second->AddRecord(indexInfo, rec, portionMeta); } + it->second->AddRecord(indexInfo, rec, portionMeta); + if (portionMeta) { it->second->InitOperator(Owner->GetStoragesManager()->InitializePortionOperator(*it->second), false); } diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.cpp b/ydb/core/tx/columnshard/splitter/batch_slice.cpp index 06ce8cebd5..47dfe990eb 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.cpp +++ b/ydb/core/tx/columnshard/splitter/batch_slice.cpp @@ -1,12 +1,13 @@ #include "batch_slice.h" #include "simple.h" +#include <ydb/library/accessor/validator.h> namespace NKikimr::NOlap { bool TGeneralSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) { - std::vector<IPortionColumnChunk::TPtr> chunksInProgress; - std::sort(Columns.begin(), Columns.end()); - for (auto&& i : Columns) { + std::vector<std::shared_ptr<IPortionDataChunk>> chunksInProgress; + std::sort(Data.begin(), Data.end()); + for (auto&& i : Data) { for (auto&& p : i.GetChunks()) { chunksInProgress.emplace_back(p); } @@ -49,12 +50,12 @@ bool TGeneralSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) { Y_ABORT_UNLESS((i64)chunksInProgress[i]->GetPackedSize() > Settings.GetMinBlobSize() - partSize); Y_ABORT_UNLESS(otherSize - (Settings.GetMinBlobSize() - partSize) >= Settings.GetMinBlobSize()); - std::vector<IPortionColumnChunk::TPtr> newChunks; - const bool splittable = chunksInProgress[i]->GetRecordsCount() > 1; + std::vector<std::shared_ptr<IPortionDataChunk>> newChunks; + const bool splittable = chunksInProgress[i]->IsSplittable(); if (splittable) { Counters->BySizeSplitter.OnTrashSerialized(chunksInProgress[i]->GetPackedSize()); const std::vector<ui64> sizes = {(ui64)(Settings.GetMinBlobSize() - partSize)}; - newChunks = chunksInProgress[i]->InternalSplit(Schema->GetColumnSaver(chunksInProgress[i]->GetColumnId()), Counters, sizes); + newChunks = chunksInProgress[i]->InternalSplit(Schema->GetColumnSaver(chunksInProgress[i]->GetEntityId()), Counters, sizes); chunksInProgress.erase(chunksInProgress.begin() + i); chunksInProgress.insert(chunksInProgress.begin() + i, newChunks.begin(), newChunks.end()); } @@ -81,10 +82,10 @@ bool TGeneralSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) { ui32 currentChunkIdx = 0; for (auto&& i : result) { for (auto&& c : i.GetChunks()) { - Y_ABORT_UNLESS(c->GetColumnId()); - if (!lastColumnId || *lastColumnId != c->GetColumnId()) { - Y_ABORT_UNLESS(columnIds.emplace(c->GetColumnId()).second); - lastColumnId = c->GetColumnId(); + Y_ABORT_UNLESS(c->GetEntityId()); + if (!lastColumnId || *lastColumnId != c->GetEntityId()) { + Y_ABORT_UNLESS(columnIds.emplace(c->GetEntityId()).second); + lastColumnId = c->GetEntityId(); currentChunkIdx = 0; } c->SetChunkIdx(currentChunkIdx++); @@ -94,60 +95,58 @@ bool TGeneralSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) { return true; } -TGeneralSerializedSlice::TGeneralSerializedSlice(const std::map<ui32, std::vector<IPortionColumnChunk::TPtr>>& data, - ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings) +TGeneralSerializedSlice::TGeneralSerializedSlice(const std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, + const TSplitSettings& settings) : Schema(schema) , Counters(counters) , Settings(settings) { - std::optional<ui32> recordsCount; - for (auto&& [columnId, chunks] : data) { - auto f = schema->GetField(columnId); - TSplittedColumn column(f, columnId); - column.AddChunks(chunks); - if (!recordsCount) { - recordsCount = column.GetRecordsCount(); - } else { - AFL_VERIFY(*recordsCount == column.GetRecordsCount())("records_count", *recordsCount)("column", column.GetRecordsCount()); + for (auto&& [entityId, chunks] : data) { + TSplittedEntity entity(entityId); + entity.SetChunks(chunks); + if (!!entity.GetRecordsCount()) { + if (!recordsCount) { + recordsCount = entity.GetRecordsCount(); + } else { + AFL_VERIFY(*recordsCount == entity.GetRecordsCount())("records_count", *recordsCount)("column", entity.GetRecordsCount()); + } } - Size += column.GetSize(); - Columns.emplace_back(std::move(column)); + Size += entity.GetSize(); + Data.emplace_back(std::move(entity)); } Y_ABORT_UNLESS(recordsCount); RecordsCount = *recordsCount; } -TGeneralSerializedSlice::TGeneralSerializedSlice(ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings) - : Schema(schema) +TGeneralSerializedSlice::TGeneralSerializedSlice(const ui32 recordsCount, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings) + : RecordsCount(recordsCount) + , Schema(schema) , Counters(counters) , Settings(settings) { } -TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings) - : TBase(schema, counters, settings) +TBatchSerializedSlice::TBatchSerializedSlice(const std::shared_ptr<arrow::RecordBatch>& batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings) + : TBase(TValidator::CheckNotNull(batch)->num_rows(), schema, counters, settings) , Batch(batch) { Y_ABORT_UNLESS(batch); - RecordsCount = batch->num_rows(); - Columns.reserve(batch->num_columns()); + Data.reserve(batch->num_columns()); for (auto&& i : batch->schema()->fields()) { - TSplittedColumn c(i, schema->GetColumnId(i->name())); - Columns.emplace_back(std::move(c)); + TSplittedEntity c(schema->GetColumnId(i->name())); + Data.emplace_back(std::move(c)); } ui32 idx = 0; for (auto&& i : batch->columns()) { - auto& c = Columns[idx]; - auto columnSaver = schema->GetColumnSaver(c.GetColumnId()); - auto stats = schema->GetColumnSerializationStats(c.GetColumnId()); + auto& c = Data[idx]; + auto columnSaver = schema->GetColumnSaver(c.GetEntityId()); + auto stats = schema->GetColumnSerializationStats(c.GetEntityId()); TSimpleSplitter splitter(columnSaver, Counters); - if (stats) { - splitter.SetStats(*stats); - } - std::vector<IPortionColumnChunk::TPtr> chunks; - for (auto&& i : splitter.Split(i, c.GetField(), Settings.GetMaxBlobSize())) { - chunks.emplace_back(std::make_shared<TSplittedColumnChunk>(c.GetColumnId(), i, Schema)); + splitter.SetStats(stats); + std::vector<std::shared_ptr<IPortionDataChunk>> chunks; + for (auto&& i : splitter.Split(i, Schema->GetField(c.GetEntityId()), Settings.GetMaxBlobSize())) { + chunks.emplace_back(std::make_shared<TSplittedColumnChunk>(c.GetEntityId(), i, Schema)); } c.SetChunks(chunks); Size += c.GetSize(); @@ -156,11 +155,11 @@ TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> } void TGeneralSerializedSlice::MergeSlice(TGeneralSerializedSlice&& slice) { - Y_ABORT_UNLESS(Columns.size() == slice.Columns.size()); + Y_ABORT_UNLESS(Data.size() == slice.Data.size()); RecordsCount += slice.GetRecordsCount(); - for (ui32 i = 0; i < Columns.size(); ++i) { - Size += slice.Columns[i].GetSize(); - Columns[i].Merge(std::move(slice.Columns[i])); + for (ui32 i = 0; i < Data.size(); ++i) { + Size += slice.Data[i].GetSize(); + Data[i].Merge(std::move(slice.Data[i])); } } diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h index feda743b26..d86db7ae8c 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.h +++ b/ydb/core/tx/columnshard/splitter/batch_slice.h @@ -87,32 +87,34 @@ public: }; class TGeneralSerializedSlice { +private: + YDB_READONLY(ui32, RecordsCount, 0); protected: - std::vector<TSplittedColumn> Columns; + std::vector<TSplittedEntity> Data; ui64 Size = 0; - ui32 RecordsCount = 0; ISchemaDetailInfo::TPtr Schema; std::shared_ptr<NColumnShard::TSplitterCounters> Counters; TSplitSettings Settings; TGeneralSerializedSlice() = default; - const TSplittedColumn& GetColumnVerified(const std::string& fieldName) const { - for (auto&& i : Columns) { - if (i.GetField()->name() == fieldName) { + const TSplittedEntity& GetEntityDataVerified(const ui32& entityId) const { + for (auto&& i : Data) { + if (i.GetEntityId() == entityId) { return i; } } Y_ABORT_UNLESS(false); - return Columns.front(); + return Data.front(); } + public: std::shared_ptr<arrow::RecordBatch> GetFirstLastPKBatch(const std::shared_ptr<arrow::Schema>& pkSchema) const { std::vector<std::shared_ptr<arrow::Array>> pkColumns; for (auto&& i : pkSchema->fields()) { auto aBuilder = NArrow::MakeBuilder(i); - const TSplittedColumn& splittedColumn = GetColumnVerified(i->name()); - NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedColumn.GetFirstScalar())); - NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedColumn.GetLastScalar())); + const TSplittedEntity& splittedEntity = GetEntityDataVerified(Schema->GetColumnId(i->name())); + NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedEntity.GetFirstScalar())); + NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedEntity.GetLastScalar())); pkColumns.emplace_back(NArrow::TStatusValidator::GetValid(aBuilder->Finish())); } return arrow::RecordBatch::Make(pkSchema, 2, pkColumns); @@ -121,12 +123,9 @@ public: ui64 GetSize() const { return Size; } - ui32 GetRecordsCount() const { - return RecordsCount; - } - std::vector<std::vector<IPortionColumnChunk::TPtr>> GroupChunksByBlobs() { - std::vector<std::vector<IPortionColumnChunk::TPtr>> result; + std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> GroupChunksByBlobs() { + std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> result; std::vector<TSplittedBlob> blobs; GroupBlobs(blobs); for (auto&& i : blobs) { @@ -142,8 +141,8 @@ public: MergeSlice(std::move(objects[i])); } } - TGeneralSerializedSlice(const std::map<ui32, std::vector<IPortionColumnChunk::TPtr>>& data, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings); - TGeneralSerializedSlice(ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings); + TGeneralSerializedSlice(const std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings); + TGeneralSerializedSlice(const ui32 recordsCount, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings); void MergeSlice(TGeneralSerializedSlice&& slice); @@ -159,10 +158,9 @@ private: using TBase = TGeneralSerializedSlice; YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch); public: - TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings); + TBatchSerializedSlice(const std::shared_ptr<arrow::RecordBatch>& batch, ISchemaDetailInfo::TPtr schema, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const TSplitSettings& settings); - explicit TBatchSerializedSlice(TVectorView<TBatchSerializedSlice>&& objects) - { + explicit TBatchSerializedSlice(TVectorView<TBatchSerializedSlice>&& objects) { Y_ABORT_UNLESS(objects.size()); std::swap(*this, objects.front()); for (ui32 i = 1; i < objects.size(); ++i) { diff --git a/ydb/core/tx/columnshard/splitter/blob_info.cpp b/ydb/core/tx/columnshard/splitter/blob_info.cpp index 4d0f4ef715..01a6e1a000 100644 --- a/ydb/core/tx/columnshard/splitter/blob_info.cpp +++ b/ydb/core/tx/columnshard/splitter/blob_info.cpp @@ -2,9 +2,8 @@ namespace NKikimr::NOlap { -void TSplittedBlob::Take(const IPortionColumnChunk::TPtr& chunk) { +void TSplittedBlob::Take(const std::shared_ptr<IPortionDataChunk>& chunk) { Chunks.emplace_back(chunk); Size += chunk->GetPackedSize(); } - } diff --git a/ydb/core/tx/columnshard/splitter/blob_info.h b/ydb/core/tx/columnshard/splitter/blob_info.h index 027f388d06..a4515f9f78 100644 --- a/ydb/core/tx/columnshard/splitter/blob_info.h +++ b/ydb/core/tx/columnshard/splitter/blob_info.h @@ -7,9 +7,10 @@ namespace NKikimr::NOlap { class TSplittedBlob { private: YDB_READONLY(i64, Size, 0); - YDB_READONLY_DEF(std::vector<IPortionColumnChunk::TPtr>, Chunks); + YDB_READONLY_DEF(std::vector<std::shared_ptr<IPortionDataChunk>>, Chunks); + public: - void Take(const IPortionColumnChunk::TPtr& chunk); + void Take(const std::shared_ptr<IPortionDataChunk>& chunk); bool operator<(const TSplittedBlob& item) const { return Size > item.Size; } diff --git a/ydb/core/tx/columnshard/splitter/chunks.cpp b/ydb/core/tx/columnshard/splitter/chunks.cpp index e4e6193345..0bf0b70fe6 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.cpp +++ b/ydb/core/tx/columnshard/splitter/chunks.cpp @@ -1,8 +1,10 @@ #include "chunks.h" +#include <ydb/core/tx/columnshard/engines/portions/column_record.h> +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> namespace NKikimr::NOlap { -std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> IPortionColumnChunk::InternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const { +std::vector<std::shared_ptr<IPortionDataChunk>> IPortionColumnChunk::DoInternalSplit(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const { ui64 sumSize = 0; for (auto&& i : splitSizes) { sumSize += i; @@ -13,7 +15,7 @@ std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> IPortionColumnChunk::Inte } else { Y_ABORT_UNLESS(GetRecordsCount() >= splitSizes.size()); } - auto result = DoInternalSplit(saver, counters, splitSizes); + auto result = DoInternalSplitImpl(saver, counters, splitSizes); if (sumSize == GetPackedSize()) { Y_ABORT_UNLESS(result.size() == splitSizes.size()); } else { @@ -22,4 +24,9 @@ std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> IPortionColumnChunk::Inte return result; } +void IPortionColumnChunk::DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const { + TColumnRecord rec(GetChunkAddress(), bRange, BuildSimpleChunkMeta()); + portionInfo.AppendOneChunkColumn(std::move(rec)); +} + } diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h index 36ca66c58e..1623bd2914 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.h +++ b/ydb/core/tx/columnshard/splitter/chunks.h @@ -1,38 +1,75 @@ #pragma once #include "chunk_meta.h" -#include <ydb/core/tx/columnshard/engines/scheme/column_features.h> + #include <ydb/core/tx/columnshard/counters/splitter.h> #include <ydb/core/tx/columnshard/engines/portions/common.h> +#include <ydb/core/tx/columnshard/engines/scheme/column_features.h> namespace NKikimr::NOlap { -class IPortionColumnChunk { -public: - using TPtr = std::shared_ptr<IPortionColumnChunk>; +class IPortionDataChunk { +private: + YDB_READONLY(ui32, EntityId, 0); + + std::optional<ui32> ChunkIdx; + protected: - ui32 ColumnId = 0; - ui16 ChunkIdx = 0; - virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const = 0; - virtual ui64 DoGetPackedSize() const { + ui64 DoGetPackedSize() const { return GetData().size(); } virtual const TString& DoGetData() const = 0; - virtual ui32 DoGetRecordsCount() const = 0; virtual TString DoDebugString() const = 0; - virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const = 0; + virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplit(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const = 0; + virtual bool DoIsSplittable() const = 0; + virtual std::optional<ui32> DoGetRecordsCount() const = 0; virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const = 0; virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const = 0; - + virtual void DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const = 0; public: - IPortionColumnChunk(const ui32 columnId) - : ColumnId(columnId) - { + IPortionDataChunk(const ui32 entityId, const std::optional<ui16>& chunkIdx = {}) + : EntityId(entityId) + , ChunkIdx(chunkIdx) { + } + virtual ~IPortionDataChunk() = default; + + TString DebugString() const { + return DoDebugString(); } - virtual ~IPortionColumnChunk() = default; - TChunkAddress GetChunkAddress() const { - return TChunkAddress(ColumnId, ChunkIdx); + const TString& GetData() const { + return DoGetData(); + } + + ui64 GetPackedSize() const { + return DoGetPackedSize(); + } + + std::optional<ui32> GetRecordsCount() const { + return DoGetRecordsCount(); + } + + ui32 GetRecordsCountVerified() const { + auto result = DoGetRecordsCount(); + AFL_VERIFY(result); + return *result; + } + + std::vector<std::shared_ptr<IPortionDataChunk>> InternalSplit(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const { + return DoInternalSplit(saver, counters, splitSizes); + } + + bool IsSplittable() const { + return DoIsSplittable(); + } + + ui16 GetChunkIdx() const { + AFL_VERIFY(!!ChunkIdx); + return *ChunkIdx; + } + + void SetChunkIdx(const ui16 value) { + ChunkIdx = value; } std::shared_ptr<arrow::Scalar> GetFirstScalar() const { @@ -46,38 +83,136 @@ public: return result; } + TChunkAddress GetChunkAddress() const { + return TChunkAddress(GetEntityId(), GetChunkIdx()); + } + + void AddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const { + return DoAddIntoPortion(bRange, portionInfo); + } +}; + +class IPortionColumnChunk : public IPortionDataChunk { +private: + using TBase = IPortionDataChunk; + +protected: + virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const = 0; + virtual ui32 DoGetRecordsCountImpl() const = 0; + virtual std::optional<ui32> DoGetRecordsCount() const override final { + return DoGetRecordsCountImpl(); + } + + virtual void DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const override; + + virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const = 0; + virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplit(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const override; + virtual bool DoIsSplittable() const override { + return GetRecordsCount() > 1; + } + +public: + IPortionColumnChunk(const ui32 entityId, const std::optional<ui16>& chunkIdx = {}) + : TBase(entityId, chunkIdx) { + } + virtual ~IPortionColumnChunk() = default; + TSimpleChunkMeta BuildSimpleChunkMeta() const { return DoBuildSimpleChunkMeta(); } ui32 GetColumnId() const { - return ColumnId; + return GetEntityId(); } +}; - ui16 GetChunkIdx() const { - return ChunkIdx; +class TChunkedColumnReader { +private: + std::vector<std::shared_ptr<IPortionDataChunk>> Chunks; + std::shared_ptr<TColumnLoader> Loader; + + std::shared_ptr<arrow::Array> CurrentChunk; + ui32 CurrentChunkIndex = 0; + ui32 CurrentRecordIndex = 0; +public: + TChunkedColumnReader(const std::vector<std::shared_ptr<IPortionDataChunk>>& chunks, const std::shared_ptr<TColumnLoader>& loader) + : Chunks(chunks) + , Loader(loader) + { + if (Chunks.size()) { + CurrentChunk = Loader->ApplyVerifiedColumn(Chunks.front()->GetData()); + } } - void SetChunkIdx(const ui16 value) { - ChunkIdx = value; + const std::shared_ptr<arrow::Array>& GetCurrentChunk() const { + return CurrentChunk; } - TString DebugString() const { - return DoDebugString(); + ui32 GetCurrentRecordIndex() const { + return CurrentRecordIndex; } - ui32 GetRecordsCount() const { - return DoGetRecordsCount(); + bool IsCorrect() const { + return !!CurrentChunk; } - const TString& GetData() const { - return DoGetData(); + bool ReadNext() { + AFL_VERIFY(!!CurrentChunk); + if (++CurrentRecordIndex < CurrentChunk->length()) { + return true; + } + while (++CurrentChunkIndex < Chunks.size()) { + CurrentChunk = Loader->ApplyVerifiedColumn(Chunks[CurrentChunkIndex]->GetData()); + CurrentRecordIndex = 0; + if (CurrentRecordIndex < CurrentChunk->length()) { + return true; + } + } + CurrentChunk = nullptr; + return false; } +}; - ui64 GetPackedSize() const { - return DoGetPackedSize(); +class TChunkedBatchReader { +private: + std::vector<TChunkedColumnReader> Columns; + bool IsCorrectFlag = true; +public: + TChunkedBatchReader(const std::vector<TChunkedColumnReader>& columnReaders) + : Columns(columnReaders) + { + AFL_VERIFY(Columns.size()); + for (auto&& i : Columns) { + AFL_VERIFY(i.IsCorrect()); + } + } + + bool IsCorrect() const { + return IsCorrectFlag; + } + + bool ReadNext() { + std::optional<bool> result; + for (auto&& i : Columns) { + if (!result) { + result = i.ReadNext(); + } else { + AFL_VERIFY(*result == i.ReadNext()); + } + } + if (!*result) { + IsCorrectFlag = false; + } + return *result; + } + + std::vector<TChunkedColumnReader>::const_iterator begin() const { + return Columns.begin(); } - std::vector<IPortionColumnChunk::TPtr> InternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const; + std::vector<TChunkedColumnReader>::const_iterator end() const { + return Columns.end(); + } }; + } diff --git a/ydb/core/tx/columnshard/splitter/column_info.h b/ydb/core/tx/columnshard/splitter/column_info.h index d2c76e9535..ff2f39190f 100644 --- a/ydb/core/tx/columnshard/splitter/column_info.h +++ b/ydb/core/tx/columnshard/splitter/column_info.h @@ -7,14 +7,31 @@ namespace NKikimr::NOlap { -class TSplittedColumn { +class TSplittedEntity { private: + YDB_READONLY(ui32, EntityId, 0); YDB_READONLY(ui64, Size, 0); - YDB_READONLY(ui32, ColumnId, 0); - YDB_READONLY(ui32, RecordsCount, 0); - YDB_READONLY_DEF(std::shared_ptr<arrow::Field>, Field); - YDB_READONLY_DEF(std::vector<IPortionColumnChunk::TPtr>, Chunks); + YDB_READONLY_DEF(std::vector<std::shared_ptr<IPortionDataChunk>>, Chunks); + YDB_READONLY_DEF(std::optional<ui32>, RecordsCount); + +protected: + template <class T> + const T& GetChunkAs(const ui32 idx) const { + AFL_VERIFY(idx < Chunks.size()); + auto result = std::dynamic_pointer_cast<T>(Chunks[idx]); + AFL_VERIFY(result); + return *result; + } + public: + TSplittedEntity(const ui32 entityId) + : EntityId(entityId) { + AFL_VERIFY(EntityId); + } + + bool operator<(const TSplittedEntity& item) const { + return Size > item.Size; + } std::shared_ptr<arrow::Scalar> GetFirstScalar() const { Y_ABORT_UNLESS(Chunks.size()); @@ -26,46 +43,40 @@ public: return Chunks.back()->GetLastScalar(); } - void Merge(TSplittedColumn&& c) { + void Merge(TSplittedEntity&& c) { Size += c.Size; - Y_ABORT_UNLESS(Field->name() == c.Field->name()); - Y_DEBUG_ABORT_UNLESS(Field->Equals(c.Field)); - Y_ABORT_UNLESS(ColumnId == c.ColumnId); - Y_ABORT_UNLESS(ColumnId); - RecordsCount += c.RecordsCount; + AFL_VERIFY(!!RecordsCount == !!c.RecordsCount); + if (RecordsCount) { + *RecordsCount += *c.RecordsCount; + } + AFL_VERIFY(EntityId == c.EntityId)("self", EntityId)("c", c.EntityId); + Y_ABORT_UNLESS(c.EntityId); for (auto&& i : c.Chunks) { Chunks.emplace_back(std::move(i)); } } - void SetChunks(const std::vector<IPortionColumnChunk::TPtr>& data) { + void SetChunks(const std::vector<std::shared_ptr<IPortionDataChunk>>& data) { Y_ABORT_UNLESS(Chunks.empty()); + std::optional<bool> hasRecords; for (auto&& i : data) { - Y_ABORT_UNLESS(i->GetColumnId() == ColumnId); - Size += i->GetPackedSize(); - RecordsCount += i->GetRecordsCount(); - Chunks.emplace_back(i); - } - } - - void AddChunks(const std::vector<IPortionColumnChunk::TPtr>& data) { - for (auto&& i : data) { - Y_ABORT_UNLESS(i->GetColumnId() == ColumnId); + Y_ABORT_UNLESS(i->GetEntityId() == EntityId); Size += i->GetPackedSize(); - RecordsCount += i->GetRecordsCount(); Chunks.emplace_back(i); + auto rc = i->GetRecordsCount(); + if (!hasRecords) { + hasRecords = !!rc; + } + AFL_VERIFY(*hasRecords == !!rc); + if (!rc) { + continue; + } + if (!RecordsCount) { + RecordsCount = rc; + } else { + *RecordsCount += *rc; + } } } - - TSplittedColumn(std::shared_ptr<arrow::Field> f, const ui32 id) - : ColumnId(id) - , Field(f) - { - - } - - bool operator<(const TSplittedColumn& item) const { - return Size > item.Size; - } }; } diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp index f60a0d9e91..5fae273e44 100644 --- a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp @@ -39,13 +39,13 @@ TRBSplitLimiter::TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters Y_ABORT_UNLESS(recordsCountCheck == batch->num_rows()); } -bool TRBSplitLimiter::Next(std::vector<std::vector<IPortionColumnChunk::TPtr>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch) { +bool TRBSplitLimiter::Next(std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch) { if (!Slices.size()) { return false; } std::vector<TSplittedBlob> blobs; Slices.front().GroupBlobs(blobs); - std::vector<std::vector<IPortionColumnChunk::TPtr>> result; + std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> result; std::map<ui32, ui32> columnChunks; for (auto&& i : blobs) { if (blobs.size() == 1) { @@ -60,5 +60,4 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<IPortionColumnChunk::TPtr>>& Slices.pop_front(); return true; } - } diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.h b/ydb/core/tx/columnshard/splitter/rb_splitter.h index 5053f2eb1d..0b0356b86a 100644 --- a/ydb/core/tx/columnshard/splitter/rb_splitter.h +++ b/ydb/core/tx/columnshard/splitter/rb_splitter.h @@ -56,7 +56,7 @@ public: TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters, ISchemaDetailInfo::TPtr schemaInfo, const std::shared_ptr<arrow::RecordBatch> batch, const TSplitSettings& settings); - bool Next(std::vector<std::vector<IPortionColumnChunk::TPtr>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch); + bool Next(std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch); }; } diff --git a/ydb/core/tx/columnshard/splitter/simple.cpp b/ydb/core/tx/columnshard/splitter/simple.cpp index 690309c190..7a155eb315 100644 --- a/ydb/core/tx/columnshard/splitter/simple.cpp +++ b/ydb/core/tx/columnshard/splitter/simple.cpp @@ -6,11 +6,11 @@ namespace NKikimr::NOlap { -std::vector<IPortionColumnChunk::TPtr> TSplittedColumnChunk::DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const { +std::vector<std::shared_ptr<IPortionDataChunk>> TSplittedColumnChunk::DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const { auto chunks = TSimpleSplitter(saver, counters).SplitBySizes(Data.GetSlicedBatch(), Data.GetSerializedChunk(), splitSizes); - std::vector<IPortionColumnChunk::TPtr> newChunks; + std::vector<std::shared_ptr<IPortionDataChunk>> newChunks; for (auto&& i : chunks) { - newChunks.emplace_back(std::make_shared<TSplittedColumnChunk>(ColumnId, i, SchemaInfo)); + newChunks.emplace_back(std::make_shared<TSplittedColumnChunk>(GetColumnId(), i, SchemaInfo)); } return newChunks; } @@ -19,7 +19,9 @@ TString TSplittedColumnChunk::DoDebugString() const { return TStringBuilder() << "records_count=" << GetRecordsCount() << ";data=" << NArrow::DebugJson(Data.GetSlicedBatch(), 3, 3) << ";"; } -std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<arrow::Array>& data, std::shared_ptr<arrow::Field> field, const ui32 maxBlobSize) const { +std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<arrow::Array>& data, const std::shared_ptr<arrow::Field>& field, const ui32 maxBlobSize) const { + AFL_VERIFY(data); + AFL_VERIFY(field); auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{field}); auto batch = arrow::RecordBatch::Make(schema, data->length(), {data}); return Split(batch, maxBlobSize); diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h index 1878e72c6a..d83df17125 100644 --- a/ydb/core/tx/columnshard/splitter/simple.h +++ b/ydb/core/tx/columnshard/splitter/simple.h @@ -140,7 +140,7 @@ public: return TLinearSplitInfo(countPacksMax, stepPackMin, objectsCount); } - std::vector<TSaverSplittedChunk> Split(const std::shared_ptr<arrow::Array>& data, std::shared_ptr<arrow::Field> field, const ui32 maxBlobSize) const; + std::vector<TSaverSplittedChunk> Split(const std::shared_ptr<arrow::Array>& data, const std::shared_ptr<arrow::Field>& field, const ui32 maxBlobSize) const; std::vector<TSaverSplittedChunk> Split(const std::shared_ptr<arrow::RecordBatch>& data, const ui32 maxBlobSize) const; std::vector<TSaverSplittedChunk> SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const; std::vector<TSaverSplittedChunk> SplitBySizes(std::shared_ptr<arrow::RecordBatch> data, const TString& dataSerialization, const std::vector<ui64>& splitPartSizesExt) const; @@ -152,18 +152,18 @@ private: TSaverSplittedChunk Data; ISchemaDetailInfo::TPtr SchemaInfo; protected: - virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const override; + virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const override; virtual const TString& DoGetData() const override { return Data.GetSerializedChunk(); } - virtual ui32 DoGetRecordsCount() const override { + virtual ui32 DoGetRecordsCountImpl() const override { return Data.GetRecordsCount(); } virtual TString DoDebugString() const override; virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const override { - return TSimpleChunkMeta(Data.GetColumn(), SchemaInfo->NeedMinMaxForColumn(ColumnId), SchemaInfo->IsSortedColumn(ColumnId)); + return TSimpleChunkMeta(Data.GetColumn(), SchemaInfo->NeedMinMaxForColumn(GetColumnId()), SchemaInfo->IsSortedColumn(GetColumnId())); } virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const override { diff --git a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp index 4b17b8a410..a8face4b53 100644 --- a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp @@ -74,7 +74,7 @@ Y_UNIT_TEST_SUITE(Splitter) { void Execute(std::shared_ptr<arrow::RecordBatch> batch) { NKikimr::NColumnShard::TIndexationCounters counters("test"); NKikimr::NOlap::TRBSplitLimiter limiter(counters.SplitterCounters, Schema, batch, NKikimr::NOlap::TSplitSettings()); - std::vector<std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr>> chunksForBlob; + std::vector<std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>>> chunksForBlob; std::map<std::string, std::vector<std::shared_ptr<arrow::RecordBatch>>> restoredBatch; std::vector<i64> blobsSize; bool hasMultiSplit = false; @@ -91,10 +91,12 @@ Y_UNIT_TEST_SUITE(Splitter) { ui64 blobSize = 0; sb << "["; std::set<ui32> blobColumnChunks; - for (auto&& i : chunks) { + for (auto&& iData : chunks) { + auto i = dynamic_pointer_cast<NKikimr::NOlap::IPortionColumnChunk>(iData); + AFL_VERIFY(i); ++chunksCount; const ui32 columnId = i->GetColumnId(); - recordsCountByColumn[columnId] += i->GetRecordsCount(); + recordsCountByColumn[columnId] += i->GetRecordsCountVerified(); restoredBatch[Schema->GetColumnName(columnId)].emplace_back(*Schema->GetColumnLoader(columnId).Apply(i->GetData())); blobSize += i->GetData().size(); if (i->GetRecordsCount() != NKikimr::NOlap::TSplitSettings().GetMinRecordsCount() && !blobColumnChunks.emplace(columnId).second) { diff --git a/ydb/library/accessor/validator.cpp b/ydb/library/accessor/validator.cpp new file mode 100644 index 0000000000..34e22a943b --- /dev/null +++ b/ydb/library/accessor/validator.cpp @@ -0,0 +1 @@ +#include "validator.h" diff --git a/ydb/library/accessor/validator.h b/ydb/library/accessor/validator.h new file mode 100644 index 0000000000..df81586f87 --- /dev/null +++ b/ydb/library/accessor/validator.h @@ -0,0 +1,12 @@ +#pragma once + +#include <ydb/library/actors/core/log.h> + +class TValidator { +public: + template <class T> + static const T& CheckNotNull(const T& object) { + AFL_VERIFY(!!object); + return object; + } +};
\ No newline at end of file diff --git a/ydb/library/accessor/ya.make b/ydb/library/accessor/ya.make index 7b9629f0b3..430948d7ca 100644 --- a/ydb/library/accessor/ya.make +++ b/ydb/library/accessor/ya.make @@ -1,10 +1,12 @@ LIBRARY() PEERDIR( + ydb/library/actors/core ) SRCS( accessor.cpp + validator.cpp ) END() |