aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-18 07:43:15 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-18 07:43:15 +0300
commit5654ec350abd6e2df2567ab710b064c4ef5595f5 (patch)
treea34352e1b01c9534b928e92c2ca4d1a1a16f468f
parentfe084621cceabfb6c26d7d09d46d877396d3ff6f (diff)
downloadydb-5654ec350abd6e2df2567ab710b064c4ef5595f5.tar.gz
additional helpers and tests for serializers and dict convertions
-rw-r--r--ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt8
-rw-r--r--ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt8
-rw-r--r--ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt8
-rw-r--r--ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt8
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp91
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.h14
-rw-r--r--ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt18
-rw-r--r--ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt19
-rw-r--r--ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt19
-rw-r--r--ydb/core/formats/arrow/common/CMakeLists.txt17
-rw-r--r--ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt18
-rw-r--r--ydb/core/formats/arrow/common/validation.cpp5
-rw-r--r--ydb/core/formats/arrow/common/validation.h28
-rw-r--r--ydb/core/formats/arrow/converter.cpp4
-rw-r--r--ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt20
-rw-r--r--ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt21
-rw-r--r--ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt21
-rw-r--r--ydb/core/formats/arrow/dictionary/CMakeLists.txt17
-rw-r--r--ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt20
-rw-r--r--ydb/core/formats/arrow/dictionary/conversion.cpp106
-rw-r--r--ydb/core/formats/arrow/dictionary/conversion.h14
-rw-r--r--ydb/core/formats/arrow/serializer/CMakeLists.darwin-x86_64.txt24
-rw-r--r--ydb/core/formats/arrow/serializer/CMakeLists.linux-aarch64.txt25
-rw-r--r--ydb/core/formats/arrow/serializer/CMakeLists.linux-x86_64.txt25
-rw-r--r--ydb/core/formats/arrow/serializer/CMakeLists.txt17
-rw-r--r--ydb/core/formats/arrow/serializer/CMakeLists.windows-x86_64.txt24
-rw-r--r--ydb/core/formats/arrow/serializer/abstract.cpp4
-rw-r--r--ydb/core/formats/arrow/serializer/abstract.h33
-rw-r--r--ydb/core/formats/arrow/serializer/batch_only.cpp70
-rw-r--r--ydb/core/formats/arrow/serializer/batch_only.h32
-rw-r--r--ydb/core/formats/arrow/serializer/full.cpp55
-rw-r--r--ydb/core/formats/arrow/serializer/full.h31
-rw-r--r--ydb/core/formats/arrow/serializer/stream.cpp20
-rw-r--r--ydb/core/formats/arrow/serializer/stream.h37
-rw-r--r--ydb/core/formats/arrow/simple_builder/CMakeLists.darwin-x86_64.txt20
-rw-r--r--ydb/core/formats/arrow/simple_builder/CMakeLists.linux-aarch64.txt21
-rw-r--r--ydb/core/formats/arrow/simple_builder/CMakeLists.linux-x86_64.txt21
-rw-r--r--ydb/core/formats/arrow/simple_builder/CMakeLists.txt17
-rw-r--r--ydb/core/formats/arrow/simple_builder/CMakeLists.windows-x86_64.txt20
-rw-r--r--ydb/core/formats/arrow/simple_builder/array.cpp5
-rw-r--r--ydb/core/formats/arrow/simple_builder/array.h72
-rw-r--r--ydb/core/formats/arrow/simple_builder/batch.cpp18
-rw-r--r--ydb/core/formats/arrow/simple_builder/batch.h17
-rw-r--r--ydb/core/formats/arrow/simple_builder/filler.cpp17
-rw-r--r--ydb/core/formats/arrow/simple_builder/filler.h64
-rw-r--r--ydb/core/formats/arrow/switch/CMakeLists.darwin-x86_64.txt19
-rw-r--r--ydb/core/formats/arrow/switch/CMakeLists.linux-aarch64.txt20
-rw-r--r--ydb/core/formats/arrow/switch/CMakeLists.linux-x86_64.txt20
-rw-r--r--ydb/core/formats/arrow/switch/CMakeLists.txt17
-rw-r--r--ydb/core/formats/arrow/switch/CMakeLists.windows-x86_64.txt19
-rw-r--r--ydb/core/formats/arrow/switch/switch_type.cpp5
-rw-r--r--ydb/core/formats/arrow/switch/switch_type.h239
-rw-r--r--ydb/core/formats/arrow/switch_type.h237
-rw-r--r--ydb/core/formats/arrow/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/formats/arrow/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/ut/ut_dictionary.cpp89
-rw-r--r--ydb/core/io_formats/csv_arrow.cpp2
-rw-r--r--ydb/core/protos/services.proto1
60 files changed, 1517 insertions, 328 deletions
diff --git a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
index 225e2097297..8bfa7dc43ee 100644
--- a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,11 @@
# original buildsystem will not be accepted.
+add_subdirectory(common)
+add_subdirectory(dictionary)
+add_subdirectory(serializer)
+add_subdirectory(simple_builder)
+add_subdirectory(switch)
add_subdirectory(ut)
add_library(core-formats-arrow)
@@ -20,6 +25,9 @@ target_link_libraries(core-formats-arrow PUBLIC
yutil
libs-apache-arrow
ydb-core-scheme
+ formats-arrow-serializer
+ formats-arrow-simple_builder
+ formats-arrow-dictionary
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
index 103005acef6..2f7c6515330 100644
--- a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,11 @@
# original buildsystem will not be accepted.
+add_subdirectory(common)
+add_subdirectory(dictionary)
+add_subdirectory(serializer)
+add_subdirectory(simple_builder)
+add_subdirectory(switch)
add_subdirectory(ut)
add_library(core-formats-arrow)
@@ -21,6 +26,9 @@ target_link_libraries(core-formats-arrow PUBLIC
yutil
libs-apache-arrow
ydb-core-scheme
+ formats-arrow-serializer
+ formats-arrow-simple_builder
+ formats-arrow-dictionary
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
index 103005acef6..2f7c6515330 100644
--- a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,11 @@
# original buildsystem will not be accepted.
+add_subdirectory(common)
+add_subdirectory(dictionary)
+add_subdirectory(serializer)
+add_subdirectory(simple_builder)
+add_subdirectory(switch)
add_subdirectory(ut)
add_library(core-formats-arrow)
@@ -21,6 +26,9 @@ target_link_libraries(core-formats-arrow PUBLIC
yutil
libs-apache-arrow
ydb-core-scheme
+ formats-arrow-serializer
+ formats-arrow-simple_builder
+ formats-arrow-dictionary
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
index 26821f7aa13..23061c5821d 100644
--- a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,11 @@
# original buildsystem will not be accepted.
+add_subdirectory(common)
+add_subdirectory(dictionary)
+add_subdirectory(serializer)
+add_subdirectory(simple_builder)
+add_subdirectory(switch)
add_subdirectory(ut)
add_library(core-formats-arrow)
@@ -21,6 +26,9 @@ target_link_libraries(core-formats-arrow PUBLIC
yutil
libs-apache-arrow
ydb-core-scheme
+ formats-arrow-serializer
+ formats-arrow-simple_builder
+ formats-arrow-dictionary
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 00b9ee850cf..882a4811659 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -1,7 +1,10 @@
#include "arrow_helpers.h"
#include "switch_type.h"
#include "one_batch_input_stream.h"
+#include "common/validation.h"
#include "merging_sorted_input_stream.h"
+#include "serializer/batch_only.h"
+#include "serializer/abstract.h"
#include <ydb/core/util/yverify_stream.h>
#include <util/system/yassert.h>
@@ -124,15 +127,12 @@ std::shared_ptr<arrow::Schema> MakeArrowSchema(const std::vector<std::pair<TStri
}
TString SerializeSchema(const arrow::Schema& schema) {
- auto buffer = arrow::ipc::SerializeSchema(schema);
- if (!buffer.ok()) {
- return {};
- }
- return TString((const char*)(*buffer)->data(), (*buffer)->size());
+ auto buffer = TStatusValidator::GetValid(arrow::ipc::SerializeSchema(schema));
+ return buffer->ToString();
}
std::shared_ptr<arrow::Schema> DeserializeSchema(const TString& str) {
- std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(str));
+ auto buffer = std::make_shared<arrow::Buffer>((const ui8*)str.data(), str.size());
arrow::io::BufferReader reader(buffer);
arrow::ipc::DictionaryMemo dictMemo;
auto schema = ReadSchema(&reader, &dictMemo);
@@ -142,67 +142,8 @@ std::shared_ptr<arrow::Schema> DeserializeSchema(const TString& str) {
return *schema;
}
-namespace {
- class TFixedStringOutputStream final : public arrow::io::OutputStream {
- public:
- TFixedStringOutputStream(TString* out)
- : Out(out)
- , Position(0)
- { }
-
- arrow::Status Close() override {
- Out = nullptr;
- return arrow::Status::OK();
- }
-
- bool closed() const override {
- return Out == nullptr;
- }
-
- arrow::Result<int64_t> Tell() const override {
- return Position;
- }
-
- arrow::Status Write(const void* data, int64_t nbytes) override {
- if (Y_LIKELY(nbytes > 0)) {
- Y_VERIFY(Out && Out->size() - Position >= ui64(nbytes));
- char* dst = &(*Out)[Position];
- ::memcpy(dst, data, nbytes);
- Position += nbytes;
- }
-
- return arrow::Status::OK();
- }
-
- size_t GetPosition() const {
- return Position;
- }
-
- private:
- TString* Out;
- size_t Position;
- };
-}
-
TString SerializeBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const arrow::ipc::IpcWriteOptions& options) {
- arrow::ipc::IpcPayload payload;
- auto status = arrow::ipc::GetRecordBatchPayload(*batch, options, &payload);
- Y_VERIFY_OK(status);
-
- int32_t metadata_length = 0;
- arrow::io::MockOutputStream mock;
- status = arrow::ipc::WriteIpcPayload(payload, options, &mock, &metadata_length);
- Y_VERIFY_OK(status);
-
- TString str;
- str.resize(mock.GetExtentBytesWritten());
-
- TFixedStringOutputStream out(&str);
- status = arrow::ipc::WriteIpcPayload(payload, options, &out, &metadata_length);
- Y_VERIFY_OK(status);
- Y_VERIFY(out.GetPosition() == str.size());
-
- return str;
+ return NSerialization::TBatchPayloadSerializer(options).Serialize(batch);
}
TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& batch) {
@@ -211,18 +152,14 @@ TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& b
return SerializeBatch(batch, writeOptions);
}
-std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob, const std::shared_ptr<arrow::Schema>& schema) {
- arrow::ipc::DictionaryMemo dictMemo;
- auto options = arrow::ipc::IpcReadOptions::Defaults();
- options.use_threads = false;
-
- std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(blob));
- arrow::io::BufferReader reader(buffer);
- auto batch = ReadRecordBatch(schema, &dictMemo, options, &reader);
- if (!batch.ok() || !(*batch)->Validate().ok()) {
- return {};
+std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob, const std::shared_ptr<arrow::Schema>& schema)
+{
+ auto result = NSerialization::TBatchPayloadDeserializer(schema).Deserialize(blob);
+ if (result.ok()) {
+ return *result;
+ } else {
+ return nullptr;
}
- return *batch;
}
std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::Schema>& schema, const ui32 rowsCount) {
diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h
index 91f3731e66b..55dbde33b24 100644
--- a/ydb/core/formats/arrow/arrow_helpers.h
+++ b/ydb/core/formats/arrow/arrow_helpers.h
@@ -17,20 +17,6 @@ class TReplaceKeyTemplate;
using TReplaceKey = TReplaceKeyTemplate<std::shared_ptr<TArrayVec>>;
using TRawReplaceKey = TReplaceKeyTemplate<const TArrayVec*>;
-// Arrow inrernally keeps references to Buffer objects with the data
-// This helper class implements arrow::Buffer over TString that owns
-// the actual memory
-class TBufferOverString : public arrow::Buffer {
- TString Str;
-public:
- explicit TBufferOverString(TString str)
- : arrow::Buffer((const unsigned char*)str.data(), str.size())
- , Str(str)
- {
- Y_VERIFY(data() == (const unsigned char*)Str.data());
- }
-};
-
std::shared_ptr<arrow::DataType> GetArrowType(NScheme::TTypeInfo typeInfo);
std::shared_ptr<arrow::DataType> GetCSVArrowType(NScheme::TTypeInfo typeId);
diff --git a/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..757b7290185
--- /dev/null
+++ b/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,18 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-common)
+target_link_libraries(formats-arrow-common PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+)
+target_sources(formats-arrow-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp
+)
diff --git a/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..c351a86aff4
--- /dev/null
+++ b/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-common)
+target_link_libraries(formats-arrow-common PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+)
+target_sources(formats-arrow-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp
+)
diff --git a/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..c351a86aff4
--- /dev/null
+++ b/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-common)
+target_link_libraries(formats-arrow-common PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+)
+target_sources(formats-arrow-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp
+)
diff --git a/ydb/core/formats/arrow/common/CMakeLists.txt b/ydb/core/formats/arrow/common/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/formats/arrow/common/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..757b7290185
--- /dev/null
+++ b/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,18 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-common)
+target_link_libraries(formats-arrow-common PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+)
+target_sources(formats-arrow-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp
+)
diff --git a/ydb/core/formats/arrow/common/validation.cpp b/ydb/core/formats/arrow/common/validation.cpp
new file mode 100644
index 00000000000..ca069631e7c
--- /dev/null
+++ b/ydb/core/formats/arrow/common/validation.cpp
@@ -0,0 +1,5 @@
+#include "validation.h"
+
+namespace NKikimr::NArrow {
+
+}
diff --git a/ydb/core/formats/arrow/common/validation.h b/ydb/core/formats/arrow/common/validation.h
new file mode 100644
index 00000000000..44e8cc19ed9
--- /dev/null
+++ b/ydb/core/formats/arrow/common/validation.h
@@ -0,0 +1,28 @@
+#pragma once
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/status.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <util/system/yassert.h>
+
+namespace NKikimr::NArrow {
+
+class TStatusValidator {
+public:
+ static void Validate(const arrow::Status& status) {
+ Y_VERIFY(status.ok(), "%s", status.ToString().c_str());
+ }
+
+ template <class T>
+ static T GetValid(const arrow::Result<T>& result) {
+ Y_VERIFY(result.ok(), "%s", result.status().ToString().c_str());
+ return *result;
+ }
+
+ template <class T>
+ static T GetValid(arrow::Result<T>&& result) {
+ Y_VERIFY(result.ok(), "%s", result.status().ToString().c_str());
+ return std::move(*result);
+ }
+};
+
+}
diff --git a/ydb/core/formats/arrow/converter.cpp b/ydb/core/formats/arrow/converter.cpp
index f34ad2eaac8..df99fbc05a3 100644
--- a/ydb/core/formats/arrow/converter.cpp
+++ b/ydb/core/formats/arrow/converter.cpp
@@ -76,6 +76,10 @@ static bool ConvertColumn(const NScheme::TTypeInfo colType, std::shared_ptr<arro
case NScheme::NTypeIds::JsonDocument: {
for (i32 i = 0; i < binaryArray.length(); ++i) {
auto value = binaryArray.Value(i);
+ if (!value.size()) {
+ Y_VERIFY(builder.AppendNull().ok());
+ continue;
+ }
const auto binaryJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(value.data(), value.size()));
if (!binaryJson.Defined() || !builder.Append(binaryJson->Data(), binaryJson->Size()).ok()) {
return false;
diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..b78423781c4
--- /dev/null
+++ b/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-dictionary)
+target_link_libraries(formats-arrow-dictionary PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-simple_builder
+ formats-arrow-switch
+)
+target_sources(formats-arrow-dictionary PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp
+)
diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..f3f3651f3e7
--- /dev/null
+++ b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-dictionary)
+target_link_libraries(formats-arrow-dictionary PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-simple_builder
+ formats-arrow-switch
+)
+target_sources(formats-arrow-dictionary PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp
+)
diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..f3f3651f3e7
--- /dev/null
+++ b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-dictionary)
+target_link_libraries(formats-arrow-dictionary PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-simple_builder
+ formats-arrow-switch
+)
+target_sources(formats-arrow-dictionary PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp
+)
diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/formats/arrow/dictionary/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..b78423781c4
--- /dev/null
+++ b/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-dictionary)
+target_link_libraries(formats-arrow-dictionary PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-simple_builder
+ formats-arrow-switch
+)
+target_sources(formats-arrow-dictionary PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp
+)
diff --git a/ydb/core/formats/arrow/dictionary/conversion.cpp b/ydb/core/formats/arrow/dictionary/conversion.cpp
new file mode 100644
index 00000000000..8b8ed9aa2e3
--- /dev/null
+++ b/ydb/core/formats/arrow/dictionary/conversion.cpp
@@ -0,0 +1,106 @@
+#include "conversion.h"
+#include <ydb/core/formats/arrow/switch/switch_type.h>
+#include <ydb/core/formats/arrow/simple_builder/filler.h>
+#include <ydb/core/formats/arrow/simple_builder/array.h>
+
+namespace NKikimr::NArrow {
+
+std::shared_ptr<arrow::Array> DictionaryToArray(const std::shared_ptr<arrow::DictionaryArray>& data) {
+ Y_VERIFY(data);
+ return DictionaryToArray(*data);
+}
+
+std::shared_ptr<arrow::Array> DictionaryToArray(const arrow::DictionaryArray& data) {
+ std::shared_ptr<arrow::Array> result;
+ SwitchType(data.dictionary()->type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using TDictionaryValue = typename TWrap::T;
+ using TDictionary = typename arrow::TypeTraits<TDictionaryValue>::ArrayType;
+ constexpr bool noParams = arrow::TypeTraits<TDictionaryValue>::is_parameter_free;
+ if constexpr (!noParams) {
+ Y_VERIFY(false);
+ return true;
+ }
+ if constexpr (noParams) {
+ auto& columnDictionary = static_cast<const TDictionary&>(*data.dictionary());
+ SwitchType(data.indices()->type_id(), [&](const auto& type) {
+ using TWrapIndices = std::decay_t<decltype(type)>;
+ constexpr bool hasCType = arrow::has_c_type<typename TWrapIndices::T>::value;
+ if constexpr (hasCType) {
+ constexpr bool indicesIntegral = std::is_integral<typename TWrapIndices::T::c_type>::value;
+ if constexpr (indicesIntegral && hasCType) {
+ using TIndices = typename arrow::TypeTraits<typename TWrapIndices::T>::ArrayType;
+ using TDictionaryAccessor = TDictionaryArrayAccessor<TDictionaryValue, TIndices>;
+ auto& columnIndices = static_cast<const TIndices&>(*data.indices());
+ result = TSimpleArrayConstructor<TDictionaryAccessor>("absent", TDictionaryAccessor(columnDictionary, columnIndices)).BuildArray(data.length());
+ return true;
+ }
+ }
+ Y_VERIFY(false);
+ return true;
+ });
+ }
+ return true;
+ });
+ Y_VERIFY(result);
+ return result;
+}
+
+std::shared_ptr<arrow::RecordBatch> DictionaryToArray(const std::shared_ptr<arrow::RecordBatch>& data) {
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ bool hasDict = false;
+ for (auto&& f : data->schema()->fields()) {
+ if (f->type()->id() == arrow::Type::DICTIONARY) {
+ auto& dType = static_cast<const arrow::DictionaryType&>(*f->type());
+ fields.emplace_back(std::make_shared<arrow::Field>(f->name(), dType.value_type()));
+ hasDict = true;
+ } else {
+ fields.emplace_back(f);
+ }
+ }
+ if (!hasDict) {
+ return data;
+ }
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ for (auto&& c : data->columns()) {
+ if (c->type_id() == arrow::Type::DICTIONARY) {
+ auto& dColumn = static_cast<const arrow::DictionaryArray&>(*c);
+ columns.emplace_back(DictionaryToArray(dColumn));
+ } else {
+ columns.emplace_back(c);
+ }
+ }
+ std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields);
+ return arrow::RecordBatch::Make(schema, data->num_rows(), columns);
+}
+
+std::shared_ptr<arrow::DictionaryArray> ArrayToDictionary(const std::shared_ptr<arrow::Array>& data) {
+ Y_VERIFY(IsDictionableArray(data));
+ std::shared_ptr<arrow::DictionaryArray> result;
+ SwitchType(data->type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ if constexpr (arrow::has_string_view<typename TWrap::T>::value && arrow::TypeTraits<typename TWrap::T>::is_parameter_free) {
+ auto resultArray = TDictionaryArrayConstructor<TLinearArrayAccessor<typename TWrap::T>>("absent", *data).BuildArray(data->length());
+ Y_VERIFY(resultArray->type()->id() == arrow::Type::DICTIONARY);
+ result = static_pointer_cast<arrow::DictionaryArray>(resultArray);
+ } else {
+ Y_VERIFY(false);
+ }
+ return true;
+ });
+ Y_VERIFY(result);
+ return result;
+}
+
+bool IsDictionableArray(const std::shared_ptr<arrow::Array>& data) {
+ Y_VERIFY(data);
+ bool result = false;
+ SwitchType(data->type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ result = arrow::has_c_type<typename TWrap::T>::value;
+ return true;
+ });
+ return result;
+}
+
+}
diff --git a/ydb/core/formats/arrow/dictionary/conversion.h b/ydb/core/formats/arrow/dictionary/conversion.h
new file mode 100644
index 00000000000..aab2def356e
--- /dev/null
+++ b/ydb/core/formats/arrow/dictionary/conversion.h
@@ -0,0 +1,14 @@
+#pragma once
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_dict.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NArrow {
+
+bool IsDictionableArray(const std::shared_ptr<arrow::Array>& data);
+std::shared_ptr<arrow::DictionaryArray> ArrayToDictionary(const std::shared_ptr<arrow::Array>& data);
+std::shared_ptr<arrow::Array> DictionaryToArray(const std::shared_ptr<arrow::DictionaryArray>& data);
+std::shared_ptr<arrow::Array> DictionaryToArray(const arrow::DictionaryArray& data);
+std::shared_ptr<arrow::RecordBatch> DictionaryToArray(const std::shared_ptr<arrow::RecordBatch>& data);
+
+}
diff --git a/ydb/core/formats/arrow/serializer/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/serializer/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..9ac14ecc710
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-serializer)
+target_link_libraries(formats-arrow-serializer PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-common
+ cpp-actors-core
+ ydb-core-protos
+)
+target_sources(formats-arrow-serializer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/full.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/batch_only.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/stream.cpp
+)
diff --git a/ydb/core/formats/arrow/serializer/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/serializer/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..f5c1649ec6f
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-serializer)
+target_link_libraries(formats-arrow-serializer PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-common
+ cpp-actors-core
+ ydb-core-protos
+)
+target_sources(formats-arrow-serializer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/full.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/batch_only.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/stream.cpp
+)
diff --git a/ydb/core/formats/arrow/serializer/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/serializer/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..f5c1649ec6f
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-serializer)
+target_link_libraries(formats-arrow-serializer PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-common
+ cpp-actors-core
+ ydb-core-protos
+)
+target_sources(formats-arrow-serializer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/full.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/batch_only.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/stream.cpp
+)
diff --git a/ydb/core/formats/arrow/serializer/CMakeLists.txt b/ydb/core/formats/arrow/serializer/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/formats/arrow/serializer/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/serializer/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..9ac14ecc710
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-serializer)
+target_link_libraries(formats-arrow-serializer PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-common
+ cpp-actors-core
+ ydb-core-protos
+)
+target_sources(formats-arrow-serializer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/full.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/batch_only.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/stream.cpp
+)
diff --git a/ydb/core/formats/arrow/serializer/abstract.cpp b/ydb/core/formats/arrow/serializer/abstract.cpp
new file mode 100644
index 00000000000..f893a2f063b
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/abstract.cpp
@@ -0,0 +1,4 @@
+#include "abstract.h"
+namespace NKikimr::NArrow::NSerialization {
+
+}
diff --git a/ydb/core/formats/arrow/serializer/abstract.h b/ydb/core/formats/arrow/serializer/abstract.h
new file mode 100644
index 00000000000..7fff5a6d29f
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/abstract.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/status.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <util/generic/string.h>
+
+namespace NKikimr::NArrow::NSerialization {
+
+class ISerializer {
+protected:
+ virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
+public:
+ using TPtr = std::shared_ptr<ISerializer>;
+ virtual ~ISerializer() = default;
+
+ TString Serialize(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ return DoSerialize(batch);
+ }
+};
+
+class IDeserializer {
+protected:
+ virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data) const = 0;
+public:
+ using TPtr = std::shared_ptr<IDeserializer>;
+ virtual ~IDeserializer() = default;
+
+ arrow::Result<std::shared_ptr<arrow::RecordBatch>> Deserialize(const TString& data) const {
+ return DoDeserialize(data);
+ }
+};
+
+}
diff --git a/ydb/core/formats/arrow/serializer/batch_only.cpp b/ydb/core/formats/arrow/serializer/batch_only.cpp
new file mode 100644
index 00000000000..85733ee09aa
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/batch_only.cpp
@@ -0,0 +1,70 @@
+#include "batch_only.h"
+#include "stream.h"
+#include <ydb/core/formats/arrow/common/validation.h>
+#include <ydb/core/protos/services.pb.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/dictionary.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/buffer.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>
+#include <library/cpp/actors/core/log.h>
+namespace NKikimr::NArrow::NSerialization {
+
+namespace {
+// Arrow internally keeps references to Buffer objects with the data
+// This helper class implements arrow::Buffer over TString that owns
+// the actual memory
+class TBufferOverString: public arrow::Buffer {
+ TString Str;
+public:
+ explicit TBufferOverString(TString str)
+ : arrow::Buffer((const unsigned char*)str.data(), str.size())
+ , Str(str) {
+ Y_VERIFY(data() == (const unsigned char*)Str.data());
+ }
+};
+}
+
+arrow::Result<std::shared_ptr<arrow::RecordBatch>> TBatchPayloadDeserializer::DoDeserialize(const TString& data) const {
+ arrow::ipc::DictionaryMemo dictMemo;
+ auto options = arrow::ipc::IpcReadOptions::Defaults();
+ options.use_threads = false;
+
+ std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(data));
+ arrow::io::BufferReader reader(buffer);
+ AFL_DEBUG(NKikimrServices::ARROW_HELPER)("event", "parsing")("size", data.size())("columns", Schema->num_fields());
+ auto batchResult = arrow::ipc::ReadRecordBatch(Schema, &dictMemo, options, &reader);
+ if (!batchResult.ok()) {
+ return batchResult;
+ }
+ std::shared_ptr<arrow::RecordBatch> batch = *batchResult;
+ if (!batch) {
+ return arrow::Status(arrow::StatusCode::SerializationError, "empty batch");
+ }
+ auto validation = batch->Validate();
+ if (!validation.ok()) {
+ return arrow::Status(arrow::StatusCode::SerializationError, "batch is not valid: " + validation.ToString());
+ }
+ return batch;
+}
+
+TString TBatchPayloadSerializer::DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ arrow::ipc::IpcPayload payload;
+ TStatusValidator::Validate(arrow::ipc::GetRecordBatchPayload(*batch, Options, &payload));
+
+ int32_t metadata_length = 0;
+ arrow::io::MockOutputStream mock;
+ TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &mock, &metadata_length));
+
+ TString str;
+ str.resize(mock.GetExtentBytesWritten());
+
+ TFixedStringOutputStream out(&str);
+ TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &out, &metadata_length));
+ Y_VERIFY(out.GetPosition() == str.size());
+ Y_VERIFY_DEBUG(TBatchPayloadDeserializer(batch->schema()).Deserialize(str).ok());
+ AFL_DEBUG(NKikimrServices::ARROW_HELPER)("event", "serialize")("size", str.size())("columns", batch->schema()->num_fields());
+ return str;
+}
+
+}
diff --git a/ydb/core/formats/arrow/serializer/batch_only.h b/ydb/core/formats/arrow/serializer/batch_only.h
new file mode 100644
index 00000000000..5771202d7af
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/batch_only.h
@@ -0,0 +1,32 @@
+#pragma once
+#include "abstract.h"
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/options.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
+
+namespace NKikimr::NArrow::NSerialization {
+
+class TBatchPayloadSerializer: public ISerializer {
+private:
+ const arrow::ipc::IpcWriteOptions Options;
+protected:
+ virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+public:
+ TBatchPayloadSerializer(const arrow::ipc::IpcWriteOptions& options)
+ : Options(options) {
+
+ }
+};
+
+class TBatchPayloadDeserializer: public IDeserializer {
+private:
+ const std::shared_ptr<arrow::Schema> Schema;
+protected:
+ virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data) const override;
+public:
+ TBatchPayloadDeserializer(const std::shared_ptr<arrow::Schema> schema)
+ : Schema(schema) {
+
+ }
+};
+
+}
diff --git a/ydb/core/formats/arrow/serializer/full.cpp b/ydb/core/formats/arrow/serializer/full.cpp
new file mode 100644
index 00000000000..c5424a856b0
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/full.cpp
@@ -0,0 +1,55 @@
+#include "full.h"
+#include "stream.h"
+#include <ydb/core/formats/arrow/dictionary/conversion.h>
+#include <ydb/core/formats/arrow/common/validation.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/dictionary.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/buffer.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
+namespace NKikimr::NArrow::NSerialization {
+
+arrow::Result<std::shared_ptr<arrow::RecordBatch>> TFullDataDeserializer::DoDeserialize(const TString& data) const {
+ arrow::ipc::DictionaryMemo dictMemo;
+ auto options = arrow::ipc::IpcReadOptions::Defaults();
+ options.use_threads = false;
+
+ arrow::Buffer buffer((const ui8*)data.data(), data.size());
+ arrow::io::BufferReader bufferReader(buffer);
+ auto reader = TStatusValidator::GetValid(arrow::ipc::RecordBatchStreamReader::Open(&bufferReader));
+
+ std::shared_ptr<arrow::RecordBatch> batch;
+ auto readResult = reader->ReadNext(&batch);
+ if (!readResult.ok()) {
+ return readResult;
+ }
+ if (!batch) {
+ return arrow::Status(arrow::StatusCode::SerializationError, "null batch");
+ }
+ auto validation = batch->Validate();
+ if (!validation.ok()) {
+ return arrow::Status(arrow::StatusCode::SerializationError, "validation error: " + validation.ToString());
+ }
+ return batch;
+}
+
+TString TFullDataSerializer::DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ arrow::io::MockOutputStream mock;
+ {
+ auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&mock, batch->schema(), Options));
+ TStatusValidator::Validate(writer->WriteRecordBatch(*batch));
+ TStatusValidator::Validate(writer->Close());
+ }
+ TString result;
+ result.resize(mock.GetExtentBytesWritten());
+ {
+ TFixedStringOutputStream stream(&result);
+ auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&stream, batch->schema(), Options));
+ TStatusValidator::Validate(writer->WriteRecordBatch(*batch));
+ TStatusValidator::Validate(writer->Close());
+ Y_VERIFY(stream.GetPosition() == result.size());
+ }
+ return result;
+}
+
+}
diff --git a/ydb/core/formats/arrow/serializer/full.h b/ydb/core/formats/arrow/serializer/full.h
new file mode 100644
index 00000000000..7fa063f107b
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/full.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include "abstract.h"
+#include <ydb/library/accessor/accessor.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/options.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NArrow::NSerialization {
+
+class TFullDataSerializer: public ISerializer {
+private:
+ const arrow::ipc::IpcWriteOptions Options;
+protected:
+ virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+public:
+ TFullDataSerializer(const arrow::ipc::IpcWriteOptions& options)
+ : Options(options) {
+
+ }
+};
+
+class TFullDataDeserializer: public IDeserializer {
+protected:
+ virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data) const override;
+public:
+ TFullDataDeserializer() {
+
+ }
+};
+
+}
diff --git a/ydb/core/formats/arrow/serializer/stream.cpp b/ydb/core/formats/arrow/serializer/stream.cpp
new file mode 100644
index 00000000000..04473379b97
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/stream.cpp
@@ -0,0 +1,20 @@
+#include "stream.h"
+namespace NKikimr::NArrow {
+
+arrow::Status NSerialization::TFixedStringOutputStream::Write(const void* data, int64_t nbytes) {
+ if (Y_LIKELY(nbytes > 0)) {
+ Y_VERIFY(Out && Out->size() - Position >= ui64(nbytes));
+ char* dst = &(*Out)[Position];
+ ::memcpy(dst, data, nbytes);
+ Position += nbytes;
+ }
+
+ return arrow::Status::OK();
+}
+
+arrow::Status NSerialization::TFixedStringOutputStream::Close() {
+ Out = nullptr;
+ return arrow::Status::OK();
+}
+
+}
diff --git a/ydb/core/formats/arrow/serializer/stream.h b/ydb/core/formats/arrow/serializer/stream.h
new file mode 100644
index 00000000000..424e183f19a
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/stream.h
@@ -0,0 +1,37 @@
+#pragma once
+#include <contrib/libs/apache/arrow/cpp/src/arrow/io/interfaces.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/status.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/result.h>
+#include <util/generic/string.h>
+
+namespace NKikimr::NArrow::NSerialization {
+
+class TFixedStringOutputStream final: public arrow::io::OutputStream {
+public:
+ TFixedStringOutputStream(TString* out)
+ : Out(out)
+ , Position(0) {
+ }
+
+ arrow::Status Close() override;
+
+ bool closed() const override {
+ return Out == nullptr;
+ }
+
+ arrow::Result<int64_t> Tell() const override {
+ return Position;
+ }
+
+ arrow::Status Write(const void* data, int64_t nbytes) override;
+
+ size_t GetPosition() const {
+ return Position;
+ }
+
+private:
+ TString* Out;
+ size_t Position;
+};
+
+}
diff --git a/ydb/core/formats/arrow/simple_builder/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/simple_builder/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..47b66ad10ae
--- /dev/null
+++ b/ydb/core/formats/arrow/simple_builder/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-simple_builder)
+target_link_libraries(formats-arrow-simple_builder PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+)
+target_sources(formats-arrow-simple_builder PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/filler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/array.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/batch.cpp
+)
diff --git a/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..3623d31eb52
--- /dev/null
+++ b/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-simple_builder)
+target_link_libraries(formats-arrow-simple_builder PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+)
+target_sources(formats-arrow-simple_builder PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/filler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/array.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/batch.cpp
+)
diff --git a/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..3623d31eb52
--- /dev/null
+++ b/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-simple_builder)
+target_link_libraries(formats-arrow-simple_builder PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+)
+target_sources(formats-arrow-simple_builder PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/filler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/array.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/batch.cpp
+)
diff --git a/ydb/core/formats/arrow/simple_builder/CMakeLists.txt b/ydb/core/formats/arrow/simple_builder/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/formats/arrow/simple_builder/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/formats/arrow/simple_builder/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/simple_builder/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..47b66ad10ae
--- /dev/null
+++ b/ydb/core/formats/arrow/simple_builder/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-simple_builder)
+target_link_libraries(formats-arrow-simple_builder PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+)
+target_sources(formats-arrow-simple_builder PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/filler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/array.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/batch.cpp
+)
diff --git a/ydb/core/formats/arrow/simple_builder/array.cpp b/ydb/core/formats/arrow/simple_builder/array.cpp
new file mode 100644
index 00000000000..4c4ffca6c1d
--- /dev/null
+++ b/ydb/core/formats/arrow/simple_builder/array.cpp
@@ -0,0 +1,5 @@
+#include "array.h"
+
+namespace NKikimr::NArrow {
+
+}
diff --git a/ydb/core/formats/arrow/simple_builder/array.h b/ydb/core/formats/arrow/simple_builder/array.h
new file mode 100644
index 00000000000..7f823787590
--- /dev/null
+++ b/ydb/core/formats/arrow/simple_builder/array.h
@@ -0,0 +1,72 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_dict.h>
+#include <util/generic/string.h>
+
+namespace NKikimr::NArrow {
+
+class IArrayBuilder {
+private:
+ YDB_READONLY_DEF(TString, FieldName)
+protected:
+ virtual std::shared_ptr<arrow::Array> DoBuildArray(const ui32 recordsCount) const = 0;
+public:
+ using TPtr = std::shared_ptr<IArrayBuilder>;
+ virtual ~IArrayBuilder() = default;
+ std::shared_ptr<arrow::Array> BuildArray(const ui32 recordsCount) const {
+ return DoBuildArray(recordsCount);
+ }
+
+ IArrayBuilder(const TString& fieldName)
+ : FieldName(fieldName) {
+
+ }
+};
+
+template <class TFiller>
+class TSimpleArrayConstructor: public IArrayBuilder {
+private:
+ using TBase = IArrayBuilder;
+ using TBuilder = typename arrow::TypeTraits<typename TFiller::TValue>::BuilderType;
+ const TFiller Filler;
+protected:
+ virtual std::shared_ptr<arrow::Array> DoBuildArray(const ui32 recordsCount) const override {
+ TBuilder fBuilder = TBuilder();
+ Y_VERIFY(fBuilder.Reserve(recordsCount).ok());
+ for (ui32 i = 0; i < recordsCount; ++i) {
+ Y_VERIFY(fBuilder.Append(Filler.GetValue(i)).ok());
+ }
+ return *fBuilder.Finish();
+ }
+public:
+ TSimpleArrayConstructor(const TString& fieldName, const TFiller& filler = TFiller())
+ : TBase(fieldName)
+ , Filler(filler) {
+
+ }
+};
+
+template <class TFiller>
+class TDictionaryArrayConstructor: public IArrayBuilder {
+private:
+ using TBase = IArrayBuilder;
+ const TFiller Filler;
+protected:
+ virtual std::shared_ptr<arrow::Array> DoBuildArray(const ui32 recordsCount) const override {
+ auto fBuilder = std::make_shared<arrow::DictionaryBuilder<typename TFiller::TValue>>(std::make_shared<typename TFiller::TValue>());
+ Y_VERIFY(fBuilder->Reserve(recordsCount).ok());
+ for (ui32 i = 0; i < recordsCount; ++i) {
+ Y_VERIFY(fBuilder->Append(Filler.GetValue(i)).ok());
+ }
+ return *fBuilder->Finish();
+ }
+public:
+ TDictionaryArrayConstructor(const TString& fieldName, const TFiller& filler = TFiller())
+ : TBase(fieldName)
+ , Filler(filler) {
+
+ }
+};
+}
diff --git a/ydb/core/formats/arrow/simple_builder/batch.cpp b/ydb/core/formats/arrow/simple_builder/batch.cpp
new file mode 100644
index 00000000000..4a60043a087
--- /dev/null
+++ b/ydb/core/formats/arrow/simple_builder/batch.cpp
@@ -0,0 +1,18 @@
+#include "batch.h"
+
+namespace NKikimr::NArrow {
+
+std::shared_ptr<arrow::RecordBatch> TRecordBatchConstructor::BuildBatch(const ui32 numRows) const {
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ for (auto&& i : Builders) {
+ columns.emplace_back(i->BuildArray(numRows));
+ fields.emplace_back(std::make_shared<arrow::Field>(i->GetFieldName(), columns.back()->type()));
+ }
+ auto batch = arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), numRows, columns);
+ Y_VERIFY(batch);
+ Y_VERIFY_DEBUG(batch->ValidateFull().ok());
+ return batch;
+}
+
+}
diff --git a/ydb/core/formats/arrow/simple_builder/batch.h b/ydb/core/formats/arrow/simple_builder/batch.h
new file mode 100644
index 00000000000..4dfb3ea0e3f
--- /dev/null
+++ b/ydb/core/formats/arrow/simple_builder/batch.h
@@ -0,0 +1,17 @@
+#pragma once
+#include "array.h"
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NArrow {
+class TRecordBatchConstructor {
+private:
+ const std::vector<IArrayBuilder::TPtr> Builders;
+public:
+ TRecordBatchConstructor(const std::vector<IArrayBuilder::TPtr> builders)
+ : Builders(builders) {
+ Y_VERIFY(Builders.size());
+ }
+
+ std::shared_ptr<arrow::RecordBatch> BuildBatch(const ui32 numRows) const;
+};
+}
diff --git a/ydb/core/formats/arrow/simple_builder/filler.cpp b/ydb/core/formats/arrow/simple_builder/filler.cpp
new file mode 100644
index 00000000000..c4ce2456a57
--- /dev/null
+++ b/ydb/core/formats/arrow/simple_builder/filler.cpp
@@ -0,0 +1,17 @@
+#include "filler.h"
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NArrow {
+
+TStringPoolFiller::TStringPoolFiller(const ui32 poolSize, const ui32 strLen) {
+ for (ui32 i = 0; i < poolSize; ++i) {
+ Data.emplace_back(NUnitTest::RandomString(strLen, i));
+ }
+}
+
+arrow::util::string_view TStringPoolFiller::GetValue(const ui32 idx) const {
+ const TString& str = Data[(2 + 7 * idx) % Data.size()];
+ return arrow::util::string_view(str.data(), str.size());
+}
+
+}
diff --git a/ydb/core/formats/arrow/simple_builder/filler.h b/ydb/core/formats/arrow/simple_builder/filler.h
new file mode 100644
index 00000000000..520192af2fa
--- /dev/null
+++ b/ydb/core/formats/arrow/simple_builder/filler.h
@@ -0,0 +1,64 @@
+#pragma once
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/util/string_view.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h>
+#include <util/system/types.h>
+#include <util/generic/string.h>
+
+namespace NKikimr::NArrow {
+
+template <class TArrowInt>
+class TIntSeqFiller {
+public:
+ using TValue = TArrowInt;
+ typename TArrowInt::c_type GetValue(const ui32 idx) const {
+ return idx;
+ }
+};
+
+class TStringPoolFiller {
+private:
+ std::vector<TString> Data;
+public:
+ using TValue = arrow::StringType;
+ arrow::util::string_view GetValue(const ui32 idx) const;
+
+ TStringPoolFiller(const ui32 poolSize, const ui32 strLen);
+};
+
+template <class TValueExt>
+class TLinearArrayAccessor {
+private:
+ using TArray = typename arrow::TypeTraits<TValueExt>::ArrayType;
+ const TArray& Data;
+public:
+ using TValue = TValueExt;
+ auto GetValue(const ui32 idx) const {
+ return Data.Value(idx);
+ }
+
+ TLinearArrayAccessor(const arrow::Array& data)
+ : Data(static_cast<const TArray&>(data)) {
+ }
+};
+
+template <class TDictionaryValue, class TIndices>
+class TDictionaryArrayAccessor {
+private:
+ using TDictionary = typename arrow::TypeTraits<TDictionaryValue>::ArrayType;
+ const TDictionary& Dictionary;
+ const TIndices& Indices;
+public:
+ using TValue = TDictionaryValue;
+ auto GetValue(const ui32 idx) const {
+ return Dictionary.Value(Indices.Value(idx));
+ }
+
+ TDictionaryArrayAccessor(const TDictionary& dictionary, const TIndices& indices)
+ : Dictionary(dictionary)
+ , Indices(indices)
+ {
+ }
+};
+
+}
diff --git a/ydb/core/formats/arrow/switch/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/switch/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..fb4dc6e1cc9
--- /dev/null
+++ b/ydb/core/formats/arrow/switch/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-switch)
+target_link_libraries(formats-arrow-switch PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-scheme_types
+)
+target_sources(formats-arrow-switch PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/switch/switch_type.cpp
+)
diff --git a/ydb/core/formats/arrow/switch/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/switch/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..6e6f464e616
--- /dev/null
+++ b/ydb/core/formats/arrow/switch/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-switch)
+target_link_libraries(formats-arrow-switch PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-scheme_types
+)
+target_sources(formats-arrow-switch PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/switch/switch_type.cpp
+)
diff --git a/ydb/core/formats/arrow/switch/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/switch/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..6e6f464e616
--- /dev/null
+++ b/ydb/core/formats/arrow/switch/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-switch)
+target_link_libraries(formats-arrow-switch PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-scheme_types
+)
+target_sources(formats-arrow-switch PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/switch/switch_type.cpp
+)
diff --git a/ydb/core/formats/arrow/switch/CMakeLists.txt b/ydb/core/formats/arrow/switch/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/formats/arrow/switch/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/formats/arrow/switch/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/switch/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..fb4dc6e1cc9
--- /dev/null
+++ b/ydb/core/formats/arrow/switch/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-switch)
+target_link_libraries(formats-arrow-switch PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-scheme_types
+)
+target_sources(formats-arrow-switch PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/switch/switch_type.cpp
+)
diff --git a/ydb/core/formats/arrow/switch/switch_type.cpp b/ydb/core/formats/arrow/switch/switch_type.cpp
new file mode 100644
index 00000000000..b8396151cb4
--- /dev/null
+++ b/ydb/core/formats/arrow/switch/switch_type.cpp
@@ -0,0 +1,5 @@
+#include "switch_type.h"
+
+namespace NKikimr::NArrow {
+
+}
diff --git a/ydb/core/formats/arrow/switch/switch_type.h b/ydb/core/formats/arrow/switch/switch_type.h
new file mode 100644
index 00000000000..2c7986343e9
--- /dev/null
+++ b/ydb/core/formats/arrow/switch/switch_type.h
@@ -0,0 +1,239 @@
+#pragma once
+#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
+#include <util/system/yassert.h>
+#include <ydb/core/scheme_types/scheme_type_info.h>
+#include <ydb/core/scheme/scheme_type_id.h>
+
+namespace NKikimr::NArrow {
+
+template <typename TType>
+struct TTypeWrapper
+{
+ using T = TType;
+};
+
+template <class TResult, TResult defaultValue, typename TFunc, bool EnableNull = false>
+TResult SwitchTypeImpl(arrow::Type::type typeId, TFunc&& f) {
+ switch (typeId) {
+ case arrow::Type::NA: {
+ if constexpr (EnableNull) {
+ return f(TTypeWrapper<arrow::NullType>());
+ }
+ break;
+ }
+ case arrow::Type::BOOL:
+ return f(TTypeWrapper<arrow::BooleanType>());
+ case arrow::Type::UINT8:
+ return f(TTypeWrapper<arrow::UInt8Type>());
+ case arrow::Type::INT8:
+ return f(TTypeWrapper<arrow::Int8Type>());
+ case arrow::Type::UINT16:
+ return f(TTypeWrapper<arrow::UInt16Type>());
+ case arrow::Type::INT16:
+ return f(TTypeWrapper<arrow::Int16Type>());
+ case arrow::Type::UINT32:
+ return f(TTypeWrapper<arrow::UInt32Type>());
+ case arrow::Type::INT32:
+ return f(TTypeWrapper<arrow::Int32Type>());
+ case arrow::Type::UINT64:
+ return f(TTypeWrapper<arrow::UInt64Type>());
+ case arrow::Type::INT64:
+ return f(TTypeWrapper<arrow::Int64Type>());
+ case arrow::Type::HALF_FLOAT:
+ return f(TTypeWrapper<arrow::HalfFloatType>());
+ case arrow::Type::FLOAT:
+ return f(TTypeWrapper<arrow::FloatType>());
+ case arrow::Type::DOUBLE:
+ return f(TTypeWrapper<arrow::DoubleType>());
+ case arrow::Type::STRING:
+ return f(TTypeWrapper<arrow::StringType>());
+ case arrow::Type::BINARY:
+ return f(TTypeWrapper<arrow::BinaryType>());
+ case arrow::Type::FIXED_SIZE_BINARY:
+ return f(TTypeWrapper<arrow::FixedSizeBinaryType>());
+ case arrow::Type::DATE32:
+ return f(TTypeWrapper<arrow::Date32Type>());
+ case arrow::Type::DATE64:
+ return f(TTypeWrapper<arrow::Date64Type>());
+ case arrow::Type::TIMESTAMP:
+ return f(TTypeWrapper<arrow::TimestampType>());
+ case arrow::Type::TIME32:
+ return f(TTypeWrapper<arrow::Time32Type>());
+ case arrow::Type::TIME64:
+ return f(TTypeWrapper<arrow::Time64Type>());
+ case arrow::Type::INTERVAL_MONTHS:
+ return f(TTypeWrapper<arrow::MonthIntervalType>());
+ case arrow::Type::DECIMAL:
+ return f(TTypeWrapper<arrow::Decimal128Type>());
+ case arrow::Type::DURATION:
+ return f(TTypeWrapper<arrow::DurationType>());
+ case arrow::Type::LARGE_STRING:
+ return f(TTypeWrapper<arrow::LargeStringType>());
+ case arrow::Type::LARGE_BINARY:
+ return f(TTypeWrapper<arrow::LargeBinaryType>());
+ case arrow::Type::DECIMAL256:
+ case arrow::Type::DENSE_UNION:
+ case arrow::Type::DICTIONARY:
+ case arrow::Type::EXTENSION:
+ case arrow::Type::FIXED_SIZE_LIST:
+ case arrow::Type::INTERVAL_DAY_TIME:
+ case arrow::Type::LARGE_LIST:
+ case arrow::Type::LIST:
+ case arrow::Type::MAP:
+ case arrow::Type::MAX_ID:
+ case arrow::Type::SPARSE_UNION:
+ case arrow::Type::STRUCT:
+ break;
+ }
+
+ return defaultValue;
+}
+
+template <typename TFunc, bool EnableNull = false>
+bool SwitchType(arrow::Type::type typeId, TFunc&& f) {
+ return SwitchTypeImpl<bool, false, TFunc, EnableNull>(typeId, std::move(f));
+}
+
+template <typename TFunc>
+bool SwitchTypeWithNull(arrow::Type::type typeId, TFunc&& f) {
+ return SwitchType<TFunc, true>(typeId, std::move(f));
+}
+
+template <typename TFunc>
+bool SwitchArrayType(const arrow::Datum& column, TFunc&& f) {
+ auto type = column.type();
+ Y_VERIFY(type);
+ return SwitchType(type->id(), std::forward<TFunc>(f));
+}
+
+/**
+ * @brief Function to switch yql type correctly and uniformly converting it to arrow type using callback
+ *
+ * @tparam TFunc Callback type
+ * @param typeId Type of data callback work with.
+ * @param callback Template function of signature (TTypeWrapper) -> bool
+ * @return Result of execution of callback or false if the type typeId is not supported.
+ */
+template <typename TFunc>
+bool SwitchYqlTypeToArrowType(const NScheme::TTypeInfo& typeInfo, TFunc&& callback) {
+ switch (typeInfo.GetTypeId()) {
+ case NScheme::NTypeIds::Bool:
+ return callback(TTypeWrapper<arrow::BooleanType>());
+ case NScheme::NTypeIds::Int8:
+ return callback(TTypeWrapper<arrow::Int8Type>());
+ case NScheme::NTypeIds::Uint8:
+ return callback(TTypeWrapper<arrow::UInt8Type>());
+ case NScheme::NTypeIds::Int16:
+ return callback(TTypeWrapper<arrow::Int16Type>());
+ case NScheme::NTypeIds::Date:
+ case NScheme::NTypeIds::Uint16:
+ return callback(TTypeWrapper<arrow::UInt16Type>());
+ case NScheme::NTypeIds::Int32:
+ return callback(TTypeWrapper<arrow::Int32Type>());
+ case NScheme::NTypeIds::Datetime:
+ case NScheme::NTypeIds::Uint32:
+ return callback(TTypeWrapper<arrow::UInt32Type>());
+ case NScheme::NTypeIds::Int64:
+ return callback(TTypeWrapper<arrow::Int64Type>());
+ case NScheme::NTypeIds::Uint64:
+ return callback(TTypeWrapper<arrow::UInt64Type>());
+ case NScheme::NTypeIds::Float:
+ return callback(TTypeWrapper<arrow::FloatType>());
+ case NScheme::NTypeIds::Double:
+ return callback(TTypeWrapper<arrow::DoubleType>());
+ case NScheme::NTypeIds::Utf8:
+ return callback(TTypeWrapper<arrow::StringType>());
+ case NScheme::NTypeIds::String:
+ case NScheme::NTypeIds::String4k:
+ case NScheme::NTypeIds::String2m:
+ case NScheme::NTypeIds::Yson:
+ case NScheme::NTypeIds::Json:
+ case NScheme::NTypeIds::DyNumber:
+ case NScheme::NTypeIds::JsonDocument:
+ return callback(TTypeWrapper<arrow::BinaryType>());
+ case NScheme::NTypeIds::Timestamp:
+ return callback(TTypeWrapper<arrow::TimestampType>());
+ case NScheme::NTypeIds::Interval:
+ return callback(TTypeWrapper<arrow::DurationType>());
+ case NScheme::NTypeIds::Decimal:
+ return callback(TTypeWrapper<arrow::Decimal128Type>());
+
+ case NScheme::NTypeIds::PairUi64Ui64:
+ case NScheme::NTypeIds::ActorId:
+ case NScheme::NTypeIds::StepOrderId:
+ break; // Deprecated types
+
+ case NScheme::NTypeIds::Pg:
+ break; // TODO: support pg types
+ }
+ return false;
+}
+
+inline bool IsPrimitiveYqlType(const NScheme::TTypeInfo& typeInfo) {
+ switch (typeInfo.GetTypeId()) {
+ case NScheme::NTypeIds::Int8:
+ case NScheme::NTypeIds::Uint8:
+ case NScheme::NTypeIds::Int16:
+ case NScheme::NTypeIds::Date:
+ case NScheme::NTypeIds::Uint16:
+ case NScheme::NTypeIds::Int32:
+ case NScheme::NTypeIds::Datetime:
+ case NScheme::NTypeIds::Uint32:
+ case NScheme::NTypeIds::Int64:
+ case NScheme::NTypeIds::Uint64:
+ case NScheme::NTypeIds::Float:
+ case NScheme::NTypeIds::Double:
+ case NScheme::NTypeIds::Timestamp:
+ case NScheme::NTypeIds::Interval:
+ return true;
+ default:
+ break;
+ }
+ return false;
+}
+
+template <typename T>
+bool Append(arrow::ArrayBuilder& builder, const typename T::c_type& value) {
+ using TBuilder = typename arrow::TypeTraits<T>::BuilderType;
+
+ auto status = static_cast<TBuilder&>(builder).Append(value);
+ return status.ok();
+}
+
+template <typename T>
+bool Append(arrow::ArrayBuilder& builder, arrow::util::string_view value) {
+ using TBuilder = typename arrow::TypeTraits<T>::BuilderType;
+
+ auto status = static_cast<TBuilder&>(builder).Append(value);
+ return status.ok();
+}
+
+template <typename T>
+bool Append(arrow::ArrayBuilder& builder, const typename T::c_type* values, size_t size) {
+ using TBuilder = typename arrow::NumericBuilder<T>;
+
+ auto status = static_cast<TBuilder&>(builder).AppendValues(values, size);
+ return status.ok();
+}
+
+template <typename T>
+bool Append(T& builder, const arrow::Array& array, int position) {
+ return SwitchType(array.type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
+ using TBuilder = typename arrow::TypeTraits<typename TWrap::T>::BuilderType;
+
+ auto& typedArray = static_cast<const TArray&>(array);
+ auto& typedBuilder = static_cast<TBuilder&>(builder);
+
+ if (typedArray.IsNull(position)) {
+ auto status = typedBuilder.AppendNull();
+ return status.ok();
+ } else {
+ auto status = typedBuilder.Append(typedArray.GetView(position));
+ return status.ok();
+ }
+ });
+}
+
+}
diff --git a/ydb/core/formats/arrow/switch_type.h b/ydb/core/formats/arrow/switch_type.h
index 558589b71ad..1acf90b1bb2 100644
--- a/ydb/core/formats/arrow/switch_type.h
+++ b/ydb/core/formats/arrow/switch_type.h
@@ -1,237 +1,2 @@
#pragma once
-#include <ydb/core/scheme/scheme_tablecell.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
-
-namespace NKikimr::NArrow {
-
-template <typename TType>
-struct TTypeWrapper
-{
- using T = TType;
-};
-
-template <class TResult, TResult defaultValue, typename TFunc, bool EnableNull = false>
-TResult SwitchTypeImpl(arrow::Type::type typeId, TFunc&& f) {
- switch (typeId) {
- case arrow::Type::NA: {
- if constexpr (EnableNull) {
- return f(TTypeWrapper<arrow::NullType>());
- }
- break;
- }
- case arrow::Type::BOOL:
- return f(TTypeWrapper<arrow::BooleanType>());
- case arrow::Type::UINT8:
- return f(TTypeWrapper<arrow::UInt8Type>());
- case arrow::Type::INT8:
- return f(TTypeWrapper<arrow::Int8Type>());
- case arrow::Type::UINT16:
- return f(TTypeWrapper<arrow::UInt16Type>());
- case arrow::Type::INT16:
- return f(TTypeWrapper<arrow::Int16Type>());
- case arrow::Type::UINT32:
- return f(TTypeWrapper<arrow::UInt32Type>());
- case arrow::Type::INT32:
- return f(TTypeWrapper<arrow::Int32Type>());
- case arrow::Type::UINT64:
- return f(TTypeWrapper<arrow::UInt64Type>());
- case arrow::Type::INT64:
- return f(TTypeWrapper<arrow::Int64Type>());
- case arrow::Type::HALF_FLOAT:
- return f(TTypeWrapper<arrow::HalfFloatType>());
- case arrow::Type::FLOAT:
- return f(TTypeWrapper<arrow::FloatType>());
- case arrow::Type::DOUBLE:
- return f(TTypeWrapper<arrow::DoubleType>());
- case arrow::Type::STRING:
- return f(TTypeWrapper<arrow::StringType>());
- case arrow::Type::BINARY:
- return f(TTypeWrapper<arrow::BinaryType>());
- case arrow::Type::FIXED_SIZE_BINARY:
- return f(TTypeWrapper<arrow::FixedSizeBinaryType>());
- case arrow::Type::DATE32:
- return f(TTypeWrapper<arrow::Date32Type>());
- case arrow::Type::DATE64:
- return f(TTypeWrapper<arrow::Date64Type>());
- case arrow::Type::TIMESTAMP:
- return f(TTypeWrapper<arrow::TimestampType>());
- case arrow::Type::TIME32:
- return f(TTypeWrapper<arrow::Time32Type>());
- case arrow::Type::TIME64:
- return f(TTypeWrapper<arrow::Time64Type>());
- case arrow::Type::INTERVAL_MONTHS:
- return f(TTypeWrapper<arrow::MonthIntervalType>());
- case arrow::Type::DECIMAL:
- return f(TTypeWrapper<arrow::Decimal128Type>());
- case arrow::Type::DURATION:
- return f(TTypeWrapper<arrow::DurationType>());
- case arrow::Type::LARGE_STRING:
- return f(TTypeWrapper<arrow::LargeStringType>());
- case arrow::Type::LARGE_BINARY:
- return f(TTypeWrapper<arrow::LargeBinaryType>());
- case arrow::Type::DECIMAL256:
- case arrow::Type::DENSE_UNION:
- case arrow::Type::DICTIONARY:
- case arrow::Type::EXTENSION:
- case arrow::Type::FIXED_SIZE_LIST:
- case arrow::Type::INTERVAL_DAY_TIME:
- case arrow::Type::LARGE_LIST:
- case arrow::Type::LIST:
- case arrow::Type::MAP:
- case arrow::Type::MAX_ID:
- case arrow::Type::SPARSE_UNION:
- case arrow::Type::STRUCT:
- break;
- }
-
- return defaultValue;
-}
-
-template <typename TFunc, bool EnableNull = false>
-bool SwitchType(arrow::Type::type typeId, TFunc&& f) {
- return SwitchTypeImpl<bool, false, TFunc, EnableNull>(typeId, std::move(f));
-}
-
-template <typename TFunc>
-bool SwitchTypeWithNull(arrow::Type::type typeId, TFunc&& f) {
- return SwitchType<TFunc, true>(typeId, std::move(f));
-}
-
-template <typename TFunc>
-bool SwitchArrayType(const arrow::Datum& column, TFunc&& f) {
- auto type = column.type();
- Y_VERIFY(type);
- return SwitchType(type->id(), std::forward<TFunc>(f));
-}
-
-/**
- * @brief Function to switch yql type correctly and uniformly converting it to arrow type using callback
- *
- * @tparam TFunc Callback type
- * @param typeId Type of data callback work with.
- * @param callback Template function of signature (TTypeWrapper) -> bool
- * @return Result of execution of callback or false if the type typeId is not supported.
- */
-template <typename TFunc>
-bool SwitchYqlTypeToArrowType(const NScheme::TTypeInfo& typeInfo, TFunc&& callback) {
- switch (typeInfo.GetTypeId()) {
- case NScheme::NTypeIds::Bool:
- return callback(TTypeWrapper<arrow::BooleanType>());
- case NScheme::NTypeIds::Int8:
- return callback(TTypeWrapper<arrow::Int8Type>());
- case NScheme::NTypeIds::Uint8:
- return callback(TTypeWrapper<arrow::UInt8Type>());
- case NScheme::NTypeIds::Int16:
- return callback(TTypeWrapper<arrow::Int16Type>());
- case NScheme::NTypeIds::Date:
- case NScheme::NTypeIds::Uint16:
- return callback(TTypeWrapper<arrow::UInt16Type>());
- case NScheme::NTypeIds::Int32:
- return callback(TTypeWrapper<arrow::Int32Type>());
- case NScheme::NTypeIds::Datetime:
- case NScheme::NTypeIds::Uint32:
- return callback(TTypeWrapper<arrow::UInt32Type>());
- case NScheme::NTypeIds::Int64:
- return callback(TTypeWrapper<arrow::Int64Type>());
- case NScheme::NTypeIds::Uint64:
- return callback(TTypeWrapper<arrow::UInt64Type>());
- case NScheme::NTypeIds::Float:
- return callback(TTypeWrapper<arrow::FloatType>());
- case NScheme::NTypeIds::Double:
- return callback(TTypeWrapper<arrow::DoubleType>());
- case NScheme::NTypeIds::Utf8:
- return callback(TTypeWrapper<arrow::StringType>());
- case NScheme::NTypeIds::String:
- case NScheme::NTypeIds::String4k:
- case NScheme::NTypeIds::String2m:
- case NScheme::NTypeIds::Yson:
- case NScheme::NTypeIds::Json:
- case NScheme::NTypeIds::DyNumber:
- case NScheme::NTypeIds::JsonDocument:
- return callback(TTypeWrapper<arrow::BinaryType>());
- case NScheme::NTypeIds::Timestamp:
- return callback(TTypeWrapper<arrow::TimestampType>());
- case NScheme::NTypeIds::Interval:
- return callback(TTypeWrapper<arrow::DurationType>());
- case NScheme::NTypeIds::Decimal:
- return callback(TTypeWrapper<arrow::Decimal128Type>());
-
- case NScheme::NTypeIds::PairUi64Ui64:
- case NScheme::NTypeIds::ActorId:
- case NScheme::NTypeIds::StepOrderId:
- break; // Deprecated types
-
- case NScheme::NTypeIds::Pg:
- break; // TODO: support pg types
- }
- return false;
-}
-
-inline bool IsPrimitiveYqlType(const NScheme::TTypeInfo& typeInfo) {
- switch (typeInfo.GetTypeId()) {
- case NScheme::NTypeIds::Int8:
- case NScheme::NTypeIds::Uint8:
- case NScheme::NTypeIds::Int16:
- case NScheme::NTypeIds::Date:
- case NScheme::NTypeIds::Uint16:
- case NScheme::NTypeIds::Int32:
- case NScheme::NTypeIds::Datetime:
- case NScheme::NTypeIds::Uint32:
- case NScheme::NTypeIds::Int64:
- case NScheme::NTypeIds::Uint64:
- case NScheme::NTypeIds::Float:
- case NScheme::NTypeIds::Double:
- case NScheme::NTypeIds::Timestamp:
- case NScheme::NTypeIds::Interval:
- return true;
- default:
- break;
- }
- return false;
-}
-
-template <typename T>
-bool Append(arrow::ArrayBuilder& builder, const typename T::c_type& value) {
- using TBuilder = typename arrow::TypeTraits<T>::BuilderType;
-
- auto status = static_cast<TBuilder&>(builder).Append(value);
- return status.ok();
-}
-
-template <typename T>
-bool Append(arrow::ArrayBuilder& builder, arrow::util::string_view value) {
- using TBuilder = typename arrow::TypeTraits<T>::BuilderType;
-
- auto status = static_cast<TBuilder&>(builder).Append(value);
- return status.ok();
-}
-
-template <typename T>
-bool Append(arrow::ArrayBuilder& builder, const typename T::c_type* values, size_t size) {
- using TBuilder = typename arrow::NumericBuilder<T>;
-
- auto status = static_cast<TBuilder&>(builder).AppendValues(values, size);
- return status.ok();
-}
-
-template <typename T>
-bool Append(T& builder, const arrow::Array& array, int position) {
- return SwitchType(array.type_id(), [&](const auto& type) {
- using TWrap = std::decay_t<decltype(type)>;
- using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
- using TBuilder = typename arrow::TypeTraits<typename TWrap::T>::BuilderType;
-
- auto& typedArray = static_cast<const TArray&>(array);
- auto& typedBuilder = static_cast<TBuilder&>(builder);
-
- if (typedArray.IsNull(position)) {
- auto status = typedBuilder.AppendNull();
- return status.ok();
- } else {
- auto status = typedBuilder.Append(typedArray.GetView(position));
- return status.ok();
- }
- });
-}
-
-}
+#include "switch/switch_type.h"
diff --git a/ydb/core/formats/arrow/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/ut/CMakeLists.darwin-x86_64.txt
index e63852f457f..f77bf86c419 100644
--- a/ydb/core/formats/arrow/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/formats/arrow/ut/CMakeLists.darwin-x86_64.txt
@@ -38,6 +38,7 @@ target_link_options(ydb-core-formats-arrow-ut PRIVATE
target_sources(ydb-core-formats-arrow-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_arrow.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_program_step.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut/ut_dictionary.cpp
)
set_property(
TARGET
diff --git a/ydb/core/formats/arrow/ut/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/ut/CMakeLists.linux-aarch64.txt
index b396886cf0c..ad981e1dca2 100644
--- a/ydb/core/formats/arrow/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/formats/arrow/ut/CMakeLists.linux-aarch64.txt
@@ -41,6 +41,7 @@ target_link_options(ydb-core-formats-arrow-ut PRIVATE
target_sources(ydb-core-formats-arrow-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_arrow.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_program_step.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut/ut_dictionary.cpp
)
set_property(
TARGET
diff --git a/ydb/core/formats/arrow/ut/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/ut/CMakeLists.linux-x86_64.txt
index 722652c673a..de6ae69716f 100644
--- a/ydb/core/formats/arrow/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/formats/arrow/ut/CMakeLists.linux-x86_64.txt
@@ -42,6 +42,7 @@ target_link_options(ydb-core-formats-arrow-ut PRIVATE
target_sources(ydb-core-formats-arrow-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_arrow.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_program_step.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut/ut_dictionary.cpp
)
set_property(
TARGET
diff --git a/ydb/core/formats/arrow/ut/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/ut/CMakeLists.windows-x86_64.txt
index 937feb4f7bd..bcbbd96108b 100644
--- a/ydb/core/formats/arrow/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/formats/arrow/ut/CMakeLists.windows-x86_64.txt
@@ -31,6 +31,7 @@ target_link_libraries(ydb-core-formats-arrow-ut PUBLIC
target_sources(ydb-core-formats-arrow-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_arrow.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_program_step.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut/ut_dictionary.cpp
)
set_property(
TARGET
diff --git a/ydb/core/formats/arrow/ut/ut_dictionary.cpp b/ydb/core/formats/arrow/ut/ut_dictionary.cpp
new file mode 100644
index 00000000000..f90efa8c709
--- /dev/null
+++ b/ydb/core/formats/arrow/ut/ut_dictionary.cpp
@@ -0,0 +1,89 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/serializer/batch_only.h>
+#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/formats/arrow/simple_builder/array.h>
+#include <ydb/core/formats/arrow/simple_builder/batch.h>
+#include <ydb/core/formats/arrow/simple_builder/filler.h>
+#include <ydb/core/formats/arrow/dictionary/conversion.h>
+
+Y_UNIT_TEST_SUITE(Dictionary) {
+
+ using namespace NKikimr::NArrow;
+
+ ui64 Test(IArrayBuilder::TPtr column, const arrow::ipc::IpcWriteOptions& options, const ui32 bSize) {
+ std::shared_ptr<arrow::RecordBatch> batch = TRecordBatchConstructor({ column }).BuildBatch(bSize);
+ const TString data = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(batch);
+ auto deserializedBatch = *NKikimr::NArrow::NSerialization::TFullDataDeserializer().Deserialize(data);
+ Y_VERIFY(!!deserializedBatch);
+ auto originalBatchTransformed = DictionaryToArray(batch);
+ auto roundBatchTransformed = DictionaryToArray(deserializedBatch);
+ const TString roundUnpacked = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(roundBatchTransformed);
+ const TString roundTransformed = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(originalBatchTransformed);
+ Y_VERIFY(roundBatchTransformed->num_rows() == originalBatchTransformed->num_rows());
+ Y_VERIFY(roundUnpacked == roundTransformed);
+ return data.size();
+ }
+
+ Y_UNIT_TEST(Simple) {
+ const std::vector<arrow::Compression::type> codecs = { arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME, };
+ for (auto&& codec : codecs) {
+ arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults();
+ options.codec = *arrow::util::Codec::Create(codec);
+ Cerr << (options.codec ? options.codec->name() : "NO_CODEC") << Endl;
+ for (auto bSize : { 100000 }) {
+ Cerr << "--" << bSize << Endl;
+ for (auto pSize : { 1, 16, 64, 128, 512, 1024 }) {
+ Cerr << "----" << pSize << Endl;
+ for (auto&& strLen : { 1, 10, 16, 32, 64 }) {
+ Cerr << "------" << strLen << Endl;
+ ui64 bytesDict;
+ ui64 bytesRaw;
+ {
+ IArrayBuilder::TPtr column = std::make_shared<TDictionaryArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen));
+ bytesDict = Test(column, options, bSize);
+ }
+ {
+ IArrayBuilder::TPtr column = std::make_shared<TSimpleArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen));
+ bytesRaw = Test(column, options, bSize);
+ }
+ Cerr << "--------" << bytesDict << " / " << bytesRaw << " = " << 1.0 * bytesDict / bytesRaw << Endl;
+ }
+ }
+ }
+ }
+ }
+
+ Y_UNIT_TEST(ComparePayloadAndFull) {
+ const std::vector<arrow::Compression::type> codecs = { arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME, };
+ for (auto&& codec : codecs) {
+ arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults();
+ options.codec = *arrow::util::Codec::Create(codec);
+ Cerr << (options.codec ? options.codec->name() : "NO_CODEC") << Endl;
+ for (auto bSize : { 1000, 10000, 100000 }) {
+ Cerr << "--" << bSize << Endl;
+ for (auto pSize : { 1, 16, 64, 128, 512, 1024 }) {
+ Cerr << "----" << pSize << Endl;
+ for (auto&& strLen : { 1, 10, 16, 32, 64 }) {
+ Cerr << "------" << strLen << Endl;
+ ui64 bytesFull;
+ ui64 bytesPayload;
+ {
+ IArrayBuilder::TPtr column = std::make_shared<TSimpleArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen));
+ std::shared_ptr<arrow::RecordBatch> batch = TRecordBatchConstructor({ column }).BuildBatch(bSize);
+ const TString dataFull = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(batch);
+ const TString dataPayload = NKikimr::NArrow::NSerialization::TBatchPayloadSerializer(options).Serialize(batch);
+ bytesFull = dataFull.size();
+ bytesPayload = dataPayload.size();
+ }
+ const double fraq = 1 - 1.0 * bytesPayload / bytesFull;
+ Cerr << "--------" << bytesPayload << " / " << bytesFull << " = " << 100 * fraq << "%" << Endl;
+ UNIT_ASSERT(fraq * 100 < 3);
+ }
+ }
+ }
+ }
+ }
+
+
+};
diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp
index a7baa934b71..a00eef8cde9 100644
--- a/ydb/core/io_formats/csv_arrow.cpp
+++ b/ydb/core/io_formats/csv_arrow.cpp
@@ -162,7 +162,7 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
return {};
}
- auto buffer = std::make_shared<NArrow::TBufferOverString>(csv);
+ auto buffer = std::make_shared<arrow::Buffer>((const ui8*)csv.data(), csv.size());
auto input = std::make_shared<arrow::io::BufferReader>(buffer);
auto res = arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input,
ReadOptions, ParseOptions, ConvertOptions);
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 4282fe13218..151969d55c4 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -367,6 +367,7 @@ enum EServiceKikimr {
EXT_INDEX = 1900;
TX_CONVEYOR = 2000;
+ ARROW_HELPER = 2100;
};
message TActivity {