aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-09-29 09:07:02 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-09-29 09:38:31 +0300
commit 82c27d07b8cca223d3ec075911da0541c782b7b7 (patch)
tree c808b84ca981d2851fa008f7a6362ee7da5aec2a
parent 0c7c463bde024e085f92b83fb9128526ab3c1acb (diff)
download ydb-82c27d07b8cca223d3ec075911da0541c782b7b7.tar.gz
KIKIMR-19216: use special keys for meta construction
-rw-r--r-- ydb/core/formats/arrow/arrow_helpers.cpp 11
-rw-r--r-- ydb/core/formats/arrow/permutations.cpp 11
-rw-r--r-- ydb/core/formats/arrow/permutations.h 1
-rw-r--r-- ydb/core/formats/arrow/special_keys.cpp 13
-rw-r--r-- ydb/core/formats/arrow/special_keys.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/meta.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/meta.h 3
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.cpp 9
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.h 11
9 files changed, 48 insertions, 20 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 542e9efc80..935ab44062 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -502,8 +502,7 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared
for (auto& field : schema->fields()) {
std::unique_ptr<arrow::ArrayBuilder> builder;
- auto status = arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder);
- Y_VERIFY_OK(status);
+ TStatusValidator::Validate(arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder));
if (sizeByColumn.size()) {
auto it = sizeByColumn.find(field->name());
if (it != sizeByColumn.end()) {
@@ -512,7 +511,7 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared
}
if (reserve) {
- Y_VERIFY_OK(builder->Reserve(reserve));
+ TStatusValidator::Validate(builder->Reserve(reserve));
}
builders.emplace_back(std::move(builder));
@@ -523,8 +522,7 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared
std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::Field>& field) {
std::unique_ptr<arrow::ArrayBuilder> builder;
- auto status = arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder);
- Y_VERIFY_OK(status);
+ TStatusValidator::Validate(arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder));
return std::move(builder);
}
@@ -532,8 +530,7 @@ std::vector<std::shared_ptr<arrow::Array>> Finish(std::vector<std::unique_ptr<ar
std::vector<std::shared_ptr<arrow::Array>> out;
for (auto& builder : builders) {
std::shared_ptr<arrow::Array> array;
- auto status = builder->Finish(&array);
- Y_VERIFY_OK(status);
+ TStatusValidator::Validate(builder->Finish(&array));
out.emplace_back(array);
}
return out;
diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp
index ed7f410b8a..d7405120c4 100644
--- a/ydb/core/formats/arrow/permutations.cpp
+++ b/ydb/core/formats/arrow/permutations.cpp
@@ -110,6 +110,15 @@ std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64
return out;
}
+std::shared_ptr<arrow::RecordBatch> CopyRecords(const std::shared_ptr<arrow::RecordBatch>& source, const std::vector<ui64>& indexes) {
+ auto schema = source->schema();
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ for (auto&& i : source->columns()) {
+ columns.emplace_back(CopyRecords(i, indexes));
+ }
+ return arrow::RecordBatch::Make(schema, indexes.size(), columns);
+}
+
std::shared_ptr<arrow::Array> CopyRecords(const std::shared_ptr<arrow::Array>& source, const std::vector<ui64>& indexes) {
if (!source) {
return source;
@@ -127,7 +136,9 @@ std::shared_ptr<arrow::Array> CopyRecords(const std::shared_ptr<arrow::Array>& s
{
auto& builderImpl = static_cast<TBuilder&>(*builder);
+ const ui32 arraySize = column.length();
for (auto&& i : indexes) {
+ Y_VERIFY(i < arraySize);
TStatusValidator::Validate(builderImpl.Append(column.GetView(i)));
}
}
diff --git a/ydb/core/formats/arrow/permutations.h b/ydb/core/formats/arrow/permutations.h
index 9a1175887b..3e380f7236 100644
--- a/ydb/core/formats/arrow/permutations.h
+++ b/ydb/core/formats/arrow/permutations.h
@@ -11,5 +11,6 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
std::shared_ptr<arrow::Array> CopyRecords(const std::shared_ptr<arrow::Array>& source, const std::vector<ui64>& indexes);
+std::shared_ptr<arrow::RecordBatch> CopyRecords(const std::shared_ptr<arrow::RecordBatch>& source, const std::vector<ui64>& indexes);
}
diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp
index f2204b662a..31f21b5295 100644
--- a/ydb/core/formats/arrow/special_keys.cpp
+++ b/ydb/core/formats/arrow/special_keys.cpp
@@ -1,4 +1,5 @@
#include "special_keys.h"
+#include "permutations.h"
#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/arrow_filter.h>
@@ -33,19 +34,17 @@ TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch>
if (columnNames.size()) {
keyBatch = NArrow::ExtractColumns(batch, columnNames);
}
- std::vector<bool> bits(batch->num_rows(), false);
- bits[0] = true;
- bits[batch->num_rows() - 1] = true;
+ std::vector<ui64> indexes = {0};
+ if (batch->num_rows() > 1) {
+ indexes.emplace_back(batch->num_rows() - 1);
+ }
- auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows());
- Data = NArrow::TStatusValidator::GetValid(arrow::compute::Filter(keyBatch, filter)).record_batch();
- Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2);
+ Data = NArrow::CopyRecords(keyBatch, indexes);
}
TFirstLastSpecialKeys::TFirstLastSpecialKeys(const TString& data)
: TBase(data)
{
- Y_VERIFY(Data);
Y_VERIFY_DEBUG(Data->ValidateFull().ok());
Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2);
}
diff --git a/ydb/core/formats/arrow/special_keys.h b/ydb/core/formats/arrow/special_keys.h
index 38451fdd7b..135e3aff11 100644
--- a/ydb/core/formats/arrow/special_keys.h
+++ b/ydb/core/formats/arrow/special_keys.h
@@ -32,6 +32,10 @@ class TFirstLastSpecialKeys: public TSpecialKeys {
private:
using TBase = TSpecialKeys;
public:
+ const std::shared_ptr<arrow::RecordBatch>& GetBatch() const {
+ return Data;
+ }
+
std::optional<TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema) const {
return GetKeyByIndex(0, schema);
}
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp
index c8712eb6cd..1d560b6107 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp
@@ -5,9 +5,12 @@
namespace NKikimr::NOlap {
-void TPortionMeta::FillBatchInfo(const std::shared_ptr<arrow::RecordBatch> batch, const TIndexInfo& indexInfo) {
+void TPortionMeta::FillBatchInfo(const NArrow::TFirstLastSpecialKeys& specials, const TIndexInfo& indexInfo) {
+ auto& batch = specials.GetBatch();
+ AFL_VERIFY(batch->num_rows());
{
auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
+ AFL_VERIFY(keyBatch);
std::vector<bool> bits(batch->num_rows(), false);
bits[0] = true;
bits[batch->num_rows() - 1] = true; // it could be 0 if batch has one row
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h
index 99f7ca0983..ae4305e067 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.h
+++ b/ydb/core/tx/columnshard/engines/portions/meta.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/tx/columnshard/common/portion.h>
#include <ydb/core/formats/arrow/replace_key.h>
+#include <ydb/core/formats/arrow/special_keys.h>
#include <ydb/core/protos/tx_columnshard.pb.h>
#include <ydb/library/accessor/accessor.h>
#include <util/stream/output.h>
@@ -26,7 +27,7 @@ public:
std::optional<NKikimrTxColumnShard::TIndexPortionMeta> SerializeToProto(const ui32 columnId, const ui32 chunk) const;
- void FillBatchInfo(const std::shared_ptr<arrow::RecordBatch> batch, const TIndexInfo& indexInfo);
+ void FillBatchInfo(const NArrow::TFirstLastSpecialKeys& specials, const TIndexInfo& indexInfo);
EProduced GetProduced() const {
return Produced;
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index 115a5bb20f..5f20c393f5 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -28,11 +28,16 @@ const TColumnRecord& TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record)
void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
const TString& tierName) {
- const auto& indexInfo = snapshotSchema.GetIndexInfo();
Y_VERIFY(batch->num_rows() == NumRows());
+ AddMetadata(snapshotSchema, NArrow::TFirstLastSpecialKeys(batch), tierName);
+}
+
+void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& specials,
+ const TString& tierName) {
+ const auto& indexInfo = snapshotSchema.GetIndexInfo();
Meta = {};
Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
- Meta.FillBatchInfo(batch, indexInfo);
+ Meta.FillBatchInfo(specials, indexInfo);
Meta.SetTierName(tierName);
}
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index 67743fb931..3b6ff6a6a0 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -5,6 +5,7 @@
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/storage.h>
+#include <ydb/core/formats/arrow/special_keys.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
namespace NKikimr::NOlap {
@@ -190,7 +191,7 @@ public:
return {sum, max};
}
- ui64 BlobsBytes() const noexcept {
+ ui64 GetBlobBytes() const noexcept {
ui64 sum = 0;
for (const auto& rec : Records) {
sum += rec.BlobRange.Size;
@@ -198,6 +199,10 @@ public:
return sum;
}
+ ui64 BlobsBytes() const noexcept {
+ return GetBlobBytes();
+ }
+
bool IsVisible(const TSnapshot& snapshot) const {
if (Empty()) {
return false;
@@ -219,7 +224,9 @@ public:
void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta);
void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
- const TString& tierName);
+ const TString& tierName);
+ void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& specials,
+ const TString& tierName);
std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;