diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-18 17:20:16 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-18 17:20:16 +0300 |
commit | 07f2e8c1b3fc830d01173a674f99b371c3f62f97 (patch) | |
tree | be6f0f97c7bf8dbaabbda23c2d2d3a68bccb91e2 | |
parent | 1ced62df39586b2022790241c31e24d4688af053 (diff) | |
download | ydb-07f2e8c1b3fc830d01173a674f99b371c3f62f97.tar.gz |
savers/loaders for columns
25 files changed, 490 insertions, 133 deletions
diff --git a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt index 8bfa7dc43ee..5487526b14f 100644 --- a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt @@ -11,6 +11,7 @@ add_subdirectory(dictionary) add_subdirectory(serializer) add_subdirectory(simple_builder) add_subdirectory(switch) +add_subdirectory(transformer) add_subdirectory(ut) add_library(core-formats-arrow) @@ -28,6 +29,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-serializer formats-arrow-simple_builder formats-arrow-dictionary + formats-arrow-transformer ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt index 2f7c6515330..f24e05a3ca8 100644 --- a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt +++ b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt @@ -11,6 +11,7 @@ add_subdirectory(dictionary) add_subdirectory(serializer) add_subdirectory(simple_builder) add_subdirectory(switch) +add_subdirectory(transformer) add_subdirectory(ut) add_library(core-formats-arrow) @@ -29,6 +30,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-serializer formats-arrow-simple_builder formats-arrow-dictionary + formats-arrow-transformer ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt index 2f7c6515330..f24e05a3ca8 100644 --- a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt @@ -11,6 +11,7 @@ add_subdirectory(dictionary) add_subdirectory(serializer) add_subdirectory(simple_builder) add_subdirectory(switch) +add_subdirectory(transformer) add_subdirectory(ut) add_library(core-formats-arrow) @@ -29,6 +30,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-serializer formats-arrow-simple_builder formats-arrow-dictionary + formats-arrow-transformer ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt index 23061c5821d..cf9b3b8f00a 100644 --- a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt @@ -11,6 +11,7 @@ add_subdirectory(dictionary) add_subdirectory(serializer) add_subdirectory(simple_builder) add_subdirectory(switch) +add_subdirectory(transformer) add_subdirectory(ut) add_library(core-formats-arrow) @@ -29,6 +30,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-serializer formats-arrow-simple_builder formats-arrow-dictionary + formats-arrow-transformer ydb-library-arrow_kernels ydb-library-binary_json ydb-library-dynumber diff --git a/ydb/core/formats/arrow/dictionary/conversion.cpp b/ydb/core/formats/arrow/dictionary/conversion.cpp index 8b8ed9aa2e3..e7e08f77908 100644 --- a/ydb/core/formats/arrow/dictionary/conversion.cpp +++ b/ydb/core/formats/arrow/dictionary/conversion.cpp @@ -92,6 +92,27 @@ std::shared_ptr<arrow::DictionaryArray> ArrayToDictionary(const std::shared_ptr< return result; } +std::shared_ptr<arrow::RecordBatch> ArrayToDictionary(const std::shared_ptr<arrow::RecordBatch>& data) { + if (!data) { + return data; + } + std::vector<std::shared_ptr<arrow::Field>> fields; + std::vector<std::shared_ptr<arrow::Array>> columns; + ui32 idx = 0; + for (auto&& i : data->schema()->fields()) { + if (i->type()->id() == arrow::Type::DICTIONARY) { + fields.emplace_back(i); + columns.emplace_back(data->column(idx)); + } else { + columns.emplace_back(ArrayToDictionary(data->column(idx))); + fields.emplace_back(std::make_shared<arrow::Field>(i->name(), columns.back()->type())); + } + ++idx; + } + std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields); + return arrow::RecordBatch::Make(schema, data->num_rows(), columns); +} + bool IsDictionableArray(const std::shared_ptr<arrow::Array>& data) { Y_VERIFY(data); bool result = false; diff --git a/ydb/core/formats/arrow/dictionary/conversion.h b/ydb/core/formats/arrow/dictionary/conversion.h index aab2def356e..787fd1050c7 100644 --- a/ydb/core/formats/arrow/dictionary/conversion.h +++ b/ydb/core/formats/arrow/dictionary/conversion.h @@ -7,6 +7,7 @@ namespace NKikimr::NArrow { bool IsDictionableArray(const std::shared_ptr<arrow::Array>& data); std::shared_ptr<arrow::DictionaryArray> ArrayToDictionary(const std::shared_ptr<arrow::Array>& data); +std::shared_ptr<arrow::RecordBatch> ArrayToDictionary(const std::shared_ptr<arrow::RecordBatch>& data); std::shared_ptr<arrow::Array> DictionaryToArray(const std::shared_ptr<arrow::DictionaryArray>& data); std::shared_ptr<arrow::Array> DictionaryToArray(const arrow::DictionaryArray& data); std::shared_ptr<arrow::RecordBatch> DictionaryToArray(const std::shared_ptr<arrow::RecordBatch>& data); diff --git a/ydb/core/formats/arrow/transformer/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/transformer/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..bf6cb6bcd62 --- /dev/null +++ b/ydb/core/formats/arrow/transformer/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-transformer) +target_link_libraries(formats-arrow-transformer PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-dictionary +) +target_sources(formats-arrow-transformer PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/dictionary.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/composite.cpp +) diff --git a/ydb/core/formats/arrow/transformer/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/transformer/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..3b04ad29e4d --- /dev/null +++ b/ydb/core/formats/arrow/transformer/CMakeLists.linux-aarch64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-transformer) +target_link_libraries(formats-arrow-transformer PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-dictionary +) +target_sources(formats-arrow-transformer PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/dictionary.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/composite.cpp +) diff --git a/ydb/core/formats/arrow/transformer/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/transformer/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..3b04ad29e4d --- /dev/null +++ b/ydb/core/formats/arrow/transformer/CMakeLists.linux-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-transformer) +target_link_libraries(formats-arrow-transformer PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-dictionary +) +target_sources(formats-arrow-transformer PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/dictionary.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/composite.cpp +) diff --git a/ydb/core/formats/arrow/transformer/CMakeLists.txt b/ydb/core/formats/arrow/transformer/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/formats/arrow/transformer/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/formats/arrow/transformer/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/transformer/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..bf6cb6bcd62 --- /dev/null +++ b/ydb/core/formats/arrow/transformer/CMakeLists.windows-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-transformer) +target_link_libraries(formats-arrow-transformer PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-dictionary +) +target_sources(formats-arrow-transformer PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/dictionary.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/transformer/composite.cpp +) diff --git a/ydb/core/formats/arrow/transformer/abstract.cpp b/ydb/core/formats/arrow/transformer/abstract.cpp new file mode 100644 index 00000000000..9146e36e3f5 --- /dev/null +++ b/ydb/core/formats/arrow/transformer/abstract.cpp @@ -0,0 +1,4 @@ +#include "abstract.h" +namespace NKikimr::NArrow::NTransformation { + +} diff --git a/ydb/core/formats/arrow/transformer/abstract.h b/ydb/core/formats/arrow/transformer/abstract.h new file mode 100644 index 00000000000..a486e4e2304 --- /dev/null +++ b/ydb/core/formats/arrow/transformer/abstract.h @@ -0,0 +1,21 @@ +#pragma once + +#include <contrib/libs/apache/arrow/cpp/src/arrow/status.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <util/generic/string.h> + +namespace NKikimr::NArrow::NTransformation { + +class ITransformer { +protected: + virtual std::shared_ptr<arrow::RecordBatch> DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0; +public: + using TPtr = std::shared_ptr<ITransformer>; + virtual ~ITransformer() = default; + + std::shared_ptr<arrow::RecordBatch> Transform(const std::shared_ptr<arrow::RecordBatch>& batch) const { + return DoTransform(batch); + } +}; + +} diff --git a/ydb/core/formats/arrow/transformer/composite.cpp b/ydb/core/formats/arrow/transformer/composite.cpp new file mode 100644 index 00000000000..05d11a0034b --- /dev/null +++ b/ydb/core/formats/arrow/transformer/composite.cpp @@ -0,0 +1,13 @@ +#include "composite.h" + +namespace NKikimr::NArrow::NTransformation { + +std::shared_ptr<arrow::RecordBatch> TCompositeTransformer::DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const { + std::shared_ptr<arrow::RecordBatch> current = batch; + for (auto&& i : Transformers) { + current = i->Transform(current); + } + return current; +} + +} diff --git a/ydb/core/formats/arrow/transformer/composite.h b/ydb/core/formats/arrow/transformer/composite.h new file mode 100644 index 00000000000..a8c526f2aee --- /dev/null +++ b/ydb/core/formats/arrow/transformer/composite.h @@ -0,0 +1,14 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NArrow::NTransformation { + +class TCompositeTransformer: public ITransformer { +private: + std::vector<ITransformer::TPtr> Transformers; +protected: + virtual std::shared_ptr<arrow::RecordBatch> DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const override; +public: +}; + +} diff --git a/ydb/core/formats/arrow/transformer/dictionary.cpp b/ydb/core/formats/arrow/transformer/dictionary.cpp new file mode 100644 index 00000000000..34f13217650 --- /dev/null +++ b/ydb/core/formats/arrow/transformer/dictionary.cpp @@ -0,0 +1,13 @@ +#include "dictionary.h" +#include <ydb/core/formats/arrow/dictionary/conversion.h> +namespace NKikimr::NArrow::NTransformation { + +std::shared_ptr<arrow::RecordBatch> TDictionaryPackTransformer::DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const { + return ArrayToDictionary(batch); +} + +std::shared_ptr<arrow::RecordBatch> TDictionaryUnpackTransformer::DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const { + return DictionaryToArray(batch); +} + +} diff --git a/ydb/core/formats/arrow/transformer/dictionary.h b/ydb/core/formats/arrow/transformer/dictionary.h new file mode 100644 index 00000000000..5f285b2fd61 --- /dev/null +++ b/ydb/core/formats/arrow/transformer/dictionary.h @@ -0,0 +1,18 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NArrow::NTransformation { + +class TDictionaryPackTransformer: public ITransformer { +protected: + virtual std::shared_ptr<arrow::RecordBatch> DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const override; +public: +}; + +class TDictionaryUnpackTransformer: public ITransformer { +protected: + virtual std::shared_ptr<arrow::RecordBatch> DoTransform(const std::shared_ptr<arrow::RecordBatch>& batch) const override; +public: +}; + +} diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 920c7f3b6ab..eb4e8a33738 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -5,6 +5,7 @@ #include <ydb/core/formats/arrow/arrow_batch_builder.h> #include <ydb/core/formats/arrow/sort_cursor.h> #include <ydb/core/sys_view/common/schema.h> +#include <ydb/core/formats/arrow/serializer/batch_only.h> namespace NKikimr::NOlap { @@ -66,7 +67,7 @@ bool TIndexInfo::IsSpecialColumn(const ui32 fieldId) { ui32 TIndexInfo::GetColumnId(const std::string& name) const { auto id = GetColumnIdOptional(name); - Y_VERIFY(!!id); + Y_VERIFY(!!id, "undefined column %s", name.data()); return *id; } @@ -331,6 +332,34 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const { return MinMaxIdxColumnsIds.contains(it->second); } +TColumnSaver TIndexInfo::GetColumnSaver(const ui32 /*columnId*/, const TSaverContext& context) const { + arrow::ipc::IpcWriteOptions options; + if (context.GetExternalCompression()) { + options.codec = context.GetExternalCompression()->BuildArrowCodec(); + } else { + options.codec = DefaultCompression.BuildArrowCodec(); + } + options.use_threads = false; + return TColumnSaver(nullptr, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options)); +} + +std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoader(const ui32 columnId) const { + return std::make_shared<TColumnLoader>(nullptr, + std::make_shared<NArrow::NSerialization::TBatchPayloadDeserializer>(GetColumnSchema(columnId)), + GetColumnSchema(columnId), columnId); +} + +std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const { + std::shared_ptr<arrow::Schema> schema = Schema; + if (IsSpecialColumn(columnId)) { + schema = ArrowSchemaSnapshot(); + } + auto field = schema->GetFieldByName(GetColumnName(columnId)); + Y_VERIFY(field); + std::vector<std::shared_ptr<arrow::Field>> fields = { field }; + return std::make_shared<arrow::Schema>(fields); +} + std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials) { std::vector<std::shared_ptr<arrow::Field>> fields; fields.reserve(withSpecials ? ids.size() + 2 : ids.size()); diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 3312a40905a..a40496e4321 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -6,6 +6,8 @@ #include <ydb/core/sys_view/common/schema.h> #include <ydb/core/tablet_flat/flat_dbase_scheme.h> +#include <ydb/core/formats/arrow/serializer/abstract.h> +#include <ydb/core/formats/arrow/transformer/abstract.h> namespace arrow { class Array; @@ -20,9 +22,92 @@ namespace NKikimr::NArrow { namespace NKikimr::NOlap { struct TInsertedData; - +class TSnapshotColumnInfo; using TNameTypeInfo = std::pair<TString, NScheme::TTypeInfo>; +class TSaverContext { +private: + TString TierName; + std::optional<TCompression> ExternalCompression; +public: + const std::optional<TCompression>& GetExternalCompression() const { + return ExternalCompression; + } + TSaverContext& SetExternalCompression(const std::optional<TCompression>& value) { + ExternalCompression = value; + return *this; + } + const TString& GetTierName() const { + return TierName; + } + TSaverContext& SetTierName(const TString& value) { + TierName = value; + return *this; + } +}; + +class TColumnSaver { +private: + NArrow::NTransformation::ITransformer::TPtr Transformer; + NArrow::NSerialization::ISerializer::TPtr Serializer; +public: + TColumnSaver() = default; + TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::ISerializer::TPtr serializer) + : Transformer(transformer) + , Serializer(serializer) { + Y_VERIFY(Serializer); + } + + TString Apply(const std::shared_ptr<arrow::RecordBatch>& data) const { + Y_VERIFY(Serializer); + if (Transformer) { + return Serializer->Serialize(Transformer->Transform(data)); + } else { + return Serializer->Serialize(data); + } + } +}; + +class TColumnLoader { +private: + NArrow::NTransformation::ITransformer::TPtr Transformer; + NArrow::NSerialization::IDeserializer::TPtr Deserializer; + std::shared_ptr<arrow::Schema> ExpectedSchema; + const ui32 ColumnId; +public: + TColumnLoader(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::IDeserializer::TPtr deserializer, + const std::shared_ptr<arrow::Schema>& expectedSchema, const ui32 columnId) + : Transformer(transformer) + , Deserializer(deserializer) + , ExpectedSchema(expectedSchema) + , ColumnId(columnId) + { + Y_VERIFY(ExpectedSchema); + Y_VERIFY(Deserializer); + } + + ui32 GetColumnId() const { + return ColumnId; + } + + std::shared_ptr<arrow::Schema> GetExpectedSchema() const { + return ExpectedSchema; + } + + arrow::Result<std::shared_ptr<arrow::RecordBatch>> Apply(const TString& data) const { + Y_VERIFY(Deserializer); + arrow::Result<std::shared_ptr<arrow::RecordBatch>> columnArray = Deserializer->Deserialize(data); + if (!columnArray.ok()) { + return columnArray; + } + if (Transformer) { + return Transformer->Transform(*columnArray); + } else { + return columnArray; + } + } +}; + /// Column engine index description in terms of tablet's local table. /// We have to use YDB types for keys here. struct TIndexInfo : public NTable::TScheme::TTableSchema { @@ -66,6 +151,10 @@ public: return Id; } + std::shared_ptr<arrow::Schema> GetColumnSchema(const ui32 columnId) const; + TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const; + std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const; + /// Returns an id of the column located by name. The name should exists in the schema. ui32 GetColumnId(const std::string& name) const; std::optional<ui32> GetColumnIdOptional(const std::string& name) const; @@ -139,7 +228,6 @@ public: std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription() const; void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; } - const TCompression& GetDefaultCompression() const { return DefaultCompression; } static const std::vector<std::string>& GetSpecialColumnNames() { static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) }; @@ -163,7 +251,7 @@ private: std::shared_ptr<arrow::Schema> IndexKey; THashSet<TString> RequiredColumns; THashSet<ui32> MinMaxIdxColumnsIds; - TCompression DefaultCompression; + TCompression DefaultCompression = TCompression::Default(); }; std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials = false); diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp index 4b354c3acc5..d552fb3ee4e 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp @@ -13,27 +13,6 @@ std::shared_ptr<arrow::RecordBatch> TIndexLogicBase::GetEffectiveKey(const std:: return resBatch; } -arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) { - auto& codec = compression.Codec; - - arrow::ipc::IpcWriteOptions options(arrow::ipc::IpcWriteOptions::Defaults()); - Y_VERIFY(arrow::util::Codec::IsAvailable(codec)); - arrow::Result<std::unique_ptr<arrow::util::Codec>> resCodec; - if (compression.Level) { - resCodec = arrow::util::Codec::Create(codec, *compression.Level); - if (!resCodec.ok()) { - resCodec = arrow::util::Codec::Create(codec); - } - } else { - resCodec = arrow::util::Codec::Create(codec); - } - Y_VERIFY(resCodec.ok()); - - options.codec.reset((*resCodec).release()); - options.use_threads = false; - return options; -} - std::shared_ptr<arrow::RecordBatch> TIndexationLogic::AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const TIndexInfo& indexInfo, const TInsertedData& inserted) const { auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.GetSnapshot()); @@ -64,16 +43,17 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo, auto blobSchema = IndexInfo.GetSchema(undo.GetSnapshot()); auto resultSchema = IndexInfo.GetLastSchema(); auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, srcBlobs); - auto writeOptions = WriteOptions(*compression); size_t undoSize = newBlobs.size(); - + TSaverContext saverContext; + saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalCompression(compression); for (auto& rec : portionInfo.Records) { auto pos = resultSchema->GetFieldIndex(rec.ColumnId); Y_VERIFY(pos >= 0); auto field = resultSchema->GetField(pos); + auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext); - auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, writeOptions); + auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, columnSaver); if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { portionInfo = undo; newBlobs.resize(undoSize); @@ -102,7 +82,7 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI std::vector<TPortionInfo> out; TString tierName; - TCompression compression = resultSchema->GetIndexInfo().GetDefaultCompression(); + std::optional<TCompression> compression; if (pathId) { if (auto* tiering = GetTieringMap().FindPtr(pathId)) { tierName = tiering->GetHottestTierName(); @@ -111,7 +91,8 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI } } } - const auto writeOptions = WriteOptions(compression); + TSaverContext saverContext; + saverContext.SetTierName(tierName).SetExternalCompression(compression); std::shared_ptr<arrow::RecordBatch> portionBatch = batch; for (i32 pos = 0; pos < batch->num_rows();) { @@ -127,12 +108,12 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI bool ok = true; for (const auto& field : resultSchema->GetSchema()->fields()) { const auto& name = field->name(); - ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(TString(name.data(), name.size())); + ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(name); /// @warnign records are not valid cause of empty BlobId and zero Portion TColumnRecord record = TColumnRecord::Make(granule, columnId, minSnapshot, 0); - auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record), - writeOptions); + auto columnSaver = resultSchema->GetColumnSaver(name, saverContext); + auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record), columnSaver); if (!blob.size()) { ok = false; break; diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index d95d8469bb4..4bd2230243b 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -5,22 +5,21 @@ namespace NKikimr::NOlap { TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array, - const std::shared_ptr<arrow::Field>& field, - const arrow::ipc::IpcWriteOptions& writeOptions) -{ - auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{field}); - auto batch = arrow::RecordBatch::Make(schema, array->length(), {array}); + const std::shared_ptr<arrow::Field>& field, + const TColumnSaver saver) { + auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ field }); + auto batch = arrow::RecordBatch::Make(schema, array->length(), { array }); Y_VERIFY(batch); - return NArrow::SerializeBatch(batch, writeOptions); + return saver.Apply(batch); } TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Field>& field, TColumnRecord&& record, - const arrow::ipc::IpcWriteOptions& writeOptions, + const TColumnSaver saver, const ui32 limitBytes) { - auto blob = SerializeColumn(array, field, writeOptions); + auto blob = SerializeColumn(array, field, saver); if (blob.size() >= limitBytes) { return {}; } @@ -240,42 +239,14 @@ std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const { return Meta.ColumnMeta.find(columnId)->second.Max; } -std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(const ui32 needCount, const bool reverse) const { +std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { Y_VERIFY(!Blobs.empty()); - auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ Field }); std::vector<std::shared_ptr<arrow::RecordBatch>> batches; batches.reserve(Blobs.size()); - ui32 count = 0; - if (!reverse) { - for (auto& blob : Blobs) { - batches.push_back(blob.BuildRecordBatch(schema)); - Y_VERIFY(batches.back()); - if (count + batches.back()->num_rows() >= needCount) { - Y_VERIFY(count <= needCount); - batches.back() = batches.back()->Slice(0, needCount - count); - } - count += batches.back()->num_rows(); - Y_VERIFY(count <= needCount); - if (count == needCount) { - break; - } - } - } else { - for (auto it = Blobs.rbegin(); it != Blobs.rend(); ++it) { - batches.push_back(it->BuildRecordBatch(schema)); - Y_VERIFY(batches.back()); - if (count + batches.back()->num_rows() >= needCount) { - Y_VERIFY(count <= needCount); - batches.back() = batches.back()->Slice(batches.back()->num_rows() - (needCount - count), needCount - count); - } - count += batches.back()->num_rows(); - Y_VERIFY(count <= needCount); - if (count == needCount) { - break; - } - } - std::reverse(batches.begin(), batches.end()); + for (auto& blob : Blobs) { + batches.push_back(blob.BuildRecordBatch(*Loader)); + Y_VERIFY(batches.back()); } auto res = arrow::Table::FromRecordBatches(batches); @@ -286,12 +257,11 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const { std::vector<std::shared_ptr<arrow::ChunkedArray>> columns; std::vector< std::shared_ptr<arrow::Field>> fields; - ui64 limit = options.RecordsCountLimit ? *options.RecordsCountLimit : Max<ui64>(); for (auto&& i : Columns) { if (!options.IsAcceptedColumn(i.GetColumnId())) { continue; } - columns.emplace_back(i.Assemble(limit, !options.ForwardAssemble)); + columns.emplace_back(i.Assemble()); fields.emplace_back(i.GetField()); } diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index f43f89988c6..1545f29780f 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -5,7 +5,8 @@ #include "index_info.h" #include <ydb/core/formats/arrow/replace_key.h> - +#include <ydb/core/formats/arrow/serializer/abstract.h> +#include <ydb/core/formats/arrow/dictionary/conversion.h> namespace NKikimr::NOlap { @@ -14,6 +15,23 @@ public: using TPtr = std::shared_ptr<ISnapshotSchema>; virtual ~ISnapshotSchema() {} + virtual std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const = 0; + std::shared_ptr<TColumnLoader> GetColumnLoader(const TString& columnName) const { + return GetColumnLoader(std::string(columnName.data(), columnName.size())); + } + std::shared_ptr<TColumnLoader> GetColumnLoader(const std::string& columnName) const { + return GetColumnLoader(GetColumnId(columnName)); + } + + virtual TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const = 0; + TColumnSaver GetColumnSaver(const TString& columnName, const TSaverContext& context) const { + return GetColumnSaver(GetColumnId(columnName), context); + } + TColumnSaver GetColumnSaver(const std::string& columnName, const TSaverContext& context) const { + return GetColumnSaver(TString(columnName.data(), columnName.size()), context); + } + + virtual ui32 GetColumnId(const std::string& columnName) const = 0; virtual int GetFieldIndex(const ui32 columnId) const = 0; virtual std::shared_ptr<arrow::Field> GetField(const int index) const = 0; virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0; @@ -21,7 +39,8 @@ public: virtual const TSnapshot& GetSnapshot() const = 0; }; -class TSnapshotSchema : public ISnapshotSchema { +class TSnapshotSchema: public ISnapshotSchema { +private: TIndexInfo IndexInfo; std::shared_ptr<arrow::Schema> Schema; TSnapshot Snapshot; @@ -33,7 +52,19 @@ public: { } - int GetFieldIndex(const ui32 columnId) const override { + virtual TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const override { + return IndexInfo.GetColumnSaver(columnId, context); + } + + virtual std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const override { + return IndexInfo.GetColumnLoader(columnId); + } + + virtual ui32 GetColumnId(const std::string& columnName) const override { + return IndexInfo.GetColumnId(columnName); + } + + virtual int GetFieldIndex(const ui32 columnId) const override { TString columnName = IndexInfo.GetColumnName(columnId); std::string name(columnName.data(), columnName.size()); return Schema->GetFieldIndex(name); @@ -56,7 +87,7 @@ public: } }; -class TFilteredSnapshotSchema : public ISnapshotSchema { +class TFilteredSnapshotSchema: public ISnapshotSchema { ISnapshotSchema::TPtr OriginalSnapshot; std::shared_ptr<arrow::Schema> Schema; std::set<ui32> ColumnIds; @@ -79,6 +110,20 @@ public: Schema = std::make_shared<arrow::Schema>(schemaFields); } + virtual TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const override { + Y_VERIFY(ColumnIds.contains(columnId)); + return OriginalSnapshot->GetColumnSaver(columnId, context); + } + + virtual std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const override { + Y_VERIFY(ColumnIds.contains(columnId)); + return OriginalSnapshot->GetColumnLoader(columnId); + } + + virtual ui32 GetColumnId(const std::string& columnName) const override { + return OriginalSnapshot->GetColumnId(columnName); + } + int GetFieldIndex(const ui32 columnId) const override { if (!ColumnIds.contains(columnId)) { return -1; @@ -382,44 +427,47 @@ public: return Data; } - std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(std::shared_ptr<arrow::Schema> schema) const { + std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(const TColumnLoader& loader) const { if (NullRowsCount) { Y_VERIFY(!Data); - return NArrow::MakeEmptyBatch(schema, NullRowsCount); + return NArrow::MakeEmptyBatch(loader.GetExpectedSchema(), NullRowsCount); } else { - Y_VERIFY(Data); - return NArrow::DeserializeBatch(Data, schema); + auto result = loader.Apply(Data); + if (!result.ok()) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot unpack batch")("error", result.status().ToString()); + return nullptr; + } + return NArrow::DictionaryToArray(*result); } } }; class TPreparedColumn { private: - ui32 ColumnId = 0; - std::shared_ptr<arrow::Field> Field; + std::shared_ptr<TColumnLoader> Loader; std::vector<TAssembleBlobInfo> Blobs; public: - ui32 GetColumnId() const noexcept { - return ColumnId; + ui32 GetColumnId() const { + return Loader->GetColumnId(); } const std::string& GetName() const { - return Field->name(); + return Loader->GetExpectedSchema()->field(0)->name(); } std::shared_ptr<arrow::Field> GetField() const { - return Field; + return Loader->GetExpectedSchema()->field(0); } - TPreparedColumn(const std::shared_ptr<arrow::Field>& field, std::vector<TAssembleBlobInfo>&& blobs, const ui32 columnId) - : ColumnId(columnId) - , Field(field) + TPreparedColumn(std::vector<TAssembleBlobInfo>&& blobs, const std::shared_ptr<TColumnLoader>& loader) + : Loader(loader) , Blobs(std::move(blobs)) { - + Y_VERIFY(Loader); + Y_VERIFY(Loader->GetExpectedSchema()->num_fields() == 1); } - std::shared_ptr<arrow::ChunkedArray> Assemble(const ui32 needCount, const bool reverse) const; + std::shared_ptr<arrow::ChunkedArray> Assemble() const; }; class TPreparedBatchData { @@ -430,19 +478,9 @@ public: public: struct TAssembleOptions { - const bool ForwardAssemble = true; - std::optional<ui32> RecordsCountLimit; std::optional<std::set<ui32>> IncludedColumnIds; std::optional<std::set<ui32>> ExcludedColumnIds; - TAssembleOptions() noexcept - : TAssembleOptions(true) - {} - - explicit TAssembleOptions(bool forward) noexcept - : ForwardAssemble(forward) - {} - bool IsAcceptedColumn(const ui32 columnId) const { if (IncludedColumnIds && !IncludedColumnIds->contains(columnId)) { return false; @@ -484,9 +522,8 @@ public: Y_VERIFY(!Meta.ColumnMeta.empty()); const ui32 rowsCount = Meta.ColumnMeta.begin()->second.NumRows; - const auto& indexInfo = resultSchema.GetIndexInfo(); for (auto&& field : resultSchema.GetSchema()->fields()) { - columns.emplace_back(TPreparedColumn(field, {TAssembleBlobInfo(rowsCount)}, indexInfo.GetColumnId(field->name()))); + columns.emplace_back(TPreparedColumn({ TAssembleBlobInfo(rowsCount) }, resultSchema.GetColumnLoader(field->name()))); } TMap<size_t, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks @@ -529,7 +566,7 @@ public: } Y_VERIFY(pos < columns.size()); - columns[pos] = TPreparedColumn(resultField, std::move(blobs), indexInfo.GetColumnId(resultField->name())); + columns[pos] = TPreparedColumn(std::move(blobs), dataSchema.GetColumnLoader(resultField->name())); } return TPreparedBatchData(std::move(columns), resultSchema.GetSchema(), rowsCount); @@ -543,12 +580,12 @@ public: static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Field>& field, - const arrow::ipc::IpcWriteOptions& writeOptions); + const TColumnSaver saver); TString AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Field>& field, TColumnRecord&& record, - const arrow::ipc::IpcWriteOptions& writeOptions, + const TColumnSaver saver, ui32 limitBytes = BLOB_BYTES_LIMIT); friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) { diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp index 8fc01412f40..38d7bfc7056 100644 --- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp @@ -12,7 +12,6 @@ bool TAssembleBatch::DoExecuteImpl() { Y_VERIFY(BatchConstructor.GetColumnsCount()); TPortionInfo::TPreparedBatchData::TAssembleOptions options; - options.RecordsCountLimit = Filter->Size(); auto addBatch = BatchConstructor.Assemble(options); Y_VERIFY(addBatch); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN) diff --git a/ydb/core/tx/columnshard/engines/tier_info.h b/ydb/core/tx/columnshard/engines/tier_info.h index f6208341e88..cb84cd8c681 100644 --- a/ydb/core/tx/columnshard/engines/tier_info.h +++ b/ydb/core/tx/columnshard/engines/tier_info.h @@ -4,13 +4,64 @@ #include "scalars.h" #include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/common/validation.h> +#include <ydb/core/formats/arrow/serializer/abstract.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h> namespace NKikimr::NOlap { struct TCompression { - arrow::Compression::type Codec{arrow::Compression::LZ4_FRAME}; +private: + arrow::Compression::type Codec = arrow::Compression::LZ4_FRAME; std::optional<int> Level; + TCompression() = default; +public: + + bool DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& compression) { + if (compression.HasCompressionCodec()) { + switch (compression.GetCompressionCodec()) { + case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain: + Codec = arrow::Compression::UNCOMPRESSED; + break; + case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4: + Codec = arrow::Compression::LZ4_FRAME; + break; + case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD: + Codec = arrow::Compression::ZSTD; + break; + } + } + + if (compression.HasCompressionLevel()) { + Level = compression.GetCompressionLevel(); + } + return true; + } + + static const TCompression& Default() { + static TCompression result; + return result; + } + + explicit TCompression(const arrow::Compression::type codec, std::optional<int> level = {}) + : Codec(codec) + , Level(level) + { + + } + + TString DebugString() const { + TStringBuilder sb; + sb << arrow::util::Codec::GetCodecAsString(Codec) << ":" << Level.value_or(arrow::util::kUseDefaultCompressionLevel); + return sb; + } + + std::unique_ptr<arrow::util::Codec> BuildArrowCodec() const { + return NArrow::TStatusValidator::GetValid( + arrow::util::Codec::Create( + Codec, Level.value_or(arrow::util::kUseDefaultCompressionLevel))); + } + }; class TTierInfo { @@ -107,10 +158,12 @@ public: TString GetDebugString() const { TStringBuilder sb; - sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' " - << arrow::util::Codec::GetCodecAsString(Compression ? Compression->Codec : TCompression().Codec) - << ":" << ((Compression && Compression->Level) ? - *Compression->Level : arrow::util::kUseDefaultCompressionLevel); + sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' "; + if (Compression) { + sb << Compression->DebugString(); + } else { + sb << TCompression::Default().DebugString(); + } return sb; } }; diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp index 34606b2dcb6..7ff41f0e7ec 100644 --- a/ydb/core/tx/tiering/manager.cpp +++ b/ydb/core/tx/tiering/manager.cpp @@ -116,24 +116,8 @@ TManager::TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, } NKikimr::NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) { - NOlap::TCompression out; - if (compression.HasCompressionCodec()) { - switch (compression.GetCompressionCodec()) { - case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain: - out.Codec = arrow::Compression::UNCOMPRESSED; - break; - case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4: - out.Codec = arrow::Compression::LZ4_FRAME; - break; - case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD: - out.Codec = arrow::Compression::ZSTD; - break; - } - } - - if (compression.HasCompressionLevel()) { - out.Level = compression.GetCompressionLevel(); - } + NOlap::TCompression out = NOlap::TCompression::Default(); + Y_VERIFY(out.DeserializeFromProto(compression)); return out; } } |