diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-18 07:43:15 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-18 07:43:15 +0300 |
commit | 5654ec350abd6e2df2567ab710b064c4ef5595f5 (patch) | |
tree | a34352e1b01c9534b928e92c2ca4d1a1a16f468f | |
parent | fe084621cceabfb6c26d7d09d46d877396d3ff6f (diff) | |
download | ydb-5654ec350abd6e2df2567ab710b064c4ef5595f5.tar.gz |
additional helpers and tests for serializers and dict convertions
60 files changed, 1517 insertions, 328 deletions
diff --git a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt index 225e2097297..8bfa7dc43ee 100644 --- a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,11 @@ # original buildsystem will not be accepted. +add_subdirectory(common) +add_subdirectory(dictionary) +add_subdirectory(serializer) +add_subdirectory(simple_builder) +add_subdirectory(switch) add_subdirectory(ut) add_library(core-formats-arrow) @@ -20,6 +25,9 @@ target_link_libraries(core-formats-arrow PUBLIC yutil libs-apache-arrow ydb-core-scheme + formats-arrow-serializer + formats-arrow-simple_builder + formats-arrow-dictionary ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt index 103005acef6..2f7c6515330 100644 --- a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt +++ b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt @@ -6,6 +6,11 @@ # original buildsystem will not be accepted. +add_subdirectory(common) +add_subdirectory(dictionary) +add_subdirectory(serializer) +add_subdirectory(simple_builder) +add_subdirectory(switch) add_subdirectory(ut) add_library(core-formats-arrow) @@ -21,6 +26,9 @@ target_link_libraries(core-formats-arrow PUBLIC yutil libs-apache-arrow ydb-core-scheme + formats-arrow-serializer + formats-arrow-simple_builder + formats-arrow-dictionary ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt index 103005acef6..2f7c6515330 100644 --- a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt @@ -6,6 +6,11 @@ # original buildsystem will not be accepted. +add_subdirectory(common) +add_subdirectory(dictionary) +add_subdirectory(serializer) +add_subdirectory(simple_builder) +add_subdirectory(switch) add_subdirectory(ut) add_library(core-formats-arrow) @@ -21,6 +26,9 @@ target_link_libraries(core-formats-arrow PUBLIC yutil libs-apache-arrow ydb-core-scheme + formats-arrow-serializer + formats-arrow-simple_builder + formats-arrow-dictionary ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt index 26821f7aa13..23061c5821d 100644 --- a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt @@ -6,6 +6,11 @@ # original buildsystem will not be accepted. +add_subdirectory(common) +add_subdirectory(dictionary) +add_subdirectory(serializer) +add_subdirectory(simple_builder) +add_subdirectory(switch) add_subdirectory(ut) add_library(core-formats-arrow) @@ -21,6 +26,9 @@ target_link_libraries(core-formats-arrow PUBLIC yutil libs-apache-arrow ydb-core-scheme + formats-arrow-serializer + formats-arrow-simple_builder + formats-arrow-dictionary ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 00b9ee850cf..882a4811659 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -1,7 +1,10 @@ #include "arrow_helpers.h" #include "switch_type.h" #include "one_batch_input_stream.h" +#include "common/validation.h" #include "merging_sorted_input_stream.h" +#include "serializer/batch_only.h" +#include "serializer/abstract.h" #include <ydb/core/util/yverify_stream.h> #include <util/system/yassert.h> @@ -124,15 +127,12 @@ std::shared_ptr<arrow::Schema> MakeArrowSchema(const std::vector<std::pair<TStri } TString SerializeSchema(const arrow::Schema& schema) { - auto buffer = arrow::ipc::SerializeSchema(schema); - if (!buffer.ok()) { - return {}; - } - return TString((const char*)(*buffer)->data(), (*buffer)->size()); + auto buffer = TStatusValidator::GetValid(arrow::ipc::SerializeSchema(schema)); + return buffer->ToString(); } std::shared_ptr<arrow::Schema> DeserializeSchema(const TString& str) { - std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(str)); + auto buffer = std::make_shared<arrow::Buffer>((const ui8*)str.data(), str.size()); arrow::io::BufferReader reader(buffer); arrow::ipc::DictionaryMemo dictMemo; auto schema = ReadSchema(&reader, &dictMemo); @@ -142,67 +142,8 @@ std::shared_ptr<arrow::Schema> DeserializeSchema(const TString& str) { return *schema; } -namespace { - class TFixedStringOutputStream final : public arrow::io::OutputStream { - public: - TFixedStringOutputStream(TString* out) - : Out(out) - , Position(0) - { } - - arrow::Status Close() override { - Out = nullptr; - return arrow::Status::OK(); - } - - bool closed() const override { - return Out == nullptr; - } - - arrow::Result<int64_t> Tell() const override { - return Position; - } - - arrow::Status Write(const void* data, int64_t nbytes) override { - if (Y_LIKELY(nbytes > 0)) { - Y_VERIFY(Out && Out->size() - Position >= ui64(nbytes)); - char* dst = &(*Out)[Position]; - ::memcpy(dst, data, nbytes); - Position += nbytes; - } - - return arrow::Status::OK(); - } - - size_t GetPosition() const { - return Position; - } - - private: - TString* Out; - size_t Position; - }; -} - TString SerializeBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const arrow::ipc::IpcWriteOptions& options) { - arrow::ipc::IpcPayload payload; - auto status = arrow::ipc::GetRecordBatchPayload(*batch, options, &payload); - Y_VERIFY_OK(status); - - int32_t metadata_length = 0; - arrow::io::MockOutputStream mock; - status = arrow::ipc::WriteIpcPayload(payload, options, &mock, &metadata_length); - Y_VERIFY_OK(status); - - TString str; - str.resize(mock.GetExtentBytesWritten()); - - TFixedStringOutputStream out(&str); - status = arrow::ipc::WriteIpcPayload(payload, options, &out, &metadata_length); - Y_VERIFY_OK(status); - Y_VERIFY(out.GetPosition() == str.size()); - - return str; + return NSerialization::TBatchPayloadSerializer(options).Serialize(batch); } TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& batch) { @@ -211,18 +152,14 @@ TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& b return SerializeBatch(batch, writeOptions); } -std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob, const std::shared_ptr<arrow::Schema>& schema) { - arrow::ipc::DictionaryMemo dictMemo; - auto options = arrow::ipc::IpcReadOptions::Defaults(); - options.use_threads = false; - - std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(blob)); - arrow::io::BufferReader reader(buffer); - auto batch = ReadRecordBatch(schema, &dictMemo, options, &reader); - if (!batch.ok() || !(*batch)->Validate().ok()) { - return {}; +std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob, const std::shared_ptr<arrow::Schema>& schema) +{ + auto result = NSerialization::TBatchPayloadDeserializer(schema).Deserialize(blob); + if (result.ok()) { + return *result; + } else { + return nullptr; } - return *batch; } std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::Schema>& schema, const ui32 rowsCount) { diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h index 91f3731e66b..55dbde33b24 100644 --- a/ydb/core/formats/arrow/arrow_helpers.h +++ b/ydb/core/formats/arrow/arrow_helpers.h @@ -17,20 +17,6 @@ class TReplaceKeyTemplate; using TReplaceKey = TReplaceKeyTemplate<std::shared_ptr<TArrayVec>>; using TRawReplaceKey = TReplaceKeyTemplate<const TArrayVec*>; -// Arrow inrernally keeps references to Buffer objects with the data -// This helper class implements arrow::Buffer over TString that owns -// the actual memory -class TBufferOverString : public arrow::Buffer { - TString Str; -public: - explicit TBufferOverString(TString str) - : arrow::Buffer((const unsigned char*)str.data(), str.size()) - , Str(str) - { - Y_VERIFY(data() == (const unsigned char*)Str.data()); - } -}; - std::shared_ptr<arrow::DataType> GetArrowType(NScheme::TTypeInfo typeInfo); std::shared_ptr<arrow::DataType> GetCSVArrowType(NScheme::TTypeInfo typeId); diff --git a/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..757b7290185 --- /dev/null +++ b/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-common) +target_link_libraries(formats-arrow-common PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow +) +target_sources(formats-arrow-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp +) diff --git a/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..c351a86aff4 --- /dev/null +++ b/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-common) +target_link_libraries(formats-arrow-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow +) +target_sources(formats-arrow-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp +) diff --git a/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..c351a86aff4 --- /dev/null +++ b/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-common) +target_link_libraries(formats-arrow-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow +) +target_sources(formats-arrow-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp +) diff --git a/ydb/core/formats/arrow/common/CMakeLists.txt b/ydb/core/formats/arrow/common/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/formats/arrow/common/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..757b7290185 --- /dev/null +++ b/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-common) +target_link_libraries(formats-arrow-common PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow +) +target_sources(formats-arrow-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp +) diff --git a/ydb/core/formats/arrow/common/validation.cpp b/ydb/core/formats/arrow/common/validation.cpp new file mode 100644 index 00000000000..ca069631e7c --- /dev/null +++ b/ydb/core/formats/arrow/common/validation.cpp @@ -0,0 +1,5 @@ +#include "validation.h" + +namespace NKikimr::NArrow { + +} diff --git a/ydb/core/formats/arrow/common/validation.h b/ydb/core/formats/arrow/common/validation.h new file mode 100644 index 00000000000..44e8cc19ed9 --- /dev/null +++ b/ydb/core/formats/arrow/common/validation.h @@ -0,0 +1,28 @@ +#pragma once + +#include <contrib/libs/apache/arrow/cpp/src/arrow/status.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <util/system/yassert.h> + +namespace NKikimr::NArrow { + +class TStatusValidator { +public: + static void Validate(const arrow::Status& status) { + Y_VERIFY(status.ok(), "%s", status.ToString().c_str()); + } + + template <class T> + static T GetValid(const arrow::Result<T>& result) { + Y_VERIFY(result.ok(), "%s", result.status().ToString().c_str()); + return *result; + } + + template <class T> + static T GetValid(arrow::Result<T>&& result) { + Y_VERIFY(result.ok(), "%s", result.status().ToString().c_str()); + return std::move(*result); + } +}; + +} diff --git a/ydb/core/formats/arrow/converter.cpp b/ydb/core/formats/arrow/converter.cpp index f34ad2eaac8..df99fbc05a3 100644 --- a/ydb/core/formats/arrow/converter.cpp +++ b/ydb/core/formats/arrow/converter.cpp @@ -76,6 +76,10 @@ static bool ConvertColumn(const NScheme::TTypeInfo colType, std::shared_ptr<arro case NScheme::NTypeIds::JsonDocument: { for (i32 i = 0; i < binaryArray.length(); ++i) { auto value = binaryArray.Value(i); + if (!value.size()) { + Y_VERIFY(builder.AppendNull().ok()); + continue; + } const auto binaryJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(value.data(), value.size())); if (!binaryJson.Defined() || !builder.Append(binaryJson->Data(), binaryJson->Size()).ok()) { return false; diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..b78423781c4 --- /dev/null +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-dictionary) +target_link_libraries(formats-arrow-dictionary PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-simple_builder + formats-arrow-switch +) +target_sources(formats-arrow-dictionary PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp +) diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..f3f3651f3e7 --- /dev/null +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-dictionary) +target_link_libraries(formats-arrow-dictionary PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-simple_builder + formats-arrow-switch +) +target_sources(formats-arrow-dictionary PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp +) diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..f3f3651f3e7 --- /dev/null +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-dictionary) +target_link_libraries(formats-arrow-dictionary PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-simple_builder + formats-arrow-switch +) +target_sources(formats-arrow-dictionary PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp +) diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..b78423781c4 --- /dev/null +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-dictionary) +target_link_libraries(formats-arrow-dictionary PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-simple_builder + formats-arrow-switch +) +target_sources(formats-arrow-dictionary PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp +) diff --git a/ydb/core/formats/arrow/dictionary/conversion.cpp b/ydb/core/formats/arrow/dictionary/conversion.cpp new file mode 100644 index 00000000000..8b8ed9aa2e3 --- /dev/null +++ b/ydb/core/formats/arrow/dictionary/conversion.cpp @@ -0,0 +1,106 @@ +#include "conversion.h" +#include <ydb/core/formats/arrow/switch/switch_type.h> +#include <ydb/core/formats/arrow/simple_builder/filler.h> +#include <ydb/core/formats/arrow/simple_builder/array.h> + +namespace NKikimr::NArrow { + +std::shared_ptr<arrow::Array> DictionaryToArray(const std::shared_ptr<arrow::DictionaryArray>& data) { + Y_VERIFY(data); + return DictionaryToArray(*data); +} + +std::shared_ptr<arrow::Array> DictionaryToArray(const arrow::DictionaryArray& data) { + std::shared_ptr<arrow::Array> result; + SwitchType(data.dictionary()->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using TDictionaryValue = typename TWrap::T; + using TDictionary = typename arrow::TypeTraits<TDictionaryValue>::ArrayType; + constexpr bool noParams = arrow::TypeTraits<TDictionaryValue>::is_parameter_free; + if constexpr (!noParams) { + Y_VERIFY(false); + return true; + } + if constexpr (noParams) { + auto& columnDictionary = static_cast<const TDictionary&>(*data.dictionary()); + SwitchType(data.indices()->type_id(), [&](const auto& type) { + using TWrapIndices = std::decay_t<decltype(type)>; + constexpr bool hasCType = arrow::has_c_type<typename TWrapIndices::T>::value; + if constexpr (hasCType) { + constexpr bool indicesIntegral = std::is_integral<typename TWrapIndices::T::c_type>::value; + if constexpr (indicesIntegral && hasCType) { + using TIndices = typename arrow::TypeTraits<typename TWrapIndices::T>::ArrayType; + using TDictionaryAccessor = TDictionaryArrayAccessor<TDictionaryValue, TIndices>; + auto& columnIndices = static_cast<const TIndices&>(*data.indices()); + result = TSimpleArrayConstructor<TDictionaryAccessor>("absent", TDictionaryAccessor(columnDictionary, columnIndices)).BuildArray(data.length()); + return true; + } + } + Y_VERIFY(false); + return true; + }); + } + return true; + }); + Y_VERIFY(result); + return result; +} + +std::shared_ptr<arrow::RecordBatch> DictionaryToArray(const std::shared_ptr<arrow::RecordBatch>& data) { + std::vector<std::shared_ptr<arrow::Field>> fields; + bool hasDict = false; + for (auto&& f : data->schema()->fields()) { + if (f->type()->id() == arrow::Type::DICTIONARY) { + auto& dType = static_cast<const arrow::DictionaryType&>(*f->type()); + fields.emplace_back(std::make_shared<arrow::Field>(f->name(), dType.value_type())); + hasDict = true; + } else { + fields.emplace_back(f); + } + } + if (!hasDict) { + return data; + } + std::vector<std::shared_ptr<arrow::Array>> columns; + for (auto&& c : data->columns()) { + if (c->type_id() == arrow::Type::DICTIONARY) { + auto& dColumn = static_cast<const arrow::DictionaryArray&>(*c); + columns.emplace_back(DictionaryToArray(dColumn)); + } else { + columns.emplace_back(c); + } + } + std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields); + return arrow::RecordBatch::Make(schema, data->num_rows(), columns); +} + +std::shared_ptr<arrow::DictionaryArray> ArrayToDictionary(const std::shared_ptr<arrow::Array>& data) { + Y_VERIFY(IsDictionableArray(data)); + std::shared_ptr<arrow::DictionaryArray> result; + SwitchType(data->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + if constexpr (arrow::has_string_view<typename TWrap::T>::value && arrow::TypeTraits<typename TWrap::T>::is_parameter_free) { + auto resultArray = TDictionaryArrayConstructor<TLinearArrayAccessor<typename TWrap::T>>("absent", *data).BuildArray(data->length()); + Y_VERIFY(resultArray->type()->id() == arrow::Type::DICTIONARY); + result = static_pointer_cast<arrow::DictionaryArray>(resultArray); + } else { + Y_VERIFY(false); + } + return true; + }); + Y_VERIFY(result); + return result; +} + +bool IsDictionableArray(const std::shared_ptr<arrow::Array>& data) { + Y_VERIFY(data); + bool result = false; + SwitchType(data->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + result = arrow::has_c_type<typename TWrap::T>::value; + return true; + }); + return result; +} + +} diff --git a/ydb/core/formats/arrow/dictionary/conversion.h b/ydb/core/formats/arrow/dictionary/conversion.h new file mode 100644 index 00000000000..aab2def356e --- /dev/null +++ b/ydb/core/formats/arrow/dictionary/conversion.h @@ -0,0 +1,14 @@ +#pragma once +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_dict.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> + +namespace NKikimr::NArrow { + +bool IsDictionableArray(const std::shared_ptr<arrow::Array>& data); +std::shared_ptr<arrow::DictionaryArray> ArrayToDictionary(const std::shared_ptr<arrow::Array>& data); +std::shared_ptr<arrow::Array> DictionaryToArray(const std::shared_ptr<arrow::DictionaryArray>& data); +std::shared_ptr<arrow::Array> DictionaryToArray(const arrow::DictionaryArray& data); +std::shared_ptr<arrow::RecordBatch> DictionaryToArray(const std::shared_ptr<arrow::RecordBatch>& data); + +} diff --git a/ydb/core/formats/arrow/serializer/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/serializer/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..9ac14ecc710 --- /dev/null +++ b/ydb/core/formats/arrow/serializer/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-serializer) +target_link_libraries(formats-arrow-serializer PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-common + cpp-actors-core + ydb-core-protos +) +target_sources(formats-arrow-serializer PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/full.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/batch_only.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/stream.cpp +) diff --git a/ydb/core/formats/arrow/serializer/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/serializer/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..f5c1649ec6f --- /dev/null +++ b/ydb/core/formats/arrow/serializer/CMakeLists.linux-aarch64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-serializer) +target_link_libraries(formats-arrow-serializer PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-common + cpp-actors-core + ydb-core-protos +) +target_sources(formats-arrow-serializer PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/full.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/batch_only.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/stream.cpp +) diff --git a/ydb/core/formats/arrow/serializer/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/serializer/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..f5c1649ec6f --- /dev/null +++ b/ydb/core/formats/arrow/serializer/CMakeLists.linux-x86_64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-serializer) +target_link_libraries(formats-arrow-serializer PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-common + cpp-actors-core + ydb-core-protos +) +target_sources(formats-arrow-serializer PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/full.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/batch_only.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/stream.cpp +) diff --git a/ydb/core/formats/arrow/serializer/CMakeLists.txt b/ydb/core/formats/arrow/serializer/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/formats/arrow/serializer/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/formats/arrow/serializer/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/serializer/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..9ac14ecc710 --- /dev/null +++ b/ydb/core/formats/arrow/serializer/CMakeLists.windows-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-serializer) +target_link_libraries(formats-arrow-serializer PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-common + cpp-actors-core + ydb-core-protos +) +target_sources(formats-arrow-serializer PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/full.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/batch_only.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/serializer/stream.cpp +) diff --git a/ydb/core/formats/arrow/serializer/abstract.cpp b/ydb/core/formats/arrow/serializer/abstract.cpp new file mode 100644 index 00000000000..f893a2f063b --- /dev/null +++ b/ydb/core/formats/arrow/serializer/abstract.cpp @@ -0,0 +1,4 @@ +#include "abstract.h" +namespace NKikimr::NArrow::NSerialization { + +} diff --git a/ydb/core/formats/arrow/serializer/abstract.h b/ydb/core/formats/arrow/serializer/abstract.h new file mode 100644 index 00000000000..7fff5a6d29f --- /dev/null +++ b/ydb/core/formats/arrow/serializer/abstract.h @@ -0,0 +1,33 @@ +#pragma once + +#include <contrib/libs/apache/arrow/cpp/src/arrow/status.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <util/generic/string.h> + +namespace NKikimr::NArrow::NSerialization { + +class ISerializer { +protected: + virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0; +public: + using TPtr = std::shared_ptr<ISerializer>; + virtual ~ISerializer() = default; + + TString Serialize(const std::shared_ptr<arrow::RecordBatch>& batch) const { + return DoSerialize(batch); + } +}; + +class IDeserializer { +protected: + virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data) const = 0; +public: + using TPtr = std::shared_ptr<IDeserializer>; + virtual ~IDeserializer() = default; + + arrow::Result<std::shared_ptr<arrow::RecordBatch>> Deserialize(const TString& data) const { + return DoDeserialize(data); + } +}; + +} diff --git a/ydb/core/formats/arrow/serializer/batch_only.cpp b/ydb/core/formats/arrow/serializer/batch_only.cpp new file mode 100644 index 00000000000..85733ee09aa --- /dev/null +++ b/ydb/core/formats/arrow/serializer/batch_only.cpp @@ -0,0 +1,70 @@ +#include "batch_only.h" +#include "stream.h" +#include <ydb/core/formats/arrow/common/validation.h> +#include <ydb/core/protos/services.pb.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/dictionary.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/buffer.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h> +#include <library/cpp/actors/core/log.h> +namespace NKikimr::NArrow::NSerialization { + +namespace { +// Arrow internally keeps references to Buffer objects with the data +// This helper class implements arrow::Buffer over TString that owns +// the actual memory +class TBufferOverString: public arrow::Buffer { + TString Str; +public: + explicit TBufferOverString(TString str) + : arrow::Buffer((const unsigned char*)str.data(), str.size()) + , Str(str) { + Y_VERIFY(data() == (const unsigned char*)Str.data()); + } +}; +} + +arrow::Result<std::shared_ptr<arrow::RecordBatch>> TBatchPayloadDeserializer::DoDeserialize(const TString& data) const { + arrow::ipc::DictionaryMemo dictMemo; + auto options = arrow::ipc::IpcReadOptions::Defaults(); + options.use_threads = false; + + std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(data)); + arrow::io::BufferReader reader(buffer); + AFL_DEBUG(NKikimrServices::ARROW_HELPER)("event", "parsing")("size", data.size())("columns", Schema->num_fields()); + auto batchResult = arrow::ipc::ReadRecordBatch(Schema, &dictMemo, options, &reader); + if (!batchResult.ok()) { + return batchResult; + } + std::shared_ptr<arrow::RecordBatch> batch = *batchResult; + if (!batch) { + return arrow::Status(arrow::StatusCode::SerializationError, "empty batch"); + } + auto validation = batch->Validate(); + if (!validation.ok()) { + return arrow::Status(arrow::StatusCode::SerializationError, "batch is not valid: " + validation.ToString()); + } + return batch; +} + +TString TBatchPayloadSerializer::DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const { + arrow::ipc::IpcPayload payload; + TStatusValidator::Validate(arrow::ipc::GetRecordBatchPayload(*batch, Options, &payload)); + + int32_t metadata_length = 0; + arrow::io::MockOutputStream mock; + TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &mock, &metadata_length)); + + TString str; + str.resize(mock.GetExtentBytesWritten()); + + TFixedStringOutputStream out(&str); + TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &out, &metadata_length)); + Y_VERIFY(out.GetPosition() == str.size()); + Y_VERIFY_DEBUG(TBatchPayloadDeserializer(batch->schema()).Deserialize(str).ok()); + AFL_DEBUG(NKikimrServices::ARROW_HELPER)("event", "serialize")("size", str.size())("columns", batch->schema()->num_fields()); + return str; +} + +} diff --git a/ydb/core/formats/arrow/serializer/batch_only.h b/ydb/core/formats/arrow/serializer/batch_only.h new file mode 100644 index 00000000000..5771202d7af --- /dev/null +++ b/ydb/core/formats/arrow/serializer/batch_only.h @@ -0,0 +1,32 @@ +#pragma once +#include "abstract.h" +#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/options.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> + +namespace NKikimr::NArrow::NSerialization { + +class TBatchPayloadSerializer: public ISerializer { +private: + const arrow::ipc::IpcWriteOptions Options; +protected: + virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const override; +public: + TBatchPayloadSerializer(const arrow::ipc::IpcWriteOptions& options) + : Options(options) { + + } +}; + +class TBatchPayloadDeserializer: public IDeserializer { +private: + const std::shared_ptr<arrow::Schema> Schema; +protected: + virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data) const override; +public: + TBatchPayloadDeserializer(const std::shared_ptr<arrow::Schema> schema) + : Schema(schema) { + + } +}; + +} diff --git a/ydb/core/formats/arrow/serializer/full.cpp b/ydb/core/formats/arrow/serializer/full.cpp new file mode 100644 index 00000000000..c5424a856b0 --- /dev/null +++ b/ydb/core/formats/arrow/serializer/full.cpp @@ -0,0 +1,55 @@ +#include "full.h" +#include "stream.h" +#include <ydb/core/formats/arrow/dictionary/conversion.h> +#include <ydb/core/formats/arrow/common/validation.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/dictionary.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/buffer.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h> +namespace NKikimr::NArrow::NSerialization { + +arrow::Result<std::shared_ptr<arrow::RecordBatch>> TFullDataDeserializer::DoDeserialize(const TString& data) const { + arrow::ipc::DictionaryMemo dictMemo; + auto options = arrow::ipc::IpcReadOptions::Defaults(); + options.use_threads = false; + + arrow::Buffer buffer((const ui8*)data.data(), data.size()); + arrow::io::BufferReader bufferReader(buffer); + auto reader = TStatusValidator::GetValid(arrow::ipc::RecordBatchStreamReader::Open(&bufferReader)); + + std::shared_ptr<arrow::RecordBatch> batch; + auto readResult = reader->ReadNext(&batch); + if (!readResult.ok()) { + return readResult; + } + if (!batch) { + return arrow::Status(arrow::StatusCode::SerializationError, "null batch"); + } + auto validation = batch->Validate(); + if (!validation.ok()) { + return arrow::Status(arrow::StatusCode::SerializationError, "validation error: " + validation.ToString()); + } + return batch; +} + +TString TFullDataSerializer::DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const { + arrow::io::MockOutputStream mock; + { + auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&mock, batch->schema(), Options)); + TStatusValidator::Validate(writer->WriteRecordBatch(*batch)); + TStatusValidator::Validate(writer->Close()); + } + TString result; + result.resize(mock.GetExtentBytesWritten()); + { + TFixedStringOutputStream stream(&result); + auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&stream, batch->schema(), Options)); + TStatusValidator::Validate(writer->WriteRecordBatch(*batch)); + TStatusValidator::Validate(writer->Close()); + Y_VERIFY(stream.GetPosition() == result.size()); + } + return result; +} + +} diff --git a/ydb/core/formats/arrow/serializer/full.h b/ydb/core/formats/arrow/serializer/full.h new file mode 100644 index 00000000000..7fa063f107b --- /dev/null +++ b/ydb/core/formats/arrow/serializer/full.h @@ -0,0 +1,31 @@ +#pragma once + +#include "abstract.h" +#include <ydb/library/accessor/accessor.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/options.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> + +namespace NKikimr::NArrow::NSerialization { + +class TFullDataSerializer: public ISerializer { +private: + const arrow::ipc::IpcWriteOptions Options; +protected: + virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const override; +public: + TFullDataSerializer(const arrow::ipc::IpcWriteOptions& options) + : Options(options) { + + } +}; + +class TFullDataDeserializer: public IDeserializer { +protected: + virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data) const override; +public: + TFullDataDeserializer() { + + } +}; + +} diff --git a/ydb/core/formats/arrow/serializer/stream.cpp b/ydb/core/formats/arrow/serializer/stream.cpp new file mode 100644 index 00000000000..04473379b97 --- /dev/null +++ b/ydb/core/formats/arrow/serializer/stream.cpp @@ -0,0 +1,20 @@ +#include "stream.h" +namespace NKikimr::NArrow { + +arrow::Status NSerialization::TFixedStringOutputStream::Write(const void* data, int64_t nbytes) { + if (Y_LIKELY(nbytes > 0)) { + Y_VERIFY(Out && Out->size() - Position >= ui64(nbytes)); + char* dst = &(*Out)[Position]; + ::memcpy(dst, data, nbytes); + Position += nbytes; + } + + return arrow::Status::OK(); +} + +arrow::Status NSerialization::TFixedStringOutputStream::Close() { + Out = nullptr; + return arrow::Status::OK(); +} + +} diff --git a/ydb/core/formats/arrow/serializer/stream.h b/ydb/core/formats/arrow/serializer/stream.h new file mode 100644 index 00000000000..424e183f19a --- /dev/null +++ b/ydb/core/formats/arrow/serializer/stream.h @@ -0,0 +1,37 @@ +#pragma once +#include <contrib/libs/apache/arrow/cpp/src/arrow/io/interfaces.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/status.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/result.h> +#include <util/generic/string.h> + +namespace NKikimr::NArrow::NSerialization { + +class TFixedStringOutputStream final: public arrow::io::OutputStream { +public: + TFixedStringOutputStream(TString* out) + : Out(out) + , Position(0) { + } + + arrow::Status Close() override; + + bool closed() const override { + return Out == nullptr; + } + + arrow::Result<int64_t> Tell() const override { + return Position; + } + + arrow::Status Write(const void* data, int64_t nbytes) override; + + size_t GetPosition() const { + return Position; + } + +private: + TString* Out; + size_t Position; +}; + +} diff --git a/ydb/core/formats/arrow/simple_builder/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/simple_builder/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..47b66ad10ae --- /dev/null +++ b/ydb/core/formats/arrow/simple_builder/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-simple_builder) +target_link_libraries(formats-arrow-simple_builder PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow +) +target_sources(formats-arrow-simple_builder PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/filler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/array.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/batch.cpp +) diff --git a/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..3623d31eb52 --- /dev/null +++ b/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-aarch64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-simple_builder) +target_link_libraries(formats-arrow-simple_builder PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow +) +target_sources(formats-arrow-simple_builder PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/filler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/array.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/batch.cpp +) diff --git a/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..3623d31eb52 --- /dev/null +++ b/ydb/core/formats/arrow/simple_builder/CMakeLists.linux-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-simple_builder) +target_link_libraries(formats-arrow-simple_builder PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow +) +target_sources(formats-arrow-simple_builder PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/filler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/array.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/batch.cpp +) diff --git a/ydb/core/formats/arrow/simple_builder/CMakeLists.txt b/ydb/core/formats/arrow/simple_builder/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/formats/arrow/simple_builder/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/formats/arrow/simple_builder/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/simple_builder/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..47b66ad10ae --- /dev/null +++ b/ydb/core/formats/arrow/simple_builder/CMakeLists.windows-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-simple_builder) +target_link_libraries(formats-arrow-simple_builder PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow +) +target_sources(formats-arrow-simple_builder PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/filler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/array.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/simple_builder/batch.cpp +) diff --git a/ydb/core/formats/arrow/simple_builder/array.cpp b/ydb/core/formats/arrow/simple_builder/array.cpp new file mode 100644 index 00000000000..4c4ffca6c1d --- /dev/null +++ b/ydb/core/formats/arrow/simple_builder/array.cpp @@ -0,0 +1,5 @@ +#include "array.h" + +namespace NKikimr::NArrow { + +} diff --git a/ydb/core/formats/arrow/simple_builder/array.h b/ydb/core/formats/arrow/simple_builder/array.h new file mode 100644 index 00000000000..7f823787590 --- /dev/null +++ b/ydb/core/formats/arrow/simple_builder/array.h @@ -0,0 +1,72 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_dict.h> +#include <util/generic/string.h> + +namespace NKikimr::NArrow { + +class IArrayBuilder { +private: + YDB_READONLY_DEF(TString, FieldName) +protected: + virtual std::shared_ptr<arrow::Array> DoBuildArray(const ui32 recordsCount) const = 0; +public: + using TPtr = std::shared_ptr<IArrayBuilder>; + virtual ~IArrayBuilder() = default; + std::shared_ptr<arrow::Array> BuildArray(const ui32 recordsCount) const { + return DoBuildArray(recordsCount); + } + + IArrayBuilder(const TString& fieldName) + : FieldName(fieldName) { + + } +}; + +template <class TFiller> +class TSimpleArrayConstructor: public IArrayBuilder { +private: + using TBase = IArrayBuilder; + using TBuilder = typename arrow::TypeTraits<typename TFiller::TValue>::BuilderType; + const TFiller Filler; +protected: + virtual std::shared_ptr<arrow::Array> DoBuildArray(const ui32 recordsCount) const override { + TBuilder fBuilder = TBuilder(); + Y_VERIFY(fBuilder.Reserve(recordsCount).ok()); + for (ui32 i = 0; i < recordsCount; ++i) { + Y_VERIFY(fBuilder.Append(Filler.GetValue(i)).ok()); + } + return *fBuilder.Finish(); + } +public: + TSimpleArrayConstructor(const TString& fieldName, const TFiller& filler = TFiller()) + : TBase(fieldName) + , Filler(filler) { + + } +}; + +template <class TFiller> +class TDictionaryArrayConstructor: public IArrayBuilder { +private: + using TBase = IArrayBuilder; + const TFiller Filler; +protected: + virtual std::shared_ptr<arrow::Array> DoBuildArray(const ui32 recordsCount) const override { + auto fBuilder = std::make_shared<arrow::DictionaryBuilder<typename TFiller::TValue>>(std::make_shared<typename TFiller::TValue>()); + Y_VERIFY(fBuilder->Reserve(recordsCount).ok()); + for (ui32 i = 0; i < recordsCount; ++i) { + Y_VERIFY(fBuilder->Append(Filler.GetValue(i)).ok()); + } + return *fBuilder->Finish(); + } +public: + TDictionaryArrayConstructor(const TString& fieldName, const TFiller& filler = TFiller()) + : TBase(fieldName) + , Filler(filler) { + + } +}; +} diff --git a/ydb/core/formats/arrow/simple_builder/batch.cpp b/ydb/core/formats/arrow/simple_builder/batch.cpp new file mode 100644 index 00000000000..4a60043a087 --- /dev/null +++ b/ydb/core/formats/arrow/simple_builder/batch.cpp @@ -0,0 +1,18 @@ +#include "batch.h" + +namespace NKikimr::NArrow { + +std::shared_ptr<arrow::RecordBatch> TRecordBatchConstructor::BuildBatch(const ui32 numRows) const { + std::vector<std::shared_ptr<arrow::Array>> columns; + std::vector<std::shared_ptr<arrow::Field>> fields; + for (auto&& i : Builders) { + columns.emplace_back(i->BuildArray(numRows)); + fields.emplace_back(std::make_shared<arrow::Field>(i->GetFieldName(), columns.back()->type())); + } + auto batch = arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), numRows, columns); + Y_VERIFY(batch); + Y_VERIFY_DEBUG(batch->ValidateFull().ok()); + return batch; +} + +} diff --git a/ydb/core/formats/arrow/simple_builder/batch.h b/ydb/core/formats/arrow/simple_builder/batch.h new file mode 100644 index 00000000000..4dfb3ea0e3f --- /dev/null +++ b/ydb/core/formats/arrow/simple_builder/batch.h @@ -0,0 +1,17 @@ +#pragma once +#include "array.h" +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> + +namespace NKikimr::NArrow { +class TRecordBatchConstructor { +private: + const std::vector<IArrayBuilder::TPtr> Builders; +public: + TRecordBatchConstructor(const std::vector<IArrayBuilder::TPtr> builders) + : Builders(builders) { + Y_VERIFY(Builders.size()); + } + + std::shared_ptr<arrow::RecordBatch> BuildBatch(const ui32 numRows) const; +}; +} diff --git a/ydb/core/formats/arrow/simple_builder/filler.cpp b/ydb/core/formats/arrow/simple_builder/filler.cpp new file mode 100644 index 00000000000..c4ce2456a57 --- /dev/null +++ b/ydb/core/formats/arrow/simple_builder/filler.cpp @@ -0,0 +1,17 @@ +#include "filler.h" +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NArrow { + +TStringPoolFiller::TStringPoolFiller(const ui32 poolSize, const ui32 strLen) { + for (ui32 i = 0; i < poolSize; ++i) { + Data.emplace_back(NUnitTest::RandomString(strLen, i)); + } +} + +arrow::util::string_view TStringPoolFiller::GetValue(const ui32 idx) const { + const TString& str = Data[(2 + 7 * idx) % Data.size()]; + return arrow::util::string_view(str.data(), str.size()); +} + +} diff --git a/ydb/core/formats/arrow/simple_builder/filler.h b/ydb/core/formats/arrow/simple_builder/filler.h new file mode 100644 index 00000000000..520192af2fa --- /dev/null +++ b/ydb/core/formats/arrow/simple_builder/filler.h @@ -0,0 +1,64 @@ +#pragma once +#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/util/string_view.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h> +#include <util/system/types.h> +#include <util/generic/string.h> + +namespace NKikimr::NArrow { + +template <class TArrowInt> +class TIntSeqFiller { +public: + using TValue = TArrowInt; + typename TArrowInt::c_type GetValue(const ui32 idx) const { + return idx; + } +}; + +class TStringPoolFiller { +private: + std::vector<TString> Data; +public: + using TValue = arrow::StringType; + arrow::util::string_view GetValue(const ui32 idx) const; + + TStringPoolFiller(const ui32 poolSize, const ui32 strLen); +}; + +template <class TValueExt> +class TLinearArrayAccessor { +private: + using TArray = typename arrow::TypeTraits<TValueExt>::ArrayType; + const TArray& Data; +public: + using TValue = TValueExt; + auto GetValue(const ui32 idx) const { + return Data.Value(idx); + } + + TLinearArrayAccessor(const arrow::Array& data) + : Data(static_cast<const TArray&>(data)) { + } +}; + +template <class TDictionaryValue, class TIndices> +class TDictionaryArrayAccessor { +private: + using TDictionary = typename arrow::TypeTraits<TDictionaryValue>::ArrayType; + const TDictionary& Dictionary; + const TIndices& Indices; +public: + using TValue = TDictionaryValue; + auto GetValue(const ui32 idx) const { + return Dictionary.Value(Indices.Value(idx)); + } + + TDictionaryArrayAccessor(const TDictionary& dictionary, const TIndices& indices) + : Dictionary(dictionary) + , Indices(indices) + { + } +}; + +} diff --git a/ydb/core/formats/arrow/switch/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/switch/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..fb4dc6e1cc9 --- /dev/null +++ b/ydb/core/formats/arrow/switch/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-switch) +target_link_libraries(formats-arrow-switch PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-scheme_types +) +target_sources(formats-arrow-switch PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/switch/switch_type.cpp +) diff --git a/ydb/core/formats/arrow/switch/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/switch/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..6e6f464e616 --- /dev/null +++ b/ydb/core/formats/arrow/switch/CMakeLists.linux-aarch64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-switch) +target_link_libraries(formats-arrow-switch PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-scheme_types +) +target_sources(formats-arrow-switch PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/switch/switch_type.cpp +) diff --git a/ydb/core/formats/arrow/switch/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/switch/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..6e6f464e616 --- /dev/null +++ b/ydb/core/formats/arrow/switch/CMakeLists.linux-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-switch) +target_link_libraries(formats-arrow-switch PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-scheme_types +) +target_sources(formats-arrow-switch PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/switch/switch_type.cpp +) diff --git a/ydb/core/formats/arrow/switch/CMakeLists.txt b/ydb/core/formats/arrow/switch/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/formats/arrow/switch/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/formats/arrow/switch/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/switch/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..fb4dc6e1cc9 --- /dev/null +++ b/ydb/core/formats/arrow/switch/CMakeLists.windows-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-switch) +target_link_libraries(formats-arrow-switch PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-scheme_types +) +target_sources(formats-arrow-switch PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/switch/switch_type.cpp +) diff --git a/ydb/core/formats/arrow/switch/switch_type.cpp b/ydb/core/formats/arrow/switch/switch_type.cpp new file mode 100644 index 00000000000..b8396151cb4 --- /dev/null +++ b/ydb/core/formats/arrow/switch/switch_type.cpp @@ -0,0 +1,5 @@ +#include "switch_type.h" + +namespace NKikimr::NArrow { + +} diff --git a/ydb/core/formats/arrow/switch/switch_type.h b/ydb/core/formats/arrow/switch/switch_type.h new file mode 100644 index 00000000000..2c7986343e9 --- /dev/null +++ b/ydb/core/formats/arrow/switch/switch_type.h @@ -0,0 +1,239 @@ +#pragma once +#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> +#include <util/system/yassert.h> +#include <ydb/core/scheme_types/scheme_type_info.h> +#include <ydb/core/scheme/scheme_type_id.h> + +namespace NKikimr::NArrow { + +template <typename TType> +struct TTypeWrapper +{ + using T = TType; +}; + +template <class TResult, TResult defaultValue, typename TFunc, bool EnableNull = false> +TResult SwitchTypeImpl(arrow::Type::type typeId, TFunc&& f) { + switch (typeId) { + case arrow::Type::NA: { + if constexpr (EnableNull) { + return f(TTypeWrapper<arrow::NullType>()); + } + break; + } + case arrow::Type::BOOL: + return f(TTypeWrapper<arrow::BooleanType>()); + case arrow::Type::UINT8: + return f(TTypeWrapper<arrow::UInt8Type>()); + case arrow::Type::INT8: + return f(TTypeWrapper<arrow::Int8Type>()); + case arrow::Type::UINT16: + return f(TTypeWrapper<arrow::UInt16Type>()); + case arrow::Type::INT16: + return f(TTypeWrapper<arrow::Int16Type>()); + case arrow::Type::UINT32: + return f(TTypeWrapper<arrow::UInt32Type>()); + case arrow::Type::INT32: + return f(TTypeWrapper<arrow::Int32Type>()); + case arrow::Type::UINT64: + return f(TTypeWrapper<arrow::UInt64Type>()); + case arrow::Type::INT64: + return f(TTypeWrapper<arrow::Int64Type>()); + case arrow::Type::HALF_FLOAT: + return f(TTypeWrapper<arrow::HalfFloatType>()); + case arrow::Type::FLOAT: + return f(TTypeWrapper<arrow::FloatType>()); + case arrow::Type::DOUBLE: + return f(TTypeWrapper<arrow::DoubleType>()); + case arrow::Type::STRING: + return f(TTypeWrapper<arrow::StringType>()); + case arrow::Type::BINARY: + return f(TTypeWrapper<arrow::BinaryType>()); + case arrow::Type::FIXED_SIZE_BINARY: + return f(TTypeWrapper<arrow::FixedSizeBinaryType>()); + case arrow::Type::DATE32: + return f(TTypeWrapper<arrow::Date32Type>()); + case arrow::Type::DATE64: + return f(TTypeWrapper<arrow::Date64Type>()); + case arrow::Type::TIMESTAMP: + return f(TTypeWrapper<arrow::TimestampType>()); + case arrow::Type::TIME32: + return f(TTypeWrapper<arrow::Time32Type>()); + case arrow::Type::TIME64: + return f(TTypeWrapper<arrow::Time64Type>()); + case arrow::Type::INTERVAL_MONTHS: + return f(TTypeWrapper<arrow::MonthIntervalType>()); + case arrow::Type::DECIMAL: + return f(TTypeWrapper<arrow::Decimal128Type>()); + case arrow::Type::DURATION: + return f(TTypeWrapper<arrow::DurationType>()); + case arrow::Type::LARGE_STRING: + return f(TTypeWrapper<arrow::LargeStringType>()); + case arrow::Type::LARGE_BINARY: + return f(TTypeWrapper<arrow::LargeBinaryType>()); + case arrow::Type::DECIMAL256: + case arrow::Type::DENSE_UNION: + case arrow::Type::DICTIONARY: + case arrow::Type::EXTENSION: + case arrow::Type::FIXED_SIZE_LIST: + case arrow::Type::INTERVAL_DAY_TIME: + case arrow::Type::LARGE_LIST: + case arrow::Type::LIST: + case arrow::Type::MAP: + case arrow::Type::MAX_ID: + case arrow::Type::SPARSE_UNION: + case arrow::Type::STRUCT: + break; + } + + return defaultValue; +} + +template <typename TFunc, bool EnableNull = false> +bool SwitchType(arrow::Type::type typeId, TFunc&& f) { + return SwitchTypeImpl<bool, false, TFunc, EnableNull>(typeId, std::move(f)); +} + +template <typename TFunc> +bool SwitchTypeWithNull(arrow::Type::type typeId, TFunc&& f) { + return SwitchType<TFunc, true>(typeId, std::move(f)); +} + +template <typename TFunc> +bool SwitchArrayType(const arrow::Datum& column, TFunc&& f) { + auto type = column.type(); + Y_VERIFY(type); + return SwitchType(type->id(), std::forward<TFunc>(f)); +} + +/** + * @brief Function to switch yql type correctly and uniformly converting it to arrow type using callback + * + * @tparam TFunc Callback type + * @param typeId Type of data callback work with. + * @param callback Template function of signature (TTypeWrapper) -> bool + * @return Result of execution of callback or false if the type typeId is not supported. + */ +template <typename TFunc> +bool SwitchYqlTypeToArrowType(const NScheme::TTypeInfo& typeInfo, TFunc&& callback) { + switch (typeInfo.GetTypeId()) { + case NScheme::NTypeIds::Bool: + return callback(TTypeWrapper<arrow::BooleanType>()); + case NScheme::NTypeIds::Int8: + return callback(TTypeWrapper<arrow::Int8Type>()); + case NScheme::NTypeIds::Uint8: + return callback(TTypeWrapper<arrow::UInt8Type>()); + case NScheme::NTypeIds::Int16: + return callback(TTypeWrapper<arrow::Int16Type>()); + case NScheme::NTypeIds::Date: + case NScheme::NTypeIds::Uint16: + return callback(TTypeWrapper<arrow::UInt16Type>()); + case NScheme::NTypeIds::Int32: + return callback(TTypeWrapper<arrow::Int32Type>()); + case NScheme::NTypeIds::Datetime: + case NScheme::NTypeIds::Uint32: + return callback(TTypeWrapper<arrow::UInt32Type>()); + case NScheme::NTypeIds::Int64: + return callback(TTypeWrapper<arrow::Int64Type>()); + case NScheme::NTypeIds::Uint64: + return callback(TTypeWrapper<arrow::UInt64Type>()); + case NScheme::NTypeIds::Float: + return callback(TTypeWrapper<arrow::FloatType>()); + case NScheme::NTypeIds::Double: + return callback(TTypeWrapper<arrow::DoubleType>()); + case NScheme::NTypeIds::Utf8: + return callback(TTypeWrapper<arrow::StringType>()); + case NScheme::NTypeIds::String: + case NScheme::NTypeIds::String4k: + case NScheme::NTypeIds::String2m: + case NScheme::NTypeIds::Yson: + case NScheme::NTypeIds::Json: + case NScheme::NTypeIds::DyNumber: + case NScheme::NTypeIds::JsonDocument: + return callback(TTypeWrapper<arrow::BinaryType>()); + case NScheme::NTypeIds::Timestamp: + return callback(TTypeWrapper<arrow::TimestampType>()); + case NScheme::NTypeIds::Interval: + return callback(TTypeWrapper<arrow::DurationType>()); + case NScheme::NTypeIds::Decimal: + return callback(TTypeWrapper<arrow::Decimal128Type>()); + + case NScheme::NTypeIds::PairUi64Ui64: + case NScheme::NTypeIds::ActorId: + case NScheme::NTypeIds::StepOrderId: + break; // Deprecated types + + case NScheme::NTypeIds::Pg: + break; // TODO: support pg types + } + return false; +} + +inline bool IsPrimitiveYqlType(const NScheme::TTypeInfo& typeInfo) { + switch (typeInfo.GetTypeId()) { + case NScheme::NTypeIds::Int8: + case NScheme::NTypeIds::Uint8: + case NScheme::NTypeIds::Int16: + case NScheme::NTypeIds::Date: + case NScheme::NTypeIds::Uint16: + case NScheme::NTypeIds::Int32: + case NScheme::NTypeIds::Datetime: + case NScheme::NTypeIds::Uint32: + case NScheme::NTypeIds::Int64: + case NScheme::NTypeIds::Uint64: + case NScheme::NTypeIds::Float: + case NScheme::NTypeIds::Double: + case NScheme::NTypeIds::Timestamp: + case NScheme::NTypeIds::Interval: + return true; + default: + break; + } + return false; +} + +template <typename T> +bool Append(arrow::ArrayBuilder& builder, const typename T::c_type& value) { + using TBuilder = typename arrow::TypeTraits<T>::BuilderType; + + auto status = static_cast<TBuilder&>(builder).Append(value); + return status.ok(); +} + +template <typename T> +bool Append(arrow::ArrayBuilder& builder, arrow::util::string_view value) { + using TBuilder = typename arrow::TypeTraits<T>::BuilderType; + + auto status = static_cast<TBuilder&>(builder).Append(value); + return status.ok(); +} + +template <typename T> +bool Append(arrow::ArrayBuilder& builder, const typename T::c_type* values, size_t size) { + using TBuilder = typename arrow::NumericBuilder<T>; + + auto status = static_cast<TBuilder&>(builder).AppendValues(values, size); + return status.ok(); +} + +template <typename T> +bool Append(T& builder, const arrow::Array& array, int position) { + return SwitchType(array.type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; + using TBuilder = typename arrow::TypeTraits<typename TWrap::T>::BuilderType; + + auto& typedArray = static_cast<const TArray&>(array); + auto& typedBuilder = static_cast<TBuilder&>(builder); + + if (typedArray.IsNull(position)) { + auto status = typedBuilder.AppendNull(); + return status.ok(); + } else { + auto status = typedBuilder.Append(typedArray.GetView(position)); + return status.ok(); + } + }); +} + +} diff --git a/ydb/core/formats/arrow/switch_type.h b/ydb/core/formats/arrow/switch_type.h index 558589b71ad..1acf90b1bb2 100644 --- a/ydb/core/formats/arrow/switch_type.h +++ b/ydb/core/formats/arrow/switch_type.h @@ -1,237 +1,2 @@ #pragma once -#include <ydb/core/scheme/scheme_tablecell.h> -#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> - -namespace NKikimr::NArrow { - -template <typename TType> -struct TTypeWrapper -{ - using T = TType; -}; - -template <class TResult, TResult defaultValue, typename TFunc, bool EnableNull = false> -TResult SwitchTypeImpl(arrow::Type::type typeId, TFunc&& f) { - switch (typeId) { - case arrow::Type::NA: { - if constexpr (EnableNull) { - return f(TTypeWrapper<arrow::NullType>()); - } - break; - } - case arrow::Type::BOOL: - return f(TTypeWrapper<arrow::BooleanType>()); - case arrow::Type::UINT8: - return f(TTypeWrapper<arrow::UInt8Type>()); - case arrow::Type::INT8: - return f(TTypeWrapper<arrow::Int8Type>()); - case arrow::Type::UINT16: - return f(TTypeWrapper<arrow::UInt16Type>()); - case arrow::Type::INT16: - return f(TTypeWrapper<arrow::Int16Type>()); - case arrow::Type::UINT32: - return f(TTypeWrapper<arrow::UInt32Type>()); - case arrow::Type::INT32: - return f(TTypeWrapper<arrow::Int32Type>()); - case arrow::Type::UINT64: - return f(TTypeWrapper<arrow::UInt64Type>()); - case arrow::Type::INT64: - return f(TTypeWrapper<arrow::Int64Type>()); - case arrow::Type::HALF_FLOAT: - return f(TTypeWrapper<arrow::HalfFloatType>()); - case arrow::Type::FLOAT: - return f(TTypeWrapper<arrow::FloatType>()); - case arrow::Type::DOUBLE: - return f(TTypeWrapper<arrow::DoubleType>()); - case arrow::Type::STRING: - return f(TTypeWrapper<arrow::StringType>()); - case arrow::Type::BINARY: - return f(TTypeWrapper<arrow::BinaryType>()); - case arrow::Type::FIXED_SIZE_BINARY: - return f(TTypeWrapper<arrow::FixedSizeBinaryType>()); - case arrow::Type::DATE32: - return f(TTypeWrapper<arrow::Date32Type>()); - case arrow::Type::DATE64: - return f(TTypeWrapper<arrow::Date64Type>()); - case arrow::Type::TIMESTAMP: - return f(TTypeWrapper<arrow::TimestampType>()); - case arrow::Type::TIME32: - return f(TTypeWrapper<arrow::Time32Type>()); - case arrow::Type::TIME64: - return f(TTypeWrapper<arrow::Time64Type>()); - case arrow::Type::INTERVAL_MONTHS: - return f(TTypeWrapper<arrow::MonthIntervalType>()); - case arrow::Type::DECIMAL: - return f(TTypeWrapper<arrow::Decimal128Type>()); - case arrow::Type::DURATION: - return f(TTypeWrapper<arrow::DurationType>()); - case arrow::Type::LARGE_STRING: - return f(TTypeWrapper<arrow::LargeStringType>()); - case arrow::Type::LARGE_BINARY: - return f(TTypeWrapper<arrow::LargeBinaryType>()); - case arrow::Type::DECIMAL256: - case arrow::Type::DENSE_UNION: - case arrow::Type::DICTIONARY: - case arrow::Type::EXTENSION: - case arrow::Type::FIXED_SIZE_LIST: - case arrow::Type::INTERVAL_DAY_TIME: - case arrow::Type::LARGE_LIST: - case arrow::Type::LIST: - case arrow::Type::MAP: - case arrow::Type::MAX_ID: - case arrow::Type::SPARSE_UNION: - case arrow::Type::STRUCT: - break; - } - - return defaultValue; -} - -template <typename TFunc, bool EnableNull = false> -bool SwitchType(arrow::Type::type typeId, TFunc&& f) { - return SwitchTypeImpl<bool, false, TFunc, EnableNull>(typeId, std::move(f)); -} - -template <typename TFunc> -bool SwitchTypeWithNull(arrow::Type::type typeId, TFunc&& f) { - return SwitchType<TFunc, true>(typeId, std::move(f)); -} - -template <typename TFunc> -bool SwitchArrayType(const arrow::Datum& column, TFunc&& f) { - auto type = column.type(); - Y_VERIFY(type); - return SwitchType(type->id(), std::forward<TFunc>(f)); -} - -/** - * @brief Function to switch yql type correctly and uniformly converting it to arrow type using callback - * - * @tparam TFunc Callback type - * @param typeId Type of data callback work with. - * @param callback Template function of signature (TTypeWrapper) -> bool - * @return Result of execution of callback or false if the type typeId is not supported. - */ -template <typename TFunc> -bool SwitchYqlTypeToArrowType(const NScheme::TTypeInfo& typeInfo, TFunc&& callback) { - switch (typeInfo.GetTypeId()) { - case NScheme::NTypeIds::Bool: - return callback(TTypeWrapper<arrow::BooleanType>()); - case NScheme::NTypeIds::Int8: - return callback(TTypeWrapper<arrow::Int8Type>()); - case NScheme::NTypeIds::Uint8: - return callback(TTypeWrapper<arrow::UInt8Type>()); - case NScheme::NTypeIds::Int16: - return callback(TTypeWrapper<arrow::Int16Type>()); - case NScheme::NTypeIds::Date: - case NScheme::NTypeIds::Uint16: - return callback(TTypeWrapper<arrow::UInt16Type>()); - case NScheme::NTypeIds::Int32: - return callback(TTypeWrapper<arrow::Int32Type>()); - case NScheme::NTypeIds::Datetime: - case NScheme::NTypeIds::Uint32: - return callback(TTypeWrapper<arrow::UInt32Type>()); - case NScheme::NTypeIds::Int64: - return callback(TTypeWrapper<arrow::Int64Type>()); - case NScheme::NTypeIds::Uint64: - return callback(TTypeWrapper<arrow::UInt64Type>()); - case NScheme::NTypeIds::Float: - return callback(TTypeWrapper<arrow::FloatType>()); - case NScheme::NTypeIds::Double: - return callback(TTypeWrapper<arrow::DoubleType>()); - case NScheme::NTypeIds::Utf8: - return callback(TTypeWrapper<arrow::StringType>()); - case NScheme::NTypeIds::String: - case NScheme::NTypeIds::String4k: - case NScheme::NTypeIds::String2m: - case NScheme::NTypeIds::Yson: - case NScheme::NTypeIds::Json: - case NScheme::NTypeIds::DyNumber: - case NScheme::NTypeIds::JsonDocument: - return callback(TTypeWrapper<arrow::BinaryType>()); - case NScheme::NTypeIds::Timestamp: - return callback(TTypeWrapper<arrow::TimestampType>()); - case NScheme::NTypeIds::Interval: - return callback(TTypeWrapper<arrow::DurationType>()); - case NScheme::NTypeIds::Decimal: - return callback(TTypeWrapper<arrow::Decimal128Type>()); - - case NScheme::NTypeIds::PairUi64Ui64: - case NScheme::NTypeIds::ActorId: - case NScheme::NTypeIds::StepOrderId: - break; // Deprecated types - - case NScheme::NTypeIds::Pg: - break; // TODO: support pg types - } - return false; -} - -inline bool IsPrimitiveYqlType(const NScheme::TTypeInfo& typeInfo) { - switch (typeInfo.GetTypeId()) { - case NScheme::NTypeIds::Int8: - case NScheme::NTypeIds::Uint8: - case NScheme::NTypeIds::Int16: - case NScheme::NTypeIds::Date: - case NScheme::NTypeIds::Uint16: - case NScheme::NTypeIds::Int32: - case NScheme::NTypeIds::Datetime: - case NScheme::NTypeIds::Uint32: - case NScheme::NTypeIds::Int64: - case NScheme::NTypeIds::Uint64: - case NScheme::NTypeIds::Float: - case NScheme::NTypeIds::Double: - case NScheme::NTypeIds::Timestamp: - case NScheme::NTypeIds::Interval: - return true; - default: - break; - } - return false; -} - -template <typename T> -bool Append(arrow::ArrayBuilder& builder, const typename T::c_type& value) { - using TBuilder = typename arrow::TypeTraits<T>::BuilderType; - - auto status = static_cast<TBuilder&>(builder).Append(value); - return status.ok(); -} - -template <typename T> -bool Append(arrow::ArrayBuilder& builder, arrow::util::string_view value) { - using TBuilder = typename arrow::TypeTraits<T>::BuilderType; - - auto status = static_cast<TBuilder&>(builder).Append(value); - return status.ok(); -} - -template <typename T> -bool Append(arrow::ArrayBuilder& builder, const typename T::c_type* values, size_t size) { - using TBuilder = typename arrow::NumericBuilder<T>; - - auto status = static_cast<TBuilder&>(builder).AppendValues(values, size); - return status.ok(); -} - -template <typename T> -bool Append(T& builder, const arrow::Array& array, int position) { - return SwitchType(array.type_id(), [&](const auto& type) { - using TWrap = std::decay_t<decltype(type)>; - using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; - using TBuilder = typename arrow::TypeTraits<typename TWrap::T>::BuilderType; - - auto& typedArray = static_cast<const TArray&>(array); - auto& typedBuilder = static_cast<TBuilder&>(builder); - - if (typedArray.IsNull(position)) { - auto status = typedBuilder.AppendNull(); - return status.ok(); - } else { - auto status = typedBuilder.Append(typedArray.GetView(position)); - return status.ok(); - } - }); -} - -} +#include "switch/switch_type.h" diff --git a/ydb/core/formats/arrow/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/ut/CMakeLists.darwin-x86_64.txt index e63852f457f..f77bf86c419 100644 --- a/ydb/core/formats/arrow/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/formats/arrow/ut/CMakeLists.darwin-x86_64.txt @@ -38,6 +38,7 @@ target_link_options(ydb-core-formats-arrow-ut PRIVATE target_sources(ydb-core-formats-arrow-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_arrow.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_program_step.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut/ut_dictionary.cpp ) set_property( TARGET diff --git a/ydb/core/formats/arrow/ut/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/ut/CMakeLists.linux-aarch64.txt index b396886cf0c..ad981e1dca2 100644 --- a/ydb/core/formats/arrow/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/formats/arrow/ut/CMakeLists.linux-aarch64.txt @@ -41,6 +41,7 @@ target_link_options(ydb-core-formats-arrow-ut PRIVATE target_sources(ydb-core-formats-arrow-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_arrow.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_program_step.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut/ut_dictionary.cpp ) set_property( TARGET diff --git a/ydb/core/formats/arrow/ut/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/ut/CMakeLists.linux-x86_64.txt index 722652c673a..de6ae69716f 100644 --- a/ydb/core/formats/arrow/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/formats/arrow/ut/CMakeLists.linux-x86_64.txt @@ -42,6 +42,7 @@ target_link_options(ydb-core-formats-arrow-ut PRIVATE target_sources(ydb-core-formats-arrow-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_arrow.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_program_step.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut/ut_dictionary.cpp ) set_property( TARGET diff --git a/ydb/core/formats/arrow/ut/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/ut/CMakeLists.windows-x86_64.txt index 937feb4f7bd..bcbbd96108b 100644 --- a/ydb/core/formats/arrow/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/formats/arrow/ut/CMakeLists.windows-x86_64.txt @@ -31,6 +31,7 @@ target_link_libraries(ydb-core-formats-arrow-ut PUBLIC target_sources(ydb-core-formats-arrow-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_arrow.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut_program_step.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ut/ut_dictionary.cpp ) set_property( TARGET diff --git a/ydb/core/formats/arrow/ut/ut_dictionary.cpp b/ydb/core/formats/arrow/ut/ut_dictionary.cpp new file mode 100644 index 00000000000..f90efa8c709 --- /dev/null +++ b/ydb/core/formats/arrow/ut/ut_dictionary.cpp @@ -0,0 +1,89 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/serializer/batch_only.h> +#include <ydb/core/formats/arrow/serializer/full.h> +#include <ydb/core/formats/arrow/simple_builder/array.h> +#include <ydb/core/formats/arrow/simple_builder/batch.h> +#include <ydb/core/formats/arrow/simple_builder/filler.h> +#include <ydb/core/formats/arrow/dictionary/conversion.h> + +Y_UNIT_TEST_SUITE(Dictionary) { + + using namespace NKikimr::NArrow; + + ui64 Test(IArrayBuilder::TPtr column, const arrow::ipc::IpcWriteOptions& options, const ui32 bSize) { + std::shared_ptr<arrow::RecordBatch> batch = TRecordBatchConstructor({ column }).BuildBatch(bSize); + const TString data = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(batch); + auto deserializedBatch = *NKikimr::NArrow::NSerialization::TFullDataDeserializer().Deserialize(data); + Y_VERIFY(!!deserializedBatch); + auto originalBatchTransformed = DictionaryToArray(batch); + auto roundBatchTransformed = DictionaryToArray(deserializedBatch); + const TString roundUnpacked = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(roundBatchTransformed); + const TString roundTransformed = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(originalBatchTransformed); + Y_VERIFY(roundBatchTransformed->num_rows() == originalBatchTransformed->num_rows()); + Y_VERIFY(roundUnpacked == roundTransformed); + return data.size(); + } + + Y_UNIT_TEST(Simple) { + const std::vector<arrow::Compression::type> codecs = { arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME, }; + for (auto&& codec : codecs) { + arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults(); + options.codec = *arrow::util::Codec::Create(codec); + Cerr << (options.codec ? options.codec->name() : "NO_CODEC") << Endl; + for (auto bSize : { 100000 }) { + Cerr << "--" << bSize << Endl; + for (auto pSize : { 1, 16, 64, 128, 512, 1024 }) { + Cerr << "----" << pSize << Endl; + for (auto&& strLen : { 1, 10, 16, 32, 64 }) { + Cerr << "------" << strLen << Endl; + ui64 bytesDict; + ui64 bytesRaw; + { + IArrayBuilder::TPtr column = std::make_shared<TDictionaryArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen)); + bytesDict = Test(column, options, bSize); + } + { + IArrayBuilder::TPtr column = std::make_shared<TSimpleArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen)); + bytesRaw = Test(column, options, bSize); + } + Cerr << "--------" << bytesDict << " / " << bytesRaw << " = " << 1.0 * bytesDict / bytesRaw << Endl; + } + } + } + } + } + + Y_UNIT_TEST(ComparePayloadAndFull) { + const std::vector<arrow::Compression::type> codecs = { arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME, }; + for (auto&& codec : codecs) { + arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults(); + options.codec = *arrow::util::Codec::Create(codec); + Cerr << (options.codec ? options.codec->name() : "NO_CODEC") << Endl; + for (auto bSize : { 1000, 10000, 100000 }) { + Cerr << "--" << bSize << Endl; + for (auto pSize : { 1, 16, 64, 128, 512, 1024 }) { + Cerr << "----" << pSize << Endl; + for (auto&& strLen : { 1, 10, 16, 32, 64 }) { + Cerr << "------" << strLen << Endl; + ui64 bytesFull; + ui64 bytesPayload; + { + IArrayBuilder::TPtr column = std::make_shared<TSimpleArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen)); + std::shared_ptr<arrow::RecordBatch> batch = TRecordBatchConstructor({ column }).BuildBatch(bSize); + const TString dataFull = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(batch); + const TString dataPayload = NKikimr::NArrow::NSerialization::TBatchPayloadSerializer(options).Serialize(batch); + bytesFull = dataFull.size(); + bytesPayload = dataPayload.size(); + } + const double fraq = 1 - 1.0 * bytesPayload / bytesFull; + Cerr << "--------" << bytesPayload << " / " << bytesFull << " = " << 100 * fraq << "%" << Endl; + UNIT_ASSERT(fraq * 100 < 3); + } + } + } + } + } + + +}; diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp index a7baa934b71..a00eef8cde9 100644 --- a/ydb/core/io_formats/csv_arrow.cpp +++ b/ydb/core/io_formats/csv_arrow.cpp @@ -162,7 +162,7 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr return {}; } - auto buffer = std::make_shared<NArrow::TBufferOverString>(csv); + auto buffer = std::make_shared<arrow::Buffer>((const ui8*)csv.data(), csv.size()); auto input = std::make_shared<arrow::io::BufferReader>(buffer); auto res = arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input, ReadOptions, ParseOptions, ConvertOptions); diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 4282fe13218..151969d55c4 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -367,6 +367,7 @@ enum EServiceKikimr { EXT_INDEX = 1900; TX_CONVEYOR = 2000; + ARROW_HELPER = 2100; }; message TActivity { |