diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-01-22 16:46:37 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-22 16:46:37 +0300 |
commit | a7dc8056ec7f824c6211a1aa250caf07db1b0f18 (patch) | |
tree | a50987d86067b12e970706412969392a2ac1645a | |
parent | 19d729829acb8e66b6ca5314bb79f1826fa39637 (diff) | |
download | ydb-a7dc8056ec7f824c6211a1aa250caf07db1b0f18.tar.gz |
Scheme correction for indexes usage (#1180)
* correct schemeshard usage for index operations
* fix build
* fix build
* correction
* fix build
* fix build
* fix ut build
* fix tests
48 files changed, 1760 insertions, 746 deletions
diff --git a/ydb/core/formats/arrow/hash/calcer.cpp b/ydb/core/formats/arrow/hash/calcer.cpp index 3a0d98b7df..9db679c4b7 100644 --- a/ydb/core/formats/arrow/hash/calcer.cpp +++ b/ydb/core/formats/arrow/hash/calcer.cpp @@ -9,7 +9,7 @@ namespace NKikimr::NArrow::NHash { -void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) const { +void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) { NArrow::SwitchType(array->type_id(), [&](const auto& type) { using TWrap = std::decay_t<decltype(type)>; using T = typename TWrap::T; diff --git a/ydb/core/formats/arrow/hash/calcer.h b/ydb/core/formats/arrow/hash/calcer.h index 9afd2020f9..066ba0cb13 100644 --- a/ydb/core/formats/arrow/hash/calcer.h +++ b/ydb/core/formats/arrow/hash/calcer.h @@ -22,7 +22,6 @@ private: ui64 Seed = 0; const std::vector<TString> ColumnNames; const ENoColumnPolicy NoColumnPolicy; - void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer) const; std::vector<std::shared_ptr<arrow::Array>> GetColumns(const std::shared_ptr<arrow::RecordBatch>& batch) const; @@ -30,10 +29,9 @@ public: TXX64(const std::vector<TString>& columnNames, const ENoColumnPolicy noColumnPolicy, const ui64 seed = 0); TXX64(const std::vector<std::string>& columnNames, const ENoColumnPolicy noColumnPolicy, const ui64 seed = 0); + static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer); std::optional<std::vector<ui64>> Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const; std::shared_ptr<arrow::Array> ExecuteToArray(const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& hashFieldName) const; - - }; } diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.cpp b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.cpp new file mode 100644 index 0000000000..61914cb6e0 --- /dev/null +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.cpp @@ -0,0 +1,49 @@ +#include "upsert_index.h" +#include <util/string/type.h> +#include <library/cpp/json/json_reader.h> + +namespace NKikimr::NKqp { + +TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) { + { + auto fValue = features.Extract("NAME"); + if (!fValue) { + return TConclusionStatus::Fail("can't find alter parameter NAME"); + } + IndexName = *fValue; + } + TString indexType; + { + auto fValue = features.Extract("TYPE"); + if (!fValue) { + return TConclusionStatus::Fail("can't find alter parameter TYPE"); + } + indexType = *fValue; + } + { + auto fValue = features.Extract("FEATURES"); + if (!fValue) { + return TConclusionStatus::Fail("can't find alter parameter FEATURES"); + } + if (!IndexMetaConstructor.Initialize(indexType)) { + return TConclusionStatus::Fail("can't initialize index meta object for type \"" + indexType + "\""); + } + NJson::TJsonValue jsonData; + if (!NJson::ReadJsonFastTree(*fValue, &jsonData)) { + return TConclusionStatus::Fail("incorrect json in request FEATURES parameter"); + } + auto result = IndexMetaConstructor->DeserializeFromJson(jsonData); + if (result.IsFail()) { + return result; + } + } + return TConclusionStatus::Success(); +} + +void TUpsertIndexOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const { + auto* indexProto = schemaData.AddUpsertIndexes(); + indexProto->SetName(IndexName); + IndexMetaConstructor.SerializeToProto(*indexProto); +} + +} diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h new file mode 100644 index 0000000000..753eade06d --- /dev/null +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h @@ -0,0 +1,23 @@ +#include "abstract.h" +#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h> + +namespace NKikimr::NKqp { + +class TUpsertIndexOperation : public ITableStoreOperation { +private: + static TString GetTypeName() { + return "UPSERT_INDEX"; + } + + static inline auto Registrator = TFactory::TRegistrator<TUpsertIndexOperation>(GetTypeName()); +private: + TString IndexName; + NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor> IndexMetaConstructor; +public: + TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override; + + void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const override; +}; + +} + diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make index 5325ed5c49..6e186d5f4f 100644 --- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make @@ -5,6 +5,7 @@ SRCS( GLOBAL add_column.cpp GLOBAL alter_column.cpp GLOBAL drop_column.cpp + GLOBAL upsert_index.cpp ) PEERDIR( diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 792c4ac4c2..45cb5d406a 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -421,6 +421,40 @@ message TOlapColumnDescription { optional TDictionaryEncodingSettings DictionaryEncoding = 9; } +message TRequestedBloomFilter { + optional double FalsePositiveProbability = 1 [default = 0.1]; + repeated string ColumnNames = 3; +} + +message TOlapIndexRequested { + optional string Name = 1; + optional TCompressionOptions Compression = 3; + + optional string ClassName = 2; + oneof Implementation { + TRequestedBloomFilter BloomFilter = 40; + } +} + +message TBloomFilter { + optional double FalsePositiveProbability = 1 [default = 0.1]; + optional uint64 MaxBytesCount = 2 [default = 8196]; + repeated uint32 ColumnIds = 3; +} + +message TOlapIndexDescription { + // This id is auto-generated by schemeshard + optional uint32 Id = 1; + + optional string Name = 2; + optional TCompressionOptions Compression = 3; + + optional string ClassName = 4; + oneof Implementation { + TBloomFilter BloomFilter = 40; + } +} + enum EColumnTableEngine { COLUMN_ENGINE_NONE = 0; COLUMN_ENGINE_REPLACING_TIMESERIES = 1; @@ -456,6 +490,7 @@ message TColumnTableSchema { optional TCompressionOptions DefaultCompression = 8; optional bool CompositeMarks = 9 [ default = false ]; + repeated TOlapIndexDescription Indexes = 10; } message TAlterColumnTableSchema { @@ -463,6 +498,8 @@ message TAlterColumnTableSchema { //optional TCompressionOptions DefaultCompression = 5; repeated TOlapColumnDescription DropColumns = 6; repeated TOlapColumnDiff AlterColumns = 7; + repeated TOlapIndexRequested UpsertIndexes = 8; + repeated string DropIndexes = 9; } // Schema presets are used to manage multiple tables with the same schema diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp new file mode 100644 index 0000000000..e57c822285 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp @@ -0,0 +1,10 @@ +#include "abstract.h" +#include <ydb/core/tx/columnshard/engines/portions/column_record.h> +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/formats/arrow/hash/xx_hash.h> +#include <ydb/core/formats/arrow/hash/calcer.h> + +namespace NKikimr::NOlap::NIndexes { + +} // namespace NKikimr::NOlap::NIndexes
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h new file mode 100644 index 0000000000..cb2cb5665e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h @@ -0,0 +1,147 @@ +#pragma once + +#include <ydb/core/tx/columnshard/splitter/chunks.h> +#include <ydb/core/tx/program/program.h> + +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <library/cpp/object_factory/object_factory.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <ydb/services/bg_tasks/abstract/interface.h> +#include <util/generic/string.h> + +#include <memory> +#include <vector> + +namespace NKikimr::NOlap { +struct TIndexInfo; +} + +namespace NKikimr::NSchemeShard { +class TOlapSchema; +class IErrorCollector; +} + +namespace NKikimr::NOlap::NIndexes { + +class IIndexChecker { +protected: + virtual bool DoCheck(std::vector<TString>&& blobs) const = 0; +public: + virtual ~IIndexChecker() = default; + bool Check(std::vector<TString>&& blobs) const { + return DoCheck(std::move(blobs)); + } +}; + +class TIndexCheckerContainer { +private: + YDB_READONLY(ui32, IndexId, 0); + YDB_READONLY_DEF(std::shared_ptr<IIndexChecker>, Object); +public: + TIndexCheckerContainer(const ui32 indexId, const std::shared_ptr<IIndexChecker>& object) + : IndexId(indexId) + , Object(object) { + AFL_VERIFY(IndexId); + AFL_VERIFY(Object); + } + + const IIndexChecker* operator->() const { + return Object.get(); + } +}; + +class IIndexMeta { +protected: + virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const = 0; + virtual std::shared_ptr<IIndexChecker> DoBuildIndexChecker(const TProgramContainer& program) const = 0; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0; + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0; + +public: + using TFactory = NObjectFactory::TObjectFactory<IIndexMeta, TString>; + using TProto = NKikimrSchemeOp::TOlapIndexDescription; + + virtual ~IIndexMeta() = default; + + std::shared_ptr<IPortionDataChunk> BuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const { + return DoBuildIndex(indexId, data, indexInfo); + } + + std::shared_ptr<IIndexChecker> BuildIndexChecker(const TProgramContainer& program) const { + return DoBuildIndexChecker(program); + } + + bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) { + return DoDeserializeFromProto(proto); + } + + void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const { + return DoSerializeToProto(proto); + } + + virtual TString GetClassName() const = 0; +}; + +class IIndexMetaConstructor { +protected: + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0; + virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const = 0; + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) = 0; + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const = 0; +public: + using TFactory = NObjectFactory::TObjectFactory<IIndexMetaConstructor, TString>; + using TProto = NKikimrSchemeOp::TOlapIndexRequested; + + virtual ~IIndexMetaConstructor() = default; + + TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + return DoDeserializeFromJson(jsonInfo); + } + + std::shared_ptr<IIndexMeta> CreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { + return DoCreateIndexMeta(currentSchema, errors); + } + + TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) { + return DoDeserializeFromProto(proto); + } + + void SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const { + return DoSerializeToProto(proto); + } + + virtual TString GetClassName() const = 0; +}; + +class TIndexMetaContainer: public NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta> { +private: + using TBase = NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta>; + YDB_READONLY(ui32, IndexId, 0); +public: + TIndexMetaContainer() = default; + TIndexMetaContainer(const ui32 indexId, const std::shared_ptr<IIndexMeta>& object) + : TBase(object) + , IndexId(indexId) + { + AFL_VERIFY(IndexId); + AFL_VERIFY(Object); + } + + bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) { + if (!TBase::DeserializeFromProto(proto)) { + return false; + } + IndexId = proto.GetId(); + return true; + } + + std::optional<TIndexCheckerContainer> BuildIndexChecker(const TProgramContainer& program) const { + auto checker = GetObjectPtr()->BuildIndexChecker(program); + if (!checker) { + return {}; + } + return TIndexCheckerContainer(IndexId, checker); + } +}; + +} // namespace NKikimr::NOlap::NIndexes
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp new file mode 100644 index 0000000000..62b8d5ddc2 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp @@ -0,0 +1,96 @@ +#include "bloom.h" +#include <ydb/core/formats/arrow/hash/xx_hash.h> +#include <ydb/core/formats/arrow/hash/calcer.h> +#include <ydb/core/tx/schemeshard/olap/schema/schema.h> +#include <ydb/core/tx/schemeshard/olap/common/common.h> + +namespace NKikimr::NOlap::NIndexes { + +std::shared_ptr<arrow::RecordBatch> TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const { + std::vector<bool> flags; + flags.resize(BitsCount, false); + for (ui32 i = 0; i < HashesCount; ++i) { + NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(3 * i); + for (; reader.IsCorrect(); reader.ReadNext()) { + hashCalcer.Start(); + for (auto&& i : reader) { + NArrow::NHash::TXX64::AppendField(i.GetCurrentChunk(), i.GetCurrentRecordIndex(), hashCalcer); + } + flags[hashCalcer.Finish() % BitsCount] = true; + } + } + + arrow::BooleanBuilder builder; + auto res = builder.Reserve(flags.size()); + NArrow::TStatusValidator::Validate(builder.AppendValues(flags)); + std::shared_ptr<arrow::BooleanArray> out; + NArrow::TStatusValidator::Validate(builder.Finish(&out)); + + return arrow::RecordBatch::Make(ResultSchema, BitsCount, {out}); +} + +std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TBloomIndexConstructor::DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { + std::set<ui32> columnIds; + for (auto&& i : ColumnNames) { + auto* columnInfo = currentSchema.GetColumns().GetByName(i); + if (!columnInfo) { + errors.AddError("no column with name " + i); + return nullptr; + } + AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second); + } + return std::make_shared<TBloomIndexMeta>(columnIds, FalsePositiveProbability); +} + +NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + if (!jsonInfo.Has("column_names")) { + return TConclusionStatus::Fail("column_names have to be in bloom filter features"); + } + const NJson::TJsonValue::TArray* columnNamesArray; + if (!jsonInfo["column_names"].GetArrayPointer(&columnNamesArray)) { + return TConclusionStatus::Fail("column_names have to be in bloom filter features as array ['column_name_1', ... , 'column_name_N']"); + } + for (auto&& i : *columnNamesArray) { + if (!i.IsString()) { + return TConclusionStatus::Fail("column_names have to be in bloom filter features as array of strings ['column_name_1', ... , 'column_name_N']"); + } + ColumnNames.emplace(i.GetString()); + } + if (!jsonInfo["false_positive_probability"].IsDouble()) { + return TConclusionStatus::Fail("false_positive_probability have to be in bloom filter features as double field"); + } + FalsePositiveProbability = jsonInfo["false_positive_probability"].GetDouble(); + if (FalsePositiveProbability < 0.01 || FalsePositiveProbability >= 1) { + return TConclusionStatus::Fail("false_positive_probability have to be in bloom filter features as double field in interval [0.01, 1)"); + } + return TConclusionStatus::Success(); +} + +NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) { + if (!proto.HasBloomFilter()) { + const TString errorMessage = "not found BloobFilter section in proto: \"" + proto.DebugString() + "\""; + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", errorMessage); + return TConclusionStatus::Fail(errorMessage); + } + auto& bFilter = proto.GetBloomFilter(); + FalsePositiveProbability = bFilter.GetFalsePositiveProbability(); + if (FalsePositiveProbability < 0.01 || FalsePositiveProbability >= 1) { + const TString errorMessage = "FalsePositiveProbability have to be in interval[0.01, 1)"; + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", errorMessage); + return TConclusionStatus::Fail(errorMessage); + } + for (auto&& i : bFilter.GetColumnNames()) { + ColumnNames.emplace(i); + } + return TConclusionStatus::Success(); +} + +void TBloomIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const { + auto* filterProto = proto.MutableBloomFilter(); + filterProto->SetFalsePositiveProbability(FalsePositiveProbability); + for (auto&& i : ColumnNames) { + filterProto->AddColumnNames(i); + } +} + +} // namespace NKikimr::NOlap::NIndexes
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h new file mode 100644 index 0000000000..15ed36ff3e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h @@ -0,0 +1,99 @@ +#pragma once +#include "abstract.h" + +#include <ydb/core/tx/columnshard/splitter/chunks.h> +#include <ydb/core/tx/program/program.h> + +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <library/cpp/object_factory/object_factory.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <ydb/services/bg_tasks/abstract/interface.h> +#include <util/generic/string.h> + +#include <memory> +#include <vector> + +namespace NKikimr::NOlap::NIndexes { + +class TBloomIndexConstructor: public IIndexMetaConstructor { +public: + static TString GetClassNameStatic() { + return "BLOOM_FILTER"; + } +private: + std::set<TString> ColumnNames; + double FalsePositiveProbability = 0.1; + static inline auto Registrator = TFactory::TRegistrator<TBloomIndexConstructor>(GetClassNameStatic()); +protected: + virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override; + + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; + + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) override; + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const override; + +public: + TBloomIndexConstructor() = default; + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +class TBloomIndexMeta: public TIndexByColumns { +public: + static TString GetClassNameStatic() { + return "BLOOM_FILTER"; + } +private: + using TBase = TIndexByColumns; + std::shared_ptr<arrow::Schema> ResultSchema; + const ui64 RowsCountExpectation = 10000; + double FalsePositiveProbability = 0.1; + ui32 HashesCount = 0; + ui32 BitsCount = 0; + static inline auto Registrator = TFactory::TRegistrator<TBloomIndexMeta>(GetClassNameStatic()); + void Initialize() { + AFL_VERIFY(FalsePositiveProbability < 1 && FalsePositiveProbability > 0.01); + HashesCount = -1 * std::log(FalsePositiveProbability) / std::log(2); + BitsCount = RowsCountExpectation * HashesCount / (std::log(2)); + } +protected: + virtual std::shared_ptr<IIndexChecker> DoBuildIndexChecker(const TProgramContainer& /*program*/) const override { + return nullptr; + } + + virtual std::shared_ptr<arrow::RecordBatch> DoBuildIndexImpl(TChunkedBatchReader& reader) const override; + + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) override { + AFL_VERIFY(proto.HasBloomFilter()); + auto& bFilter = proto.GetBloomFilter(); + FalsePositiveProbability = bFilter.GetFalsePositiveProbability(); + for (auto&& i : bFilter.GetColumnIds()) { + ColumnIds.emplace(i); + } + Initialize(); + return true; + } + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override { + auto* filterProto = proto.MutableBloomFilter(); + filterProto->SetFalsePositiveProbability(FalsePositiveProbability); + for (auto&& i : ColumnIds) { + filterProto->AddColumnIds(i); + } + } + +public: + TBloomIndexMeta() = default; + TBloomIndexMeta(const std::set<ui32>& columnIds, const double fpProbability) + : TBase(columnIds) + , FalsePositiveProbability(fpProbability) { + Initialize(); + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make b/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make new file mode 100644 index 0000000000..138eb4d5e1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + abstract.cpp +) + +PEERDIR( + ydb/core/protos + ydb/core/formats/arrow +) + +END() diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make index bace1130d6..2067cdd542 100644 --- a/ydb/core/tx/columnshard/engines/scheme/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/ya.make @@ -14,6 +14,7 @@ PEERDIR( ydb/core/formats/arrow ydb/library/actors/core + ydb/core/tx/columnshard/engines/scheme/indexes ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp index a8face4b53..4f87ef33e1 100644 --- a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp @@ -40,8 +40,7 @@ Y_UNIT_TEST_SUITE(Splitter) { } virtual std::shared_ptr<arrow::Field> GetField(const ui32 columnId) const override { - Y_ABORT_UNLESS(false); - return nullptr; + return std::make_shared<arrow::Field>(GetColumnName(columnId), std::make_shared<arrow::StringType>()); } virtual ui32 GetColumnId(const std::string& columnName) const override { diff --git a/ydb/core/tx/columnshard/splitter/ut/ya.make b/ydb/core/tx/columnshard/splitter/ut/ya.make index b3d71f8085..1b18ea3cfa 100644 --- a/ydb/core/tx/columnshard/splitter/ut/ya.make +++ b/ydb/core/tx/columnshard/splitter/ut/ya.make @@ -8,6 +8,7 @@ PEERDIR( ydb/core/tx/columnshard/counters ydb/core/formats/arrow/compression + ydb/core/tx/columnshard/engines/portions ydb/core/kqp/common ydb/library/yql/parser/pg_wrapper ydb/library/yql/public/udf diff --git a/ydb/core/tx/schemeshard/olap/columns/schema.cpp b/ydb/core/tx/schemeshard/olap/columns/schema.cpp new file mode 100644 index 0000000000..511fc4736a --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/columns/schema.cpp @@ -0,0 +1,216 @@ +#include "schema.h" +#include <ydb/library/accessor/validator.h> +#include <ydb/library/yql/minikql/mkql_type_ops.h> +#include <ydb/core/scheme_types/scheme_type_registry.h> + +namespace NKikimr::NSchemeShard { + +void TOlapColumnSchema::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const { + TBase::Serialize(columnSchema); + columnSchema.SetId(Id); +} + +void TOlapColumnSchema::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) { + TBase::ParseFromLocalDB(columnSchema); + Id = columnSchema.GetId(); +} + +bool TOlapColumnsDescription::ApplyUpdate(const TOlapColumnsUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId) { + if (Columns.empty() && schemaUpdate.GetAddColumns().empty()) { + errors.AddError(NKikimrScheme::StatusSchemeError, "No add columns specified"); + return false; + } + + const bool hasColumnsBefore = KeyColumnIds.size(); + std::map<ui32, ui32> orderedKeyColumnIds; + for (auto&& column : schemaUpdate.GetAddColumns()) { + if (ColumnsByName.contains(column.GetName())) { + errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "column '" << column.GetName() << "' already exists"); + return false; + } + if (hasColumnsBefore) { + if (column.IsNotNull()) { + errors.AddError(NKikimrScheme::StatusSchemeError, "Cannot add new not null column currently (not supported yet)"); + return false; + } + if (column.GetKeyOrder()) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << column.GetName() << "' is pk column. its impossible to modify pk"); + return false; + } + } + TOlapColumnSchema newColumn(column, nextEntityId++); + if (newColumn.GetKeyOrder()) { + Y_ABORT_UNLESS(orderedKeyColumnIds.emplace(*newColumn.GetKeyOrder(), newColumn.GetId()).second); + } + + Y_ABORT_UNLESS(ColumnsByName.emplace(newColumn.GetName(), newColumn.GetId()).second); + Y_ABORT_UNLESS(Columns.emplace(newColumn.GetId(), std::move(newColumn)).second); + } + + for (auto&& columnDiff : schemaUpdate.GetAlterColumns()) { + auto it = ColumnsByName.find(columnDiff.GetName()); + if (it == ColumnsByName.end()) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << columnDiff.GetName() << "' not exists for altering"); + return false; + } else { + auto itColumn = Columns.find(it->second); + Y_ABORT_UNLESS(itColumn != Columns.end()); + TOlapColumnSchema& newColumn = itColumn->second; + if (!newColumn.ApplyDiff(columnDiff, errors)) { + return false; + } + } + } + + if (KeyColumnIds.empty()) { + auto it = orderedKeyColumnIds.begin(); + for (ui32 i = 0; i < orderedKeyColumnIds.size(); ++i, ++it) { + KeyColumnIds.emplace_back(it->second); + Y_ABORT_UNLESS(i == it->first); + } + if (KeyColumnIds.empty()) { + errors.AddError(NKikimrScheme::StatusSchemeError, "No primary key specified"); + return false; + } + } + + for (const auto& columnName : schemaUpdate.GetDropColumns()) { + auto columnInfo = GetByName(columnName); + if (!columnInfo) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Unknown column for drop: " << columnName); + return false; + } + + if (columnInfo->IsKeyColumn()) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Cannot remove pk column: " << columnName); + return false; + } + ColumnsByName.erase(columnName); + Columns.erase(columnInfo->GetId()); + } + + return true; +} + +void TOlapColumnsDescription::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) { + TMap<TString, ui32> keyIndexes; + ui32 idx = 0; + for (auto&& kName : tableSchema.GetKeyColumnNames()) { + keyIndexes[kName] = idx++; + } + + TVector<ui32> keyIds; + keyIds.resize(tableSchema.GetKeyColumnNames().size(), 0); + for (const auto& columnSchema : tableSchema.GetColumns()) { + std::optional<ui32> keyOrder; + if (keyIndexes.contains(columnSchema.GetName())) { + keyOrder = keyIndexes.at(columnSchema.GetName()); + } + + TOlapColumnSchema column(keyOrder); + column.ParseFromLocalDB(columnSchema); + if (keyOrder) { + Y_ABORT_UNLESS(*keyOrder < keyIds.size()); + keyIds[*keyOrder] = column.GetId(); + } + + Y_ABORT_UNLESS(ColumnsByName.emplace(column.GetName(), column.GetId()).second); + Y_ABORT_UNLESS(Columns.emplace(column.GetId(), std::move(column)).second); + } + KeyColumnIds.swap(keyIds); +} + +void TOlapColumnsDescription::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const { + for (const auto& column : Columns) { + column.second.Serialize(*tableSchema.AddColumns()); + } + + for (auto&& cId : KeyColumnIds) { + auto column = GetById(cId); + Y_ABORT_UNLESS(!!column); + *tableSchema.AddKeyColumnNames() = column->GetName(); + } +} + +bool TOlapColumnsDescription::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const { + const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; + + ui32 lastColumnId = 0; + THashSet<ui32> usedColumns; + for (const auto& colProto : opSchema.GetColumns()) { + if (colProto.GetName().empty()) { + errors.AddError("Columns cannot have an empty name"); + return false; + } + const TString& colName = colProto.GetName(); + auto* col = GetByName(colName); + if (!col) { + errors.AddError("Column '" + colName + "' does not match schema preset"); + return false; + } + if (colProto.HasId() && colProto.GetId() != col->GetId()) { + errors.AddError("Column '" + colName + "' has id " + colProto.GetId() + " that does not match schema preset"); + return false; + } + + if (!usedColumns.insert(col->GetId()).second) { + errors.AddError("Column '" + colName + "' is specified multiple times"); + return false; + } + if (col->GetId() < lastColumnId) { + errors.AddError("Column order does not match schema preset"); + return false; + } + lastColumnId = col->GetId(); + + if (colProto.HasTypeId()) { + errors.AddError("Cannot set TypeId for column '" + colName + "', use Type"); + return false; + } + if (!colProto.HasType()) { + errors.AddError("Missing Type for column '" + colName + "'"); + return false; + } + + auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType()); + const NScheme::IType* type = typeRegistry->GetType(typeName); + if (!type || !TOlapColumnAdd::IsAllowedType(type->GetTypeId())) { + errors.AddError("Type '" + colProto.GetType() + "' specified for column '" + colName + "' is not supported"); + return false; + } + NScheme::TTypeInfo typeInfo(type->GetTypeId()); + + if (typeInfo != col->GetType()) { + errors.AddError("Type '" + TypeName(typeInfo) + "' specified for column '" + colName + "' does not match schema preset type '" + TypeName(col->GetType()) + "'"); + return false; + } + } + + for (auto& pr : Columns) { + if (!usedColumns.contains(pr.second.GetId())) { + errors.AddError("Specified schema is missing some schema preset columns"); + return false; + } + } + + TVector<ui32> keyColumnIds; + for (const TString& keyName : opSchema.GetKeyColumnNames()) { + auto* col = GetByName(keyName); + if (!col) { + errors.AddError("Unknown key column '" + keyName + "'"); + return false; + } + keyColumnIds.push_back(col->GetId()); + } + if (keyColumnIds != KeyColumnIds) { + errors.AddError("Specified schema key columns not matching schema preset"); + return false; + } + return true; +} + +const NKikimr::NSchemeShard::TOlapColumnSchema* TOlapColumnsDescription::GetByIdVerified(const ui32 id) const noexcept { + return TValidator::CheckNotNull(GetById(id)); +} + +} diff --git a/ydb/core/tx/schemeshard/olap/columns/schema.h b/ydb/core/tx/schemeshard/olap/columns/schema.h new file mode 100644 index 0000000000..0abec27769 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/columns/schema.h @@ -0,0 +1,60 @@ +#pragma once +#include "update.h" + +namespace NKikimr::NSchemeShard { + +class TOlapColumnSchema: public TOlapColumnAdd { +private: + using TBase = TOlapColumnAdd; + YDB_READONLY(ui32, Id, Max<ui32>()); +public: + TOlapColumnSchema(const TOlapColumnAdd& base, const ui32 id) + : TBase(base) + , Id(id) { + + } + TOlapColumnSchema(const std::optional<ui32>& keyOrder) + : TBase(keyOrder) { + + } + void Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const; + void ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema); +}; + +class TOlapColumnsDescription { +public: + using TColumn = TOlapColumnSchema; + using TColumns = THashMap<ui32, TOlapColumnSchema>; + using TColumnsByName = THashMap<TString, ui32>; + +private: + YDB_READONLY_DEF(TColumns, Columns); + YDB_READONLY_DEF(TColumnsByName, ColumnsByName); + YDB_READONLY_DEF(TVector<ui32>, KeyColumnIds); + +public: + const TOlapColumnSchema* GetByName(const TString& name) const noexcept { + auto it = ColumnsByName.find(name); + if (it != ColumnsByName.end()) { + return GetByIdVerified(it->second); + } + return nullptr; + } + + const TOlapColumnSchema* GetById(const ui32 id) const noexcept { + auto it = Columns.find(id); + if (it != Columns.end()) { + return &it->second; + } + return nullptr; + } + + const TOlapColumnSchema* GetByIdVerified(const ui32 id) const noexcept; + + bool ApplyUpdate(const TOlapColumnsUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId); + + void Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema); + void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const; + bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const; +}; +} diff --git a/ydb/core/tx/schemeshard/olap/columns/update.cpp b/ydb/core/tx/schemeshard/olap/columns/update.cpp new file mode 100644 index 0000000000..a6d602b5fa --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/columns/update.cpp @@ -0,0 +1,260 @@ +#include "update.h" +#include <ydb/library/yql/minikql/mkql_type_ops.h> +#include <ydb/core/scheme/scheme_types_proto.h> +#include <ydb/core/scheme_types/scheme_type_registry.h> + +namespace NKikimr::NSchemeShard { + + bool TOlapColumnAdd::ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors) { + if (!columnSchema.GetName()) { + errors.AddError("Columns cannot have an empty name"); + return false; + } + Name = columnSchema.GetName(); + NotNullFlag = columnSchema.GetNotNull(); + TypeName = columnSchema.GetType(); + if (columnSchema.HasCompression()) { + auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression()); + if (!compression) { + errors.AddError("Cannot parse compression info: " + compression.GetErrorMessage()); + return false; + } + Compression = *compression; + } + if (columnSchema.HasDictionaryEncoding()) { + auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); + if (!settings) { + errors.AddError("Cannot parse dictionary compression info: " + settings.GetErrorMessage()); + return false; + } + DictionaryEncoding = *settings; + } + + if (columnSchema.HasTypeId()) { + errors.AddError(TStringBuilder() << "Cannot set TypeId for column '" << Name << ", use Type"); + return false; + } + + if (!columnSchema.HasType()) { + errors.AddError(TStringBuilder() << "Missing Type for column '" << Name); + return false; + } + + auto typeName = NMiniKQL::AdaptLegacyYqlType(TypeName); + Y_ABORT_UNLESS(AppData()->TypeRegistry); + const NScheme::IType* type = AppData()->TypeRegistry->GetType(typeName); + if (!type) { + errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported"); + return false; + } + if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { + errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported"); + return false;; + } + Type = NScheme::TTypeInfo(type->GetTypeId()); + if (!IsAllowedType(type->GetTypeId())){ + errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported"); + return false; + } + return true; + } + + void TOlapColumnAdd::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) { + Name = columnSchema.GetName(); + TypeName = columnSchema.GetType(); + + if (columnSchema.HasTypeInfo()) { + Type = NScheme::TypeInfoModFromProtoColumnType( + columnSchema.GetTypeId(), &columnSchema.GetTypeInfo()) + .TypeInfo; + } else { + Type = NScheme::TypeInfoModFromProtoColumnType( + columnSchema.GetTypeId(), nullptr) + .TypeInfo; + } + if (columnSchema.HasCompression()) { + auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression()); + Y_ABORT_UNLESS(compression.IsSuccess(), "%s", compression.GetErrorMessage().data()); + Compression = *compression; + } + if (columnSchema.HasDictionaryEncoding()) { + auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); + Y_ABORT_UNLESS(settings.IsSuccess()); + DictionaryEncoding = *settings; + } + NotNullFlag = columnSchema.GetNotNull(); + } + + void TOlapColumnAdd::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const { + columnSchema.SetName(Name); + columnSchema.SetType(TypeName); + columnSchema.SetNotNull(NotNullFlag); + if (Compression) { + *columnSchema.MutableCompression() = Compression->SerializeToProto(); + } + if (DictionaryEncoding) { + *columnSchema.MutableDictionaryEncoding() = DictionaryEncoding->SerializeToProto(); + } + + auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(Type, ""); + columnSchema.SetTypeId(columnType.TypeId); + if (columnType.TypeInfo) { + *columnSchema.MutableTypeInfo() = *columnType.TypeInfo; + } + } + + bool TOlapColumnAdd::ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors) { + Y_ABORT_UNLESS(GetName() == diffColumn.GetName()); + { + auto result = diffColumn.GetCompression().Apply(Compression); + if (!result) { + errors.AddError("Cannot merge compression info: " + result.GetErrorMessage()); + return false; + } + } + { + auto result = diffColumn.GetDictionaryEncoding().Apply(DictionaryEncoding); + if (!result) { + errors.AddError("Cannot merge dictionary encoding info: " + result.GetErrorMessage()); + return false; + } + } + return true; + } + + bool TOlapColumnAdd::IsAllowedType(ui32 typeId) { + if (!NScheme::NTypeIds::IsYqlType(typeId)) { + return false; + } + + switch (typeId) { + case NYql::NProto::Bool: + case NYql::NProto::Interval: + case NYql::NProto::Decimal: + case NYql::NProto::DyNumber: + return false; + default: + break; + } + return true; + } + + bool TOlapColumnAdd::IsAllowedFirstPkType(ui32 typeId) { + switch (typeId) { + case NYql::NProto::Uint8: // Byte + case NYql::NProto::Int32: + case NYql::NProto::Uint32: + case NYql::NProto::Int64: + case NYql::NProto::Uint64: + case NYql::NProto::String: + case NYql::NProto::Utf8: + case NYql::NProto::Date: + case NYql::NProto::Datetime: + case NYql::NProto::Timestamp: + return true; + case NYql::NProto::Interval: + case NYql::NProto::Decimal: + case NYql::NProto::DyNumber: + case NYql::NProto::Yson: + case NYql::NProto::Json: + case NYql::NProto::JsonDocument: + case NYql::NProto::Float: + case NYql::NProto::Double: + case NYql::NProto::Bool: + return false; + default: + break; + } + return false; + } + + bool TOlapColumnsUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) { + for (const auto& column : alterRequest.GetDropColumns()) { + if (!DropColumns.emplace(column.GetName()).second) { + errors.AddError(NKikimrScheme::StatusInvalidParameter, "Duplicated column for drop"); + return false; + } + } + TSet<TString> addColumnNames; + for (auto& columnSchema : alterRequest.GetAddColumns()) { + TOlapColumnAdd column({}); + if (!column.ParseFromRequest(columnSchema, errors)) { + return false; + } + if (addColumnNames.contains(column.GetName())) { + errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "column '" << column.GetName() << "' duplication for add"); + return false; + } + addColumnNames.emplace(column.GetName()); + AddColumns.emplace_back(std::move(column)); + } + + TSet<TString> alterColumnNames; + for (auto& columnSchemaDiff : alterRequest.GetAlterColumns()) { + TOlapColumnDiff columnDiff; + if (!columnDiff.ParseFromRequest(columnSchemaDiff, errors)) { + return false; + } + if (addColumnNames.contains(columnDiff.GetName())) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << columnDiff.GetName() << "' have to be either add or update"); + return false; + } + if (alterColumnNames.contains(columnDiff.GetName())) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << columnDiff.GetName() << "' duplication for update"); + return false; + } + alterColumnNames.emplace(columnDiff.GetName()); + AlterColumns.emplace_back(std::move(columnDiff)); + } + return true; + } + + bool TOlapColumnsUpdate::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys) { + TMap<TString, ui32> keyColumnNames; + for (auto&& pkKey : tableSchema.GetKeyColumnNames()) { + if (!keyColumnNames.emplace(pkKey, keyColumnNames.size()).second) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Duplicate key column '" << pkKey << "'"); + return false; + } + } + + TSet<TString> columnNames; + for (auto& columnSchema : tableSchema.GetColumns()) { + std::optional<ui32> keyOrder; + { + auto it = keyColumnNames.find(columnSchema.GetName()); + if (it != keyColumnNames.end()) { + keyOrder = it->second; + } + } + + TOlapColumnAdd column(keyOrder); + if (!column.ParseFromRequest(columnSchema, errors)) { + return false; + } + if (column.GetKeyOrder() && *column.GetKeyOrder() == 0) { + if (!TOlapColumnAdd::IsAllowedFirstPkType(column.GetType().GetTypeId())) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() + << "Type '" << column.GetType().GetTypeId() << "' specified for column '" << column.GetName() + << "' is not supported in first PK position"); + return false; + } + } + if (columnNames.contains(column.GetName())) { + errors.AddError(NKikimrScheme::StatusMultipleModifications, TStringBuilder() << "Duplicate column '" << column.GetName() << "'"); + return false; + } + if (!allowNullKeys) { + if (keyColumnNames.contains(column.GetName()) && !column.IsNotNull()) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Nullable key column '" << column.GetName() << "'"); + return false; + } + } + columnNames.emplace(column.GetName()); + AddColumns.emplace_back(std::move(column)); + } + + return true; + } + +} diff --git a/ydb/core/tx/schemeshard/olap/columns/update.h b/ydb/core/tx/schemeshard/olap/columns/update.h new file mode 100644 index 0000000000..45fce0c990 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/columns/update.h @@ -0,0 +1,72 @@ +#pragma once +#include <ydb/core/formats/arrow/compression/diff.h> +#include <ydb/core/formats/arrow/dictionary/diff.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/schemeshard/olap/common/common.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/scheme_types/scheme_type_info.h> +#include <ydb/core/formats/arrow/compression/object.h> +#include <ydb/core/formats/arrow/dictionary/object.h> + +namespace NKikimr::NSchemeShard { + +class TOlapColumnDiff { +private: + YDB_READONLY_DEF(TString, Name); + YDB_READONLY_DEF(NArrow::TCompressionDiff, Compression); + YDB_READONLY_DEF(NArrow::NDictionary::TEncodingDiff, DictionaryEncoding); +public: + bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDiff& columnSchema, IErrorCollector& errors) { + Name = columnSchema.GetName(); + if (!Name) { + errors.AddError("empty field name"); + return false; + } + if (!Compression.DeserializeFromProto(columnSchema.GetCompression())) { + errors.AddError("cannot parse compression diff from proto"); + return false; + } + if (!DictionaryEncoding.DeserializeFromProto(columnSchema.GetDictionaryEncoding())) { + errors.AddError("cannot parse dictionary encoding diff from proto"); + return false; + } + return true; + } +}; + +class TOlapColumnAdd { +private: + YDB_READONLY_DEF(std::optional<ui32>, KeyOrder); + YDB_READONLY_DEF(TString, Name); + YDB_READONLY_DEF(TString, TypeName); + YDB_READONLY_DEF(NScheme::TTypeInfo, Type); + YDB_FLAG_ACCESSOR(NotNull, false); + YDB_READONLY_DEF(std::optional<NArrow::TCompression>, Compression); + YDB_READONLY_DEF(std::optional<NArrow::NDictionary::TEncodingSettings>, DictionaryEncoding); +public: + TOlapColumnAdd(const std::optional<ui32>& keyOrder) + : KeyOrder(keyOrder) { + + } + bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors); + void ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema); + void Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const; + bool ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors); + bool IsKeyColumn() const { + return !!KeyOrder; + } + static bool IsAllowedType(ui32 typeId); + static bool IsAllowedFirstPkType(ui32 typeId); +}; + +class TOlapColumnsUpdate { +private: + YDB_READONLY_DEF(TVector<TOlapColumnAdd>, AddColumns); + YDB_READONLY_DEF(TSet<TString>, DropColumns); + YDB_READONLY_DEF(TVector<TOlapColumnDiff>, AlterColumns); +public: + bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys = false); + bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors); +}; + +} diff --git a/ydb/core/tx/schemeshard/olap/columns/ya.make b/ydb/core/tx/schemeshard/olap/columns/ya.make new file mode 100644 index 0000000000..44971d7eeb --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/columns/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + schema.cpp + update.cpp +) + +PEERDIR( + ydb/core/protos + ydb/core/formats/arrow/dictionary + ydb/core/formats/arrow/compression +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/schemeshard/olap/common/common.cpp b/ydb/core/tx/schemeshard/olap/common/common.cpp new file mode 100644 index 0000000000..46bf1133f6 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/common/common.cpp @@ -0,0 +1,4 @@ +#include "common.h" + +namespace NKikimr::NSchemeShard { +} diff --git a/ydb/core/tx/schemeshard/olap/common/common.h b/ydb/core/tx/schemeshard/olap/common/common.h new file mode 100644 index 0000000000..4b82a29a33 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/common/common.h @@ -0,0 +1,27 @@ +#pragma once +#include <ydb/core/tx/schemeshard/schemeshard.h> + +namespace NKikimr::NSchemeShard { + + class IErrorCollector { + public: + virtual void AddError(const TEvSchemeShard::EStatus& errorStatus, const TString& errorMsg) = 0; + virtual void AddError(const TString& errorMsg) = 0; + }; + + class TProposeErrorCollector : public IErrorCollector { + TEvSchemeShard::TEvModifySchemeTransactionResult& TxResult; + public: + TProposeErrorCollector(TEvSchemeShard::TEvModifySchemeTransactionResult& txResult) + : TxResult(txResult) + {} + + void AddError(const TEvSchemeShard::EStatus& errorStatus, const TString& errorMsg) override { + TxResult.SetError(errorStatus, errorMsg); + } + + void AddError(const TString& errorMsg) override { + TxResult.SetError(NKikimrScheme::StatusSchemeError, errorMsg); + } + }; +} diff --git a/ydb/core/tx/schemeshard/olap/common/ya.make b/ydb/core/tx/schemeshard/olap/common/ya.make new file mode 100644 index 0000000000..58bbf1cb44 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/common/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + common.cpp +) + +PEERDIR( + ydb/library/ydb_issue + ydb/core/base +) + +END() diff --git a/ydb/core/tx/schemeshard/olap/indexes/schema.cpp b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp new file mode 100644 index 0000000000..e715b13496 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp @@ -0,0 +1,113 @@ +#include "schema.h" +#include <ydb/library/accessor/validator.h> + +namespace NKikimr::NSchemeShard { + +void TOlapIndexSchema::SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& indexSchema) const { + indexSchema.SetId(Id); + indexSchema.SetName(Name); + IndexMeta.SerializeToProto(indexSchema); +} + +void TOlapIndexSchema::DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& indexSchema) { + Id = indexSchema.GetId(); + Name = indexSchema.GetName(); + AFL_VERIFY(IndexMeta.DeserializeFromProto(indexSchema))("incorrect_proto", indexSchema.DebugString()); +} + +bool TOlapIndexesDescription::ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexesUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId) { + for (auto&& index : schemaUpdate.GetUpsertIndexes()) { + auto* currentIndex = MutableByName(index.GetName()); + if (currentIndex) { + if (!currentIndex->ApplyUpdate(currentSchema, index, errors)) { + return false; + } + } else { + auto meta = index.GetIndexConstructor()->CreateIndexMeta(currentSchema, errors); + if (!meta) { + return false; + } + const ui32 id = nextEntityId++; + TOlapIndexSchema newIndex(id, index.GetName(), meta); + Y_ABORT_UNLESS(IndexesByName.emplace(index.GetName(), id).second); + Y_ABORT_UNLESS(Indexes.emplace(id, std::move(newIndex)).second); + } + } + + for (const auto& name : schemaUpdate.GetDropIndexes()) { + auto info = GetByName(name); + if (!info) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Unknown index for drop: " << name); + return false; + } + AFL_VERIFY(IndexesByName.erase(name)); + AFL_VERIFY(Indexes.erase(info->GetId())); + } + + return true; +} + +void TOlapIndexesDescription::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) { + for (const auto& indexProto : tableSchema.GetIndexes()) { + TOlapIndexSchema index; + index.DeserializeFromProto(indexProto); + Y_ABORT_UNLESS(IndexesByName.emplace(indexProto.GetName(), indexProto.GetId()).second); + Y_ABORT_UNLESS(Indexes.emplace(indexProto.GetId(), std::move(index)).second); + } +} + +void TOlapIndexesDescription::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const { + for (const auto& index : Indexes) { + index.second.SerializeToProto(*tableSchema.AddIndexes()); + } +} + +bool TOlapIndexesDescription::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const { + THashSet<ui32> usedIndexes; + ui32 lastIdx = 0; + for (const auto& proto : opSchema.GetIndexes()) { + if (proto.GetName().empty()) { + errors.AddError("Index cannot have an empty name"); + return false; + } + const TString& name = proto.GetName(); + auto* index = GetByName(name); + if (!index) { + errors.AddError("Index '" + name + "' does not match schema preset"); + return false; + } + if (proto.HasId() && proto.GetId() != index->GetId()) { + errors.AddError("Index '" + name + "' has id " + proto.GetId() + " that does not match schema preset"); + return false; + } + + if (!usedIndexes.insert(index->GetId()).second) { + errors.AddError("Column '" + name + "' is specified multiple times"); + return false; + } + if (index->GetId() < lastIdx) { + errors.AddError("Index order does not match schema preset"); + return false; + } + lastIdx = index->GetId(); + + } + + for (auto& pr : Indexes) { + if (!usedIndexes.contains(pr.second.GetId())) { + errors.AddError("Specified schema is missing some schema preset indexes"); + return false; + } + } + return true; +} + +const NKikimr::NSchemeShard::TOlapIndexSchema* TOlapIndexesDescription::GetByIdVerified(const ui32 id) const noexcept { + return TValidator::CheckNotNull(GetById(id)); +} + +NKikimr::NSchemeShard::TOlapIndexSchema* TOlapIndexesDescription::MutableByIdVerified(const ui32 id) noexcept { + return TValidator::CheckNotNull(MutableById(id)); +} + +} diff --git a/ydb/core/tx/schemeshard/olap/indexes/schema.h b/ydb/core/tx/schemeshard/olap/indexes/schema.h new file mode 100644 index 0000000000..d5aaa8573a --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/indexes/schema.h @@ -0,0 +1,93 @@ +#pragma once +#include "update.h" + +namespace NKikimr::NSchemeShard { + +class TOlapIndexSchema { +private: + using TBase = TOlapIndexUpsert; + YDB_READONLY(ui32, Id, Max<ui32>()); + YDB_READONLY_DEF(TString, Name); + NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMeta> IndexMeta; +public: + TOlapIndexSchema() = default; + + TOlapIndexSchema(const ui32 id, const TString& name, const std::shared_ptr<NOlap::NIndexes::IIndexMeta>& meta) + : Id(id) + , Name(name) + , IndexMeta(meta) + { + + } + + bool ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexUpsert& upsert, IErrorCollector& errors) { + AFL_VERIFY(upsert.GetName() == GetName()); + AFL_VERIFY(!!upsert.GetIndexConstructor()); + if (upsert.GetIndexConstructor().GetClassName() != IndexMeta.GetClassName()) { + errors.AddError("different index classes: " + upsert.GetIndexConstructor().GetClassName() + " vs " + IndexMeta.GetClassName()); + return false; + } + auto object = upsert.GetIndexConstructor()->CreateIndexMeta(currentSchema, errors); + if (!object) { + return false; + } + IndexMeta = NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMeta>(object); + return true; + } + + void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& indexSchema) const; + void DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& indexSchema); +}; + +class TOlapIndexesDescription { +public: + using TIndex = TOlapIndexSchema; + using TIndexes = THashMap<ui32, TOlapIndexSchema>; + using TIndexesByName = THashMap<TString, ui32>; + +private: + YDB_READONLY_DEF(TIndexes, Indexes); + YDB_READONLY_DEF(TIndexesByName, IndexesByName); +public: + const TOlapIndexSchema* GetByName(const TString& name) const noexcept { + auto it = IndexesByName.find(name); + if (it != IndexesByName.end()) { + return GetByIdVerified(it->second); + } + return nullptr; + } + + TOlapIndexSchema* MutableByName(const TString& name) noexcept { + auto it = IndexesByName.find(name); + if (it != IndexesByName.end()) { + return MutableByIdVerified(it->second); + } + return nullptr; + } + + const TOlapIndexSchema* GetById(const ui32 id) const noexcept { + auto it = Indexes.find(id); + if (it != Indexes.end()) { + return &it->second; + } + return nullptr; + } + + TOlapIndexSchema* MutableById(const ui32 id) noexcept { + auto it = Indexes.find(id); + if (it != Indexes.end()) { + return &it->second; + } + return nullptr; + } + + const TOlapIndexSchema* GetByIdVerified(const ui32 id) const noexcept; + TOlapIndexSchema* MutableByIdVerified(const ui32 id) noexcept; + + bool ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexesUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId); + + void Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema); + void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const; + bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const; +}; +} diff --git a/ydb/core/tx/schemeshard/olap/indexes/update.cpp b/ydb/core/tx/schemeshard/olap/indexes/update.cpp new file mode 100644 index 0000000000..596cba4b83 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/indexes/update.cpp @@ -0,0 +1,34 @@ +#include "update.h" + +namespace NKikimr::NSchemeShard { + +void TOlapIndexUpsert::SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& requestedProto) const { + requestedProto.SetName(Name); + IndexConstructor.SerializeToProto(requestedProto); +} + +void TOlapIndexUpsert::DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& indexSchema) { + Name = indexSchema.GetName(); + AFL_VERIFY(IndexConstructor.DeserializeFromProto(indexSchema))("incorrect_proto", indexSchema.DebugString()); +} + +bool TOlapIndexesUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) { + for (const auto& indexName : alterRequest.GetDropIndexes()) { + if (!DropIndexes.emplace(indexName).second) { + errors.AddError(NKikimrScheme::StatusInvalidParameter, "Duplicated index for drop"); + return false; + } + } + TSet<TString> upsertIndexNames; + for (auto& indexSchema : alterRequest.GetUpsertIndexes()) { + TOlapIndexUpsert index; + index.DeserializeFromProto(indexSchema); + if (!upsertIndexNames.emplace(index.GetName()).second) { + errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "index '" << index.GetName() << "' duplication for add"); + return false; + } + UpsertIndexes.emplace_back(std::move(index)); + } + return true; +} +} diff --git a/ydb/core/tx/schemeshard/olap/indexes/update.h b/ydb/core/tx/schemeshard/olap/indexes/update.h new file mode 100644 index 0000000000..0f1db75b0b --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/indexes/update.h @@ -0,0 +1,34 @@ +#pragma once +#include <ydb/services/bg_tasks/abstract/interface.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/schemeshard/olap/common/common.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h> + +namespace NKikimr::NSchemeShard { + + class TOlapIndexUpsert { + private: + YDB_READONLY_DEF(TString, Name); + YDB_READONLY_DEF(TString, TypeName); + protected: + NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor> IndexConstructor; + public: + TOlapIndexUpsert() = default; + + const NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor>& GetIndexConstructor() const { + return IndexConstructor; + } + + void DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& requestedProto); + void SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& requestedProto) const; + }; + + class TOlapIndexesUpdate { + private: + YDB_READONLY_DEF(TVector<TOlapIndexUpsert>, UpsertIndexes); + YDB_READONLY_DEF(TSet<TString>, DropIndexes); + public: + bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors); + }; +} diff --git a/ydb/core/tx/schemeshard/olap/indexes/ya.make b/ydb/core/tx/schemeshard/olap/indexes/ya.make new file mode 100644 index 0000000000..0303a9692f --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/indexes/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + schema.cpp + update.cpp +) + +PEERDIR( + ydb/services/bg_tasks/abstract +) + +END() diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp b/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp index bf09d60f90..84ccbf315e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp @@ -1,7 +1,6 @@ -#include "schemeshard__operation_part.h" -#include "schemeshard__operation_common.h" -#include "schemeshard_impl.h" -#include "schemeshard_olap_types.h" +#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h> +#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h> +#include <ydb/core/tx/schemeshard/schemeshard_impl.h> namespace { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp index 7690138e38..3dcfeda8b5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp @@ -1,7 +1,6 @@ -#include "schemeshard__operation_part.h" -#include "schemeshard__operation_common.h" -#include "schemeshard_olap_types.h" -#include "schemeshard_impl.h" +#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h> +#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h> +#include <ydb/core/tx/schemeshard/schemeshard_impl.h> #include <ydb/core/scheme/scheme_types_proto.h> @@ -88,7 +87,7 @@ public: } TOlapSchema currentSchema; - currentSchema.Parse(*tableSchema); + currentSchema.ParseFromLocalDB(*tableSchema); if (!storeInfo) { TOlapSchemaUpdate schemaUpdate; if (!schemaUpdate.Parse(AlterRequest.GetAlterSchema(), errors)) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp index 26704eed4b..a50f1193b1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp @@ -1,7 +1,6 @@ -#include "schemeshard__operation_part.h" -#include "schemeshard__operation_common.h" -#include "schemeshard_impl.h" -#include "schemeshard_olap_types.h" +#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h> +#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h> +#include <ydb/core/tx/schemeshard/schemeshard_impl.h> #include <ydb/core/base/subdomain.h> #include <ydb/core/scheme/scheme_types_proto.h> diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp index 9138e379a9..b2519885d4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp @@ -1,7 +1,6 @@ -#include "schemeshard__operation_part.h" -#include "schemeshard__operation_common.h" -#include "schemeshard_olap_types.h" -#include "schemeshard_impl.h" +#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h> +#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h> +#include <ydb/core/tx/schemeshard/schemeshard_impl.h> #include <ydb/core/base/subdomain.h> #include <ydb/core/tx/columnshard/columnshard.h> @@ -109,7 +108,7 @@ private: return false; } for (const TString& columnName : sharding.GetColumns()) { - auto* pColumn = schema.GetColumnByName(columnName); + auto* pColumn = schema.GetColumns().GetByName(columnName); if (!pColumn) { errors.AddError(Sprintf("Hash sharding is using an unknown column '%s'", columnName.c_str())); return false; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp b/ydb/core/tx/schemeshard/olap/operations/drop_store.cpp index 7ea837dedf..112e42e864 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/drop_store.cpp @@ -1,6 +1,6 @@ -#include "schemeshard__operation_part.h" -#include "schemeshard__operation_common.h" -#include "schemeshard_impl.h" +#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h> +#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h> +#include <ydb/core/tx/schemeshard/schemeshard_impl.h> #include <ydb/core/base/subdomain.h> diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp index 7bf255f578..d1df41baed 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp @@ -1,6 +1,6 @@ -#include "schemeshard__operation_part.h" -#include "schemeshard__operation_common.h" -#include "schemeshard_impl.h" +#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h> +#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h> +#include <ydb/core/tx/schemeshard/schemeshard_impl.h> #include <ydb/core/base/subdomain.h> #include <ydb/core/tx/tiering/cleaner_task.h> diff --git a/ydb/core/tx/schemeshard/olap/operations/ya.make b/ydb/core/tx/schemeshard/olap/operations/ya.make new file mode 100644 index 0000000000..4e9765d304 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/operations/ya.make @@ -0,0 +1,19 @@ +LIBRARY() + +SRCS( + create_table.cpp + drop_table.cpp + alter_table.cpp + create_store.cpp + drop_store.cpp + alter_store.cpp +) + +PEERDIR( + ydb/core/mind/hive + ydb/services/bg_tasks +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/schemeshard/olap/schema/schema.cpp b/ydb/core/tx/schemeshard/olap/schema/schema.cpp new file mode 100644 index 0000000000..ff7ba1bbe4 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/schema/schema.cpp @@ -0,0 +1,93 @@ +#include "schema.h" + +namespace NKikimr::NSchemeShard { + + bool TOlapSchema::Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors) { + if (!Columns.ApplyUpdate(schemaUpdate.GetColumns(), errors, NextColumnId)) { + return false; + } + + if (!Indexes.ApplyUpdate(*this, schemaUpdate.GetIndexes(), errors, NextColumnId)) { + return false; + } + + if (!HasEngine()) { + Engine = schemaUpdate.GetEngineDef(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); + } else { + if (schemaUpdate.HasEngine()) { + errors.AddError(NKikimrScheme::StatusSchemeError, "No engine updates supported"); + return false; + } + } + + ++Version; + return true; + } + + void TOlapSchema::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) { + NextColumnId = tableSchema.GetNextColumnId(); + Version = tableSchema.GetVersion(); + Y_ABORT_UNLESS(tableSchema.HasEngine()); + Engine = tableSchema.GetEngine(); + CompositeMarksFlag = tableSchema.GetCompositeMarks(); + + Columns.Parse(tableSchema); + Indexes.Parse(tableSchema); + } + + void TOlapSchema::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const { + tableSchema.SetNextColumnId(NextColumnId); + tableSchema.SetVersion(Version); + tableSchema.SetCompositeMarks(CompositeMarksFlag); + + Y_ABORT_UNLESS(HasEngine()); + tableSchema.SetEngine(GetEngineUnsafe()); + + Columns.Serialize(tableSchema); + Indexes.Serialize(tableSchema); + } + + bool TOlapSchema::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const { + if (!Columns.Validate(opSchema, errors)) { + return false; + } + + if (!Indexes.Validate(opSchema, errors)) { + return false; + } + + if (opSchema.GetEngine() != Engine) { + errors.AddError("Specified schema engine does not match schema preset"); + return false; + } + return true; + } + + void TOlapStoreSchemaPreset::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) { + Y_ABORT_UNLESS(presetProto.HasId()); + Y_ABORT_UNLESS(presetProto.HasName()); + Y_ABORT_UNLESS(presetProto.HasSchema()); + Id = presetProto.GetId(); + Name = presetProto.GetName(); + TOlapSchema::ParseFromLocalDB(presetProto.GetSchema()); + } + + void TOlapStoreSchemaPreset::Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const { + presetProto.SetId(Id); + presetProto.SetName(Name); + TOlapSchema::Serialize(*presetProto.MutableSchema()); + } + + bool TOlapStoreSchemaPreset::ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors) { + if (presetProto.HasId()) { + errors.AddError("Schema preset id cannot be specified explicitly"); + return false; + } + if (!presetProto.GetName()) { + errors.AddError("Schema preset name cannot be empty"); + return false; + } + Name = presetProto.GetName(); + return true; + } +} diff --git a/ydb/core/tx/schemeshard/olap/schema/schema.h b/ydb/core/tx/schemeshard/olap/schema/schema.h new file mode 100644 index 0000000000..ca9e8b14c6 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/schema/schema.h @@ -0,0 +1,40 @@ +#pragma once +#include <ydb/core/tx/schemeshard/olap/columns/update.h> +#include <ydb/core/tx/schemeshard/olap/indexes/update.h> +#include <ydb/core/tx/schemeshard/olap/columns/schema.h> +#include <ydb/core/tx/schemeshard/olap/indexes/schema.h> +#include "update.h" + +namespace NKikimr::NSchemeShard { + + class TOlapSchema { + private: + YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine); + YDB_READONLY_DEF(TOlapColumnsDescription, Columns); + YDB_READONLY_DEF(TOlapIndexesDescription, Indexes); + + YDB_READONLY(ui32, NextColumnId, 1); + YDB_READONLY(ui32, Version, 0); + YDB_READONLY_FLAG(CompositeMarks, true); + + public: + bool Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors); + + void ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchema& tableSchema); + void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const; + bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const; + bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttlSettings, IErrorCollector& errors) const; + }; + + class TOlapStoreSchemaPreset : public TOlapSchema { + private: + using TBase = TOlapSchema; + YDB_ACCESSOR_DEF(TString, Name); + YDB_ACCESSOR_DEF(ui32, Id); + YDB_ACCESSOR(size_t, ProtoIndex, -1); // Preset index in the olap store description + public: + void Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const; + void ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto); + bool ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors); + }; +} diff --git a/ydb/core/tx/schemeshard/olap/schema/update.cpp b/ydb/core/tx/schemeshard/olap/schema/update.cpp new file mode 100644 index 0000000000..92d7d9038f --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/schema/update.cpp @@ -0,0 +1,28 @@ +#include "schema.h" + +namespace NKikimr::NSchemeShard { + + bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys) { + if (tableSchema.HasEngine()) { + Engine = tableSchema.GetEngine(); + } + + if (!Columns.Parse(tableSchema, errors, allowNullKeys)) { + return false; + } + + return true; + } + + bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) { + if (!Columns.Parse(alterRequest, errors)) { + return false; + } + + if (!Indexes.Parse(alterRequest, errors)) { + return false; + } + + return true; + } +} diff --git a/ydb/core/tx/schemeshard/olap/schema/update.h b/ydb/core/tx/schemeshard/olap/schema/update.h new file mode 100644 index 0000000000..bb4173433f --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/schema/update.h @@ -0,0 +1,13 @@ +#pragma once + +namespace NKikimr::NSchemeShard { + + class TOlapSchemaUpdate { + YDB_READONLY_DEF(TOlapColumnsUpdate, Columns); + YDB_READONLY_DEF(TOlapIndexesUpdate, Indexes); + YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine); + public: + bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys = false); + bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors); + }; +} diff --git a/ydb/core/tx/schemeshard/olap/schema/ya.make b/ydb/core/tx/schemeshard/olap/schema/ya.make new file mode 100644 index 0000000000..9c21a43bbb --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/schema/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + schema.cpp + update.cpp +) + +PEERDIR( + ydb/core/tx/schemeshard/olap/columns + ydb/core/tx/schemeshard/olap/indexes +) + +END() diff --git a/ydb/core/tx/schemeshard/olap/ya.make b/ydb/core/tx/schemeshard/olap/ya.make new file mode 100644 index 0000000000..63e509c263 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +PEERDIR( + ydb/core/tx/schemeshard/olap/columns + ydb/core/tx/schemeshard/olap/indexes + ydb/core/tx/schemeshard/olap/schema + ydb/core/tx/schemeshard/olap/common + ydb/core/tx/schemeshard/olap/operations +) + +END() diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 0f3cc648ce..830b726d91 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -2305,7 +2305,7 @@ TColumnTableInfo::TColumnTableInfo( if (Description.HasSchema()) { TOlapSchema schema; - schema.Parse(Description.GetSchema()); + schema.ParseFromLocalDB(Description.GetSchema()); } ColumnShards.reserve(Sharding.GetColumnShards().size()); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 7a7491c0a8..769a5dfa79 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -5,8 +5,9 @@ #include "schemeshard_tx_infly.h" #include "schemeshard_path_element.h" #include "schemeshard_identificators.h" -#include "schemeshard_olap_types.h" #include "schemeshard_schema.h" +#include "olap/schema/schema.h" +#include "olap/schema/update.h" #include <ydb/core/tx/message_seqno.h> #include <ydb/core/tx/datashard/datashard.h> diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp b/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp deleted file mode 100644 index 2b67e0797f..0000000000 --- a/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp +++ /dev/null @@ -1,522 +0,0 @@ -#include "schemeshard_olap_types.h" -#include <ydb/library/yql/minikql/mkql_type_ops.h> -#include <ydb/core/scheme_types/scheme_type_registry.h> - -namespace NKikimr::NSchemeShard { - - void TOlapColumnSchema::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const { - TBase::Serialize(columnSchema); - columnSchema.SetId(Id); - } - - void TOlapColumnSchema::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) { - TBase::ParseFromLocalDB(columnSchema); - Id = columnSchema.GetId(); - } - - bool TOlapColumnAdd::ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors) { - if (!columnSchema.GetName()) { - errors.AddError("Columns cannot have an empty name"); - return false; - } - Name = columnSchema.GetName(); - NotNullFlag = columnSchema.GetNotNull(); - TypeName = columnSchema.GetType(); - if (columnSchema.HasCompression()) { - auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression()); - if (!compression) { - errors.AddError("Cannot parse compression info: " + compression.GetErrorMessage()); - return false; - } - Compression = *compression; - } - if (columnSchema.HasDictionaryEncoding()) { - auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); - if (!settings) { - errors.AddError("Cannot parse dictionary compression info: " + settings.GetErrorMessage()); - return false; - } - DictionaryEncoding = *settings; - } - - if (columnSchema.HasTypeId()) { - errors.AddError(TStringBuilder() << "Cannot set TypeId for column '" << Name << ", use Type"); - return false; - } - - if (!columnSchema.HasType()) { - errors.AddError(TStringBuilder() << "Missing Type for column '" << Name); - return false; - } - - auto typeName = NMiniKQL::AdaptLegacyYqlType(TypeName); - Y_ABORT_UNLESS(AppData()->TypeRegistry); - const NScheme::IType* type = - AppData()->TypeRegistry->GetType(typeName); - if (!type) { - errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported"); - return false; - } - if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { - errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported"); - return false;; - } - Type = NScheme::TTypeInfo(type->GetTypeId()); - if (!TOlapSchema::IsAllowedType(type->GetTypeId())){ - errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported"); - return false; - } - return true; - } - - void TOlapColumnAdd::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) { - Name = columnSchema.GetName(); - TypeName = columnSchema.GetType(); - - if (columnSchema.HasTypeInfo()) { - Type = NScheme::TypeInfoModFromProtoColumnType( - columnSchema.GetTypeId(), &columnSchema.GetTypeInfo()) - .TypeInfo; - } else { - Type = NScheme::TypeInfoModFromProtoColumnType( - columnSchema.GetTypeId(), nullptr) - .TypeInfo; - } - if (columnSchema.HasCompression()) { - auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression()); - Y_ABORT_UNLESS(compression.IsSuccess(), "%s", compression.GetErrorMessage().data()); - Compression = *compression; - } - if (columnSchema.HasDictionaryEncoding()) { - auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); - Y_ABORT_UNLESS(settings.IsSuccess()); - DictionaryEncoding = *settings; - } - NotNullFlag = columnSchema.GetNotNull(); - } - - void TOlapColumnAdd::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const { - columnSchema.SetName(Name); - columnSchema.SetType(TypeName); - columnSchema.SetNotNull(NotNullFlag); - if (Compression) { - *columnSchema.MutableCompression() = Compression->SerializeToProto(); - } - if (DictionaryEncoding) { - *columnSchema.MutableDictionaryEncoding() = DictionaryEncoding->SerializeToProto(); - } - - auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(Type, ""); - columnSchema.SetTypeId(columnType.TypeId); - if (columnType.TypeInfo) { - *columnSchema.MutableTypeInfo() = *columnType.TypeInfo; - } - } - - bool TOlapColumnAdd::ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors) { - Y_ABORT_UNLESS(GetName() == diffColumn.GetName()); - { - auto result = diffColumn.GetCompression().Apply(Compression); - if (!result) { - errors.AddError("Cannot merge compression info: " + result.GetErrorMessage()); - return false; - } - } - { - auto result = diffColumn.GetDictionaryEncoding().Apply(DictionaryEncoding); - if (!result) { - errors.AddError("Cannot merge dictionary encoding info: " + result.GetErrorMessage()); - return false; - } - } - return true; - } - - bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys) { - if (tableSchema.HasEngine()) { - Engine = tableSchema.GetEngine(); - } - - TMap<TString, ui32> keyColumnNames; - for (auto&& pkKey : tableSchema.GetKeyColumnNames()) { - if (!keyColumnNames.emplace(pkKey, keyColumnNames.size()).second) { - errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Duplicate key column '" << pkKey << "'"); - return false; - } - } - - TSet<TString> columnNames; - for (auto& columnSchema : tableSchema.GetColumns()) { - std::optional<ui32> keyOrder; - { - auto it = keyColumnNames.find(columnSchema.GetName()); - if (it != keyColumnNames.end()) { - keyOrder = it->second; - } - } - - TOlapColumnAdd column(keyOrder); - if (!column.ParseFromRequest(columnSchema, errors)) { - return false; - } - if (column.GetKeyOrder() && *column.GetKeyOrder() == 0) { - if (!TOlapSchema::IsAllowedFirstPkType(column.GetType().GetTypeId())) { - errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() - << "Type '" << column.GetType().GetTypeId() << "' specified for column '" << column.GetName() - << "' is not supported in first PK position"); - return false; - } - } - if (columnNames.contains(column.GetName())) { - errors.AddError(NKikimrScheme::StatusMultipleModifications, TStringBuilder() << "Duplicate column '" << column.GetName() << "'"); - return false; - } - if (!allowNullKeys) { - if (keyColumnNames.contains(column.GetName()) && !column.IsNotNull()) { - errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Nullable key column '" << column.GetName() << "'"); - return false; - } - } - columnNames.emplace(column.GetName()); - AddColumns.emplace_back(std::move(column)); - } - return true; - } - - bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) { - for (const auto& column: alterRequest.GetDropColumns()) { - if (!DropColumns.emplace(column.GetName()).second) { - errors.AddError(NKikimrScheme::StatusInvalidParameter, "Duplicated column for drop"); - return false; - } - } - TSet<TString> addColumnNames; - for (auto& columnSchema : alterRequest.GetAddColumns()) { - TOlapColumnAdd column({}); - if (!column.ParseFromRequest(columnSchema, errors)) { - return false; - } - if (addColumnNames.contains(column.GetName())) { - errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "column '" << column.GetName() << "' duplication for add"); - return false; - } - addColumnNames.emplace(column.GetName()); - AddColumns.emplace_back(std::move(column)); - } - - TSet<TString> alterColumnNames; - for (auto& columnSchemaDiff: alterRequest.GetAlterColumns()) { - TOlapColumnDiff columnDiff; - if (!columnDiff.ParseFromRequest(columnSchemaDiff, errors)) { - return false; - } - if (addColumnNames.contains(columnDiff.GetName())) { - errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << columnDiff.GetName() << "' have to be either add or update"); - return false; - } - if (alterColumnNames.contains(columnDiff.GetName())) { - errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << columnDiff.GetName() << "' duplication for update"); - return false; - } - alterColumnNames.emplace(columnDiff.GetName()); - AlterColumns.emplace_back(std::move(columnDiff)); - } - return true; - } - - bool TOlapSchema::Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors) { - if (Columns.empty() && schemaUpdate.GetAddColumns().empty()) { - errors.AddError(NKikimrScheme::StatusSchemeError, "No add columns specified"); - return false; - } - - if (!HasEngine()) { - Engine = schemaUpdate.GetEngineDef(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); - } else { - if (schemaUpdate.HasEngine()) { - errors.AddError(NKikimrScheme::StatusSchemeError, "No engine updates supported"); - return false; - } - } - - const bool hasColumnsBefore = KeyColumnIds.size(); - std::map<ui32, ui32> orderedKeyColumnIds; - for (auto&& column : schemaUpdate.GetAddColumns()) { - if (ColumnsByName.contains(column.GetName())) { - errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "column '" << column.GetName() << "' already exists"); - return false; - } - if (hasColumnsBefore) { - if (column.IsNotNull()) { - errors.AddError(NKikimrScheme::StatusSchemeError, "Cannot add new not null column currently (not supported yet)"); - return false; - } - if (column.GetKeyOrder()) { - errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << column.GetName() << "' is pk column. its impossible to modify pk"); - return false; - } - } - TOlapColumnSchema newColumn(column, NextColumnId++); - if (newColumn.GetKeyOrder()) { - Y_ABORT_UNLESS(orderedKeyColumnIds.emplace(*newColumn.GetKeyOrder(), newColumn.GetId()).second); - } - - Y_ABORT_UNLESS(ColumnsByName.emplace(newColumn.GetName(), newColumn.GetId()).second); - Y_ABORT_UNLESS(Columns.emplace(newColumn.GetId(), std::move(newColumn)).second); - } - - for (auto&& columnDiff : schemaUpdate.GetAlterColumns()) { - auto it = ColumnsByName.find(columnDiff.GetName()); - if (it == ColumnsByName.end()) { - errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << columnDiff.GetName() << "' not exists for altering"); - return false; - } else { - auto itColumn = Columns.find(it->second); - Y_ABORT_UNLESS(itColumn != Columns.end()); - TOlapColumnSchema& newColumn = itColumn->second; - if (!newColumn.ApplyDiff(columnDiff, errors)) { - return false; - } - } - } - - if (KeyColumnIds.empty()) { - auto it = orderedKeyColumnIds.begin(); - for (ui32 i = 0; i < orderedKeyColumnIds.size(); ++i, ++it) { - KeyColumnIds.emplace_back(it->second); - Y_ABORT_UNLESS(i == it->first); - } - if (KeyColumnIds.empty()) { - errors.AddError(NKikimrScheme::StatusSchemeError, "No primary key specified"); - return false; - } - } - - for (const auto& columnName : schemaUpdate.GetDropColumns()) { - auto columnInfo = GetColumnByName(columnName); - if (!columnInfo) { - errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Unknown column for drop: " << columnName); - return false; - } - - if (columnInfo->IsKeyColumn()) { - errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Cannot remove pk column: " << columnName); - return false; - } - ColumnsByName.erase(columnName); - Columns.erase(columnInfo->GetId()); - } - - ++Version; - return true; - } - - void TOlapSchema::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) { - NextColumnId = tableSchema.GetNextColumnId(); - Version = tableSchema.GetVersion(); - Y_ABORT_UNLESS(tableSchema.HasEngine()); - Engine = tableSchema.GetEngine(); - CompositeMarksFlag = tableSchema.GetCompositeMarks(); - - TMap<TString, ui32> keyIndexes; - ui32 idx = 0; - for (auto&& kName : tableSchema.GetKeyColumnNames()) { - keyIndexes[kName] = idx++; - } - - TVector<ui32> keyIds; - keyIds.resize(tableSchema.GetKeyColumnNames().size(), 0); - for (const auto& columnSchema : tableSchema.GetColumns()) { - std::optional<ui32> keyOrder; - if (keyIndexes.contains(columnSchema.GetName())) { - keyOrder = keyIndexes.at(columnSchema.GetName()); - } - - TOlapColumnSchema column(keyOrder); - column.ParseFromLocalDB(columnSchema); - if (keyOrder) { - Y_ABORT_UNLESS(*keyOrder < keyIds.size()); - keyIds[*keyOrder] = column.GetId(); - } - - Y_ABORT_UNLESS(ColumnsByName.emplace(column.GetName(), column.GetId()).second); - Y_ABORT_UNLESS(Columns.emplace(column.GetId(), std::move(column)).second); - } - KeyColumnIds.swap(keyIds); - } - - void TOlapSchema::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const { - tableSchema.SetNextColumnId(NextColumnId); - tableSchema.SetVersion(Version); - tableSchema.SetCompositeMarks(CompositeMarksFlag); - - Y_ABORT_UNLESS(HasEngine()); - tableSchema.SetEngine(GetEngineUnsafe()); - - for (const auto& column : Columns) { - column.second.Serialize(*tableSchema.AddColumns()); - } - - for (auto&& cId : KeyColumnIds) { - auto column = GetColumnById(cId); - Y_ABORT_UNLESS(!!column); - *tableSchema.AddKeyColumnNames() = column->GetName(); - } - } - - bool TOlapSchema::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const { - const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; - - ui32 lastColumnId = 0; - THashSet<ui32> usedColumns; - for (const auto& colProto : opSchema.GetColumns()) { - if (colProto.GetName().empty()) { - errors.AddError("Columns cannot have an empty name"); - return false; - } - const TString& colName = colProto.GetName(); - auto* col = GetColumnByName(colName); - if (!col) { - errors.AddError("Column '" + colName + "' does not match schema preset"); - return false; - } - if (colProto.HasId() && colProto.GetId() != col->GetId()) { - errors.AddError("Column '" + colName + "' has id " + colProto.GetId() + " that does not match schema preset"); - return false; - } - - if (!usedColumns.insert(col->GetId()).second) { - errors.AddError("Column '" + colName + "' is specified multiple times"); - return false; - } - if (col->GetId() < lastColumnId) { - errors.AddError("Column order does not match schema preset"); - return false; - } - lastColumnId = col->GetId(); - - if (colProto.HasTypeId()) { - errors.AddError("Cannot set TypeId for column '" + colName + "', use Type"); - return false; - } - if (!colProto.HasType()) { - errors.AddError("Missing Type for column '" + colName + "'"); - return false; - } - - auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType()); - const NScheme::IType* type = typeRegistry->GetType(typeName); - if (!type || !IsAllowedType(type->GetTypeId())) { - errors.AddError("Type '" + colProto.GetType() + "' specified for column '" + colName + "' is not supported"); - return false; - } - NScheme::TTypeInfo typeInfo(type->GetTypeId()); - - if (typeInfo != col->GetType()) { - errors.AddError("Type '" + TypeName(typeInfo) + "' specified for column '" + colName + "' does not match schema preset type '" + TypeName(col->GetType()) + "'"); - return false; - } - } - - for (auto& pr : Columns) { - if (!usedColumns.contains(pr.second.GetId())) { - errors.AddError("Specified schema is missing some schema preset columns"); - return false; - } - } - - TVector<ui32> keyColumnIds; - for (const TString& keyName : opSchema.GetKeyColumnNames()) { - auto* col = GetColumnByName(keyName); - if (!col) { - errors.AddError("Unknown key column '" + keyName + "'"); - return false; - } - keyColumnIds.push_back(col->GetId()); - } - if (keyColumnIds != KeyColumnIds) { - errors.AddError("Specified schema key columns not matching schema preset"); - return false; - } - - if (opSchema.GetEngine() != Engine) { - errors.AddError("Specified schema engine does not match schema preset"); - return false; - } - return true; - } - - bool TOlapSchema::IsAllowedType(ui32 typeId) { - if (!NScheme::NTypeIds::IsYqlType(typeId)) { - return false; - } - - switch (typeId) { - case NYql::NProto::Bool: - case NYql::NProto::Interval: - case NYql::NProto::Decimal: - case NYql::NProto::DyNumber: - return false; - default: - break; - } - return true; - } - - bool TOlapSchema::IsAllowedFirstPkType(ui32 typeId) { - switch (typeId) { - case NYql::NProto::Uint8: // Byte - case NYql::NProto::Int32: - case NYql::NProto::Uint32: - case NYql::NProto::Int64: - case NYql::NProto::Uint64: - case NYql::NProto::String: - case NYql::NProto::Utf8: - case NYql::NProto::Date: - case NYql::NProto::Datetime: - case NYql::NProto::Timestamp: - return true; - case NYql::NProto::Interval: - case NYql::NProto::Decimal: - case NYql::NProto::DyNumber: - case NYql::NProto::Yson: - case NYql::NProto::Json: - case NYql::NProto::JsonDocument: - case NYql::NProto::Float: - case NYql::NProto::Double: - case NYql::NProto::Bool: - return false; - default: - break; - } - return false; - } - - void TOlapStoreSchemaPreset::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) { - Y_ABORT_UNLESS(presetProto.HasId()); - Y_ABORT_UNLESS(presetProto.HasName()); - Y_ABORT_UNLESS(presetProto.HasSchema()); - Id = presetProto.GetId(); - Name = presetProto.GetName(); - TOlapSchema::Parse(presetProto.GetSchema()); - } - - void TOlapStoreSchemaPreset::Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const { - presetProto.SetId(Id); - presetProto.SetName(Name); - TOlapSchema::Serialize(*presetProto.MutableSchema()); - } - - bool TOlapStoreSchemaPreset::ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors) { - if (presetProto.HasId()) { - errors.AddError("Schema preset id cannot be specified explicitly"); - return false; - } - if (!presetProto.GetName()) { - errors.AddError("Schema preset name cannot be empty"); - return false; - } - Name = presetProto.GetName(); - return true; - } -} diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.h b/ydb/core/tx/schemeshard/schemeshard_olap_types.h deleted file mode 100644 index e3015d8786..0000000000 --- a/ydb/core/tx/schemeshard/schemeshard_olap_types.h +++ /dev/null @@ -1,166 +0,0 @@ -#pragma once - -#include "defs.h" -#include "schemeshard.h" -#include <ydb/library/accessor/accessor.h> -#include <ydb/core/scheme/scheme_types_proto.h> -#include <ydb/core/formats/arrow/compression/object.h> -#include <ydb/core/formats/arrow/compression/diff.h> -#include <ydb/core/formats/arrow/dictionary/object.h> -#include <ydb/core/formats/arrow/dictionary/diff.h> - -namespace NKikimr::NSchemeShard { - - class IErrorCollector { - public: - virtual void AddError(const TEvSchemeShard::EStatus& errorStatus, const TString& errorMsg) = 0; - virtual void AddError(const TString& errorMsg) = 0; - }; - - class TProposeErrorCollector : public IErrorCollector { - NKikimr::NSchemeShard::TEvSchemeShard::TEvModifySchemeTransactionResult& TxResult; - public: - TProposeErrorCollector(NKikimr::NSchemeShard::TEvSchemeShard::TEvModifySchemeTransactionResult& txResult) - : TxResult(txResult) - {} - - void AddError(const TEvSchemeShard::EStatus& errorStatus, const TString& errorMsg) override { - TxResult.SetError(errorStatus, errorMsg); - } - - void AddError(const TString& errorMsg) override { - TxResult.SetError(NKikimrScheme::StatusSchemeError, errorMsg); - } - }; - - class TOlapColumnDiff; - - class TOlapColumnAdd { - private: - YDB_READONLY_DEF(std::optional<ui32>, KeyOrder); - YDB_READONLY_DEF(TString, Name); - YDB_READONLY_DEF(TString, TypeName); - YDB_READONLY_DEF(NScheme::TTypeInfo, Type); - YDB_FLAG_ACCESSOR(NotNull, false); - YDB_READONLY_DEF(std::optional<NArrow::TCompression>, Compression); - YDB_READONLY_DEF(std::optional<NArrow::NDictionary::TEncodingSettings>, DictionaryEncoding); - public: - TOlapColumnAdd(const std::optional<ui32>& keyOrder) - : KeyOrder(keyOrder) - { - - } - bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors); - void ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema); - void Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const; - bool ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors); - bool IsKeyColumn() const { - return !!KeyOrder; - } - }; - - class TOlapColumnSchema: public TOlapColumnAdd { - private: - using TBase = TOlapColumnAdd; - YDB_READONLY(ui32, Id, Max<ui32>()); - public: - TOlapColumnSchema(const TOlapColumnAdd& base, const ui32 id) - : TBase(base) - , Id(id) { - - } - TOlapColumnSchema(const std::optional<ui32>& keyOrder) - : TBase(keyOrder) { - - } - void Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const; - void ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema); - }; - - class TOlapColumnDiff { - YDB_READONLY_DEF(TString, Name); - YDB_READONLY_DEF(NArrow::TCompressionDiff, Compression); - YDB_READONLY_DEF(NArrow::NDictionary::TEncodingDiff, DictionaryEncoding); - public: - bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDiff& columnSchema, IErrorCollector& errors) { - Name = columnSchema.GetName(); - if (!Name) { - errors.AddError("empty field name"); - return false; - } - if (!Compression.DeserializeFromProto(columnSchema.GetCompression())) { - errors.AddError("cannot parse compression diff from proto"); - return false; - } - if (!DictionaryEncoding.DeserializeFromProto(columnSchema.GetDictionaryEncoding())) { - errors.AddError("cannot parse dictionary encoding diff from proto"); - return false; - } - return true; - } - }; - - class TOlapSchemaUpdate { - YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine); - YDB_READONLY_DEF(TVector<TOlapColumnAdd>, AddColumns); - YDB_READONLY_DEF(TSet<TString>, DropColumns); - YDB_READONLY_DEF(TVector<TOlapColumnDiff>, AlterColumns); - public: - bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys = false); - bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors); - }; - - class TOlapSchema { - public: - using TColumn = TOlapColumnSchema; - using TColumns = THashMap<ui32, TOlapColumnSchema>; - using TColumnsByName = THashMap<TString, ui32>; - - private: - YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine); - YDB_READONLY_DEF(TColumns, Columns); - YDB_READONLY_DEF(TColumnsByName, ColumnsByName); - YDB_READONLY_DEF(TVector<ui32>, KeyColumnIds); - - YDB_READONLY(ui32, NextColumnId, 1); - YDB_READONLY(ui32, Version, 0); - YDB_READONLY_FLAG(CompositeMarks, true); - - public: - const TOlapColumnSchema* GetColumnByName(const TString& name) const noexcept { - auto it = ColumnsByName.find(name); - if (it != ColumnsByName.end()) { - return &Columns.at(it->second); - } - return nullptr; - } - - const TOlapColumnSchema* GetColumnById(const ui32 id) const noexcept { - auto it = Columns.find(id); - if (it != Columns.end()) { - return &it->second; - } - return nullptr; - } - - bool Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors); - - void Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema); - void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const; - bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const; - bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttlSettings, IErrorCollector& errors) const; - - static bool IsAllowedType(ui32 typeId); - static bool IsAllowedFirstPkType(ui32 typeId); - }; - - class TOlapStoreSchemaPreset : public TOlapSchema { - YDB_ACCESSOR_DEF(TString, Name); - YDB_ACCESSOR_DEF(ui32, Id); - YDB_ACCESSOR(size_t, ProtoIndex, -1); // Preset index in the olap store description - public: - void Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const; - void ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto); - bool ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors); - }; -} diff --git a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp index 41284d86d3..40a792d0f4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp @@ -1,5 +1,5 @@ #include "schemeshard_info_types.h" -#include "schemeshard_olap_types.h" +#include "olap/columns/schema.h" #include <ydb/core/protos/flat_scheme_op.pb.h> namespace NKikimr { @@ -8,13 +8,13 @@ namespace NSchemeShard { // Helper accessors for OLTP and OLAP tables that use different TColumn's namespace { inline - bool IsDropped(const TOlapSchema::TColumn& col) { + bool IsDropped(const TOlapColumnsDescription::TColumn& col) { Y_UNUSED(col); return false; } inline - ui32 GetType(const TOlapSchema::TColumn& col) { + ui32 GetType(const TOlapColumnsDescription::TColumn& col) { Y_ABORT_UNLESS(col.GetType().GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported"); return col.GetType().GetTypeId(); } @@ -118,8 +118,8 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TTTLSettings& ttl, } static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle::TTtl& ttl, - const THashMap<ui32, TOlapSchema::TColumn>& sourceColumns, - const THashMap<ui32, TOlapSchema::TColumn>& alterColumns, + const THashMap<ui32, TOlapColumnsDescription::TColumn>& sourceColumns, + const THashMap<ui32, TOlapColumnsDescription::TColumn>& alterColumns, const THashMap<TString, ui32>& colName2Id, IErrorCollector& errors) { @@ -131,7 +131,7 @@ static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle:: return false; } - const TOlapSchema::TColumn* column = nullptr; + const TOlapColumnsDescription::TColumn* column = nullptr; const ui32 colId = it->second; if (alterColumns.contains(colId)) { column = &alterColumns.at(colId); @@ -179,7 +179,7 @@ bool TOlapSchema::ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycl switch (ttl.GetStatusCase()) { case TTtlProto::kEnabled: - return ValidateColumnTableTtl(ttl.GetEnabled(), {}, Columns, ColumnsByName, errors); + return ValidateColumnTableTtl(ttl.GetEnabled(), {}, Columns.GetColumns(), Columns.GetColumnsByName(), errors); case TTtlProto::kDisabled: default: break; diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index e5db2f5a47..d1e8f55992 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -85,8 +85,6 @@ SRCS( schemeshard__operation_alter_index.cpp schemeshard__operation_alter_kesus.cpp schemeshard__operation_alter_login.cpp - schemeshard__operation_alter_olap_store.cpp - schemeshard__operation_alter_olap_table.cpp schemeshard__operation_alter_pq.cpp schemeshard__operation_alter_solomon.cpp schemeshard__operation_alter_subdomain.cpp @@ -109,8 +107,6 @@ SRCS( schemeshard__operation_create_indexed_table.cpp schemeshard__operation_create_kesus.cpp schemeshard__operation_create_lock.cpp - schemeshard__operation_create_olap_store.cpp - schemeshard__operation_create_olap_table.cpp schemeshard__operation_create_pq.cpp schemeshard__operation_create_replication.cpp schemeshard__operation_create_restore.cpp @@ -128,8 +124,6 @@ SRCS( schemeshard__operation_drop_indexed_table.cpp schemeshard__operation_drop_kesus.cpp schemeshard__operation_drop_lock.cpp - schemeshard__operation_drop_olap_store.cpp - schemeshard__operation_drop_olap_table.cpp schemeshard__operation_drop_pq.cpp schemeshard__operation_drop_replication.cpp schemeshard__operation_drop_sequence.cpp @@ -182,7 +176,6 @@ SRCS( schemeshard_identificators.cpp schemeshard_info_types.cpp schemeshard_info_types.h - schemeshard_olap_types.cpp schemeshard_path_describer.cpp schemeshard_path_element.cpp schemeshard_path_element.h @@ -260,6 +253,7 @@ PEERDIR( ydb/core/tablet_flat ydb/core/tx ydb/core/tx/datashard + ydb/core/tx/schemeshard/olap ydb/core/tx/scheme_board ydb/core/tx/tx_allocator_client ydb/core/util diff --git a/ydb/services/bg_tasks/abstract/interface.cpp b/ydb/services/bg_tasks/abstract/interface.cpp index a0587cc583..dae70dfba4 100644 --- a/ydb/services/bg_tasks/abstract/interface.cpp +++ b/ydb/services/bg_tasks/abstract/interface.cpp @@ -1,5 +1,26 @@ #include "interface.h" +#include <ydb/services/bg_tasks/protos/container.pb.h> namespace NKikimr::NBackgroundTasks { +bool TStringContainerProcessor::DeserializeFromContainer(const TString& data, TString& className, TString& binary) { + NKikimrProto::TStringContainer protoData; + if (!protoData.ParseFromArray(data.data(), data.size())) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse string as proto: " << Base64Encode(data); + return false; + } + className = protoData.GetClassName(); + binary = protoData.GetBinaryData(); + return true; +} + +TString TStringContainerProcessor::SerializeToContainer(const TString& className, const TString& binary) { + NKikimrProto::TStringContainer result; + result.SetClassName(className); + if (binary) { + result.SetBinaryData(binary); + } + return result.SerializeAsString(); +} + } diff --git a/ydb/services/bg_tasks/abstract/interface.h b/ydb/services/bg_tasks/abstract/interface.h index 96317f09c2..f6cd594e49 100644 --- a/ydb/services/bg_tasks/abstract/interface.h +++ b/ydb/services/bg_tasks/abstract/interface.h @@ -3,7 +3,6 @@ #include <ydb/library/actors/core/events.h> #include <ydb/library/actors/core/log.h> #include <ydb/library/services/services.pb.h> -#include <ydb/services/bg_tasks/protos/container.pb.h> #include <library/cpp/json/writer/json_value.h> #include <library/cpp/json/json_reader.h> @@ -110,7 +109,20 @@ public: TCommonInterfaceContainer() = default; TCommonInterfaceContainer(std::shared_ptr<IInterface> object) : Object(object) { + } + bool Initialize(const TString& className) { + AFL_VERIFY(!Object)("problem", "initialize for not-empty-object"); + Object.reset(TFactory::Construct(className)); + if (!Object) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name(); + return false; + } + return true; + } + + TString GetClassName() const { + return Object ? Object->GetClassName() : "UNDEFINED"; } bool HasObject() const { @@ -135,6 +147,16 @@ public: return Object; } + std::shared_ptr<IInterface> GetObjectPtrVerified() const { + AFL_VERIFY(Object); + return Object; + } + + const IInterface& GetObjectVerified() const { + AFL_VERIFY(Object); + return *Object; + } + const IInterface* operator->() const { return Object.get(); } @@ -149,6 +171,13 @@ public: }; +class TStringContainerProcessor { +public: + static bool DeserializeFromContainer(const TString& data, TString& className, TString& binary); + + static TString SerializeToContainer(const TString& className, const TString& binary); +}; + template <class IInterface> class TInterfaceStringContainer: public TCommonInterfaceContainer<IInterface> { protected: @@ -162,13 +191,11 @@ public: } TString SerializeToString() const { - NKikimrProto::TStringContainer result; if (!Object) { - return result.SerializeAsString(); + return TStringContainerProcessor::SerializeToContainer("__UNDEFINED", ""); + } else { + return TStringContainerProcessor::SerializeToContainer(Object->GetClassName(), Object->SerializeToString()); } - result.SetClassName(Object->GetClassName()); - result.SetBinaryData(Object->SerializeToString()); - return result.SerializeAsString(); } bool DeserializeFromString(const TString& data) { @@ -176,19 +203,22 @@ public: Object = nullptr; return true; } - NKikimrProto::TStringContainer protoData; - if (!protoData.ParseFromArray(data.data(), data.size())) { + TString className; + TString binaryData; + if (!TStringContainerProcessor::DeserializeFromContainer(data, className, binaryData)) { ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse string as proto: " << Base64Encode(data); - return {}; + return false; + } + if (className == "__UNDEFINED") { + return true; } - const TString& className = protoData.GetClassName(); - std::shared_ptr<IInterface> object(TFactory::Construct(protoData.GetClassName())); + std::shared_ptr<IInterface> object(TFactory::Construct(className)); if (!object) { ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name(); return false; } - if (!object->DeserializeFromString(protoData.GetBinaryData())) { + if (!object->DeserializeFromString(binaryData)) { ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse class instance: " << className << " for " << typeid(IInterface).name(); return false; } @@ -218,6 +248,7 @@ public: NJson::TJsonValue SerializeToJson() const { NJson::TJsonValue result = NJson::JSON_MAP; if (!Object) { + TOperatorPolicy::SetClassName(result, "__UNDEFINED"); return result; } TOperatorPolicy::SetClassName(result, Object->GetClassName()); @@ -227,6 +258,9 @@ public: bool DeserializeFromJson(const NJson::TJsonValue& data) { const TString& className = TOperatorPolicy::GetClassName(data); + if (className == "__UNDEFINED") { + return true; + } std::shared_ptr<IInterface> object(TFactory::Construct(className)); if (!object) { ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name(); @@ -265,6 +299,9 @@ public: using TBase::TBase; bool DeserializeFromProto(const TProto& data) { const TString& className = TOperatorPolicy::GetClassName(data); + if (className == "__UNDEFINED") { + return true; + } std::shared_ptr<IInterface> object(TFactory::Construct(className)); if (!object) { ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name(); @@ -287,6 +324,16 @@ public: TOperatorPolicy::SetClassName(result, Object->GetClassName()); return result; } + + template <class TProto> + void SerializeToProto(TProto& result) const { + if (!Object) { + TOperatorPolicy::SetClassName(result, "__UNDEFINED"); + return; + } + Object->SerializeToProto(result); + TOperatorPolicy::SetClassName(result, Object->GetClassName()); + } }; } |