aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-22 12:57:55 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-22 12:57:55 +0300
commitc8adb6c07725bee79f0f8a00ec207167316d86b9 (patch)
treea106f5e94b0bc416aca5cdc03f5077479092be01
parent65d9f1f53c013e9b46c3999849db844de5aef4d1 (diff)
downloadydb-c8adb6c07725bee79f0f8a00ec207167316d86b9.tar.gz
compression usage for columnshard (enum + custom codecs)
-rw-r--r--ydb/core/formats/arrow/compression/object.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp84
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h47
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp36
-rw-r--r--ydb/core/tx/columnshard/tables_manager.h8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_olap_types.cpp5
6 files changed, 139 insertions, 47 deletions
diff --git a/ydb/core/formats/arrow/compression/object.cpp b/ydb/core/formats/arrow/compression/object.cpp
index 21be59b5944..590e45cac69 100644
--- a/ydb/core/formats/arrow/compression/object.cpp
+++ b/ydb/core/formats/arrow/compression/object.cpp
@@ -11,9 +11,9 @@ TConclusionStatus NKikimr::NArrow::TCompression::Validate() const {
const int levelMin = codec->minimum_compression_level();
const int levelMax = codec->maximum_compression_level();
if (Level && (*Level < levelMin || levelMax < *Level)) {
- TStringBuilder sb;
- sb << "incorrect level for codec. have to be: [" << levelMin << ":" << levelMax << "]";
- return TConclusionStatus::Fail(sb);
+ return TConclusionStatus::Fail(
+ TStringBuilder() << "incorrect level for codec. have to be: [" << levelMin << ":" << levelMax << "]"
+ );
}
}
return TConclusionStatus::Success();
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index eb4e8a33738..03ec854aac8 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -6,6 +6,8 @@
#include <ydb/core/formats/arrow/sort_cursor.h>
#include <ydb/core/sys_view/common/schema.h>
#include <ydb/core/formats/arrow/serializer/batch_only.h>
+#include <ydb/core/formats/arrow/transformer/dictionary.h>
+#include <ydb/core/formats/arrow/serializer/full.h>
namespace NKikimr::NOlap {
@@ -332,7 +334,7 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const {
return MinMaxIdxColumnsIds.contains(it->second);
}
-TColumnSaver TIndexInfo::GetColumnSaver(const ui32 /*columnId*/, const TSaverContext& context) const {
+TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId, const TSaverContext& context) const {
arrow::ipc::IpcWriteOptions options;
if (context.GetExternalCompression()) {
options.codec = context.GetExternalCompression()->BuildArrowCodec();
@@ -340,13 +342,37 @@ TColumnSaver TIndexInfo::GetColumnSaver(const ui32 /*columnId*/, const TSaverCon
options.codec = DefaultCompression.BuildArrowCodec();
}
options.use_threads = false;
- return TColumnSaver(nullptr, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options));
+ auto it = ColumnFeatures.find(columnId);
+ NArrow::NTransformation::ITransformer::TPtr transformer;
+ if (it != ColumnFeatures.end()) {
+ transformer = it->second.GetSaveTransformer();
+ auto codec = it->second.GetCompressionCodec();
+ if (!!codec) {
+ options.codec = std::move(codec);
+ }
+ }
+ if (!transformer) {
+ return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options));
+ } else {
+ return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TFullDataSerializer>(options));
+ }
}
std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoader(const ui32 columnId) const {
- return std::make_shared<TColumnLoader>(nullptr,
- std::make_shared<NArrow::NSerialization::TBatchPayloadDeserializer>(GetColumnSchema(columnId)),
- GetColumnSchema(columnId), columnId);
+ auto it = ColumnFeatures.find(columnId);
+ NArrow::NTransformation::ITransformer::TPtr transformer;
+ if (it != ColumnFeatures.end()) {
+ transformer = it->second.GetLoadTransformer();
+ }
+ if (!transformer) {
+ return std::make_shared<TColumnLoader>(transformer,
+ std::make_shared<NArrow::NSerialization::TBatchPayloadDeserializer>(GetColumnSchema(columnId)),
+ GetColumnSchema(columnId), columnId);
+ } else {
+ return std::make_shared<TColumnLoader>(transformer,
+ std::make_shared<NArrow::NSerialization::TFullDataDeserializer>(),
+ GetColumnSchema(columnId), columnId);
+ }
}
std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const {
@@ -360,6 +386,38 @@ std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId)
return std::make_shared<arrow::Schema>(fields);
}
+bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
+ if (schema.GetEngine() != NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", "incorrect_engine_in_schema");
+ return false;
+ }
+
+ for (const auto& col : schema.GetColumns()) {
+ const ui32 id = col.GetId();
+ const TString& name = col.GetName();
+ auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(),
+ col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr);
+ Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod);
+ ColumnNames[name] = id;
+ std::optional<TColumnFeatures> cFeatures = TColumnFeatures::BuildFromProto(col);
+ if (!cFeatures) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_column_feature");
+ return false;
+ }
+ ColumnFeatures.emplace(id, *cFeatures);
+ }
+
+ for (const auto& keyName : schema.GetKeyColumnNames()) {
+ Y_VERIFY(ColumnNames.contains(keyName));
+ KeyColumns.push_back(ColumnNames[keyName]);
+ }
+
+ if (schema.HasDefaultCompression()) {
+ Y_VERIFY(DefaultCompression.DeserializeFromProto(schema.GetDefaultCompression()));
+ }
+ return true;
+}
+
std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials) {
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.reserve(withSpecials ? ids.size() + 2 : ids.size());
@@ -395,4 +453,20 @@ std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& table
return out;
}
+NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetSaveTransformer() const {
+ NArrow::NTransformation::ITransformer::TPtr transformer;
+ if (LowCardinality.value_or(false)) {
+ transformer = std::make_shared<NArrow::NTransformation::TDictionaryPackTransformer>();
+ }
+ return transformer;
+}
+
+NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetLoadTransformer() const {
+ NArrow::NTransformation::ITransformer::TPtr transformer;
+ if (LowCardinality.value_or(false)) {
+ transformer = std::make_shared<NArrow::NTransformation::TDictionaryUnpackTransformer>();
+ }
+ return transformer;
+}
+
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index a40496e4321..a565d80ba6d 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -8,6 +8,7 @@
#include <ydb/core/tablet_flat/flat_dbase_scheme.h>
#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/core/formats/arrow/transformer/abstract.h>
+#include <ydb/core/scheme/scheme_types_proto.h>
namespace arrow {
class Array;
@@ -108,9 +109,42 @@ public:
}
};
+class TColumnFeatures {
+private:
+ std::optional<NArrow::TCompression> Compression;
+ std::optional<bool> LowCardinality;
+public:
+ static std::optional<TColumnFeatures> BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo) {
+ TColumnFeatures result;
+ if (columnInfo.HasCompression()) {
+ NArrow::TCompression compression = NArrow::TCompression::Default();
+ Y_VERIFY(compression.DeserializeFromProto(columnInfo.GetCompression()));
+ result.Compression = compression;
+ }
+ if (columnInfo.HasLowCardinality()) {
+ result.LowCardinality = columnInfo.GetLowCardinality();
+ }
+ return result;
+ }
+
+ NArrow::NTransformation::ITransformer::TPtr GetSaveTransformer() const;
+ NArrow::NTransformation::ITransformer::TPtr GetLoadTransformer() const;
+
+ std::unique_ptr<arrow::util::Codec> GetCompressionCodec() const {
+ if (Compression) {
+ return Compression->BuildArrowCodec();
+ } else {
+ return nullptr;
+ }
+ }
+
+};
+
/// Column engine index description in terms of tablet's local table.
/// We have to use YDB types for keys here.
struct TIndexInfo : public NTable::TScheme::TTableSchema {
+private:
+ THashMap<ui32, TColumnFeatures> ColumnFeatures;
public:
static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
@@ -143,8 +177,17 @@ public:
}
return true;
}
-public:
TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey = false);
+ bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
+public:
+
+ static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
+ TIndexInfo result("", 0, schema.GetCompositeMarks());
+ if (!result.DeserializeFromProto(schema)) {
+ return {};
+ }
+ return result;
+ }
/// Returns id of the index.
ui32 GetId() const noexcept {
@@ -227,8 +270,6 @@ public:
std::shared_ptr<NArrow::TSortDescription> SortDescription() const;
std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription() const;
- void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; }
-
static const std::vector<std::string>& GetSpecialColumnNames() {
static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) };
return result;
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index b88462131b3..74fc0c73660 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -219,7 +219,7 @@ bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIc
return true;
}
-void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db) {
+void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db) {
Y_VERIFY(SchemaPresets.contains(presetId));
auto preset = SchemaPresets.at(presetId);
@@ -266,9 +266,9 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& versi
table.AddVersion(version, versionInfo);
}
-void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema) {
+void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema) {
NOlap::TSnapshot snapshot{version.Step, version.TxId};
- NOlap::TIndexInfo indexInfo = ConvertSchema(schema);
+ NOlap::TIndexInfo indexInfo = DeserializeIndexInfoFromProto(schema);
indexInfo.SetAllKeys();
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId);
@@ -288,31 +288,9 @@ std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(c
return PrimaryIndex->StartCleanup(snapshot, PathsToDrop, maxRecords);
}
-NOlap::TIndexInfo TTablesManager::ConvertSchema(const TTableSchema& schema) {
- Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
-
- ui32 indexId = 0;
- NOlap::TIndexInfo indexInfo("", indexId, schema.GetCompositeMarks());
-
- for (const auto& col : schema.GetColumns()) {
- const ui32 id = col.GetId();
- const TString& name = col.GetName();
- auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(),
- col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr);
- indexInfo.Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod);
- indexInfo.ColumnNames[name] = id;
- }
-
- for (const auto& keyName : schema.GetKeyColumnNames()) {
- Y_VERIFY(indexInfo.ColumnNames.contains(keyName));
- indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]);
- }
-
- if (schema.HasDefaultCompression()) {
- NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression());
- indexInfo.SetDefaultCompression(compression);
- }
-
- return indexInfo;
+NOlap::TIndexInfo TTablesManager::DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
+ std::optional<NOlap::TIndexInfo> indexInfo = NOlap::TIndexInfo::BuildFromProto(schema);
+ Y_VERIFY(indexInfo);
+ return *indexInfo;
}
}
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
index 62a6917e252..987c258dbcd 100644
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -51,8 +51,6 @@ public:
}
};
-using TTableSchema = NKikimrSchemeOp::TColumnTableSchema;
-
class TSchemaPreset : public TVersionedSchema<NKikimrTxColumnShard::TSchemaPresetVersionInfo> {
public:
using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo;
@@ -206,7 +204,7 @@ public:
void RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db);
bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db);
- void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db);
+ void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db);
void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db);
void OnTtlUpdate();
@@ -214,8 +212,8 @@ public:
std::shared_ptr<NOlap::TColumnEngineChanges> StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords);
private:
- void IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema);
- static NOlap::TIndexInfo ConvertSchema(const TTableSchema& schema);
+ void IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema);
+ static NOlap::TIndexInfo DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
};
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp b/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp
index dfc2e54615a..6dccee16e86 100644
--- a/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp
@@ -112,8 +112,9 @@ namespace NKikimr::NSchemeShard {
if (!Compression) {
Compression = NArrow::TCompression::Default();
}
- if (!Compression->ApplyDiff(diffColumn.GetCompression())) {
- errors.AddError("Cannot merge compression info");
+ auto applyDiffResult = Compression->ApplyDiff(diffColumn.GetCompression());
+ if (!applyDiffResult) {
+ errors.AddError("Cannot merge compression info: " + applyDiffResult.GetErrorMessage());
return false;
}
}