aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-18 17:20:16 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-18 17:20:16 +0300
commit07f2e8c1b3fc830d01173a674f99b371c3f62f97 (patch)
treebe6f0f97c7bf8dbaabbda23c2d2d3a68bccb91e2
parent1ced62df39586b2022790241c31e24d4688af053 (diff)
downloadydb-07f2e8c1b3fc830d01173a674f99b371c3f62f97.tar.gz
savers/loaders for columns
-rw-r--r--ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/formats/arrow/dictionary/conversion.cpp21
-rw-r--r--ydb/core/formats/arrow/dictionary/conversion.h1
-rw-r--r--ydb/core/formats/arrow/transformer/CMakeLists.darwin-x86_64.txt21
-rw-r--r--ydb/core/formats/arrow/transformer/CMakeLists.linux-aarch64.txt22
-rw-r--r--ydb/core/formats/arrow/transformer/CMakeLists.linux-x86_64.txt22
-rw-r--r--ydb/core/formats/arrow/transformer/CMakeLists.txt17
-rw-r--r--ydb/core/formats/arrow/transformer/CMakeLists.windows-x86_64.txt21
-rw-r--r--ydb/core/formats/arrow/transformer/abstract.cpp4
-rw-r--r--ydb/core/formats/arrow/transformer/abstract.h21
-rw-r--r--ydb/core/formats/arrow/transformer/composite.cpp13
-rw-r--r--ydb/core/formats/arrow/transformer/composite.h14
-rw-r--r--ydb/core/formats/arrow/transformer/dictionary.cpp13
-rw-r--r--ydb/core/formats/arrow/transformer/dictionary.h18
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp31
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h94
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.cpp39
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp54
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h105
-rw-r--r--ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/tier_info.h63
-rw-r--r--ydb/core/tx/tiering/manager.cpp20
25 files changed, 490 insertions, 133 deletions
diff --git a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
index 8bfa7dc43ee..5487526b14f 100644
--- a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
@@ -11,6 +11,7 @@ add_subdirectory(dictionary)
add_subdirectory(serializer)
add_subdirectory(simple_builder)
add_subdirectory(switch)
+add_subdirectory(transformer)
add_subdirectory(ut)
add_library(core-formats-arrow)
@@ -28,6 +29,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-serializer
formats-arrow-simple_builder
formats-arrow-dictionary
+ formats-arrow-transformer
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
index 2f7c6515330..f24e05a3ca8 100644
--- a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
@@ -11,6 +11,7 @@ add_subdirectory(dictionary)
add_subdirectory(serializer)
add_subdirectory(simple_builder)
add_subdirectory(switch)
+add_subdirectory(transformer)
add_subdirectory(ut)
add_library(core-formats-arrow)
@@ -29,6 +30,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-serializer
formats-arrow-simple_builder
formats-arrow-dictionary
+ formats-arrow-transformer
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
index 2f7c6515330..f24e05a3ca8 100644
--- a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
@@ -11,6 +11,7 @@ add_subdirectory(dictionary)
add_subdirectory(serializer)
add_subdirectory(simple_builder)
add_subdirectory(switch)
+add_subdirectory(transformer)
add_subdirectory(ut)
add_library(core-formats-arrow)
@@ -29,6 +30,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-serializer
formats-arrow-simple_builder
formats-arrow-dictionary
+ formats-arrow-transformer
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
index 23061c5821d..cf9b3b8f00a 100644
--- a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
@@ -11,6 +11,7 @@ add_subdirectory(dictionary)
add_subdirectory(serializer)
add_subdirectory(simple_builder)
add_subdirectory(switch)
+add_subdirectory(transformer)
add_subdirectory(ut)
add_library(core-formats-arrow)
@@ -29,6 +30,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-serializer
formats-arrow-simple_builder
formats-arrow-dictionary
+ formats-arrow-transformer
ydb-library-arrow_kernels
ydb-library-binary_json
ydb-library-dynumber
diff --git a/ydb/core/formats/arrow/dictionary/conversion.cpp b/ydb/core/formats/arrow/dictionary/conversion.cpp
index 8b8ed9aa2e3..e7e08f77908 100644
--- a/ydb/core/formats/arrow/dictionary/conversion.cpp
+++ b/ydb/core/formats/arrow/dictionary/conversion.cpp
@@ -92,6 +92,27 @@ std::shared_ptr<arrow::DictionaryArray> ArrayToDictionary(const std::shared_ptr<
return result;
}
+std::shared_ptr<arrow::RecordBatch> ArrayToDictionary(const std::shared_ptr<arrow::RecordBatch>& data) {
+ if (!data) {
+ return data;
+ }
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ ui32 idx = 0;
+ for (auto&& i : data->schema()->fields()) {
+ if (i->type()->id() == arrow::Type::DICTIONARY) {
+ fields.emplace_back(i);
+ columns.emplace_back(data->column(idx));
+ } else {
+ columns.emplace_back(ArrayToDictionary(data->column(idx)));
+ fields.emplace_back(std::make_shared<arrow::Field>(i->name(), columns.back()->type()));
+ }
+ ++idx;
+ }
+ std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields);
+ return arrow::RecordBatch::Make(schema, data->num_rows(), columns);
+}
+
bool IsDictionableArray(const std::shared_ptr<arrow::Array>& data) {
Y_VERIFY(data);
bool result = false;
diff --git a/ydb/core/formats/arrow/dictionary/conversion.h b/ydb/core/formats/arrow/dictionary/conversion.h
index aab2def356e..787fd1050c7 100644
--- a/ydb/core/formats/arrow/dictionary/conversion.h
+++ b/ydb/core/formats/arrow/dictionary/conversion.h
@@ -7,6 +7,7 @@ namespace NKikimr::NArrow {
bool IsDictionableArray(const std::shared_ptr<arrow::Array>& data);
std::shared_ptr<arrow::DictionaryArray> ArrayToDictionary(const std::shared_ptr<arrow::Array>& data);
+std::shared_ptr<arrow::RecordBatch> ArrayToDictionary(const std::shared_ptr<arrow::RecordBatch>& data);
std::shared_ptr<arrow::Array> DictionaryToArray(const std::shared_ptr<arrow::DictionaryArray>& data);
std::shared_ptr<arrow::Array> DictionaryToArray(const arrow::DictionaryArray& data);
std::shared_ptr<arrow::RecordBatch> DictionaryToArray(const std::shared_ptr<arrow::RecordBatch>& data);
diff --git a/ydb/core/formats/arrow/transformer/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/transformer/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..bf6cb6bcd62
--- /dev/null
+++ b/ydb/core/formats/arrow/transformer/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-transformer)
+target_link_libraries(formats-arrow-transformer PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-dictionary
+)
+target_sources(formats-arrow-transformer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/dictionary.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/composite.cpp
+)
diff --git a/ydb/core/formats/arrow/transformer/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/transformer/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..3b04ad29e4d
--- /dev/null
+++ b/ydb/core/formats/arrow/transformer/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-transformer)
+target_link_libraries(formats-arrow-transformer PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-dictionary
+)
+target_sources(formats-arrow-transformer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/dictionary.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/composite.cpp
+)
diff --git a/ydb/core/formats/arrow/transformer/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/transformer/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..3b04ad29e4d
--- /dev/null
+++ b/ydb/core/formats/arrow/transformer/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-transformer)
+target_link_libraries(formats-arrow-transformer PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-dictionary
+)
+target_sources(formats-arrow-transformer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/dictionary.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/composite.cpp
+)
diff --git a/ydb/core/formats/arrow/transformer/CMakeLists.txt b/ydb/core/formats/arrow/transformer/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/formats/arrow/transformer/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/formats/arrow/transformer/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/transformer/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..bf6cb6bcd62
--- /dev/null
+++ b/ydb/core/formats/arrow/transformer/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-transformer)
+target_link_libraries(formats-arrow-transformer PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-dictionary
+)
+target_sources(formats-arrow-transformer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/dictionary.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/composite.cpp
+)
diff --git a/ydb/core/formats/arrow/transformer/abstract.cpp b/ydb/core/formats/arrow/transformer/abstract.cpp
new file mode 100644
index 00000000000..9146e36e3f5
--- /dev/null
+++ b/ydb/core/formats/arrow/transformer/abstract.cpp
@@ -0,0 +1,4 @@
+#include "abstract.h"
+namespace NKikimr::NArrow::NTransformation {
+
+}
diff --git a/ydb/core/formats/arrow/transformer/abstract.h b/ydb/core/formats/arrow/transformer/abstract.h
new file mode 100644
index 00000000000..a486e4e2304
--- /dev/null
+++ b/ydb/core/formats/arrow/transformer/abstract.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/status.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <util/generic/string.h>
+
+namespace NKikimr::NArrow::NTransformation {
+
+class ITransformer {
+protected:
+ virtual std::shared_ptr<arrow::RecordBatch> DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
+public:
+ using TPtr = std::shared_ptr<ITransformer>;
+ virtual ~ITransformer() = default;
+
+ std::shared_ptr<arrow::RecordBatch> Transform(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ return DoTransform(batch);
+ }
+};
+
+}
diff --git a/ydb/core/formats/arrow/transformer/composite.cpp b/ydb/core/formats/arrow/transformer/composite.cpp
new file mode 100644
index 00000000000..05d11a0034b
--- /dev/null
+++ b/ydb/core/formats/arrow/transformer/composite.cpp
@@ -0,0 +1,13 @@
+#include "composite.h"
+
+namespace NKikimr::NArrow::NTransformation {
+
+std::shared_ptr<arrow::RecordBatch> TCompositeTransformer::DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ std::shared_ptr<arrow::RecordBatch> current = batch;
+ for (auto&& i : Transformers) {
+ current = i->Transform(current);
+ }
+ return current;
+}
+
+}
diff --git a/ydb/core/formats/arrow/transformer/composite.h b/ydb/core/formats/arrow/transformer/composite.h
new file mode 100644
index 00000000000..a8c526f2aee
--- /dev/null
+++ b/ydb/core/formats/arrow/transformer/composite.h
@@ -0,0 +1,14 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NArrow::NTransformation {
+
+class TCompositeTransformer: public ITransformer {
+private:
+ std::vector<ITransformer::TPtr> Transformers;
+protected:
+ virtual std::shared_ptr<arrow::RecordBatch> DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+public:
+};
+
+}
diff --git a/ydb/core/formats/arrow/transformer/dictionary.cpp b/ydb/core/formats/arrow/transformer/dictionary.cpp
new file mode 100644
index 00000000000..34f13217650
--- /dev/null
+++ b/ydb/core/formats/arrow/transformer/dictionary.cpp
@@ -0,0 +1,13 @@
+#include "dictionary.h"
+#include <ydb/core/formats/arrow/dictionary/conversion.h>
+namespace NKikimr::NArrow::NTransformation {
+
+std::shared_ptr<arrow::RecordBatch> TDictionaryPackTransformer::DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ return ArrayToDictionary(batch);
+}
+
+std::shared_ptr<arrow::RecordBatch> TDictionaryUnpackTransformer::DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ return DictionaryToArray(batch);
+}
+
+}
diff --git a/ydb/core/formats/arrow/transformer/dictionary.h b/ydb/core/formats/arrow/transformer/dictionary.h
new file mode 100644
index 00000000000..5f285b2fd61
--- /dev/null
+++ b/ydb/core/formats/arrow/transformer/dictionary.h
@@ -0,0 +1,18 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NArrow::NTransformation {
+
+class TDictionaryPackTransformer: public ITransformer {
+protected:
+ virtual std::shared_ptr<arrow::RecordBatch> DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+public:
+};
+
+class TDictionaryUnpackTransformer: public ITransformer {
+protected:
+ virtual std::shared_ptr<arrow::RecordBatch> DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+public:
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index 920c7f3b6ab..eb4e8a33738 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
#include <ydb/core/formats/arrow/sort_cursor.h>
#include <ydb/core/sys_view/common/schema.h>
+#include <ydb/core/formats/arrow/serializer/batch_only.h>
namespace NKikimr::NOlap {
@@ -66,7 +67,7 @@ bool TIndexInfo::IsSpecialColumn(const ui32 fieldId) {
ui32 TIndexInfo::GetColumnId(const std::string& name) const {
auto id = GetColumnIdOptional(name);
- Y_VERIFY(!!id);
+ Y_VERIFY(!!id, "undefined column %s", name.data());
return *id;
}
@@ -331,6 +332,34 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const {
return MinMaxIdxColumnsIds.contains(it->second);
}
+TColumnSaver TIndexInfo::GetColumnSaver(const ui32 /*columnId*/, const TSaverContext& context) const {
+ arrow::ipc::IpcWriteOptions options;
+ if (context.GetExternalCompression()) {
+ options.codec = context.GetExternalCompression()->BuildArrowCodec();
+ } else {
+ options.codec = DefaultCompression.BuildArrowCodec();
+ }
+ options.use_threads = false;
+ return TColumnSaver(nullptr, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options));
+}
+
+std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoader(const ui32 columnId) const {
+ return std::make_shared<TColumnLoader>(nullptr,
+ std::make_shared<NArrow::NSerialization::TBatchPayloadDeserializer>(GetColumnSchema(columnId)),
+ GetColumnSchema(columnId), columnId);
+}
+
+std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const {
+ std::shared_ptr<arrow::Schema> schema = Schema;
+ if (IsSpecialColumn(columnId)) {
+ schema = ArrowSchemaSnapshot();
+ }
+ auto field = schema->GetFieldByName(GetColumnName(columnId));
+ Y_VERIFY(field);
+ std::vector<std::shared_ptr<arrow::Field>> fields = { field };
+ return std::make_shared<arrow::Schema>(fields);
+}
+
std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials) {
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.reserve(withSpecials ? ids.size() + 2 : ids.size());
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 3312a40905a..a40496e4321 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -6,6 +6,8 @@
#include <ydb/core/sys_view/common/schema.h>
#include <ydb/core/tablet_flat/flat_dbase_scheme.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
+#include <ydb/core/formats/arrow/transformer/abstract.h>
namespace arrow {
class Array;
@@ -20,9 +22,92 @@ namespace NKikimr::NArrow {
namespace NKikimr::NOlap {
struct TInsertedData;
-
+class TSnapshotColumnInfo;
using TNameTypeInfo = std::pair<TString, NScheme::TTypeInfo>;
+class TSaverContext {
+private:
+ TString TierName;
+ std::optional<TCompression> ExternalCompression;
+public:
+ const std::optional<TCompression>& GetExternalCompression() const {
+ return ExternalCompression;
+ }
+ TSaverContext& SetExternalCompression(const std::optional<TCompression>& value) {
+ ExternalCompression = value;
+ return *this;
+ }
+ const TString& GetTierName() const {
+ return TierName;
+ }
+ TSaverContext& SetTierName(const TString& value) {
+ TierName = value;
+ return *this;
+ }
+};
+
+class TColumnSaver {
+private:
+ NArrow::NTransformation::ITransformer::TPtr Transformer;
+ NArrow::NSerialization::ISerializer::TPtr Serializer;
+public:
+ TColumnSaver() = default;
+ TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::ISerializer::TPtr serializer)
+ : Transformer(transformer)
+ , Serializer(serializer) {
+ Y_VERIFY(Serializer);
+ }
+
+ TString Apply(const std::shared_ptr<arrow::RecordBatch>& data) const {
+ Y_VERIFY(Serializer);
+ if (Transformer) {
+ return Serializer->Serialize(Transformer->Transform(data));
+ } else {
+ return Serializer->Serialize(data);
+ }
+ }
+};
+
+class TColumnLoader {
+private:
+ NArrow::NTransformation::ITransformer::TPtr Transformer;
+ NArrow::NSerialization::IDeserializer::TPtr Deserializer;
+ std::shared_ptr<arrow::Schema> ExpectedSchema;
+ const ui32 ColumnId;
+public:
+ TColumnLoader(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::IDeserializer::TPtr deserializer,
+ const std::shared_ptr<arrow::Schema>& expectedSchema, const ui32 columnId)
+ : Transformer(transformer)
+ , Deserializer(deserializer)
+ , ExpectedSchema(expectedSchema)
+ , ColumnId(columnId)
+ {
+ Y_VERIFY(ExpectedSchema);
+ Y_VERIFY(Deserializer);
+ }
+
+ ui32 GetColumnId() const {
+ return ColumnId;
+ }
+
+ std::shared_ptr<arrow::Schema> GetExpectedSchema() const {
+ return ExpectedSchema;
+ }
+
+ arrow::Result<std::shared_ptr<arrow::RecordBatch>> Apply(const TString& data) const {
+ Y_VERIFY(Deserializer);
+ arrow::Result<std::shared_ptr<arrow::RecordBatch>> columnArray = Deserializer->Deserialize(data);
+ if (!columnArray.ok()) {
+ return columnArray;
+ }
+ if (Transformer) {
+ return Transformer->Transform(*columnArray);
+ } else {
+ return columnArray;
+ }
+ }
+};
+
/// Column engine index description in terms of tablet's local table.
/// We have to use YDB types for keys here.
struct TIndexInfo : public NTable::TScheme::TTableSchema {
@@ -66,6 +151,10 @@ public:
return Id;
}
+ std::shared_ptr<arrow::Schema> GetColumnSchema(const ui32 columnId) const;
+ TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const;
+ std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const;
+
/// Returns an id of the column located by name. The name should exists in the schema.
ui32 GetColumnId(const std::string& name) const;
std::optional<ui32> GetColumnIdOptional(const std::string& name) const;
@@ -139,7 +228,6 @@ public:
std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription() const;
void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; }
- const TCompression& GetDefaultCompression() const { return DefaultCompression; }
static const std::vector<std::string>& GetSpecialColumnNames() {
static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) };
@@ -163,7 +251,7 @@ private:
std::shared_ptr<arrow::Schema> IndexKey;
THashSet<TString> RequiredColumns;
THashSet<ui32> MinMaxIdxColumnsIds;
- TCompression DefaultCompression;
+ TCompression DefaultCompression = TCompression::Default();
};
std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials = false);
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
index 4b354c3acc5..d552fb3ee4e 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
@@ -13,27 +13,6 @@ std::shared_ptr<arrow::RecordBatch> TIndexLogicBase::GetEffectiveKey(const std::
return resBatch;
}
-arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) {
- auto& codec = compression.Codec;
-
- arrow::ipc::IpcWriteOptions options(arrow::ipc::IpcWriteOptions::Defaults());
- Y_VERIFY(arrow::util::Codec::IsAvailable(codec));
- arrow::Result<std::unique_ptr<arrow::util::Codec>> resCodec;
- if (compression.Level) {
- resCodec = arrow::util::Codec::Create(codec, *compression.Level);
- if (!resCodec.ok()) {
- resCodec = arrow::util::Codec::Create(codec);
- }
- } else {
- resCodec = arrow::util::Codec::Create(codec);
- }
- Y_VERIFY(resCodec.ok());
-
- options.codec.reset((*resCodec).release());
- options.use_threads = false;
- return options;
-}
-
std::shared_ptr<arrow::RecordBatch> TIndexationLogic::AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
const TIndexInfo& indexInfo, const TInsertedData& inserted) const {
auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.GetSnapshot());
@@ -64,16 +43,17 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo,
auto blobSchema = IndexInfo.GetSchema(undo.GetSnapshot());
auto resultSchema = IndexInfo.GetLastSchema();
auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, srcBlobs);
- auto writeOptions = WriteOptions(*compression);
size_t undoSize = newBlobs.size();
-
+ TSaverContext saverContext;
+ saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalCompression(compression);
for (auto& rec : portionInfo.Records) {
auto pos = resultSchema->GetFieldIndex(rec.ColumnId);
Y_VERIFY(pos >= 0);
auto field = resultSchema->GetField(pos);
+ auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext);
- auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, writeOptions);
+ auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, columnSaver);
if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
portionInfo = undo;
newBlobs.resize(undoSize);
@@ -102,7 +82,7 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
std::vector<TPortionInfo> out;
TString tierName;
- TCompression compression = resultSchema->GetIndexInfo().GetDefaultCompression();
+ std::optional<TCompression> compression;
if (pathId) {
if (auto* tiering = GetTieringMap().FindPtr(pathId)) {
tierName = tiering->GetHottestTierName();
@@ -111,7 +91,8 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
}
}
}
- const auto writeOptions = WriteOptions(compression);
+ TSaverContext saverContext;
+ saverContext.SetTierName(tierName).SetExternalCompression(compression);
std::shared_ptr<arrow::RecordBatch> portionBatch = batch;
for (i32 pos = 0; pos < batch->num_rows();) {
@@ -127,12 +108,12 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
bool ok = true;
for (const auto& field : resultSchema->GetSchema()->fields()) {
const auto& name = field->name();
- ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(TString(name.data(), name.size()));
+ ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(name);
/// @warnign records are not valid cause of empty BlobId and zero Portion
TColumnRecord record = TColumnRecord::Make(granule, columnId, minSnapshot, 0);
- auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record),
- writeOptions);
+ auto columnSaver = resultSchema->GetColumnSaver(name, saverContext);
+ auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record), columnSaver);
if (!blob.size()) {
ok = false;
break;
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index d95d8469bb4..4bd2230243b 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -5,22 +5,21 @@
namespace NKikimr::NOlap {
TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array,
- const std::shared_ptr<arrow::Field>& field,
- const arrow::ipc::IpcWriteOptions& writeOptions)
-{
- auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{field});
- auto batch = arrow::RecordBatch::Make(schema, array->length(), {array});
+ const std::shared_ptr<arrow::Field>& field,
+ const TColumnSaver saver) {
+ auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ field });
+ auto batch = arrow::RecordBatch::Make(schema, array->length(), { array });
Y_VERIFY(batch);
- return NArrow::SerializeBatch(batch, writeOptions);
+ return saver.Apply(batch);
}
TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::Field>& field,
TColumnRecord&& record,
- const arrow::ipc::IpcWriteOptions& writeOptions,
+ const TColumnSaver saver,
const ui32 limitBytes) {
- auto blob = SerializeColumn(array, field, writeOptions);
+ auto blob = SerializeColumn(array, field, saver);
if (blob.size() >= limitBytes) {
return {};
}
@@ -240,42 +239,14 @@ std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
return Meta.ColumnMeta.find(columnId)->second.Max;
}
-std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(const ui32 needCount, const bool reverse) const {
+std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const {
Y_VERIFY(!Blobs.empty());
- auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ Field });
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
batches.reserve(Blobs.size());
- ui32 count = 0;
- if (!reverse) {
- for (auto& blob : Blobs) {
- batches.push_back(blob.BuildRecordBatch(schema));
- Y_VERIFY(batches.back());
- if (count + batches.back()->num_rows() >= needCount) {
- Y_VERIFY(count <= needCount);
- batches.back() = batches.back()->Slice(0, needCount - count);
- }
- count += batches.back()->num_rows();
- Y_VERIFY(count <= needCount);
- if (count == needCount) {
- break;
- }
- }
- } else {
- for (auto it = Blobs.rbegin(); it != Blobs.rend(); ++it) {
- batches.push_back(it->BuildRecordBatch(schema));
- Y_VERIFY(batches.back());
- if (count + batches.back()->num_rows() >= needCount) {
- Y_VERIFY(count <= needCount);
- batches.back() = batches.back()->Slice(batches.back()->num_rows() - (needCount - count), needCount - count);
- }
- count += batches.back()->num_rows();
- Y_VERIFY(count <= needCount);
- if (count == needCount) {
- break;
- }
- }
- std::reverse(batches.begin(), batches.end());
+ for (auto& blob : Blobs) {
+ batches.push_back(blob.BuildRecordBatch(*Loader));
+ Y_VERIFY(batches.back());
}
auto res = arrow::Table::FromRecordBatches(batches);
@@ -286,12 +257,11 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con
std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
std::vector< std::shared_ptr<arrow::Field>> fields;
- ui64 limit = options.RecordsCountLimit ? *options.RecordsCountLimit : Max<ui64>();
for (auto&& i : Columns) {
if (!options.IsAcceptedColumn(i.GetColumnId())) {
continue;
}
- columns.emplace_back(i.Assemble(limit, !options.ForwardAssemble));
+ columns.emplace_back(i.Assemble());
fields.emplace_back(i.GetField());
}
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index f43f89988c6..1545f29780f 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -5,7 +5,8 @@
#include "index_info.h"
#include <ydb/core/formats/arrow/replace_key.h>
-
+#include <ydb/core/formats/arrow/serializer/abstract.h>
+#include <ydb/core/formats/arrow/dictionary/conversion.h>
namespace NKikimr::NOlap {
@@ -14,6 +15,23 @@ public:
using TPtr = std::shared_ptr<ISnapshotSchema>;
virtual ~ISnapshotSchema() {}
+ virtual std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const = 0;
+ std::shared_ptr<TColumnLoader> GetColumnLoader(const TString& columnName) const {
+ return GetColumnLoader(std::string(columnName.data(), columnName.size()));
+ }
+ std::shared_ptr<TColumnLoader> GetColumnLoader(const std::string& columnName) const {
+ return GetColumnLoader(GetColumnId(columnName));
+ }
+
+ virtual TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const = 0;
+ TColumnSaver GetColumnSaver(const TString& columnName, const TSaverContext& context) const {
+ return GetColumnSaver(GetColumnId(columnName), context);
+ }
+ TColumnSaver GetColumnSaver(const std::string& columnName, const TSaverContext& context) const {
+ return GetColumnSaver(TString(columnName.data(), columnName.size()), context);
+ }
+
+ virtual ui32 GetColumnId(const std::string& columnName) const = 0;
virtual int GetFieldIndex(const ui32 columnId) const = 0;
virtual std::shared_ptr<arrow::Field> GetField(const int index) const = 0;
virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0;
@@ -21,7 +39,8 @@ public:
virtual const TSnapshot& GetSnapshot() const = 0;
};
-class TSnapshotSchema : public ISnapshotSchema {
+class TSnapshotSchema: public ISnapshotSchema {
+private:
TIndexInfo IndexInfo;
std::shared_ptr<arrow::Schema> Schema;
TSnapshot Snapshot;
@@ -33,7 +52,19 @@ public:
{
}
- int GetFieldIndex(const ui32 columnId) const override {
+ virtual TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const override {
+ return IndexInfo.GetColumnSaver(columnId, context);
+ }
+
+ virtual std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const override {
+ return IndexInfo.GetColumnLoader(columnId);
+ }
+
+ virtual ui32 GetColumnId(const std::string& columnName) const override {
+ return IndexInfo.GetColumnId(columnName);
+ }
+
+ virtual int GetFieldIndex(const ui32 columnId) const override {
TString columnName = IndexInfo.GetColumnName(columnId);
std::string name(columnName.data(), columnName.size());
return Schema->GetFieldIndex(name);
@@ -56,7 +87,7 @@ public:
}
};
-class TFilteredSnapshotSchema : public ISnapshotSchema {
+class TFilteredSnapshotSchema: public ISnapshotSchema {
ISnapshotSchema::TPtr OriginalSnapshot;
std::shared_ptr<arrow::Schema> Schema;
std::set<ui32> ColumnIds;
@@ -79,6 +110,20 @@ public:
Schema = std::make_shared<arrow::Schema>(schemaFields);
}
+ virtual TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const override {
+ Y_VERIFY(ColumnIds.contains(columnId));
+ return OriginalSnapshot->GetColumnSaver(columnId, context);
+ }
+
+ virtual std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const override {
+ Y_VERIFY(ColumnIds.contains(columnId));
+ return OriginalSnapshot->GetColumnLoader(columnId);
+ }
+
+ virtual ui32 GetColumnId(const std::string& columnName) const override {
+ return OriginalSnapshot->GetColumnId(columnName);
+ }
+
int GetFieldIndex(const ui32 columnId) const override {
if (!ColumnIds.contains(columnId)) {
return -1;
@@ -382,44 +427,47 @@ public:
return Data;
}
- std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(std::shared_ptr<arrow::Schema> schema) const {
+ std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(const TColumnLoader& loader) const {
if (NullRowsCount) {
Y_VERIFY(!Data);
- return NArrow::MakeEmptyBatch(schema, NullRowsCount);
+ return NArrow::MakeEmptyBatch(loader.GetExpectedSchema(), NullRowsCount);
} else {
- Y_VERIFY(Data);
- return NArrow::DeserializeBatch(Data, schema);
+ auto result = loader.Apply(Data);
+ if (!result.ok()) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot unpack batch")("error", result.status().ToString());
+ return nullptr;
+ }
+ return NArrow::DictionaryToArray(*result);
}
}
};
class TPreparedColumn {
private:
- ui32 ColumnId = 0;
- std::shared_ptr<arrow::Field> Field;
+ std::shared_ptr<TColumnLoader> Loader;
std::vector<TAssembleBlobInfo> Blobs;
public:
- ui32 GetColumnId() const noexcept {
- return ColumnId;
+ ui32 GetColumnId() const {
+ return Loader->GetColumnId();
}
const std::string& GetName() const {
- return Field->name();
+ return Loader->GetExpectedSchema()->field(0)->name();
}
std::shared_ptr<arrow::Field> GetField() const {
- return Field;
+ return Loader->GetExpectedSchema()->field(0);
}
- TPreparedColumn(const std::shared_ptr<arrow::Field>& field, std::vector<TAssembleBlobInfo>&& blobs, const ui32 columnId)
- : ColumnId(columnId)
- , Field(field)
+ TPreparedColumn(std::vector<TAssembleBlobInfo>&& blobs, const std::shared_ptr<TColumnLoader>& loader)
+ : Loader(loader)
, Blobs(std::move(blobs))
{
-
+ Y_VERIFY(Loader);
+ Y_VERIFY(Loader->GetExpectedSchema()->num_fields() == 1);
}
- std::shared_ptr<arrow::ChunkedArray> Assemble(const ui32 needCount, const bool reverse) const;
+ std::shared_ptr<arrow::ChunkedArray> Assemble() const;
};
class TPreparedBatchData {
@@ -430,19 +478,9 @@ public:
public:
struct TAssembleOptions {
- const bool ForwardAssemble = true;
- std::optional<ui32> RecordsCountLimit;
std::optional<std::set<ui32>> IncludedColumnIds;
std::optional<std::set<ui32>> ExcludedColumnIds;
- TAssembleOptions() noexcept
- : TAssembleOptions(true)
- {}
-
- explicit TAssembleOptions(bool forward) noexcept
- : ForwardAssemble(forward)
- {}
-
bool IsAcceptedColumn(const ui32 columnId) const {
if (IncludedColumnIds && !IncludedColumnIds->contains(columnId)) {
return false;
@@ -484,9 +522,8 @@ public:
Y_VERIFY(!Meta.ColumnMeta.empty());
const ui32 rowsCount = Meta.ColumnMeta.begin()->second.NumRows;
- const auto& indexInfo = resultSchema.GetIndexInfo();
for (auto&& field : resultSchema.GetSchema()->fields()) {
- columns.emplace_back(TPreparedColumn(field, {TAssembleBlobInfo(rowsCount)}, indexInfo.GetColumnId(field->name())));
+ columns.emplace_back(TPreparedColumn({ TAssembleBlobInfo(rowsCount) }, resultSchema.GetColumnLoader(field->name())));
}
TMap<size_t, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
@@ -529,7 +566,7 @@ public:
}
Y_VERIFY(pos < columns.size());
- columns[pos] = TPreparedColumn(resultField, std::move(blobs), indexInfo.GetColumnId(resultField->name()));
+ columns[pos] = TPreparedColumn(std::move(blobs), dataSchema.GetColumnLoader(resultField->name()));
}
return TPreparedBatchData(std::move(columns), resultSchema.GetSchema(), rowsCount);
@@ -543,12 +580,12 @@ public:
static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::Field>& field,
- const arrow::ipc::IpcWriteOptions& writeOptions);
+ const TColumnSaver saver);
TString AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::Field>& field,
TColumnRecord&& record,
- const arrow::ipc::IpcWriteOptions& writeOptions,
+ const TColumnSaver saver,
ui32 limitBytes = BLOB_BYTES_LIMIT);
friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) {
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
index 8fc01412f40..38d7bfc7056 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
@@ -12,7 +12,6 @@ bool TAssembleBatch::DoExecuteImpl() {
Y_VERIFY(BatchConstructor.GetColumnsCount());
TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- options.RecordsCountLimit = Filter->Size();
auto addBatch = BatchConstructor.Assemble(options);
Y_VERIFY(addBatch);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)
diff --git a/ydb/core/tx/columnshard/engines/tier_info.h b/ydb/core/tx/columnshard/engines/tier_info.h
index f6208341e88..cb84cd8c681 100644
--- a/ydb/core/tx/columnshard/engines/tier_info.h
+++ b/ydb/core/tx/columnshard/engines/tier_info.h
@@ -4,13 +4,64 @@
#include "scalars.h"
#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/common/validation.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
namespace NKikimr::NOlap {
struct TCompression {
- arrow::Compression::type Codec{arrow::Compression::LZ4_FRAME};
+private:
+ arrow::Compression::type Codec = arrow::Compression::LZ4_FRAME;
std::optional<int> Level;
+ TCompression() = default;
+public:
+
+ bool DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& compression) {
+ if (compression.HasCompressionCodec()) {
+ switch (compression.GetCompressionCodec()) {
+ case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain:
+ Codec = arrow::Compression::UNCOMPRESSED;
+ break;
+ case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4:
+ Codec = arrow::Compression::LZ4_FRAME;
+ break;
+ case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD:
+ Codec = arrow::Compression::ZSTD;
+ break;
+ }
+ }
+
+ if (compression.HasCompressionLevel()) {
+ Level = compression.GetCompressionLevel();
+ }
+ return true;
+ }
+
+ static const TCompression& Default() {
+ static TCompression result;
+ return result;
+ }
+
+ explicit TCompression(const arrow::Compression::type codec, std::optional<int> level = {})
+ : Codec(codec)
+ , Level(level)
+ {
+
+ }
+
+ TString DebugString() const {
+ TStringBuilder sb;
+ sb << arrow::util::Codec::GetCodecAsString(Codec) << ":" << Level.value_or(arrow::util::kUseDefaultCompressionLevel);
+ return sb;
+ }
+
+ std::unique_ptr<arrow::util::Codec> BuildArrowCodec() const {
+ return NArrow::TStatusValidator::GetValid(
+ arrow::util::Codec::Create(
+ Codec, Level.value_or(arrow::util::kUseDefaultCompressionLevel)));
+ }
+
};
class TTierInfo {
@@ -107,10 +158,12 @@ public:
TString GetDebugString() const {
TStringBuilder sb;
- sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' "
- << arrow::util::Codec::GetCodecAsString(Compression ? Compression->Codec : TCompression().Codec)
- << ":" << ((Compression && Compression->Level) ?
- *Compression->Level : arrow::util::kUseDefaultCompressionLevel);
+ sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' ";
+ if (Compression) {
+ sb << Compression->DebugString();
+ } else {
+ sb << TCompression::Default().DebugString();
+ }
return sb;
}
};
diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp
index 34606b2dcb6..7ff41f0e7ec 100644
--- a/ydb/core/tx/tiering/manager.cpp
+++ b/ydb/core/tx/tiering/manager.cpp
@@ -116,24 +116,8 @@ TManager::TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId,
}
NKikimr::NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) {
- NOlap::TCompression out;
- if (compression.HasCompressionCodec()) {
- switch (compression.GetCompressionCodec()) {
- case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain:
- out.Codec = arrow::Compression::UNCOMPRESSED;
- break;
- case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4:
- out.Codec = arrow::Compression::LZ4_FRAME;
- break;
- case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD:
- out.Codec = arrow::Compression::ZSTD;
- break;
- }
- }
-
- if (compression.HasCompressionLevel()) {
- out.Level = compression.GetCompressionLevel();
- }
+ NOlap::TCompression out = NOlap::TCompression::Default();
+ Y_VERIFY(out.DeserializeFromProto(compression));
return out;
}
}