diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-29 09:07:02 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-29 09:38:31 +0300 |
commit | 82c27d07b8cca223d3ec075911da0541c782b7b7 (patch) | |
tree | c808b84ca981d2851fa008f7a6362ee7da5aec2a | |
parent | 0c7c463bde024e085f92b83fb9128526ab3c1acb (diff) | |
download | ydb-82c27d07b8cca223d3ec075911da0541c782b7b7.tar.gz |
KIKIMR-19216: use special keys for meta construction
-rw-r--r-- | ydb/core/formats/arrow/arrow_helpers.cpp | 11 | ||||
-rw-r--r-- | ydb/core/formats/arrow/permutations.cpp | 11 | ||||
-rw-r--r-- | ydb/core/formats/arrow/permutations.h | 1 | ||||
-rw-r--r-- | ydb/core/formats/arrow/special_keys.cpp | 13 | ||||
-rw-r--r-- | ydb/core/formats/arrow/special_keys.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portions/meta.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portions/meta.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portions/portion_info.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portions/portion_info.h | 11 |
9 files changed, 48 insertions, 20 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 542e9efc80..935ab44062 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -502,8 +502,7 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared for (auto& field : schema->fields()) { std::unique_ptr<arrow::ArrayBuilder> builder; - auto status = arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder); - Y_VERIFY_OK(status); + TStatusValidator::Validate(arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder)); if (sizeByColumn.size()) { auto it = sizeByColumn.find(field->name()); if (it != sizeByColumn.end()) { @@ -512,7 +511,7 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared } if (reserve) { - Y_VERIFY_OK(builder->Reserve(reserve)); + TStatusValidator::Validate(builder->Reserve(reserve)); } builders.emplace_back(std::move(builder)); @@ -523,8 +522,7 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::Field>& field) { std::unique_ptr<arrow::ArrayBuilder> builder; - auto status = arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder); - Y_VERIFY_OK(status); + TStatusValidator::Validate(arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder)); return std::move(builder); } @@ -532,8 +530,7 @@ std::vector<std::shared_ptr<arrow::Array>> Finish(std::vector<std::unique_ptr<ar std::vector<std::shared_ptr<arrow::Array>> out; for (auto& builder : builders) { std::shared_ptr<arrow::Array> array; - auto status = builder->Finish(&array); - Y_VERIFY_OK(status); + TStatusValidator::Validate(builder->Finish(&array)); out.emplace_back(array); } return out; diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp index ed7f410b8a..d7405120c4 100644 --- a/ydb/core/formats/arrow/permutations.cpp +++ b/ydb/core/formats/arrow/permutations.cpp @@ -110,6 +110,15 @@ std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64 return out; } +std::shared_ptr<arrow::RecordBatch> CopyRecords(const std::shared_ptr<arrow::RecordBatch>& source, const std::vector<ui64>& indexes) { + auto schema = source->schema(); + std::vector<std::shared_ptr<arrow::Array>> columns; + for (auto&& i : source->columns()) { + columns.emplace_back(CopyRecords(i, indexes)); + } + return arrow::RecordBatch::Make(schema, indexes.size(), columns); +} + std::shared_ptr<arrow::Array> CopyRecords(const std::shared_ptr<arrow::Array>& source, const std::vector<ui64>& indexes) { if (!source) { return source; @@ -127,7 +136,9 @@ std::shared_ptr<arrow::Array> CopyRecords(const std::shared_ptr<arrow::Array>& s { auto& builderImpl = static_cast<TBuilder&>(*builder); + const ui32 arraySize = column.length(); for (auto&& i : indexes) { + Y_VERIFY(i < arraySize); TStatusValidator::Validate(builderImpl.Append(column.GetView(i))); } } diff --git a/ydb/core/formats/arrow/permutations.h b/ydb/core/formats/arrow/permutations.h index 9a1175887b..3e380f7236 100644 --- a/ydb/core/formats/arrow/permutations.h +++ b/ydb/core/formats/arrow/permutations.h @@ -11,5 +11,6 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique); std::shared_ptr<arrow::Array> CopyRecords(const std::shared_ptr<arrow::Array>& source, const std::vector<ui64>& indexes); +std::shared_ptr<arrow::RecordBatch> CopyRecords(const std::shared_ptr<arrow::RecordBatch>& source, const std::vector<ui64>& indexes); } diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp index f2204b662a..31f21b5295 100644 --- a/ydb/core/formats/arrow/special_keys.cpp +++ b/ydb/core/formats/arrow/special_keys.cpp @@ -1,4 +1,5 @@ #include "special_keys.h" +#include "permutations.h" #include <ydb/core/formats/arrow/serializer/full.h> #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/formats/arrow/arrow_filter.h> @@ -33,19 +34,17 @@ TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> if (columnNames.size()) { keyBatch = NArrow::ExtractColumns(batch, columnNames); } - std::vector<bool> bits(batch->num_rows(), false); - bits[0] = true; - bits[batch->num_rows() - 1] = true; + std::vector<ui64> indexes = {0}; + if (batch->num_rows() > 1) { + indexes.emplace_back(batch->num_rows() - 1); + } - auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows()); - Data = NArrow::TStatusValidator::GetValid(arrow::compute::Filter(keyBatch, filter)).record_batch(); - Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); + Data = NArrow::CopyRecords(keyBatch, indexes); } TFirstLastSpecialKeys::TFirstLastSpecialKeys(const TString& data) : TBase(data) { - Y_VERIFY(Data); Y_VERIFY_DEBUG(Data->ValidateFull().ok()); Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); } diff --git a/ydb/core/formats/arrow/special_keys.h b/ydb/core/formats/arrow/special_keys.h index 38451fdd7b..135e3aff11 100644 --- a/ydb/core/formats/arrow/special_keys.h +++ b/ydb/core/formats/arrow/special_keys.h @@ -32,6 +32,10 @@ class TFirstLastSpecialKeys: public TSpecialKeys { private: using TBase = TSpecialKeys; public: + const std::shared_ptr<arrow::RecordBatch>& GetBatch() const { + return Data; + } + std::optional<TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema) const { return GetKeyByIndex(0, schema); } diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp index c8712eb6cd..1d560b6107 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.cpp +++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp @@ -5,9 +5,12 @@ namespace NKikimr::NOlap { -void TPortionMeta::FillBatchInfo(const std::shared_ptr<arrow::RecordBatch> batch, const TIndexInfo& indexInfo) { +void TPortionMeta::FillBatchInfo(const NArrow::TFirstLastSpecialKeys& specials, const TIndexInfo& indexInfo) { + auto& batch = specials.GetBatch(); + AFL_VERIFY(batch->num_rows()); { auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); + AFL_VERIFY(keyBatch); std::vector<bool> bits(batch->num_rows(), false); bits[0] = true; bits[batch->num_rows() - 1] = true; // it could be 0 if batch has one row diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h index 99f7ca0983..ae4305e067 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.h +++ b/ydb/core/tx/columnshard/engines/portions/meta.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/core/tx/columnshard/common/portion.h> #include <ydb/core/formats/arrow/replace_key.h> +#include <ydb/core/formats/arrow/special_keys.h> #include <ydb/core/protos/tx_columnshard.pb.h> #include <ydb/library/accessor/accessor.h> #include <util/stream/output.h> @@ -26,7 +27,7 @@ public: std::optional<NKikimrTxColumnShard::TIndexPortionMeta> SerializeToProto(const ui32 columnId, const ui32 chunk) const; - void FillBatchInfo(const std::shared_ptr<arrow::RecordBatch> batch, const TIndexInfo& indexInfo); + void FillBatchInfo(const NArrow::TFirstLastSpecialKeys& specials, const TIndexInfo& indexInfo); EProduced GetProduced() const { return Produced; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 115a5bb20f..5f20c393f5 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -28,11 +28,16 @@ const TColumnRecord& TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName) { - const auto& indexInfo = snapshotSchema.GetIndexInfo(); Y_VERIFY(batch->num_rows() == NumRows()); + AddMetadata(snapshotSchema, NArrow::TFirstLastSpecialKeys(batch), tierName); +} + +void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& specials, + const TString& tierName) { + const auto& indexInfo = snapshotSchema.GetIndexInfo(); Meta = {}; Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); - Meta.FillBatchInfo(batch, indexInfo); + Meta.FillBatchInfo(specials, indexInfo); Meta.SetTierName(tierName); } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 67743fb931..3b6ff6a6a0 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -5,6 +5,7 @@ #include <ydb/core/tx/columnshard/engines/scheme/column_features.h> #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> #include <ydb/core/tx/columnshard/blobs_action/abstract/storage.h> +#include <ydb/core/formats/arrow/special_keys.h> #include <ydb/library/yverify_stream/yverify_stream.h> namespace NKikimr::NOlap { @@ -190,7 +191,7 @@ public: return {sum, max}; } - ui64 BlobsBytes() const noexcept { + ui64 GetBlobBytes() const noexcept { ui64 sum = 0; for (const auto& rec : Records) { sum += rec.BlobRange.Size; @@ -198,6 +199,10 @@ public: return sum; } + ui64 BlobsBytes() const noexcept { + return GetBlobBytes(); + } + bool IsVisible(const TSnapshot& snapshot) const { if (Empty()) { return false; @@ -219,7 +224,9 @@ public: void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta); void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, - const TString& tierName); + const TString& tierName); + void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& specials, + const TString& tierName); std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const; std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const; |