aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-08-31 16:45:35 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-08-31 17:28:28 +0300
commit294ac8703830421d1e940c702e3a2876471a1a84 (patch)
treebed6aabf8065fabcbffd167ab638a0df2ab76da5
parentd4e753164ac99107e86c79f7ea06900dd7203a3a (diff)
downloadydb-294ac8703830421d1e940c702e3a2876471a1a84.tar.gz
KIKIMR-19093: helpers for arrow manipulation. and fix dependencies.
-rw-r--r--ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp63
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.h7
-rw-r--r--ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/dictionary/ya.make1
-rw-r--r--ydb/core/formats/arrow/merging_sorted_input_stream.cpp1
-rw-r--r--ydb/core/formats/arrow/replace_key.h6
-rw-r--r--ydb/core/formats/arrow/serializer/abstract.cpp7
-rw-r--r--ydb/core/formats/arrow/serializer/abstract.h6
-rw-r--r--ydb/core/formats/arrow/serializer/batch_only.h3
-rw-r--r--ydb/core/formats/arrow/serializer/full.h4
-rw-r--r--ydb/core/formats/arrow/simple_builder/filler.h20
-rw-r--r--ydb/core/formats/arrow/size_calcer.cpp3
-rw-r--r--ydb/core/formats/arrow/size_calcer.h9
-rw-r--r--ydb/core/formats/arrow/sort_cursor.h8
-rw-r--r--ydb/core/formats/arrow/special_keys.cpp4
-rw-r--r--ydb/core/formats/arrow/switch/switch_type.h21
-rw-r--r--ydb/core/formats/arrow/ut/ut_size_calcer.cpp1
-rw-r--r--ydb/core/formats/arrow/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/portions/column_record.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.h1
-rw-r--r--ydb/core/tx/datashard/datashard__kqp_scan.cpp1
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h1
29 files changed, 161 insertions, 16 deletions
diff --git a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
index 793eeea34b..9e497a26d7 100644
--- a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
@@ -31,6 +31,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-simple_builder
formats-arrow-dictionary
formats-arrow-transformer
+ cpp-actors-core
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
index 57ebad2a1c..6dffeca31e 100644
--- a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
@@ -32,6 +32,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-simple_builder
formats-arrow-dictionary
formats-arrow-transformer
+ cpp-actors-core
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
index 57ebad2a1c..6dffeca31e 100644
--- a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
@@ -32,6 +32,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-simple_builder
formats-arrow-dictionary
formats-arrow-transformer
+ cpp-actors-core
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
index ae91024fc6..618a9d0bd7 100644
--- a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
@@ -32,6 +32,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-simple_builder
formats-arrow-dictionary
formats-arrow-transformer
+ cpp-actors-core
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 5dfa52f61b..0216b67394 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -159,9 +159,32 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(std::move(fields)), srcBatch->num_rows(), std::move(columns));
}
+std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const std::vector<TString>& columnNames) {
+ if (columnNames.empty()) {
+ return nullptr;
+ }
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ fields.reserve(columnNames.size());
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ columns.reserve(columnNames.size());
+
+ auto srcSchema = srcBatch->schema();
+ for (auto& name : columnNames) {
+ int pos = srcSchema->GetFieldIndex(name);
+ AFL_VERIFY(pos >= 0)("field_name", name);
+ fields.push_back(srcSchema->field(pos));
+ columns.push_back(srcBatch->column(pos));
+ }
+
+ return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(std::move(fields)), srcBatch->num_rows(), std::move(columns));
+}
+
std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
const std::shared_ptr<arrow::Schema>& dstSchema,
bool addNotExisted) {
+ Y_VERIFY(srcBatch);
+ Y_VERIFY(dstSchema);
std::vector<std::shared_ptr<arrow::Array>> columns;
columns.reserve(dstSchema->num_fields());
@@ -175,13 +198,15 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
}
columns.back() = *result;
} else {
+ AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name())
+ ("column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names()));
return nullptr;
}
}
Y_VERIFY(columns.back());
if (!columns.back()->type()->Equals(field->type())) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_incoming_batch")("reason", "invalid_column_type")("column", field->name())
+ AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_parse_incoming_batch")("reason", "invalid_column_type")("column", field->name())
("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString());
return nullptr;
}
@@ -488,6 +513,13 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared
return builders;
}
+std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::Field>& field) {
+ std::unique_ptr<arrow::ArrayBuilder> builder;
+ auto status = arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder);
+ Y_VERIFY_OK(status);
+ return std::move(builder);
+}
+
std::vector<std::shared_ptr<arrow::Array>> Finish(std::vector<std::unique_ptr<arrow::ArrayBuilder>>&& builders) {
std::vector<std::shared_ptr<arrow::Array>> out;
for (auto& builder : builders) {
@@ -876,4 +908,33 @@ std::shared_ptr<arrow::RecordBatch> ReallocateBatch(std::shared_ptr<arrow::Recor
return DeserializeBatch(SerializeBatch(original, arrow::ipc::IpcWriteOptions::Defaults()), original->schema());
}
+std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& rb) {
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ std::optional<ui32> recordsCount;
+ std::set<std::string> columnNames;
+ for (auto&& i : rb) {
+ if (!i) {
+ continue;
+ }
+ for (auto&& c : i->columns()) {
+ columns.emplace_back(c);
+ if (!recordsCount) {
+ recordsCount = c->length();
+ } else {
+ Y_VERIFY(*recordsCount == c->length());
+ }
+ }
+ for (auto&& f : i->schema()->fields()) {
+ Y_VERIFY(columnNames.emplace(f->name()).second);
+ fields.emplace_back(f);
+ }
+ }
+ if (columns.empty()) {
+ return nullptr;
+ }
+ auto schema = std::make_shared<arrow::Schema>(fields);
+ return arrow::RecordBatch::Make(schema, *recordsCount, columns);
+}
+
}
diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h
index e79448025b..f71e4742b5 100644
--- a/ydb/core/formats/arrow/arrow_helpers.h
+++ b/ydb/core/formats/arrow/arrow_helpers.h
@@ -1,6 +1,5 @@
#pragma once
#include "switch_type.h"
-#include "size_calcer.h"
#include <ydb/core/formats/factory.h>
#include <ydb/core/scheme/scheme_tablecell.h>
#include <library/cpp/json/writer/json_value.h>
@@ -8,6 +7,7 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
+#include <ydb/library/accessor/accessor.h>
namespace NKikimr::NArrow {
@@ -57,6 +57,8 @@ std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::
std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
const std::vector<TString>& columnNames);
+std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const std::vector<TString>& columnNames);
std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
const std::shared_ptr<arrow::Schema>& dstSchema,
bool addNotExisted = false);
@@ -73,6 +75,7 @@ std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>&
std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
std::shared_ptr<arrow::RecordBatch> CombineSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
const std::shared_ptr<TSortDescription>& description);
+std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& rb);
std::vector<std::shared_ptr<arrow::RecordBatch>> MergeSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
const std::shared_ptr<TSortDescription>& description,
size_t maxBatchRows);
@@ -83,6 +86,8 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared
const std::vector<ui32>& sharding,
ui32 numShards);
+std::unique_ptr<arrow::ArrayBuilder> MakeBuilder(const std::shared_ptr<arrow::Field>& field);
+
std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared_ptr<arrow::Schema>& schema,
size_t reserve = 0, const std::map<std::string, ui64>& sizeByColumn = {});
std::vector<std::shared_ptr<arrow::Array>> Finish(std::vector<std::unique_ptr<arrow::ArrayBuilder>>&& builders);
diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt
index f34bb025e5..37ff6ac325 100644
--- a/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt
@@ -15,6 +15,7 @@ target_link_libraries(formats-arrow-dictionary PUBLIC
ydb-core-protos
formats-arrow-simple_builder
formats-arrow-switch
+ cpp-actors-core
)
target_sources(formats-arrow-dictionary PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp
diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt
index 93955803c5..811088131e 100644
--- a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt
@@ -16,6 +16,7 @@ target_link_libraries(formats-arrow-dictionary PUBLIC
ydb-core-protos
formats-arrow-simple_builder
formats-arrow-switch
+ cpp-actors-core
)
target_sources(formats-arrow-dictionary PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp
diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt
index 93955803c5..811088131e 100644
--- a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(formats-arrow-dictionary PUBLIC
ydb-core-protos
formats-arrow-simple_builder
formats-arrow-switch
+ cpp-actors-core
)
target_sources(formats-arrow-dictionary PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp
diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt
index f34bb025e5..37ff6ac325 100644
--- a/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt
@@ -15,6 +15,7 @@ target_link_libraries(formats-arrow-dictionary PUBLIC
ydb-core-protos
formats-arrow-simple_builder
formats-arrow-switch
+ cpp-actors-core
)
target_sources(formats-arrow-dictionary PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp
diff --git a/ydb/core/formats/arrow/dictionary/ya.make b/ydb/core/formats/arrow/dictionary/ya.make
index a48034f221..04e71d9bba 100644
--- a/ydb/core/formats/arrow/dictionary/ya.make
+++ b/ydb/core/formats/arrow/dictionary/ya.make
@@ -5,6 +5,7 @@ PEERDIR(
ydb/core/protos
ydb/core/formats/arrow/simple_builder
ydb/core/formats/arrow/switch
+ library/cpp/actors/core
)
SRCS(
diff --git a/ydb/core/formats/arrow/merging_sorted_input_stream.cpp b/ydb/core/formats/arrow/merging_sorted_input_stream.cpp
index 6002d269a5..58d8bb2f88 100644
--- a/ydb/core/formats/arrow/merging_sorted_input_stream.cpp
+++ b/ydb/core/formats/arrow/merging_sorted_input_stream.cpp
@@ -5,6 +5,7 @@
#include <queue>
#include "merging_sorted_input_stream.h"
#include "switch_type.h"
+#include "size_calcer.h"
namespace NKikimr::NArrow {
diff --git a/ydb/core/formats/arrow/replace_key.h b/ydb/core/formats/arrow/replace_key.h
index 9c9d477e20..ede577eb68 100644
--- a/ydb/core/formats/arrow/replace_key.h
+++ b/ydb/core/formats/arrow/replace_key.h
@@ -3,10 +3,14 @@
#include "permutations.h"
#include "common/validation.h"
#include <ydb/core/base/defs.h>
+
+#include <library/cpp/actors/core/log.h>
+
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_vector.h>
#include <util/string/builder.h>
+#include <util/string/join.h>
#include <compare>
@@ -163,7 +167,7 @@ public:
template<typename T = TArrayVecPtr> requires IsOwning
std::shared_ptr<arrow::RecordBatch> RestoreBatch(const std::shared_ptr<arrow::Schema>& schema) const {
- Y_VERIFY(Size() && Size() == schema->num_fields());
+ AFL_VERIFY(Size() && Size() == schema->num_fields())("columns", DebugString())("schema", JoinSeq(",", schema->field_names()));
const auto& columns = *Columns;
return arrow::RecordBatch::Make(schema, columns[0]->length(), columns);
}
diff --git a/ydb/core/formats/arrow/serializer/abstract.cpp b/ydb/core/formats/arrow/serializer/abstract.cpp
index f893a2f063..13ee3d731b 100644
--- a/ydb/core/formats/arrow/serializer/abstract.cpp
+++ b/ydb/core/formats/arrow/serializer/abstract.cpp
@@ -1,4 +1,11 @@
#include "abstract.h"
namespace NKikimr::NArrow::NSerialization {
+arrow::Result<std::shared_ptr<arrow::RecordBatch>> IDeserializer::Deserialize(const TString& data) const {
+ if (!data) {
+ return nullptr;
+ }
+ return DoDeserialize(data);
+}
+
}
diff --git a/ydb/core/formats/arrow/serializer/abstract.h b/ydb/core/formats/arrow/serializer/abstract.h
index 1419125aac..f21ac1d58f 100644
--- a/ydb/core/formats/arrow/serializer/abstract.h
+++ b/ydb/core/formats/arrow/serializer/abstract.h
@@ -16,6 +16,8 @@ public:
TString Serialize(const std::shared_ptr<arrow::RecordBatch>& batch) const {
return DoSerialize(batch);
}
+
+ virtual bool IsHardPacker() const = 0;
};
class IDeserializer {
@@ -30,9 +32,7 @@ public:
return DoDebugString();
}
- arrow::Result<std::shared_ptr<arrow::RecordBatch>> Deserialize(const TString& data) const {
- return DoDeserialize(data);
- }
+ arrow::Result<std::shared_ptr<arrow::RecordBatch>> Deserialize(const TString& data) const;
};
}
diff --git a/ydb/core/formats/arrow/serializer/batch_only.h b/ydb/core/formats/arrow/serializer/batch_only.h
index 6c99134a29..b3dfda9e7f 100644
--- a/ydb/core/formats/arrow/serializer/batch_only.h
+++ b/ydb/core/formats/arrow/serializer/batch_only.h
@@ -11,6 +11,9 @@ private:
protected:
virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
public:
+ virtual bool IsHardPacker() const override {
+ return Options.codec && Options.codec->compression_type() == arrow::Compression::ZSTD && Options.codec->compression_level() > 3;
+ }
TBatchPayloadSerializer(const arrow::ipc::IpcWriteOptions& options)
: Options(options) {
diff --git a/ydb/core/formats/arrow/serializer/full.h b/ydb/core/formats/arrow/serializer/full.h
index eb6bdc8ced..56761a3d75 100644
--- a/ydb/core/formats/arrow/serializer/full.h
+++ b/ydb/core/formats/arrow/serializer/full.h
@@ -13,6 +13,10 @@ private:
protected:
virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
public:
+ virtual bool IsHardPacker() const override {
+ return Options.codec && Options.codec->compression_type() == arrow::Compression::ZSTD && Options.codec->compression_level() > 3;
+ }
+
TFullDataSerializer(const arrow::ipc::IpcWriteOptions& options)
: Options(options) {
diff --git a/ydb/core/formats/arrow/simple_builder/filler.h b/ydb/core/formats/arrow/simple_builder/filler.h
index c2eab02914..e86e7a6c21 100644
--- a/ydb/core/formats/arrow/simple_builder/filler.h
+++ b/ydb/core/formats/arrow/simple_builder/filler.h
@@ -19,8 +19,24 @@ public:
return Delta + idx;
}
TIntSeqFiller(const CType delta = 0)
- : Delta(delta)
- {
+ : Delta(delta) {
+
+ }
+};
+
+template <class TArrowInt>
+class TIntConstFiller {
+public:
+ using TValue = TArrowInt;
+private:
+ using CType = typename TArrowInt::c_type;
+ const CType Value;
+public:
+ CType GetValue(const CType /*idx*/) const {
+ return Value;
+ }
+ TIntConstFiller(const CType value)
+ : Value(value) {
}
};
diff --git a/ydb/core/formats/arrow/size_calcer.cpp b/ydb/core/formats/arrow/size_calcer.cpp
index 56a8ee7c29..ccc4ec225e 100644
--- a/ydb/core/formats/arrow/size_calcer.cpp
+++ b/ydb/core/formats/arrow/size_calcer.cpp
@@ -215,7 +215,8 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
}
NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch) {
- return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch));
+ TFirstLastSpecialKeys specialKeys(batch);
+ return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch), specialKeys);
}
bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage) {
diff --git a/ydb/core/formats/arrow/size_calcer.h b/ydb/core/formats/arrow/size_calcer.h
index a7ca8095bf..4d14c0cf5a 100644
--- a/ydb/core/formats/arrow/size_calcer.h
+++ b/ydb/core/formats/arrow/size_calcer.h
@@ -1,4 +1,5 @@
#pragma once
+#include "special_keys.h"
#include <ydb/library/accessor/accessor.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
@@ -51,22 +52,28 @@ private:
YDB_READONLY_DEF(TString, Data);
YDB_READONLY(ui32, RowsCount, 0);
YDB_READONLY(ui32, RawBytes, 0);
+ TFirstLastSpecialKeys SpecialKeys;
public:
size_t GetSize() const {
return Data.size();
}
+ const TFirstLastSpecialKeys& GetSpecialKeys() const {
+ return SpecialKeys;
+ }
+
TString DebugString() const;
static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::vector<TSerializedBatch>& result, TString* errorMessage);
static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage);
static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch);
- TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes)
+ TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const TFirstLastSpecialKeys& specialKeys)
: SchemaData(schemaData)
, Data(data)
, RowsCount(rowsCount)
, RawBytes(rawBytes)
+ , SpecialKeys(specialKeys)
{
}
diff --git a/ydb/core/formats/arrow/sort_cursor.h b/ydb/core/formats/arrow/sort_cursor.h
index a12e0b554c..528240ba08 100644
--- a/ydb/core/formats/arrow/sort_cursor.h
+++ b/ydb/core/formats/arrow/sort_cursor.h
@@ -67,10 +67,14 @@ struct TSortCursorImpl {
void Reset(std::shared_ptr<arrow::RecordBatch> batch) {
current_batch = batch;
- sort_columns = std::make_shared<TArrayVec>(ExtractColumns(batch, desc->SortingKey)->columns());
+ auto rbSorting = ExtractColumns(batch, desc->SortingKey);
+ Y_VERIFY(rbSorting);
+ sort_columns = std::make_shared<TArrayVec>(rbSorting->columns());
all_columns = &batch->columns();
if (desc->ReplaceKey) {
- replace_columns = std::make_shared<TArrayVec>(ExtractColumns(batch, desc->ReplaceKey)->columns());
+ auto rbReplace = ExtractColumns(batch, desc->ReplaceKey);
+ Y_VERIFY(rbReplace);
+ replace_columns = std::make_shared<TArrayVec>(rbReplace->columns());
}
pos = 0;
}
diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp
index 4ea5149800..f2204b662a 100644
--- a/ydb/core/formats/arrow/special_keys.cpp
+++ b/ydb/core/formats/arrow/special_keys.cpp
@@ -42,7 +42,9 @@ TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch>
Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2);
}
-TFirstLastSpecialKeys::TFirstLastSpecialKeys(const TString& data): TBase(data) {
+TFirstLastSpecialKeys::TFirstLastSpecialKeys(const TString& data)
+ : TBase(data)
+{
Y_VERIFY(Data);
Y_VERIFY_DEBUG(Data->ValidateFull().ok());
Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2);
diff --git a/ydb/core/formats/arrow/switch/switch_type.h b/ydb/core/formats/arrow/switch/switch_type.h
index 753338a514..7e0f98a5b9 100644
--- a/ydb/core/formats/arrow/switch/switch_type.h
+++ b/ydb/core/formats/arrow/switch/switch_type.h
@@ -225,7 +225,7 @@ bool Append(arrow::ArrayBuilder& builder, const std::vector<typename T::c_type>&
}
template <typename T>
-bool Append(T& builder, const arrow::Array& array, int position) {
+bool Append(T& builder, const arrow::Array& array, int position, ui64* recordSize = nullptr) {
return SwitchType(array.type_id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
@@ -236,10 +236,25 @@ bool Append(T& builder, const arrow::Array& array, int position) {
if (typedArray.IsNull(position)) {
auto status = typedBuilder.AppendNull();
+ if (recordSize) {
+ *recordSize += 4;
+ }
return status.ok();
} else {
- auto status = typedBuilder.Append(typedArray.GetView(position));
- return status.ok();
+ if constexpr (!arrow::has_string_view<typename TWrap::T>::value) {
+ auto status = typedBuilder.Append(typedArray.GetView(position));
+ if (recordSize) {
+ *recordSize += sizeof(typedArray.GetView(position));
+ }
+ return status.ok();
+ }
+ if constexpr (arrow::has_string_view<typename TWrap::T>::value) {
+ auto status = typedBuilder.Append(typedArray.GetView(position));
+ if (recordSize) {
+ *recordSize += typedArray.GetView(position).size();
+ }
+ return status.ok();
+ }
}
});
}
diff --git a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp
index 3aec7d6fbf..4a8413dcd5 100644
--- a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp
+++ b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/formats/arrow/simple_builder/array.h>
#include <ydb/core/formats/arrow/simple_builder/batch.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/formats/arrow/dictionary/conversion.h>
Y_UNIT_TEST_SUITE(SizeCalcer) {
diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make
index 4dbdc011d6..416cefc140 100644
--- a/ydb/core/formats/arrow/ya.make
+++ b/ydb/core/formats/arrow/ya.make
@@ -11,6 +11,7 @@ PEERDIR(
ydb/core/formats/arrow/simple_builder
ydb/core/formats/arrow/dictionary
ydb/core/formats/arrow/transformer
+ library/cpp/actors/core
ydb/library/arrow_kernels
ydb/library/binary_json
ydb/library/dynumber
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.cpp b/ydb/core/tx/columnshard/engines/portions/column_record.cpp
index b9d31ae8c8..8ac14b1d99 100644
--- a/ydb/core/tx/columnshard/engines/portions/column_record.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.cpp
@@ -1,5 +1,6 @@
#include "column_record.h"
#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/tx/columnshard/common/scalars.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index 043917aae1..fe5593efaf 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -4,6 +4,7 @@
#include "read_filter_merger.h"
#include <ydb/core/formats/arrow/arrow_filter.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/engines/portion_info.h>
diff --git a/ydb/core/tx/datashard/datashard__kqp_scan.cpp b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
index eeb95476c5..9ab272aa7d 100644
--- a/ydb/core/tx/datashard/datashard__kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
@@ -4,6 +4,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/tablet_flat/flat_row_celled.h>
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 09c3189206..664444c8f4 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -15,6 +15,7 @@
#include <ydb/core/scheme/scheme_type_info.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/api/protos/ydb_value.pb.h>