diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-31 16:45:35 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-31 17:28:28 +0300 |
commit | 294ac8703830421d1e940c702e3a2876471a1a84 (patch) | |
tree | bed6aabf8065fabcbffd167ab638a0df2ab76da5 | |
parent | d4e753164ac99107e86c79f7ea06900dd7203a3a (diff) | |
download | ydb-294ac8703830421d1e940c702e3a2876471a1a84.tar.gz |
KIKIMR-19093: helpers for arrow manipulation. and fix dependencies.
29 files changed, 161 insertions, 16 deletions
diff --git a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt index 793eeea34b..9e497a26d7 100644 --- a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt @@ -31,6 +31,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-simple_builder formats-arrow-dictionary formats-arrow-transformer + cpp-actors-core ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt index 57ebad2a1c..6dffeca31e 100644 --- a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt +++ b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt @@ -32,6 +32,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-simple_builder formats-arrow-dictionary formats-arrow-transformer + cpp-actors-core ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt index 57ebad2a1c..6dffeca31e 100644 --- a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt @@ -32,6 +32,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-simple_builder formats-arrow-dictionary formats-arrow-transformer + cpp-actors-core ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt index ae91024fc6..618a9d0bd7 100644 --- a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt @@ -32,6 +32,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-simple_builder formats-arrow-dictionary formats-arrow-transformer + cpp-actors-core ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 5dfa52f61b..0216b67394 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -159,9 +159,32 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow:: return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(std::move(fields)), srcBatch->num_rows(), std::move(columns)); } +std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr<arrow::RecordBatch>& srcBatch, + const std::vector<TString>& columnNames) { + if (columnNames.empty()) { + return nullptr; + } + std::vector<std::shared_ptr<arrow::Field>> fields; + fields.reserve(columnNames.size()); + std::vector<std::shared_ptr<arrow::Array>> columns; + columns.reserve(columnNames.size()); + + auto srcSchema = srcBatch->schema(); + for (auto& name : columnNames) { + int pos = srcSchema->GetFieldIndex(name); + AFL_VERIFY(pos >= 0)("field_name", name); + fields.push_back(srcSchema->field(pos)); + columns.push_back(srcBatch->column(pos)); + } + + return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(std::move(fields)), srcBatch->num_rows(), std::move(columns)); +} + std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const std::shared_ptr<arrow::Schema>& dstSchema, bool addNotExisted) { + Y_VERIFY(srcBatch); + Y_VERIFY(dstSchema); std::vector<std::shared_ptr<arrow::Array>> columns; columns.reserve(dstSchema->num_fields()); @@ -175,13 +198,15 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow:: } columns.back() = *result; } else { + AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name()) + ("column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names())); return nullptr; } } Y_VERIFY(columns.back()); if (!columns.back()->type()->Equals(field->type())) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_incoming_batch")("reason", "invalid_column_type")("column", field->name()) + AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_parse_incoming_batch")("reason", "invalid_column_type")("column", field->name()) ("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString()); return nullptr; } @@ -488,6 +513,13 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared return builders; } +std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::Field>& field) { + std::unique_ptr<arrow::ArrayBuilder> builder; + auto status = arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder); + Y_VERIFY_OK(status); + return std::move(builder); +} + std::vector<std::shared_ptr<arrow::Array>> Finish(std::vector<std::unique_ptr<arrow::ArrayBuilder>>&& builders) { std::vector<std::shared_ptr<arrow::Array>> out; for (auto& builder : builders) { @@ -876,4 +908,33 @@ std::shared_ptr<arrow::RecordBatch> ReallocateBatch(std::shared_ptr<arrow::Recor return DeserializeBatch(SerializeBatch(original, arrow::ipc::IpcWriteOptions::Defaults()), original->schema()); } +std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& rb) { + std::vector<std::shared_ptr<arrow::Array>> columns; + std::vector<std::shared_ptr<arrow::Field>> fields; + std::optional<ui32> recordsCount; + std::set<std::string> columnNames; + for (auto&& i : rb) { + if (!i) { + continue; + } + for (auto&& c : i->columns()) { + columns.emplace_back(c); + if (!recordsCount) { + recordsCount = c->length(); + } else { + Y_VERIFY(*recordsCount == c->length()); + } + } + for (auto&& f : i->schema()->fields()) { + Y_VERIFY(columnNames.emplace(f->name()).second); + fields.emplace_back(f); + } + } + if (columns.empty()) { + return nullptr; + } + auto schema = std::make_shared<arrow::Schema>(fields); + return arrow::RecordBatch::Make(schema, *recordsCount, columns); +} + } diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h index e79448025b..f71e4742b5 100644 --- a/ydb/core/formats/arrow/arrow_helpers.h +++ b/ydb/core/formats/arrow/arrow_helpers.h @@ -1,6 +1,5 @@ #pragma once #include "switch_type.h" -#include "size_calcer.h" #include <ydb/core/formats/factory.h> #include <ydb/core/scheme/scheme_tablecell.h> #include <library/cpp/json/writer/json_value.h> @@ -8,6 +7,7 @@ #include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h> +#include <ydb/library/accessor/accessor.h> namespace NKikimr::NArrow { @@ -57,6 +57,8 @@ std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow:: std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const std::vector<TString>& columnNames); +std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr<arrow::RecordBatch>& srcBatch, + const std::vector<TString>& columnNames); std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const std::shared_ptr<arrow::Schema>& dstSchema, bool addNotExisted = false); @@ -73,6 +75,7 @@ std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches); std::shared_ptr<arrow::RecordBatch> CombineSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const std::shared_ptr<TSortDescription>& description); +std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& rb); std::vector<std::shared_ptr<arrow::RecordBatch>> MergeSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const std::shared_ptr<TSortDescription>& description, size_t maxBatchRows); @@ -83,6 +86,8 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared const std::vector<ui32>& sharding, ui32 numShards); +std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::Field>& field); + std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared_ptr<arrow::Schema>& schema, size_t reserve = 0, const std::map<std::string, ui64>& sizeByColumn = {}); std::vector<std::shared_ptr<arrow::Array>> Finish(std::vector<std::unique_ptr<arrow::ArrayBuilder>>&& builders); diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt index f34bb025e5..37ff6ac325 100644 --- a/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt @@ -15,6 +15,7 @@ target_link_libraries(formats-arrow-dictionary PUBLIC ydb-core-protos formats-arrow-simple_builder formats-arrow-switch + cpp-actors-core ) target_sources(formats-arrow-dictionary PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt index 93955803c5..811088131e 100644 --- a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt @@ -16,6 +16,7 @@ target_link_libraries(formats-arrow-dictionary PUBLIC ydb-core-protos formats-arrow-simple_builder formats-arrow-switch + cpp-actors-core ) target_sources(formats-arrow-dictionary PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt index 93955803c5..811088131e 100644 --- a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(formats-arrow-dictionary PUBLIC ydb-core-protos formats-arrow-simple_builder formats-arrow-switch + cpp-actors-core ) target_sources(formats-arrow-dictionary PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt index f34bb025e5..37ff6ac325 100644 --- a/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt @@ -15,6 +15,7 @@ target_link_libraries(formats-arrow-dictionary PUBLIC ydb-core-protos formats-arrow-simple_builder formats-arrow-switch + cpp-actors-core ) target_sources(formats-arrow-dictionary PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp diff --git a/ydb/core/formats/arrow/dictionary/ya.make b/ydb/core/formats/arrow/dictionary/ya.make index a48034f221..04e71d9bba 100644 --- a/ydb/core/formats/arrow/dictionary/ya.make +++ b/ydb/core/formats/arrow/dictionary/ya.make @@ -5,6 +5,7 @@ PEERDIR( ydb/core/protos ydb/core/formats/arrow/simple_builder ydb/core/formats/arrow/switch + library/cpp/actors/core ) SRCS( diff --git a/ydb/core/formats/arrow/merging_sorted_input_stream.cpp b/ydb/core/formats/arrow/merging_sorted_input_stream.cpp index 6002d269a5..58d8bb2f88 100644 --- a/ydb/core/formats/arrow/merging_sorted_input_stream.cpp +++ b/ydb/core/formats/arrow/merging_sorted_input_stream.cpp @@ -5,6 +5,7 @@ #include <queue> #include "merging_sorted_input_stream.h" #include "switch_type.h" +#include "size_calcer.h" namespace NKikimr::NArrow { diff --git a/ydb/core/formats/arrow/replace_key.h b/ydb/core/formats/arrow/replace_key.h index 9c9d477e20..ede577eb68 100644 --- a/ydb/core/formats/arrow/replace_key.h +++ b/ydb/core/formats/arrow/replace_key.h @@ -3,10 +3,14 @@ #include "permutations.h" #include "common/validation.h" #include <ydb/core/base/defs.h> + +#include <library/cpp/actors/core/log.h> + #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_vector.h> #include <util/string/builder.h> +#include <util/string/join.h> #include <compare> @@ -163,7 +167,7 @@ public: template<typename T = TArrayVecPtr> requires IsOwning std::shared_ptr<arrow::RecordBatch> RestoreBatch(const std::shared_ptr<arrow::Schema>& schema) const { - Y_VERIFY(Size() && Size() == schema->num_fields()); + AFL_VERIFY(Size() && Size() == schema->num_fields())("columns", DebugString())("schema", JoinSeq(",", schema->field_names())); const auto& columns = *Columns; return arrow::RecordBatch::Make(schema, columns[0]->length(), columns); } diff --git a/ydb/core/formats/arrow/serializer/abstract.cpp b/ydb/core/formats/arrow/serializer/abstract.cpp index f893a2f063..13ee3d731b 100644 --- a/ydb/core/formats/arrow/serializer/abstract.cpp +++ b/ydb/core/formats/arrow/serializer/abstract.cpp @@ -1,4 +1,11 @@ #include "abstract.h" namespace NKikimr::NArrow::NSerialization { +arrow::Result<std::shared_ptr<arrow::RecordBatch>> IDeserializer::Deserialize(const TString& data) const { + if (!data) { + return nullptr; + } + return DoDeserialize(data); +} + } diff --git a/ydb/core/formats/arrow/serializer/abstract.h b/ydb/core/formats/arrow/serializer/abstract.h index 1419125aac..f21ac1d58f 100644 --- a/ydb/core/formats/arrow/serializer/abstract.h +++ b/ydb/core/formats/arrow/serializer/abstract.h @@ -16,6 +16,8 @@ public: TString Serialize(const std::shared_ptr<arrow::RecordBatch>& batch) const { return DoSerialize(batch); } + + virtual bool IsHardPacker() const = 0; }; class IDeserializer { @@ -30,9 +32,7 @@ public: return DoDebugString(); } - arrow::Result<std::shared_ptr<arrow::RecordBatch>> Deserialize(const TString& data) const { - return DoDeserialize(data); - } + arrow::Result<std::shared_ptr<arrow::RecordBatch>> Deserialize(const TString& data) const; }; } diff --git a/ydb/core/formats/arrow/serializer/batch_only.h b/ydb/core/formats/arrow/serializer/batch_only.h index 6c99134a29..b3dfda9e7f 100644 --- a/ydb/core/formats/arrow/serializer/batch_only.h +++ b/ydb/core/formats/arrow/serializer/batch_only.h @@ -11,6 +11,9 @@ private: protected: virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const override; public: + virtual bool IsHardPacker() const override { + return Options.codec && Options.codec->compression_type() == arrow::Compression::ZSTD && Options.codec->compression_level() > 3; + } TBatchPayloadSerializer(const arrow::ipc::IpcWriteOptions& options) : Options(options) { diff --git a/ydb/core/formats/arrow/serializer/full.h b/ydb/core/formats/arrow/serializer/full.h index eb6bdc8ced..56761a3d75 100644 --- a/ydb/core/formats/arrow/serializer/full.h +++ b/ydb/core/formats/arrow/serializer/full.h @@ -13,6 +13,10 @@ private: protected: virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const override; public: + virtual bool IsHardPacker() const override { + return Options.codec && Options.codec->compression_type() == arrow::Compression::ZSTD && Options.codec->compression_level() > 3; + } + TFullDataSerializer(const arrow::ipc::IpcWriteOptions& options) : Options(options) { diff --git a/ydb/core/formats/arrow/simple_builder/filler.h b/ydb/core/formats/arrow/simple_builder/filler.h index c2eab02914..e86e7a6c21 100644 --- a/ydb/core/formats/arrow/simple_builder/filler.h +++ b/ydb/core/formats/arrow/simple_builder/filler.h @@ -19,8 +19,24 @@ public: return Delta + idx; } TIntSeqFiller(const CType delta = 0) - : Delta(delta) - { + : Delta(delta) { + + } +}; + +template <class TArrowInt> +class TIntConstFiller { +public: + using TValue = TArrowInt; +private: + using CType = typename TArrowInt::c_type; + const CType Value; +public: + CType GetValue(const CType /*idx*/) const { + return Value; + } + TIntConstFiller(const CType value) + : Value(value) { } }; diff --git a/ydb/core/formats/arrow/size_calcer.cpp b/ydb/core/formats/arrow/size_calcer.cpp index 56a8ee7c29..ccc4ec225e 100644 --- a/ydb/core/formats/arrow/size_calcer.cpp +++ b/ydb/core/formats/arrow/size_calcer.cpp @@ -215,7 +215,8 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) { } NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch) { - return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch)); + TFirstLastSpecialKeys specialKeys(batch); + return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch), specialKeys); } bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage) { diff --git a/ydb/core/formats/arrow/size_calcer.h b/ydb/core/formats/arrow/size_calcer.h index a7ca8095bf..4d14c0cf5a 100644 --- a/ydb/core/formats/arrow/size_calcer.h +++ b/ydb/core/formats/arrow/size_calcer.h @@ -1,4 +1,5 @@ #pragma once +#include "special_keys.h" #include <ydb/library/accessor/accessor.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> @@ -51,22 +52,28 @@ private: YDB_READONLY_DEF(TString, Data); YDB_READONLY(ui32, RowsCount, 0); YDB_READONLY(ui32, RawBytes, 0); + TFirstLastSpecialKeys SpecialKeys; public: size_t GetSize() const { return Data.size(); } + const TFirstLastSpecialKeys& GetSpecialKeys() const { + return SpecialKeys; + } + TString DebugString() const; static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::vector<TSerializedBatch>& result, TString* errorMessage); static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage); static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch); - TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes) + TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const TFirstLastSpecialKeys& specialKeys) : SchemaData(schemaData) , Data(data) , RowsCount(rowsCount) , RawBytes(rawBytes) + , SpecialKeys(specialKeys) { } diff --git a/ydb/core/formats/arrow/sort_cursor.h b/ydb/core/formats/arrow/sort_cursor.h index a12e0b554c..528240ba08 100644 --- a/ydb/core/formats/arrow/sort_cursor.h +++ b/ydb/core/formats/arrow/sort_cursor.h @@ -67,10 +67,14 @@ struct TSortCursorImpl { void Reset(std::shared_ptr<arrow::RecordBatch> batch) { current_batch = batch; - sort_columns = std::make_shared<TArrayVec>(ExtractColumns(batch, desc->SortingKey)->columns()); + auto rbSorting = ExtractColumns(batch, desc->SortingKey); + Y_VERIFY(rbSorting); + sort_columns = std::make_shared<TArrayVec>(rbSorting->columns()); all_columns = &batch->columns(); if (desc->ReplaceKey) { - replace_columns = std::make_shared<TArrayVec>(ExtractColumns(batch, desc->ReplaceKey)->columns()); + auto rbReplace = ExtractColumns(batch, desc->ReplaceKey); + Y_VERIFY(rbReplace); + replace_columns = std::make_shared<TArrayVec>(rbReplace->columns()); } pos = 0; } diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp index 4ea5149800..f2204b662a 100644 --- a/ydb/core/formats/arrow/special_keys.cpp +++ b/ydb/core/formats/arrow/special_keys.cpp @@ -42,7 +42,9 @@ TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); } -TFirstLastSpecialKeys::TFirstLastSpecialKeys(const TString& data): TBase(data) { +TFirstLastSpecialKeys::TFirstLastSpecialKeys(const TString& data) + : TBase(data) +{ Y_VERIFY(Data); Y_VERIFY_DEBUG(Data->ValidateFull().ok()); Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); diff --git a/ydb/core/formats/arrow/switch/switch_type.h b/ydb/core/formats/arrow/switch/switch_type.h index 753338a514..7e0f98a5b9 100644 --- a/ydb/core/formats/arrow/switch/switch_type.h +++ b/ydb/core/formats/arrow/switch/switch_type.h @@ -225,7 +225,7 @@ bool Append(arrow::ArrayBuilder& builder, const std::vector<typename T::c_type>& } template <typename T> -bool Append(T& builder, const arrow::Array& array, int position) { +bool Append(T& builder, const arrow::Array& array, int position, ui64* recordSize = nullptr) { return SwitchType(array.type_id(), [&](const auto& type) { using TWrap = std::decay_t<decltype(type)>; using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; @@ -236,10 +236,25 @@ bool Append(T& builder, const arrow::Array& array, int position) { if (typedArray.IsNull(position)) { auto status = typedBuilder.AppendNull(); + if (recordSize) { + *recordSize += 4; + } return status.ok(); } else { - auto status = typedBuilder.Append(typedArray.GetView(position)); - return status.ok(); + if constexpr (!arrow::has_string_view<typename TWrap::T>::value) { + auto status = typedBuilder.Append(typedArray.GetView(position)); + if (recordSize) { + *recordSize += sizeof(typedArray.GetView(position)); + } + return status.ok(); + } + if constexpr (arrow::has_string_view<typename TWrap::T>::value) { + auto status = typedBuilder.Append(typedArray.GetView(position)); + if (recordSize) { + *recordSize += typedArray.GetView(position).size(); + } + return status.ok(); + } } }); } diff --git a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp index 3aec7d6fbf..4a8413dcd5 100644 --- a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp +++ b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp @@ -5,6 +5,7 @@ #include <ydb/core/formats/arrow/simple_builder/array.h> #include <ydb/core/formats/arrow/simple_builder/batch.h> #include <ydb/core/formats/arrow/simple_builder/filler.h> +#include <ydb/core/formats/arrow/size_calcer.h> #include <ydb/core/formats/arrow/dictionary/conversion.h> Y_UNIT_TEST_SUITE(SizeCalcer) { diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make index 4dbdc011d6..416cefc140 100644 --- a/ydb/core/formats/arrow/ya.make +++ b/ydb/core/formats/arrow/ya.make @@ -11,6 +11,7 @@ PEERDIR( ydb/core/formats/arrow/simple_builder ydb/core/formats/arrow/dictionary ydb/core/formats/arrow/transformer + library/cpp/actors/core ydb/library/arrow_kernels ydb/library/binary_json ydb/library/dynumber diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.cpp b/ydb/core/tx/columnshard/engines/portions/column_record.cpp index b9d31ae8c8..8ac14b1d99 100644 --- a/ydb/core/tx/columnshard/engines/portions/column_record.cpp +++ b/ydb/core/tx/columnshard/engines/portions/column_record.cpp @@ -1,5 +1,6 @@ #include "column_record.h" #include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/size_calcer.h> #include <ydb/core/tx/columnshard/common/scalars.h> #include <ydb/core/tx/columnshard/engines/scheme/index_info.h> #include <ydb/core/tx/columnshard/columnshard_schema.h> diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h index 043917aae1..fe5593efaf 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -4,6 +4,7 @@ #include "read_filter_merger.h" #include <ydb/core/formats/arrow/arrow_filter.h> +#include <ydb/core/formats/arrow/size_calcer.h> #include <ydb/core/tx/columnshard/blob.h> #include <ydb/core/tx/columnshard/engines/portion_info.h> diff --git a/ydb/core/tx/datashard/datashard__kqp_scan.cpp b/ydb/core/tx/datashard/datashard__kqp_scan.cpp index eeb95476c5..9ab272aa7d 100644 --- a/ydb/core/tx/datashard/datashard__kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard__kqp_scan.cpp @@ -4,6 +4,7 @@ #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/formats/arrow/arrow_batch_builder.h> +#include <ydb/core/formats/arrow/size_calcer.h> #include <ydb/core/kqp/compute_actor/kqp_compute_events.h> #include <ydb/core/tablet_flat/flat_row_celled.h> diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 09c3189206..664444c8f4 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -15,6 +15,7 @@ #include <ydb/core/scheme/scheme_type_info.h> #include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/core/formats/arrow/size_calcer.h> #include <ydb/public/api/protos/ydb_status_codes.pb.h> #include <ydb/public/api/protos/ydb_value.pb.h> |