aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com>2024-02-01 18:09:46 +0300
committerGitHub <noreply@github.com>2024-02-01 18:09:46 +0300
commit320f5201bac519d51b66e15b6e83c0ae38e69b44 (patch)
treec018169fcaa038df54d5b412df91ffc809b58e94
parentb5853aa1bb9fc92c70a468517f7199aa2ba58d17 (diff)
downloadydb-320f5201bac519d51b66e15b6e83c0ae38e69b44.tar.gz
serializer as interface for make new locally (#1496)
* serializer as interface for make new locally * correction * fix tests build * fix build * fix dependencies * corrections * fix build * fix build * fix build * fix splitter test
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp6
-rw-r--r--ydb/core/formats/arrow/compression/CMakeLists.darwin-arm64.txt23
-rw-r--r--ydb/core/formats/arrow/compression/CMakeLists.darwin-x86_64.txt23
-rw-r--r--ydb/core/formats/arrow/compression/CMakeLists.linux-aarch64.txt24
-rw-r--r--ydb/core/formats/arrow/compression/CMakeLists.linux-x86_64.txt24
-rw-r--r--ydb/core/formats/arrow/compression/CMakeLists.txt19
-rw-r--r--ydb/core/formats/arrow/compression/CMakeLists.windows-x86_64.txt23
-rw-r--r--ydb/core/formats/arrow/compression/diff.cpp77
-rw-r--r--ydb/core/formats/arrow/compression/diff.h34
-rw-r--r--ydb/core/formats/arrow/compression/object.cpp70
-rw-r--r--ydb/core/formats/arrow/compression/object.h39
-rw-r--r--ydb/core/formats/arrow/compression/ya.make16
-rw-r--r--ydb/core/formats/arrow/serializer/abstract.cpp25
-rw-r--r--ydb/core/formats/arrow/serializer/abstract.h90
-rw-r--r--ydb/core/formats/arrow/serializer/batch_only.cpp58
-rw-r--r--ydb/core/formats/arrow/serializer/batch_only.h38
-rw-r--r--ydb/core/formats/arrow/serializer/full.cpp54
-rw-r--r--ydb/core/formats/arrow/serializer/full.h38
-rw-r--r--ydb/core/formats/arrow/serializer/native.cpp185
-rw-r--r--ydb/core/formats/arrow/serializer/native.h72
-rw-r--r--ydb/core/formats/arrow/serializer/parsing.cpp (renamed from ydb/core/formats/arrow/compression/parsing.cpp)0
-rw-r--r--ydb/core/formats/arrow/serializer/parsing.h (renamed from ydb/core/formats/arrow/compression/parsing.h)0
-rw-r--r--ydb/core/formats/arrow/serializer/ya.make5
-rw-r--r--ydb/core/formats/arrow/special_keys.cpp6
-rw-r--r--ydb/core/formats/arrow/ut/ut_dictionary.cpp15
-rw-r--r--ydb/core/formats/arrow/ut/ut_size_calcer.cpp2
-rw-r--r--ydb/core/grpc_services/rpc_log_store.cpp10
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp1
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_events.h1
-rw-r--r--ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp10
-rw-r--r--ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h4
-rw-r--r--ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_opt_build.cpp2
-rw-r--r--ydb/core/kqp/ut/common/columnshard.cpp1
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp9
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp1
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp1
-rw-r--r--ydb/core/protos/flat_scheme_op.proto19
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h4
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/column_features.cpp43
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/column_features.h45
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.cpp41
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.h2
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/tier_info.h17
-rw-r--r--ydb/core/tx/columnshard/engines/ya.make1
-rw-r--r--ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp7
-rw-r--r--ydb/core/tx/columnshard/splitter/ut/ya.make1
-rw-r--r--ydb/core/tx/schemeshard/olap/columns/update.cpp35
-rw-r--r--ydb/core/tx/schemeshard/olap/columns/update.h15
-rw-r--r--ydb/core/tx/schemeshard/olap/columns/ya.make3
-rw-r--r--ydb/core/tx/schemeshard/olap/common/ya.make1
-rw-r--r--ydb/core/tx/schemeshard/ya.make1
-rw-r--r--ydb/core/tx/tiering/manager.cpp16
-rw-r--r--ydb/core/tx/tiering/manager.h3
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h6
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/utils.cpp7
-rw-r--r--ydb/services/bg_tasks/abstract/interface.h10
-rw-r--r--ydb/services/metadata/abstract/request_features.h1
-rw-r--r--ydb/services/metadata/abstract/ya.make1
65 files changed, 551 insertions, 765 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index eac6a6670e0..1a2b9016731 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -4,7 +4,7 @@
#include "common/validation.h"
#include "merging_sorted_input_stream.h"
#include "permutations.h"
-#include "serializer/batch_only.h"
+#include "serializer/native.h"
#include "serializer/abstract.h"
#include "serializer/stream.h"
#include "simple_arrays_cache.h"
@@ -106,7 +106,7 @@ std::shared_ptr<arrow::Schema> DeserializeSchema(const TString& str) {
}
TString SerializeBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const arrow::ipc::IpcWriteOptions& options) {
- return NSerialization::TBatchPayloadSerializer(options).Serialize(batch);
+ return NSerialization::TNativeSerializer(options).SerializePayload(batch);
}
TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& batch) {
@@ -117,7 +117,7 @@ TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& b
std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob, const std::shared_ptr<arrow::Schema>& schema)
{
- auto result = NSerialization::TBatchPayloadDeserializer(schema).Deserialize(blob);
+ auto result = NSerialization::TNativeSerializer().Deserialize(blob, schema);
if (result.ok()) {
return *result;
} else {
diff --git a/ydb/core/formats/arrow/compression/CMakeLists.darwin-arm64.txt b/ydb/core/formats/arrow/compression/CMakeLists.darwin-arm64.txt
deleted file mode 100644
index 0f1b4b72adc..00000000000
--- a/ydb/core/formats/arrow/compression/CMakeLists.darwin-arm64.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(formats-arrow-compression)
-target_link_libraries(formats-arrow-compression PUBLIC
- contrib-libs-cxxsupp
- yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
- ydb-library-conclusion
-)
-target_sources(formats-arrow-compression PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp
-)
diff --git a/ydb/core/formats/arrow/compression/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/compression/CMakeLists.darwin-x86_64.txt
deleted file mode 100644
index 0f1b4b72adc..00000000000
--- a/ydb/core/formats/arrow/compression/CMakeLists.darwin-x86_64.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(formats-arrow-compression)
-target_link_libraries(formats-arrow-compression PUBLIC
- contrib-libs-cxxsupp
- yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
- ydb-library-conclusion
-)
-target_sources(formats-arrow-compression PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp
-)
diff --git a/ydb/core/formats/arrow/compression/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/compression/CMakeLists.linux-aarch64.txt
deleted file mode 100644
index 80620e94bb4..00000000000
--- a/ydb/core/formats/arrow/compression/CMakeLists.linux-aarch64.txt
+++ /dev/null
@@ -1,24 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(formats-arrow-compression)
-target_link_libraries(formats-arrow-compression PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
- ydb-library-conclusion
-)
-target_sources(formats-arrow-compression PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp
-)
diff --git a/ydb/core/formats/arrow/compression/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/compression/CMakeLists.linux-x86_64.txt
deleted file mode 100644
index 80620e94bb4..00000000000
--- a/ydb/core/formats/arrow/compression/CMakeLists.linux-x86_64.txt
+++ /dev/null
@@ -1,24 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(formats-arrow-compression)
-target_link_libraries(formats-arrow-compression PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
- ydb-library-conclusion
-)
-target_sources(formats-arrow-compression PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp
-)
diff --git a/ydb/core/formats/arrow/compression/CMakeLists.txt b/ydb/core/formats/arrow/compression/CMakeLists.txt
deleted file mode 100644
index d863ebd1806..00000000000
--- a/ydb/core/formats/arrow/compression/CMakeLists.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
- include(CMakeLists.darwin-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64")
- include(CMakeLists.darwin-arm64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
- include(CMakeLists.windows-x86_64.txt)
-endif()
diff --git a/ydb/core/formats/arrow/compression/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/compression/CMakeLists.windows-x86_64.txt
deleted file mode 100644
index 0f1b4b72adc..00000000000
--- a/ydb/core/formats/arrow/compression/CMakeLists.windows-x86_64.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(formats-arrow-compression)
-target_link_libraries(formats-arrow-compression PUBLIC
- contrib-libs-cxxsupp
- yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
- ydb-library-conclusion
-)
-target_sources(formats-arrow-compression PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp
-)
diff --git a/ydb/core/formats/arrow/compression/diff.cpp b/ydb/core/formats/arrow/compression/diff.cpp
deleted file mode 100644
index 0659cba5e2a..00000000000
--- a/ydb/core/formats/arrow/compression/diff.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-#include "diff.h"
-#include "object.h"
-#include "parsing.h"
-#include <util/string/cast.h>
-
-namespace NKikimr::NArrow {
-
-NKikimrSchemeOp::TCompressionOptions TCompressionDiff::SerializeToProto() const {
- NKikimrSchemeOp::TCompressionOptions result;
- if (Level) {
- result.SetCompressionLevel(*Level);
- }
- if (Codec) {
- result.SetCompressionCodec(CompressionToProto(*Codec));
- }
- return result;
-}
-
-TConclusionStatus TCompressionDiff::DeserializeFromRequestFeatures(NYql::TFeaturesExtractor& features) {
- {
- auto fValue = features.Extract("COMPRESSION.TYPE");
- if (fValue) {
- Codec = NArrow::CompressionFromString(*fValue);
- if (!Codec) {
- return TConclusionStatus::Fail("cannot parse COMPRESSION.TYPE as arrow::Compression");
- }
- }
- }
- {
- auto fValue = features.Extract("COMPRESSION.LEVEL");
- if (fValue) {
- ui32 level;
- if (!TryFromString<ui32>(*fValue, level)) {
- return TConclusionStatus::Fail("cannot parse COMPRESSION.LEVEL as ui32");
- }
- Level = level;
- }
- }
- return TConclusionStatus::Success();
-}
-
-bool TCompressionDiff::DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto) {
- if (proto.HasCompressionLevel()) {
- Level = proto.GetCompressionLevel();
- }
- if (proto.HasCompressionCodec()) {
- Codec = CompressionFromProto(proto.GetCompressionCodec());
- if (!Codec) {
- return false;
- }
- }
- return true;
-}
-
-NKikimr::TConclusionStatus TCompressionDiff::Apply(std::optional<TCompression>& settings) const {
- if (IsEmpty()) {
- return TConclusionStatus::Success();
- }
- TCompression merged;
- if (!!settings) {
- merged = *settings;
- }
- if (Codec) {
- merged.Codec = *Codec;
- }
- if (Level) {
- merged.Level = *Level;
- }
- auto validation = merged.Validate();
- if (!validation) {
- return validation;
- }
- settings = merged;
- return TConclusionStatus::Success();
-}
-
-}
diff --git a/ydb/core/formats/arrow/compression/diff.h b/ydb/core/formats/arrow/compression/diff.h
deleted file mode 100644
index 53cdd5bae78..00000000000
--- a/ydb/core/formats/arrow/compression/diff.h
+++ /dev/null
@@ -1,34 +0,0 @@
-#pragma once
-
-#include <ydb/library/conclusion/status.h>
-#include <ydb/library/conclusion/result.h>
-#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <ydb/services/metadata/abstract/request_features.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
-#include <optional>
-#include <map>
-
-namespace NKikimr::NArrow {
-
-class TCompression;
-
-class TCompressionDiff {
-private:
- std::optional<arrow::Compression::type> Codec;
- std::optional<int> Level;
- bool IsEmpty() const {
- return !Level && !Codec;
- }
-public:
- NKikimrSchemeOp::TCompressionOptions SerializeToProto() const;
- bool DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto);
- TConclusionStatus DeserializeFromRequestFeatures(NYql::TFeaturesExtractor& features);
- const std::optional<arrow::Compression::type>& GetCodec() const {
- return Codec;
- }
- const std::optional<int>& GetLevel() const {
- return Level;
- }
- TConclusionStatus Apply(std::optional<TCompression>& settings) const;
-};
-}
diff --git a/ydb/core/formats/arrow/compression/object.cpp b/ydb/core/formats/arrow/compression/object.cpp
deleted file mode 100644
index 7f58d2618e9..00000000000
--- a/ydb/core/formats/arrow/compression/object.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-#include "object.h"
-#include "parsing.h"
-#include <ydb/core/formats/arrow/common/validation.h>
-#include <util/string/builder.h>
-
-namespace NKikimr::NArrow {
-
-TConclusionStatus NKikimr::NArrow::TCompression::Validate() const {
- if (Level) {
- auto codec = TStatusValidator::GetValid(arrow::util::Codec::Create(Codec));
- const int levelMin = codec->minimum_compression_level();
- const int levelMax = codec->maximum_compression_level();
- if (Level && (*Level < levelMin || levelMax < *Level)) {
- return TConclusionStatus::Fail(
- TStringBuilder() << "incorrect level for codec. have to be: [" << levelMin << ":" << levelMax << "]"
- );
- }
- }
- return TConclusionStatus::Success();
-}
-
-TConclusionStatus TCompression::DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& compression) {
- if (compression.HasCompressionCodec()) {
- auto codecOpt = NArrow::CompressionFromProto(compression.GetCompressionCodec());
- if (!codecOpt) {
- return TConclusionStatus::Fail("cannot parse codec type from proto");
- }
- Codec = *codecOpt;
- }
- if (compression.HasCompressionLevel()) {
- Level = compression.GetCompressionLevel();
- }
- return Validate();
-}
-
-NKikimrSchemeOp::TCompressionOptions TCompression::SerializeToProto() const {
- NKikimrSchemeOp::TCompressionOptions result;
- result.SetCompressionCodec(NArrow::CompressionToProto(Codec));
- if (Level) {
- result.SetCompressionLevel(*Level);
- }
- return result;
-}
-
-TString TCompression::DebugString() const {
- TStringBuilder sb;
- sb << arrow::util::Codec::GetCodecAsString(Codec) << ":" << Level.value_or(arrow::util::kUseDefaultCompressionLevel);
- return sb;
-}
-
-std::unique_ptr<arrow::util::Codec> TCompression::BuildArrowCodec() const {
- return NArrow::TStatusValidator::GetValid(
- arrow::util::Codec::Create(
- Codec, Level.value_or(arrow::util::kUseDefaultCompressionLevel)));
-}
-
-NKikimr::TConclusion<NKikimr::NArrow::TCompression> TCompression::BuildFromProto(const NKikimrSchemeOp::TCompressionOptions& compression) {
- TCompression result;
- auto resultStatus = result.DeserializeFromProto(compression);
- if (!resultStatus) {
- return resultStatus;
- }
- return result;
-}
-
-std::unique_ptr<arrow::util::Codec> TCompression::BuildDefaultCodec() {
- return *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME);
-}
-
-}
diff --git a/ydb/core/formats/arrow/compression/object.h b/ydb/core/formats/arrow/compression/object.h
deleted file mode 100644
index 18c32b59221..00000000000
--- a/ydb/core/formats/arrow/compression/object.h
+++ /dev/null
@@ -1,39 +0,0 @@
-#pragma once
-
-#include <ydb/library/conclusion/result.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
-#include "diff.h"
-
-namespace NKikimr::NArrow {
-
-class TCompression {
-private:
- arrow::Compression::type Codec = arrow::Compression::LZ4_FRAME;
- std::optional<int> Level;
- TCompression() = default;
-
- TConclusionStatus Validate() const;
- friend class TCompressionDiff;
-public:
-
- static std::unique_ptr<arrow::util::Codec> BuildDefaultCodec();
-
- TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& compression);
-
- NKikimrSchemeOp::TCompressionOptions SerializeToProto() const;
-
- static TConclusion<TCompression> BuildFromProto(const NKikimrSchemeOp::TCompressionOptions& compression);
-
- explicit TCompression(const arrow::Compression::type codec, std::optional<int> level = {})
- : Codec(codec)
- , Level(level)
- {
-
- }
-
- TString DebugString() const;
- std::unique_ptr<arrow::util::Codec> BuildArrowCodec() const;
-
-};
-
-}
diff --git a/ydb/core/formats/arrow/compression/ya.make b/ydb/core/formats/arrow/compression/ya.make
deleted file mode 100644
index 420b028e3f2..00000000000
--- a/ydb/core/formats/arrow/compression/ya.make
+++ /dev/null
@@ -1,16 +0,0 @@
-LIBRARY()
-
-SRCS(
- diff.cpp
- object.cpp
- parsing.cpp
-)
-
-PEERDIR(
- contrib/libs/apache/arrow
- ydb/core/protos
- ydb/core/formats/arrow
- ydb/library/conclusion
-)
-
-END()
diff --git a/ydb/core/formats/arrow/serializer/abstract.cpp b/ydb/core/formats/arrow/serializer/abstract.cpp
index 13ee3d731bb..77b4b7fb5bc 100644
--- a/ydb/core/formats/arrow/serializer/abstract.cpp
+++ b/ydb/core/formats/arrow/serializer/abstract.cpp
@@ -1,11 +1,28 @@
#include "abstract.h"
+#include "native.h"
namespace NKikimr::NArrow::NSerialization {
-arrow::Result<std::shared_ptr<arrow::RecordBatch>> IDeserializer::Deserialize(const TString& data) const {
- if (!data) {
- return nullptr;
+NKikimr::TConclusionStatus TSerializerContainer::DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto) {
+ NKikimrSchemeOp::TOlapColumn::TSerializer serializerProto;
+ serializerProto.SetClassName(NArrow::NSerialization::TNativeSerializer::GetClassNameStatic());
+ *serializerProto.MutableArrowCompression() = proto;
+ AFL_VERIFY(Initialize(NArrow::NSerialization::TNativeSerializer::GetClassNameStatic()));
+ return GetObjectPtr()->DeserializeFromProto(serializerProto);
+}
+
+NKikimr::TConclusionStatus TSerializerContainer::DeserializeFromRequest(NYql::TFeaturesExtractor& features) {
+ const std::optional<TString> className = features.Extract("SERIALIZER.CLASS_NAME");
+ if (!className) {
+ return TConclusionStatus::Success();
+ }
+ if (!TBase::Initialize(*className)) {
+ return TConclusionStatus::Fail("dont know anything about class_name=" + *className);
}
- return DoDeserialize(data);
+ return TBase::GetObjectPtr()->DeserializeFromRequest(features);
+}
+
+std::shared_ptr<NKikimr::NArrow::NSerialization::ISerializer> TSerializerContainer::GetDefaultSerializer() {
+ return std::make_shared<TNativeSerializer>();
}
}
diff --git a/ydb/core/formats/arrow/serializer/abstract.h b/ydb/core/formats/arrow/serializer/abstract.h
index f21ac1d58f7..6051f9fc90c 100644
--- a/ydb/core/formats/arrow/serializer/abstract.h
+++ b/ydb/core/formats/arrow/serializer/abstract.h
@@ -1,38 +1,106 @@
#pragma once
+#include <ydb/library/conclusion/status.h>
+#include <ydb/services/metadata/abstract/request_features.h>
+#include <ydb/services/bg_tasks/abstract/interface.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+
#include <contrib/libs/apache/arrow/cpp/src/arrow/status.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
#include <util/generic/string.h>
+#include <util/string/builder.h>
namespace NKikimr::NArrow::NSerialization {
class ISerializer {
protected:
- virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
+ virtual TString DoSerializeFull(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
+ virtual TString DoSerializePayload(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
+ virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data) const = 0;
+ virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data, const std::shared_ptr<arrow::Schema>& schema) const = 0;
+ virtual TString DoDebugString() const {
+ return "";
+ }
+
+ virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) = 0;
+
+ virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapColumn::TSerializer& proto) = 0;
+ virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const = 0;
public:
using TPtr = std::shared_ptr<ISerializer>;
+ using TFactory = NObjectFactory::TObjectFactory<ISerializer, TString>;
+ using TProto = NKikimrSchemeOp::TOlapColumn::TSerializer;
virtual ~ISerializer() = default;
- TString Serialize(const std::shared_ptr<arrow::RecordBatch>& batch) const {
- return DoSerialize(batch);
+ TConclusionStatus DeserializeFromRequest(NYql::TFeaturesExtractor& features) {
+ return DoDeserializeFromRequest(features);
+ }
+
+ TString DebugString() const {
+ return TStringBuilder() << "{class_name=" << GetClassName() << ";details={" << DoDebugString() << "}}";
+ }
+
+ TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapColumn::TSerializer& proto) {
+ return DoDeserializeFromProto(proto);
+ }
+
+ void SerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const {
+ return DoSerializeToProto(proto);
+ }
+
+ TString SerializeFull(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ return DoSerializeFull(batch);
+ }
+
+ TString SerializePayload(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ return DoSerializePayload(batch);
+ }
+
+ arrow::Result<std::shared_ptr<arrow::RecordBatch>> Deserialize(const TString& data) const {
+ if (!data) {
+ return nullptr;
+ }
+ return DoDeserialize(data);
+ }
+
+ arrow::Result<std::shared_ptr<arrow::RecordBatch>> Deserialize(const TString& data, const std::shared_ptr<arrow::Schema>& schema) const {
+ if (!data) {
+ return nullptr;
+ }
+ return DoDeserialize(data, schema);
}
virtual bool IsHardPacker() const = 0;
+
+ virtual TString GetClassName() const = 0;
};
-class IDeserializer {
-protected:
- virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data) const = 0;
- virtual TString DoDebugString() const = 0;
+class TSerializerContainer: public NBackgroundTasks::TInterfaceProtoContainer<ISerializer> {
+private:
+ using TBase = NBackgroundTasks::TInterfaceProtoContainer<ISerializer>;
public:
- using TPtr = std::shared_ptr<IDeserializer>;
- virtual ~IDeserializer() = default;
+ using TBase::TBase;
+
+ TSerializerContainer(const std::shared_ptr<ISerializer>& object)
+ : TBase(object)
+ {
+
+ }
TString DebugString() const {
- return DoDebugString();
+ if (GetObjectPtr()) {
+ return GetObjectPtr()->DebugString();
+ } else {
+ return "NO_OBJECT";
+ }
}
+ using TBase::DeserializeFromProto;
+
+ static std::shared_ptr<ISerializer> GetDefaultSerializer();
+
+ TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto);
- arrow::Result<std::shared_ptr<arrow::RecordBatch>> Deserialize(const TString& data) const;
+ TConclusionStatus DeserializeFromRequest(NYql::TFeaturesExtractor& features);
};
}
diff --git a/ydb/core/formats/arrow/serializer/batch_only.cpp b/ydb/core/formats/arrow/serializer/batch_only.cpp
deleted file mode 100644
index c93f97b3f55..00000000000
--- a/ydb/core/formats/arrow/serializer/batch_only.cpp
+++ /dev/null
@@ -1,58 +0,0 @@
-#include "batch_only.h"
-#include "stream.h"
-#include <ydb/core/formats/arrow/common/validation.h>
-#include <ydb/library/services/services.pb.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/dictionary.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/buffer.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>
-#include <ydb/library/actors/core/log.h>
-namespace NKikimr::NArrow::NSerialization {
-
-arrow::Result<std::shared_ptr<arrow::RecordBatch>> TBatchPayloadDeserializer::DoDeserialize(const TString& data) const {
- arrow::ipc::DictionaryMemo dictMemo;
- auto options = arrow::ipc::IpcReadOptions::Defaults();
- options.use_threads = false;
-
- std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(data));
- arrow::io::BufferReader reader(buffer);
- AFL_TRACE(NKikimrServices::ARROW_HELPER)("event", "parsing")("size", data.size())("columns", Schema->num_fields());
- auto batchResult = arrow::ipc::ReadRecordBatch(Schema, &dictMemo, options, &reader);
- if (!batchResult.ok()) {
- return batchResult;
- }
- std::shared_ptr<arrow::RecordBatch> batch = *batchResult;
- if (!batch) {
- return arrow::Status(arrow::StatusCode::SerializationError, "empty batch");
- }
- auto validation = batch->Validate();
- if (!validation.ok()) {
- return arrow::Status(arrow::StatusCode::SerializationError, "batch is not valid: " + validation.ToString());
- }
- return batch;
-}
-
-TString TBatchPayloadSerializer::DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const {
- arrow::ipc::IpcPayload payload;
- // Build payload. Compression if set up performed here.
- TStatusValidator::Validate(arrow::ipc::GetRecordBatchPayload(*batch, Options, &payload));
-
- int32_t metadata_length = 0;
- arrow::io::MockOutputStream mock;
- // Process prepared payload through mock stream. Fast and efficient.
- TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &mock, &metadata_length));
-
- TString str;
- str.resize(mock.GetExtentBytesWritten());
-
- TFixedStringOutputStream out(&str);
- // Write prepared payload into the resultant string. No extra allocation will be made.
- TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &out, &metadata_length));
- Y_ABORT_UNLESS(out.GetPosition() == str.size());
- Y_DEBUG_ABORT_UNLESS(TBatchPayloadDeserializer(batch->schema()).Deserialize(str).ok());
- AFL_DEBUG(NKikimrServices::ARROW_HELPER)("event", "serialize")("size", str.size())("columns", batch->schema()->num_fields());
- return str;
-}
-
-}
diff --git a/ydb/core/formats/arrow/serializer/batch_only.h b/ydb/core/formats/arrow/serializer/batch_only.h
deleted file mode 100644
index b3dfda9e7ff..00000000000
--- a/ydb/core/formats/arrow/serializer/batch_only.h
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-#include "abstract.h"
-#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/options.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
-
-namespace NKikimr::NArrow::NSerialization {
-
-class TBatchPayloadSerializer: public ISerializer {
-private:
- const arrow::ipc::IpcWriteOptions Options;
-protected:
- virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
-public:
- virtual bool IsHardPacker() const override {
- return Options.codec && Options.codec->compression_type() == arrow::Compression::ZSTD && Options.codec->compression_level() > 3;
- }
- TBatchPayloadSerializer(const arrow::ipc::IpcWriteOptions& options)
- : Options(options) {
-
- }
-};
-
-class TBatchPayloadDeserializer: public IDeserializer {
-private:
- const std::shared_ptr<arrow::Schema> Schema;
-protected:
- virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data) const override;
- virtual TString DoDebugString() const override {
- return "type=BATCH_PAYLOAD;";
- }
-public:
- TBatchPayloadDeserializer(const std::shared_ptr<arrow::Schema> schema)
- : Schema(schema) {
-
- }
-};
-
-}
diff --git a/ydb/core/formats/arrow/serializer/full.cpp b/ydb/core/formats/arrow/serializer/full.cpp
deleted file mode 100644
index 9f830c3662c..00000000000
--- a/ydb/core/formats/arrow/serializer/full.cpp
+++ /dev/null
@@ -1,54 +0,0 @@
-#include "full.h"
-#include "stream.h"
-#include <ydb/core/formats/arrow/dictionary/conversion.h>
-#include <ydb/core/formats/arrow/common/validation.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/dictionary.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/buffer.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
-namespace NKikimr::NArrow::NSerialization {
-
-arrow::Result<std::shared_ptr<arrow::RecordBatch>> TFullDataDeserializer::DoDeserialize(const TString& data) const {
- arrow::ipc::DictionaryMemo dictMemo;
- auto options = arrow::ipc::IpcReadOptions::Defaults();
- options.use_threads = false;
-
- std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(data));
- arrow::io::BufferReader readerStream(buffer);
- auto reader = TStatusValidator::GetValid(arrow::ipc::RecordBatchStreamReader::Open(&readerStream));
-
- std::shared_ptr<arrow::RecordBatch> batch;
- auto readResult = reader->ReadNext(&batch);
- if (!readResult.ok()) {
- return readResult;
- }
- if (!batch) {
- return arrow::Status(arrow::StatusCode::SerializationError, "null batch");
- }
- auto validation = batch->Validate();
- if (!validation.ok()) {
- return arrow::Status(arrow::StatusCode::SerializationError, "validation error: " + validation.ToString());
- }
- return batch;
-}
-
-TString TFullDataSerializer::DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const {
- TString result;
- {
- arrow::io::MockOutputStream mock;
- auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&mock, batch->schema(), Options));
- TStatusValidator::Validate(writer->WriteRecordBatch(*batch));
- result.reserve(mock.GetExtentBytesWritten());
- }
- {
- TStringOutputStream stream(&result);
- auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&stream, batch->schema(), Options));
- TStatusValidator::Validate(writer->WriteRecordBatch(*batch));
- TStatusValidator::Validate(writer->Close());
- Y_ABORT_UNLESS(stream.GetPosition() == result.size());
- }
- return result;
-}
-
-}
diff --git a/ydb/core/formats/arrow/serializer/full.h b/ydb/core/formats/arrow/serializer/full.h
deleted file mode 100644
index 56761a3d752..00000000000
--- a/ydb/core/formats/arrow/serializer/full.h
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-
-#include "abstract.h"
-#include <ydb/library/accessor/accessor.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/options.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
-
-namespace NKikimr::NArrow::NSerialization {
-
-class TFullDataSerializer: public ISerializer {
-private:
- const arrow::ipc::IpcWriteOptions Options;
-protected:
- virtual TString DoSerialize(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
-public:
- virtual bool IsHardPacker() const override {
- return Options.codec && Options.codec->compression_type() == arrow::Compression::ZSTD && Options.codec->compression_level() > 3;
- }
-
- TFullDataSerializer(const arrow::ipc::IpcWriteOptions& options)
- : Options(options) {
-
- }
-};
-
-class TFullDataDeserializer: public IDeserializer {
-protected:
- virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data) const override;
- virtual TString DoDebugString() const override {
- return "type=FULL_DATA;";
- }
-public:
- TFullDataDeserializer() {
-
- }
-};
-
-}
diff --git a/ydb/core/formats/arrow/serializer/native.cpp b/ydb/core/formats/arrow/serializer/native.cpp
new file mode 100644
index 00000000000..5a6819ee270
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/native.cpp
@@ -0,0 +1,185 @@
+#include "native.h"
+#include "stream.h"
+#include "parsing.h"
+#include <ydb/core/formats/arrow/dictionary/conversion.h>
+#include <ydb/core/formats/arrow/common/validation.h>
+
+#include <ydb/library/services/services.pb.h>
+#include <ydb/library/actors/core/log.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/dictionary.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/buffer.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
+
+namespace NKikimr::NArrow::NSerialization {
+
+arrow::Result<std::shared_ptr<arrow::RecordBatch>> TNativeSerializer::DoDeserialize(const TString& data) const {
+ arrow::ipc::DictionaryMemo dictMemo;
+ auto options = arrow::ipc::IpcReadOptions::Defaults();
+ options.use_threads = false;
+
+ std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(data));
+ arrow::io::BufferReader readerStream(buffer);
+ auto reader = TStatusValidator::GetValid(arrow::ipc::RecordBatchStreamReader::Open(&readerStream));
+
+ std::shared_ptr<arrow::RecordBatch> batch;
+ auto readResult = reader->ReadNext(&batch);
+ if (!readResult.ok()) {
+ return readResult;
+ }
+ if (!batch) {
+ return arrow::Status(arrow::StatusCode::SerializationError, "null batch");
+ }
+ auto validation = batch->Validate();
+ if (!validation.ok()) {
+ return arrow::Status(arrow::StatusCode::SerializationError, "validation error: " + validation.ToString());
+ }
+ return batch;
+}
+
+TString TNativeSerializer::DoSerializeFull(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ TString result;
+ {
+ arrow::io::MockOutputStream mock;
+ auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&mock, batch->schema(), Options));
+ TStatusValidator::Validate(writer->WriteRecordBatch(*batch));
+ result.reserve(mock.GetExtentBytesWritten());
+ }
+ {
+ TStringOutputStream stream(&result);
+ auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&stream, batch->schema(), Options));
+ TStatusValidator::Validate(writer->WriteRecordBatch(*batch));
+ TStatusValidator::Validate(writer->Close());
+ Y_ABORT_UNLESS(stream.GetPosition() == result.size());
+ }
+ return result;
+}
+
+arrow::Result<std::shared_ptr<arrow::RecordBatch>> TNativeSerializer::DoDeserialize(const TString& data, const std::shared_ptr<arrow::Schema>& schema) const {
+ arrow::ipc::DictionaryMemo dictMemo;
+ auto options = arrow::ipc::IpcReadOptions::Defaults();
+ options.use_threads = false;
+
+ std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(data));
+ arrow::io::BufferReader reader(buffer);
+ AFL_TRACE(NKikimrServices::ARROW_HELPER)("event", "parsing")("size", data.size())("columns", schema->num_fields());
+ auto batchResult = arrow::ipc::ReadRecordBatch(schema, &dictMemo, options, &reader);
+ if (!batchResult.ok()) {
+ return batchResult;
+ }
+ std::shared_ptr<arrow::RecordBatch> batch = *batchResult;
+ if (!batch) {
+ return arrow::Status(arrow::StatusCode::SerializationError, "empty batch");
+ }
+ auto validation = batch->Validate();
+ if (!validation.ok()) {
+ return arrow::Status(arrow::StatusCode::SerializationError, "batch is not valid: " + validation.ToString());
+ }
+ return batch;
+}
+
+TString TNativeSerializer::DoSerializePayload(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ arrow::ipc::IpcPayload payload;
+ // Build payload. Compression if set up performed here.
+ TStatusValidator::Validate(arrow::ipc::GetRecordBatchPayload(*batch, Options, &payload));
+
+ int32_t metadata_length = 0;
+ arrow::io::MockOutputStream mock;
+ // Process prepared payload through mock stream. Fast and efficient.
+ TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &mock, &metadata_length));
+
+ TString str;
+ str.resize(mock.GetExtentBytesWritten());
+
+ TFixedStringOutputStream out(&str);
+ // Write prepared payload into the resultant string. No extra allocation will be made.
+ TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &out, &metadata_length));
+ Y_ABORT_UNLESS(out.GetPosition() == str.size());
+ Y_DEBUG_ABORT_UNLESS(Deserialize(str, batch->schema()).ok());
+ AFL_DEBUG(NKikimrServices::ARROW_HELPER)("event", "serialize")("size", str.size())("columns", batch->schema()->num_fields());
+ return str;
+}
+
+NKikimr::TConclusion<std::shared_ptr<arrow::util::Codec>> TNativeSerializer::BuildCodec(const arrow::Compression::type& cType, const std::optional<ui32> level) const {
+ auto codec = NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(cType));
+ if (!codec) {
+ return std::shared_ptr<arrow::util::Codec>();
+ }
+ const int levelDef = level.value_or(codec->default_compression_level());
+ const int levelMin = codec->minimum_compression_level();
+ const int levelMax = codec->maximum_compression_level();
+ if (levelDef < levelMin || levelMax < levelDef) {
+ return TConclusionStatus::Fail(
+ TStringBuilder() << "incorrect level for codec. have to be: [" << levelMin << ":" << levelMax << "]"
+ );
+ }
+ std::shared_ptr<arrow::util::Codec> codecPtr = std::move(NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(cType, levelDef)));
+ return codecPtr;
+}
+
+NKikimr::TConclusionStatus TNativeSerializer::DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) {
+ std::optional<arrow::Compression::type> codec;
+ std::optional<int> level;
+ {
+ auto fValue = features.Extract("COMPRESSION.TYPE");
+ if (!fValue) {
+ return TConclusionStatus::Fail("not defined COMPRESSION.TYPE as arrow::Compression");
+ }
+ codec = NArrow::CompressionFromString(*fValue);
+ if (!codec) {
+ return TConclusionStatus::Fail("cannot parse COMPRESSION.TYPE as arrow::Compression");
+ }
+ }
+ {
+ auto fValue = features.Extract("COMPRESSION.LEVEL");
+ if (fValue) {
+ ui32 levelLocal;
+ if (!TryFromString<ui32>(*fValue, levelLocal)) {
+ return TConclusionStatus::Fail("cannot parse COMPRESSION.LEVEL as ui32");
+ }
+ level = levelLocal;
+ }
+ }
+ auto codecPtrStatus = BuildCodec(codec.value_or(Options.codec->compression_type()), level);
+ if (!codecPtrStatus) {
+ return codecPtrStatus.GetError();
+ }
+ Options.codec = *codecPtrStatus;
+ return TConclusionStatus::Success();
+}
+
+NKikimr::TConclusionStatus TNativeSerializer::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapColumn::TSerializer& proto) {
+ if (!proto.HasArrowCompression()) {
+ return TConclusionStatus::Fail("no arrow serializer data in proto");
+ }
+ auto compression = proto.GetArrowCompression();
+ if (!compression.HasCodec()) {
+ Options = GetDefaultOptions();
+ return TConclusionStatus::Success();
+ }
+ std::optional<arrow::Compression::type> codec = NArrow::CompressionFromProto(compression.GetCodec());
+ if (!codec) {
+ return TConclusionStatus::Fail("cannot parse codec type from proto");
+ }
+ std::optional<int> level;
+ if (compression.HasLevel()) {
+ level = compression.GetLevel();
+ }
+
+ Options.use_threads = false;
+ auto result = BuildCodec(*codec, level);
+ if (!result) {
+ return result.GetError();
+ }
+ Options.codec = *result;
+ return TConclusionStatus::Success();
+}
+
+void TNativeSerializer::DoSerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const {
+ proto.MutableArrowCompression()->SetCodec(NArrow::CompressionToProto(Options.codec->compression_type()));
+ proto.MutableArrowCompression()->SetLevel(Options.codec->compression_level());
+}
+
+}
diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h
new file mode 100644
index 00000000000..ece3e9e34fc
--- /dev/null
+++ b/ydb/core/formats/arrow/serializer/native.h
@@ -0,0 +1,72 @@
+#pragma once
+
+#include "abstract.h"
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/conclusion/result.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/options.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NArrow::NSerialization {
+
+class TNativeSerializer: public ISerializer {
+public:
+ static TString GetClassNameStatic() {
+ return "ARROW_SERIALIZER";
+ }
+private:
+ arrow::ipc::IpcWriteOptions Options;
+
+ TConclusion<std::shared_ptr<arrow::util::Codec>> BuildCodec(const arrow::Compression::type& cType, const std::optional<ui32> level) const;
+ static const inline TFactory::TRegistrator<TNativeSerializer> Registrator = TFactory::TRegistrator<TNativeSerializer>(GetClassNameStatic());
+protected:
+ virtual TString DoSerializeFull(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+ virtual TString DoSerializePayload(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+ virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data) const override;
+ virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> DoDeserialize(const TString& data, const std::shared_ptr<arrow::Schema>& schema) const override;
+
+ virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) override;
+
+ static arrow::ipc::IpcOptions BuildDefaultOptions() {
+ arrow::ipc::IpcWriteOptions options;
+ options.use_threads = false;
+ options.codec = *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME);
+ return options;
+ }
+
+ virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapColumn::TSerializer& proto) override;
+
+ virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const override;
+
+public:
+ virtual TString GetClassName() const override {
+ return GetClassNameStatic();
+ }
+
+ virtual bool IsHardPacker() const override {
+ return Options.codec && Options.codec->compression_type() == arrow::Compression::ZSTD && Options.codec->compression_level() > 3;
+ }
+
+ static arrow::ipc::IpcOptions GetDefaultOptions() {
+ static arrow::ipc::IpcWriteOptions options = BuildDefaultOptions();
+ return options;
+ }
+
+ TNativeSerializer(const arrow::Compression::type compressionType) {
+ Options.use_threads = false;
+ auto r = arrow::util::Codec::Create(compressionType);
+ AFL_VERIFY(r.ok());
+ Options.codec = std::move(*r);
+ }
+
+ TNativeSerializer(const arrow::ipc::IpcWriteOptions& options = GetDefaultOptions())
+ : Options(options) {
+ Options.use_threads = false;
+
+ }
+};
+
+}
diff --git a/ydb/core/formats/arrow/compression/parsing.cpp b/ydb/core/formats/arrow/serializer/parsing.cpp
index 8a394073bdd..8a394073bdd 100644
--- a/ydb/core/formats/arrow/compression/parsing.cpp
+++ b/ydb/core/formats/arrow/serializer/parsing.cpp
diff --git a/ydb/core/formats/arrow/compression/parsing.h b/ydb/core/formats/arrow/serializer/parsing.h
index e1dbbf9badd..e1dbbf9badd 100644
--- a/ydb/core/formats/arrow/compression/parsing.h
+++ b/ydb/core/formats/arrow/serializer/parsing.h
diff --git a/ydb/core/formats/arrow/serializer/ya.make b/ydb/core/formats/arrow/serializer/ya.make
index 558825873a2..bf7e091ab4b 100644
--- a/ydb/core/formats/arrow/serializer/ya.make
+++ b/ydb/core/formats/arrow/serializer/ya.make
@@ -3,15 +3,16 @@ LIBRARY()
PEERDIR(
contrib/libs/apache/arrow
ydb/core/formats/arrow/common
+ ydb/services/metadata/abstract
ydb/library/actors/core
ydb/core/protos
)
SRCS(
abstract.cpp
- full.cpp
- batch_only.cpp
+ GLOBAL native.cpp
stream.cpp
+ parsing.cpp
)
END()
diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp
index 0006b339464..b84fa44799c 100644
--- a/ydb/core/formats/arrow/special_keys.cpp
+++ b/ydb/core/formats/arrow/special_keys.cpp
@@ -1,7 +1,7 @@
#include "special_keys.h"
#include "permutations.h"
#include "reader/read_filter_merger.h"
-#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/arrow_filter.h>
@@ -11,7 +11,7 @@ bool TSpecialKeys::DeserializeFromString(const TString& data) {
if (!data) {
return false;
}
- Data = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TFullDataDeserializer().Deserialize(data));
+ Data = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer()->Deserialize(data));
return !!Data;
}
@@ -25,7 +25,7 @@ NKikimr::NArrow::TReplaceKey TSpecialKeys::GetKeyByIndex(const ui32 position, co
}
TString TSpecialKeys::SerializeToString() const {
- return NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(Data);
+ return NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer()->SerializeFull(Data);
}
TString TSpecialKeys::SerializeToStringDataOnlyNoCompression() const {
diff --git a/ydb/core/formats/arrow/ut/ut_dictionary.cpp b/ydb/core/formats/arrow/ut/ut_dictionary.cpp
index c723eb1957c..c3df2c6a30f 100644
--- a/ydb/core/formats/arrow/ut/ut_dictionary.cpp
+++ b/ydb/core/formats/arrow/ut/ut_dictionary.cpp
@@ -1,7 +1,6 @@
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
-#include <ydb/core/formats/arrow/serializer/batch_only.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/formats/arrow/serializer/native.h>
#include <ydb/core/formats/arrow/simple_builder/array.h>
#include <ydb/core/formats/arrow/simple_builder/batch.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
@@ -13,13 +12,13 @@ Y_UNIT_TEST_SUITE(Dictionary) {
ui64 Test(NConstruction::IArrayBuilder::TPtr column, const arrow::ipc::IpcWriteOptions& options, const ui32 bSize) {
std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(bSize);
- const TString data = NSerialization::TFullDataSerializer(options).Serialize(batch);
- auto deserializedBatch = *NSerialization::TFullDataDeserializer().Deserialize(data);
+ const TString data = NSerialization::TNativeSerializer(options).SerializeFull(batch);
+ auto deserializedBatch = *NSerialization::TNativeSerializer().Deserialize(data);
Y_ABORT_UNLESS(!!deserializedBatch);
auto originalBatchTransformed = DictionaryToArray(batch);
auto roundBatchTransformed = DictionaryToArray(deserializedBatch);
- const TString roundUnpacked = NSerialization::TFullDataSerializer(options).Serialize(roundBatchTransformed);
- const TString roundTransformed = NSerialization::TFullDataSerializer(options).Serialize(originalBatchTransformed);
+ const TString roundUnpacked = NSerialization::TNativeSerializer(options).SerializeFull(roundBatchTransformed);
+ const TString roundTransformed = NSerialization::TNativeSerializer(options).SerializeFull(originalBatchTransformed);
Y_ABORT_UNLESS(roundBatchTransformed->num_rows() == originalBatchTransformed->num_rows());
Y_ABORT_UNLESS(roundUnpacked == roundTransformed);
return data.size();
@@ -154,8 +153,8 @@ Y_UNIT_TEST_SUITE(Dictionary) {
"field", NConstruction::TStringPoolFiller(pSize, strLen)
);
std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(bSize);
- const TString dataFull = NSerialization::TFullDataSerializer(options).Serialize(batch);
- const TString dataPayload = NSerialization::TBatchPayloadSerializer(options).Serialize(batch);
+ const TString dataFull = NSerialization::TNativeSerializer(options).SerializeFull(batch);
+ const TString dataPayload = NSerialization::TNativeSerializer(options).SerializePayload(batch);
bytesFull = dataFull.size();
bytesPayload = dataPayload.size();
}
diff --git a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp
index 4a8413dcd5f..24d2c52d921 100644
--- a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp
+++ b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp
@@ -1,7 +1,5 @@
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
-#include <ydb/core/formats/arrow/serializer/batch_only.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/core/formats/arrow/simple_builder/array.h>
#include <ydb/core/formats/arrow/simple_builder/batch.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
diff --git a/ydb/core/grpc_services/rpc_log_store.cpp b/ydb/core/grpc_services/rpc_log_store.cpp
index 2e3453d5df9..10d9ac929f5 100644
--- a/ydb/core/grpc_services/rpc_log_store.cpp
+++ b/ydb/core/grpc_services/rpc_log_store.cpp
@@ -42,16 +42,16 @@ bool ConvertCompressionFromPublicToInternal(const Ydb::LogStore::Compression& fr
error = "LogStores with no compression are disabled.";
return false;
case Ydb::LogStore::Compression::CODEC_LZ4:
- to.SetCompressionCodec(NKikimrSchemeOp::ColumnCodecLZ4);
+ to.SetCodec(NKikimrSchemeOp::ColumnCodecLZ4);
break;
case Ydb::LogStore::Compression::CODEC_ZSTD:
- to.SetCompressionCodec(NKikimrSchemeOp::ColumnCodecZSTD);
+ to.SetCodec(NKikimrSchemeOp::ColumnCodecZSTD);
break;
default:
break;
}
if (from.compression_level()) {
- to.SetCompressionLevel(from.compression_level());
+ to.SetLevel(from.compression_level());
}
return true;
}
@@ -60,7 +60,7 @@ void ConvertCompressionFromInternalToPublic(const NKikimrSchemeOp::TCompressionO
Ydb::LogStore::Compression& to)
{
to.set_compression_codec(Ydb::LogStore::Compression::CODEC_LZ4); // LZ4 if not set
- switch (from.GetCompressionCodec()) {
+ switch (from.GetCodec()) {
case NKikimrSchemeOp::ColumnCodecPlain:
to.set_compression_codec(Ydb::LogStore::Compression::CODEC_PLAIN);
break;
@@ -73,7 +73,7 @@ void ConvertCompressionFromInternalToPublic(const NKikimrSchemeOp::TCompressionO
default:
break;
}
- to.set_compression_level(from.GetCompressionLevel());
+ to.set_compression_level(from.GetLevel());
}
bool ConvertSchemaFromPublicToInternal(const Ydb::LogStore::Schema& from, NKikimrSchemeOp::TColumnTableSchema& to,
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index 59dc26c2990..38a29ce7752 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -6,7 +6,6 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/core/tx/sharding/sharding.h>
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h
index 5d57f94fa40..f31f946d28f 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h
@@ -6,7 +6,6 @@
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
namespace NKikimr::NKqp {
diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp
index e7706d1093c..c3b65e98119 100644
--- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp
+++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp
@@ -17,9 +17,9 @@ TConclusionStatus TAlterColumnOperation::DoDeserialize(NYql::TObjectSettingsImpl
}
}
{
- auto result = CompressionDiff.DeserializeFromRequestFeatures(features);
- if (!result) {
- return TConclusionStatus::Fail(result.GetErrorMessage());
+ auto status = Serializer.DeserializeFromRequest(features);
+ if (!status) {
+ return status;
}
}
return TConclusionStatus::Success();
@@ -28,7 +28,9 @@ TConclusionStatus TAlterColumnOperation::DoDeserialize(NYql::TObjectSettingsImpl
void TAlterColumnOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
auto* column = schemaData.AddAlterColumns();
column->SetName(ColumnName);
- *column->MutableCompression() = CompressionDiff.SerializeToProto();
+ if (!!Serializer) {
+ Serializer.SerializeToProto(*column->MutableSerializer());
+ }
*column->MutableDictionaryEncoding() = DictionaryEncodingDiff.SerializeToProto();
}
diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h
index 803ffd5464f..81c0e362be3 100644
--- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h
+++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h
@@ -1,5 +1,5 @@
#include "abstract.h"
-#include <ydb/core/formats/arrow/compression/diff.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/core/formats/arrow/dictionary/diff.h>
namespace NKikimr::NKqp::NColumnshard {
@@ -14,7 +14,7 @@ private:
TString ColumnName;
- NArrow::TCompressionDiff CompressionDiff;
+ NArrow::NSerialization::TSerializerContainer Serializer;
NArrow::NDictionary::TEncodingDiff DictionaryEncodingDiff;
public:
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;
diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make
index 08467f55d56..3301f543b8f 100644
--- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make
+++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make
@@ -11,7 +11,7 @@ SRCS(
PEERDIR(
ydb/services/metadata/manager
- ydb/core/formats/arrow/compression
+ ydb/core/formats/arrow/serializer
ydb/core/kqp/gateway/utils
ydb/core/protos
)
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
index 55c8adf074d..9ba5c0fb564 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
@@ -654,7 +654,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
{
txRes.Ops.insert(node.Raw());
bool result = ExploreTx(TExprBase(node.Ref().ChildPtr(0)), ctx, dataSink, txRes, tablesData, types);
- Cerr << KqpExprToPrettyString(*node.Raw(), ctx) << Endl;
+// Cerr << KqpExprToPrettyString(*node.Raw(), ctx) << Endl;
txRes.AddResult(node);
return result;
}
diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp
index 99a10e26eca..085415d36c5 100644
--- a/ydb/core/kqp/ut/common/columnshard.cpp
+++ b/ydb/core/kqp/ut/common/columnshard.cpp
@@ -1,5 +1,4 @@
#include "columnshard.h"
-#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/core/testlib/cs_helper.h>
namespace NKikimr {
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 6f53286948f..707a363f244 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -29,7 +29,6 @@
#include <fmt/format.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
namespace NKikimr {
namespace NKqp {
@@ -1367,7 +1366,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[1u;]])");
}
- AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < 0.15 * csController->GetIndexesSkippingOnSelect().Val());
+ AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < 0.20 * csController->GetIndexesSkippingOnSelect().Val());
}
@@ -3793,6 +3792,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ABORT_UNLESS(d.GetMaxCount() - d.GetMinCount() <= 2);
}
}
+ {
+ auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, `SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER`, `COMPRESSION.TYPE`=`zstd`);";
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
+ }
}
const ui64 rawBytesUnpack = rawBytesUnpack1PK - rawBytesPK1;
const ui64 bytesUnpack = bytesUnpack1PK - bytesPK1;
diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
index d09a196b7af..3ba8d515e8c 100644
--- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
@@ -7,7 +7,6 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/core/testlib/cs_helper.h>
#include <ydb/core/testlib/common_helper.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
#include <library/cpp/threading/local_executor/local_executor.h>
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 1f53b50a833..3c13d8c27b4 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -7,7 +7,6 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/core/testlib/cs_helper.h>
#include <ydb/core/testlib/common_helper.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/library/uuid/uuid.h>
#include <ydb/library/binary_json/write.h>
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 895e1240cbd..ca8629e2504 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -397,14 +397,24 @@ message TDictionaryEncodingSettings {
}
message TCompressionOptions {
- optional EColumnCodec CompressionCodec = 2; // LZ4 (in arrow LZ4_FRAME variant) if not set
- optional int32 CompressionLevel = 3; // Use default compression level if not set (0 != not set)
+ optional EColumnCodec Codec = 2; // LZ4 (in arrow LZ4_FRAME variant) if not set
+ optional int32 Level = 3; // Use default compression level if not set (0 != not set)
+}
+
+message TOlapColumn {
+
+ message TSerializer {
+ optional string ClassName = 1;
+ oneof Implementation {
+ TCompressionOptions ArrowCompression = 40;
+ }
+ }
}
message TOlapColumnDiff {
optional string Name = 1;
- optional TCompressionOptions Compression = 2;
optional TDictionaryEncodingSettings DictionaryEncoding = 4;
+ optional TOlapColumn.TSerializer Serializer = 5;
}
message TOlapColumnDescription {
@@ -419,8 +429,9 @@ message TOlapColumnDescription {
optional NKikimrProto.TTypeInfo TypeInfo = 6;
optional bool NotNull = 7;
- optional TCompressionOptions Compression = 8;
+ optional TCompressionOptions Compression = 8[deprecated = true];
optional TDictionaryEncodingSettings DictionaryEncoding = 9;
+ optional TOlapColumn.TSerializer Serializer = 10;
}
message TRequestedBloomFilter {
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index 8df44d7df99..407aed03b43 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -375,10 +375,10 @@ NMetadata::NFetcher::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpec
cProto.SetName(tier.Name);
*cProto.MutableObjectStorage() = tier.S3;
if (tier.Codec) {
- cProto.MutableCompression()->SetCompressionCodec(tier.GetCodecId());
+ cProto.MutableCompression()->SetCodec(tier.GetCodecId());
}
if (tier.CompressionLevel) {
- cProto.MutableCompression()->SetCompressionLevel(*tier.CompressionLevel);
+ cProto.MutableCompression()->SetLevel(*tier.CompressionLevel);
}
NColumnShard::NTiers::TTierConfig tConfig(tier.Name, cProto);
cs->MutableTierConfigs().emplace(tConfig.GetTierName(), tConfig);
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 0ea35595f33..58849aa342e 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -236,10 +236,10 @@ struct TTestSchema {
}
if (specials.HasCodec()) {
- schema->MutableDefaultCompression()->SetCompressionCodec(specials.GetCodecId());
+ schema->MutableDefaultCompression()->SetCodec(specials.GetCodecId());
}
if (specials.CompressionLevel) {
- schema->MutableDefaultCompression()->SetCompressionLevel(*specials.CompressionLevel);
+ schema->MutableDefaultCompression()->SetLevel(*specials.CompressionLevel);
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index c6810001093..f91fcb099f4 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/protos/counters_columnshard.pb.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
+#include <ydb/core/formats/arrow/serializer/native.h>
namespace NKikimr::NOlap {
@@ -116,9 +117,9 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
continue;
}
if (b->num_rows() < 100) {
- SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::UNCOMPRESSED));
+ SaverContext.SetExternalSerializer(NArrow::NSerialization::TSerializerContainer(std::make_shared<NArrow::NSerialization::TNativeSerializer>(arrow::Compression::type::UNCOMPRESSED)));
} else {
- SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::LZ4_FRAME));
+ SaverContext.SetExternalSerializer(NArrow::NSerialization::TSerializerContainer(std::make_shared<NArrow::NSerialization::TNativeSerializer>(arrow::Compression::type::LZ4_FRAME)));
}
auto portions = MakeAppendedPortions(b, pathId, maxSnapshot, nullptr, context);
Y_ABORT_UNLESS(portions.size());
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
index 80117f34f33..cfba0a99406 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
@@ -43,8 +43,8 @@ std::optional<TPortionInfoWithBlobs> TTTLColumnEngineChanges::UpdateEvictedPorti
auto* tiering = Tiering.FindPtr(evictFeatures.PathId);
Y_ABORT_UNLESS(tiering);
- auto compression = tiering->GetCompression(evictFeatures.TargetTierName);
- if (!compression) {
+ auto serializer = tiering->GetSerializer(evictFeatures.TargetTierName);
+ if (!serializer) {
// Nothing to recompress. We have no other kinds of evictions yet.
evictFeatures.DataChanges = false;
auto result = TPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs);
@@ -58,7 +58,7 @@ std::optional<TPortionInfoWithBlobs> TTTLColumnEngineChanges::UpdateEvictedPorti
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portion_for_eviction", portionInfo.DebugString());
TSaverContext saverContext(evictFeatures.StorageOperator, SaverContext.GetStoragesManager());
- saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalCompression(compression);
+ saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalSerializer(*serializer);
auto withBlobs = TPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs);
withBlobs.GetPortionInfo().InitOperator(evictFeatures.StorageOperator, true);
withBlobs.GetPortionInfo().MutableMeta().SetTierName(evictFeatures.TargetTierName);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
index e52f769a817..3ba21e93aef 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
@@ -3,7 +3,6 @@
#include "fetched_data.h"
#include "plain_read_data.h"
#include "constructor.h"
-#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
#include <ydb/core/tx/columnshard/blobs_reader/events.h>
#include <ydb/core/tx/conveyor/usage/service.h>
diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.cpp b/ydb/core/tx/columnshard/engines/scheme/column_features.cpp
index a416509e487..c234314d16b 100644
--- a/ydb/core/tx/columnshard/engines/scheme/column_features.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/column_features.cpp
@@ -1,7 +1,6 @@
#include "column_features.h"
#include "index_info.h"
-#include <ydb/core/formats/arrow/serializer/full.h>
-#include <ydb/core/formats/arrow/serializer/batch_only.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <util/string/builder.h>
namespace NKikimr::NOlap {
@@ -23,26 +22,19 @@ NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetLoadTransformer(
}
void TColumnFeatures::InitLoader(const TIndexInfo& info) {
- NArrow::NTransformation::ITransformer::TPtr transformer = GetLoadTransformer();
auto schema = info.GetColumnSchema(ColumnId);
- if (!transformer) {
- Loader = std::make_shared<TColumnLoader>(transformer,
- std::make_shared<NArrow::NSerialization::TBatchPayloadDeserializer>(schema),
- schema, ColumnId);
- } else {
- Loader = std::make_shared<TColumnLoader>(transformer,
- std::make_shared<NArrow::NSerialization::TFullDataDeserializer>(),
- schema, ColumnId);
- }
+ Loader = std::make_shared<TColumnLoader>(GetLoadTransformer(), Serializer, schema, ColumnId);
}
std::optional<NKikimr::NOlap::TColumnFeatures> TColumnFeatures::BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const TIndexInfo& indexInfo) {
const ui32 columnId = columnInfo.GetId();
TColumnFeatures result(columnId);
- if (columnInfo.HasCompression()) {
- auto settings = NArrow::TCompression::BuildFromProto(columnInfo.GetCompression());
- Y_ABORT_UNLESS(settings.IsSuccess());
- result.Compression = *settings;
+ if (columnInfo.HasSerializer()) {
+ AFL_VERIFY(result.Serializer.DeserializeFromProto(columnInfo.GetSerializer()));
+ } else if (columnInfo.HasCompression()) {
+ AFL_VERIFY(result.Serializer.DeserializeFromProto(columnInfo.GetCompression()));
+ } else {
+ result.Serializer = NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer();
}
if (columnInfo.HasDictionaryEncoding()) {
auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnInfo.GetDictionaryEncoding());
@@ -53,20 +45,19 @@ std::optional<NKikimr::NOlap::TColumnFeatures> TColumnFeatures::BuildFromProto(c
return result;
}
-std::unique_ptr<arrow::util::Codec> TColumnFeatures::GetCompressionCodec() const {
- if (Compression) {
- return Compression->BuildArrowCodec();
- } else {
- return nullptr;
- }
-}
-
NKikimr::NOlap::TColumnFeatures TColumnFeatures::BuildFromIndexInfo(const ui32 columnId, const TIndexInfo& indexInfo) {
TColumnFeatures result(columnId);
result.InitLoader(indexInfo);
return result;
}
+TColumnFeatures::TColumnFeatures(const ui32 columnId)
+ : ColumnId(columnId)
+ , Serializer(NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer())
+{
+
+}
+
TString TColumnLoader::DebugString() const {
TStringBuilder result;
if (ExpectedSchema) {
@@ -75,8 +66,8 @@ TString TColumnLoader::DebugString() const {
if (Transformer) {
result << "transformer:" << Transformer->DebugString() << ";";
}
- if (Deserializer) {
- result << "deserializer:" << Deserializer->DebugString() << ";";
+ if (Serializer) {
+ result << "serializer:" << Serializer->DebugString() << ";";
}
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.h b/ydb/core/tx/columnshard/engines/scheme/column_features.h
index 11415ecc20d..7e99af80eb7 100644
--- a/ydb/core/tx/columnshard/engines/scheme/column_features.h
+++ b/ydb/core/tx/columnshard/engines/scheme/column_features.h
@@ -1,5 +1,4 @@
#pragma once
-#include <ydb/core/formats/arrow/compression/object.h>
#include <ydb/core/formats/arrow/dictionary/object.h>
#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/core/formats/arrow/transformer/abstract.h>
@@ -14,7 +13,7 @@ namespace NKikimr::NOlap {
class TSaverContext {
private:
TString TierName;
- std::optional<NArrow::TCompression> ExternalCompression;
+ std::optional<NArrow::NSerialization::TSerializerContainer> ExternalSerializer;
YDB_READONLY_DEF(std::shared_ptr<IBlobsStorageOperator>, StorageOperator);
YDB_READONLY_DEF(std::shared_ptr<IStoragesManager>, StoragesManager);
public:
@@ -24,11 +23,12 @@ public:
}
- const std::optional<NArrow::TCompression>& GetExternalCompression() const {
- return ExternalCompression;
+ const std::optional<NArrow::NSerialization::TSerializerContainer>& GetExternalSerializer() const {
+ return ExternalSerializer;
}
- TSaverContext& SetExternalCompression(const std::optional<NArrow::TCompression>& value) {
- ExternalCompression = value;
+ TSaverContext& SetExternalSerializer(const std::optional<NArrow::NSerialization::TSerializerContainer>& value) {
+ AFL_VERIFY(!!value);
+ ExternalSerializer = value;
return *this;
}
const TString& GetTierName() const {
@@ -43,17 +43,17 @@ public:
class TColumnSaver {
private:
NArrow::NTransformation::ITransformer::TPtr Transformer;
- NArrow::NSerialization::ISerializer::TPtr Serializer;
+ NArrow::NSerialization::TSerializerContainer Serializer;
public:
TColumnSaver() = default;
- TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::ISerializer::TPtr serializer)
+ TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, const NArrow::NSerialization::TSerializerContainer serializer)
: Transformer(transformer)
, Serializer(serializer) {
Y_ABORT_UNLESS(Serializer);
}
bool IsHardPacker() const {
- return Serializer && Serializer->IsHardPacker();
+ return Serializer->IsHardPacker();
}
TString Apply(std::shared_ptr<arrow::Array> data, std::shared_ptr<arrow::Field> field) const {
@@ -65,9 +65,9 @@ public:
TString Apply(const std::shared_ptr<arrow::RecordBatch>& data) const {
Y_ABORT_UNLESS(Serializer);
if (Transformer) {
- return Serializer->Serialize(Transformer->Transform(data));
+ return Serializer->SerializeFull(Transformer->Transform(data));
} else {
- return Serializer->Serialize(data);
+ return Serializer->SerializePayload(data);
}
}
};
@@ -75,22 +75,22 @@ public:
class TColumnLoader {
private:
NArrow::NTransformation::ITransformer::TPtr Transformer;
- NArrow::NSerialization::IDeserializer::TPtr Deserializer;
+ NArrow::NSerialization::TSerializerContainer Serializer;
std::shared_ptr<arrow::Schema> ExpectedSchema;
const ui32 ColumnId;
public:
TString DebugString() const;
- TColumnLoader(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::IDeserializer::TPtr deserializer,
+ TColumnLoader(NArrow::NTransformation::ITransformer::TPtr transformer, const NArrow::NSerialization::TSerializerContainer& serializer,
const std::shared_ptr<arrow::Schema>& expectedSchema, const ui32 columnId)
: Transformer(transformer)
- , Deserializer(deserializer)
+ , Serializer(serializer)
, ExpectedSchema(expectedSchema)
, ColumnId(columnId) {
Y_ABORT_UNLESS(ExpectedSchema);
auto fieldsCountStr = ::ToString(ExpectedSchema->num_fields());
Y_ABORT_UNLESS(ExpectedSchema->num_fields() == 1, "%s", fieldsCountStr.data());
- Y_ABORT_UNLESS(Deserializer);
+ Y_ABORT_UNLESS(Serializer);
}
ui32 GetColumnId() const {
@@ -106,8 +106,9 @@ public:
}
arrow::Result<std::shared_ptr<arrow::RecordBatch>> Apply(const TString& data) const {
- Y_ABORT_UNLESS(Deserializer);
- arrow::Result<std::shared_ptr<arrow::RecordBatch>> columnArray = Deserializer->Deserialize(data);
+ Y_ABORT_UNLESS(Serializer);
+ arrow::Result<std::shared_ptr<arrow::RecordBatch>> columnArray =
+ Transformer ? Serializer->Deserialize(data) : Serializer->Deserialize(data, ExpectedSchema);
if (!columnArray.ok()) {
return columnArray;
}
@@ -134,21 +135,19 @@ struct TIndexInfo;
class TColumnFeatures {
private:
ui32 ColumnId;
- std::optional<NArrow::TCompression> Compression;
+ YDB_READONLY_DEF(NArrow::NSerialization::TSerializerContainer, Serializer);
std::optional<NArrow::NDictionary::TEncodingSettings> DictionaryEncoding;
std::shared_ptr<TColumnLoader> Loader;
NArrow::NTransformation::ITransformer::TPtr GetLoadTransformer() const;
void InitLoader(const TIndexInfo& info);
- TColumnFeatures(const ui32 columnId)
- : ColumnId(columnId) {
- }
+ TColumnFeatures(const ui32 columnId);
public:
TString DebugString() const {
TStringBuilder sb;
- sb << "compression=" << (Compression ? Compression->DebugString() : "NO") << ";";
+ sb << "serializer=" << (Serializer ? Serializer->DebugString() : "NO") << ";";
sb << "encoding=" << (DictionaryEncoding ? DictionaryEncoding->DebugString() : "NO") << ";";
sb << "loader=" << (Loader ? Loader->DebugString() : "NO") << ";";
return sb;
@@ -158,8 +157,6 @@ public:
static std::optional<TColumnFeatures> BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const TIndexInfo& indexInfo);
static TColumnFeatures BuildFromIndexInfo(const ui32 columnId, const TIndexInfo& indexInfo);
- std::unique_ptr<arrow::util::Codec> GetCompressionCodec() const;
-
const std::shared_ptr<TColumnLoader>& GetLoader() const {
AFL_VERIFY(Loader);
return Loader;
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
index 6bac1a0af45..e60785485be 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
@@ -3,9 +3,8 @@
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
#include <ydb/core/formats/arrow/sort_cursor.h>
#include <ydb/core/sys_view/common/schema.h>
-#include <ydb/core/formats/arrow/serializer/batch_only.h>
+#include <ydb/core/formats/arrow/serializer/native.h>
#include <ydb/core/formats/arrow/transformer/dictionary.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/core/base/appdata.h>
namespace NKikimr::NOlap {
@@ -296,32 +295,23 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const {
}
TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId, const TSaverContext& context) const {
- arrow::ipc::IpcWriteOptions options;
- options.use_threads = false;
-
NArrow::NTransformation::ITransformer::TPtr transformer;
- std::unique_ptr<arrow::util::Codec> columnCodec;
+ NArrow::NSerialization::TSerializerContainer serializer;
{
auto it = ColumnFeatures.find(columnId);
AFL_VERIFY(it != ColumnFeatures.end());
transformer = it->second.GetSaveTransformer();
- columnCodec = it->second.GetCompressionCodec();
- }
-
- if (context.GetExternalCompression()) {
- options.codec = context.GetExternalCompression()->BuildArrowCodec();
- } else if (columnCodec) {
- options.codec = std::move(columnCodec);
- } else if (DefaultCompression) {
- options.codec = DefaultCompression->BuildArrowCodec();
- } else {
- options.codec = NArrow::TCompression::BuildDefaultCodec();
+ serializer = it->second.GetSerializer();
}
- if (!transformer) {
- return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options));
+ if (!!context.GetExternalSerializer()) {
+ return TColumnSaver(transformer, *context.GetExternalSerializer());
+ } else if (!!serializer) {
+ return TColumnSaver(transformer, serializer);
+ } else if (DefaultSerializer) {
+ return TColumnSaver(transformer, DefaultSerializer);
} else {
- return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TFullDataSerializer>(options));
+ return TColumnSaver(transformer, NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer());
}
}
@@ -390,8 +380,7 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema&
const ui32 id = col.GetId();
const TString& name = col.GetName();
const bool notNull = col.HasNotNull() ? col.GetNotNull() : false;
- auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(),
- col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr);
+ auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr);
Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod, notNull);
ColumnNames[name] = id;
}
@@ -413,12 +402,12 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema&
}
if (schema.HasDefaultCompression()) {
- auto result = NArrow::TCompression::BuildFromProto(schema.GetDefaultCompression());
- if (!result) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", result.GetErrorMessage());
+ NArrow::NSerialization::TSerializerContainer container;
+ if (!container.DeserializeFromProto(schema.GetDefaultCompression())) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", "cannot_parse_default_serializer");
return false;
}
- DefaultCompression = *result;
+ DefaultSerializer = container;
}
Version = schema.GetVersion();
return true;
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index 44d15cc375d..7409e33bd91 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -246,7 +246,7 @@ private:
std::shared_ptr<arrow::Schema> ExtendedKey; // Extend PK with snapshot columns to allow old shapshot reads
THashSet<TString> RequiredColumns;
THashSet<ui32> MinMaxIdxColumnsIds;
- std::optional<NArrow::TCompression> DefaultCompression;
+ NArrow::NSerialization::TSerializerContainer DefaultSerializer;
};
std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials = false);
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp
index 77e9430ed95..cf87cf941d8 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp
@@ -4,7 +4,7 @@
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/formats/arrow/hash/xx_hash.h>
#include <ydb/core/formats/arrow/hash/calcer.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/core/formats/arrow/size_calcer.h>
namespace NKikimr::NOlap::NIndexes {
@@ -14,6 +14,7 @@ void TPortionIndexChunk::DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo
}
std::shared_ptr<NKikimr::NOlap::IPortionDataChunk> TIndexByColumns::DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const {
+ AFL_VERIFY(Serializer);
AFL_VERIFY(data.size());
std::vector<TChunkedColumnReader> columnReaders;
for (auto&& i : ColumnIds) {
@@ -27,12 +28,12 @@ std::shared_ptr<NKikimr::NOlap::IPortionDataChunk> TIndexByColumns::DoBuildIndex
}
TChunkedBatchReader reader(std::move(columnReaders));
std::shared_ptr<arrow::RecordBatch> indexBatch = DoBuildIndexImpl(reader);
- const TString indexData = TColumnSaver(nullptr, Serializer).Apply(indexBatch);
+ const TString indexData = Serializer->SerializeFull(indexBatch);
return std::make_shared<TPortionIndexChunk>(indexId, recordsCount, NArrow::GetBatchDataSize(indexBatch), indexData);
}
bool TIndexByColumns::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& /*proto*/) {
- Serializer = std::make_shared<NArrow::NSerialization::TFullDataSerializer>(arrow::ipc::IpcWriteOptions::Defaults());
+ Serializer = NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer();
return true;
}
@@ -40,7 +41,7 @@ TIndexByColumns::TIndexByColumns(const ui32 indexId, const TString& indexName, c
: TBase(indexId, indexName)
, ColumnIds(columnIds)
{
- Serializer = std::make_shared<NArrow::NSerialization::TFullDataSerializer>(arrow::ipc::IpcWriteOptions::Defaults());
+ Serializer = NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer();
}
NKikimr::TConclusionStatus TIndexByColumns::CheckSameColumnsForModification(const IIndexMeta& newMeta) const {
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp
index 368589b11c3..1ec8aede916 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp
@@ -1,5 +1,5 @@
#include "checker.h"
-#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/core/formats/arrow/common/validation.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_primitive.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
@@ -14,7 +14,7 @@ void TBloomFilterChecker::DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapInde
bool TBloomFilterChecker::DoCheckImpl(const std::vector<TString>& blobs) const {
for (auto&& blob : blobs) {
- auto rb = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TFullDataDeserializer().Deserialize(blob));
+ auto rb = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer()->Deserialize(blob));
AFL_VERIFY(rb);
AFL_VERIFY(rb->schema()->num_fields() == 1);
AFL_VERIFY(rb->schema()->field(0)->type()->id() == arrow::Type::BOOL);
diff --git a/ydb/core/tx/columnshard/engines/scheme/tier_info.h b/ydb/core/tx/columnshard/engines/scheme/tier_info.h
index d9808ca9b91..395752201f8 100644
--- a/ydb/core/tx/columnshard/engines/scheme/tier_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/tier_info.h
@@ -3,7 +3,6 @@
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/common/validation.h>
#include <ydb/core/formats/arrow/serializer/abstract.h>
-#include <ydb/core/formats/arrow/compression/object.h>
#include <ydb/core/tx/columnshard/common/scalars.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
#include <util/generic/set.h>
@@ -18,7 +17,7 @@ private:
YDB_READONLY_DEF(TDuration, EvictDuration);
ui32 TtlUnitsInSecond;
- YDB_READONLY_DEF(std::optional<NArrow::TCompression>, Compression);
+ YDB_READONLY_DEF(std::optional<NArrow::NSerialization::TSerializerContainer>, Serializer);
public:
TTierInfo(const TString& tierName, TDuration evictDuration, const TString& column, ui32 unitsInSecond = 0)
: Name(tierName)
@@ -34,8 +33,8 @@ public:
return now - EvictDuration;
}
- TTierInfo& SetCompression(const NArrow::TCompression& value) {
- Compression = value;
+ TTierInfo& SetSerializer(const NArrow::NSerialization::TSerializerContainer& value) {
+ Serializer = value;
return *this;
}
@@ -51,9 +50,9 @@ public:
TString GetDebugString() const {
TStringBuilder sb;
- sb << "name=" << Name << ";duration=" << EvictDuration << ";column=" << EvictColumnName << ";compression=";
- if (Compression) {
- sb << Compression->DebugString();
+ sb << "name=" << Name << ";duration=" << EvictDuration << ";column=" << EvictColumnName << ";serializer=";
+ if (Serializer) {
+ sb << Serializer->DebugString();
} else {
sb << "NOT_SPECIFIED(Default)";
}
@@ -133,11 +132,11 @@ public:
return {};
}
- std::optional<NArrow::TCompression> GetCompression(const TString& name) const {
+ std::optional<NArrow::NSerialization::TSerializerContainer> GetSerializer(const TString& name) const {
auto it = TierByName.find(name);
if (it != TierByName.end()) {
Y_ABORT_UNLESS(!name.empty());
- return it->second->GetCompression();
+ return it->second->GetSerializer();
}
return {};
}
diff --git a/ydb/core/tx/columnshard/engines/ya.make b/ydb/core/tx/columnshard/engines/ya.make
index 4b058ca9bbc..8e80cb730b6 100644
--- a/ydb/core/tx/columnshard/engines/ya.make
+++ b/ydb/core/tx/columnshard/engines/ya.make
@@ -29,7 +29,6 @@ PEERDIR(
ydb/core/tx/columnshard/engines/insert_table
ydb/core/tx/columnshard/engines/changes
ydb/core/tx/columnshard/engines/portions
- ydb/core/formats/arrow/compression
ydb/core/tx/program
# for NYql::NUdf alloc stuff used in binary_json
diff --git a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
index 3c5b881b4bb..72aa5539f6b 100644
--- a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
+++ b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
@@ -1,10 +1,9 @@
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/tx/columnshard/splitter/rb_splitter.h>
#include <ydb/core/tx/columnshard/counters/indexation.h>
-#include <ydb/core/formats/arrow/serializer/batch_only.h>
#include <ydb/core/formats/arrow/simple_builder/batch.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/formats/arrow/serializer/native.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
Y_UNIT_TEST_SUITE(Splitter) {
@@ -23,7 +22,7 @@ Y_UNIT_TEST_SUITE(Splitter) {
}
virtual NKikimr::NOlap::TColumnSaver GetColumnSaver(const ui32 columnId) const override {
- return NKikimr::NOlap::TColumnSaver(nullptr, std::make_shared<NKikimr::NArrow::NSerialization::TFullDataSerializer>(arrow::ipc::IpcWriteOptions::Defaults()));
+ return NKikimr::NOlap::TColumnSaver(nullptr, std::make_shared<NSerialization::TNativeSerializer>(arrow::ipc::IpcOptions::Defaults()));
}
virtual std::optional<NKikimr::NOlap::TColumnSerializationStat> GetColumnSerializationStats(const ui32 /*columnId*/) const override {
@@ -36,7 +35,7 @@ Y_UNIT_TEST_SUITE(Splitter) {
NKikimr::NOlap::TColumnLoader GetColumnLoader(const ui32 columnId) const {
arrow::FieldVector v = {std::make_shared<arrow::Field>(GetColumnName(columnId), std::make_shared<arrow::StringType>())};
auto schema = std::make_shared<arrow::Schema>(v);
- return NKikimr::NOlap::TColumnLoader(nullptr, std::make_shared<NKikimr::NArrow::NSerialization::TFullDataDeserializer>(), schema, columnId);
+ return NKikimr::NOlap::TColumnLoader(nullptr, NSerialization::TSerializerContainer::GetDefaultSerializer(), schema, columnId);
}
virtual std::shared_ptr<arrow::Field> GetField(const ui32 columnId) const override {
diff --git a/ydb/core/tx/columnshard/splitter/ut/ya.make b/ydb/core/tx/columnshard/splitter/ut/ya.make
index e242c0b6b57..4d9e9af0466 100644
--- a/ydb/core/tx/columnshard/splitter/ut/ya.make
+++ b/ydb/core/tx/columnshard/splitter/ut/ya.make
@@ -7,7 +7,6 @@ PEERDIR(
ydb/library/arrow_kernels
ydb/core/tx/columnshard/counters
- ydb/core/formats/arrow/compression
ydb/core/tx/columnshard/engines/portions
ydb/core/kqp/common
ydb/library/yql/parser/pg_wrapper
diff --git a/ydb/core/tx/schemeshard/olap/columns/update.cpp b/ydb/core/tx/schemeshard/olap/columns/update.cpp
index a6d602b5fa4..a0427414ca7 100644
--- a/ydb/core/tx/schemeshard/olap/columns/update.cpp
+++ b/ydb/core/tx/schemeshard/olap/columns/update.cpp
@@ -2,6 +2,7 @@
#include <ydb/library/yql/minikql/mkql_type_ops.h>
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/scheme_types/scheme_type_registry.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
namespace NKikimr::NSchemeShard {
@@ -13,13 +14,13 @@ namespace NKikimr::NSchemeShard {
Name = columnSchema.GetName();
NotNullFlag = columnSchema.GetNotNull();
TypeName = columnSchema.GetType();
- if (columnSchema.HasCompression()) {
- auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression());
- if (!compression) {
- errors.AddError("Cannot parse compression info: " + compression.GetErrorMessage());
+ if (columnSchema.HasSerializer()) {
+ NArrow::NSerialization::TSerializerContainer serializer;
+ if (!serializer.DeserializeFromProto(columnSchema.GetSerializer())) {
+ errors.AddError("Cannot parse serializer info");
return false;
}
- Compression = *compression;
+ Serializer = serializer;
}
if (columnSchema.HasDictionaryEncoding()) {
auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding());
@@ -72,10 +73,14 @@ namespace NKikimr::NSchemeShard {
columnSchema.GetTypeId(), nullptr)
.TypeInfo;
}
- if (columnSchema.HasCompression()) {
- auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression());
- Y_ABORT_UNLESS(compression.IsSuccess(), "%s", compression.GetErrorMessage().data());
- Compression = *compression;
+ if (columnSchema.HasSerializer()) {
+ NArrow::NSerialization::TSerializerContainer serializer;
+ AFL_VERIFY(serializer.DeserializeFromProto(columnSchema.GetSerializer()));
+ Serializer = serializer;
+ } else if (columnSchema.HasCompression()) {
+ NArrow::NSerialization::TSerializerContainer serializer;
+ AFL_VERIFY(serializer.DeserializeFromProto(columnSchema.GetCompression()));
+ Serializer = serializer;
}
if (columnSchema.HasDictionaryEncoding()) {
auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding());
@@ -89,8 +94,8 @@ namespace NKikimr::NSchemeShard {
columnSchema.SetName(Name);
columnSchema.SetType(TypeName);
columnSchema.SetNotNull(NotNullFlag);
- if (Compression) {
- *columnSchema.MutableCompression() = Compression->SerializeToProto();
+ if (Serializer) {
+ Serializer->SerializeToProto(*columnSchema.MutableSerializer());
}
if (DictionaryEncoding) {
*columnSchema.MutableDictionaryEncoding() = DictionaryEncoding->SerializeToProto();
@@ -105,12 +110,8 @@ namespace NKikimr::NSchemeShard {
bool TOlapColumnAdd::ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors) {
Y_ABORT_UNLESS(GetName() == diffColumn.GetName());
- {
- auto result = diffColumn.GetCompression().Apply(Compression);
- if (!result) {
- errors.AddError("Cannot merge compression info: " + result.GetErrorMessage());
- return false;
- }
+ if (diffColumn.GetSerializer()) {
+ Serializer = diffColumn.GetSerializer();
}
{
auto result = diffColumn.GetDictionaryEncoding().Apply(DictionaryEncoding);
diff --git a/ydb/core/tx/schemeshard/olap/columns/update.h b/ydb/core/tx/schemeshard/olap/columns/update.h
index 45fce0c9909..26eb18a971a 100644
--- a/ydb/core/tx/schemeshard/olap/columns/update.h
+++ b/ydb/core/tx/schemeshard/olap/columns/update.h
@@ -1,19 +1,18 @@
#pragma once
-#include <ydb/core/formats/arrow/compression/diff.h>
#include <ydb/core/formats/arrow/dictionary/diff.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/tx/schemeshard/olap/common/common.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/core/scheme_types/scheme_type_info.h>
-#include <ydb/core/formats/arrow/compression/object.h>
#include <ydb/core/formats/arrow/dictionary/object.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
namespace NKikimr::NSchemeShard {
class TOlapColumnDiff {
private:
YDB_READONLY_DEF(TString, Name);
- YDB_READONLY_DEF(NArrow::TCompressionDiff, Compression);
+ YDB_READONLY_DEF(NArrow::NSerialization::TSerializerContainer, Serializer);
YDB_READONLY_DEF(NArrow::NDictionary::TEncodingDiff, DictionaryEncoding);
public:
bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDiff& columnSchema, IErrorCollector& errors) {
@@ -22,9 +21,11 @@ public:
errors.AddError("empty field name");
return false;
}
- if (!Compression.DeserializeFromProto(columnSchema.GetCompression())) {
- errors.AddError("cannot parse compression diff from proto");
- return false;
+ if (columnSchema.HasSerializer()) {
+ if (!Serializer.DeserializeFromProto(columnSchema.GetSerializer())) {
+ errors.AddError("cannot parse serializer diff from proto");
+ return false;
+ }
}
if (!DictionaryEncoding.DeserializeFromProto(columnSchema.GetDictionaryEncoding())) {
errors.AddError("cannot parse dictionary encoding diff from proto");
@@ -41,7 +42,7 @@ private:
YDB_READONLY_DEF(TString, TypeName);
YDB_READONLY_DEF(NScheme::TTypeInfo, Type);
YDB_FLAG_ACCESSOR(NotNull, false);
- YDB_READONLY_DEF(std::optional<NArrow::TCompression>, Compression);
+ YDB_READONLY_DEF(std::optional<NArrow::NSerialization::TSerializerContainer>, Serializer);
YDB_READONLY_DEF(std::optional<NArrow::NDictionary::TEncodingSettings>, DictionaryEncoding);
public:
TOlapColumnAdd(const std::optional<ui32>& keyOrder)
diff --git a/ydb/core/tx/schemeshard/olap/columns/ya.make b/ydb/core/tx/schemeshard/olap/columns/ya.make
index 44971d7eeb2..94c1c4c0feb 100644
--- a/ydb/core/tx/schemeshard/olap/columns/ya.make
+++ b/ydb/core/tx/schemeshard/olap/columns/ya.make
@@ -8,7 +8,8 @@ SRCS(
PEERDIR(
ydb/core/protos
ydb/core/formats/arrow/dictionary
- ydb/core/formats/arrow/compression
+ ydb/core/formats/arrow/serializer
+ ydb/core/tx/schemeshard/olap/common
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/tx/schemeshard/olap/common/ya.make b/ydb/core/tx/schemeshard/olap/common/ya.make
index 58bbf1cb442..aea04768253 100644
--- a/ydb/core/tx/schemeshard/olap/common/ya.make
+++ b/ydb/core/tx/schemeshard/olap/common/ya.make
@@ -7,6 +7,7 @@ SRCS(
PEERDIR(
ydb/library/ydb_issue
ydb/core/base
+ ydb/library/aclib
)
END()
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index f049575c2c9..8cbf4b637d3 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -244,7 +244,6 @@ PEERDIR(
ydb/core/engine/minikql
ydb/core/external_sources
ydb/core/filestore/core
- ydb/core/formats/arrow/compression
ydb/core/kesus/tablet
ydb/core/metering
ydb/core/persqueue
diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp
index 583d06149fa..aca16e6c4db 100644
--- a/ydb/core/tx/tiering/manager.cpp
+++ b/ydb/core/tx/tiering/manager.cpp
@@ -106,10 +106,16 @@ TManager::TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId,
{
}
-NArrow::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) {
- auto out = NArrow::TCompression::BuildFromProto(compression);
- Y_ABORT_UNLESS(out, "%s", out.GetErrorMessage().data());
- return *out;
+NArrow::NSerialization::TSerializerContainer ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compressionProto) {
+ NArrow::NSerialization::TSerializerContainer container;
+ AFL_VERIFY(container.DeserializeFromProto(compressionProto));
+ return container;
+}
+
+NArrow::NSerialization::TSerializerContainer ConvertCompression(const NKikimrSchemeOp::TOlapColumn::TSerializer& serializerProto) {
+ NArrow::NSerialization::TSerializerContainer container;
+ AFL_VERIFY(container.DeserializeFromProto(serializerProto));
+ return container;
}
}
@@ -208,7 +214,7 @@ THashMap<ui64, NKikimr::NOlap::TTiering> TTiersManager::GetTiering() const {
for (auto& [name, tier] : pathTiering.GetTierByName()) {
auto it = tierConfigs.find(name);
if (it != tierConfigs.end()) {
- tier->SetCompression(NTiers::ConvertCompression(it->second.GetCompression()));
+ tier->SetSerializer(NTiers::ConvertCompression(it->second.GetCompression()));
}
}
}
diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h
index 6bb6c619439..727d2b37226 100644
--- a/ydb/core/tx/tiering/manager.h
+++ b/ydb/core/tx/tiering/manager.h
@@ -15,7 +15,8 @@
namespace NKikimr::NColumnShard {
namespace NTiers {
-NArrow::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression);
+NArrow::NSerialization::TSerializerContainer ConvertCompression(const NKikimrSchemeOp::TOlapColumn::TSerializer& serializerProto);
+NArrow::NSerialization::TSerializerContainer ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compressionProto);
class TManager {
private:
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h
index 6f09b9e5ffa..c5b4d78e2be 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h
@@ -1,7 +1,7 @@
#pragma once
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/library/yql/providers/generic/connector/api/common/data_source.pb.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/error.h>
@@ -677,10 +677,10 @@ namespace NYql::NConnector::NTest {
TBuilder& AddResponse(
const std::shared_ptr<arrow::RecordBatch>& recordBatch,
const NApi::TError& error) {
- NKikimr::NArrow::NSerialization::TFullDataSerializer ser(arrow::ipc::IpcWriteOptions::Defaults());
+ NKikimr::NArrow::NSerialization::TSerializerContainer ser = NKikimr::NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer();
auto& response = this->Result_->Responses().emplace_back();
response.mutable_error()->CopyFrom(error);
- response.set_arrow_ipc_streaming(ser.Serialize(recordBatch));
+ response.set_arrow_ipc_streaming(ser->SerializeFull(recordBatch));
return static_cast<TBuilder&>(*this);
}
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp
index 2f8b0a1c9c1..cde710e529e 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp
@@ -4,8 +4,7 @@
#include <arrow/ipc/api.h>
#include <util/string/builder.h>
#include <util/system/type_name.h>
-#include <ydb/core/formats/arrow/serializer/batch_only.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/utils/yql_panic.h>
@@ -76,8 +75,8 @@ namespace NYql::NConnector {
}
std::shared_ptr<arrow::RecordBatch> ArrowIPCStreamingToArrowRecordBatch(const TProtoStringType dump) {
- NKikimr::NArrow::NSerialization::TFullDataDeserializer deser;
- auto result = deser.Deserialize(dump);
+ NKikimr::NArrow::NSerialization::TSerializerContainer deser = NKikimr::NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer();
+ auto result = deser->Deserialize(dump);
if (!result.ok()) {
ythrow yexception() << result.status().ToString();
}
diff --git a/ydb/services/bg_tasks/abstract/interface.h b/ydb/services/bg_tasks/abstract/interface.h
index f6cd594e49f..273224c968c 100644
--- a/ydb/services/bg_tasks/abstract/interface.h
+++ b/ydb/services/bg_tasks/abstract/interface.h
@@ -111,6 +111,12 @@ public:
: Object(object) {
}
+ template <class TDerived>
+ TCommonInterfaceContainer(std::shared_ptr<TDerived> object)
+ : Object(object) {
+ static_assert(std::is_base_of<IInterface, TDerived>::value);
+ }
+
bool Initialize(const TString& className) {
AFL_VERIFY(!Object)("problem", "initialize for not-empty-object");
Object.reset(TFactory::Construct(className));
@@ -169,6 +175,10 @@ public:
return !Object;
}
+ operator bool() const {
+ return !!Object;
+ }
+
};
class TStringContainerProcessor {
diff --git a/ydb/services/metadata/abstract/request_features.h b/ydb/services/metadata/abstract/request_features.h
index c985276ceb7..01d525aded8 100644
--- a/ydb/services/metadata/abstract/request_features.h
+++ b/ydb/services/metadata/abstract/request_features.h
@@ -40,5 +40,6 @@ public:
}
std::optional<TString> Extract(const TString& paramName);
+
};
}
diff --git a/ydb/services/metadata/abstract/ya.make b/ydb/services/metadata/abstract/ya.make
index 1058cebaa79..7ba40379add 100644
--- a/ydb/services/metadata/abstract/ya.make
+++ b/ydb/services/metadata/abstract/ya.make
@@ -19,7 +19,6 @@ PEERDIR(
ydb/library/actors/core
ydb/library/yql/core/expr_nodes
ydb/public/api/protos
- ydb/services/metadata/request
)
END()