diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-29 12:39:21 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-29 13:04:39 +0300 |
commit | 9f05ad4a7c0f811fd2f7b6de45c1e1e7c3b29903 (patch) | |
tree | 07b4297f712e39fa421bb0925109e9a9db28257c | |
parent | 347d5c422477a8dc5399f447e5f0441c1bb0c502 (diff) | |
download | ydb-9f05ad4a7c0f811fd2f7b6de45c1e1e7c3b29903.tar.gz |
KIKIMR-19211: general compaction withno assemble full data for pk
15 files changed, 191 insertions, 51 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h index 9202d1366c..e583cdfdcb 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h @@ -36,10 +36,11 @@ public: bool Fetch(TMergedColumn& column); - TPortionColumnCursor(const TPortionInfoWithBlobs& portionWithBlobs, const ui32 columnId, const std::shared_ptr<TColumnLoader> loader) - : ColumnLoader(loader) { - BlobChunks = portionWithBlobs.GetColumnChunks(columnId); - ColumnChunks = portionWithBlobs.GetPortionInfo().GetColumnChunksPointers(columnId); + TPortionColumnCursor(const std::vector<IPortionColumnChunk::TPtr>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader> loader) + : BlobChunks(columnChunks) + , ColumnChunks(records) + , ColumnLoader(loader) + { Y_VERIFY(BlobChunks.size()); Y_VERIFY(ColumnChunks.size() == BlobChunks.size()); CurrentBlobChunk = BlobChunks.front(); diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp index d182916772..555a077b97 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp @@ -12,9 +12,8 @@ std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> TChunkPreparation::DoInte std::vector<IPortionColumnChunk::TPtr> newChunks; for (auto&& i : chunks) { Y_VERIFY(i.GetSlicedBatch()->num_columns() == 1); - TColumnRecord newRecord(TChunkAddress(ColumnId, ChunkIdx), i.GetSlicedBatch()->column(0), SchemaInfo->GetIndexInfo()); newChunks.emplace_back(std::make_shared<TChunkPreparation>( - saver.Apply(i.GetSlicedBatch()), newRecord, SchemaInfo)); + saver.Apply(i.GetSlicedBatch()), i.GetSlicedBatch()->column(0), ColumnId, SchemaInfo)); } return newChunks; } @@ -23,7 +22,9 @@ std::shared_ptr<arrow::Array> TColumnPortion::AppendBlob(const TString& data, co if (CurrentPortionRecords + columnChunk.GetMeta().GetNumRowsVerified() <= Context.GetPortionRowsCountLimit() && columnChunk.GetMeta().GetRawBytesVerified() < Context.GetChunkRawBytesLimit() && data.size() < Context.GetChunkPackedBytesLimit() && - columnChunk.GetMeta().GetRawBytesVerified() > Context.GetStorePackedChunkSizeLimit() && Context.GetSaver().IsHardPacker()) { + columnChunk.GetMeta().GetRawBytesVerified() > Context.GetStorePackedChunkSizeLimit() && Context.GetSaver().IsHardPacker() && + Context.GetUseWholeChunksOptimization()) + { FlushBuffer(); Chunks.emplace_back(std::make_shared<TChunkPreparation>(data, columnChunk, Context.GetSchemaInfo())); PackedSize += Chunks.back()->GetPackedSize(); diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h index 81609d16b0..842779d0f2 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h @@ -15,6 +15,8 @@ private: TString Data; TColumnRecord Record; ISnapshotSchema::TPtr SchemaInfo; + std::shared_ptr<arrow::Scalar> First; + std::shared_ptr<arrow::Scalar> Last; protected: virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const override; virtual const TString& DoGetData() const override { @@ -29,6 +31,13 @@ protected: virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const override { return Record.GetMeta(); } + virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const override { + return First; + } + virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const override { + return Last; + } + public: const TColumnRecord& GetRecord() const { return Record; @@ -47,6 +56,9 @@ public: , Data(data) , Record(TChunkAddress(columnId, 0), column, schema->GetIndexInfo()) , SchemaInfo(schema) { + Y_VERIFY(column->length()); + First = NArrow::TStatusValidator::GetValid(column->GetScalar(0)); + Last = NArrow::TStatusValidator::GetValid(column->GetScalar(column->length() - 1)); Record.BlobRange.Size = data.size(); } }; @@ -81,6 +93,10 @@ public: return CurrentPortionRecords; } + TString DebugString() const { + return TStringBuilder() << "chunks=" << Chunks.size() << ";records=" << CurrentPortionRecords << ";"; + } + }; class TColumnPortion: public TColumnPortionResult { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h b/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h index ab417a7d4a..9a8ff38b46 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h @@ -20,6 +20,7 @@ private: YDB_READONLY(ui64, ExpectedBlobPackedBytes, 4 * 1024 * 1024); YDB_READONLY(ui64, ChunkRawBytesLimit, 50 * 1024 * 1024); YDB_READONLY(ui64, StorePackedChunkSizeLimit, 512 * 1024); + YDB_READONLY(bool, UseWholeChunksOptimization, true); TColumnSerializationStat ColumnStat; const TIndexInfo& IndexInfo; @@ -49,8 +50,10 @@ public: , Field(f) , PortionRowsCountLimit(portionRowsCountLimit) , ChunkRawBytesLimit(chunkRawBytesLimit) + , UseWholeChunksOptimization(!schema->GetIndexInfo().GetReplaceKey()->GetFieldByName(f->name())) , ColumnStat(columnStat) - , IndexInfo(schema->GetIndexInfo()) { + , IndexInfo(schema->GetIndexInfo()) + { Y_VERIFY(PortionRowsCountLimit); Y_VERIFY(ChunkRawBytesLimit); } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp index 6d47706120..5d219f7914 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp @@ -3,6 +3,7 @@ namespace NKikimr::NOlap::NCompaction { void TMergedColumn::AppendBlob(const TString& data, const TColumnRecord& columnChunk) { + RecordsCount += columnChunk.GetMeta().GetNumRowsVerified(); auto remained = Portions.back().AppendBlob(data, columnChunk); while (remained) { Y_VERIFY(Portions.back().IsFullPortion()); @@ -15,6 +16,7 @@ void TMergedColumn::AppendBlob(const TString& data, const TColumnRecord& columnC } void TMergedColumn::AppendSlice(const std::shared_ptr<arrow::RecordBatch>& data) { + RecordsCount += data->num_rows(); Y_VERIFY(data); Y_VERIFY(data->num_columns() == 1); auto remained = data->column(0); @@ -28,6 +30,7 @@ void TMergedColumn::AppendSlice(const std::shared_ptr<arrow::RecordBatch>& data) } void TMergedColumn::AppendSlice(const std::shared_ptr<arrow::Array>& data) { + RecordsCount += data->length(); Y_VERIFY(data); auto remained = data; while (remained = Portions.back().AppendSlice(remained)) { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h index 3d02c074a1..066dc93a7f 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h @@ -9,6 +9,7 @@ class TMergedColumn { private: TColumnMergeContext Context; YDB_READONLY_DEF(std::vector<TColumnPortion>, Portions); + YDB_READONLY(ui32, RecordsCount, 0); void NewPortion() { Portions.emplace_back(TColumnPortion(Context)); diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 3fe4f8e565..fb9a45be10 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -10,11 +10,13 @@ #include <ydb/core/tx/columnshard/splitter/rb_splitter.h> #include <ydb/core/formats/arrow/simple_builder/array.h> #include <ydb/core/formats/arrow/simple_builder/filler.h> +#include "../reader/read_filter_merger.h" namespace NKikimr::NOlap::NCompaction { TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { std::vector<TPortionInfoWithBlobs> portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs); + Blobs.clear(); std::optional<TSnapshot> maxSnapshot; for (auto&& i : SwitchedPortions) { if (!maxSnapshot || *maxSnapshot < i.GetMinSnapshot()) { @@ -25,44 +27,44 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc static const TString portionIdFieldName = "$$__portion_id"; static const TString portionRecordIndexFieldName = "$$__portion_record_idx"; + static const std::shared_ptr<arrow::Field> portionIdField = std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>()); + static const std::shared_ptr<arrow::Field> portionRecordIndexField = std::make_shared<arrow::Field>(portionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>()); - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; auto resultSchema = context.SchemaVersions.GetLastSchema(); std::vector<std::string> pkFieldNames = resultSchema->GetIndexInfo().GetReplaceKey()->field_names(); std::set<std::string> pkFieldNamesSet(pkFieldNames.begin(), pkFieldNames.end()); for (auto&& i : TIndexInfo::GetSpecialColumnNames()) { pkFieldNamesSet.emplace(i); } - ui32 idx = 0; - for (auto&& i : portions) { - auto dataSchema = context.SchemaVersions.GetSchema(i.GetPortionInfo().GetMinSnapshot()); - auto batch = i.GetBatch(dataSchema, *resultSchema, pkFieldNamesSet); - { - NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(portionIdFieldName, idx++); - batch = - NArrow::TStatusValidator::GetValid( - batch->AddColumn(batch->num_columns(), - std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>()), - column->BuildArray(batch->num_rows())) - ); - } - { - NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt32Type>>>(portionRecordIndexFieldName); - batch = - NArrow::TStatusValidator::GetValid( - batch->AddColumn(batch->num_columns(), - std::make_shared<arrow::Field>(portionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>()), - column->BuildArray(batch->num_rows())) - ); - } - batches.emplace_back(batch); - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey())); - } - auto merged = NArrow::MergeSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription(), Max<size_t>()); - Y_VERIFY(merged.size() == 1); - auto batchResult = merged.front(); + std::shared_ptr<arrow::RecordBatch> batchResult; + { + arrow::FieldVector indexFields; + indexFields.emplace_back(portionIdField); + indexFields.emplace_back(portionRecordIndexField); + auto dataSchema = std::make_shared<arrow::Schema>(indexFields); + NIndexedReader::TMergePartialStream mergeStream(resultSchema->GetIndexInfo().GetReplaceKey(), dataSchema, false); + ui32 idx = 0; + for (auto&& i : portions) { + auto dataSchema = context.SchemaVersions.GetSchema(i.GetPortionInfo().GetMinSnapshot()); + auto batch = i.GetBatch(dataSchema, *resultSchema, pkFieldNamesSet); + { + NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(portionIdFieldName, idx++); + batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), portionIdField, column->BuildArray(batch->num_rows()))); + } + { + NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt32Type>>>(portionRecordIndexFieldName); + batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), portionRecordIndexField, column->BuildArray(batch->num_rows()))); + } + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey())); + mergeStream.AddPoolSource({}, batch, nullptr); + } + NIndexedReader::TRecordBatchBuilder indexesBuilder(indexFields); + mergeStream.DrainAll(indexesBuilder); + batchResult = indexesBuilder.Finalize(); + } + auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName); auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName); Y_VERIFY(columnPortionIdx && columnPortionRecordIdx); @@ -86,14 +88,17 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc Y_VERIFY(columnInfo); TColumnMergeContext context(resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, f, *columnInfo, SaverContext); TMergedColumn mColumn(context); - auto c = batchResult->GetColumnByName(f->name()); - if (c) { - mColumn.AppendSlice(c); - } else { + { + auto c = batchResult->GetColumnByName(f->name()); + AFL_VERIFY(!c); + AFL_VERIFY(batchResult->num_rows() == pIdxArray.length()); std::vector<TPortionColumnCursor> cursors; auto loader = resultSchema->GetColumnLoader(f->name()); for (auto&& p : portions) { - cursors.emplace_back(TPortionColumnCursor(p, columnId, loader)); + std::vector<const TColumnRecord*> records; + std::vector<IPortionColumnChunk::TPtr> chunks; + p.ExtractColumnChunks(columnId, records, chunks); + cursors.emplace_back(TPortionColumnCursor(chunks, records, loader)); } std::optional<ui16> predPortionIdx; for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) { @@ -110,12 +115,18 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc predPortionIdx = portionIdx; } } + AFL_VERIFY(mColumn.GetRecordsCount() == batchResult->num_rows())("f_name", f->name())("mCount", mColumn.GetRecordsCount())("bCount", batchResult->num_rows()); columnChunks[f->name()] = mColumn.BuildResult(); } Y_VERIFY(columnChunks.size()); for (auto&& i : columnChunks) { + if (i.second.size() != columnChunks.begin()->second.size()) { + for (ui32 p = 0; p < std::min<ui32>(columnChunks.begin()->second.size(), i.second.size()); ++p) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())("p", i.second[p].DebugString()); + } + } AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())("current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first); } @@ -133,14 +144,12 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc TSimilarSlicer slicer(4 * 1024 * 1024); auto packs = slicer.Split(batchSlices); - ui32 recordIdx = 0; for (auto&& i : packs) { TGeneralSerializedSlice slice(std::move(i)); std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs(); - auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetGranuleId(), *maxSnapshot, SaverContext.GetStorageOperator())); - AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, b, SaverContext.GetTierName()); - recordIdx += slice.GetRecordsCount(); + NArrow::TFirstLastSpecialKeys specialKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); + AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, specialKeys, SaverContext.GetTierName()); } if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) { TStringBuilder sbSwitched; diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h index a460b22d8a..93acea9f53 100644 --- a/ydb/core/tx/columnshard/engines/portions/column_record.h +++ b/ydb/core/tx/columnshard/engines/portions/column_record.h @@ -158,12 +158,17 @@ protected: virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const override { return ColumnRecord.GetMeta(); } + virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const override { + return nullptr; + } + virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const override { + return nullptr; + } public: - TSimpleOrderedColumnChunk(const TColumnRecord& cRecord, const TString& data) : TBase(cRecord.ColumnId) , ColumnRecord(cRecord) - , Data(std::move(data)) { + , Data(data) { ChunkIdx = cRecord.Chunk; } }; diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp index 1f05635ab2..38f1d58fa7 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp @@ -47,6 +47,19 @@ void TPortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TPortionInfoWithBlobs& own } } +void TPortionInfoWithBlobs::TBlobInfo::ExtractColumnChunks(const ui32 columnId, std::map<TChunkAddress, IPortionColumnChunk::TPtr>& resultMap) { + const auto pred = [this, &resultMap, columnId](const IPortionColumnChunk::TPtr& chunk) { + if (chunk->GetColumnId() == columnId) { + resultMap.emplace(chunk->GetChunkAddress(), chunk); + Chunks.erase(chunk->GetChunkAddress()); + return true; + } else { + return false; + } + }; + ChunksOrdered.erase(std::remove_if(ChunksOrdered.begin(), ChunksOrdered.end(), pred), ChunksOrdered.end()); +} + std::shared_ptr<arrow::RecordBatch> TPortionInfoWithBlobs::GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames) const { Y_VERIFY(data); if (columnNames.empty()) { @@ -183,4 +196,19 @@ std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> TPortionInfoWithBlobs::Ge return result; } +void TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<IPortionColumnChunk::TPtr>& chunks) { + records = GetPortionInfo().GetColumnChunksPointers(columnId); + std::map<TChunkAddress, IPortionColumnChunk::TPtr> chunksMap; + for (auto&& i : Blobs) { + i.ExtractColumnChunks(columnId, chunksMap); + } + std::vector<IPortionColumnChunk::TPtr> chunksLocal; + for (auto&& i : chunksMap) { + Y_VERIFY(i.first.GetColumnId() == columnId); + Y_VERIFY(i.first.GetChunk() == chunksLocal.size()); + chunksLocal.emplace_back(i.second); + } + std::swap(chunksLocal, chunks); +} + } diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.h b/ydb/core/tx/columnshard/engines/portions/with_blobs.h index c4a68771f8..942982afb2 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.h @@ -25,8 +25,7 @@ public: public: TBuilder(TBlobInfo& blob, TPortionInfoWithBlobs& portion) : OwnerBlob(&blob) - , OwnerPortion(&portion) - { + , OwnerPortion(&portion) { } const TColumnRecord& AddChunk(const IPortionColumnChunk::TPtr& chunk) { @@ -50,6 +49,8 @@ public: } void RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId); + void ExtractColumnChunks(const ui32 columnId, std::map<TChunkAddress, IPortionColumnChunk::TPtr>& resultMap); + }; private: TPortionInfo PortionInfo; @@ -83,6 +84,8 @@ public: std::vector<IPortionColumnChunk::TPtr> GetColumnChunks(const ui32 columnId) const; + void ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<IPortionColumnChunk::TPtr>& chunks); + ui64 GetSize() const { return PortionInfo.BlobsBytes(); } diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h index 9a9fea5355..2dbe00f629 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.h +++ b/ydb/core/tx/columnshard/splitter/batch_slice.h @@ -95,7 +95,29 @@ protected: std::shared_ptr<NColumnShard::TSplitterCounters> Counters; TSplitSettings Settings; TGeneralSerializedSlice() = default; + + const TSplittedColumn& GetColumnVerified(const std::string& fieldName) const { + for (auto&& i : Columns) { + if (i.GetField()->name() == fieldName) { + return i; + } + } + Y_VERIFY(false); + return Columns.front(); + } public: + std::shared_ptr<arrow::RecordBatch> GetFirstLastPKBatch(const std::shared_ptr<arrow::Schema>& pkSchema) const { + std::vector<std::shared_ptr<arrow::Array>> pkColumns; + for (auto&& i : pkSchema->fields()) { + auto aBuilder = NArrow::MakeBuilder(i); + const TSplittedColumn& splittedColumn = GetColumnVerified(i->name()); + NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedColumn.GetFirstScalar())); + NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedColumn.GetLastScalar())); + pkColumns.emplace_back(NArrow::TStatusValidator::GetValid(aBuilder->Finish())); + } + return arrow::RecordBatch::Make(pkSchema, 2, pkColumns); + } + ui64 GetSize() const { return Size; } diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h index 135c875b15..9329cb4075 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.h +++ b/ydb/core/tx/columnshard/splitter/chunks.h @@ -2,6 +2,7 @@ #include "chunk_meta.h" #include <ydb/core/tx/columnshard/engines/scheme/column_features.h> #include <ydb/core/tx/columnshard/counters/splitter.h> +#include <ydb/core/tx/columnshard/engines/portions/common.h> namespace NKikimr::NOlap { @@ -19,6 +20,8 @@ protected: virtual ui32 DoGetRecordsCount() const = 0; virtual TString DoDebugString() const = 0; virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const = 0; + virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const = 0; + virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const = 0; public: IPortionColumnChunk(const ui32 columnId) @@ -28,6 +31,21 @@ public: } virtual ~IPortionColumnChunk() = default; + TChunkAddress GetChunkAddress() const { + return TChunkAddress(ColumnId, ChunkIdx); + } + + std::shared_ptr<arrow::Scalar> GetFirstScalar() const { + auto result = DoGetFirstScalar(); + Y_VERIFY(result); + return result; + } + std::shared_ptr<arrow::Scalar> GetLastScalar() const { + auto result = DoGetLastScalar(); + Y_VERIFY(result); + return result; + } + TSimpleChunkMeta BuildSimpleChunkMeta() const { return DoBuildSimpleChunkMeta(); } diff --git a/ydb/core/tx/columnshard/splitter/column_info.h b/ydb/core/tx/columnshard/splitter/column_info.h index 15048eac81..1d3479663b 100644 --- a/ydb/core/tx/columnshard/splitter/column_info.h +++ b/ydb/core/tx/columnshard/splitter/column_info.h @@ -15,6 +15,17 @@ private: YDB_READONLY_DEF(std::shared_ptr<arrow::Field>, Field); YDB_READONLY_DEF(std::vector<IPortionColumnChunk::TPtr>, Chunks); public: + + std::shared_ptr<arrow::Scalar> GetFirstScalar() const { + Y_VERIFY(Chunks.size()); + return Chunks.front()->GetFirstScalar(); + } + + std::shared_ptr<arrow::Scalar> GetLastScalar() const { + Y_VERIFY(Chunks.size()); + return Chunks.back()->GetLastScalar(); + } + void Merge(TSplittedColumn&& c) { Size += c.Size; Y_VERIFY(Field->name() == c.Field->name()); diff --git a/ydb/core/tx/columnshard/splitter/simple.cpp b/ydb/core/tx/columnshard/splitter/simple.cpp index 327c7b1885..edef7301c3 100644 --- a/ydb/core/tx/columnshard/splitter/simple.cpp +++ b/ydb/core/tx/columnshard/splitter/simple.cpp @@ -1,5 +1,6 @@ #include "simple.h" #include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/common/validation.h> #include <util/string/join.h> namespace NKikimr::NOlap { @@ -112,4 +113,12 @@ std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitBySizes(std::shared_ptr<a return SplitByRecordsCount(data, recordsCount); } +std::shared_ptr<arrow::Scalar> TSaverSplittedChunk::GetFirstScalar() const { + return NArrow::TStatusValidator::GetValid(SlicedBatch->column(0)->GetScalar(0)); +} + +std::shared_ptr<arrow::Scalar> TSaverSplittedChunk::GetLastScalar() const { + return NArrow::TStatusValidator::GetValid(SlicedBatch->column(0)->GetScalar(GetRecordsCount() - 1)); +} + } diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h index 661c1443fa..2b68111ab1 100644 --- a/ydb/core/tx/columnshard/splitter/simple.h +++ b/ydb/core/tx/columnshard/splitter/simple.h @@ -21,12 +21,16 @@ public: return SlicedBatch->num_rows(); } + std::shared_ptr<arrow::Scalar> GetFirstScalar() const; + std::shared_ptr<arrow::Scalar> GetLastScalar() const; + TSaverSplittedChunk(std::shared_ptr<arrow::RecordBatch> batch, TString&& serializedChunk) : SlicedBatch(batch) , SerializedChunk(std::move(serializedChunk)) { Y_VERIFY(SlicedBatch); Y_VERIFY(SlicedBatch->num_columns() == 1); + Y_VERIFY(SlicedBatch->num_rows()); } @@ -162,8 +166,14 @@ protected: return TSimpleChunkMeta(Data.GetColumn(), SchemaInfo->NeedMinMaxForColumn(ColumnId), SchemaInfo->IsSortedColumn(ColumnId)); } -public: + virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const override { + return Data.GetFirstScalar(); + } + virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const override { + return Data.GetLastScalar(); + } +public: i64 GetSize() const { return Data.GetSerializedChunk().size(); } |