aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com>2024-01-22 16:46:37 +0300
committerGitHub <noreply@github.com>2024-01-22 16:46:37 +0300
commita7dc8056ec7f824c6211a1aa250caf07db1b0f18 (patch)
treea50987d86067b12e970706412969392a2ac1645a
parent19d729829acb8e66b6ca5314bb79f1826fa39637 (diff)
downloadydb-a7dc8056ec7f824c6211a1aa250caf07db1b0f18.tar.gz
Scheme correction for indexes usage (#1180)
* correct schemeshard usage for index operations * fix build * fix build * correction * fix build * fix build * fix ut build * fix tests
-rw-r--r--ydb/core/formats/arrow/hash/calcer.cpp2
-rw-r--r--ydb/core/formats/arrow/hash/calcer.h4
-rw-r--r--ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.cpp49
-rw-r--r--ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h23
-rw-r--r--ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make1
-rw-r--r--ydb/core/protos/flat_scheme_op.proto37
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h147
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp96
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h99
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/ya.make12
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/ya.make1
-rw-r--r--ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp3
-rw-r--r--ydb/core/tx/columnshard/splitter/ut/ya.make1
-rw-r--r--ydb/core/tx/schemeshard/olap/columns/schema.cpp216
-rw-r--r--ydb/core/tx/schemeshard/olap/columns/schema.h60
-rw-r--r--ydb/core/tx/schemeshard/olap/columns/update.cpp260
-rw-r--r--ydb/core/tx/schemeshard/olap/columns/update.h72
-rw-r--r--ydb/core/tx/schemeshard/olap/columns/ya.make16
-rw-r--r--ydb/core/tx/schemeshard/olap/common/common.cpp4
-rw-r--r--ydb/core/tx/schemeshard/olap/common/common.h27
-rw-r--r--ydb/core/tx/schemeshard/olap/common/ya.make12
-rw-r--r--ydb/core/tx/schemeshard/olap/indexes/schema.cpp113
-rw-r--r--ydb/core/tx/schemeshard/olap/indexes/schema.h93
-rw-r--r--ydb/core/tx/schemeshard/olap/indexes/update.cpp34
-rw-r--r--ydb/core/tx/schemeshard/olap/indexes/update.h34
-rw-r--r--ydb/core/tx/schemeshard/olap/indexes/ya.make12
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/alter_store.cpp (renamed from ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp)7
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/alter_table.cpp (renamed from ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp)9
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/create_store.cpp (renamed from ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp)7
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/create_table.cpp (renamed from ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp)9
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/drop_store.cpp (renamed from ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp)6
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/drop_table.cpp (renamed from ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp)6
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/ya.make19
-rw-r--r--ydb/core/tx/schemeshard/olap/schema/schema.cpp93
-rw-r--r--ydb/core/tx/schemeshard/olap/schema/schema.h40
-rw-r--r--ydb/core/tx/schemeshard/olap/schema/update.cpp28
-rw-r--r--ydb/core/tx/schemeshard/olap/schema/update.h13
-rw-r--r--ydb/core/tx/schemeshard/olap/schema/ya.make13
-rw-r--r--ydb/core/tx/schemeshard/olap/ya.make11
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_olap_types.cpp522
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_olap_types.h166
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp14
-rw-r--r--ydb/core/tx/schemeshard/ya.make8
-rw-r--r--ydb/services/bg_tasks/abstract/interface.cpp21
-rw-r--r--ydb/services/bg_tasks/abstract/interface.h71
48 files changed, 1760 insertions, 746 deletions
diff --git a/ydb/core/formats/arrow/hash/calcer.cpp b/ydb/core/formats/arrow/hash/calcer.cpp
index 3a0d98b7df..9db679c4b7 100644
--- a/ydb/core/formats/arrow/hash/calcer.cpp
+++ b/ydb/core/formats/arrow/hash/calcer.cpp
@@ -9,7 +9,7 @@
namespace NKikimr::NArrow::NHash {
-void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) const {
+void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) {
NArrow::SwitchType(array->type_id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
using T = typename TWrap::T;
diff --git a/ydb/core/formats/arrow/hash/calcer.h b/ydb/core/formats/arrow/hash/calcer.h
index 9afd2020f9..066ba0cb13 100644
--- a/ydb/core/formats/arrow/hash/calcer.h
+++ b/ydb/core/formats/arrow/hash/calcer.h
@@ -22,7 +22,6 @@ private:
ui64 Seed = 0;
const std::vector<TString> ColumnNames;
const ENoColumnPolicy NoColumnPolicy;
- void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer) const;
std::vector<std::shared_ptr<arrow::Array>> GetColumns(const std::shared_ptr<arrow::RecordBatch>& batch) const;
@@ -30,10 +29,9 @@ public:
TXX64(const std::vector<TString>& columnNames, const ENoColumnPolicy noColumnPolicy, const ui64 seed = 0);
TXX64(const std::vector<std::string>& columnNames, const ENoColumnPolicy noColumnPolicy, const ui64 seed = 0);
+ static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer);
std::optional<std::vector<ui64>> Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const;
std::shared_ptr<arrow::Array> ExecuteToArray(const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& hashFieldName) const;
-
-
};
}
diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.cpp b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.cpp
new file mode 100644
index 0000000000..61914cb6e0
--- /dev/null
+++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.cpp
@@ -0,0 +1,49 @@
+#include "upsert_index.h"
+#include <util/string/type.h>
+#include <library/cpp/json/json_reader.h>
+
+namespace NKikimr::NKqp {
+
+// Reads the NAME, TYPE and FEATURES settings of an UPSERT_INDEX request.
+// NAME is stored as the index name, TYPE selects the index meta constructor,
+// FEATURES must be a JSON document that is handed to that constructor.
+// Returns a failed status naming the first missing or malformed setting.
+TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) {
+    auto nameValue = features.Extract("NAME");
+    if (!nameValue) {
+        return TConclusionStatus::Fail("can't find alter parameter NAME");
+    }
+    IndexName = *nameValue;
+
+    auto typeValue = features.Extract("TYPE");
+    if (!typeValue) {
+        return TConclusionStatus::Fail("can't find alter parameter TYPE");
+    }
+    TString indexType = *typeValue;
+
+    auto featuresValue = features.Extract("FEATURES");
+    if (!featuresValue) {
+        return TConclusionStatus::Fail("can't find alter parameter FEATURES");
+    }
+    // The constructor object is created only after all three settings were found.
+    if (!IndexMetaConstructor.Initialize(indexType)) {
+        return TConclusionStatus::Fail("can't initialize index meta object for type \"" + indexType + "\"");
+    }
+    NJson::TJsonValue featuresJson;
+    if (!NJson::ReadJsonFastTree(*featuresValue, &featuresJson)) {
+        return TConclusionStatus::Fail("incorrect json in request FEATURES parameter");
+    }
+    auto parseStatus = IndexMetaConstructor->DeserializeFromJson(featuresJson);
+    if (parseStatus.IsFail()) {
+        return parseStatus;
+    }
+    return TConclusionStatus::Success();
+}
+
+// Appends one UpsertIndexes entry to the alter-schema request and fills it with
+// the index name and the constructor-specific parameters.
+void TUpsertIndexOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
+    auto& requestedIndex = *schemaData.AddUpsertIndexes();
+    requestedIndex.SetName(IndexName);
+    IndexMetaConstructor.SerializeToProto(requestedIndex);
+}
+
+}
diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h
new file mode 100644
index 0000000000..753eade06d
--- /dev/null
+++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include "abstract.h"
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h>
+
+namespace NKikimr::NKqp {
+
+// Table store operation registered under the name "UPSERT_INDEX".
+// It parses the index settings of a request (DoDeserialize) and writes them
+// into the UpsertIndexes section of an alter-schema message (DoSerializeScheme).
+class TUpsertIndexOperation : public ITableStoreOperation {
+private:
+    // Key under which this operation is registered in the operations factory.
+    static TString GetTypeName() {
+        return "UPSERT_INDEX";
+    }
+
+    static inline auto Registrator = TFactory::TRegistrator<TUpsertIndexOperation>(GetTypeName());
+private:
+    // Name of the index taken from the NAME setting.
+    TString IndexName;
+    // Constructor object selected by the TYPE setting and configured from FEATURES.
+    NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor> IndexMetaConstructor;
+public:
+    TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;
+
+    void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const override;
+};
+
+}
+
diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make
index 5325ed5c49..6e186d5f4f 100644
--- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make
+++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make
@@ -5,6 +5,7 @@ SRCS(
GLOBAL add_column.cpp
GLOBAL alter_column.cpp
GLOBAL drop_column.cpp
+ GLOBAL upsert_index.cpp
)
PEERDIR(
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 792c4ac4c2..45cb5d406a 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -421,6 +421,40 @@ message TOlapColumnDescription {
optional TDictionaryEncodingSettings DictionaryEncoding = 9;
}
+message TRequestedBloomFilter { // Bloom filter parameters as given in an index upsert request (see TOlapIndexRequested)
+ optional double FalsePositiveProbability = 1 [default = 0.1]; // requested false-positive probability
+ repeated string ColumnNames = 3; // indexed columns, referenced by name
+}
+
+message TOlapIndexRequested { // Index as requested by a client; carried in TAlterColumnTableSchema.UpsertIndexes
+ optional string Name = 1; // index name
+ optional TCompressionOptions Compression = 3;
+
+ optional string ClassName = 2; // implementation class name; presumably the factory key (e.g. "BLOOM_FILTER") - confirm against the container code
+ oneof Implementation { // implementation-specific parameters
+ TRequestedBloomFilter BloomFilter = 40;
+ }
+}
+
+message TBloomFilter { // Bloom filter parameters of a stored index description (see TOlapIndexDescription)
+ optional double FalsePositiveProbability = 1 [default = 0.1]; // false-positive probability the filter is sized for
+ optional uint64 MaxBytesCount = 2 [default = 8196]; // NOTE(review): 8196 is not a power of two; possibly 8192 was intended - confirm
+ repeated uint32 ColumnIds = 3; // indexed columns, referenced by column id
+}
+
+message TOlapIndexDescription { // Index as stored in a column table schema (TColumnTableSchema.Indexes)
+ // This id is auto-generated by schemeshard
+ optional uint32 Id = 1;
+
+ optional string Name = 2; // index name
+ optional TCompressionOptions Compression = 3;
+
+ optional string ClassName = 4; // implementation class name; presumably the factory key (e.g. "BLOOM_FILTER") - confirm against the container code
+ oneof Implementation { // implementation-specific parameters
+ TBloomFilter BloomFilter = 40;
+ }
+}
+
enum EColumnTableEngine {
COLUMN_ENGINE_NONE = 0;
COLUMN_ENGINE_REPLACING_TIMESERIES = 1;
@@ -456,6 +490,7 @@ message TColumnTableSchema {
optional TCompressionOptions DefaultCompression = 8;
optional bool CompositeMarks = 9 [ default = false ];
+ repeated TOlapIndexDescription Indexes = 10;
}
message TAlterColumnTableSchema {
@@ -463,6 +498,8 @@ message TAlterColumnTableSchema {
//optional TCompressionOptions DefaultCompression = 5;
repeated TOlapColumnDescription DropColumns = 6;
repeated TOlapColumnDiff AlterColumns = 7;
+ repeated TOlapIndexRequested UpsertIndexes = 8;
+ repeated string DropIndexes = 9;
}
// Schema presets are used to manage multiple tables with the same schema
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp
new file mode 100644
index 0000000000..e57c822285
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp
@@ -0,0 +1,10 @@
+#include "abstract.h"
+#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/formats/arrow/hash/xx_hash.h>
+#include <ydb/core/formats/arrow/hash/calcer.h>
+
+namespace NKikimr::NOlap::NIndexes {
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h
new file mode 100644
index 0000000000..cb2cb5665e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h
@@ -0,0 +1,147 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/splitter/chunks.h>
+#include <ydb/core/tx/program/program.h>
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <library/cpp/object_factory/object_factory.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <ydb/services/bg_tasks/abstract/interface.h>
+#include <util/generic/string.h>
+
+#include <memory>
+#include <vector>
+
+namespace NKikimr::NOlap {
+struct TIndexInfo;
+}
+
+namespace NKikimr::NSchemeShard {
+class TOlapSchema;
+class IErrorCollector;
+}
+
+namespace NKikimr::NOlap::NIndexes {
+
+class IIndexChecker { // Interface of an object that evaluates a check over index data passed as blobs
+protected:
+ virtual bool DoCheck(std::vector<TString>&& blobs) const = 0; // implementation hook called by Check()
+public:
+ virtual ~IIndexChecker() = default;
+ bool Check(std::vector<TString>&& blobs) const { // public entry point: forwards the blobs to DoCheck() and returns its result
+ return DoCheck(std::move(blobs));
+ }
+};
+
+class TIndexCheckerContainer { // Pairs an index checker with the id of the index it belongs to
+private:
+ YDB_READONLY(ui32, IndexId, 0); // id of the index; zero is rejected by the constructor
+ YDB_READONLY_DEF(std::shared_ptr<IIndexChecker>, Object); // the checker; null is rejected by the constructor
+public:
+ TIndexCheckerContainer(const ui32 indexId, const std::shared_ptr<IIndexChecker>& object)
+ : IndexId(indexId)
+ , Object(object) {
+ AFL_VERIFY(IndexId); // a container always refers to a non-zero index id
+ AFL_VERIFY(Object); // and always holds a checker
+ }
+
+ const IIndexChecker* operator->() const { // gives direct access to the checker interface
+ return Object.get();
+ }
+};
+
+class IIndexMeta { // Interface of an index meta description; public methods forward to the protected Do* hooks
+protected:
+ virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const = 0; // hook for BuildIndex()
+ virtual std::shared_ptr<IIndexChecker> DoBuildIndexChecker(const TProgramContainer& program) const = 0; // hook for BuildIndexChecker(); may return null (TIndexMetaContainer handles that case)
+ virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0; // hook for DeserializeFromProto()
+ virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0; // hook for SerializeToProto()
+
+public:
+ using TFactory = NObjectFactory::TObjectFactory<IIndexMeta, TString>; // implementations are registered by a string key
+ using TProto = NKikimrSchemeOp::TOlapIndexDescription; // protobuf message this interface is (de)serialized with
+
+ virtual ~IIndexMeta() = default;
+
+ std::shared_ptr<IPortionDataChunk> BuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const { // builds the index chunk from the given chunks; presumably `data` is keyed by column id - confirm with callers
+ return DoBuildIndex(indexId, data, indexInfo);
+ }
+
+ std::shared_ptr<IIndexChecker> BuildIndexChecker(const TProgramContainer& program) const { // builds a checker for the given program
+ return DoBuildIndexChecker(program);
+ }
+
+ bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) { // restores the implementation state from the proto; returns the hook's result
+ return DoDeserializeFromProto(proto);
+ }
+
+ void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const { // writes the implementation state into the proto
+ return DoSerializeToProto(proto);
+ }
+
+ virtual TString GetClassName() const = 0; // name of the concrete implementation
+};
+
+class IIndexMetaConstructor { // Interface of a user-request level index description that can produce an IIndexMeta
+protected:
+ virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0; // hook for DeserializeFromJson()
+ virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const = 0; // hook for CreateIndexMeta()
+ virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) = 0; // hook for DeserializeFromProto()
+ virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const = 0; // hook for SerializeToProto()
+public:
+ using TFactory = NObjectFactory::TObjectFactory<IIndexMetaConstructor, TString>; // implementations are registered by a string key
+ using TProto = NKikimrSchemeOp::TOlapIndexRequested; // protobuf message this interface is (de)serialized with
+
+ virtual ~IIndexMetaConstructor() = default;
+
+ TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { // reads the parameters from a JSON value and reports success or failure
+ return DoDeserializeFromJson(jsonInfo);
+ }
+
+ std::shared_ptr<IIndexMeta> CreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { // creates the meta object against the given schema; problems are reported through `errors`
+ return DoCreateIndexMeta(currentSchema, errors);
+ }
+
+ TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) { // reads the parameters from the request proto
+ return DoDeserializeFromProto(proto);
+ }
+
+ void SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const { // writes the parameters into the request proto
+ return DoSerializeToProto(proto);
+ }
+
+ virtual TString GetClassName() const = 0; // name of the concrete implementation
+};
+
+class TIndexMetaContainer: public NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta> { // IIndexMeta holder that additionally keeps the index id
+private:
+ using TBase = NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta>;
+ YDB_READONLY(ui32, IndexId, 0); // id of the index; stays 0 for a default-constructed container
+public:
+ TIndexMetaContainer() = default;
+ TIndexMetaContainer(const ui32 indexId, const std::shared_ptr<IIndexMeta>& object)
+ : TBase(object)
+ , IndexId(indexId)
+ {
+ AFL_VERIFY(IndexId); // an explicitly constructed container needs a non-zero id
+ AFL_VERIFY(Object); // and a non-null meta object
+ }
+
+ bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) { // restores the held object through the base container, then takes the id from the proto
+ if (!TBase::DeserializeFromProto(proto)) {
+ return false;
+ }
+ IndexId = proto.GetId();
+ return true;
+ }
+
+ std::optional<TIndexCheckerContainer> BuildIndexChecker(const TProgramContainer& program) const { // returns an empty optional when the meta object produces no checker
+ auto checker = GetObjectPtr()->BuildIndexChecker(program); // NOTE(review): assumes an object is held; behaviour for a default-constructed container depends on GetObjectPtr() - confirm
+ if (!checker) {
+ return {};
+ }
+ return TIndexCheckerContainer(IndexId, checker);
+ }
+};
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp
new file mode 100644
index 0000000000..62b8d5ddc2
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp
@@ -0,0 +1,96 @@
+#include "bloom.h"
+#include <ydb/core/formats/arrow/hash/xx_hash.h>
+#include <ydb/core/formats/arrow/hash/calcer.h>
+#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
+#include <ydb/core/tx/schemeshard/olap/common/common.h>
+
+namespace NKikimr::NOlap::NIndexes {
+
+// Builds the bloom filter bitmap for every row available from `reader`.
+// For each row, HashesCount hashes of the row's column values are computed
+// (seeds 0, 3, 6, ...) and bit `hash % BitsCount` is set for each of them.
+// Returns a record batch with schema ResultSchema holding one boolean column
+// of BitsCount values.
+std::shared_ptr<arrow::RecordBatch> TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const {
+    std::vector<bool> flags;
+    flags.resize(BitsCount, false);
+    // The reader is walked exactly once and is not rewound in this function, so rows
+    // have to be the outer loop: with seeds on the outside only the first seed would
+    // ever see any data and the filter would effectively use a single hash function.
+    for (; reader.IsCorrect(); reader.ReadNext()) {
+        for (ui32 hashIdx = 0; hashIdx < HashesCount; ++hashIdx) {
+            NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(3 * hashIdx);
+            hashCalcer.Start();
+            for (auto&& column : reader) {
+                NArrow::NHash::TXX64::AppendField(column.GetCurrentChunk(), column.GetCurrentRecordIndex(), hashCalcer);
+            }
+            flags[hashCalcer.Finish() % BitsCount] = true;
+        }
+    }
+
+    arrow::BooleanBuilder builder;
+    // Reserve() reports failures through its status as well, so validate it like the other builder calls.
+    NArrow::TStatusValidator::Validate(builder.Reserve(flags.size()));
+    NArrow::TStatusValidator::Validate(builder.AppendValues(flags));
+    std::shared_ptr<arrow::BooleanArray> out;
+    NArrow::TStatusValidator::Validate(builder.Finish(&out));
+
+    return arrow::RecordBatch::Make(ResultSchema, BitsCount, {out});
+}
+
+// Resolves the requested column names against `currentSchema` and creates the bloom
+// index meta over the resulting column ids. An unknown column name is reported
+// through `errors` and yields nullptr.
+std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TBloomIndexConstructor::DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
+    std::set<ui32> columnIds;
+    for (const auto& columnName : ColumnNames) {
+        auto* columnInfo = currentSchema.GetColumns().GetByName(columnName);
+        if (columnInfo == nullptr) {
+            errors.AddError("no column with name " + columnName);
+            return nullptr;
+        }
+        // Every resolved id has to be new to the set.
+        AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second);
+    }
+    return std::make_shared<TBloomIndexMeta>(columnIds, FalsePositiveProbability);
+}
+
+// Reads bloom filter features from JSON:
+//   "column_names"               - array of strings, added to ColumnNames;
+//   "false_positive_probability" - double in [0.01, 1).
+// Returns a failed status describing the first violated requirement.
+NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
+    if (!jsonInfo.Has("column_names")) {
+        return TConclusionStatus::Fail("column_names have to be in bloom filter features");
+    }
+    const NJson::TJsonValue& namesValue = jsonInfo["column_names"];
+    const NJson::TJsonValue::TArray* namesArray = nullptr;
+    if (!namesValue.GetArrayPointer(&namesArray)) {
+        return TConclusionStatus::Fail("column_names have to be in bloom filter features as array ['column_name_1', ... , 'column_name_N']");
+    }
+    for (const auto& nameValue : *namesArray) {
+        if (!nameValue.IsString()) {
+            return TConclusionStatus::Fail("column_names have to be in bloom filter features as array of strings ['column_name_1', ... , 'column_name_N']");
+        }
+        ColumnNames.emplace(nameValue.GetString());
+    }
+
+    const NJson::TJsonValue& probabilityValue = jsonInfo["false_positive_probability"];
+    if (!probabilityValue.IsDouble()) {
+        return TConclusionStatus::Fail("false_positive_probability have to be in bloom filter features as double field");
+    }
+    FalsePositiveProbability = probabilityValue.GetDouble();
+    if (FalsePositiveProbability < 0.01 || FalsePositiveProbability >= 1) {
+        return TConclusionStatus::Fail("false_positive_probability have to be in bloom filter features as double field in interval [0.01, 1)");
+    }
+    return TConclusionStatus::Success();
+}
+
+// Restores the requested bloom filter parameters from the request proto.
+// Fails (and logs the problem) when the BloomFilter section is absent or when
+// FalsePositiveProbability is outside [0.01, 1). On success the column names of
+// the proto are added to ColumnNames.
+NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) {
+    if (!proto.HasBloomFilter()) {
+        // The section is called BloomFilter in the proto; the message previously misspelled it.
+        const TString errorMessage = "not found BloomFilter section in proto: \"" + proto.DebugString() + "\"";
+        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", errorMessage);
+        return TConclusionStatus::Fail(errorMessage);
+    }
+    auto& bFilter = proto.GetBloomFilter();
+    FalsePositiveProbability = bFilter.GetFalsePositiveProbability();
+    if (FalsePositiveProbability < 0.01 || FalsePositiveProbability >= 1) {
+        const TString errorMessage = "FalsePositiveProbability have to be in interval[0.01, 1)";
+        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", errorMessage);
+        return TConclusionStatus::Fail(errorMessage);
+    }
+    for (auto&& i : bFilter.GetColumnNames()) {
+        ColumnNames.emplace(i);
+    }
+    return TConclusionStatus::Success();
+}
+
+// Writes the requested bloom filter parameters (probability and column names)
+// into the BloomFilter section of the request proto.
+void TBloomIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const {
+    auto& requestedFilter = *proto.MutableBloomFilter();
+    requestedFilter.SetFalsePositiveProbability(FalsePositiveProbability);
+    for (const auto& columnName : ColumnNames) {
+        requestedFilter.AddColumnNames(columnName);
+    }
+}
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h
new file mode 100644
index 0000000000..15ed36ff3e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h
@@ -0,0 +1,99 @@
+#pragma once
+#include "abstract.h"
+
+#include <ydb/core/tx/columnshard/splitter/chunks.h>
+#include <ydb/core/tx/program/program.h>
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <library/cpp/object_factory/object_factory.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <ydb/services/bg_tasks/abstract/interface.h>
+#include <util/generic/string.h>
+
+#include <memory>
+#include <vector>
+
+namespace NKikimr::NOlap::NIndexes {
+
+class TBloomIndexConstructor: public IIndexMetaConstructor { // Request-level description of a bloom filter index: column names plus false-positive probability
+public:
+ static TString GetClassNameStatic() { // factory key of this constructor
+ return "BLOOM_FILTER";
+ }
+private:
+ std::set<TString> ColumnNames; // names of the columns the filter is requested for
+ double FalsePositiveProbability = 0.1; // requested false-positive probability; the deserializers accept [0.01, 1)
+ static inline auto Registrator = TFactory::TRegistrator<TBloomIndexConstructor>(GetClassNameStatic()); // registers the class in IIndexMetaConstructor::TFactory
+protected:
+ virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override; // resolves column names to ids and creates TBloomIndexMeta
+
+ virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; // reads "column_names" and "false_positive_probability"
+
+ virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) override; // reads the BloomFilter section of the request proto
+ virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const override; // writes the BloomFilter section of the request proto
+
+public:
+ TBloomIndexConstructor() = default;
+
+ virtual TString GetClassName() const override { // same value as GetClassNameStatic()
+ return GetClassNameStatic();
+ }
+};
+
+// Meta description of a bloom filter index over a set of columns.
+// The number of hash functions and the bitmap size are derived from the
+// false-positive probability and a fixed expected rows count.
+class TBloomIndexMeta: public TIndexByColumns {
+public:
+    // Factory key of this index meta implementation.
+    static TString GetClassNameStatic() {
+        return "BLOOM_FILTER";
+    }
+private:
+    using TBase = TIndexByColumns;
+    std::shared_ptr<arrow::Schema> ResultSchema;
+    // Number of rows the bitmap is sized for.
+    const ui64 RowsCountExpectation = 10000;
+    double FalsePositiveProbability = 0.1;
+    // Derived by Initialize(); both stay 0 until it is called.
+    ui32 HashesCount = 0;
+    ui32 BitsCount = 0;
+    static inline auto Registrator = TFactory::TRegistrator<TBloomIndexMeta>(GetClassNameStatic());
+
+    // Derives HashesCount and BitsCount from FalsePositiveProbability:
+    //   hashes = -log2(p), bits = rows * hashes / ln(2).
+    void Initialize() {
+        // The lower bound is inclusive: the request validators accept the interval [0.01, 1),
+        // so a probability of exactly 0.01 must not abort here.
+        AFL_VERIFY(FalsePositiveProbability < 1 && FalsePositiveProbability >= 0.01);
+        HashesCount = -1 * std::log(FalsePositiveProbability) / std::log(2);
+        // For probabilities above 0.5 the value truncates to zero, which would produce an
+        // empty bitmap; a single hash function already satisfies such a request.
+        if (HashesCount == 0) {
+            HashesCount = 1;
+        }
+        BitsCount = RowsCountExpectation * HashesCount / (std::log(2));
+    }
+protected:
+    // No checker is provided by this implementation yet.
+    virtual std::shared_ptr<IIndexChecker> DoBuildIndexChecker(const TProgramContainer& /*program*/) const override {
+        return nullptr;
+    }
+
+    virtual std::shared_ptr<arrow::RecordBatch> DoBuildIndexImpl(TChunkedBatchReader& reader) const override;
+
+    // Restores probability and column ids from the BloomFilter section (which must be present)
+    // and recomputes the derived sizes.
+    virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) override {
+        AFL_VERIFY(proto.HasBloomFilter());
+        auto& bFilter = proto.GetBloomFilter();
+        FalsePositiveProbability = bFilter.GetFalsePositiveProbability();
+        for (auto&& i : bFilter.GetColumnIds()) {
+            ColumnIds.emplace(i);
+        }
+        Initialize();
+        return true;
+    }
+    // Writes probability and column ids into the BloomFilter section.
+    virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override {
+        auto* filterProto = proto.MutableBloomFilter();
+        filterProto->SetFalsePositiveProbability(FalsePositiveProbability);
+        for (auto&& i : ColumnIds) {
+            filterProto->AddColumnIds(i);
+        }
+    }
+
+public:
+    TBloomIndexMeta() = default;
+    // Creates a filter over `columnIds` for the given false-positive probability,
+    // which has to be within [0.01, 1).
+    TBloomIndexMeta(const std::set<ui32>& columnIds, const double fpProbability)
+        : TBase(columnIds)
+        , FalsePositiveProbability(fpProbability) {
+        Initialize();
+    }
+
+    virtual TString GetClassName() const override {
+        return GetClassNameStatic();
+    }
+};
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make b/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make
new file mode 100644
index 0000000000..138eb4d5e1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ abstract.cpp
+)
+
+PEERDIR(
+ ydb/core/protos
+ ydb/core/formats/arrow
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make
index bace1130d6..2067cdd542 100644
--- a/ydb/core/tx/columnshard/engines/scheme/ya.make
+++ b/ydb/core/tx/columnshard/engines/scheme/ya.make
@@ -14,6 +14,7 @@ PEERDIR(
ydb/core/formats/arrow
ydb/library/actors/core
+ ydb/core/tx/columnshard/engines/scheme/indexes
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
index a8face4b53..4f87ef33e1 100644
--- a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
+++ b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp
@@ -40,8 +40,7 @@ Y_UNIT_TEST_SUITE(Splitter) {
}
virtual std::shared_ptr<arrow::Field> GetField(const ui32 columnId) const override {
- Y_ABORT_UNLESS(false);
- return nullptr;
+ return std::make_shared<arrow::Field>(GetColumnName(columnId), std::make_shared<arrow::StringType>());
}
virtual ui32 GetColumnId(const std::string& columnName) const override {
diff --git a/ydb/core/tx/columnshard/splitter/ut/ya.make b/ydb/core/tx/columnshard/splitter/ut/ya.make
index b3d71f8085..1b18ea3cfa 100644
--- a/ydb/core/tx/columnshard/splitter/ut/ya.make
+++ b/ydb/core/tx/columnshard/splitter/ut/ya.make
@@ -8,6 +8,7 @@ PEERDIR(
ydb/core/tx/columnshard/counters
ydb/core/formats/arrow/compression
+ ydb/core/tx/columnshard/engines/portions
ydb/core/kqp/common
ydb/library/yql/parser/pg_wrapper
ydb/library/yql/public/udf
diff --git a/ydb/core/tx/schemeshard/olap/columns/schema.cpp b/ydb/core/tx/schemeshard/olap/columns/schema.cpp
new file mode 100644
index 0000000000..511fc4736a
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/columns/schema.cpp
@@ -0,0 +1,216 @@
+#include "schema.h"
+#include <ydb/library/accessor/validator.h>
+#include <ydb/library/yql/minikql/mkql_type_ops.h>
+#include <ydb/core/scheme_types/scheme_type_registry.h>
+
+namespace NKikimr::NSchemeShard {
+
+void TOlapColumnSchema::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const { // writes the base column description and then the column id
+ TBase::Serialize(columnSchema);
+ columnSchema.SetId(Id);
+}
+
+void TOlapColumnSchema::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) { // restores the base column description and then the column id
+ TBase::ParseFromLocalDB(columnSchema);
+ Id = columnSchema.GetId();
+}
+
+bool TOlapColumnsDescription::ApplyUpdate(const TOlapColumnsUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId) { // applies added, altered and dropped columns in that order; returns false after reporting the first problem to `errors`
+ if (Columns.empty() && schemaUpdate.GetAddColumns().empty()) { // an empty description needs at least one added column
+ errors.AddError(NKikimrScheme::StatusSchemeError, "No add columns specified");
+ return false;
+ }
+
+ const bool hasColumnsBefore = KeyColumnIds.size(); // true when a primary key is already defined, i.e. an existing schema is being modified
+ std::map<ui32, ui32> orderedKeyColumnIds; // key order -> column id for key columns added by this update
+ for (auto&& column : schemaUpdate.GetAddColumns()) {
+ if (ColumnsByName.contains(column.GetName())) {
+ errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "column '" << column.GetName() << "' already exists");
+ return false;
+ }
+ if (hasColumnsBefore) { // restrictions that only apply when columns are added to an existing schema
+ if (column.IsNotNull()) {
+ errors.AddError(NKikimrScheme::StatusSchemeError, "Cannot add new not null column currently (not supported yet)");
+ return false;
+ }
+ if (column.GetKeyOrder()) {
+ errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << column.GetName() << "' is pk column. its impossible to modify pk");
+ return false;
+ }
+ }
+ TOlapColumnSchema newColumn(column, nextEntityId++); // every added column consumes the next entity id
+ if (newColumn.GetKeyOrder()) {
+ Y_ABORT_UNLESS(orderedKeyColumnIds.emplace(*newColumn.GetKeyOrder(), newColumn.GetId()).second); // two added columns must not share a key order
+ }
+
+ Y_ABORT_UNLESS(ColumnsByName.emplace(newColumn.GetName(), newColumn.GetId()).second);
+ Y_ABORT_UNLESS(Columns.emplace(newColumn.GetId(), std::move(newColumn)).second);
+ }
+
+ for (auto&& columnDiff : schemaUpdate.GetAlterColumns()) {
+ auto it = ColumnsByName.find(columnDiff.GetName());
+ if (it == ColumnsByName.end()) {
+ errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << columnDiff.GetName() << "' not exists for altering");
+ return false;
+ } else {
+ auto itColumn = Columns.find(it->second);
+ Y_ABORT_UNLESS(itColumn != Columns.end()); // the name index and the id map must stay in sync
+ TOlapColumnSchema& newColumn = itColumn->second;
+ if (!newColumn.ApplyDiff(columnDiff, errors)) {
+ return false;
+ }
+ }
+ }
+
+ if (KeyColumnIds.empty()) { // first definition of the primary key: take it from the added key columns
+ auto it = orderedKeyColumnIds.begin();
+ for (ui32 i = 0; i < orderedKeyColumnIds.size(); ++i, ++it) {
+ KeyColumnIds.emplace_back(it->second);
+ Y_ABORT_UNLESS(i == it->first); // key orders have to be exactly 0..N-1 without gaps
+ }
+ if (KeyColumnIds.empty()) {
+ errors.AddError(NKikimrScheme::StatusSchemeError, "No primary key specified");
+ return false;
+ }
+ }
+
+ for (const auto& columnName : schemaUpdate.GetDropColumns()) {
+ auto columnInfo = GetByName(columnName);
+ if (!columnInfo) {
+ errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Unknown column for drop: " << columnName);
+ return false;
+ }
+
+ if (columnInfo->IsKeyColumn()) {
+ errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Cannot remove pk column: " << columnName);
+ return false;
+ }
+ ColumnsByName.erase(columnName);
+ Columns.erase(columnInfo->GetId()); // columnInfo points into Columns and must not be used after this line
+ }
+
+ return true;
+}
+
+// Fills the description from a stored table schema: every column is registered by id
+// and by name, and KeyColumnIds receives the ids of the key columns in key order.
+void TOlapColumnsDescription::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) {
+    // Position of every key column name inside the primary key.
+    TMap<TString, ui32> keyPositionByName;
+    ui32 nextKeyPosition = 0;
+    for (const auto& keyColumnName : tableSchema.GetKeyColumnNames()) {
+        keyPositionByName[keyColumnName] = nextKeyPosition++;
+    }
+
+    TVector<ui32> keyIds;
+    keyIds.resize(tableSchema.GetKeyColumnNames().size(), 0);
+    for (const auto& columnSchema : tableSchema.GetColumns()) {
+        std::optional<ui32> keyOrder;
+        const auto keyPositionIt = keyPositionByName.find(columnSchema.GetName());
+        if (keyPositionIt != keyPositionByName.end()) {
+            keyOrder = keyPositionIt->second;
+        }
+
+        TOlapColumnSchema column(keyOrder);
+        column.ParseFromLocalDB(columnSchema);
+        if (keyOrder) {
+            Y_ABORT_UNLESS(*keyOrder < keyIds.size());
+            keyIds[*keyOrder] = column.GetId();
+        }
+
+        // Names and ids have to be unique within one schema.
+        Y_ABORT_UNLESS(ColumnsByName.emplace(column.GetName(), column.GetId()).second);
+        Y_ABORT_UNLESS(Columns.emplace(column.GetId(), std::move(column)).second);
+    }
+    KeyColumnIds.swap(keyIds);
+}
+
+// Writes all columns and then the key column names (in key order) into `tableSchema`.
+void TOlapColumnsDescription::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const {
+    for (const auto& idAndColumn : Columns) {
+        auto* columnProto = tableSchema.AddColumns();
+        idAndColumn.second.Serialize(*columnProto);
+    }
+
+    for (const ui32 keyColumnId : KeyColumnIds) {
+        auto column = GetById(keyColumnId);
+        // Every key column id has to refer to a known column.
+        Y_ABORT_UNLESS(!!column);
+        tableSchema.AddKeyColumnNames(column->GetName());
+    }
+}
+
+// Checks that the schema given in an operation matches this description (the schema preset).
+// Every column of `opSchema` must be known by name, carry a matching id (when an id is given),
+// appear once, follow the preset order of ids and have the preset type given through Type
+// (TypeId is rejected). All preset columns must be present and the key column names must
+// resolve to exactly KeyColumnIds. The first mismatch is reported to `errors` and yields false.
+bool TOlapColumnsDescription::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const {
+    const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
+
+    ui32 lastColumnId = 0;
+    THashSet<ui32> usedColumns;
+    for (const auto& colProto : opSchema.GetColumns()) {
+        if (colProto.GetName().empty()) {
+            errors.AddError("Columns cannot have an empty name");
+            return false;
+        }
+        const TString& colName = colProto.GetName();
+        auto* col = GetByName(colName);
+        if (!col) {
+            errors.AddError("Column '" + colName + "' does not match schema preset");
+            return false;
+        }
+        if (colProto.HasId() && colProto.GetId() != col->GetId()) {
+            // The id is streamed through TStringBuilder so that it is rendered as a decimal number;
+            // appending an integer to a TString with operator+ is not expected to format it as text.
+            errors.AddError(TStringBuilder() << "Column '" << colName << "' has id " << colProto.GetId() << " that does not match schema preset");
+            return false;
+        }
+
+        if (!usedColumns.insert(col->GetId()).second) {
+            errors.AddError("Column '" + colName + "' is specified multiple times");
+            return false;
+        }
+        // Columns have to be listed in ascending order of their preset ids.
+        if (col->GetId() < lastColumnId) {
+            errors.AddError("Column order does not match schema preset");
+            return false;
+        }
+        lastColumnId = col->GetId();
+
+        if (colProto.HasTypeId()) {
+            errors.AddError("Cannot set TypeId for column '" + colName + "', use Type");
+            return false;
+        }
+        if (!colProto.HasType()) {
+            errors.AddError("Missing Type for column '" + colName + "'");
+            return false;
+        }
+
+        auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType());
+        const NScheme::IType* type = typeRegistry->GetType(typeName);
+        if (!type || !TOlapColumnAdd::IsAllowedType(type->GetTypeId())) {
+            errors.AddError("Type '" + colProto.GetType() + "' specified for column '" + colName + "' is not supported");
+            return false;
+        }
+        NScheme::TTypeInfo typeInfo(type->GetTypeId());
+
+        if (typeInfo != col->GetType()) {
+            errors.AddError("Type '" + TypeName(typeInfo) + "' specified for column '" + colName + "' does not match schema preset type '" + TypeName(col->GetType()) + "'");
+            return false;
+        }
+    }
+
+    // Every preset column has to be mentioned by the operation schema.
+    for (auto& pr : Columns) {
+        if (!usedColumns.contains(pr.second.GetId())) {
+            errors.AddError("Specified schema is missing some schema preset columns");
+            return false;
+        }
+    }
+
+    TVector<ui32> keyColumnIds;
+    for (const TString& keyName : opSchema.GetKeyColumnNames()) {
+        auto* col = GetByName(keyName);
+        if (!col) {
+            errors.AddError("Unknown key column '" + keyName + "'");
+            return false;
+        }
+        keyColumnIds.push_back(col->GetId());
+    }
+    if (keyColumnIds != KeyColumnIds) {
+        errors.AddError("Specified schema key columns not matching schema preset");
+        return false;
+    }
+    return true;
+}
+
+const NKikimr::NSchemeShard::TOlapColumnSchema* TOlapColumnsDescription::GetByIdVerified(const ui32 id) const noexcept { // like GetById(), but the result is passed through TValidator::CheckNotNull
+ return TValidator::CheckNotNull(GetById(id));
+}
+
+}
diff --git a/ydb/core/tx/schemeshard/olap/columns/schema.h b/ydb/core/tx/schemeshard/olap/columns/schema.h
new file mode 100644
index 0000000000..0abec27769
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/columns/schema.h
@@ -0,0 +1,60 @@
+#pragma once
+#include "update.h"
+
+namespace NKikimr::NSchemeShard {
+
+// A column description (TOlapColumnAdd) extended with a numeric column id.
+class TOlapColumnSchema: public TOlapColumnAdd {
+private:
+ using TBase = TOlapColumnAdd;
+ // Column id; declared with Max<ui32>() as its default value.
+ YDB_READONLY(ui32, Id, Max<ui32>());
+public:
+ // Copies an existing column description and attaches the given id to it.
+ TOlapColumnSchema(const TOlapColumnAdd& base, const ui32 id)
+ : TBase(base)
+ , Id(id) {
+
+ }
+ // Builds a column that only has its key order set; Id keeps its default.
+ TOlapColumnSchema(const std::optional<ui32>& keyOrder)
+ : TBase(keyOrder) {
+
+ }
+ // Writes this column into the proto message (defined in the .cpp file).
+ void Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const;
+ // Restores this column from the proto message (defined in the .cpp file).
+ void ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema);
+};
+
+// Set of table columns indexed by id and by name, plus the ordered list of
+// key column ids.
+class TOlapColumnsDescription {
+public:
+ using TColumn = TOlapColumnSchema;
+ using TColumns = THashMap<ui32, TOlapColumnSchema>;
+ using TColumnsByName = THashMap<TString, ui32>;
+
+private:
+ // Column id -> column description.
+ YDB_READONLY_DEF(TColumns, Columns);
+ // Column name -> column id.
+ YDB_READONLY_DEF(TColumnsByName, ColumnsByName);
+ // Ids of the key columns.
+ YDB_READONLY_DEF(TVector<ui32>, KeyColumnIds);
+
+public:
+ // Returns the column with the given name, or nullptr when the name is not
+ // registered. A registered name is resolved through GetByIdVerified.
+ const TOlapColumnSchema* GetByName(const TString& name) const noexcept {
+ auto it = ColumnsByName.find(name);
+ if (it != ColumnsByName.end()) {
+ return GetByIdVerified(it->second);
+ }
+ return nullptr;
+ }
+
+ // Returns the column with the given id, or nullptr when there is none.
+ const TOlapColumnSchema* GetById(const ui32 id) const noexcept {
+ auto it = Columns.find(id);
+ if (it != Columns.end()) {
+ return &it->second;
+ }
+ return nullptr;
+ }
+
+ // Same lookup as GetById, with the result checked by TValidator::CheckNotNull.
+ const TOlapColumnSchema* GetByIdVerified(const ui32 id) const noexcept;
+
+ // Applies a columns update; nextEntityId is taken by non-const reference
+ // (presumably advanced as new ids are handed out - confirm in schema.cpp).
+ bool ApplyUpdate(const TOlapColumnsUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId);
+
+ void Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema);
+ void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const;
+ // Compares opSchema with this description; mismatches are reported through
+ // `errors` and make the method return false.
+ bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const;
+};
+}
diff --git a/ydb/core/tx/schemeshard/olap/columns/update.cpp b/ydb/core/tx/schemeshard/olap/columns/update.cpp
new file mode 100644
index 0000000000..a6d602b5fa
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/columns/update.cpp
@@ -0,0 +1,260 @@
+#include "update.h"
+#include <ydb/library/yql/minikql/mkql_type_ops.h>
+#include <ydb/core/scheme/scheme_types_proto.h>
+#include <ydb/core/scheme_types/scheme_type_registry.h>
+
+namespace NKikimr::NSchemeShard {
+
+ // Fills this column from a user-supplied proto description.
+ //
+ // Reads the name, NOT NULL flag, type name and the optional compression and
+ // dictionary-encoding settings, then resolves the type name through the
+ // application type registry. Returns false, after reporting through
+ // `errors`, when the name is empty, a setting cannot be parsed, TypeId is
+ // set explicitly, Type is missing, or the type is unknown or not allowed.
+ // Fields assigned before the failing check keep their new values.
+ bool TOlapColumnAdd::ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors) {
+     if (!columnSchema.GetName()) {
+         errors.AddError("Columns cannot have an empty name");
+         return false;
+     }
+     Name = columnSchema.GetName();
+     NotNullFlag = columnSchema.GetNotNull();
+     TypeName = columnSchema.GetType();
+     if (columnSchema.HasCompression()) {
+         auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression());
+         if (!compression) {
+             errors.AddError("Cannot parse compression info: " + compression.GetErrorMessage());
+             return false;
+         }
+         Compression = *compression;
+     }
+     if (columnSchema.HasDictionaryEncoding()) {
+         auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding());
+         if (!settings) {
+             errors.AddError("Cannot parse dictionary compression info: " + settings.GetErrorMessage());
+             return false;
+         }
+         DictionaryEncoding = *settings;
+     }
+
+     // Requests must describe the type by name; a numeric TypeId is rejected.
+     if (columnSchema.HasTypeId()) {
+         errors.AddError(TStringBuilder() << "Cannot set TypeId for column '" << Name << "', use Type");
+         return false;
+     }
+
+     if (!columnSchema.HasType()) {
+         errors.AddError(TStringBuilder() << "Missing Type for column '" << Name << "'");
+         return false;
+     }
+
+     auto typeName = NMiniKQL::AdaptLegacyYqlType(TypeName);
+     Y_ABORT_UNLESS(AppData()->TypeRegistry);
+     const NScheme::IType* type = AppData()->TypeRegistry->GetType(typeName);
+     if (!type) {
+         errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported");
+         return false;
+     }
+     // Checked before TTypeInfo is built from the type id below.
+     if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) {
+         errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported");
+         return false;
+     }
+     Type = NScheme::TTypeInfo(type->GetTypeId());
+     if (!IsAllowedType(type->GetTypeId())) {
+         errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported");
+         return false;
+     }
+     return true;
+ }
+
+ // Restores this column from a proto description.
+ //
+ // Unlike ParseFromRequest this path reports nothing through an error
+ // collector: compression or dictionary-encoding settings that fail to parse
+ // abort the process, with the parser's error message attached.
+ void TOlapColumnAdd::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) {
+     Name = columnSchema.GetName();
+     TypeName = columnSchema.GetType();
+
+     // TypeInfo is optional in the proto; nullptr is passed when it is absent.
+     const auto* typeInfoProto = columnSchema.HasTypeInfo() ? &columnSchema.GetTypeInfo() : nullptr;
+     Type = NScheme::TypeInfoModFromProtoColumnType(columnSchema.GetTypeId(), typeInfoProto).TypeInfo;
+
+     if (columnSchema.HasCompression()) {
+         auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression());
+         Y_ABORT_UNLESS(compression.IsSuccess(), "%s", compression.GetErrorMessage().data());
+         Compression = *compression;
+     }
+     if (columnSchema.HasDictionaryEncoding()) {
+         auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding());
+         Y_ABORT_UNLESS(settings.IsSuccess(), "%s", settings.GetErrorMessage().data());
+         DictionaryEncoding = *settings;
+     }
+     NotNullFlag = columnSchema.GetNotNull();
+ }
+
+ // Writes this column into the proto: name, type name, NOT NULL flag, the
+ // type id (plus type info when the conversion yields one) and, when set,
+ // the compression and dictionary-encoding settings.
+ void TOlapColumnAdd::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const {
+     columnSchema.SetName(Name);
+     columnSchema.SetType(TypeName);
+     columnSchema.SetNotNull(NotNullFlag);
+
+     auto protoType = NScheme::ProtoColumnTypeFromTypeInfoMod(Type, "");
+     columnSchema.SetTypeId(protoType.TypeId);
+     if (protoType.TypeInfo) {
+         *columnSchema.MutableTypeInfo() = *protoType.TypeInfo;
+     }
+
+     if (Compression) {
+         *columnSchema.MutableCompression() = Compression->SerializeToProto();
+     }
+     if (DictionaryEncoding) {
+         *columnSchema.MutableDictionaryEncoding() = DictionaryEncoding->SerializeToProto();
+     }
+ }
+
+ // Merges the compression and dictionary-encoding parts of a column diff
+ // into this column. The diff must target a column with the same name,
+ // otherwise the process aborts. Returns false after reporting through
+ // `errors` when either part cannot be merged.
+ bool TOlapColumnAdd::ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors) {
+     Y_ABORT_UNLESS(GetName() == diffColumn.GetName());
+
+     auto compressionStatus = diffColumn.GetCompression().Apply(Compression);
+     if (!compressionStatus) {
+         errors.AddError("Cannot merge compression info: " + compressionStatus.GetErrorMessage());
+         return false;
+     }
+
+     auto dictionaryStatus = diffColumn.GetDictionaryEncoding().Apply(DictionaryEncoding);
+     if (!dictionaryStatus) {
+         errors.AddError("Cannot merge dictionary encoding info: " + dictionaryStatus.GetErrorMessage());
+         return false;
+     }
+
+     return true;
+ }
+
+ // Tells whether a column may have the given type id: it must be a YQL
+ // type and must not be Bool, Interval, Decimal or DyNumber.
+ bool TOlapColumnAdd::IsAllowedType(ui32 typeId) {
+     if (NScheme::NTypeIds::IsYqlType(typeId)) {
+         switch (typeId) {
+             case NYql::NProto::Bool:
+             case NYql::NProto::Interval:
+             case NYql::NProto::Decimal:
+             case NYql::NProto::DyNumber:
+                 return false;
+             default:
+                 return true;
+         }
+     }
+     return false;
+ }
+
+ // Tells whether the given type id is accepted for the first primary key
+ // column. Only the types listed below are accepted.
+ bool TOlapColumnAdd::IsAllowedFirstPkType(ui32 typeId) {
+     switch (typeId) {
+         case NYql::NProto::Uint8: // Byte
+         case NYql::NProto::Int32:
+         case NYql::NProto::Uint32:
+         case NYql::NProto::Int64:
+         case NYql::NProto::Uint64:
+         case NYql::NProto::String:
+         case NYql::NProto::Utf8:
+         case NYql::NProto::Date:
+         case NYql::NProto::Datetime:
+         case NYql::NProto::Timestamp:
+             return true;
+         default:
+             // Interval, Decimal, DyNumber, Yson, Json, JsonDocument, Float,
+             // Double, Bool and every other type id are rejected.
+             return false;
+     }
+ }
+
+ // Parses an ALTER request into the drop / add / alter column lists.
+ //
+ // Fails (returning false after reporting through `errors`) on a column
+ // dropped twice, an added column that does not parse or is listed twice,
+ // and an altered column that does not parse, is also being added, or is
+ // listed twice.
+ bool TOlapColumnsUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) {
+     for (const auto& dropped : alterRequest.GetDropColumns()) {
+         const bool inserted = DropColumns.emplace(dropped.GetName()).second;
+         if (!inserted) {
+             errors.AddError(NKikimrScheme::StatusInvalidParameter, "Duplicated column for drop");
+             return false;
+         }
+     }
+
+     TSet<TString> addedNames;
+     for (auto& addProto : alterRequest.GetAddColumns()) {
+         TOlapColumnAdd newColumn({});
+         if (!newColumn.ParseFromRequest(addProto, errors)) {
+             return false;
+         }
+         if (!addedNames.emplace(newColumn.GetName()).second) {
+             errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "column '" << newColumn.GetName() << "' duplication for add");
+             return false;
+         }
+         AddColumns.emplace_back(std::move(newColumn));
+     }
+
+     TSet<TString> alteredNames;
+     for (auto& alterProto : alterRequest.GetAlterColumns()) {
+         TOlapColumnDiff diff;
+         if (!diff.ParseFromRequest(alterProto, errors)) {
+             return false;
+         }
+         // A column cannot be added and altered by the same request.
+         if (addedNames.contains(diff.GetName())) {
+             errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << diff.GetName() << "' have to be either add or update");
+             return false;
+         }
+         if (!alteredNames.emplace(diff.GetName()).second) {
+             errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << diff.GetName() << "' duplication for update");
+             return false;
+         }
+         AlterColumns.emplace_back(std::move(diff));
+     }
+     return true;
+ }
+
+ // Parses a full table schema into the list of columns to add.
+ //
+ // Key column names are numbered in the order they are listed and each
+ // parsed column receives its key position, if it has one. Fails (returning
+ // false after reporting through `errors`) on a repeated key name, a column
+ // that does not parse, a first key column of an unsupported type, a
+ // repeated column name and, unless allowNullKeys is set, a nullable key
+ // column.
+ bool TOlapColumnsUpdate::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys) {
+     TMap<TString, ui32> keyOrderByName;
+     for (auto&& keyName : tableSchema.GetKeyColumnNames()) {
+         const bool inserted = keyOrderByName.emplace(keyName, keyOrderByName.size()).second;
+         if (!inserted) {
+             errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Duplicate key column '" << keyName << "'");
+             return false;
+         }
+     }
+
+     TSet<TString> seenNames;
+     for (auto& columnProto : tableSchema.GetColumns()) {
+         std::optional<ui32> keyOrder;
+         if (auto it = keyOrderByName.find(columnProto.GetName()); it != keyOrderByName.end()) {
+             keyOrder = it->second;
+         }
+
+         TOlapColumnAdd parsed(keyOrder);
+         if (!parsed.ParseFromRequest(columnProto, errors)) {
+             return false;
+         }
+
+         const bool isFirstKey = parsed.GetKeyOrder() && *parsed.GetKeyOrder() == 0;
+         if (isFirstKey && !TOlapColumnAdd::IsAllowedFirstPkType(parsed.GetType().GetTypeId())) {
+             errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder()
+                 << "Type '" << parsed.GetType().GetTypeId() << "' specified for column '" << parsed.GetName()
+                 << "' is not supported in first PK position");
+             return false;
+         }
+
+         if (seenNames.contains(parsed.GetName())) {
+             errors.AddError(NKikimrScheme::StatusMultipleModifications, TStringBuilder() << "Duplicate column '" << parsed.GetName() << "'");
+             return false;
+         }
+
+         if (!allowNullKeys && keyOrderByName.contains(parsed.GetName()) && !parsed.IsNotNull()) {
+             errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Nullable key column '" << parsed.GetName() << "'");
+             return false;
+         }
+
+         seenNames.emplace(parsed.GetName());
+         AddColumns.emplace_back(std::move(parsed));
+     }
+
+     return true;
+ }
+
+}
diff --git a/ydb/core/tx/schemeshard/olap/columns/update.h b/ydb/core/tx/schemeshard/olap/columns/update.h
new file mode 100644
index 0000000000..45fce0c990
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/columns/update.h
@@ -0,0 +1,72 @@
+#pragma once
+#include <ydb/core/formats/arrow/compression/diff.h>
+#include <ydb/core/formats/arrow/dictionary/diff.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/schemeshard/olap/common/common.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/scheme_types/scheme_type_info.h>
+#include <ydb/core/formats/arrow/compression/object.h>
+#include <ydb/core/formats/arrow/dictionary/object.h>
+
+namespace NKikimr::NSchemeShard {
+
+// Requested change to an existing column: the column name plus diffs for its
+// compression and dictionary-encoding settings.
+class TOlapColumnDiff {
+private:
+ YDB_READONLY_DEF(TString, Name);
+ YDB_READONLY_DEF(NArrow::TCompressionDiff, Compression);
+ YDB_READONLY_DEF(NArrow::NDictionary::TEncodingDiff, DictionaryEncoding);
+public:
+ // Fills the diff from the request proto. Returns false, after reporting
+ // through `errors`, when the name is empty or either diff cannot be
+ // deserialized.
+ bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDiff& columnSchema, IErrorCollector& errors) {
+ Name = columnSchema.GetName();
+ if (!Name) {
+ errors.AddError("empty field name");
+ return false;
+ }
+ if (!Compression.DeserializeFromProto(columnSchema.GetCompression())) {
+ errors.AddError("cannot parse compression diff from proto");
+ return false;
+ }
+ if (!DictionaryEncoding.DeserializeFromProto(columnSchema.GetDictionaryEncoding())) {
+ errors.AddError("cannot parse dictionary encoding diff from proto");
+ return false;
+ }
+ return true;
+ }
+};
+
+// Description of a single column: name, type, NOT NULL flag, optional
+// position in the primary key and optional compression / dictionary-encoding
+// settings.
+class TOlapColumnAdd {
+private:
+ // Position of the column in the primary key; empty for non-key columns.
+ YDB_READONLY_DEF(std::optional<ui32>, KeyOrder);
+ YDB_READONLY_DEF(TString, Name);
+ // Type name as it was given in the proto description.
+ YDB_READONLY_DEF(TString, TypeName);
+ YDB_READONLY_DEF(NScheme::TTypeInfo, Type);
+ YDB_FLAG_ACCESSOR(NotNull, false);
+ YDB_READONLY_DEF(std::optional<NArrow::TCompression>, Compression);
+ YDB_READONLY_DEF(std::optional<NArrow::NDictionary::TEncodingSettings>, DictionaryEncoding);
+public:
+ // Creates a column with only the key order set; the remaining fields are
+ // filled in by ParseFromRequest or ParseFromLocalDB.
+ TOlapColumnAdd(const std::optional<ui32>& keyOrder)
+ : KeyOrder(keyOrder) {
+
+ }
+ // Parses a user request; problems are reported through `errors` and make
+ // the method return false.
+ bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors);
+ // Parses a proto description without an error collector.
+ void ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema);
+ void Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const;
+ // Merges a column diff into this column.
+ bool ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors);
+ // True when the column has a key order, i.e. it is part of the primary key.
+ bool IsKeyColumn() const {
+ return !!KeyOrder;
+ }
+ // Whether a column may have the given type id at all.
+ static bool IsAllowedType(ui32 typeId);
+ // Whether the given type id is accepted for the first primary key column.
+ static bool IsAllowedFirstPkType(ui32 typeId);
+};
+
+// Parsed set of column changes: columns to add, column names to drop and
+// per-column diffs to apply.
+class TOlapColumnsUpdate {
+private:
+ YDB_READONLY_DEF(TVector<TOlapColumnAdd>, AddColumns);
+ YDB_READONLY_DEF(TSet<TString>, DropColumns);
+ YDB_READONLY_DEF(TVector<TOlapColumnDiff>, AlterColumns);
+public:
+ // Parses a full table schema: every column goes to AddColumns.
+ // allowNullKeys disables the check that key columns are NOT NULL.
+ bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys = false);
+ // Parses an ALTER request into the add / drop / alter lists.
+ bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors);
+};
+
+}
diff --git a/ydb/core/tx/schemeshard/olap/columns/ya.make b/ydb/core/tx/schemeshard/olap/columns/ya.make
new file mode 100644
index 0000000000..44971d7eeb
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/columns/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ schema.cpp
+ update.cpp
+)
+
+PEERDIR(
+ ydb/core/protos
+ ydb/core/formats/arrow/dictionary
+ ydb/core/formats/arrow/compression
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/tx/schemeshard/olap/common/common.cpp b/ydb/core/tx/schemeshard/olap/common/common.cpp
new file mode 100644
index 0000000000..46bf1133f6
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/common/common.cpp
@@ -0,0 +1,4 @@
+#include "common.h"
+
+namespace NKikimr::NSchemeShard {
+}
diff --git a/ydb/core/tx/schemeshard/olap/common/common.h b/ydb/core/tx/schemeshard/olap/common/common.h
new file mode 100644
index 0000000000..4b82a29a33
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/common/common.h
@@ -0,0 +1,27 @@
+#pragma once
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+
+namespace NKikimr::NSchemeShard {
+
+ // Sink for errors found while parsing or validating schema operations.
+ class IErrorCollector {
+ public:
+     // Virtual destructor so that an implementation can be destroyed safely
+     // through a pointer or reference to this interface.
+     virtual ~IErrorCollector() = default;
+
+     // Reports an error with an explicit status code.
+     virtual void AddError(const TEvSchemeShard::EStatus& errorStatus, const TString& errorMsg) = 0;
+     // Reports an error without a status; the implementation picks the status.
+     virtual void AddError(const TString& errorMsg) = 0;
+ };
+
+ // IErrorCollector that stores the reported error in a
+ // TEvModifySchemeTransactionResult through SetError.
+ class TProposeErrorCollector : public IErrorCollector {
+ // Held by reference: the result object must outlive this collector.
+ TEvSchemeShard::TEvModifySchemeTransactionResult& TxResult;
+ public:
+ TProposeErrorCollector(TEvSchemeShard::TEvModifySchemeTransactionResult& txResult)
+ : TxResult(txResult)
+ {}
+
+ // Forwards the given status and message to the transaction result.
+ void AddError(const TEvSchemeShard::EStatus& errorStatus, const TString& errorMsg) override {
+ TxResult.SetError(errorStatus, errorMsg);
+ }
+
+ // Forwards the message with NKikimrScheme::StatusSchemeError as the status.
+ void AddError(const TString& errorMsg) override {
+ TxResult.SetError(NKikimrScheme::StatusSchemeError, errorMsg);
+ }
+ };
+}
diff --git a/ydb/core/tx/schemeshard/olap/common/ya.make b/ydb/core/tx/schemeshard/olap/common/ya.make
new file mode 100644
index 0000000000..58bbf1cb44
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/common/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ common.cpp
+)
+
+PEERDIR(
+ ydb/library/ydb_issue
+ ydb/core/base
+)
+
+END()
diff --git a/ydb/core/tx/schemeshard/olap/indexes/schema.cpp b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp
new file mode 100644
index 0000000000..e715b13496
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp
@@ -0,0 +1,113 @@
+#include "schema.h"
+#include <ydb/library/accessor/validator.h>
+
+namespace NKikimr::NSchemeShard {
+
+// Writes the index id and name into the proto and lets the index meta
+// container serialize itself into the same message.
+void TOlapIndexSchema::SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& indexSchema) const {
+ indexSchema.SetId(Id);
+ indexSchema.SetName(Name);
+ IndexMeta.SerializeToProto(indexSchema);
+}
+
+// Restores the index id, name and meta from the proto. A meta that fails to
+// deserialize trips AFL_VERIFY, with the proto's debug string attached.
+void TOlapIndexSchema::DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& indexSchema) {
+ Id = indexSchema.GetId();
+ Name = indexSchema.GetName();
+ AFL_VERIFY(IndexMeta.DeserializeFromProto(indexSchema))("incorrect_proto", indexSchema.DebugString());
+}
+
+// Applies an indexes update: upserts are processed first, then drops.
+//
+// An upsert for an existing index name updates that index in place; any
+// other upsert creates a new index whose id is taken from nextEntityId
+// (which is then incremented). Dropping an unknown index is an error.
+// Returns false after reporting through `errors` on any failure.
+bool TOlapIndexesDescription::ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexesUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId) {
+    for (auto&& upsert : schemaUpdate.GetUpsertIndexes()) {
+        if (auto* existing = MutableByName(upsert.GetName())) {
+            if (!existing->ApplyUpdate(currentSchema, upsert, errors)) {
+                return false;
+            }
+            continue;
+        }
+
+        auto meta = upsert.GetIndexConstructor()->CreateIndexMeta(currentSchema, errors);
+        if (!meta) {
+            return false;
+        }
+        const ui32 newId = nextEntityId++;
+        TOlapIndexSchema created(newId, upsert.GetName(), meta);
+        Y_ABORT_UNLESS(IndexesByName.emplace(upsert.GetName(), newId).second);
+        Y_ABORT_UNLESS(Indexes.emplace(newId, std::move(created)).second);
+    }
+
+    for (const auto& dropName : schemaUpdate.GetDropIndexes()) {
+        auto dropped = GetByName(dropName);
+        if (!dropped) {
+            errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Unknown index for drop: " << dropName);
+            return false;
+        }
+        AFL_VERIFY(IndexesByName.erase(dropName));
+        AFL_VERIFY(Indexes.erase(dropped->GetId()));
+    }
+
+    return true;
+}
+
+// Loads every index from the table schema proto into the by-name and by-id
+// maps. A repeated name or id aborts the process.
+void TOlapIndexesDescription::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) {
+    for (const auto& proto : tableSchema.GetIndexes()) {
+        TOlapIndexSchema parsed;
+        parsed.DeserializeFromProto(proto);
+        Y_ABORT_UNLESS(IndexesByName.emplace(proto.GetName(), proto.GetId()).second);
+        Y_ABORT_UNLESS(Indexes.emplace(proto.GetId(), std::move(parsed)).second);
+    }
+}
+
+// Appends one index entry to the table schema proto for every stored index,
+// in the iteration order of the Indexes hash map.
+void TOlapIndexesDescription::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const {
+ for (const auto& index : Indexes) {
+ index.second.SerializeToProto(*tableSchema.AddIndexes());
+ }
+}
+
+// Checks that the indexes listed in opSchema match this description.
+//
+// Every listed index must have a non-empty name known to this description,
+// an id (when given) equal to the stored one, appear only once, and be
+// listed in non-decreasing id order. Every stored index must be listed.
+// Returns false after reporting the first mismatch through `errors`.
+bool TOlapIndexesDescription::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const {
+    THashSet<ui32> usedIndexes;
+    ui32 lastIdx = 0;
+    for (const auto& proto : opSchema.GetIndexes()) {
+        if (proto.GetName().empty()) {
+            errors.AddError("Index cannot have an empty name");
+            return false;
+        }
+        const TString& name = proto.GetName();
+        auto* index = GetByName(name);
+        if (!index) {
+            errors.AddError("Index '" + name + "' does not match schema preset");
+            return false;
+        }
+        if (proto.HasId() && proto.GetId() != index->GetId()) {
+            // Built with TStringBuilder so the id is formatted as a number.
+            errors.AddError(TStringBuilder() << "Index '" << name << "' has id " << proto.GetId() << " that does not match schema preset");
+            return false;
+        }
+
+        if (!usedIndexes.insert(index->GetId()).second) {
+            errors.AddError("Index '" + name + "' is specified multiple times");
+            return false;
+        }
+        if (index->GetId() < lastIdx) {
+            errors.AddError("Index order does not match schema preset");
+            return false;
+        }
+        lastIdx = index->GetId();
+    }
+
+    for (auto& pr : Indexes) {
+        if (!usedIndexes.contains(pr.second.GetId())) {
+            errors.AddError("Specified schema is missing some schema preset indexes");
+            return false;
+        }
+    }
+    return true;
+}
+
+// Looks the index up with GetById(id) and passes the resulting pointer
+// through TValidator::CheckNotNull before returning it.
+const NKikimr::NSchemeShard::TOlapIndexSchema* TOlapIndexesDescription::GetByIdVerified(const ui32 id) const noexcept {
+ return TValidator::CheckNotNull(GetById(id));
+}
+
+// Mutable counterpart of GetByIdVerified: MutableById(id) checked by
+// TValidator::CheckNotNull.
+NKikimr::NSchemeShard::TOlapIndexSchema* TOlapIndexesDescription::MutableByIdVerified(const ui32 id) noexcept {
+ return TValidator::CheckNotNull(MutableById(id));
+}
+
+}
diff --git a/ydb/core/tx/schemeshard/olap/indexes/schema.h b/ydb/core/tx/schemeshard/olap/indexes/schema.h
new file mode 100644
index 0000000000..d5aaa8573a
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/indexes/schema.h
@@ -0,0 +1,93 @@
+#pragma once
+#include "update.h"
+
+namespace NKikimr::NSchemeShard {
+
+// A stored index: numeric id, name and the index meta object.
+class TOlapIndexSchema {
+private:
+ // NOTE(review): this class has no base class and the alias is not used
+ // anywhere in the visible code - confirm whether it can be removed.
+ using TBase = TOlapIndexUpsert;
+ YDB_READONLY(ui32, Id, Max<ui32>());
+ YDB_READONLY_DEF(TString, Name);
+ NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMeta> IndexMeta;
+public:
+ TOlapIndexSchema() = default;
+
+ TOlapIndexSchema(const ui32 id, const TString& name, const std::shared_ptr<NOlap::NIndexes::IIndexMeta>& meta)
+ : Id(id)
+ , Name(name)
+ , IndexMeta(meta)
+ {
+
+ }
+
+ // Replaces the index meta with one built from the upsert's constructor.
+ // The upsert must carry the same index name and a non-empty constructor
+ // (both verified). Returns false, after reporting through `errors`, when
+ // the constructor's class name differs from the current meta's or the new
+ // meta cannot be created; the stored meta is left untouched in that case.
+ bool ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexUpsert& upsert, IErrorCollector& errors) {
+ AFL_VERIFY(upsert.GetName() == GetName());
+ AFL_VERIFY(!!upsert.GetIndexConstructor());
+ if (upsert.GetIndexConstructor().GetClassName() != IndexMeta.GetClassName()) {
+ errors.AddError("different index classes: " + upsert.GetIndexConstructor().GetClassName() + " vs " + IndexMeta.GetClassName());
+ return false;
+ }
+ auto object = upsert.GetIndexConstructor()->CreateIndexMeta(currentSchema, errors);
+ if (!object) {
+ return false;
+ }
+ IndexMeta = NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMeta>(object);
+ return true;
+ }
+
+ void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& indexSchema) const;
+ void DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& indexSchema);
+};
+
+// Set of table indexes addressable by id and by name.
+class TOlapIndexesDescription {
+public:
+ using TIndex = TOlapIndexSchema;
+ using TIndexes = THashMap<ui32, TOlapIndexSchema>;
+ using TIndexesByName = THashMap<TString, ui32>;
+
+private:
+ // Index id -> index description.
+ YDB_READONLY_DEF(TIndexes, Indexes);
+ // Index name -> index id.
+ YDB_READONLY_DEF(TIndexesByName, IndexesByName);
+public:
+ // Returns the index with the given name, or nullptr when the name is not
+ // registered. A registered name is resolved through GetByIdVerified.
+ const TOlapIndexSchema* GetByName(const TString& name) const noexcept {
+ auto it = IndexesByName.find(name);
+ if (it != IndexesByName.end()) {
+ return GetByIdVerified(it->second);
+ }
+ return nullptr;
+ }
+
+ // Mutable counterpart of GetByName.
+ TOlapIndexSchema* MutableByName(const TString& name) noexcept {
+ auto it = IndexesByName.find(name);
+ if (it != IndexesByName.end()) {
+ return MutableByIdVerified(it->second);
+ }
+ return nullptr;
+ }
+
+ // Returns the index with the given id, or nullptr when there is none.
+ const TOlapIndexSchema* GetById(const ui32 id) const noexcept {
+ auto it = Indexes.find(id);
+ if (it != Indexes.end()) {
+ return &it->second;
+ }
+ return nullptr;
+ }
+
+ // Mutable counterpart of GetById.
+ TOlapIndexSchema* MutableById(const ui32 id) noexcept {
+ auto it = Indexes.find(id);
+ if (it != Indexes.end()) {
+ return &it->second;
+ }
+ return nullptr;
+ }
+
+ // Same lookups as GetById / MutableById, with the result checked by
+ // TValidator::CheckNotNull.
+ const TOlapIndexSchema* GetByIdVerified(const ui32 id) const noexcept;
+ TOlapIndexSchema* MutableByIdVerified(const ui32 id) noexcept;
+
+ // Applies upserts and drops; ids for new indexes come from nextEntityId.
+ bool ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexesUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId);
+
+ void Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema);
+ void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const;
+ // Compares opSchema with this description; mismatches are reported through
+ // `errors` and make the method return false.
+ bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const;
+};
+}
diff --git a/ydb/core/tx/schemeshard/olap/indexes/update.cpp b/ydb/core/tx/schemeshard/olap/indexes/update.cpp
new file mode 100644
index 0000000000..596cba4b83
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/indexes/update.cpp
@@ -0,0 +1,34 @@
+#include "update.h"
+
+namespace NKikimr::NSchemeShard {
+
+// Writes the index name into the proto and lets the index constructor
+// container serialize itself into the same message.
+void TOlapIndexUpsert::SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& requestedProto) const {
+ requestedProto.SetName(Name);
+ IndexConstructor.SerializeToProto(requestedProto);
+}
+
+// Restores the index name and constructor from the proto. A constructor that
+// fails to deserialize trips AFL_VERIFY, with the proto's debug string
+// attached.
+void TOlapIndexUpsert::DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& indexSchema) {
+ Name = indexSchema.GetName();
+ AFL_VERIFY(IndexConstructor.DeserializeFromProto(indexSchema))("incorrect_proto", indexSchema.DebugString());
+}
+
+// Parses the index part of an ALTER request into the drop and upsert lists.
+// Returns false after reporting through `errors` when an index is dropped
+// twice or upserted twice.
+bool TOlapIndexesUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) {
+    for (const auto& dropName : alterRequest.GetDropIndexes()) {
+        const bool inserted = DropIndexes.emplace(dropName).second;
+        if (!inserted) {
+            errors.AddError(NKikimrScheme::StatusInvalidParameter, "Duplicated index for drop");
+            return false;
+        }
+    }
+
+    TSet<TString> seenUpserts;
+    for (auto& upsertProto : alterRequest.GetUpsertIndexes()) {
+        TOlapIndexUpsert upsert;
+        upsert.DeserializeFromProto(upsertProto);
+        const bool firstOccurrence = seenUpserts.emplace(upsert.GetName()).second;
+        if (!firstOccurrence) {
+            errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "index '" << upsert.GetName() << "' duplication for add");
+            return false;
+        }
+        UpsertIndexes.emplace_back(std::move(upsert));
+    }
+    return true;
+}
+}
diff --git a/ydb/core/tx/schemeshard/olap/indexes/update.h b/ydb/core/tx/schemeshard/olap/indexes/update.h
new file mode 100644
index 0000000000..0f1db75b0b
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/indexes/update.h
@@ -0,0 +1,34 @@
+#pragma once
+#include <ydb/services/bg_tasks/abstract/interface.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/schemeshard/olap/common/common.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h>
+
+namespace NKikimr::NSchemeShard {
+
+ // Requested creation or update of an index: the index name plus the
+ // constructor object that builds the index meta.
+ class TOlapIndexUpsert {
+ private:
+ YDB_READONLY_DEF(TString, Name);
+ // NOTE(review): not assigned by DeserializeFromProto in update.cpp -
+ // confirm whether this field is still needed.
+ YDB_READONLY_DEF(TString, TypeName);
+ protected:
+ NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor> IndexConstructor;
+ public:
+ TOlapIndexUpsert() = default;
+
+ // Read-only access to the index constructor container.
+ const NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor>& GetIndexConstructor() const {
+ return IndexConstructor;
+ }
+
+ void DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& requestedProto);
+ void SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& requestedProto) const;
+ };
+
+ // Parsed set of index changes: indexes to upsert and index names to drop.
+ class TOlapIndexesUpdate {
+ private:
+ YDB_READONLY_DEF(TVector<TOlapIndexUpsert>, UpsertIndexes);
+ YDB_READONLY_DEF(TSet<TString>, DropIndexes);
+ public:
+ // Fills both lists from an ALTER request; duplicates are reported through
+ // `errors` and make the method return false.
+ bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors);
+ };
+}
diff --git a/ydb/core/tx/schemeshard/olap/indexes/ya.make b/ydb/core/tx/schemeshard/olap/indexes/ya.make
new file mode 100644
index 0000000000..0303a9692f
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/indexes/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ schema.cpp
+ update.cpp
+)
+
+PEERDIR(
+ ydb/services/bg_tasks/abstract
+)
+
+END()
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp b/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp
index bf09d60f90..84ccbf315e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp
@@ -1,7 +1,6 @@
-#include "schemeshard__operation_part.h"
-#include "schemeshard__operation_common.h"
-#include "schemeshard_impl.h"
-#include "schemeshard_olap_types.h"
+#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h>
+#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h>
+#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
namespace {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp
index 7690138e38..3dcfeda8b5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp
@@ -1,7 +1,6 @@
-#include "schemeshard__operation_part.h"
-#include "schemeshard__operation_common.h"
-#include "schemeshard_olap_types.h"
-#include "schemeshard_impl.h"
+#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h>
+#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h>
+#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
#include <ydb/core/scheme/scheme_types_proto.h>
@@ -88,7 +87,7 @@ public:
}
TOlapSchema currentSchema;
- currentSchema.Parse(*tableSchema);
+ currentSchema.ParseFromLocalDB(*tableSchema);
if (!storeInfo) {
TOlapSchemaUpdate schemaUpdate;
if (!schemaUpdate.Parse(AlterRequest.GetAlterSchema(), errors)) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp
index 26704eed4b..a50f1193b1 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp
@@ -1,7 +1,6 @@
-#include "schemeshard__operation_part.h"
-#include "schemeshard__operation_common.h"
-#include "schemeshard_impl.h"
-#include "schemeshard_olap_types.h"
+#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h>
+#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h>
+#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
#include <ydb/core/base/subdomain.h>
#include <ydb/core/scheme/scheme_types_proto.h>
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp
index 9138e379a9..b2519885d4 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp
@@ -1,7 +1,6 @@
-#include "schemeshard__operation_part.h"
-#include "schemeshard__operation_common.h"
-#include "schemeshard_olap_types.h"
-#include "schemeshard_impl.h"
+#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h>
+#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h>
+#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
#include <ydb/core/base/subdomain.h>
#include <ydb/core/tx/columnshard/columnshard.h>
@@ -109,7 +108,7 @@ private:
return false;
}
for (const TString& columnName : sharding.GetColumns()) {
- auto* pColumn = schema.GetColumnByName(columnName);
+ auto* pColumn = schema.GetColumns().GetByName(columnName);
if (!pColumn) {
errors.AddError(Sprintf("Hash sharding is using an unknown column '%s'", columnName.c_str()));
return false;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp b/ydb/core/tx/schemeshard/olap/operations/drop_store.cpp
index 7ea837dedf..112e42e864 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/drop_store.cpp
@@ -1,6 +1,6 @@
-#include "schemeshard__operation_part.h"
-#include "schemeshard__operation_common.h"
-#include "schemeshard_impl.h"
+#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h>
+#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h>
+#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
#include <ydb/core/base/subdomain.h>
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp
index 7bf255f578..d1df41baed 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp
@@ -1,6 +1,6 @@
-#include "schemeshard__operation_part.h"
-#include "schemeshard__operation_common.h"
-#include "schemeshard_impl.h"
+#include <ydb/core/tx/schemeshard/schemeshard__operation_part.h>
+#include <ydb/core/tx/schemeshard/schemeshard__operation_common.h>
+#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
#include <ydb/core/base/subdomain.h>
#include <ydb/core/tx/tiering/cleaner_task.h>
diff --git a/ydb/core/tx/schemeshard/olap/operations/ya.make b/ydb/core/tx/schemeshard/olap/operations/ya.make
new file mode 100644
index 0000000000..4e9765d304
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/operations/ya.make
@@ -0,0 +1,19 @@
+LIBRARY()
+
+SRCS(
+ create_table.cpp
+ drop_table.cpp
+ alter_table.cpp
+ create_store.cpp
+ drop_store.cpp
+ alter_store.cpp
+)
+
+PEERDIR(
+ ydb/core/mind/hive
+ ydb/services/bg_tasks
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/tx/schemeshard/olap/schema/schema.cpp b/ydb/core/tx/schemeshard/olap/schema/schema.cpp
new file mode 100644
index 0000000000..ff7ba1bbe4
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/schema/schema.cpp
@@ -0,0 +1,93 @@
+#include "schema.h"
+
+namespace NKikimr::NSchemeShard {
+
+ // Applies a parsed update to this schema: columns first, then indexes, then the
+ // engine (only when none is set yet), and finally bumps Version.
+ // Returns false after reporting through `errors` when any step is rejected.
+ bool TOlapSchema::Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors) {
+     // Changing the engine of an existing schema is never accepted, so reject the
+     // request up front, before Columns.ApplyUpdate / Indexes.ApplyUpdate have run,
+     // rather than after the schema has already been modified.
+     if (HasEngine() && schemaUpdate.HasEngine()) {
+         errors.AddError(NKikimrScheme::StatusSchemeError, "No engine updates supported");
+         return false;
+     }
+
+     if (!Columns.ApplyUpdate(schemaUpdate.GetColumns(), errors, NextColumnId)) {
+         return false;
+     }
+
+     if (!Indexes.ApplyUpdate(*this, schemaUpdate.GetIndexes(), errors, NextColumnId)) {
+         return false;
+     }
+
+     // First update of a fresh schema: take the requested engine or fall back to
+     // the default one.
+     if (!HasEngine()) {
+         Engine = schemaUpdate.GetEngineDef(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
+     }
+
+     ++Version;
+     return true;
+ }
+
+ // Restores the schema from its stored proto representation.
+ // The proto is required to carry an engine; the process aborts otherwise.
+ void TOlapSchema::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) {
+     Y_ABORT_UNLESS(tableSchema.HasEngine());
+
+     // Scalar fields.
+     Engine = tableSchema.GetEngine();
+     Version = tableSchema.GetVersion();
+     NextColumnId = tableSchema.GetNextColumnId();
+     CompositeMarksFlag = tableSchema.GetCompositeMarks();
+
+     // Nested descriptions read their own parts of the same proto.
+     Columns.Parse(tableSchema);
+     Indexes.Parse(tableSchema);
+ }
+
+ // Writes the schema into `tableSchema`.
+ // A schema without an engine cannot be serialized; the process aborts in that case.
+ void TOlapSchema::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const {
+     Y_ABORT_UNLESS(HasEngine());
+
+     // Scalar fields.
+     tableSchema.SetEngine(GetEngineUnsafe());
+     tableSchema.SetVersion(Version);
+     tableSchema.SetNextColumnId(NextColumnId);
+     tableSchema.SetCompositeMarks(CompositeMarksFlag);
+
+     // Nested descriptions fill in their own parts of the same proto.
+     Columns.Serialize(tableSchema);
+     Indexes.Serialize(tableSchema);
+ }
+
+ // Checks that the schema supplied in an operation agrees with this schema:
+ // columns, then indexes, then the engine. The first mismatch is reported
+ // through `errors` and stops the validation.
+ bool TOlapSchema::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const {
+     // Short-circuit keeps the original order: indexes are not examined when
+     // the columns were already rejected.
+     const bool partsMatch = Columns.Validate(opSchema, errors) && Indexes.Validate(opSchema, errors);
+     if (!partsMatch) {
+         return false;
+     }
+
+     const bool engineMismatch = (opSchema.GetEngine() != Engine);
+     if (engineMismatch) {
+         errors.AddError("Specified schema engine does not match schema preset");
+     }
+     return !engineMismatch;
+ }
+
+ // Restores a preset (id, name and the schema it wraps) from its stored proto.
+ // All three parts are mandatory; the process aborts when one is missing.
+ void TOlapStoreSchemaPreset::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) {
+     Y_ABORT_UNLESS(presetProto.HasId());
+     Y_ABORT_UNLESS(presetProto.HasName());
+     Y_ABORT_UNLESS(presetProto.HasSchema());
+
+     Id = presetProto.GetId();
+     Name = presetProto.GetName();
+
+     const auto& schemaProto = presetProto.GetSchema();
+     TBase::ParseFromLocalDB(schemaProto);
+ }
+
+ // Writes the preset id, its name and the wrapped schema into `presetProto`.
+ void TOlapStoreSchemaPreset::Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const {
+     presetProto.SetId(Id);
+     presetProto.SetName(Name);
+
+     auto* schemaProto = presetProto.MutableSchema();
+     TBase::Serialize(*schemaProto);
+ }
+
+ // Takes the preset name from a user request.
+ // A request may not carry an explicit id and must carry a non-empty name;
+ // violations are reported through `errors` and make the call return false.
+ bool TOlapStoreSchemaPreset::ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors) {
+     if (presetProto.HasId()) {
+         errors.AddError("Schema preset id cannot be specified explicitly");
+         return false;
+     }
+
+     const auto& requestedName = presetProto.GetName();
+     if (!requestedName) {
+         errors.AddError("Schema preset name cannot be empty");
+         return false;
+     }
+
+     Name = requestedName;
+     return true;
+ }
+}
diff --git a/ydb/core/tx/schemeshard/olap/schema/schema.h b/ydb/core/tx/schemeshard/olap/schema/schema.h
new file mode 100644
index 0000000000..ca9e8b14c6
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/schema/schema.h
@@ -0,0 +1,40 @@
+#pragma once
+#include <ydb/core/tx/schemeshard/olap/columns/update.h>
+#include <ydb/core/tx/schemeshard/olap/indexes/update.h>
+#include <ydb/core/tx/schemeshard/olap/columns/schema.h>
+#include <ydb/core/tx/schemeshard/olap/indexes/schema.h>
+#include "update.h"
+
+namespace NKikimr::NSchemeShard {
+
+ // Schema of a column table: engine, column and index descriptions plus
+ // version counters. Data members and their accessors are generated by the
+ // YDB_* accessor macros.
+ class TOlapSchema {
+ private:
+ // Storage engine. May be unset: Update() assigns it when it is missing and
+ // Serialize() aborts without it.
+ YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine);
+ // Column and index descriptions; each parses, serializes and validates its own
+ // part of the table schema proto.
+ YDB_READONLY_DEF(TOlapColumnsDescription, Columns);
+ YDB_READONLY_DEF(TOlapIndexesDescription, Indexes);
+
+ // Counter handed to both Columns.ApplyUpdate and Indexes.ApplyUpdate in Update().
+ YDB_READONLY(ui32, NextColumnId, 1);
+ // Incremented by every successful Update().
+ YDB_READONLY(ui32, Version, 0);
+ // Stored in / restored from the CompositeMarks field of the schema proto.
+ YDB_READONLY_FLAG(CompositeMarks, true);
+
+ public:
+ // Applies a parsed update; returns false and reports through `errors` on rejection.
+ bool Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors);
+
+ // Restores the schema from a stored proto; aborts when the proto has no engine.
+ void ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchema& tableSchema);
+ // Writes the schema into a proto; aborts when no engine is set.
+ void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const;
+ // Checks that `opSchema` agrees with this schema (columns, indexes, engine).
+ bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const;
+ // Not defined in schema.cpp; the definition is in schemeshard_validate_ttl.cpp.
+ bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttlSettings, IErrorCollector& errors) const;
+ };
+
+ // A named, numbered schema preset: a TOlapSchema extended with the preset id,
+ // its name and its position in the store description.
+ class TOlapStoreSchemaPreset : public TOlapSchema {
+ private:
+ using TBase = TOlapSchema;
+ YDB_ACCESSOR_DEF(TString, Name);
+ YDB_ACCESSOR_DEF(ui32, Id);
+ // NOTE(review): -1 converted to size_t is the maximum value; presumably it
+ // serves as a "not assigned yet" marker - confirm against the users of ProtoIndex.
+ YDB_ACCESSOR(size_t, ProtoIndex, -1); // Preset index in the olap store description
+ public:
+ // Writes id, name and the wrapped schema into `presetProto`.
+ void Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const;
+ // Restores id, name and schema from a stored proto; aborts when any of them is missing.
+ void ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto);
+ // Takes only the name from a user request; rejects an explicit id or an empty name.
+ bool ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors);
+ };
+}
diff --git a/ydb/core/tx/schemeshard/olap/schema/update.cpp b/ydb/core/tx/schemeshard/olap/schema/update.cpp
new file mode 100644
index 0000000000..92d7d9038f
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/schema/update.cpp
@@ -0,0 +1,28 @@
+#include "schema.h"
+
+namespace NKikimr::NSchemeShard {
+
+ // Builds an update from a complete table schema description: remembers the
+ // engine when the description names one and parses the column set.
+ // Returns false when the columns are rejected (details go to `errors`).
+ bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys) {
+     if (tableSchema.HasEngine()) {
+         Engine = tableSchema.GetEngine();
+     }
+
+     // The outcome of the column parsing is the outcome of the whole call.
+     return Columns.Parse(tableSchema, errors, allowNullKeys);
+ }
+
+ // Builds an update from an alter request: column changes first, then index
+ // changes. Stops at the first part that is rejected and returns false
+ // (details go to `errors`).
+ bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) {
+     // Short-circuit keeps the original order: indexes are not parsed when the
+     // columns were already rejected.
+     return Columns.Parse(alterRequest, errors) && Indexes.Parse(alterRequest, errors);
+ }
+}
diff --git a/ydb/core/tx/schemeshard/olap/schema/update.h b/ydb/core/tx/schemeshard/olap/schema/update.h
new file mode 100644
index 0000000000..bb4173433f
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/schema/update.h
@@ -0,0 +1,13 @@
+#pragma once
+#include <ydb/core/tx/schemeshard/olap/columns/update.h>
+#include <ydb/core/tx/schemeshard/olap/indexes/update.h>
+
+namespace NKikimr::NSchemeShard {
+
+ // A parsed, not yet applied change to a TOlapSchema: column changes, index
+ // changes and an optional engine. Consumed by TOlapSchema::Update().
+ class TOlapSchemaUpdate {
+ YDB_READONLY_DEF(TOlapColumnsUpdate, Columns);
+ YDB_READONLY_DEF(TOlapIndexesUpdate, Indexes);
+ // Set only by the TColumnTableSchema overload of Parse(), and only when the
+ // description names an engine.
+ YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine);
+ public:
+ // Parses a complete table schema description (engine and columns).
+ bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys = false);
+ // Parses an alter request (column changes and index changes).
+ bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors);
+ };
+}
diff --git a/ydb/core/tx/schemeshard/olap/schema/ya.make b/ydb/core/tx/schemeshard/olap/schema/ya.make
new file mode 100644
index 0000000000..9c21a43bbb
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/schema/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ schema.cpp
+ update.cpp
+)
+
+PEERDIR(
+ ydb/core/tx/schemeshard/olap/columns
+ ydb/core/tx/schemeshard/olap/indexes
+)
+
+END()
diff --git a/ydb/core/tx/schemeshard/olap/ya.make b/ydb/core/tx/schemeshard/olap/ya.make
new file mode 100644
index 0000000000..63e509c263
--- /dev/null
+++ b/ydb/core/tx/schemeshard/olap/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+PEERDIR(
+ ydb/core/tx/schemeshard/olap/columns
+ ydb/core/tx/schemeshard/olap/indexes
+ ydb/core/tx/schemeshard/olap/schema
+ ydb/core/tx/schemeshard/olap/common
+ ydb/core/tx/schemeshard/olap/operations
+)
+
+END()
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index 0f3cc648ce..830b726d91 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -2305,7 +2305,7 @@ TColumnTableInfo::TColumnTableInfo(
if (Description.HasSchema()) {
TOlapSchema schema;
- schema.Parse(Description.GetSchema());
+ schema.ParseFromLocalDB(Description.GetSchema());
}
ColumnShards.reserve(Sharding.GetColumnShards().size());
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 7a7491c0a8..769a5dfa79 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -5,8 +5,9 @@
#include "schemeshard_tx_infly.h"
#include "schemeshard_path_element.h"
#include "schemeshard_identificators.h"
-#include "schemeshard_olap_types.h"
#include "schemeshard_schema.h"
+#include "olap/schema/schema.h"
+#include "olap/schema/update.h"
#include <ydb/core/tx/message_seqno.h>
#include <ydb/core/tx/datashard/datashard.h>
diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp b/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp
deleted file mode 100644
index 2b67e0797f..0000000000
--- a/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp
+++ /dev/null
@@ -1,522 +0,0 @@
-#include "schemeshard_olap_types.h"
-#include <ydb/library/yql/minikql/mkql_type_ops.h>
-#include <ydb/core/scheme_types/scheme_type_registry.h>
-
-namespace NKikimr::NSchemeShard {
-
- void TOlapColumnSchema::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const {
- TBase::Serialize(columnSchema);
- columnSchema.SetId(Id);
- }
-
- void TOlapColumnSchema::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) {
- TBase::ParseFromLocalDB(columnSchema);
- Id = columnSchema.GetId();
- }
-
- bool TOlapColumnAdd::ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors) {
- if (!columnSchema.GetName()) {
- errors.AddError("Columns cannot have an empty name");
- return false;
- }
- Name = columnSchema.GetName();
- NotNullFlag = columnSchema.GetNotNull();
- TypeName = columnSchema.GetType();
- if (columnSchema.HasCompression()) {
- auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression());
- if (!compression) {
- errors.AddError("Cannot parse compression info: " + compression.GetErrorMessage());
- return false;
- }
- Compression = *compression;
- }
- if (columnSchema.HasDictionaryEncoding()) {
- auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding());
- if (!settings) {
- errors.AddError("Cannot parse dictionary compression info: " + settings.GetErrorMessage());
- return false;
- }
- DictionaryEncoding = *settings;
- }
-
- if (columnSchema.HasTypeId()) {
- errors.AddError(TStringBuilder() << "Cannot set TypeId for column '" << Name << ", use Type");
- return false;
- }
-
- if (!columnSchema.HasType()) {
- errors.AddError(TStringBuilder() << "Missing Type for column '" << Name);
- return false;
- }
-
- auto typeName = NMiniKQL::AdaptLegacyYqlType(TypeName);
- Y_ABORT_UNLESS(AppData()->TypeRegistry);
- const NScheme::IType* type =
- AppData()->TypeRegistry->GetType(typeName);
- if (!type) {
- errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported");
- return false;
- }
- if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) {
- errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported");
- return false;;
- }
- Type = NScheme::TTypeInfo(type->GetTypeId());
- if (!TOlapSchema::IsAllowedType(type->GetTypeId())){
- errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported");
- return false;
- }
- return true;
- }
-
- void TOlapColumnAdd::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) {
- Name = columnSchema.GetName();
- TypeName = columnSchema.GetType();
-
- if (columnSchema.HasTypeInfo()) {
- Type = NScheme::TypeInfoModFromProtoColumnType(
- columnSchema.GetTypeId(), &columnSchema.GetTypeInfo())
- .TypeInfo;
- } else {
- Type = NScheme::TypeInfoModFromProtoColumnType(
- columnSchema.GetTypeId(), nullptr)
- .TypeInfo;
- }
- if (columnSchema.HasCompression()) {
- auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression());
- Y_ABORT_UNLESS(compression.IsSuccess(), "%s", compression.GetErrorMessage().data());
- Compression = *compression;
- }
- if (columnSchema.HasDictionaryEncoding()) {
- auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding());
- Y_ABORT_UNLESS(settings.IsSuccess());
- DictionaryEncoding = *settings;
- }
- NotNullFlag = columnSchema.GetNotNull();
- }
-
- void TOlapColumnAdd::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const {
- columnSchema.SetName(Name);
- columnSchema.SetType(TypeName);
- columnSchema.SetNotNull(NotNullFlag);
- if (Compression) {
- *columnSchema.MutableCompression() = Compression->SerializeToProto();
- }
- if (DictionaryEncoding) {
- *columnSchema.MutableDictionaryEncoding() = DictionaryEncoding->SerializeToProto();
- }
-
- auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(Type, "");
- columnSchema.SetTypeId(columnType.TypeId);
- if (columnType.TypeInfo) {
- *columnSchema.MutableTypeInfo() = *columnType.TypeInfo;
- }
- }
-
- bool TOlapColumnAdd::ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors) {
- Y_ABORT_UNLESS(GetName() == diffColumn.GetName());
- {
- auto result = diffColumn.GetCompression().Apply(Compression);
- if (!result) {
- errors.AddError("Cannot merge compression info: " + result.GetErrorMessage());
- return false;
- }
- }
- {
- auto result = diffColumn.GetDictionaryEncoding().Apply(DictionaryEncoding);
- if (!result) {
- errors.AddError("Cannot merge dictionary encoding info: " + result.GetErrorMessage());
- return false;
- }
- }
- return true;
- }
-
- bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys) {
- if (tableSchema.HasEngine()) {
- Engine = tableSchema.GetEngine();
- }
-
- TMap<TString, ui32> keyColumnNames;
- for (auto&& pkKey : tableSchema.GetKeyColumnNames()) {
- if (!keyColumnNames.emplace(pkKey, keyColumnNames.size()).second) {
- errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Duplicate key column '" << pkKey << "'");
- return false;
- }
- }
-
- TSet<TString> columnNames;
- for (auto& columnSchema : tableSchema.GetColumns()) {
- std::optional<ui32> keyOrder;
- {
- auto it = keyColumnNames.find(columnSchema.GetName());
- if (it != keyColumnNames.end()) {
- keyOrder = it->second;
- }
- }
-
- TOlapColumnAdd column(keyOrder);
- if (!column.ParseFromRequest(columnSchema, errors)) {
- return false;
- }
- if (column.GetKeyOrder() && *column.GetKeyOrder() == 0) {
- if (!TOlapSchema::IsAllowedFirstPkType(column.GetType().GetTypeId())) {
- errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder()
- << "Type '" << column.GetType().GetTypeId() << "' specified for column '" << column.GetName()
- << "' is not supported in first PK position");
- return false;
- }
- }
- if (columnNames.contains(column.GetName())) {
- errors.AddError(NKikimrScheme::StatusMultipleModifications, TStringBuilder() << "Duplicate column '" << column.GetName() << "'");
- return false;
- }
- if (!allowNullKeys) {
- if (keyColumnNames.contains(column.GetName()) && !column.IsNotNull()) {
- errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Nullable key column '" << column.GetName() << "'");
- return false;
- }
- }
- columnNames.emplace(column.GetName());
- AddColumns.emplace_back(std::move(column));
- }
- return true;
- }
-
- bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) {
- for (const auto& column: alterRequest.GetDropColumns()) {
- if (!DropColumns.emplace(column.GetName()).second) {
- errors.AddError(NKikimrScheme::StatusInvalidParameter, "Duplicated column for drop");
- return false;
- }
- }
- TSet<TString> addColumnNames;
- for (auto& columnSchema : alterRequest.GetAddColumns()) {
- TOlapColumnAdd column({});
- if (!column.ParseFromRequest(columnSchema, errors)) {
- return false;
- }
- if (addColumnNames.contains(column.GetName())) {
- errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "column '" << column.GetName() << "' duplication for add");
- return false;
- }
- addColumnNames.emplace(column.GetName());
- AddColumns.emplace_back(std::move(column));
- }
-
- TSet<TString> alterColumnNames;
- for (auto& columnSchemaDiff: alterRequest.GetAlterColumns()) {
- TOlapColumnDiff columnDiff;
- if (!columnDiff.ParseFromRequest(columnSchemaDiff, errors)) {
- return false;
- }
- if (addColumnNames.contains(columnDiff.GetName())) {
- errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << columnDiff.GetName() << "' have to be either add or update");
- return false;
- }
- if (alterColumnNames.contains(columnDiff.GetName())) {
- errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << columnDiff.GetName() << "' duplication for update");
- return false;
- }
- alterColumnNames.emplace(columnDiff.GetName());
- AlterColumns.emplace_back(std::move(columnDiff));
- }
- return true;
- }
-
- bool TOlapSchema::Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors) {
- if (Columns.empty() && schemaUpdate.GetAddColumns().empty()) {
- errors.AddError(NKikimrScheme::StatusSchemeError, "No add columns specified");
- return false;
- }
-
- if (!HasEngine()) {
- Engine = schemaUpdate.GetEngineDef(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
- } else {
- if (schemaUpdate.HasEngine()) {
- errors.AddError(NKikimrScheme::StatusSchemeError, "No engine updates supported");
- return false;
- }
- }
-
- const bool hasColumnsBefore = KeyColumnIds.size();
- std::map<ui32, ui32> orderedKeyColumnIds;
- for (auto&& column : schemaUpdate.GetAddColumns()) {
- if (ColumnsByName.contains(column.GetName())) {
- errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "column '" << column.GetName() << "' already exists");
- return false;
- }
- if (hasColumnsBefore) {
- if (column.IsNotNull()) {
- errors.AddError(NKikimrScheme::StatusSchemeError, "Cannot add new not null column currently (not supported yet)");
- return false;
- }
- if (column.GetKeyOrder()) {
- errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << column.GetName() << "' is pk column. its impossible to modify pk");
- return false;
- }
- }
- TOlapColumnSchema newColumn(column, NextColumnId++);
- if (newColumn.GetKeyOrder()) {
- Y_ABORT_UNLESS(orderedKeyColumnIds.emplace(*newColumn.GetKeyOrder(), newColumn.GetId()).second);
- }
-
- Y_ABORT_UNLESS(ColumnsByName.emplace(newColumn.GetName(), newColumn.GetId()).second);
- Y_ABORT_UNLESS(Columns.emplace(newColumn.GetId(), std::move(newColumn)).second);
- }
-
- for (auto&& columnDiff : schemaUpdate.GetAlterColumns()) {
- auto it = ColumnsByName.find(columnDiff.GetName());
- if (it == ColumnsByName.end()) {
- errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "column '" << columnDiff.GetName() << "' not exists for altering");
- return false;
- } else {
- auto itColumn = Columns.find(it->second);
- Y_ABORT_UNLESS(itColumn != Columns.end());
- TOlapColumnSchema& newColumn = itColumn->second;
- if (!newColumn.ApplyDiff(columnDiff, errors)) {
- return false;
- }
- }
- }
-
- if (KeyColumnIds.empty()) {
- auto it = orderedKeyColumnIds.begin();
- for (ui32 i = 0; i < orderedKeyColumnIds.size(); ++i, ++it) {
- KeyColumnIds.emplace_back(it->second);
- Y_ABORT_UNLESS(i == it->first);
- }
- if (KeyColumnIds.empty()) {
- errors.AddError(NKikimrScheme::StatusSchemeError, "No primary key specified");
- return false;
- }
- }
-
- for (const auto& columnName : schemaUpdate.GetDropColumns()) {
- auto columnInfo = GetColumnByName(columnName);
- if (!columnInfo) {
- errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Unknown column for drop: " << columnName);
- return false;
- }
-
- if (columnInfo->IsKeyColumn()) {
- errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Cannot remove pk column: " << columnName);
- return false;
- }
- ColumnsByName.erase(columnName);
- Columns.erase(columnInfo->GetId());
- }
-
- ++Version;
- return true;
- }
-
- void TOlapSchema::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) {
- NextColumnId = tableSchema.GetNextColumnId();
- Version = tableSchema.GetVersion();
- Y_ABORT_UNLESS(tableSchema.HasEngine());
- Engine = tableSchema.GetEngine();
- CompositeMarksFlag = tableSchema.GetCompositeMarks();
-
- TMap<TString, ui32> keyIndexes;
- ui32 idx = 0;
- for (auto&& kName : tableSchema.GetKeyColumnNames()) {
- keyIndexes[kName] = idx++;
- }
-
- TVector<ui32> keyIds;
- keyIds.resize(tableSchema.GetKeyColumnNames().size(), 0);
- for (const auto& columnSchema : tableSchema.GetColumns()) {
- std::optional<ui32> keyOrder;
- if (keyIndexes.contains(columnSchema.GetName())) {
- keyOrder = keyIndexes.at(columnSchema.GetName());
- }
-
- TOlapColumnSchema column(keyOrder);
- column.ParseFromLocalDB(columnSchema);
- if (keyOrder) {
- Y_ABORT_UNLESS(*keyOrder < keyIds.size());
- keyIds[*keyOrder] = column.GetId();
- }
-
- Y_ABORT_UNLESS(ColumnsByName.emplace(column.GetName(), column.GetId()).second);
- Y_ABORT_UNLESS(Columns.emplace(column.GetId(), std::move(column)).second);
- }
- KeyColumnIds.swap(keyIds);
- }
-
- void TOlapSchema::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const {
- tableSchema.SetNextColumnId(NextColumnId);
- tableSchema.SetVersion(Version);
- tableSchema.SetCompositeMarks(CompositeMarksFlag);
-
- Y_ABORT_UNLESS(HasEngine());
- tableSchema.SetEngine(GetEngineUnsafe());
-
- for (const auto& column : Columns) {
- column.second.Serialize(*tableSchema.AddColumns());
- }
-
- for (auto&& cId : KeyColumnIds) {
- auto column = GetColumnById(cId);
- Y_ABORT_UNLESS(!!column);
- *tableSchema.AddKeyColumnNames() = column->GetName();
- }
- }
-
- bool TOlapSchema::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const {
- const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
-
- ui32 lastColumnId = 0;
- THashSet<ui32> usedColumns;
- for (const auto& colProto : opSchema.GetColumns()) {
- if (colProto.GetName().empty()) {
- errors.AddError("Columns cannot have an empty name");
- return false;
- }
- const TString& colName = colProto.GetName();
- auto* col = GetColumnByName(colName);
- if (!col) {
- errors.AddError("Column '" + colName + "' does not match schema preset");
- return false;
- }
- if (colProto.HasId() && colProto.GetId() != col->GetId()) {
- errors.AddError("Column '" + colName + "' has id " + colProto.GetId() + " that does not match schema preset");
- return false;
- }
-
- if (!usedColumns.insert(col->GetId()).second) {
- errors.AddError("Column '" + colName + "' is specified multiple times");
- return false;
- }
- if (col->GetId() < lastColumnId) {
- errors.AddError("Column order does not match schema preset");
- return false;
- }
- lastColumnId = col->GetId();
-
- if (colProto.HasTypeId()) {
- errors.AddError("Cannot set TypeId for column '" + colName + "', use Type");
- return false;
- }
- if (!colProto.HasType()) {
- errors.AddError("Missing Type for column '" + colName + "'");
- return false;
- }
-
- auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType());
- const NScheme::IType* type = typeRegistry->GetType(typeName);
- if (!type || !IsAllowedType(type->GetTypeId())) {
- errors.AddError("Type '" + colProto.GetType() + "' specified for column '" + colName + "' is not supported");
- return false;
- }
- NScheme::TTypeInfo typeInfo(type->GetTypeId());
-
- if (typeInfo != col->GetType()) {
- errors.AddError("Type '" + TypeName(typeInfo) + "' specified for column '" + colName + "' does not match schema preset type '" + TypeName(col->GetType()) + "'");
- return false;
- }
- }
-
- for (auto& pr : Columns) {
- if (!usedColumns.contains(pr.second.GetId())) {
- errors.AddError("Specified schema is missing some schema preset columns");
- return false;
- }
- }
-
- TVector<ui32> keyColumnIds;
- for (const TString& keyName : opSchema.GetKeyColumnNames()) {
- auto* col = GetColumnByName(keyName);
- if (!col) {
- errors.AddError("Unknown key column '" + keyName + "'");
- return false;
- }
- keyColumnIds.push_back(col->GetId());
- }
- if (keyColumnIds != KeyColumnIds) {
- errors.AddError("Specified schema key columns not matching schema preset");
- return false;
- }
-
- if (opSchema.GetEngine() != Engine) {
- errors.AddError("Specified schema engine does not match schema preset");
- return false;
- }
- return true;
- }
-
- bool TOlapSchema::IsAllowedType(ui32 typeId) {
- if (!NScheme::NTypeIds::IsYqlType(typeId)) {
- return false;
- }
-
- switch (typeId) {
- case NYql::NProto::Bool:
- case NYql::NProto::Interval:
- case NYql::NProto::Decimal:
- case NYql::NProto::DyNumber:
- return false;
- default:
- break;
- }
- return true;
- }
-
- bool TOlapSchema::IsAllowedFirstPkType(ui32 typeId) {
- switch (typeId) {
- case NYql::NProto::Uint8: // Byte
- case NYql::NProto::Int32:
- case NYql::NProto::Uint32:
- case NYql::NProto::Int64:
- case NYql::NProto::Uint64:
- case NYql::NProto::String:
- case NYql::NProto::Utf8:
- case NYql::NProto::Date:
- case NYql::NProto::Datetime:
- case NYql::NProto::Timestamp:
- return true;
- case NYql::NProto::Interval:
- case NYql::NProto::Decimal:
- case NYql::NProto::DyNumber:
- case NYql::NProto::Yson:
- case NYql::NProto::Json:
- case NYql::NProto::JsonDocument:
- case NYql::NProto::Float:
- case NYql::NProto::Double:
- case NYql::NProto::Bool:
- return false;
- default:
- break;
- }
- return false;
- }
-
- void TOlapStoreSchemaPreset::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) {
- Y_ABORT_UNLESS(presetProto.HasId());
- Y_ABORT_UNLESS(presetProto.HasName());
- Y_ABORT_UNLESS(presetProto.HasSchema());
- Id = presetProto.GetId();
- Name = presetProto.GetName();
- TOlapSchema::Parse(presetProto.GetSchema());
- }
-
- void TOlapStoreSchemaPreset::Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const {
- presetProto.SetId(Id);
- presetProto.SetName(Name);
- TOlapSchema::Serialize(*presetProto.MutableSchema());
- }
-
- bool TOlapStoreSchemaPreset::ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors) {
- if (presetProto.HasId()) {
- errors.AddError("Schema preset id cannot be specified explicitly");
- return false;
- }
- if (!presetProto.GetName()) {
- errors.AddError("Schema preset name cannot be empty");
- return false;
- }
- Name = presetProto.GetName();
- return true;
- }
-}
diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.h b/ydb/core/tx/schemeshard/schemeshard_olap_types.h
deleted file mode 100644
index e3015d8786..0000000000
--- a/ydb/core/tx/schemeshard/schemeshard_olap_types.h
+++ /dev/null
@@ -1,166 +0,0 @@
-#pragma once
-
-#include "defs.h"
-#include "schemeshard.h"
-#include <ydb/library/accessor/accessor.h>
-#include <ydb/core/scheme/scheme_types_proto.h>
-#include <ydb/core/formats/arrow/compression/object.h>
-#include <ydb/core/formats/arrow/compression/diff.h>
-#include <ydb/core/formats/arrow/dictionary/object.h>
-#include <ydb/core/formats/arrow/dictionary/diff.h>
-
-namespace NKikimr::NSchemeShard {
-
- class IErrorCollector {
- public:
- virtual void AddError(const TEvSchemeShard::EStatus& errorStatus, const TString& errorMsg) = 0;
- virtual void AddError(const TString& errorMsg) = 0;
- };
-
- class TProposeErrorCollector : public IErrorCollector {
- NKikimr::NSchemeShard::TEvSchemeShard::TEvModifySchemeTransactionResult& TxResult;
- public:
- TProposeErrorCollector(NKikimr::NSchemeShard::TEvSchemeShard::TEvModifySchemeTransactionResult& txResult)
- : TxResult(txResult)
- {}
-
- void AddError(const TEvSchemeShard::EStatus& errorStatus, const TString& errorMsg) override {
- TxResult.SetError(errorStatus, errorMsg);
- }
-
- void AddError(const TString& errorMsg) override {
- TxResult.SetError(NKikimrScheme::StatusSchemeError, errorMsg);
- }
- };
-
- class TOlapColumnDiff;
-
- class TOlapColumnAdd {
- private:
- YDB_READONLY_DEF(std::optional<ui32>, KeyOrder);
- YDB_READONLY_DEF(TString, Name);
- YDB_READONLY_DEF(TString, TypeName);
- YDB_READONLY_DEF(NScheme::TTypeInfo, Type);
- YDB_FLAG_ACCESSOR(NotNull, false);
- YDB_READONLY_DEF(std::optional<NArrow::TCompression>, Compression);
- YDB_READONLY_DEF(std::optional<NArrow::NDictionary::TEncodingSettings>, DictionaryEncoding);
- public:
- TOlapColumnAdd(const std::optional<ui32>& keyOrder)
- : KeyOrder(keyOrder)
- {
-
- }
- bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors);
- void ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema);
- void Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const;
- bool ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors);
- bool IsKeyColumn() const {
- return !!KeyOrder;
- }
- };
-
- class TOlapColumnSchema: public TOlapColumnAdd {
- private:
- using TBase = TOlapColumnAdd;
- YDB_READONLY(ui32, Id, Max<ui32>());
- public:
- TOlapColumnSchema(const TOlapColumnAdd& base, const ui32 id)
- : TBase(base)
- , Id(id) {
-
- }
- TOlapColumnSchema(const std::optional<ui32>& keyOrder)
- : TBase(keyOrder) {
-
- }
- void Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const;
- void ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema);
- };
-
- class TOlapColumnDiff {
- YDB_READONLY_DEF(TString, Name);
- YDB_READONLY_DEF(NArrow::TCompressionDiff, Compression);
- YDB_READONLY_DEF(NArrow::NDictionary::TEncodingDiff, DictionaryEncoding);
- public:
- bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDiff& columnSchema, IErrorCollector& errors) {
- Name = columnSchema.GetName();
- if (!Name) {
- errors.AddError("empty field name");
- return false;
- }
- if (!Compression.DeserializeFromProto(columnSchema.GetCompression())) {
- errors.AddError("cannot parse compression diff from proto");
- return false;
- }
- if (!DictionaryEncoding.DeserializeFromProto(columnSchema.GetDictionaryEncoding())) {
- errors.AddError("cannot parse dictionary encoding diff from proto");
- return false;
- }
- return true;
- }
- };
-
- class TOlapSchemaUpdate {
- YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine);
- YDB_READONLY_DEF(TVector<TOlapColumnAdd>, AddColumns);
- YDB_READONLY_DEF(TSet<TString>, DropColumns);
- YDB_READONLY_DEF(TVector<TOlapColumnDiff>, AlterColumns);
- public:
- bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys = false);
- bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors);
- };
-
- class TOlapSchema {
- public:
- using TColumn = TOlapColumnSchema;
- using TColumns = THashMap<ui32, TOlapColumnSchema>;
- using TColumnsByName = THashMap<TString, ui32>;
-
- private:
- YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine);
- YDB_READONLY_DEF(TColumns, Columns);
- YDB_READONLY_DEF(TColumnsByName, ColumnsByName);
- YDB_READONLY_DEF(TVector<ui32>, KeyColumnIds);
-
- YDB_READONLY(ui32, NextColumnId, 1);
- YDB_READONLY(ui32, Version, 0);
- YDB_READONLY_FLAG(CompositeMarks, true);
-
- public:
- const TOlapColumnSchema* GetColumnByName(const TString& name) const noexcept {
- auto it = ColumnsByName.find(name);
- if (it != ColumnsByName.end()) {
- return &Columns.at(it->second);
- }
- return nullptr;
- }
-
- const TOlapColumnSchema* GetColumnById(const ui32 id) const noexcept {
- auto it = Columns.find(id);
- if (it != Columns.end()) {
- return &it->second;
- }
- return nullptr;
- }
-
- bool Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors);
-
- void Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema);
- void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const;
- bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const;
- bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttlSettings, IErrorCollector& errors) const;
-
- static bool IsAllowedType(ui32 typeId);
- static bool IsAllowedFirstPkType(ui32 typeId);
- };
-
- class TOlapStoreSchemaPreset : public TOlapSchema {
- YDB_ACCESSOR_DEF(TString, Name);
- YDB_ACCESSOR_DEF(ui32, Id);
- YDB_ACCESSOR(size_t, ProtoIndex, -1); // Preset index in the olap store description
- public:
- void Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const;
- void ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto);
- bool ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors);
- };
-}
diff --git a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp
index 41284d86d3..40a792d0f4 100644
--- a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp
@@ -1,5 +1,5 @@
#include "schemeshard_info_types.h"
-#include "schemeshard_olap_types.h"
+#include "olap/columns/schema.h"
#include <ydb/core/protos/flat_scheme_op.pb.h>
namespace NKikimr {
@@ -8,13 +8,13 @@ namespace NSchemeShard {
// Helper accessors for OLTP and OLAP tables that use different TColumn's
namespace {
inline
- bool IsDropped(const TOlapSchema::TColumn& col) {
+ bool IsDropped(const TOlapColumnsDescription::TColumn& col) {
Y_UNUSED(col);
return false;
}
inline
- ui32 GetType(const TOlapSchema::TColumn& col) {
+ ui32 GetType(const TOlapColumnsDescription::TColumn& col) {
Y_ABORT_UNLESS(col.GetType().GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported");
return col.GetType().GetTypeId();
}
@@ -118,8 +118,8 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TTTLSettings& ttl,
}
static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle::TTtl& ttl,
- const THashMap<ui32, TOlapSchema::TColumn>& sourceColumns,
- const THashMap<ui32, TOlapSchema::TColumn>& alterColumns,
+ const THashMap<ui32, TOlapColumnsDescription::TColumn>& sourceColumns,
+ const THashMap<ui32, TOlapColumnsDescription::TColumn>& alterColumns,
const THashMap<TString, ui32>& colName2Id,
IErrorCollector& errors)
{
@@ -131,7 +131,7 @@ static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle::
return false;
}
- const TOlapSchema::TColumn* column = nullptr;
+ const TOlapColumnsDescription::TColumn* column = nullptr;
const ui32 colId = it->second;
if (alterColumns.contains(colId)) {
column = &alterColumns.at(colId);
@@ -179,7 +179,7 @@ bool TOlapSchema::ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycl
switch (ttl.GetStatusCase()) {
case TTtlProto::kEnabled:
- return ValidateColumnTableTtl(ttl.GetEnabled(), {}, Columns, ColumnsByName, errors);
+ return ValidateColumnTableTtl(ttl.GetEnabled(), {}, Columns.GetColumns(), Columns.GetColumnsByName(), errors);
case TTtlProto::kDisabled:
default:
break;
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index e5db2f5a47..d1e8f55992 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -85,8 +85,6 @@ SRCS(
schemeshard__operation_alter_index.cpp
schemeshard__operation_alter_kesus.cpp
schemeshard__operation_alter_login.cpp
- schemeshard__operation_alter_olap_store.cpp
- schemeshard__operation_alter_olap_table.cpp
schemeshard__operation_alter_pq.cpp
schemeshard__operation_alter_solomon.cpp
schemeshard__operation_alter_subdomain.cpp
@@ -109,8 +107,6 @@ SRCS(
schemeshard__operation_create_indexed_table.cpp
schemeshard__operation_create_kesus.cpp
schemeshard__operation_create_lock.cpp
- schemeshard__operation_create_olap_store.cpp
- schemeshard__operation_create_olap_table.cpp
schemeshard__operation_create_pq.cpp
schemeshard__operation_create_replication.cpp
schemeshard__operation_create_restore.cpp
@@ -128,8 +124,6 @@ SRCS(
schemeshard__operation_drop_indexed_table.cpp
schemeshard__operation_drop_kesus.cpp
schemeshard__operation_drop_lock.cpp
- schemeshard__operation_drop_olap_store.cpp
- schemeshard__operation_drop_olap_table.cpp
schemeshard__operation_drop_pq.cpp
schemeshard__operation_drop_replication.cpp
schemeshard__operation_drop_sequence.cpp
@@ -182,7 +176,6 @@ SRCS(
schemeshard_identificators.cpp
schemeshard_info_types.cpp
schemeshard_info_types.h
- schemeshard_olap_types.cpp
schemeshard_path_describer.cpp
schemeshard_path_element.cpp
schemeshard_path_element.h
@@ -260,6 +253,7 @@ PEERDIR(
ydb/core/tablet_flat
ydb/core/tx
ydb/core/tx/datashard
+ ydb/core/tx/schemeshard/olap
ydb/core/tx/scheme_board
ydb/core/tx/tx_allocator_client
ydb/core/util
diff --git a/ydb/services/bg_tasks/abstract/interface.cpp b/ydb/services/bg_tasks/abstract/interface.cpp
index a0587cc583..dae70dfba4 100644
--- a/ydb/services/bg_tasks/abstract/interface.cpp
+++ b/ydb/services/bg_tasks/abstract/interface.cpp
@@ -1,5 +1,26 @@
#include "interface.h"
+#include <ydb/services/bg_tasks/protos/container.pb.h>
namespace NKikimr::NBackgroundTasks {
+bool TStringContainerProcessor::DeserializeFromContainer(const TString& data, TString& className, TString& binary) {
+ NKikimrProto::TStringContainer protoData;
+ if (!protoData.ParseFromArray(data.data(), data.size())) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse string as proto: " << Base64Encode(data);
+ return false;
+ }
+ className = protoData.GetClassName();
+ binary = protoData.GetBinaryData();
+ return true;
+}
+
+TString TStringContainerProcessor::SerializeToContainer(const TString& className, const TString& binary) {
+ NKikimrProto::TStringContainer result;
+ result.SetClassName(className);
+ if (binary) {
+ result.SetBinaryData(binary);
+ }
+ return result.SerializeAsString();
+}
+
}
diff --git a/ydb/services/bg_tasks/abstract/interface.h b/ydb/services/bg_tasks/abstract/interface.h
index 96317f09c2..f6cd594e49 100644
--- a/ydb/services/bg_tasks/abstract/interface.h
+++ b/ydb/services/bg_tasks/abstract/interface.h
@@ -3,7 +3,6 @@
#include <ydb/library/actors/core/events.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/services/services.pb.h>
-#include <ydb/services/bg_tasks/protos/container.pb.h>
#include <library/cpp/json/writer/json_value.h>
#include <library/cpp/json/json_reader.h>
@@ -110,7 +109,20 @@ public:
TCommonInterfaceContainer() = default;
TCommonInterfaceContainer(std::shared_ptr<IInterface> object)
: Object(object) {
+ }
+ bool Initialize(const TString& className) {
+ AFL_VERIFY(!Object)("problem", "initialize for not-empty-object");
+ Object.reset(TFactory::Construct(className));
+ if (!Object) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name();
+ return false;
+ }
+ return true;
+ }
+
+ TString GetClassName() const {
+ return Object ? Object->GetClassName() : "UNDEFINED";
}
bool HasObject() const {
@@ -135,6 +147,16 @@ public:
return Object;
}
+ std::shared_ptr<IInterface> GetObjectPtrVerified() const {
+ AFL_VERIFY(Object);
+ return Object;
+ }
+
+ const IInterface& GetObjectVerified() const {
+ AFL_VERIFY(Object);
+ return *Object;
+ }
+
const IInterface* operator->() const {
return Object.get();
}
@@ -149,6 +171,13 @@ public:
};
+class TStringContainerProcessor {
+public:
+ static bool DeserializeFromContainer(const TString& data, TString& className, TString& binary);
+
+ static TString SerializeToContainer(const TString& className, const TString& binary);
+};
+
template <class IInterface>
class TInterfaceStringContainer: public TCommonInterfaceContainer<IInterface> {
protected:
@@ -162,13 +191,11 @@ public:
}
TString SerializeToString() const {
- NKikimrProto::TStringContainer result;
if (!Object) {
- return result.SerializeAsString();
+ return TStringContainerProcessor::SerializeToContainer("__UNDEFINED", "");
+ } else {
+ return TStringContainerProcessor::SerializeToContainer(Object->GetClassName(), Object->SerializeToString());
}
- result.SetClassName(Object->GetClassName());
- result.SetBinaryData(Object->SerializeToString());
- return result.SerializeAsString();
}
bool DeserializeFromString(const TString& data) {
@@ -176,19 +203,22 @@ public:
Object = nullptr;
return true;
}
- NKikimrProto::TStringContainer protoData;
- if (!protoData.ParseFromArray(data.data(), data.size())) {
+ TString className;
+ TString binaryData;
+ if (!TStringContainerProcessor::DeserializeFromContainer(data, className, binaryData)) {
ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse string as proto: " << Base64Encode(data);
- return {};
+ return false;
+ }
+ if (className == "__UNDEFINED") {
+ return true;
}
- const TString& className = protoData.GetClassName();
- std::shared_ptr<IInterface> object(TFactory::Construct(protoData.GetClassName()));
+ std::shared_ptr<IInterface> object(TFactory::Construct(className));
if (!object) {
ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name();
return false;
}
- if (!object->DeserializeFromString(protoData.GetBinaryData())) {
+ if (!object->DeserializeFromString(binaryData)) {
ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse class instance: " << className << " for " << typeid(IInterface).name();
return false;
}
@@ -218,6 +248,7 @@ public:
NJson::TJsonValue SerializeToJson() const {
NJson::TJsonValue result = NJson::JSON_MAP;
if (!Object) {
+ TOperatorPolicy::SetClassName(result, "__UNDEFINED");
return result;
}
TOperatorPolicy::SetClassName(result, Object->GetClassName());
@@ -227,6 +258,9 @@ public:
bool DeserializeFromJson(const NJson::TJsonValue& data) {
const TString& className = TOperatorPolicy::GetClassName(data);
+ if (className == "__UNDEFINED") {
+ return true;
+ }
std::shared_ptr<IInterface> object(TFactory::Construct(className));
if (!object) {
ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name();
@@ -265,6 +299,9 @@ public:
using TBase::TBase;
bool DeserializeFromProto(const TProto& data) {
const TString& className = TOperatorPolicy::GetClassName(data);
+ if (className == "__UNDEFINED") {
+ return true;
+ }
std::shared_ptr<IInterface> object(TFactory::Construct(className));
if (!object) {
ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name();
@@ -287,6 +324,16 @@ public:
TOperatorPolicy::SetClassName(result, Object->GetClassName());
return result;
}
+
+ template <class TProto>
+ void SerializeToProto(TProto& result) const {
+ if (!Object) {
+ TOperatorPolicy::SetClassName(result, "__UNDEFINED");
+ return;
+ }
+ Object->SerializeToProto(result);
+ TOperatorPolicy::SetClassName(result, Object->GetClassName());
+ }
};
}