diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-22 12:57:55 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-22 12:57:55 +0300 |
commit | c8adb6c07725bee79f0f8a00ec207167316d86b9 (patch) | |
tree | a106f5e94b0bc416aca5cdc03f5077479092be01 | |
parent | 65d9f1f53c013e9b46c3999849db844de5aef4d1 (diff) | |
download | ydb-c8adb6c07725bee79f0f8a00ec207167316d86b9.tar.gz |
compression usage for columnshard (enum + custom codecs)
-rw-r--r-- | ydb/core/formats/arrow/compression/object.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.cpp | 84 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.h | 47 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/tables_manager.cpp | 36 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/tables_manager.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_olap_types.cpp | 5 |
6 files changed, 139 insertions, 47 deletions
diff --git a/ydb/core/formats/arrow/compression/object.cpp b/ydb/core/formats/arrow/compression/object.cpp index 21be59b5944..590e45cac69 100644 --- a/ydb/core/formats/arrow/compression/object.cpp +++ b/ydb/core/formats/arrow/compression/object.cpp @@ -11,9 +11,9 @@ TConclusionStatus NKikimr::NArrow::TCompression::Validate() const { const int levelMin = codec->minimum_compression_level(); const int levelMax = codec->maximum_compression_level(); if (Level && (*Level < levelMin || levelMax < *Level)) { - TStringBuilder sb; - sb << "incorrect level for codec. have to be: [" << levelMin << ":" << levelMax << "]"; - return TConclusionStatus::Fail(sb); + return TConclusionStatus::Fail( + TStringBuilder() << "incorrect level for codec. have to be: [" << levelMin << ":" << levelMax << "]" + ); } } return TConclusionStatus::Success(); diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index eb4e8a33738..03ec854aac8 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -6,6 +6,8 @@ #include <ydb/core/formats/arrow/sort_cursor.h> #include <ydb/core/sys_view/common/schema.h> #include <ydb/core/formats/arrow/serializer/batch_only.h> +#include <ydb/core/formats/arrow/transformer/dictionary.h> +#include <ydb/core/formats/arrow/serializer/full.h> namespace NKikimr::NOlap { @@ -332,7 +334,7 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const { return MinMaxIdxColumnsIds.contains(it->second); } -TColumnSaver TIndexInfo::GetColumnSaver(const ui32 /*columnId*/, const TSaverContext& context) const { +TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId, const TSaverContext& context) const { arrow::ipc::IpcWriteOptions options; if (context.GetExternalCompression()) { options.codec = context.GetExternalCompression()->BuildArrowCodec(); @@ -340,13 +342,37 @@ TColumnSaver TIndexInfo::GetColumnSaver(const ui32 /*columnId*/, const TSaverCon options.codec = DefaultCompression.BuildArrowCodec(); } options.use_threads = false; - return TColumnSaver(nullptr, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options)); + auto it = ColumnFeatures.find(columnId); + NArrow::NTransformation::ITransformer::TPtr transformer; + if (it != ColumnFeatures.end()) { + transformer = it->second.GetSaveTransformer(); + auto codec = it->second.GetCompressionCodec(); + if (!!codec) { + options.codec = std::move(codec); + } + } + if (!transformer) { + return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options)); + } else { + return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TFullDataSerializer>(options)); + } } std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoader(const ui32 columnId) const { - return std::make_shared<TColumnLoader>(nullptr, - std::make_shared<NArrow::NSerialization::TBatchPayloadDeserializer>(GetColumnSchema(columnId)), - GetColumnSchema(columnId), columnId); + auto it = ColumnFeatures.find(columnId); + NArrow::NTransformation::ITransformer::TPtr transformer; + if (it != ColumnFeatures.end()) { + transformer = it->second.GetLoadTransformer(); + } + if (!transformer) { + return std::make_shared<TColumnLoader>(transformer, + std::make_shared<NArrow::NSerialization::TBatchPayloadDeserializer>(GetColumnSchema(columnId)), + GetColumnSchema(columnId), columnId); + } else { + return std::make_shared<TColumnLoader>(transformer, + std::make_shared<NArrow::NSerialization::TFullDataDeserializer>(), + GetColumnSchema(columnId), columnId); + } } std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const { @@ -360,6 +386,38 @@ std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) return std::make_shared<arrow::Schema>(fields); } +bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { + if (schema.GetEngine() != NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", "incorrect_engine_in_schema"); + return false; + } + + for (const auto& col : schema.GetColumns()) { + const ui32 id = col.GetId(); + const TString& name = col.GetName(); + auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), + col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); + Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod); + ColumnNames[name] = id; + std::optional<TColumnFeatures> cFeatures = TColumnFeatures::BuildFromProto(col); + if (!cFeatures) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_column_feature"); + return false; + } + ColumnFeatures.emplace(id, *cFeatures); + } + + for (const auto& keyName : schema.GetKeyColumnNames()) { + Y_VERIFY(ColumnNames.contains(keyName)); + KeyColumns.push_back(ColumnNames[keyName]); + } + + if (schema.HasDefaultCompression()) { + Y_VERIFY(DefaultCompression.DeserializeFromProto(schema.GetDefaultCompression())); + } + return true; +} + std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials) { std::vector<std::shared_ptr<arrow::Field>> fields; fields.reserve(withSpecials ? ids.size() + 2 : ids.size()); @@ -395,4 +453,20 @@ std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& table return out; } +NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetSaveTransformer() const { + NArrow::NTransformation::ITransformer::TPtr transformer; + if (LowCardinality.value_or(false)) { + transformer = std::make_shared<NArrow::NTransformation::TDictionaryPackTransformer>(); + } + return transformer; +} + +NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetLoadTransformer() const { + NArrow::NTransformation::ITransformer::TPtr transformer; + if (LowCardinality.value_or(false)) { + transformer = std::make_shared<NArrow::NTransformation::TDictionaryUnpackTransformer>(); + } + return transformer; +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index a40496e4321..a565d80ba6d 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -8,6 +8,7 @@ #include <ydb/core/tablet_flat/flat_dbase_scheme.h> #include <ydb/core/formats/arrow/serializer/abstract.h> #include <ydb/core/formats/arrow/transformer/abstract.h> +#include <ydb/core/scheme/scheme_types_proto.h> namespace arrow { class Array; @@ -108,9 +109,42 @@ public: } }; +class TColumnFeatures { +private: + std::optional<NArrow::TCompression> Compression; + std::optional<bool> LowCardinality; +public: + static std::optional<TColumnFeatures> BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo) { + TColumnFeatures result; + if (columnInfo.HasCompression()) { + NArrow::TCompression compression = NArrow::TCompression::Default(); + Y_VERIFY(compression.DeserializeFromProto(columnInfo.GetCompression())); + result.Compression = compression; + } + if (columnInfo.HasLowCardinality()) { + result.LowCardinality = columnInfo.GetLowCardinality(); + } + return result; + } + + NArrow::NTransformation::ITransformer::TPtr GetSaveTransformer() const; + NArrow::NTransformation::ITransformer::TPtr GetLoadTransformer() const; + + std::unique_ptr<arrow::util::Codec> GetCompressionCodec() const { + if (Compression) { + return Compression->BuildArrowCodec(); + } else { + return nullptr; + } + } + +}; + /// Column engine index description in terms of tablet's local table. /// We have to use YDB types for keys here. struct TIndexInfo : public NTable::TScheme::TTableSchema { +private: + THashMap<ui32, TColumnFeatures> ColumnFeatures; public: static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step"; static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id"; @@ -143,8 +177,17 @@ public: } return true; } -public: TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey = false); + bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); +public: + + static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { + TIndexInfo result("", 0, schema.GetCompositeMarks()); + if (!result.DeserializeFromProto(schema)) { + return {}; + } + return result; + } /// Returns id of the index. ui32 GetId() const noexcept { @@ -227,8 +270,6 @@ public: std::shared_ptr<NArrow::TSortDescription> SortDescription() const; std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription() const; - void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; } - static const std::vector<std::string>& GetSpecialColumnNames() { static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) }; return result; diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index b88462131b3..74fc0c73660 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -219,7 +219,7 @@ bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIc return true; } -void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db) { +void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db) { Y_VERIFY(SchemaPresets.contains(presetId)); auto preset = SchemaPresets.at(presetId); @@ -266,9 +266,9 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& versi table.AddVersion(version, versionInfo); } -void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema) { +void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema) { NOlap::TSnapshot snapshot{version.Step, version.TxId}; - NOlap::TIndexInfo indexInfo = ConvertSchema(schema); + NOlap::TIndexInfo indexInfo = DeserializeIndexInfoFromProto(schema); indexInfo.SetAllKeys(); if (!PrimaryIndex) { PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId); @@ -288,31 +288,9 @@ std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(c return PrimaryIndex->StartCleanup(snapshot, PathsToDrop, maxRecords); } -NOlap::TIndexInfo TTablesManager::ConvertSchema(const TTableSchema& schema) { - Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); - - ui32 indexId = 0; - NOlap::TIndexInfo indexInfo("", indexId, schema.GetCompositeMarks()); - - for (const auto& col : schema.GetColumns()) { - const ui32 id = col.GetId(); - const TString& name = col.GetName(); - auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), - col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); - indexInfo.Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod); - indexInfo.ColumnNames[name] = id; - } - - for (const auto& keyName : schema.GetKeyColumnNames()) { - Y_VERIFY(indexInfo.ColumnNames.contains(keyName)); - indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]); - } - - if (schema.HasDefaultCompression()) { - NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression()); - indexInfo.SetDefaultCompression(compression); - } - - return indexInfo; +NOlap::TIndexInfo TTablesManager::DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { + std::optional<NOlap::TIndexInfo> indexInfo = NOlap::TIndexInfo::BuildFromProto(schema); + Y_VERIFY(indexInfo); + return *indexInfo; } } diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 62a6917e252..987c258dbcd 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -51,8 +51,6 @@ public: } }; -using TTableSchema = NKikimrSchemeOp::TColumnTableSchema; - class TSchemaPreset : public TVersionedSchema<NKikimrTxColumnShard::TSchemaPresetVersionInfo> { public: using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo; @@ -206,7 +204,7 @@ public: void RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db); bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db); - void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db); + void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db); void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db); void OnTtlUpdate(); @@ -214,8 +212,8 @@ public: std::shared_ptr<NOlap::TColumnEngineChanges> StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords); private: - void IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema); - static NOlap::TIndexInfo ConvertSchema(const TTableSchema& schema); + void IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema); + static NOlap::TIndexInfo DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); }; } diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp b/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp index dfc2e54615a..6dccee16e86 100644 --- a/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp @@ -112,8 +112,9 @@ namespace NKikimr::NSchemeShard { if (!Compression) { Compression = NArrow::TCompression::Default(); } - if (!Compression->ApplyDiff(diffColumn.GetCompression())) { - errors.AddError("Cannot merge compression info"); + auto applyDiffResult = Compression->ApplyDiff(diffColumn.GetCompression()); + if (!applyDiffResult) { + errors.AddError("Cannot merge compression info: " + applyDiffResult.GetErrorMessage()); return false; } } |