diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-23 10:18:57 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-23 10:18:57 +0300 |
commit | 2d78a3b5b4b567bd6395b26bd81bc208490fda83 (patch) | |
tree | 1e179309246776ae24b4e265abd0a9c9693a255a | |
parent | 09dbf8e905ff4109b2a298e8c85eb62b11003397 (diff) | |
download | ydb-2d78a3b5b4b567bd6395b26bd81bc208490fda83.tar.gz |
encoding compression settings
speed up dictionary decoding
23 files changed, 325 insertions, 101 deletions
diff --git a/ydb/core/formats/arrow/compression/diff.cpp b/ydb/core/formats/arrow/compression/diff.cpp index 8d1cca93549..0659cba5e2a 100644 --- a/ydb/core/formats/arrow/compression/diff.cpp +++ b/ydb/core/formats/arrow/compression/diff.cpp @@ -1,4 +1,5 @@ #include "diff.h" +#include "object.h" #include "parsing.h" #include <util/string/cast.h> @@ -51,4 +52,26 @@ bool TCompressionDiff::DeserializeFromProto(const NKikimrSchemeOp::TCompressionO return true; } +NKikimr::TConclusionStatus TCompressionDiff::Apply(std::optional<TCompression>& settings) const { + if (IsEmpty()) { + return TConclusionStatus::Success(); + } + TCompression merged; + if (!!settings) { + merged = *settings; + } + if (Codec) { + merged.Codec = *Codec; + } + if (Level) { + merged.Level = *Level; + } + auto validation = merged.Validate(); + if (!validation) { + return validation; + } + settings = merged; + return TConclusionStatus::Success(); +} + } diff --git a/ydb/core/formats/arrow/compression/diff.h b/ydb/core/formats/arrow/compression/diff.h index d2deae3cbff..53cdd5bae78 100644 --- a/ydb/core/formats/arrow/compression/diff.h +++ b/ydb/core/formats/arrow/compression/diff.h @@ -2,7 +2,6 @@ #include <ydb/library/conclusion/status.h> #include <ydb/library/conclusion/result.h> -#include <ydb/library/accessor/accessor.h> #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/services/metadata/abstract/request_features.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h> @@ -11,14 +10,16 @@ namespace NKikimr::NArrow { +class TCompression; + class TCompressionDiff { private: std::optional<arrow::Compression::type> Codec; std::optional<int> Level; -public: bool IsEmpty() const { return !Level && !Codec; } +public: NKikimrSchemeOp::TCompressionOptions SerializeToProto() const; bool DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto); TConclusionStatus DeserializeFromRequestFeatures(NYql::TFeaturesExtractor& features); @@ -28,5 +29,6 @@ public: const std::optional<int>& GetLevel() const { return Level; } + TConclusionStatus Apply(std::optional<TCompression>& settings) const; }; } diff --git a/ydb/core/formats/arrow/compression/object.cpp b/ydb/core/formats/arrow/compression/object.cpp index 590e45cac69..7f58d2618e9 100644 --- a/ydb/core/formats/arrow/compression/object.cpp +++ b/ydb/core/formats/arrow/compression/object.cpp @@ -19,22 +19,6 @@ TConclusionStatus NKikimr::NArrow::TCompression::Validate() const { return TConclusionStatus::Success(); } -TConclusionStatus NKikimr::NArrow::TCompression::ApplyDiff(const TCompressionDiff& diff) { - TCompression merged = *this; - if (diff.GetCodec()) { - merged.Codec = *diff.GetCodec(); - } - if (diff.GetLevel()) { - merged.Level = *diff.GetLevel(); - } - auto validation = merged.Validate(); - if (!validation) { - return validation; - } - std::swap(*this, merged); - return TConclusionStatus::Success(); -} - TConclusionStatus TCompression::DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& compression) { if (compression.HasCompressionCodec()) { auto codecOpt = NArrow::CompressionFromProto(compression.GetCompressionCodec()); @@ -70,4 +54,17 @@ std::unique_ptr<arrow::util::Codec> TCompression::BuildArrowCodec() const { Codec, Level.value_or(arrow::util::kUseDefaultCompressionLevel))); } +NKikimr::TConclusion<NKikimr::NArrow::TCompression> TCompression::BuildFromProto(const NKikimrSchemeOp::TCompressionOptions& compression) { + TCompression result; + auto resultStatus = result.DeserializeFromProto(compression); + if (!resultStatus) { + return resultStatus; + } + return result; +} + +std::unique_ptr<arrow::util::Codec> TCompression::BuildDefaultCodec() { + return *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME); +} + } diff --git a/ydb/core/formats/arrow/compression/object.h b/ydb/core/formats/arrow/compression/object.h index cdb955c986b..18c32b59221 100644 --- a/ydb/core/formats/arrow/compression/object.h +++ b/ydb/core/formats/arrow/compression/object.h @@ -1,31 +1,28 @@ #pragma once -#include <ydb/library/conclusion/status.h> +#include <ydb/library/conclusion/result.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h> #include "diff.h" namespace NKikimr::NArrow { -struct TCompression { +class TCompression { private: arrow::Compression::type Codec = arrow::Compression::LZ4_FRAME; std::optional<int> Level; TCompression() = default; TConclusionStatus Validate() const; - + friend class TCompressionDiff; public: - TConclusionStatus ApplyDiff(const TCompressionDiff& diff); + static std::unique_ptr<arrow::util::Codec> BuildDefaultCodec(); TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& compression); NKikimrSchemeOp::TCompressionOptions SerializeToProto() const; - static const TCompression& Default() { - static TCompression result; - return result; - } + static TConclusion<TCompression> BuildFromProto(const NKikimrSchemeOp::TCompressionOptions& compression); explicit TCompression(const arrow::Compression::type codec, std::optional<int> level = {}) : Codec(codec) diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt index b78423781c4..f34bb025e5d 100644 --- a/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.darwin-x86_64.txt @@ -12,9 +12,12 @@ target_link_libraries(formats-arrow-dictionary PUBLIC contrib-libs-cxxsupp yutil libs-apache-arrow + ydb-core-protos formats-arrow-simple_builder formats-arrow-switch ) target_sources(formats-arrow-dictionary PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/object.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/diff.cpp ) diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt index f3f3651f3e7..93955803c58 100644 --- a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-aarch64.txt @@ -13,9 +13,12 @@ target_link_libraries(formats-arrow-dictionary PUBLIC contrib-libs-cxxsupp yutil libs-apache-arrow + ydb-core-protos formats-arrow-simple_builder formats-arrow-switch ) target_sources(formats-arrow-dictionary PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/object.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/diff.cpp ) diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt index f3f3651f3e7..93955803c58 100644 --- a/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt @@ -13,9 +13,12 @@ target_link_libraries(formats-arrow-dictionary PUBLIC contrib-libs-cxxsupp yutil libs-apache-arrow + ydb-core-protos formats-arrow-simple_builder formats-arrow-switch ) target_sources(formats-arrow-dictionary PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/object.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/diff.cpp ) diff --git a/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt index b78423781c4..f34bb025e5d 100644 --- a/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt +++ b/ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt @@ -12,9 +12,12 @@ target_link_libraries(formats-arrow-dictionary PUBLIC contrib-libs-cxxsupp yutil libs-apache-arrow + ydb-core-protos formats-arrow-simple_builder formats-arrow-switch ) target_sources(formats-arrow-dictionary PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/conversion.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/object.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/dictionary/diff.cpp ) diff --git a/ydb/core/formats/arrow/dictionary/conversion.cpp b/ydb/core/formats/arrow/dictionary/conversion.cpp index 78c57c8d23f..6250b8c9348 100644 --- a/ydb/core/formats/arrow/dictionary/conversion.cpp +++ b/ydb/core/formats/arrow/dictionary/conversion.cpp @@ -30,9 +30,15 @@ std::shared_ptr<arrow::Array> DictionaryToArray(const arrow::DictionaryArray& da constexpr bool indicesIntegral = std::is_integral<typename TWrapIndices::T::c_type>::value; if constexpr (indicesIntegral && hasCType) { using TIndices = typename arrow::TypeTraits<typename TWrapIndices::T>::ArrayType; - using TDictionaryAccessor = NConstruction::TDictionaryArrayAccessor<TDictionaryValue, TIndices>; auto& columnIndices = static_cast<const TIndices&>(*data.indices()); - result = NConstruction::TSimpleArrayConstructor("absent", TDictionaryAccessor(columnDictionary, columnIndices)).BuildArray(data.length()); + constexpr bool hasStrView = arrow::has_string_view<TDictionaryValue>::value; + if constexpr (hasStrView) { + using TBinaryDictionaryAccessor = NConstruction::TBinaryDictionaryArrayAccessor<TDictionaryValue, TIndices>; + result = NConstruction::TBinaryArrayConstructor("absent", TBinaryDictionaryAccessor(columnDictionary, columnIndices)).BuildArray(data.length()); + } else { + using TDictionaryAccessor = NConstruction::TDictionaryArrayAccessor<TDictionaryValue, TIndices>; + result = NConstruction::TSimpleArrayConstructor("absent", TDictionaryAccessor(columnDictionary, columnIndices)).BuildArray(data.length()); + } return true; } } diff --git a/ydb/core/formats/arrow/dictionary/diff.cpp b/ydb/core/formats/arrow/dictionary/diff.cpp new file mode 100644 index 00000000000..a35a119aa03 --- /dev/null +++ b/ydb/core/formats/arrow/dictionary/diff.cpp @@ -0,0 +1,49 @@ +#include "diff.h" +#include "object.h" +#include <util/string/cast.h> + +namespace NKikimr::NArrow::NDictionary { + +NKikimrSchemeOp::TDictionaryEncodingSettings TEncodingDiff::SerializeToProto() const { + NKikimrSchemeOp::TDictionaryEncodingSettings result; + if (Enabled) { + result.SetEnabled(*Enabled); + } + return result; +} + +TConclusionStatus TEncodingDiff::DeserializeFromRequestFeatures(NYql::TFeaturesExtractor& features) { + { + auto fValue = features.Extract("ENCODING.DICTIONARY.ENABLED"); + if (fValue) { + bool enabled = false; + if (!TryFromString<bool>(*fValue, enabled)) { + return TConclusionStatus::Fail("cannot parse COMPRESSTION.DICTIONARY.ENABLED as boolean"); + } + Enabled = enabled; + } + } + return TConclusionStatus::Success(); +} + +bool TEncodingDiff::DeserializeFromProto(const NKikimrSchemeOp::TDictionaryEncodingSettings& proto) { + if (proto.HasEnabled()) { + Enabled = proto.GetEnabled(); + } + return true; +} + +TConclusionStatus TEncodingDiff::Apply(std::optional<TEncodingSettings>& settings) const { + if (IsEmpty()) { + return TConclusionStatus::Success(); + } + if (!settings) { + settings = TEncodingSettings(); + } + if (Enabled) { + settings->Enabled = *Enabled; + } + return TConclusionStatus::Success(); +} + +} diff --git a/ydb/core/formats/arrow/dictionary/diff.h b/ydb/core/formats/arrow/dictionary/diff.h new file mode 100644 index 00000000000..1eeae21a869 --- /dev/null +++ b/ydb/core/formats/arrow/dictionary/diff.h @@ -0,0 +1,28 @@ +#pragma once + +#include <ydb/library/conclusion/status.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/services/metadata/abstract/request_features.h> +#include <optional> +#include <map> + +namespace NKikimr::NArrow::NDictionary { + +class TEncodingSettings; + +class TEncodingDiff { +private: + std::optional<bool> Enabled; + bool IsEmpty() const { + return !Enabled; + } +public: + NKikimrSchemeOp::TDictionaryEncodingSettings SerializeToProto() const; + bool DeserializeFromProto(const NKikimrSchemeOp::TDictionaryEncodingSettings& proto); + TConclusionStatus DeserializeFromRequestFeatures(NYql::TFeaturesExtractor& features); + const std::optional<bool>& GetEnabled() const { + return Enabled; + } + TConclusionStatus Apply(std::optional<TEncodingSettings>& settings) const; +}; +} diff --git a/ydb/core/formats/arrow/dictionary/object.cpp b/ydb/core/formats/arrow/dictionary/object.cpp new file mode 100644 index 00000000000..4a72802b2ae --- /dev/null +++ b/ydb/core/formats/arrow/dictionary/object.cpp @@ -0,0 +1,43 @@ +#include "object.h" +#include <ydb/core/formats/arrow/common/validation.h> +#include <ydb/core/formats/arrow/transformer/dictionary.h> +#include <util/string/builder.h> + +namespace NKikimr::NArrow::NDictionary { + +TConclusionStatus TEncodingSettings::DeserializeFromProto(const NKikimrSchemeOp::TDictionaryEncodingSettings& proto) { + if (proto.HasEnabled()) { + Enabled = proto.GetEnabled(); + } + return TConclusionStatus::Success(); +} + +NKikimrSchemeOp::TDictionaryEncodingSettings TEncodingSettings::SerializeToProto() const { + NKikimrSchemeOp::TDictionaryEncodingSettings result; + result.SetEnabled(Enabled); + return result; +} + +TString TEncodingSettings::DebugString() const { + TStringBuilder sb; + sb << "enabled=" << Enabled << ";"; + return sb; +} + +NTransformation::ITransformer::TPtr TEncodingSettings::BuildEncoder() const { + if (Enabled) { + return std::make_shared<NArrow::NTransformation::TDictionaryPackTransformer>(); + } else { + return nullptr; + } +} + +NTransformation::ITransformer::TPtr TEncodingSettings::BuildDecoder() const { + if (Enabled) { + return std::make_shared<NArrow::NTransformation::TDictionaryUnpackTransformer>(); + } else { + return nullptr; + } +} + +} diff --git a/ydb/core/formats/arrow/dictionary/object.h b/ydb/core/formats/arrow/dictionary/object.h new file mode 100644 index 00000000000..ba8b39a4f68 --- /dev/null +++ b/ydb/core/formats/arrow/dictionary/object.h @@ -0,0 +1,41 @@ +#pragma once + +#include <ydb/library/conclusion/result.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h> +#include <ydb/core/formats/arrow/transformer/abstract.h> + +namespace NKikimr::NArrow::NDictionary { + +class TEncodingSettings { +private: + bool Enabled = false; + TEncodingSettings() = default; + friend class TEncodingDiff; +public: + + NTransformation::ITransformer::TPtr BuildEncoder() const; + NTransformation::ITransformer::TPtr BuildDecoder() const; + + TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TDictionaryEncodingSettings& proto); + static TConclusion<TEncodingSettings> BuildFromProto(const NKikimrSchemeOp::TDictionaryEncodingSettings& proto) { + TEncodingSettings result; + auto resultParse = result.DeserializeFromProto(proto); + if (!resultParse) { + return resultParse; + } + return result; + } + + NKikimrSchemeOp::TDictionaryEncodingSettings SerializeToProto() const; + + explicit TEncodingSettings(const bool enabled) + : Enabled(enabled) + { + + } + + TString DebugString() const; +}; + +} diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp index e78d16f928e..4f4ad041f7e 100644 --- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp @@ -11,18 +11,16 @@ TConclusionStatus TAlterColumnOperation::DoDeserialize(NYql::TObjectSettingsImpl ColumnName = *fValue; } { - auto fValue = features.Extract("LOW_CARDINALITY"); - if (fValue) { - bool value; - if (!TryFromString<bool>(*fValue, value)) { - return TConclusionStatus::Fail("cannot parse LOW_CARDINALITY as bool"); - } - LowCardinality = value; + auto result = DictionaryEncodingDiff.DeserializeFromRequestFeatures(features); + if (!result) { + return TConclusionStatus::Fail(result.GetErrorMessage()); } } - auto result = CompressionDiff.DeserializeFromRequestFeatures(features); - if (!result) { - return TConclusionStatus::Fail(result.GetErrorMessage()); + { + auto result = CompressionDiff.DeserializeFromRequestFeatures(features); + if (!result) { + return TConclusionStatus::Fail(result.GetErrorMessage()); + } } return TConclusionStatus::Success(); } @@ -32,9 +30,7 @@ void TAlterColumnOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTable auto* column = schemaData->AddAlterColumns(); column->SetName(ColumnName); *column->MutableCompression() = CompressionDiff.SerializeToProto(); - if (LowCardinality) { - column->SetLowCardinality(*LowCardinality); - } + *column->MutableDictionaryEncoding() = DictionaryEncodingDiff.SerializeToProto(); } } diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h index 13927b8e637..b0fe850dab0 100644 --- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h @@ -1,5 +1,6 @@ #include "abstract.h" #include <ydb/core/formats/arrow/compression/diff.h> +#include <ydb/core/formats/arrow/dictionary/diff.h> namespace NKikimr::NKqp::NColumnshard { @@ -14,7 +15,7 @@ private: TString ColumnName; NArrow::TCompressionDiff CompressionDiff; - std::optional<bool> LowCardinality; + NArrow::NDictionary::TEncodingDiff DictionaryEncodingDiff; public: TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override; diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index d2891880efd..fbc9c2dc12d 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -3336,19 +3336,19 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Sleep(TDuration::Seconds(5)); auto tableClient = kikimr.GetTableClient(); { - auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, LOW_CARDINALITY=`true`);"; + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, `ENCODING.DICTIONARY.ENABLED`=`true`);"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); } { - auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field1, LOW_CARDINALITY=`true`);"; + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field1, `ENCODING.DICTIONARY.ENABLED`=`true`);"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::GENERIC_ERROR, alterResult.GetIssues().ToString()); } { - auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, LOW_CARDINALITY1=`true`);"; + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, `ENCODING.DICTIONARY.ENABLED1`=`true`);"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::GENERIC_ERROR, alterResult.GetIssues().ToString()); diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index b9a633ec4ec..5a14a5c96fd 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -380,6 +380,10 @@ message TTableDescription { optional TTableReplicationConfig ReplicationConfig = 40; } +message TDictionaryEncodingSettings { + optional bool Enabled = 1; +} + message TCompressionOptions { optional EColumnCodec CompressionCodec = 2; // LZ4 (in arrow LZ4_FRAME variant) if not set optional int32 CompressionLevel = 3; // Use default compression level if not set (0 != not set) @@ -388,7 +392,7 @@ message TCompressionOptions { message TOlapColumnDiff { optional string Name = 1; optional TCompressionOptions Compression = 2; - optional bool LowCardinality = 3; + optional TDictionaryEncodingSettings DictionaryEncoding = 4; } message TOlapColumnDescription { @@ -404,7 +408,7 @@ message TOlapColumnDescription { optional bool NotNull = 7; optional TCompressionOptions Compression = 8; - optional bool LowCardinality = 9; + optional TDictionaryEncodingSettings DictionaryEncoding = 9; } enum EColumnTableEngine { diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 03ec854aac8..288d5883d11 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -336,21 +336,28 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const { TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId, const TSaverContext& context) const { arrow::ipc::IpcWriteOptions options; - if (context.GetExternalCompression()) { - options.codec = context.GetExternalCompression()->BuildArrowCodec(); - } else { - options.codec = DefaultCompression.BuildArrowCodec(); - } options.use_threads = false; - auto it = ColumnFeatures.find(columnId); + NArrow::NTransformation::ITransformer::TPtr transformer; - if (it != ColumnFeatures.end()) { - transformer = it->second.GetSaveTransformer(); - auto codec = it->second.GetCompressionCodec(); - if (!!codec) { - options.codec = std::move(codec); + std::unique_ptr<arrow::util::Codec> columnCodec; + { + auto it = ColumnFeatures.find(columnId); + if (it != ColumnFeatures.end()) { + transformer = it->second.GetSaveTransformer(); + columnCodec = it->second.GetCompressionCodec(); } } + + if (context.GetExternalCompression()) { + options.codec = context.GetExternalCompression()->BuildArrowCodec(); + } else if (columnCodec) { + options.codec = std::move(columnCodec); + } else if (DefaultCompression) { + options.codec = DefaultCompression->BuildArrowCodec(); + } else { + options.codec = NArrow::TCompression::BuildDefaultCodec(); + } + if (!transformer) { return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options)); } else { @@ -413,7 +420,12 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& } if (schema.HasDefaultCompression()) { - Y_VERIFY(DefaultCompression.DeserializeFromProto(schema.GetDefaultCompression())); + auto result = NArrow::TCompression::BuildFromProto(schema.GetDefaultCompression()); + if (!result) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", result.GetErrorMessage()); + return false; + } + DefaultCompression = *result; } return true; } @@ -455,16 +467,16 @@ std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& table NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetSaveTransformer() const { NArrow::NTransformation::ITransformer::TPtr transformer; - if (LowCardinality.value_or(false)) { - transformer = std::make_shared<NArrow::NTransformation::TDictionaryPackTransformer>(); + if (DictionaryEncoding) { + transformer = DictionaryEncoding->BuildEncoder(); } return transformer; } NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetLoadTransformer() const { NArrow::NTransformation::ITransformer::TPtr transformer; - if (LowCardinality.value_or(false)) { - transformer = std::make_shared<NArrow::NTransformation::TDictionaryUnpackTransformer>(); + if (DictionaryEncoding) { + transformer = DictionaryEncoding->BuildDecoder(); } return transformer; } diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 2e9e316b223..b178ced05d1 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -6,6 +6,7 @@ #include <ydb/core/sys_view/common/schema.h> #include <ydb/core/tablet_flat/flat_dbase_scheme.h> +#include <ydb/core/formats/arrow/dictionary/object.h> #include <ydb/core/formats/arrow/serializer/abstract.h> #include <ydb/core/formats/arrow/transformer/abstract.h> #include <ydb/core/scheme/scheme_types_proto.h> @@ -112,17 +113,19 @@ public: class TColumnFeatures { private: std::optional<NArrow::TCompression> Compression; - std::optional<bool> LowCardinality; + std::optional<NArrow::NDictionary::TEncodingSettings> DictionaryEncoding; public: static std::optional<TColumnFeatures> BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo) { TColumnFeatures result; if (columnInfo.HasCompression()) { - NArrow::TCompression compression = NArrow::TCompression::Default(); - Y_VERIFY(compression.DeserializeFromProto(columnInfo.GetCompression())); - result.Compression = compression; + auto settings = NArrow::TCompression::BuildFromProto(columnInfo.GetCompression()); + Y_VERIFY(settings.IsSuccess()); + result.Compression = *settings; } - if (columnInfo.HasLowCardinality()) { - result.LowCardinality = columnInfo.GetLowCardinality(); + if (columnInfo.HasDictionaryEncoding()) { + auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnInfo.GetDictionaryEncoding()); + Y_VERIFY(settings.IsSuccess()); + result.DictionaryEncoding = *settings; } return result; } @@ -297,7 +300,7 @@ private: std::shared_ptr<arrow::Schema> IndexKey; THashSet<TString> RequiredColumns; THashSet<ui32> MinMaxIdxColumnsIds; - TCompression DefaultCompression = TCompression::Default(); + std::optional<TCompression> DefaultCompression; }; std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials = false); diff --git a/ydb/core/tx/columnshard/engines/tier_info.h b/ydb/core/tx/columnshard/engines/tier_info.h index 4416299630e..c1215ca29ca 100644 --- a/ydb/core/tx/columnshard/engines/tier_info.h +++ b/ydb/core/tx/columnshard/engines/tier_info.h @@ -111,7 +111,7 @@ public: if (Compression) { sb << Compression->DebugString(); } else { - sb << TCompression::Default().DebugString(); + sb << "NOT_SPECIFIED(Default)"; } return sb; } diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp b/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp index 6dccee16e86..bb007c568c0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp @@ -23,15 +23,20 @@ namespace NKikimr::NSchemeShard { NotNullFlag = columnSchema.GetNotNull(); TypeName = columnSchema.GetType(); if (columnSchema.HasCompression()) { - NArrow::TCompression compression = NArrow::TCompression::Default(); - if (!compression.DeserializeFromProto(columnSchema.GetCompression())) { - errors.AddError("Cannot parse compression info"); + auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression()); + if (!compression) { + errors.AddError("Cannot parse compression info: " + compression.GetErrorMessage()); return false; } - Compression = compression; + Compression = *compression; } - if (columnSchema.HasLowCardinality()) { - LowCardinality = columnSchema.GetLowCardinality(); + if (columnSchema.HasDictionaryEncoding()) { + auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); + if (!settings) { + errors.AddError("Cannot parse dictionary compression info: " + settings.GetErrorMessage()); + return false; + } + DictionaryEncoding = *settings; } if (columnSchema.HasTypeId()) { @@ -78,12 +83,14 @@ namespace NKikimr::NSchemeShard { .TypeInfo; } if (columnSchema.HasCompression()) { - NArrow::TCompression compression = NArrow::TCompression::Default(); - Y_VERIFY(compression.DeserializeFromProto(columnSchema.GetCompression())); - Compression = compression; + auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression()); + Y_VERIFY(compression.IsSuccess(), "%s", compression.GetErrorMessage().data()); + Compression = *compression; } - if (columnSchema.HasLowCardinality()) { - LowCardinality = columnSchema.GetLowCardinality(); + if (columnSchema.HasDictionaryEncoding()) { + auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); + Y_VERIFY(settings.IsSuccess()); + DictionaryEncoding = *settings; } NotNullFlag = columnSchema.GetNotNull(); } @@ -95,8 +102,8 @@ namespace NKikimr::NSchemeShard { if (Compression) { *columnSchema.MutableCompression() = Compression->SerializeToProto(); } - if (LowCardinality && *LowCardinality) { - columnSchema.SetLowCardinality(*LowCardinality); + if (DictionaryEncoding) { + *columnSchema.MutableDictionaryEncoding() = DictionaryEncoding->SerializeToProto(); } auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(Type, ""); @@ -108,19 +115,19 @@ namespace NKikimr::NSchemeShard { bool TOlapColumnAdd::ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors) { Y_VERIFY(GetName() == diffColumn.GetName()); - if (!diffColumn.GetCompression().IsEmpty()) { - if (!Compression) { - Compression = NArrow::TCompression::Default(); - } - auto applyDiffResult = Compression->ApplyDiff(diffColumn.GetCompression()); - if (!applyDiffResult) { - errors.AddError("Cannot merge compression info: " + applyDiffResult.GetErrorMessage()); + { + auto result = diffColumn.GetCompression().Apply(Compression); + if (!result) { + errors.AddError("Cannot merge compression info: " + result.GetErrorMessage()); return false; } } - - if (diffColumn.GetLowCardinality()) { - LowCardinality = diffColumn.GetLowCardinality(); + { + auto result = diffColumn.GetDictionaryEncoding().Apply(DictionaryEncoding); + if (!result) { + errors.AddError("Cannot merge dictionary encoding info: " + result.GetErrorMessage()); + return false; + } } return true; } diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.h b/ydb/core/tx/schemeshard/schemeshard_olap_types.h index ce487b7e23c..63941ccbd88 100644 --- a/ydb/core/tx/schemeshard/schemeshard_olap_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_olap_types.h @@ -6,6 +6,8 @@ #include <ydb/core/scheme/scheme_types_proto.h> #include <ydb/core/formats/arrow/compression/object.h> #include <ydb/core/formats/arrow/compression/diff.h> +#include <ydb/core/formats/arrow/dictionary/object.h> +#include <ydb/core/formats/arrow/dictionary/diff.h> namespace NKikimr::NSchemeShard { @@ -41,7 +43,7 @@ namespace NKikimr::NSchemeShard { YDB_READONLY_DEF(NScheme::TTypeInfo, Type); YDB_FLAG_ACCESSOR(NotNull, false); YDB_READONLY_DEF(std::optional<NArrow::TCompression>, Compression); - YDB_READONLY_DEF(std::optional<bool>, LowCardinality); + YDB_READONLY_DEF(std::optional<NArrow::NDictionary::TEncodingSettings>, DictionaryEncoding); public: TOlapColumnAdd(const std::optional<ui32>& keyOrder) : KeyOrder(keyOrder) @@ -78,7 +80,7 @@ namespace NKikimr::NSchemeShard { class TOlapColumnDiff { YDB_READONLY_DEF(TString, Name); YDB_READONLY_DEF(NArrow::TCompressionDiff, Compression); - YDB_READONLY_DEF(std::optional<bool>, LowCardinality); + YDB_READONLY_DEF(NArrow::NDictionary::TEncodingDiff, DictionaryEncoding); public: bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDiff& columnSchema, IErrorCollector& errors) { Name = columnSchema.GetName(); @@ -90,8 +92,9 @@ namespace NKikimr::NSchemeShard { errors.AddError("cannot parse compression diff from proto"); return false; } - if (columnSchema.HasLowCardinality()) { - LowCardinality = columnSchema.GetLowCardinality(); + if (!DictionaryEncoding.DeserializeFromProto(columnSchema.GetDictionaryEncoding())) { + errors.AddError("cannot parse dictionary encoding diff from proto"); + return false; } return true; } diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp index 7ff41f0e7ec..c6373f79651 100644 --- a/ydb/core/tx/tiering/manager.cpp +++ b/ydb/core/tx/tiering/manager.cpp @@ -116,9 +116,9 @@ TManager::TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, } NKikimr::NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) { - NOlap::TCompression out = NOlap::TCompression::Default(); - Y_VERIFY(out.DeserializeFromProto(compression)); - return out; + auto out = NOlap::TCompression::BuildFromProto(compression); + Y_VERIFY(out, "%s", out.GetErrorMessage().data()); + return *out; } } |