author     ivanmorozov <ivanmorozov@yandex-team.com>    2023-09-29 12:39:21 +0300
committer  ivanmorozov <ivanmorozov@yandex-team.com>    2023-09-29 13:04:39 +0300
commit     9f05ad4a7c0f811fd2f7b6de45c1e1e7c3b29903 (patch)
tree       07b4297f712e39fa421bb0925109e9a9db28257c
parent     347d5c422477a8dc5399f447e5f0441c1bb0c502 (diff)
download   ydb-9f05ad4a7c0f811fd2f7b6de45c1e1e7c3b29903.tar.gz
KIKIMR-19211: general compaction without assembling full data for pk
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h         |  9
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp |  7
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h   | 16
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h          |  5
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp        |  3
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h          |  1
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/general_compaction.cpp              | 85
-rw-r--r--  ydb/core/tx/columnshard/engines/portions/column_record.h                    |  9
-rw-r--r--  ydb/core/tx/columnshard/engines/portions/with_blobs.cpp                     | 28
-rw-r--r--  ydb/core/tx/columnshard/engines/portions/with_blobs.h                       |  7
-rw-r--r--  ydb/core/tx/columnshard/splitter/batch_slice.h                               | 22
-rw-r--r--  ydb/core/tx/columnshard/splitter/chunks.h                                    | 18
-rw-r--r--  ydb/core/tx/columnshard/splitter/column_info.h                               | 11
-rw-r--r--  ydb/core/tx/columnshard/splitter/simple.cpp                                  |  9
-rw-r--r--  ydb/core/tx/columnshard/splitter/simple.h                                    | 12
15 files changed, 191 insertions(+), 51 deletions(-)
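The core of the change sits in general_compaction.cpp: instead of assembling the full data of every column from every portion and merging whole batches, the merge now runs only over the primary key plus two synthetic index columns ($$__portion_id, $$__portion_record_idx); the remaining columns are materialized afterwards, portion by portion, through chunk cursors. Below is a minimal stand-alone C++ sketch of that idea with hypothetical types (TIndexRow, MergeByPk, AssembleColumn are illustrative names, not the YDB API), a single string-valued key column, and no snapshot-based deduplication:

    // Sketch only: merge by PK while carrying (portion_id, record_idx),
    // then replay the index to rebuild each data column.
    #include <cstdint>
    #include <queue>
    #include <string>
    #include <utility>
    #include <vector>

    struct TIndexRow {            // what the merge actually produces
        std::string Pk;           // primary key value (one column for brevity)
        uint16_t PortionId = 0;   // which source portion the row came from
        uint32_t RecordIdx = 0;   // row position inside that portion
    };

    // Merge the PK streams of several portions; only PK + index columns are touched.
    std::vector<TIndexRow> MergeByPk(const std::vector<std::vector<std::string>>& pkColumns) {
        using TItem = std::pair<std::string, std::pair<uint16_t, uint32_t>>;
        auto cmp = [](const TItem& a, const TItem& b) { return a.first > b.first; };
        std::priority_queue<TItem, std::vector<TItem>, decltype(cmp)> heap(cmp);
        for (uint16_t p = 0; p < pkColumns.size(); ++p) {
            if (!pkColumns[p].empty()) {
                heap.push({pkColumns[p][0], {p, 0}});
            }
        }
        std::vector<TIndexRow> result;
        while (!heap.empty()) {
            auto [pk, pos] = heap.top();
            heap.pop();
            result.push_back({pk, pos.first, pos.second});
            const uint32_t next = pos.second + 1;
            if (next < pkColumns[pos.first].size()) {
                heap.push({pkColumns[pos.first][next], {pos.first, next}});
            }
        }
        return result;
    }

    // Rebuild one non-PK column afterwards: for each merged row, read the value
    // from the owning portion at the recorded index (a stand-in for the
    // per-portion column cursors in the real code).
    std::vector<std::string> AssembleColumn(const std::vector<TIndexRow>& merged,
                                            const std::vector<std::vector<std::string>>& columnByPortion) {
        std::vector<std::string> out;
        out.reserve(merged.size());
        for (const auto& row : merged) {
            out.push_back(columnByPortion[row.PortionId][row.RecordIdx]);
        }
        return out;
    }

In the patch, the replay step is what TPortionColumnCursor does against the extracted column chunks: it walks the merged (portion id, record index) pairs chunk by chunk rather than materializing whole rows up front.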
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
index 9202d1366c..e583cdfdcb 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
@@ -36,10 +36,11 @@ public:
bool Fetch(TMergedColumn& column);
- TPortionColumnCursor(const TPortionInfoWithBlobs& portionWithBlobs, const ui32 columnId, const std::shared_ptr<TColumnLoader> loader)
- : ColumnLoader(loader) {
- BlobChunks = portionWithBlobs.GetColumnChunks(columnId);
- ColumnChunks = portionWithBlobs.GetPortionInfo().GetColumnChunksPointers(columnId);
+ TPortionColumnCursor(const std::vector<IPortionColumnChunk::TPtr>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader> loader)
+ : BlobChunks(columnChunks)
+ , ColumnChunks(records)
+ , ColumnLoader(loader)
+ {
Y_VERIFY(BlobChunks.size());
Y_VERIFY(ColumnChunks.size() == BlobChunks.size());
CurrentBlobChunk = BlobChunks.front();
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp
index d182916772..555a077b97 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp
@@ -12,9 +12,8 @@ std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> TChunkPreparation::DoInte
std::vector<IPortionColumnChunk::TPtr> newChunks;
for (auto&& i : chunks) {
Y_VERIFY(i.GetSlicedBatch()->num_columns() == 1);
- TColumnRecord newRecord(TChunkAddress(ColumnId, ChunkIdx), i.GetSlicedBatch()->column(0), SchemaInfo->GetIndexInfo());
newChunks.emplace_back(std::make_shared<TChunkPreparation>(
- saver.Apply(i.GetSlicedBatch()), newRecord, SchemaInfo));
+ saver.Apply(i.GetSlicedBatch()), i.GetSlicedBatch()->column(0), ColumnId, SchemaInfo));
}
return newChunks;
}
@@ -23,7 +22,9 @@ std::shared_ptr<arrow::Array> TColumnPortion::AppendBlob(const TString& data, co
if (CurrentPortionRecords + columnChunk.GetMeta().GetNumRowsVerified() <= Context.GetPortionRowsCountLimit() &&
columnChunk.GetMeta().GetRawBytesVerified() < Context.GetChunkRawBytesLimit() &&
data.size() < Context.GetChunkPackedBytesLimit() &&
- columnChunk.GetMeta().GetRawBytesVerified() > Context.GetStorePackedChunkSizeLimit() && Context.GetSaver().IsHardPacker()) {
+ columnChunk.GetMeta().GetRawBytesVerified() > Context.GetStorePackedChunkSizeLimit() && Context.GetSaver().IsHardPacker() &&
+ Context.GetUseWholeChunksOptimization())
+ {
FlushBuffer();
Chunks.emplace_back(std::make_shared<TChunkPreparation>(data, columnChunk, Context.GetSchemaInfo()));
PackedSize += Chunks.back()->GetPackedSize();
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h
index 81609d16b0..842779d0f2 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h
@@ -15,6 +15,8 @@ private:
TString Data;
TColumnRecord Record;
ISnapshotSchema::TPtr SchemaInfo;
+ std::shared_ptr<arrow::Scalar> First;
+ std::shared_ptr<arrow::Scalar> Last;
protected:
virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const override;
virtual const TString& DoGetData() const override {
@@ -29,6 +31,13 @@ protected:
virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const override {
return Record.GetMeta();
}
+ virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const override {
+ return First;
+ }
+ virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const override {
+ return Last;
+ }
+
public:
const TColumnRecord& GetRecord() const {
return Record;
@@ -47,6 +56,9 @@ public:
, Data(data)
, Record(TChunkAddress(columnId, 0), column, schema->GetIndexInfo())
, SchemaInfo(schema) {
+ Y_VERIFY(column->length());
+ First = NArrow::TStatusValidator::GetValid(column->GetScalar(0));
+ Last = NArrow::TStatusValidator::GetValid(column->GetScalar(column->length() - 1));
Record.BlobRange.Size = data.size();
}
};
@@ -81,6 +93,10 @@ public:
return CurrentPortionRecords;
}
+ TString DebugString() const {
+ return TStringBuilder() << "chunks=" << Chunks.size() << ";records=" << CurrentPortionRecords << ";";
+ }
+
};
class TColumnPortion: public TColumnPortionResult {
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h b/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h
index ab417a7d4a..9a8ff38b46 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h
@@ -20,6 +20,7 @@ private:
YDB_READONLY(ui64, ExpectedBlobPackedBytes, 4 * 1024 * 1024);
YDB_READONLY(ui64, ChunkRawBytesLimit, 50 * 1024 * 1024);
YDB_READONLY(ui64, StorePackedChunkSizeLimit, 512 * 1024);
+ YDB_READONLY(bool, UseWholeChunksOptimization, true);
TColumnSerializationStat ColumnStat;
const TIndexInfo& IndexInfo;
@@ -49,8 +50,10 @@ public:
, Field(f)
, PortionRowsCountLimit(portionRowsCountLimit)
, ChunkRawBytesLimit(chunkRawBytesLimit)
+ , UseWholeChunksOptimization(!schema->GetIndexInfo().GetReplaceKey()->GetFieldByName(f->name()))
, ColumnStat(columnStat)
- , IndexInfo(schema->GetIndexInfo()) {
+ , IndexInfo(schema->GetIndexInfo())
+ {
Y_VERIFY(PortionRowsCountLimit);
Y_VERIFY(ChunkRawBytesLimit);
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp
index 6d47706120..5d219f7914 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp
@@ -3,6 +3,7 @@
namespace NKikimr::NOlap::NCompaction {
void TMergedColumn::AppendBlob(const TString& data, const TColumnRecord& columnChunk) {
+ RecordsCount += columnChunk.GetMeta().GetNumRowsVerified();
auto remained = Portions.back().AppendBlob(data, columnChunk);
while (remained) {
Y_VERIFY(Portions.back().IsFullPortion());
@@ -15,6 +16,7 @@ void TMergedColumn::AppendBlob(const TString& data, const TColumnRecord& columnC
}
void TMergedColumn::AppendSlice(const std::shared_ptr<arrow::RecordBatch>& data) {
+ RecordsCount += data->num_rows();
Y_VERIFY(data);
Y_VERIFY(data->num_columns() == 1);
auto remained = data->column(0);
@@ -28,6 +30,7 @@ void TMergedColumn::AppendSlice(const std::shared_ptr<arrow::RecordBatch>& data)
}
void TMergedColumn::AppendSlice(const std::shared_ptr<arrow::Array>& data) {
+ RecordsCount += data->length();
Y_VERIFY(data);
auto remained = data;
while (remained = Portions.back().AppendSlice(remained)) {
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h
index 3d02c074a1..066dc93a7f 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h
@@ -9,6 +9,7 @@ class TMergedColumn {
private:
TColumnMergeContext Context;
YDB_READONLY_DEF(std::vector<TColumnPortion>, Portions);
+ YDB_READONLY(ui32, RecordsCount, 0);
void NewPortion() {
Portions.emplace_back(TColumnPortion(Context));
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index 3fe4f8e565..fb9a45be10 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -10,11 +10,13 @@
#include <ydb/core/tx/columnshard/splitter/rb_splitter.h>
#include <ydb/core/formats/arrow/simple_builder/array.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
+#include "../reader/read_filter_merger.h"
namespace NKikimr::NOlap::NCompaction {
TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
std::vector<TPortionInfoWithBlobs> portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs);
+ Blobs.clear();
std::optional<TSnapshot> maxSnapshot;
for (auto&& i : SwitchedPortions) {
if (!maxSnapshot || *maxSnapshot < i.GetMinSnapshot()) {
@@ -25,44 +27,44 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
static const TString portionIdFieldName = "$$__portion_id";
static const TString portionRecordIndexFieldName = "$$__portion_record_idx";
+ static const std::shared_ptr<arrow::Field> portionIdField = std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>());
+ static const std::shared_ptr<arrow::Field> portionRecordIndexField = std::make_shared<arrow::Field>(portionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>());
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
auto resultSchema = context.SchemaVersions.GetLastSchema();
std::vector<std::string> pkFieldNames = resultSchema->GetIndexInfo().GetReplaceKey()->field_names();
std::set<std::string> pkFieldNamesSet(pkFieldNames.begin(), pkFieldNames.end());
for (auto&& i : TIndexInfo::GetSpecialColumnNames()) {
pkFieldNamesSet.emplace(i);
}
- ui32 idx = 0;
- for (auto&& i : portions) {
- auto dataSchema = context.SchemaVersions.GetSchema(i.GetPortionInfo().GetMinSnapshot());
- auto batch = i.GetBatch(dataSchema, *resultSchema, pkFieldNamesSet);
- {
- NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(portionIdFieldName, idx++);
- batch =
- NArrow::TStatusValidator::GetValid(
- batch->AddColumn(batch->num_columns(),
- std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>()),
- column->BuildArray(batch->num_rows()))
- );
- }
- {
- NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt32Type>>>(portionRecordIndexFieldName);
- batch =
- NArrow::TStatusValidator::GetValid(
- batch->AddColumn(batch->num_columns(),
- std::make_shared<arrow::Field>(portionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>()),
- column->BuildArray(batch->num_rows()))
- );
- }
- batches.emplace_back(batch);
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey()));
- }
- auto merged = NArrow::MergeSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription(), Max<size_t>());
- Y_VERIFY(merged.size() == 1);
- auto batchResult = merged.front();
+ std::shared_ptr<arrow::RecordBatch> batchResult;
+ {
+ arrow::FieldVector indexFields;
+ indexFields.emplace_back(portionIdField);
+ indexFields.emplace_back(portionRecordIndexField);
+ auto dataSchema = std::make_shared<arrow::Schema>(indexFields);
+ NIndexedReader::TMergePartialStream mergeStream(resultSchema->GetIndexInfo().GetReplaceKey(), dataSchema, false);
+ ui32 idx = 0;
+ for (auto&& i : portions) {
+ auto dataSchema = context.SchemaVersions.GetSchema(i.GetPortionInfo().GetMinSnapshot());
+ auto batch = i.GetBatch(dataSchema, *resultSchema, pkFieldNamesSet);
+ {
+ NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(portionIdFieldName, idx++);
+ batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), portionIdField, column->BuildArray(batch->num_rows())));
+ }
+ {
+ NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt32Type>>>(portionRecordIndexFieldName);
+ batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), portionRecordIndexField, column->BuildArray(batch->num_rows())));
+ }
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey()));
+ mergeStream.AddPoolSource({}, batch, nullptr);
+ }
+ NIndexedReader::TRecordBatchBuilder indexesBuilder(indexFields);
+ mergeStream.DrainAll(indexesBuilder);
+ batchResult = indexesBuilder.Finalize();
+ }
+
auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName);
auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName);
Y_VERIFY(columnPortionIdx && columnPortionRecordIdx);
@@ -86,14 +88,17 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
Y_VERIFY(columnInfo);
TColumnMergeContext context(resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, f, *columnInfo, SaverContext);
TMergedColumn mColumn(context);
- auto c = batchResult->GetColumnByName(f->name());
- if (c) {
- mColumn.AppendSlice(c);
- } else {
+ {
+ auto c = batchResult->GetColumnByName(f->name());
+ AFL_VERIFY(!c);
+ AFL_VERIFY(batchResult->num_rows() == pIdxArray.length());
std::vector<TPortionColumnCursor> cursors;
auto loader = resultSchema->GetColumnLoader(f->name());
for (auto&& p : portions) {
- cursors.emplace_back(TPortionColumnCursor(p, columnId, loader));
+ std::vector<const TColumnRecord*> records;
+ std::vector<IPortionColumnChunk::TPtr> chunks;
+ p.ExtractColumnChunks(columnId, records, chunks);
+ cursors.emplace_back(TPortionColumnCursor(chunks, records, loader));
}
std::optional<ui16> predPortionIdx;
for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) {
@@ -110,12 +115,18 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
predPortionIdx = portionIdx;
}
}
+ AFL_VERIFY(mColumn.GetRecordsCount() == batchResult->num_rows())("f_name", f->name())("mCount", mColumn.GetRecordsCount())("bCount", batchResult->num_rows());
columnChunks[f->name()] = mColumn.BuildResult();
}
Y_VERIFY(columnChunks.size());
for (auto&& i : columnChunks) {
+ if (i.second.size() != columnChunks.begin()->second.size()) {
+ for (ui32 p = 0; p < std::min<ui32>(columnChunks.begin()->second.size(), i.second.size()); ++p) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())("p", i.second[p].DebugString());
+ }
+ }
AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())("current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
}
@@ -133,14 +144,12 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
TSimilarSlicer slicer(4 * 1024 * 1024);
auto packs = slicer.Split(batchSlices);
- ui32 recordIdx = 0;
for (auto&& i : packs) {
TGeneralSerializedSlice slice(std::move(i));
std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs();
- auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetGranuleId(), *maxSnapshot, SaverContext.GetStorageOperator()));
- AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, b, SaverContext.GetTierName());
- recordIdx += slice.GetRecordsCount();
+ NArrow::TFirstLastSpecialKeys specialKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
+ AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, specialKeys, SaverContext.GetTierName());
}
if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) {
TStringBuilder sbSwitched;
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h
index a460b22d8a..93acea9f53 100644
--- a/ydb/core/tx/columnshard/engines/portions/column_record.h
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.h
@@ -158,12 +158,17 @@ protected:
virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const override {
return ColumnRecord.GetMeta();
}
+ virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const override {
+ return nullptr;
+ }
+ virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const override {
+ return nullptr;
+ }
public:
-
TSimpleOrderedColumnChunk(const TColumnRecord& cRecord, const TString& data)
: TBase(cRecord.ColumnId)
, ColumnRecord(cRecord)
- , Data(std::move(data)) {
+ , Data(data) {
ChunkIdx = cRecord.Chunk;
}
};
diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
index 1f05635ab2..38f1d58fa7 100644
--- a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
@@ -47,6 +47,19 @@ void TPortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TPortionInfoWithBlobs& own
}
}
+void TPortionInfoWithBlobs::TBlobInfo::ExtractColumnChunks(const ui32 columnId, std::map<TChunkAddress, IPortionColumnChunk::TPtr>& resultMap) {
+ const auto pred = [this, &resultMap, columnId](const IPortionColumnChunk::TPtr& chunk) {
+ if (chunk->GetColumnId() == columnId) {
+ resultMap.emplace(chunk->GetChunkAddress(), chunk);
+ Chunks.erase(chunk->GetChunkAddress());
+ return true;
+ } else {
+ return false;
+ }
+ };
+ ChunksOrdered.erase(std::remove_if(ChunksOrdered.begin(), ChunksOrdered.end(), pred), ChunksOrdered.end());
+}
+
std::shared_ptr<arrow::RecordBatch> TPortionInfoWithBlobs::GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames) const {
Y_VERIFY(data);
if (columnNames.empty()) {
@@ -183,4 +196,19 @@ std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> TPortionInfoWithBlobs::Ge
return result;
}
+void TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<IPortionColumnChunk::TPtr>& chunks) {
+ records = GetPortionInfo().GetColumnChunksPointers(columnId);
+ std::map<TChunkAddress, IPortionColumnChunk::TPtr> chunksMap;
+ for (auto&& i : Blobs) {
+ i.ExtractColumnChunks(columnId, chunksMap);
+ }
+ std::vector<IPortionColumnChunk::TPtr> chunksLocal;
+ for (auto&& i : chunksMap) {
+ Y_VERIFY(i.first.GetColumnId() == columnId);
+ Y_VERIFY(i.first.GetChunk() == chunksLocal.size());
+ chunksLocal.emplace_back(i.second);
+ }
+ std::swap(chunksLocal, chunks);
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.h b/ydb/core/tx/columnshard/engines/portions/with_blobs.h
index c4a68771f8..942982afb2 100644
--- a/ydb/core/tx/columnshard/engines/portions/with_blobs.h
+++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.h
@@ -25,8 +25,7 @@ public:
public:
TBuilder(TBlobInfo& blob, TPortionInfoWithBlobs& portion)
: OwnerBlob(&blob)
- , OwnerPortion(&portion)
- {
+ , OwnerPortion(&portion) {
}
const TColumnRecord& AddChunk(const IPortionColumnChunk::TPtr& chunk) {
@@ -50,6 +49,8 @@ public:
}
void RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId);
+ void ExtractColumnChunks(const ui32 columnId, std::map<TChunkAddress, IPortionColumnChunk::TPtr>& resultMap);
+
};
private:
TPortionInfo PortionInfo;
@@ -83,6 +84,8 @@ public:
std::vector<IPortionColumnChunk::TPtr> GetColumnChunks(const ui32 columnId) const;
+ void ExtractColumnChunks(const ui32 columnId, std::vector<const TColumnRecord*>& records, std::vector<IPortionColumnChunk::TPtr>& chunks);
+
ui64 GetSize() const {
return PortionInfo.BlobsBytes();
}
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h
index 9a9fea5355..2dbe00f629 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.h
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.h
@@ -95,7 +95,29 @@ protected:
std::shared_ptr<NColumnShard::TSplitterCounters> Counters;
TSplitSettings Settings;
TGeneralSerializedSlice() = default;
+
+ const TSplittedColumn& GetColumnVerified(const std::string& fieldName) const {
+ for (auto&& i : Columns) {
+ if (i.GetField()->name() == fieldName) {
+ return i;
+ }
+ }
+ Y_VERIFY(false);
+ return Columns.front();
+ }
public:
+ std::shared_ptr<arrow::RecordBatch> GetFirstLastPKBatch(const std::shared_ptr<arrow::Schema>& pkSchema) const {
+ std::vector<std::shared_ptr<arrow::Array>> pkColumns;
+ for (auto&& i : pkSchema->fields()) {
+ auto aBuilder = NArrow::MakeBuilder(i);
+ const TSplittedColumn& splittedColumn = GetColumnVerified(i->name());
+ NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedColumn.GetFirstScalar()));
+ NArrow::TStatusValidator::Validate(aBuilder->AppendScalar(*splittedColumn.GetLastScalar()));
+ pkColumns.emplace_back(NArrow::TStatusValidator::GetValid(aBuilder->Finish()));
+ }
+ return arrow::RecordBatch::Make(pkSchema, 2, pkColumns);
+ }
+
ui64 GetSize() const {
return Size;
}
diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h
index 135c875b15..9329cb4075 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.h
+++ b/ydb/core/tx/columnshard/splitter/chunks.h
@@ -2,6 +2,7 @@
#include "chunk_meta.h"
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
#include <ydb/core/tx/columnshard/counters/splitter.h>
+#include <ydb/core/tx/columnshard/engines/portions/common.h>
namespace NKikimr::NOlap {
@@ -19,6 +20,8 @@ protected:
virtual ui32 DoGetRecordsCount() const = 0;
virtual TString DoDebugString() const = 0;
virtual TSimpleChunkMeta DoBuildSimpleChunkMeta() const = 0;
+ virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const = 0;
+ virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const = 0;
public:
IPortionColumnChunk(const ui32 columnId)
@@ -28,6 +31,21 @@ public:
}
virtual ~IPortionColumnChunk() = default;
+ TChunkAddress GetChunkAddress() const {
+ return TChunkAddress(ColumnId, ChunkIdx);
+ }
+
+ std::shared_ptr<arrow::Scalar> GetFirstScalar() const {
+ auto result = DoGetFirstScalar();
+ Y_VERIFY(result);
+ return result;
+ }
+ std::shared_ptr<arrow::Scalar> GetLastScalar() const {
+ auto result = DoGetLastScalar();
+ Y_VERIFY(result);
+ return result;
+ }
+
TSimpleChunkMeta BuildSimpleChunkMeta() const {
return DoBuildSimpleChunkMeta();
}
diff --git a/ydb/core/tx/columnshard/splitter/column_info.h b/ydb/core/tx/columnshard/splitter/column_info.h
index 15048eac81..1d3479663b 100644
--- a/ydb/core/tx/columnshard/splitter/column_info.h
+++ b/ydb/core/tx/columnshard/splitter/column_info.h
@@ -15,6 +15,17 @@ private:
YDB_READONLY_DEF(std::shared_ptr<arrow::Field>, Field);
YDB_READONLY_DEF(std::vector<IPortionColumnChunk::TPtr>, Chunks);
public:
+
+ std::shared_ptr<arrow::Scalar> GetFirstScalar() const {
+ Y_VERIFY(Chunks.size());
+ return Chunks.front()->GetFirstScalar();
+ }
+
+ std::shared_ptr<arrow::Scalar> GetLastScalar() const {
+ Y_VERIFY(Chunks.size());
+ return Chunks.back()->GetLastScalar();
+ }
+
void Merge(TSplittedColumn&& c) {
Size += c.Size;
Y_VERIFY(Field->name() == c.Field->name());
diff --git a/ydb/core/tx/columnshard/splitter/simple.cpp b/ydb/core/tx/columnshard/splitter/simple.cpp
index 327c7b1885..edef7301c3 100644
--- a/ydb/core/tx/columnshard/splitter/simple.cpp
+++ b/ydb/core/tx/columnshard/splitter/simple.cpp
@@ -1,5 +1,6 @@
#include "simple.h"
#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/common/validation.h>
#include <util/string/join.h>
namespace NKikimr::NOlap {
@@ -112,4 +113,12 @@ std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitBySizes(std::shared_ptr<a
return SplitByRecordsCount(data, recordsCount);
}
+std::shared_ptr<arrow::Scalar> TSaverSplittedChunk::GetFirstScalar() const {
+ return NArrow::TStatusValidator::GetValid(SlicedBatch->column(0)->GetScalar(0));
+}
+
+std::shared_ptr<arrow::Scalar> TSaverSplittedChunk::GetLastScalar() const {
+ return NArrow::TStatusValidator::GetValid(SlicedBatch->column(0)->GetScalar(GetRecordsCount() - 1));
+}
+
}
diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h
index 661c1443fa..2b68111ab1 100644
--- a/ydb/core/tx/columnshard/splitter/simple.h
+++ b/ydb/core/tx/columnshard/splitter/simple.h
@@ -21,12 +21,16 @@ public:
return SlicedBatch->num_rows();
}
+ std::shared_ptr<arrow::Scalar> GetFirstScalar() const;
+ std::shared_ptr<arrow::Scalar> GetLastScalar() const;
+
TSaverSplittedChunk(std::shared_ptr<arrow::RecordBatch> batch, TString&& serializedChunk)
: SlicedBatch(batch)
, SerializedChunk(std::move(serializedChunk))
{
Y_VERIFY(SlicedBatch);
Y_VERIFY(SlicedBatch->num_columns() == 1);
+ Y_VERIFY(SlicedBatch->num_rows());
}
@@ -162,8 +166,14 @@ protected:
return TSimpleChunkMeta(Data.GetColumn(), SchemaInfo->NeedMinMaxForColumn(ColumnId), SchemaInfo->IsSortedColumn(ColumnId));
}
-public:
+ virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const override {
+ return Data.GetFirstScalar();
+ }
+ virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const override {
+ return Data.GetLastScalar();
+ }
+public:
i64 GetSize() const {
return Data.GetSerializedChunk().size();
}