diff options
author | chertus <azuikov@ydb.tech> | 2022-11-07 12:35:38 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-11-07 12:35:38 +0300 |
commit | f570e9e1e98392e10ddfe79909f0ad949f9dec8f (patch) | |
tree | fc67d83d75a302df74bf5748e54f0fe3302fae3c | |
parent | 4440f803dff303fe38644369105d264fe848946c (diff) | |
download | ydb-f570e9e1e98392e10ddfe79909f0ad949f9dec8f.tar.gz |
CREATE/DROP standalone column table
29 files changed, 1263 insertions, 569 deletions
diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp index 7a407992e0e..5a4a5e737b9 100644 --- a/ydb/core/kqp/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/kqp_ic_gateway.cpp @@ -2135,6 +2135,8 @@ private: for (const auto& keyColumn : metadata.KeyColumnNames) { schema.AddKeyColumnNames(keyColumn); } + + schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES); } static bool CheckLoadTableMetadataStatus(ui32 status, const TString& reason, @@ -2487,7 +2489,9 @@ private: static bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata, NKikimrSchemeOp::TColumnTableDescription& tableDesc, Ydb::StatusIds::StatusCode& code, TString& error) { - tableDesc.SetSchemaPresetName("default"); // TODO: CREATE TABLE without TABLESTORE needs schema + if (metadata->Columns.empty()) { + tableDesc.SetSchemaPresetName("default"); + } // TODO: not first PK column if (metadata->KeyColumnNames.empty()) { diff --git a/ydb/core/kqp/ut/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/kqp_scheme_ut.cpp index d20202bf59f..0775f128245 100644 --- a/ydb/core/kqp/ut/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scheme_ut.cpp @@ -3042,7 +3042,6 @@ Y_UNIT_TEST_SUITE(KqpScheme) { auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); -#if 0 // TODO TString tableName = "/Root/TableStoreTest/ColumnTableTest"; auto query2 = TStringBuilder() << R"( --!syntax_v1 @@ -3058,19 +3057,18 @@ Y_UNIT_TEST_SUITE(KqpScheme) { );)"; result = session.ExecuteSchemeQuery(query2).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - +#if 0 // TODO auto query3 = TStringBuilder() << R"( --!syntax_v1 ALTER TABLE `)" << tableName << R"(`;)"; result = session.ExecuteSchemeQuery(query3).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - +#endif auto query4 = TStringBuilder() << R"( --!syntax_v1 DROP TABLE `)" << tableName << R"(`;)"; result = session.ExecuteSchemeQuery(query4).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); -#endif auto query5 = TStringBuilder() << R"( --!syntax_v1 @@ -3079,7 +3077,6 @@ Y_UNIT_TEST_SUITE(KqpScheme) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } -#if 0 // TODO Y_UNIT_TEST(CreateDropColumnTable) { TKikimrRunner kikimr; auto db = kikimr.GetTableClient(); @@ -3106,7 +3103,6 @@ Y_UNIT_TEST_SUITE(KqpScheme) { result = session.ExecuteSchemeQuery(query2).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } -#endif Y_UNIT_TEST(CreateDropColumnTableNegative) { TKikimrRunner kikimr; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index f5e3384bab4..e8ffb753a6b 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -646,6 +646,9 @@ message TColumnTableDescription { // Internal fields that make sure versions always increase when presets are switched optional uint64 SchemaPresetVersionAdj = 11; optional uint64 TtlSettingsPresetVersionAdj = 12; + + // Channels for standalone column table + optional TColumnStorageConfig StorageConfig = 13; } message TAlterColumnTable { diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index 92514e6a920..e503da4984c 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -159,13 +159,6 @@ message TCommitTxBody { repeated uint64 WriteIds = 2; } -message TInitShard { - optional uint32 DataChannelCount = 1; - optional uint64 StorePathId = 2; - optional uint64 TablePathId = 3; - optional string OwnerPath = 4; // tablestore or table path -} - message TSchemaPresetVersionInfo { optional uint64 Id = 1; optional uint64 SinceStep = 2; @@ -202,6 +195,13 @@ message TCreateTable { optional uint64 TtlSettingsPresetVersionAdj = 7; } +message TInitShard { + optional uint32 DataChannelCount = 1; + optional uint64 OwnerPathId = 2; + repeated TCreateTable Tables = 3; + optional string OwnerPath = 4; +} + message TAlterTable { optional uint64 PathId = 1; optional NKikimrSchemeOp.TAlterColumnTable AlterBody = 2; diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 67aedff28cb..4af0bf3dd85 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -276,7 +276,7 @@ void TColumnShard::UpdateResourceMetrics(const TActorContext& ctx, const TUsage& } void TColumnShard::SendPeriodicStats() { - if (!CurrentSchemeShardId || !StorePathId) { + if (!CurrentSchemeShardId || !OwnerPathId) { LOG_S_DEBUG("Disabled periodic stats at tablet " << TabletID()); return; } @@ -294,7 +294,7 @@ void TColumnShard::SendPeriodicStats() { StatsReportPipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, CurrentSchemeShardId, clientConfig)); } - auto ev = std::make_unique<TEvDataShard::TEvPeriodicTableStats>(TabletID(), StorePathId); + auto ev = std::make_unique<TEvDataShard::TEvPeriodicTableStats>(TabletID(), OwnerPathId); { ev->Record.SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready ev->Record.SetGeneration(Executor()->Generation()); diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 6683260c142..b0f3882c6ad 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -34,7 +34,7 @@ void TTxInit::SetDefaults() { Self->LastWriteId = TWriteId{0}; Self->LastPlannedStep = 0; Self->LastPlannedTxId = 0; - Self->StorePathId = 0; + Self->OwnerPathId = 0; Self->OwnerPath.clear(); Self->BasicTxInfo.clear(); Self->DeadlineQueue.clear(); @@ -79,7 +79,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedStep, Self->LastPlannedStep); ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedTxId, Self->LastPlannedTxId); ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo); - ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::StorePathId, Self->StorePathId); + ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::OwnerPathId, Self->OwnerPathId); ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::OwnerPath, Self->OwnerPath); if (!ready) @@ -139,6 +139,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) // Primary index defaut schema and TTL (both are versioned) TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset; + TMap<NOlap::TSnapshot, NOlap::TIndexInfo> commonSchema; THashMap<ui64, TMap<TRowVersion, TTtl::TDescription>> ttls; { // Load schema presets @@ -150,8 +151,10 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) const ui32 id = rowset.GetValue<Schema::SchemaPresetInfo::Id>(); auto& preset = Self->SchemaPresets[id]; preset.Id = id; - preset.Name = rowset.GetValue<Schema::SchemaPresetInfo::Name>(); - Y_VERIFY(preset.Name == "default", "Unsupported preset at load time"); + if (id) { + preset.Name = rowset.GetValue<Schema::SchemaPresetInfo::Name>(); + } + Y_VERIFY(!id || preset.Name == "default", "Unsupported preset at load time"); if (rowset.HaveValue<Schema::SchemaPresetInfo::DropStep>() && rowset.HaveValue<Schema::SchemaPresetInfo::DropTxId>()) @@ -180,9 +183,11 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) auto& info = preset.Versions[version]; Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>())); - if (preset.Name == "default") { - schemaPreset.emplace(NOlap::TSnapshot{version.Step, version.TxId}, - Self->ConvertSchema(info.GetSchema())); + NOlap::TSnapshot snap{version.Step, version.TxId}; + if (!id) { + commonSchema.emplace(snap, Self->ConvertSchema(info.GetSchema())); + } else if (preset.Name == "default") { + schemaPreset.emplace(snap, Self->ConvertSchema(info.GetSchema())); } if (!rowset.Next()) @@ -246,7 +251,13 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) Self->SetCounter(COUNTER_TABLE_TTLS, ttls.size()); if (!schemaPreset.empty()) { + Y_VERIFY(commonSchema.empty(), "Mix of schema preset and common schema"); Self->SetPrimaryIndex(std::move(schemaPreset)); + } else if (!commonSchema.empty()) { + Self->SetPrimaryIndex(std::move(commonSchema)); + } else { + Y_VERIFY(Self->Tables.empty()); + Y_VERIFY(Self->SchemaPresets.empty()); } { // Load long tx writes diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index ac332a83127..6b672049d45 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -380,26 +380,36 @@ bool TColumnShard::IsTableWritable(ui64 tableId) const { return !it->second.IsDropped(); } -ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, +ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name, + const NKikimrSchemeOp::TColumnTableSchema& schemaProto, const TRowVersion& version) { - if (!SchemaPresets.contains(presetProto.GetId())) { - auto& preset = SchemaPresets[presetProto.GetId()]; - preset.Id = presetProto.GetId(); - preset.Name = presetProto.GetName(); + if (!SchemaPresets.contains(presetId)) { + LOG_S_DEBUG("EnsureSchemaPreset " << presetId << " at tablet " << TabletID()); + + auto& preset = SchemaPresets[presetId]; + preset.Id = presetId; + preset.Name = name; auto& info = preset.Versions[version]; info.SetId(preset.Id); info.SetSinceStep(version.Step); info.SetSinceTxId(version.TxId); - *info.MutableSchema() = presetProto.GetSchema(); - - Y_VERIFY(preset.Name == "default", "Only schema preset named 'default' is supported"); + *info.MutableSchema() = schemaProto; Schema::SaveSchemaPresetInfo(db, preset.Id, preset.Name); Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info); SetCounter(COUNTER_TABLE_PRESETS, SchemaPresets.size()); + } else { + LOG_S_DEBUG("EnsureSchemaPreset for existed preset " << presetId << " at tablet " << TabletID()); } - return presetProto.GetId(); + return presetId; +} + +ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, + const TRowVersion& version) { + Y_VERIFY(presetProto.GetName() == "default", "Only schema preset named 'default' is supported"); + + return EnsureSchemaPreset(db, presetProto.GetId(), presetProto.GetName(), presetProto.GetSchema(), version); } void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, @@ -441,14 +451,19 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const NIceDb::TNiceDb db(txc.DB); - if (proto.HasStorePathId()) { - StorePathId = proto.GetStorePathId(); - Schema::SaveSpecialValue(db, Schema::EValueIds::StorePathId, StorePathId); + if (proto.HasOwnerPathId()) { + OwnerPathId = proto.GetOwnerPathId(); + Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId); } + if (proto.HasOwnerPath()) { OwnerPath = proto.GetOwnerPath(); Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath); } + + for (auto& createTable : proto.GetTables()) { + RunEnsureTable(createTable, version, txc); + } } void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tableProto, const TRowVersion& version, @@ -457,32 +472,49 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl const ui64 pathId = tableProto.GetPathId(); if (!Tables.contains(pathId)) { - auto& table = Tables[pathId]; - table.PathId = pathId; - auto& tableVerProto = table.Versions[version]; - tableVerProto.SetPathId(pathId); - - Y_VERIFY(!tableProto.HasSchema(), "Tables with explicit schema are not supported"); + LOG_S_DEBUG("EnsureTable for pathId: " << pathId << " at tablet " << TabletID()); ui32 schemaPresetId = 0; if (tableProto.HasSchemaPreset()) { + Y_VERIFY(!tableProto.HasSchema(), "Tables has either schema or preset"); + schemaPresetId = EnsureSchemaPreset(db, tableProto.GetSchemaPreset(), version); - tableVerProto.SetSchemaPresetId(schemaPresetId); + Y_VERIFY(schemaPresetId); + } else { + Y_VERIFY(tableProto.HasSchema(), "Tables has either schema or preset"); + + // Save first table schema as common one with schemaPresetId == 0 + + if (SchemaPresets.count(0)) { + LOG_S_WARN("Colocated standalone tables are not supported. " + << "EnsureTable failed at tablet " << TabletID()); + return; + } + + schemaPresetId = EnsureSchemaPreset(db, 0, "", tableProto.GetSchema(), version); + Y_VERIFY(!schemaPresetId); } + auto& table = Tables[pathId]; + table.PathId = pathId; + auto& tableVerProto = table.Versions[version]; + tableVerProto.SetPathId(pathId); + tableVerProto.SetSchemaPresetId(schemaPresetId); + if (tableProto.HasTtlSettings()) { *tableVerProto.MutableTtlSettings() = tableProto.GetTtlSettings(); Ttl.SetPathTtl(pathId, TTtl::TDescription(tableProto.GetTtlSettings())); SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount()); } - if (!PrimaryIndex && schemaPresetId) { + if (!PrimaryIndex) { + TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory; + auto& schemaPresetVerProto = SchemaPresets[schemaPresetId].Versions[version]; - TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset; - schemaPreset.emplace(NOlap::TSnapshot{version.Step, version.TxId}, - ConvertSchema(schemaPresetVerProto.GetSchema())); + schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId}, + ConvertSchema(schemaPresetVerProto.GetSchema())); - SetPrimaryIndex(std::move(schemaPreset)); + SetPrimaryIndex(std::move(schemaHistory)); } tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj()); @@ -491,6 +523,8 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl Schema::SaveTableInfo(db, table.PathId); Schema::SaveTableVersionInfo(db, table.PathId, version, tableVerProto); SetCounter(COUNTER_TABLES, Tables.size()); + } else { + LOG_S_DEBUG("EnsureTable for existed pathId: " << pathId << " at tablet " << TabletID()); } } @@ -503,6 +537,8 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP Y_VERIFY(tablePtr && !tablePtr->IsDropped(), "AlterTable on a dropped or non-existent table"); auto& table = *tablePtr; + LOG_S_DEBUG("AlterTable for pathId: " << pathId << " at tablet " << TabletID()); + Y_VERIFY(!alterProto.HasSchema(), "Tables with explicit schema are not supported"); auto& info = table.Versions[version]; @@ -531,6 +567,8 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt auto* table = Tables.FindPtr(pathId); Y_VERIFY_DEBUG(table && !table->IsDropped()); if (table && !table->IsDropped()) { + LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID()); + PathsToDrop.insert(pathId); Ttl.DropPathTtl(pathId); @@ -542,6 +580,8 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt table->DropVersion = version; Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId); + } else { + LOG_S_DEBUG("DropTable for unknown or deleted pathId: " << pathId << " at tablet " << TabletID()); } } @@ -550,11 +590,11 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, NIceDb::TNiceDb db(txc.DB); if (proto.HasStorePathId()) { - StorePathId = proto.GetStorePathId(); - Schema::SaveSpecialValue(db, Schema::EValueIds::StorePathId, StorePathId); + OwnerPathId = proto.GetStorePathId(); + Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId); } - TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset; + TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory; for (ui32 id : proto.GetDroppedSchemaPresets()) { if (!SchemaPresets.contains(id)) { @@ -579,14 +619,14 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, *info.MutableSchema() = presetProto.GetSchema(); if (preset.Name == "default") { - schemaPreset.emplace(NOlap::TSnapshot{version.Step, version.TxId}, ConvertSchema(info.GetSchema())); + schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId}, ConvertSchema(info.GetSchema())); } Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info); } - if (!schemaPreset.empty()) { - SetPrimaryIndex(std::move(schemaPreset)); + if (!schemaHistory.empty()) { + SetPrimaryIndex(std::move(schemaHistory)); } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 5595ccf502d..1595cbda2dd 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -125,7 +125,7 @@ class TColumnShard void Handle(TEvPrivate::TEvForget::TPtr& ev, const TActorContext& ctx); void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx); void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev); - + ITransaction* CreateTxInitSchema(); ITransaction* CreateTxRunGc(); @@ -293,11 +293,11 @@ private: }; struct TSchemaPreset { - using TVerProto = NKikimrTxColumnShard::TSchemaPresetVersionInfo; + using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo; ui32 Id; TString Name; - TMap<TRowVersion, TVerProto> Versions; + TMap<TRowVersion, TSchemaPresetVersionInfo> Versions; TRowVersion DropVersion = TRowVersion::Max(); bool IsDropped() const { @@ -306,10 +306,10 @@ private: }; struct TTableInfo { - using TVerProto = NKikimrTxColumnShard::TTableVersionInfo; + using TTableVersionInfo = NKikimrTxColumnShard::TTableVersionInfo; ui64 PathId; - std::map<TRowVersion, TVerProto> Versions; + std::map<TRowVersion, TTableVersionInfo> Versions; TRowVersion DropVersion = TRowVersion::Max(); bool IsDropped() const { @@ -332,7 +332,7 @@ private: ui64 LastCompactedGranule = 0; ui64 LastExportNo = 0; ui64 WritesInFly = 0; - ui64 StorePathId = 0; + ui64 OwnerPathId = 0; ui64 StatsReportRound = 0; ui64 BackgroundActivation = 0; ui32 SkippedIndexations = TSettings::MAX_INDEXATIONS_TO_SKIP; // Force indexation on tablet init @@ -432,6 +432,8 @@ private: bool IsTableWritable(ui64 tableId) const; + ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name, + const NKikimrSchemeOp::TColumnTableSchema& schemaProto, const TRowVersion& version); ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, const TRowVersion& version); //ui32 EnsureTtlSettingsPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableTtlSettingsPreset& presetProto, const TRowVersion& version); diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 671e49cf487..360e30d7140 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -47,7 +47,7 @@ struct Schema : NIceDb::Schema { LastGcBarrierGen = 8, LastGcBarrierStep = 9, LastExportNumber = 10, - StorePathId = 11, + OwnerPathId = 11, OwnerPath = 12, }; @@ -95,7 +95,7 @@ struct Schema : NIceDb::Schema { struct Id : Column<1, NScheme::NTypeIds::Uint32> {}; struct SinceStep : Column<2, NScheme::NTypeIds::Uint64> {}; struct SinceTxId : Column<3, NScheme::NTypeIds::Uint64> {}; - struct InfoProto : Column<4, NScheme::NTypeIds::String> {}; // TSchemaPresetVersionInfo + struct InfoProto : Column<4, NScheme::NTypeIds::String> {}; // TCommonSchemaVersionInfo using TKey = TableKey<Id, SinceStep, SinceTxId>; using TColumns = TableColumns<Id, SinceStep, SinceTxId, InfoProto>; diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index e0cdb1766f0..3c4829c6c27 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -38,7 +38,7 @@ struct TTestSchema { TStorageTier(const TString& name = {}) : Name(name) - + {} NKikimrSchemeOp::EColumnCodec GetCodecId() const { @@ -173,6 +173,39 @@ struct TTestSchema { return col; } + static void InitSchema(const TVector<std::pair<TString, TTypeInfo>>& columns, + const TVector<std::pair<TString, TTypeInfo>>& pk, + const TTableSpecials& specials, + NKikimrSchemeOp::TColumnTableSchema* schema) + { + schema->SetEngine(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); + + for (ui32 i = 0; i < columns.size(); ++i) { + *schema->MutableColumns()->Add() = CreateColumn(i + 1, columns[i].first, columns[i].second); + } + + Y_VERIFY(pk.size() == 4); + for (auto& column : ExtractNames(pk)) { + schema->AddKeyColumnNames(column); + } + + if (specials.HasCodec()) { + schema->MutableDefaultCompression()->SetCompressionCodec(specials.GetCodecId()); + } + if (specials.CompressionLevel) { + schema->MutableDefaultCompression()->SetCompressionLevel(*specials.CompressionLevel); + } + + schema->SetEnableTiering(specials.HasTiers()); + } + + static void InitTtl(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle* ttlSettings) { + ttlSettings->SetVersion(1); + auto* enable = ttlSettings->MutableEnabled(); + enable->SetColumnName(specials.GetTtlColumn()); + enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe()); + } + static TString CreateTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns, const TVector<std::pair<TString, TTypeInfo>>& pk, const TTableSpecials& specials = {}) { @@ -186,35 +219,29 @@ struct TTestSchema { preset->SetName("default"); // schema + InitSchema(columns, pk, specials, preset->MutableSchema()); + } - auto* schema = preset->MutableSchema(); - schema->SetEngine(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); - - for (ui32 i = 0; i < columns.size(); ++i) { - *schema->MutableColumns()->Add() = CreateColumn(i + 1, columns[i].first, columns[i].second); - } + if (specials.HasTtl()) { + InitTtl(specials, table->MutableTtlSettings()); + } - Y_VERIFY(pk.size() == 4); - for (auto& column : ExtractNames(pk)) { - schema->AddKeyColumnNames(column); - } + TString out; + Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out); + return out; + } - if (specials.HasCodec()) { - schema->MutableDefaultCompression()->SetCompressionCodec(specials.GetCodecId()); - } - if (specials.CompressionLevel) { - schema->MutableDefaultCompression()->SetCompressionLevel(*specials.CompressionLevel); - } + static TString CreateStandaloneTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns, + const TVector<std::pair<TString, TTypeInfo>>& pk, + const TTableSpecials& specials = {}) { + NKikimrTxColumnShard::TSchemaTxBody tx; + auto* table = tx.MutableEnsureTables()->AddTables(); + table->SetPathId(pathId); - schema->SetEnableTiering(specials.HasTiers()); - } + InitSchema(columns, pk, specials, table->MutableSchema()); if (specials.HasTtl()) { - auto* ttlSettings = table->MutableTtlSettings(); - ttlSettings->SetVersion(1); - auto* enable = ttlSettings->MutableEnabled(); - enable->SetColumnName(specials.GetTtlColumn()); - enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe()); + InitTtl(specials, table->MutableTtlSettings()); } TString out; diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp index 5836e05b087..40db0e15c49 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp @@ -282,20 +282,39 @@ bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& me return CheckColumns(batch, colNames, rowsCount); } +struct TestTableDescription { + TVector<std::pair<TString, TTypeInfo>> Schema = TTestSchema::YdbSchema(); + TVector<std::pair<TString, TTypeInfo>> Pk = TTestSchema::YdbPkSchema(); + bool InStore = true; +}; + void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, - const TVector<std::pair<TString, TTypeInfo>>& schema = TTestSchema::YdbSchema(), - const TVector<std::pair<TString, TTypeInfo>>& pk = TTestSchema::YdbPkSchema(), - TString codec = "none") { + const TestTableDescription& table, TString codec = "none") { NOlap::TSnapshot snap = {10, 10}; - bool ok = ProposeSchemaTx(runtime, sender, - TTestSchema::CreateTableTxBody(pathId, schema, pk, - TTestSchema::TTableSpecials().WithCodec(codec)), - snap); + TString txBody; + if (table.InStore) { + txBody = TTestSchema::CreateTableTxBody( + pathId, table.Schema, table.Pk, TTestSchema::TTableSpecials().WithCodec(codec)); + + } else { + txBody = TTestSchema::CreateStandaloneTableTxBody( + pathId, table.Schema, table.Pk, TTestSchema::TTableSpecials().WithCodec(codec)); + } + bool ok = ProposeSchemaTx(runtime, sender, txBody, snap); UNIT_ASSERT(ok); + PlanSchemaTx(runtime, sender, snap); } -void TestWrite(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema) { +void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, + const TVector<std::pair<TString, TTypeInfo>>& schema = TTestSchema::YdbSchema(), + const TVector<std::pair<TString, TTypeInfo>>& pk = TTestSchema::YdbPkSchema(), + TString codec = "none") { + TestTableDescription table{schema, pk, true}; + SetupSchema(runtime, sender, pathId, table, codec); +} + +void TestWrite(const TestTableDescription& table) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -312,7 +331,10 @@ void TestWrite(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema) { ui64 writeId = 0; ui64 tableId = 1; - SetupSchema(runtime, sender, tableId, ydbSchema); + SetupSchema(runtime, sender, tableId, table); + + const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = table.Schema; + bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema)); UNIT_ASSERT(ok); @@ -453,9 +475,7 @@ void TestWriteReadDup() { } } -void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = TTestSchema::YdbSchema(), - const TVector<std::pair<TString, TTypeInfo>>& testYdbPk = TTestSchema::YdbPkSchema(), - TString codec = "") { +void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString codec = "") { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -498,7 +518,10 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y ui64 writeId = 0; ui64 tableId = 1; - SetupSchema(runtime, sender, tableId, ydbSchema, testYdbPk, codec); + SetupSchema(runtime, sender, tableId, table, codec); + + const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = table.Schema; + const TVector<std::pair<TString, TTypeInfo>>& testYdbPk = table.Pk; // ----xx // -----xx.. @@ -1474,11 +1497,27 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema, Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { Y_UNIT_TEST(Write) { - TestWrite(TTestSchema::YdbSchema()); + TestTableDescription table; + TestWrite(table); + } + + Y_UNIT_TEST(WriteStandalone) { + TestTableDescription table; + table.InStore = false; + TestWrite(table); } Y_UNIT_TEST(WriteExoticTypes) { - TestWrite(TTestSchema::YdbExoticSchema()); + TestTableDescription table; + table.Schema = TTestSchema::YdbExoticSchema(); + TestWrite(table); + } + + Y_UNIT_TEST(WriteStandaloneExoticTypes) { + TestTableDescription table; + table.Schema = TTestSchema::YdbExoticSchema(); + table.InStore = false; + TestWrite(table); } Y_UNIT_TEST(WriteReadDuplicate) { @@ -1486,23 +1525,45 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } Y_UNIT_TEST(WriteRead) { - TestWriteRead(false); + TestTableDescription table; + TestWriteRead(false, table); + } + + Y_UNIT_TEST(WriteReadStandalone) { + TestTableDescription table; + table.InStore = false; + TestWriteRead(false, table); } Y_UNIT_TEST(WriteReadExoticTypes) { - TestWriteRead(false, TTestSchema::YdbExoticSchema()); + TestTableDescription table; + table.Schema = TTestSchema::YdbExoticSchema(); + TestWriteRead(false, table); + } + + Y_UNIT_TEST(WriteReadStandaloneExoticTypes) { + TestTableDescription table; + table.Schema = TTestSchema::YdbExoticSchema(); + table.InStore = false; + TestWriteRead(false, table); } Y_UNIT_TEST(RebootWriteRead) { TestWriteRead(true); } + Y_UNIT_TEST(RebootWriteReadStandalone) { + TestTableDescription table; + table.InStore = false; + TestWriteRead(true, table); + } + Y_UNIT_TEST(WriteReadNoCompression) { - TestWriteRead(true, TTestSchema::YdbSchema(), TTestSchema::YdbPkSchema(), "none"); + TestWriteRead(true, {}, "none"); } Y_UNIT_TEST(WriteReadZSTD) { - TestWriteRead(true, TTestSchema::YdbSchema(), TTestSchema::YdbPkSchema(), "zstd"); + TestWriteRead(true, {}, "zstd"); } Y_UNIT_TEST(CompactionInGranule) { diff --git a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp index d5c39e7d7ba..c8bfaffba65 100644 --- a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp @@ -93,7 +93,7 @@ struct TSchemeShard::TTxDeleteTabletReply : public TSchemeShard::TRwTxBase { Self->TabletCounters->Simple()[COUNTER_SYS_VIEW_PROCESSOR_COUNT].Sub(1); break; case ETabletType::ColumnShard: - Self->TabletCounters->Simple()[COUNTER_COLUMN_SHARDS].Sub(-1); + Self->TabletCounters->Simple()[COUNTER_COLUMN_SHARDS].Sub(1); break; case ETabletType::SequenceShard: Self->TabletCounters->Simple()[COUNTER_SEQUENCESHARD_COUNT].Sub(1); diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 94517ae74ea..c562e944841 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4460,16 +4460,24 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { Y_VERIFY(description.ParseFromString(rowset.GetValue<Schema::ColumnTables::Description>())); NKikimrSchemeOp::TColumnTableSharding sharding; Y_VERIFY(sharding.ParseFromString(rowset.GetValue<Schema::ColumnTables::Sharding>())); + TMaybe<NKikimrSchemeOp::TColumnStoreSharding> storeSharding; + if (rowset.HaveValue<Schema::ColumnTables::StandaloneSharding>()) { + Y_VERIFY(storeSharding.ConstructInPlace().ParseFromString( + rowset.GetValue<Schema::ColumnTables::StandaloneSharding>())); + } - TColumnTableInfo::TPtr tableInfo = new TColumnTableInfo(alterVersion, std::move(description), std::move(sharding)); + TColumnTableInfo::TPtr tableInfo = new TColumnTableInfo(alterVersion, + std::move(description), std::move(sharding), std::move(storeSharding)); Self->ColumnTables[pathId] = tableInfo; Self->IncrementPathDbRefCount(pathId); - auto itStore = Self->OlapStores.find(tableInfo->OlapStorePathId); - if (itStore != Self->OlapStores.end()) { - itStore->second->ColumnTables.insert(pathId); - if (pathsUnderOperation.contains(pathId)) { - itStore->second->ColumnTablesUnderOperation.insert(pathId); + if (tableInfo->OlapStorePathId) { + auto itStore = Self->OlapStores.find(*tableInfo->OlapStorePathId); + if (itStore != Self->OlapStores.end()) { + itStore->second->ColumnTables.insert(pathId); + if (pathsUnderOperation.contains(pathId)) { + itStore->second->ColumnTablesUnderOperation.insert(pathId); + } } } @@ -4497,11 +4505,18 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { if (rowset.HaveValue<Schema::ColumnTablesAlters::AlterBody>()) { Y_VERIFY(alterBody.ConstructInPlace().ParseFromString(rowset.GetValue<Schema::ColumnTablesAlters::AlterBody>())); } - - TColumnTableInfo::TPtr alterData = new TColumnTableInfo(alterVersion, std::move(description), std::move(sharding), std::move(alterBody)); + TMaybe<NKikimrSchemeOp::TColumnStoreSharding> storeSharding; + if (rowset.HaveValue<Schema::ColumnTablesAlters::StandaloneSharding>()) { + Y_VERIFY(storeSharding.ConstructInPlace().ParseFromString( + rowset.GetValue<Schema::ColumnTablesAlters::StandaloneSharding>())); + } Y_VERIFY_S(Self->ColumnTables.contains(pathId), "Cannot load alter for olap table " << pathId); + + TColumnTableInfo::TPtr alterData = new TColumnTableInfo(alterVersion, + std::move(description), std::move(sharding), std::move(storeSharding), std::move(alterBody)); + Self->ColumnTables[pathId]->AlterData = alterData; if (!rowset.Next()) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp index 6f167751aa1..4b3626c6e43 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp @@ -453,7 +453,15 @@ public: Y_VERIFY(context.SS->ColumnTables.contains(path.Base()->PathId)); TColumnTableInfo::TPtr tableInfo = context.SS->ColumnTables.at(path.Base()->PathId); - TPath storePath = TPath::Init(tableInfo->OlapStorePathId, context.SS); + if (!tableInfo->OlapStorePathId) { + result->SetError(NKikimrScheme::StatusSchemeError, + "Alter for standalone column table is not supported yet"); + return result; + } + + auto& storePathId = *tableInfo->OlapStorePathId; + + TPath storePath = TPath::Init(storePathId, context.SS); { TPath::TChecker checks = storePath.Check(); checks @@ -471,8 +479,8 @@ public: } } - Y_VERIFY(context.SS->OlapStores.contains(tableInfo->OlapStorePathId)); - TOlapStoreInfo::TPtr storeInfo = context.SS->OlapStores.at(tableInfo->OlapStorePathId); + Y_VERIFY(context.SS->OlapStores.contains(storePathId)); + TOlapStoreInfo::TPtr storeInfo = context.SS->OlapStores.at(storePathId); TString errStr; if (!context.SS->CheckApplyIf(Transaction, errStr)) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 413f18d8666..54acba06b11 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -887,9 +887,12 @@ public: // OlapStore tracks all tables that are under operation, make sure to unlink if (context.SS->ColumnTables.contains(pathId)) { auto tableInfo = context.SS->ColumnTables.at(pathId); - if (context.SS->OlapStores.contains(tableInfo->OlapStorePathId)) { - auto storeInfo = context.SS->OlapStores.at(tableInfo->OlapStorePathId); - storeInfo->ColumnTablesUnderOperation.erase(pathId); + if (tableInfo->OlapStorePathId) { + auto& storePathId = *tableInfo->OlapStorePathId; + if (context.SS->OlapStores.contains(storePathId)) { + auto storeInfo = context.SS->OlapStores.at(storePathId); + storeInfo->ColumnTablesUnderOperation.erase(pathId); + } } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp index a2685bf2b2d..4da833de00a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp @@ -12,113 +12,11 @@ namespace { using namespace NKikimr; using namespace NSchemeShard; -// TODO: make it a part of TOlapSchema bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& schema, TString& errStr) { - schema.NextColumnId = proto.GetNextColumnId(); - - const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; - - schema.Columns.clear(); - for (auto& colProto : *proto.MutableColumns()) { - if (colProto.GetName().empty()) { - errStr = Sprintf("Columns cannot have an empty name"); - return false; - } - if (!colProto.HasId()) { - colProto.SetId(schema.NextColumnId++); - } else if (colProto.GetId() <= 0 || colProto.GetId() >= schema.NextColumnId) { - errStr = Sprintf("Column id is incorrect"); - return false; - } - ui32 colId = colProto.GetId(); - if (schema.Columns.contains(colId)) { - errStr = Sprintf("Duplicate column id %" PRIu32 " for column '%s'", colId, colProto.GetName().c_str()); - return false; - } - auto& col = schema.Columns[colId]; - col.Id = colId; - col.Name = colProto.GetName(); - - if (colProto.HasTypeId()) { - errStr = Sprintf("Cannot set TypeId for column '%s', use Type", col.Name.c_str()); - return false; - } - if (!colProto.HasType()) { - errStr = Sprintf("Missing Type for column '%s'", col.Name.c_str()); - return false; - } - - auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType()); - const NScheme::IType* type = typeRegistry->GetType(typeName); - if (type) { - if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { - errStr = Sprintf("Type '%s' specified for column '%s' is not supported", colProto.GetType().c_str(), col.Name.c_str()); - return false; - } - col.Type = NScheme::TTypeInfo(type->GetTypeId()); - } else { - auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName); - if (!typeDesc) { - errStr = Sprintf("Type '%s' specified for column '%s' is not supported", colProto.GetType().c_str(), col.Name.c_str()); - } - col.Type = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc); - } - auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(col.Type); - colProto.SetTypeId(columnType.TypeId); - if (columnType.TypeInfo) { - *colProto.MutableTypeInfo() = *columnType.TypeInfo; - } - - if (schema.ColumnsByName.contains(col.Name)) { - errStr = Sprintf("Duplicate column '%s'", col.Name.c_str()); - return false; - } - schema.ColumnsByName[col.Name] = col.Id; - } - - if (schema.Columns.empty()) { - errStr = Sprintf("At least one column is required"); - return false; - } - - schema.KeyColumnIds.clear(); - for (const TString& keyName : proto.GetKeyColumnNames()) { - auto* col = schema.FindColumnByName(keyName); - if (!col) { - errStr = Sprintf("Unknown key column '%s'", keyName.c_str()); - return false; - } - if (col->IsKeyColumn()) { - errStr = Sprintf("Duplicate key column '%s'", keyName.c_str()); - return false; - } - col->KeyOrder = schema.KeyColumnIds.size(); - schema.KeyColumnIds.push_back(col->Id); - } - - if (schema.KeyColumnIds.empty()) { - errStr = "At least one key column is required"; + if (!TOlapSchema::UpdateProto(proto, errStr)) { return false; } -#if 0 - for (auto& tierConfig : proto.GetStorageTiers()) { - TString tierName = tierConfig.GetName(); - if (schema.Tiers.count(tierName)) { - errStr = Sprintf("Same tier name in schema: '%s'", tierName.c_str()); - return false; - } - schema.Tiers.insert(tierName); - - if (!PrepareTier(tierConfig, errStr)) { - return false; - } - } -#endif - - schema.Engine = proto.GetEngine(); - - proto.SetNextColumnId(schema.NextColumnId); - return true; + return schema.Parse(proto, errStr); } // TODO: make it a part of TOlapStoreInfo @@ -294,7 +192,7 @@ public: // TODO: we may need to specify a more complex data channel mapping auto* init = tx.MutableInitShard(); init->SetDataChannelCount(storeInfo->Description.GetStorageConfig().GetDataChannelCount()); - init->SetStorePathId(txState->TargetPathId.LocalPathId); + init->SetOwnerPathId(txState->TargetPathId.LocalPathId); init->SetOwnerPath(path.PathString()); Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&columnShardTxBody); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp index 917688b71c5..3f5f52ef035 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp @@ -13,201 +13,147 @@ namespace NSchemeShard { namespace { -TColumnTableInfo::TPtr CreateColumnTable( - const NKikimrSchemeOp::TColumnTableDescription& opSrc, - TOlapStoreInfo::TPtr storeInfo, const TSubDomainInfo& subDomain, - TEvSchemeShard::EStatus& status, TString& errStr, - TSchemeShard* ss) -{ - Y_UNUSED(subDomain); - - TColumnTableInfo::TPtr tableInfo = new TColumnTableInfo; - tableInfo->AlterVersion = 1; - tableInfo->Description.CopyFrom(opSrc); - - auto& op = tableInfo->Description; +bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& schema, TString& errStr) { + proto.SetNextColumnId(1); + proto.SetVersion(1); - if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) { - status = NKikimrScheme::StatusSchemeError; - errStr = "TTL presets are not supported"; - return nullptr; - } - - if (!op.HasSchemaPresetName() && !op.HasSchemaPresetId()) { - op.SetSchemaPresetName("default"); + if (!TOlapSchema::UpdateProto(proto, errStr)) { + return false; } + return schema.Parse(proto, errStr); +} - const TOlapSchema* pSchema = nullptr; +bool ValidateSchema(const TOlapSchema& schema, const NKikimrSchemeOp::TColumnTableSchema& opSchema, + TEvSchemeShard::EStatus& status, TString& errStr) +{ + const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; - if (op.HasSchemaPresetName()) { - const TString presetName = op.GetSchemaPresetName(); - if (!storeInfo->SchemaPresetByName.contains(presetName)) { + ui32 lastColumnId = 0; + THashSet<ui32> usedColumns; + for (const auto& colProto : opSchema.GetColumns()) { + if (colProto.GetName().empty()) { status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Specified schema preset '%s' does not exist in olap store", presetName.c_str()); - return nullptr; + errStr = "Columns cannot have an empty name"; + return false; } - const ui32 presetId = storeInfo->SchemaPresetByName.at(presetName); - if (!op.HasSchemaPresetId()) { - op.SetSchemaPresetId(presetId); - } - if (op.GetSchemaPresetId() != presetId) { + const TString& colName = colProto.GetName(); + auto* col = schema.FindColumnByName(colName); + if (!col) { status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Specified schema preset '%s' and id %" PRIu32 " do not match in olap store", presetName.c_str(), presetId); - return nullptr; + errStr = TStringBuilder() + << "Column '" << colName << "' does not match schema preset"; + return false; } - pSchema = &storeInfo->SchemaPresets.at(presetId); - } else if (op.HasSchemaPresetId()) { - const ui32 presetId = op.GetSchemaPresetId(); - if (!storeInfo->SchemaPresets.contains(presetId)) { + if (colProto.HasId() && colProto.GetId() != col->Id) { status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Specified schema preset %" PRIu32 " does not exist in olap store", presetId); - return nullptr; + errStr = TStringBuilder() + << "Column '" << colName << "' has id " << colProto.GetId() << " that does not match schema preset"; + return false; } - const TString& presetName = storeInfo->SchemaPresets.at(presetId).Name; - op.SetSchemaPresetName(presetName); - pSchema = &storeInfo->SchemaPresets.at(presetId); - } - Y_VERIFY(pSchema, "Expected to find a preset schema"); - - if (op.HasSchema()) { - auto& opSchema = op.GetSchema(); - const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; - - ui32 lastColumnId = 0; - THashSet<ui32> usedColumns; - for (const auto& colProto : opSchema.GetColumns()) { - if (colProto.GetName().empty()) { - status = NKikimrScheme::StatusSchemeError; - errStr = "Columns cannot have an empty name"; - return nullptr; - } - const TString& colName = colProto.GetName(); - auto* col = pSchema->FindColumnByName(colName); - if (!col) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() - << "Column '" << colName << "' does not match schema preset"; - return nullptr; - } - if (colProto.HasId() && colProto.GetId() != col->Id) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() - << "Column '" << colName << "' has id " << colProto.GetId() << " that does not match schema preset"; - return nullptr; - } - - if (!usedColumns.insert(col->Id).second) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() << "Column '" << colName << "' is specified multiple times"; - return nullptr; - } - if (col->Id < lastColumnId) { - status = NKikimrScheme::StatusSchemeError; - errStr = "Column order does not match schema preset"; - return nullptr; - } - lastColumnId = col->Id; - - if (colProto.HasTypeId()) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() << "Cannot set TypeId for column '" << colName << "', use Type"; - return nullptr; - } - if (!colProto.HasType()) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() << "Missing Type for column '" << colName << "'"; - return nullptr; - } + if (!usedColumns.insert(col->Id).second) { + status = NKikimrScheme::StatusSchemeError; + errStr = TStringBuilder() << "Column '" << colName << "' is specified multiple times"; + return false; + } + if (col->Id < lastColumnId) { + status = NKikimrScheme::StatusSchemeError; + errStr = "Column order does not match schema preset"; + return false; + } + lastColumnId = col->Id; - auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType()); - const NScheme::IType* type = typeRegistry->GetType(typeName); - NScheme::TTypeInfo typeInfo; - if (!type || !NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { - auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName); - if (!typeDesc) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() - << "Type '" << colProto.GetType() << "' specified for column '" << colName << "' is not supported"; - return nullptr; - } - typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc); - } else { - typeInfo = NScheme::TTypeInfo(type->GetTypeId()); - } + if (colProto.HasTypeId()) { + status = NKikimrScheme::StatusSchemeError; + errStr = TStringBuilder() << "Cannot set TypeId for column '" << colName << "', use Type"; + return false; + } + if (!colProto.HasType()) { + status = NKikimrScheme::StatusSchemeError; + errStr = TStringBuilder() << "Missing Type for column '" << colName << "'"; + return false; + } - if (typeInfo != col->Type) { + auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType()); + const NScheme::IType* type = typeRegistry->GetType(typeName); + NScheme::TTypeInfo typeInfo; + if (!type || !NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { + auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName); + if (!typeDesc) { status = NKikimrScheme::StatusSchemeError; errStr = TStringBuilder() - << "Type '" << colProto.GetType() << "' specified for column '" << colName - << "' does not match schema preset"; - return nullptr; + << "Type '" << colProto.GetType() << "' specified for column '" << colName << "' is not supported"; + return false; } + typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc); + } else { + typeInfo = NScheme::TTypeInfo(type->GetTypeId()); } - for (auto& pr : pSchema->Columns) { - if (!usedColumns.contains(pr.second.Id)) { - status = NKikimrScheme::StatusSchemeError; - errStr = "Specified schema is missing some schema preset columns"; - return nullptr; - } + if (typeInfo != col->Type) { + status = NKikimrScheme::StatusSchemeError; + errStr = TStringBuilder() + << "Type '" << TypeName(typeInfo) << "' specified for column '" << colName + << "' does not match schema preset type '" << TypeName(col->Type) << "'"; + return false; } + } - TVector<ui32> keyColumnIds; - for (const TString& keyName : opSchema.GetKeyColumnNames()) { - auto* col = pSchema->FindColumnByName(keyName); - if (!col) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() << "Unknown key column '" << keyName << "'"; - return nullptr; - } - keyColumnIds.push_back(col->Id); - } - if (keyColumnIds != pSchema->KeyColumnIds) { + for (auto& pr : schema.Columns) { + if (!usedColumns.contains(pr.second.Id)) { status = NKikimrScheme::StatusSchemeError; - errStr = "Specified schema key columns not matching schema preset"; - return nullptr; + errStr = "Specified schema is missing some schema preset columns"; + return false; } + } - if (opSchema.GetEngine() != pSchema->Engine) { + TVector<ui32> keyColumnIds; + for (const TString& keyName : opSchema.GetKeyColumnNames()) { + auto* col = schema.FindColumnByName(keyName); + if (!col) { status = NKikimrScheme::StatusSchemeError; - errStr = "Specified schema engine does not match schema preset"; - return nullptr; + errStr = TStringBuilder() << "Unknown key column '" << keyName << "'"; + return false; } - - op.ClearSchema(); + keyColumnIds.push_back(col->Id); } - - if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) { + if (keyColumnIds != schema.KeyColumnIds) { status = NKikimrScheme::StatusSchemeError; - errStr = "TTL presets are not supported"; - return nullptr; - } - - if (op.HasTtlSettings()) { - op.MutableTtlSettings()->SetVersion(1); + errStr = "Specified schema key columns not matching schema preset"; + return false; } - // Validate ttl settings and schema compatibility - if (op.HasTtlSettings()) { - if (!ValidateTtlSettings(op.GetTtlSettings(), pSchema->Columns, pSchema->ColumnsByName, errStr)) { - status = NKikimrScheme::StatusInvalidParameter; - return nullptr; - } + if (opSchema.GetEngine() != schema.Engine) { + status = NKikimrScheme::StatusSchemeError; + errStr = "Specified schema engine does not match schema preset"; + return false; } + return true; +} +bool SetSharding(const TOlapSchema& schema, NKikimrSchemeOp::TColumnTableDescription& op, + TColumnTableInfo::TPtr tableInfo, + TEvSchemeShard::EStatus& status, TString& errStr) +{ + ui32 shardsCount = Max(ui32(1), op.GetColumnShardCount()); if (op.HasSharding()) { tableInfo->Sharding = std::move(*op.MutableSharding()); - op.ClearSharding(); - } else { - // Use default random sharding + } else if (shardsCount < 2) { tableInfo->Sharding.MutableRandomSharding(); + } else { + status = NKikimrScheme::StatusSchemeError; + errStr = Sprintf("Sharding is not set"); + return false; } + op.ClearSharding(); + switch (tableInfo->Sharding.Method_case()) { case NKikimrSchemeOp::TColumnTableSharding::kRandomSharding: { // Random sharding implies non-unique primary key - tableInfo->Sharding.SetUniquePrimaryKey(false); + if (shardsCount > 1) { + tableInfo->Sharding.SetUniquePrimaryKey(false); + } break; } case NKikimrSchemeOp::TColumnTableSharding::kHashSharding: { @@ -215,16 +161,15 @@ TColumnTableInfo::TPtr CreateColumnTable( if (sharding.ColumnsSize() == 0) { status = NKikimrScheme::StatusSchemeError; errStr = Sprintf("Hash sharding requires a non-empty list of columns"); - return nullptr; + return false; } - Y_VERIFY(pSchema); bool keysOnly = true; for (const TString& columnName : sharding.GetColumns()) { - auto* pColumn = pSchema->FindColumnByName(columnName); + auto* pColumn = schema.FindColumnByName(columnName); if (!pColumn) { status = NKikimrScheme::StatusSchemeError; errStr = Sprintf("Hash sharding is using an unknown column '%s'", columnName.c_str()); - return nullptr; + return false; } if (!pColumn->IsKeyColumn()) { keysOnly = false; @@ -237,11 +182,103 @@ TColumnTableInfo::TPtr CreateColumnTable( default: { status = NKikimrScheme::StatusSchemeError; errStr = "Unsupported sharding method"; + return false; + } + } + return true; +} + +bool CheckSupported(const NKikimrSchemeOp::TColumnTableDescription& op, + TEvSchemeShard::EStatus& status, TString& errStr) +{ + if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) { + status = NKikimrScheme::StatusSchemeError; + errStr = "TTL presets are not supported"; + return false; + } + if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) { + status = NKikimrScheme::StatusSchemeError; + errStr = "TTL presets are not supported"; + return false; + } + return true; +} + +TColumnTableInfo::TPtr CreateColumnTableInStore( + TColumnTableInfo::TPtr& tableInfo, + TOlapStoreInfo::TPtr storeInfo, + ui32 columnShardCount, + TEvSchemeShard::EStatus& status, TString& errStr) +{ + auto& op = tableInfo->Description; + + if (!CheckSupported(op, status, errStr)) { + return nullptr; + } + + if (!op.HasSchemaPresetName() && !op.HasSchemaPresetId()) { + op.SetSchemaPresetName("default"); + } + + const TOlapSchema* pSchema = nullptr; + + if (op.HasSchemaPresetName()) { + const TString presetName = op.GetSchemaPresetName(); + if (!storeInfo->SchemaPresetByName.contains(presetName)) { + status = NKikimrScheme::StatusSchemeError; + errStr = Sprintf("Specified schema preset '%s' does not exist in tablestore", presetName.c_str()); return nullptr; } + const ui32 presetId = storeInfo->SchemaPresetByName.at(presetName); + if (!op.HasSchemaPresetId()) { + op.SetSchemaPresetId(presetId); + } + if (op.GetSchemaPresetId() != presetId) { + status = NKikimrScheme::StatusSchemeError; + errStr = Sprintf("Specified schema preset '%s' and id %" PRIu32 " do not match in tablestore", presetName.c_str(), presetId); + return nullptr; + } + pSchema = &storeInfo->SchemaPresets.at(presetId); + } else if (op.HasSchemaPresetId()) { + const ui32 presetId = op.GetSchemaPresetId(); + if (!storeInfo->SchemaPresets.contains(presetId)) { + status = NKikimrScheme::StatusSchemeError; + errStr = Sprintf("Specified schema preset %" PRIu32 " does not exist in tablestore", presetId); + return nullptr; + } + const TString& presetName = storeInfo->SchemaPresets.at(presetId).Name; + op.SetSchemaPresetName(presetName); + pSchema = &storeInfo->SchemaPresets.at(presetId); + } + + Y_VERIFY(pSchema, "No schema preset id/name for in-store column table"); + + if (op.HasSchema()) { + auto& opSchema = op.GetSchema(); + + if (!ValidateSchema(*pSchema, opSchema, status, errStr)) { + return nullptr; + } + + op.ClearSchema(); + } + + if (op.HasTtlSettings()) { + op.MutableTtlSettings()->SetVersion(1); + } + + // Validate ttl settings and schema compatibility + if (op.HasTtlSettings()) { + if (!ValidateTtlSettings(op.GetTtlSettings(), pSchema->Columns, pSchema->ColumnsByName, errStr)) { + status = NKikimrScheme::StatusInvalidParameter; + return nullptr; + } + } + + if (!SetSharding(*pSchema, op, tableInfo, status, errStr)) { + return nullptr; } - const ui32 columnShardCount = Max(ui32(1), op.GetColumnShardCount()); if (columnShardCount > storeInfo->ColumnShards.size()) { status = NKikimrScheme::StatusSchemeError; errStr = Sprintf("Cannot create table with %" PRIu32 " column shards, only %" PRIu32 " are available", @@ -249,13 +286,23 @@ TColumnTableInfo::TPtr CreateColumnTable( return nullptr; } - tableInfo->ColumnShards.reserve(storeInfo->ColumnShards.size()); - for (const auto& shardIdx : storeInfo->ColumnShards) { + return tableInfo; +} + +void SetShardingTablets( + TColumnTableInfo::TPtr& tableInfo, + const TVector<TShardIdx>& columnShards, ui32 columnShardCount, bool shuffle, + TSchemeShard* ss) +{ + tableInfo->ColumnShards.reserve(columnShards.size()); + for (const auto& shardIdx : columnShards) { auto* shardInfo = ss->ShardInfos.FindPtr(shardIdx); Y_VERIFY(shardInfo, "ColumnShard not found"); tableInfo->ColumnShards.push_back(shardInfo->TabletID.GetValue()); } - ShuffleRange(tableInfo->ColumnShards); + if (shuffle) { + ShuffleRange(tableInfo->ColumnShards); + } tableInfo->ColumnShards.resize(columnShardCount); tableInfo->Sharding.SetVersion(1); @@ -267,14 +314,60 @@ TColumnTableInfo::TPtr CreateColumnTable( } tableInfo->Sharding.ClearAdditionalColumnShards(); +} - // Don't allow users to set these fields - op.ClearSchemaPresetVersionAdj(); - op.ClearTtlSettingsPresetVersionAdj(); +TColumnTableInfo::TPtr CreateColumnTable( + TColumnTableInfo::TPtr& tableInfo, + TEvSchemeShard::EStatus& status, TString& errStr) +{ + auto& op = tableInfo->Description; + + if (!CheckSupported(op, status, errStr)) { + return nullptr; + } + + if (op.HasSchemaPresetName() || op.HasSchemaPresetId()) { + status = NKikimrScheme::StatusSchemeError; + errStr = "Schema presets are not supported for standalone column tables"; + return nullptr; + } + + if (!op.HasSchema()) { + status = NKikimrScheme::StatusSchemeError; + errStr = "No schema for column table specified"; + return nullptr; + } + + NKikimrSchemeOp::TColumnTableSchema* opSchema = op.MutableSchema(); + tableInfo->Schema = TOlapSchema(); + auto& schema = *tableInfo->Schema; + + if (!PrepareSchema(*opSchema, schema, errStr)) { + status = NKikimrScheme::StatusSchemeError; + return nullptr; + } + + if (op.HasTtlSettings()) { + op.MutableTtlSettings()->SetVersion(1); + + if (!ValidateTtlSettings(op.GetTtlSettings(), schema.Columns, schema.ColumnsByName, errStr)) { + status = NKikimrScheme::StatusInvalidParameter; + return nullptr; + } + } + + if (!SetSharding(schema, op, tableInfo, status, errStr)) { + return nullptr; + } + + if (!op.GetStorageConfig().HasDataChannelCount()) { + op.MutableStorageConfig()->SetDataChannelCount(1); + } return tableInfo; } + class TConfigureParts: public TSubOperationState { private: TOperationId OperationId; @@ -308,50 +401,67 @@ public: TPathId pathId = txState->TargetPathId; TPath path = TPath::Init(pathId, context.SS); - TString pathString = path.PathString(); TColumnTableInfo::TPtr pendingInfo = context.SS->ColumnTables[pathId]; Y_VERIFY(pendingInfo); Y_VERIFY(pendingInfo->AlterData); TColumnTableInfo::TPtr tableInfo = pendingInfo->AlterData; - auto olapStorePath = path.FindOlapStore(); - Y_VERIFY(olapStorePath, "Unexpected failure to find an olap store"); - auto storeInfo = context.SS->OlapStores.at(olapStorePath->PathId); - txState->ClearShardsInProgress(); auto seqNo = context.SS->StartRound(*txState); + Y_VERIFY(tableInfo->ColumnShards.empty() || tableInfo->OwnedColumnShards.empty()); + TString columnShardTxBody; + NKikimrTxColumnShard::TSchemaTxBody tx; + context.SS->FillSeqNo(tx, seqNo); { - NKikimrTxColumnShard::TSchemaTxBody tx; - context.SS->FillSeqNo(tx, seqNo); + NKikimrTxColumnShard::TCreateTable* create{}; + if (tableInfo->IsStandalone()) { + Y_VERIFY(tableInfo->ColumnShards.empty()); + Y_VERIFY(tableInfo->Description.HasSchema()); - auto* create = tx.MutableEnsureTables()->AddTables(); + auto* init = tx.MutableInitShard(); + init->SetDataChannelCount(tableInfo->Description.GetStorageConfig().GetDataChannelCount()); + init->SetOwnerPathId(pathId.LocalPathId); + init->SetOwnerPath(path.PathString()); - create->SetPathId(pathId.LocalPathId); - if (tableInfo->Description.HasSchema()) { + create = init->AddTables(); create->MutableSchema()->CopyFrom(tableInfo->Description.GetSchema()); - } - if (tableInfo->Description.HasSchemaPresetId()) { + } else { + Y_VERIFY(tableInfo->OwnedColumnShards.empty()); + Y_VERIFY(!tableInfo->Description.HasSchema()); + Y_VERIFY(tableInfo->Description.HasSchemaPresetId()); + + create = tx.MutableEnsureTables()->AddTables(); + + if (tableInfo->Description.HasSchemaPresetVersionAdj()) { + create->SetSchemaPresetVersionAdj(tableInfo->Description.GetSchemaPresetVersionAdj()); + } + + auto olapStorePath = path.FindOlapStore(); + Y_VERIFY(olapStorePath, "Unexpected failure to find a tablestore"); + auto storeInfo = context.SS->OlapStores.at(olapStorePath->PathId); + const ui32 presetId = tableInfo->Description.GetSchemaPresetId(); Y_VERIFY(storeInfo->SchemaPresets.contains(presetId), - "Failed to find schema preset %" PRIu32 " in an olap store", presetId); + "Failed to find schema preset %" PRIu32 " in a tablestore", presetId); auto& preset = storeInfo->SchemaPresets.at(presetId); size_t presetIndex = preset.ProtoIndex; create->MutableSchemaPreset()->CopyFrom(storeInfo->Description.GetSchemaPresets(presetIndex)); } + + Y_VERIFY(create); + create->SetPathId(pathId.LocalPathId); + if (tableInfo->Description.HasTtlSettings()) { create->MutableTtlSettings()->CopyFrom(tableInfo->Description.GetTtlSettings()); } - if (tableInfo->Description.HasSchemaPresetVersionAdj()) { - create->SetSchemaPresetVersionAdj(tableInfo->Description.GetSchemaPresetVersionAdj()); - } - - Y_VERIFY(tx.SerializeToString(&columnShardTxBody)); } + Y_VERIFY(tx.SerializeToString(&columnShardTxBody)); + for (auto& shard : txState->Shards) { TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID; @@ -423,6 +533,11 @@ public: Y_VERIFY(pending); TColumnTableInfo::TPtr table = pending->AlterData; Y_VERIFY(table); + if (table->IsStandalone()) { + Y_VERIFY(table->ColumnShards.empty()); + SetShardingTablets(table, table->OwnedColumnShards, table->OwnedColumnShards.size(), false, context.SS); + } + context.SS->ColumnTables[pathId] = table; context.SS->PersistColumnTableAlterRemove(db, pathId); @@ -544,13 +659,18 @@ class TCreateColumnTable: public TSubOperation { const TTxTransaction Transaction; TTxState::ETxState State = TTxState::Invalid; - TTxState::ETxState NextState() { - return TTxState::ConfigureParts; + TTxState::ETxState NextState(bool inStore) { + if (inStore) { + return TTxState::ConfigureParts; + } + return TTxState::CreateParts; } TTxState::ETxState NextState(TTxState::ETxState state) { switch(state) { case TTxState::Waiting: + case TTxState::CreateParts: + return TTxState::ConfigureParts; case TTxState::ConfigureParts: return TTxState::Propose; case TTxState::Propose: @@ -568,6 +688,8 @@ class TCreateColumnTable: public TSubOperation { switch(state) { case TTxState::Waiting: + case TTxState::CreateParts: + return TPtr(new TCreateParts(OperationId)); case TTxState::ConfigureParts: return TPtr(new TConfigureParts(OperationId)); case TTxState::Propose: @@ -610,6 +732,8 @@ public: const TString& parentPathStr = Transaction.GetWorkingDir(); auto& createDescription = Transaction.GetCreateColumnTable(); const TString& name = createDescription.GetName(); + const ui32 shardsCount = Max(ui32(1), createDescription.GetColumnShardCount()); + auto opTxId = OperationId.GetTxId(); LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TCreateColumnTable Propose" @@ -618,8 +742,9 @@ public: << ", at schemeshard: " << ssId); TEvSchemeShard::EStatus status = NKikimrScheme::StatusAccepted; - auto result = MakeHolder<TProposeResponse>(status, ui64(OperationId.GetTxId()), ui64(ssId)); + auto result = MakeHolder<TProposeResponse>(status, ui64(opTxId), ui64(ssId)); + TOlapStoreInfo::TPtr storeInfo; NSchemeShard::TPath parentPath = NSchemeShard::TPath::Resolve(parentPathStr, context.SS); { NSchemeShard::TPath::TChecker checks = parentPath.Check(); @@ -629,7 +754,6 @@ public: .IsResolved() .NotDeleted() .NotUnderDeleting() - .HasOlapStore() .IsCommonSensePath() .IsLikeDirectory(); @@ -640,6 +764,29 @@ public: result->SetError(status, explain); return result; } + + if (auto olapStorePath = parentPath.FindOlapStore()) { + storeInfo = context.SS->OlapStores.at(olapStorePath->PathId); + Y_VERIFY(storeInfo, "Unexpected failure to find an tablestore info"); + + NSchemeShard::TPath::TChecker ckecksStore = olapStorePath.Check(); + ckecksStore + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsOlapStore() + .NotUnderOperation(); + + if (!ckecksStore) { + TString explain = TStringBuilder() << "tablestore fail checks" + << ", path: " << olapStorePath.PathString(); + auto status = ckecksStore.GetStatus(&explain); + result->SetError(status, explain); + return result; + } + } } const TString acl = Transaction.GetModifyACL().GetDiffACL(); @@ -664,6 +811,8 @@ public: .IsValidLeafName() .DepthLimit() .PathsLimit() + .ShardsLimit(storeInfo ? 0 : shardsCount) + .PathShardsLimit(storeInfo ? 0 : shardsCount) .DirChildrenLimit() .IsValidACL(acl); } @@ -688,40 +837,35 @@ public: return result; } - auto olapStorePath = dstPath.FindOlapStore(); - Y_VERIFY(olapStorePath, "Unexpected failure to find an olap store"); - auto storeInfo = context.SS->OlapStores.at(olapStorePath->PathId); - { - NSchemeShard::TPath::TChecker checks = olapStorePath.Check(); - checks - .NotUnderDomainUpgrade() - .IsAtLocalSchemeShard() - .IsResolved() - .NotDeleted() - .NotUnderDeleting() - .IsOlapStore() - .NotUnderOperation(); - - if (!checks) { - TString explain = TStringBuilder() << "olap store fail checks" - << ", path: " << olapStorePath.PathString(); - auto status = checks.GetStatus(&explain); - result->SetError(status, explain); - return result; - } - } - if (!AppData()->FeatureFlags.GetEnableOlapSchemaOperations()) { result->SetError(NKikimrScheme::StatusPreconditionFailed, "Olap schema operations are not supported"); return result; } - TColumnTableInfo::TPtr tableInfo = CreateColumnTable(createDescription, storeInfo, *parentPath.DomainInfo(), status, errStr, context.SS); + TColumnTableInfo::TPtr tableInfo = new TColumnTableInfo; + { + tableInfo->AlterVersion = 1; + tableInfo->Description.CopyFrom(createDescription); + // Don't allow users to set these fields + tableInfo->Description.ClearSchemaPresetVersionAdj(); + tableInfo->Description.ClearTtlSettingsPresetVersionAdj(); + } + + if (storeInfo) { + tableInfo = CreateColumnTableInStore(tableInfo, storeInfo, shardsCount, status, errStr); + if (tableInfo) { + SetShardingTablets(tableInfo, storeInfo->ColumnShards, shardsCount, true, context.SS); + } + } else { + tableInfo = CreateColumnTable(tableInfo, status, errStr); + } + if (!tableInfo) { result->SetError(status, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateColumnTable, errStr)) { result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); return result; @@ -733,49 +877,111 @@ public: context.SS->TabletCounters->Simple()[COUNTER_COLUMN_TABLE_COUNT].Add(1); TPathId pathId = dstPath.Base()->PathId; - dstPath.Base()->CreateTxId = OperationId.GetTxId(); - dstPath.Base()->LastTxId = OperationId.GetTxId(); + dstPath.Base()->CreateTxId = opTxId; + dstPath.Base()->LastTxId = opTxId; dstPath.Base()->PathState = TPathElement::EPathState::EPathStateCreate; dstPath.Base()->PathType = TPathElement::EPathType::EPathTypeColumnTable; NIceDb::TNiceDb db(context.GetDB()); TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateColumnTable, pathId); - txState.State = TTxState::ConfigureParts; - - txState.Shards.reserve(tableInfo->ColumnShards.size()); - for (ui64 columnShardId : tableInfo->ColumnShards) { - auto tabletId = TTabletId(columnShardId); - auto shardIdx = context.SS->TabletIdToShardIdx.at(tabletId); - TShardInfo& shardInfo = context.SS->ShardInfos.at(shardIdx); - txState.Shards.emplace_back(shardIdx, ETabletType::ColumnShard, TTxState::ConfigureParts); - // N.B. we seem to only need CurrentTxId when creating/modifying tablets - shardInfo.CurrentTxId = OperationId.GetTxId(); - context.SS->PersistShardTx(db, shardIdx, OperationId.GetTxId()); - } - - TColumnTableInfo::TPtr pending = new TColumnTableInfo; - pending->AlterData = tableInfo; - pending->SetOlapStorePathId(olapStorePath->PathId); - tableInfo->SetOlapStorePathId(olapStorePath->PathId); - context.SS->ColumnTables[pathId] = pending; - storeInfo->ColumnTables.insert(pathId); - storeInfo->ColumnTablesUnderOperation.insert(pathId); - context.SS->PersistColumnTable(db, pathId, *pending); - context.SS->PersistColumnTableAlter(db, pathId, *tableInfo); - context.SS->IncrementPathDbRefCount(pathId); - - if (parentPath.Base()->HasActiveChanges()) { - TTxId parentTxId = parentPath.Base()->PlannedToCreate() ? parentPath.Base()->CreateTxId : parentPath.Base()->LastTxId; - context.OnComplete.Dependence(parentTxId, OperationId.GetTxId()); - } - - // Sequentially chain operations in the same olap store - if (context.SS->Operations.contains(olapStorePath.Base()->LastTxId)) { - context.OnComplete.Dependence(olapStorePath.Base()->LastTxId, OperationId.GetTxId()); - } - olapStorePath.Base()->LastTxId = OperationId.GetTxId(); - context.SS->PersistLastTxId(db, olapStorePath.Base()); + + Y_VERIFY(tableInfo); + if (storeInfo) { + auto olapStorePath = parentPath.FindOlapStore(); + + txState.State = TTxState::ConfigureParts; + txState.Shards.reserve(tableInfo->ColumnShards.size()); + + for (ui64 columnShardId : tableInfo->ColumnShards) { + auto tabletId = TTabletId(columnShardId); + auto shardIdx = context.SS->TabletIdToShardIdx.at(tabletId); + TShardInfo& shardInfo = context.SS->ShardInfos.at(shardIdx); + txState.Shards.emplace_back(shardIdx, ETabletType::ColumnShard, TTxState::ConfigureParts); + // N.B. we seem to only need CurrentTxId when creating/modifying tablets + shardInfo.CurrentTxId = opTxId; + context.SS->PersistShardTx(db, shardIdx, opTxId); + } + + TColumnTableInfo::TPtr pending = new TColumnTableInfo; + pending->AlterData = tableInfo; + pending->SetOlapStorePathId(olapStorePath->PathId); + tableInfo->SetOlapStorePathId(olapStorePath->PathId); + context.SS->ColumnTables[pathId] = pending; + storeInfo->ColumnTables.insert(pathId); + storeInfo->ColumnTablesUnderOperation.insert(pathId); + context.SS->PersistColumnTable(db, pathId, *pending); + context.SS->PersistColumnTableAlter(db, pathId, *tableInfo); + context.SS->IncrementPathDbRefCount(pathId); + + if (parentPath.Base()->HasActiveChanges()) { + TTxId parentTxId = parentPath.Base()->PlannedToCreate() + ? parentPath.Base()->CreateTxId : parentPath.Base()->LastTxId; + context.OnComplete.Dependence(parentTxId, opTxId); + } + + // Sequentially chain operations in the same store + if (context.SS->Operations.contains(olapStorePath.Base()->LastTxId)) { + context.OnComplete.Dependence(olapStorePath.Base()->LastTxId, opTxId); + } + olapStorePath.Base()->LastTxId = opTxId; + context.SS->PersistLastTxId(db, olapStorePath.Base()); + } else { + NKikimrSchemeOp::TColumnStorageConfig storageConfig; // default + storageConfig.SetDataChannelCount(1); + + TChannelsBindings channelsBindings; + if (!context.SS->GetOlapChannelsBindings(dstPath.GetPathIdForDomain(), + storageConfig, channelsBindings, errStr)) + { + result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); + return result; + } + + txState.State = TTxState::CreateParts; + txState.Shards.reserve(shardsCount); + + TShardInfo columnShardInfo = TShardInfo::ColumnShardInfo(opTxId, pathId); + columnShardInfo.BindedChannels = channelsBindings; + + tableInfo->StandaloneSharding = NKikimrSchemeOp::TColumnStoreSharding(); + Y_VERIFY(tableInfo->OwnedColumnShards.empty()); + tableInfo->OwnedColumnShards.reserve(shardsCount); + + for (ui64 i = 0; i < shardsCount; ++i) { + TShardIdx idx = context.SS->RegisterShardInfo(columnShardInfo); + context.SS->TabletCounters->Simple()[COUNTER_COLUMN_SHARDS].Add(1); + txState.Shards.emplace_back(idx, ETabletType::ColumnShard, TTxState::CreateParts); + + auto* shardInfoProto = tableInfo->StandaloneSharding->AddColumnShards(); + shardInfoProto->SetOwnerId(idx.GetOwnerId()); + shardInfoProto->SetLocalId(idx.GetLocalId().GetValue()); + + tableInfo->OwnedColumnShards.emplace_back(std::move(idx)); + } + + context.SS->SetPartitioning(pathId, tableInfo); + + for (auto shard : txState.Shards) { + context.SS->PersistShardMapping(db, shard.Idx, InvalidTabletId, pathId, opTxId, shard.TabletType); + context.SS->PersistChannelsBinding(db, shard.Idx, channelsBindings); + } + Y_VERIFY(txState.Shards.size() == shardsCount); + + TColumnTableInfo::TPtr pending = new TColumnTableInfo; + pending->AlterData = tableInfo; + + context.SS->ColumnTables[pathId] = pending; + context.SS->PersistColumnTable(db, pathId, *pending); + context.SS->PersistColumnTableAlter(db, pathId, *tableInfo); + context.SS->IncrementPathDbRefCount(pathId); + + if (parentPath.Base()->HasActiveChanges()) { + TTxId parentTxId = parentPath.Base()->PlannedToCreate() + ? parentPath.Base()->CreateTxId : parentPath.Base()->LastTxId; + context.OnComplete.Dependence(parentTxId, opTxId); + } + } context.SS->PersistTxState(db, OperationId); context.SS->PersistPath(db, dstPath.Base()->PathId); @@ -799,9 +1005,13 @@ public: context.OnComplete.PublishToSchemeBoard(OperationId, dstPath.Base()->PathId); dstPath.DomainInfo()->IncPathsInside(); + if (!storeInfo) { + dstPath.DomainInfo()->AddInternalShards(txState); + dstPath.Base()->IncShardsInside(tableInfo->OwnedColumnShards.size()); + } parentPath.Base()->IncAliveChildren(); - State = NextState(); + State = NextState(!!storeInfo); SetState(SelectStateFunc(State)); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp index 446c86b93b4..f1d294b5e27 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp @@ -446,11 +446,28 @@ TVector<ISubOperationBase::TPtr> CreateDropIndexedTable(TOperationId nextId, con checks .NotEmpty() .IsResolved() - .NotDeleted() - .IsTable() - .NotUnderDeleting() - .NotUnderOperation() - .IsCommonSensePath(); + .NotDeleted(); + + if (checks) { + if (table.Base()->IsColumnTable()) { + checks + .IsColumnTable() + .NotUnderDeleting() + .NotUnderOperation() + .IsCommonSensePath(); + + if (checks) { + // DROP TABLE statement has no info is it a drop of row or column table + return {CreateDropColumnTable(nextId, tx)}; + } + } else { + checks + .IsTable() + .NotUnderDeleting() + .NotUnderOperation() + .IsCommonSensePath(); + } + } if (!checks) { TString explain = TStringBuilder() << "path table fail checks" diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp index 369b25fa8b1..174ad91e908 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp @@ -189,19 +189,13 @@ public: txState->ClearShardsInProgress(); for (auto& shard : txState->Shards) { + Y_VERIFY(shard.TabletType == ETabletType::ColumnShard); + TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID; - switch (shard.TabletType) { - case ETabletType::ColumnShard: { - auto event = std::make_unique<TEvColumnShard::TEvNotifyTxCompletion>(ui64(OperationId.GetTxId())); + auto event = std::make_unique<TEvColumnShard::TEvNotifyTxCompletion>(ui64(OperationId.GetTxId())); - context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release()); - txState->ShardsInProgress.insert(shard.Idx); - break; - } - default: { - Y_FAIL("unexpected tablet type"); - } - } + context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release()); + txState->ShardsInProgress.insert(shard.Idx); LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " ProgressState" diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp index 21719ccdde2..bdb673602af 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp @@ -60,9 +60,11 @@ public: } for (auto& shard : txState->Shards) { + Y_VERIFY(shard.TabletType == ETabletType::ColumnShard); + TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID; - if (shard.TabletType == ETabletType::ColumnShard) { + { auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>( NKikimrTxColumnShard::TX_KIND_SCHEMA, context.SS->TabletID(), @@ -72,8 +74,6 @@ public: context.SS->SelectProcessingPrarams(txState->TargetPathId)); context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release()); - } else { - Y_FAIL("unexpected tablet type"); } LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, @@ -233,19 +233,13 @@ public: txState->ClearShardsInProgress(); for (auto& shard : txState->Shards) { + Y_VERIFY(shard.TabletType == ETabletType::ColumnShard); + TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID; - switch (shard.TabletType) { - case ETabletType::ColumnShard: { - auto event = std::make_unique<TEvColumnShard::TEvNotifyTxCompletion>(ui64(OperationId.GetTxId())); + auto event = std::make_unique<TEvColumnShard::TEvNotifyTxCompletion>(ui64(OperationId.GetTxId())); - context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release()); - txState->ShardsInProgress.insert(shard.Idx); - break; - } - default: { - Y_FAIL("unexpected tablet type"); - } - } + context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release()); + txState->ShardsInProgress.insert(shard.Idx); LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " ProgressState" @@ -289,9 +283,23 @@ public: Y_VERIFY(txState); Y_VERIFY(txState->TxType == TTxState::TxDropColumnTable); + bool isStandalone = false; + { + Y_VERIFY(context.SS->ColumnTables.contains(txState->TargetPathId)); + TColumnTableInfo::TPtr tableInfo = context.SS->ColumnTables.at(txState->TargetPathId); + Y_VERIFY(tableInfo); + isStandalone = tableInfo->IsStandalone(); + } + NIceDb::TNiceDb db(context.GetDB()); context.SS->PersistColumnTableRemove(db, txState->TargetPathId); + if (isStandalone) { + for (auto& shard : txState->Shards) { + context.OnComplete.DeleteShard(shard.Idx); + } + } + context.OnComplete.DoneOperation(OperationId); return true; } @@ -319,6 +327,7 @@ public: const TString& parentPathStr = Transaction.GetWorkingDir(); const TString& name = drop.GetName(); + auto opTxId = OperationId.GetTxId(); LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TDropColumnTable Propose" @@ -327,7 +336,7 @@ public: << ", opId: " << OperationId << ", at schemeshard: " << ssId); - auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId)); + auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(opTxId), ui64(ssId)); TPath path = drop.HasId() ? TPath::Init(context.SS->MakeLocalId(drop.GetId()), context.SS) @@ -378,30 +387,6 @@ public: } } - Y_VERIFY(context.SS->ColumnTables.contains(path.Base()->PathId)); - TColumnTableInfo::TPtr tableInfo = context.SS->ColumnTables.at(path.Base()->PathId); - - TPath storePath = TPath::Init(tableInfo->OlapStorePathId, context.SS); - { - TPath::TChecker checks = storePath.Check(); - checks - .NotEmpty() - .IsResolved() - .IsOlapStore() - .NotUnderOperation(); - - if (!checks) { - TString explain = TStringBuilder() << "store path fail checks" - << ", path: " << storePath.PathString(); - auto status = checks.GetStatus(&explain); - result->SetError(status, explain); - return result; - } - } - - Y_VERIFY(context.SS->OlapStores.contains(tableInfo->OlapStorePathId)); - TOlapStoreInfo::TPtr storeInfo = context.SS->OlapStores.at(tableInfo->OlapStorePathId); - TString errStr; if (!context.SS->CheckApplyIf(Transaction, errStr)) { result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); @@ -412,9 +397,6 @@ public: return result; } - Y_VERIFY(storeInfo->ColumnTables.contains(path->PathId)); - storeInfo->ColumnTablesUnderOperation.insert(path->PathId); - TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropColumnTable, path.Base()->PathId); txState.State = TTxState::DropParts; // Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped" @@ -422,30 +404,68 @@ public: NIceDb::TNiceDb db(context.GetDB()); - // TODO: we need to know all shards where this table has ever been created - for (ui64 columnShardId : tableInfo->ColumnShards) { - auto tabletId = TTabletId(columnShardId); - auto shardIdx = context.SS->TabletIdToShardIdx.at(tabletId); + Y_VERIFY(context.SS->ColumnTables.contains(path.Base()->PathId)); + TColumnTableInfo::TPtr tableInfo = context.SS->ColumnTables.at(path.Base()->PathId); - Y_VERIFY_S(context.SS->ShardInfos.contains(shardIdx), "Unknown shardIdx " << shardIdx); - txState.Shards.emplace_back(shardIdx, context.SS->ShardInfos[shardIdx].TabletType, TTxState::DropParts); + if (tableInfo->IsStandalone()) { + for (auto shardIdx : tableInfo->OwnedColumnShards) { + Y_VERIFY_S(context.SS->ShardInfos.contains(shardIdx), "Unknown shardIdx " << shardIdx); + txState.Shards.emplace_back(shardIdx, context.SS->ShardInfos[shardIdx].TabletType, TTxState::DropParts); - context.SS->ShardInfos[shardIdx].CurrentTxId = OperationId.GetTxId(); - context.SS->PersistShardTx(db, shardIdx, OperationId.GetTxId()); - } + context.SS->ShardInfos[shardIdx].CurrentTxId = opTxId; + context.SS->PersistShardTx(db, shardIdx, opTxId); + } + } else { + auto& storePathId = *tableInfo->OlapStorePathId; + TPath storePath = TPath::Init(storePathId, context.SS); + { + TPath::TChecker checks = storePath.Check(); + checks + .NotEmpty() + .IsResolved() + .IsOlapStore() + .NotUnderOperation(); + + if (!checks) { + TString explain = TStringBuilder() << "store path fail checks" + << ", path: " << storePath.PathString(); + auto status = checks.GetStatus(&explain); + result->SetError(status, explain); + return result; + } + } + + Y_VERIFY(context.SS->OlapStores.contains(storePathId)); + TOlapStoreInfo::TPtr storeInfo = context.SS->OlapStores.at(storePathId); + + Y_VERIFY(storeInfo->ColumnTables.contains(path->PathId)); + storeInfo->ColumnTablesUnderOperation.insert(path->PathId); - // Sequentially chain operations in the same olap store - if (context.SS->Operations.contains(storePath.Base()->LastTxId)) { - context.OnComplete.Dependence(storePath.Base()->LastTxId, OperationId.GetTxId()); + // Sequentially chain operations in the same olap store + if (context.SS->Operations.contains(storePath.Base()->LastTxId)) { + context.OnComplete.Dependence(storePath.Base()->LastTxId, opTxId); + } + storePath.Base()->LastTxId = opTxId; + context.SS->PersistLastTxId(db, storePath.Base()); + + // TODO: we need to know all shards where this table has ever been created + for (ui64 columnShardId : tableInfo->ColumnShards) { + auto tabletId = TTabletId(columnShardId); + auto shardIdx = context.SS->TabletIdToShardIdx.at(tabletId); + + Y_VERIFY_S(context.SS->ShardInfos.contains(shardIdx), "Unknown shardIdx " << shardIdx); + txState.Shards.emplace_back(shardIdx, context.SS->ShardInfos[shardIdx].TabletType, TTxState::DropParts); + + context.SS->ShardInfos[shardIdx].CurrentTxId = opTxId; + context.SS->PersistShardTx(db, shardIdx, opTxId); + } } - storePath.Base()->LastTxId = OperationId.GetTxId(); - context.SS->PersistLastTxId(db, storePath.Base()); context.OnComplete.ActivateTx(OperationId); path.Base()->PathState = TPathElement::EPathState::EPathStateDrop; - path.Base()->DropTxId = OperationId.GetTxId(); - path.Base()->LastTxId = OperationId.GetTxId(); + path.Base()->DropTxId = opTxId; + path.Base()->LastTxId = opTxId; context.SS->PersistLastTxId(db, path.Base()); context.SS->PersistTxState(db, OperationId); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 5999e2f0b94..de6e27a2eee 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -3005,11 +3005,23 @@ void TSchemeShard::PersistColumnTable(NIceDb::TNiceDb& db, TPathId pathId, const db.Table<Schema::ColumnTablesAlters>().Key(pathId.LocalPathId).Update( NIceDb::TUpdate<Schema::ColumnTablesAlters::AlterBody>(serializedAlterBody)); } + if (tableInfo.StandaloneSharding) { + TString serializedOwnedShards; + Y_VERIFY(tableInfo.StandaloneSharding->SerializeToString(&serializedOwnedShards)); + db.Table<Schema::ColumnTablesAlters>().Key(pathId.LocalPathId).Update( + NIceDb::TUpdate<Schema::ColumnTablesAlters::StandaloneSharding>(serializedOwnedShards)); + } } else { db.Table<Schema::ColumnTables>().Key(pathId.LocalPathId).Update( NIceDb::TUpdate<Schema::ColumnTables::AlterVersion>(tableInfo.AlterVersion), NIceDb::TUpdate<Schema::ColumnTables::Description>(serialized), NIceDb::TUpdate<Schema::ColumnTables::Sharding>(serializedSharding)); + if (tableInfo.StandaloneSharding) { + TString serializedOwnedShards; + Y_VERIFY(tableInfo.StandaloneSharding->SerializeToString(&serializedOwnedShards)); + db.Table<Schema::ColumnTables>().Key(pathId.LocalPathId).Update( + NIceDb::TUpdate<Schema::ColumnTables::StandaloneSharding>(serializedOwnedShards)); + } } } @@ -3032,8 +3044,9 @@ void TSchemeShard::PersistColumnTableRemove(NIceDb::TNiceDb& db, TPathId pathId, } // Unlink table from olap store - if (OlapStores.contains(tableInfo->OlapStorePathId)) { - auto storeInfo = OlapStores.at(tableInfo->OlapStorePathId); + if (tableInfo->OlapStorePathId && *tableInfo->OlapStorePathId) { + Y_VERIFY(OlapStores.contains(*tableInfo->OlapStorePathId)); + auto storeInfo = OlapStores.at(*tableInfo->OlapStorePathId); storeInfo->ColumnTablesUnderOperation.erase(pathId); storeInfo->ColumnTables.erase(pathId); } @@ -3657,7 +3670,8 @@ NKikimrSchemeOp::TPathVersion TSchemeShard::GetPathVersion(const TPath& path) co if (tableInfo->Description.HasSchema()) { result.SetColumnTableSchemaVersion(tableInfo->Description.GetSchema().GetVersion()); } else if (tableInfo->Description.HasSchemaPresetId() && tableInfo->OlapStorePathId) { - auto storeInfo = OlapStores.at(tableInfo->OlapStorePathId); + Y_VERIFY(OlapStores.contains(*tableInfo->OlapStorePathId)); + auto& storeInfo = OlapStores.at(*tableInfo->OlapStorePathId); auto& preset = storeInfo->SchemaPresets.at(tableInfo->Description.GetSchemaPresetId()); result.SetColumnTableSchemaVersion(tableInfo->Description.GetSchemaPresetVersionAdj() + preset.Version); } else { @@ -5954,9 +5968,7 @@ bool TSchemeShard::FillUniformPartitioning(TVector<TString>& rangeEnds, ui32 key return true; } -void TSchemeShard::SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo) { - const TVector<TShardIdx>& partitioning = storeInfo->ColumnShards; - +void TSchemeShard::SetPartitioning(TPathId pathId, const TVector<TShardIdx>& partitioning) { if (AppData()->FeatureFlags.GetEnableSystemViews()) { TVector<std::pair<ui64, ui64>> shardIndices; shardIndices.reserve(partitioning.size()); @@ -5971,6 +5983,14 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInf } } +void TSchemeShard::SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo) { + SetPartitioning(pathId, storeInfo->ColumnShards); +} + +void TSchemeShard::SetPartitioning(TPathId pathId, TColumnTableInfo::TPtr tableInfo) { + SetPartitioning(pathId, tableInfo->OwnedColumnShards); +} + void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning) { if (AppData()->FeatureFlags.GetEnableSystemViews()) { TVector<std::pair<ui64, ui64>> shardIndices; @@ -6232,7 +6252,7 @@ void TSchemeShard::ConfigureStatsOperations(const NKikimrConfig::TSchemeShardCon auto txState = TTxState::ConvertToTxType(operationConfig.GetType()); InFlightLimits[txState] = limit; } - + if (InFlightLimits.empty()) { NKikimrConfig::TSchemeShardConfig_TInFlightCounterConfig inFlightCounterConfig; auto defaultInFlightLimit = inFlightCounterConfig.GetInFlightLimit(); @@ -6252,7 +6272,7 @@ void TSchemeShard::ConfigureStatsOperations(const NKikimrConfig::TSchemeShardCon bool TSchemeShard::CheckInFlightLimit(const TTxState::ETxType txType, TString& errStr) const { auto it = InFlightLimits.find(txType); if (it == InFlightLimits.end()) { - return true; + return true; } if (it->second != 0 && TabletCounters->Simple()[TTxState::TxTypeInFlightCounter(txType)].Get() >= it->second) { @@ -6261,7 +6281,7 @@ bool TSchemeShard::CheckInFlightLimit(const TTxState::ETxType txType, TString& e << ", limit: " << it->second; return false; } - + return true; } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 50bdc6bc3d4..859562ecfb1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -537,7 +537,9 @@ public: void DoShardsDeletion(const THashSet<TShardIdx>& shardIdx, const TActorContext& ctx); + void SetPartitioning(TPathId pathId, const TVector<TShardIdx>& partitioning); void SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo); + void SetPartitioning(TPathId pathId, TColumnTableInfo::TPtr tableInfo); void SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning); auto BuildStatsForCollector(TPathId tableId, TShardIdx shardIdx, TTabletId datashardId, TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TPartitionStats& stats); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 5c9a351992d..84611de6408 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -1983,6 +1983,134 @@ NKikimr::NSchemeShard::TBillingStats::operator bool() const { return Rows || Bytes; } +bool TOlapSchema::UpdateProto(NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr) { + ui32 nextColumnId = proto.GetNextColumnId(); + + const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; + + for (auto& colProto : *proto.MutableColumns()) { + auto& colName = colProto.GetName(); + + if (!colProto.HasId()) { + colProto.SetId(nextColumnId++); + } else if (colProto.GetId() <= 0 || colProto.GetId() >= nextColumnId) { + errStr = Sprintf("Column id is incorrect"); + return false; + } + + if (colProto.HasTypeId()) { + errStr = Sprintf("Cannot set TypeId for column '%s', use Type", colName.c_str()); + return false; + } + if (!colProto.HasType()) { + errStr = Sprintf("Missing Type for column '%s'", colName.c_str()); + return false; + } + + auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType()); + const NScheme::IType* type = typeRegistry->GetType(typeName); + NScheme::TTypeInfo typeInfo; + if (type) { + if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { + errStr = Sprintf("Type '%s' specified for column '%s' is not supported", + colProto.GetType().c_str(), colName.c_str()); + return false; + } + typeInfo = NScheme::TTypeInfo(type->GetTypeId()); + } else { +#if 0 // TODO: support PG types in ColumnShard + auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName); + if (!typeDesc) { + errStr = Sprintf("Type '%s' specified for column '%s' is not supported", + colProto.GetType().c_str(), colName.c_str()); + return false; + } + typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc); +#else + errStr = Sprintf("Type '%s' specified for column '%s' is not supported", + colProto.GetType().c_str(), colName.c_str()); + return false; +#endif + } + + auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(typeInfo); + colProto.SetTypeId(columnType.TypeId); + if (columnType.TypeInfo) { + *colProto.MutableTypeInfo() = *columnType.TypeInfo; + } + } + + proto.SetNextColumnId(nextColumnId); + return true; +} + +bool TOlapSchema::Parse(const NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr) { + NextColumnId = proto.GetNextColumnId(); + + Columns.clear(); + for (auto& colProto : proto.GetColumns()) { + if (colProto.GetName().empty()) { + errStr = Sprintf("Columns cannot have an empty name"); + return false; + } + + ui32 colId = colProto.GetId(); + if (Columns.contains(colId)) { + errStr = Sprintf("Duplicate column id %" PRIu32 " for column '%s'", colId, colProto.GetName().c_str()); + return false; + } + auto& col = Columns[colId]; + col.Id = colId; + col.Name = colProto.GetName(); + + if (ColumnsByName.contains(col.Name)) { + errStr = Sprintf("Duplicate column '%s'", col.Name.c_str()); + return false; + } + + if (!colProto.HasTypeId()) { + errStr = Sprintf("No generated TypeId for column '%s'", col.Name.c_str()); + return false; + } + + if (colProto.HasTypeInfo()) { + col.Type = NScheme::TypeInfoFromProtoColumnType(colProto.GetTypeId(), &colProto.GetTypeInfo()); + } else { + col.Type = NScheme::TTypeInfo(colProto.GetTypeId()); + } + + ColumnsByName[col.Name] = col.Id; + } + + if (Columns.empty()) { + errStr = Sprintf("At least one column is required"); + return false; + } + + KeyColumnIds.clear(); + for (const TString& keyName : proto.GetKeyColumnNames()) { + auto* col = FindColumnByName(keyName); + if (!col) { + errStr = Sprintf("Unknown key column '%s'", keyName.c_str()); + return false; + } + if (col->IsKeyColumn()) { + errStr = Sprintf("Duplicate key column '%s'", keyName.c_str()); + return false; + } + col->KeyOrder = KeyColumnIds.size(); + KeyColumnIds.push_back(col->Id); + } + + if (KeyColumnIds.empty()) { + errStr = "At least one key column is required"; + return false; + } + + Engine = proto.GetEngine(); + return true; +} + TOlapStoreInfo::TOlapStoreInfo( ui64 alterVersion, NKikimrSchemeOp::TColumnStoreDescription&& description, @@ -2039,20 +2167,39 @@ TColumnTableInfo::TColumnTableInfo( ui64 alterVersion, NKikimrSchemeOp::TColumnTableDescription&& description, NKikimrSchemeOp::TColumnTableSharding&& sharding, + TMaybe<NKikimrSchemeOp::TColumnStoreSharding>&& standaloneSharding, TMaybe<NKikimrSchemeOp::TAlterColumnTable>&& alterBody) : AlterVersion(alterVersion) , Description(std::move(description)) , Sharding(std::move(sharding)) + , StandaloneSharding(std::move(standaloneSharding)) , AlterBody(std::move(alterBody)) { - OlapStorePathId = TPathId( - TOwnerId(Description.GetColumnStorePathId().GetOwnerId()), - TLocalPathId(Description.GetColumnStorePathId().GetLocalId())); + if (Description.HasColumnStorePathId()) { + OlapStorePathId = TPathId( + TOwnerId(Description.GetColumnStorePathId().GetOwnerId()), + TLocalPathId(Description.GetColumnStorePathId().GetLocalId())); + } + + if (Description.HasSchema()) { + Schema = TOlapSchema(); + TString strError; + Y_VERIFY((*Schema).Parse(Description.GetSchema(), strError), "Cannot parse column table schema"); + } ColumnShards.reserve(Sharding.GetColumnShards().size()); for (ui64 columnShard : Sharding.GetColumnShards()) { ColumnShards.push_back(columnShard); } + + if (StandaloneSharding) { + OwnedColumnShards.reserve(StandaloneSharding->GetColumnShards().size()); + for (const auto& shardIdx : StandaloneSharding->GetColumnShards()) { + OwnedColumnShards.push_back(TShardIdx( + TOwnerId(shardIdx.GetOwnerId()), + TLocalShardIdx(shardIdx.GetLocalId()))); + } + } } TSequenceInfo::TSequenceInfo( diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 1d03551aa85..4023f4e6ac4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -823,6 +823,9 @@ struct TOlapSchema { } return nullptr; } + + static bool UpdateProto(NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr); + bool Parse(const NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr); }; struct TOlapStoreSchemaPreset : public TOlapSchema { @@ -880,17 +883,20 @@ struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> { NKikimrSchemeOp::TColumnTableDescription Description; NKikimrSchemeOp::TColumnTableSharding Sharding; + TMaybe<NKikimrSchemeOp::TColumnStoreSharding> StandaloneSharding; TMaybe<NKikimrSchemeOp::TAlterColumnTable> AlterBody; - // Path id of the olap store - TPathId OlapStorePathId; + TMaybe<TPathId> OlapStorePathId; // PathId of the table store + TMaybe<TOlapSchema> Schema; // schema for standalone table // Current list of column shards TVector<ui64> ColumnShards; + TVector<TShardIdx> OwnedColumnShards; TColumnTableInfo() = default; TColumnTableInfo(ui64 alterVersion, NKikimrSchemeOp::TColumnTableDescription&& description, NKikimrSchemeOp::TColumnTableSharding&& sharding, + TMaybe<NKikimrSchemeOp::TColumnStoreSharding>&& standaloneSharding, TMaybe<NKikimrSchemeOp::TAlterColumnTable>&& alterBody = Nothing()); void SetOlapStorePathId(const TPathId& pathId) { @@ -898,6 +904,12 @@ struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> { Description.MutableColumnStorePathId()->SetOwnerId(pathId.OwnerId); Description.MutableColumnStorePathId()->SetLocalId(pathId.LocalPathId); } + + bool IsStandalone() const { + return !OwnedColumnShards.empty(); + } + + // TODO: UpdateShardStats(), GetStats() for standalone table }; struct TPQShardInfo : TSimpleRefCount<TPQShardInfo> { diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 80c6c09bf39..f6e4db42840 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -369,8 +369,6 @@ void TPathDescriber::DescribeOlapStore(TPathId pathId, TPathElement::TPtr pathEl void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr pathEl) { const TColumnTableInfo::TPtr tableInfo = *Self->ColumnTables.FindPtr(pathId); Y_VERIFY(tableInfo, "ColumnTable not found"); - const TOlapStoreInfo::TPtr storeInfo = *Self->OlapStores.FindPtr(tableInfo->OlapStorePathId); - Y_VERIFY(storeInfo, "OlapStore not found"); Y_UNUSED(pathEl); auto description = Result->Record.MutablePathDescription()->MutableColumnTableDescription(); @@ -378,6 +376,9 @@ void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr path description->MutableSharding()->CopyFrom(tableInfo->Sharding); if (!description->HasSchema() && description->HasSchemaPresetId()) { + const TOlapStoreInfo::TPtr storeInfo = *Self->OlapStores.FindPtr(*tableInfo->OlapStorePathId); + Y_VERIFY(storeInfo, "OlapStore not found"); + auto& preset = storeInfo->SchemaPresets.at(description->GetSchemaPresetId()); auto& presetProto = storeInfo->Description.GetSchemaPresets(preset.ProtoIndex); *description->MutableSchema() = presetProto.GetSchema(); diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 76160368e86..f504946b830 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1497,9 +1497,10 @@ struct Schema : NIceDb::Schema { struct AlterVersion : Column<2, NScheme::NTypeIds::Uint64> {}; struct Description : Column<3, NScheme::NTypeIds::String> {}; // TColumnTableDescription struct Sharding : Column<4, NScheme::NTypeIds::String> {}; // TColumnTableSharding + struct StandaloneSharding : Column<5, NScheme::NTypeIds::String> {}; // TColumnStoreSharding using TKey = TableKey<PathId>; - using TColumns = TableColumns<PathId, AlterVersion, Description, Sharding>; + using TColumns = TableColumns<PathId, AlterVersion, Description, Sharding, StandaloneSharding>; }; struct ColumnTablesAlters : Table<91> { @@ -1508,9 +1509,10 @@ struct Schema : NIceDb::Schema { struct Description : Column<3, NScheme::NTypeIds::String> {}; // TColumnTableDescription struct Sharding : Column<4, NScheme::NTypeIds::String> {}; // TColumnTableSharding struct AlterBody : Column<5, NScheme::NTypeIds::String> {}; // TAlterColumnTable + struct StandaloneSharding : Column<6, NScheme::NTypeIds::String> {}; // TColumnStoreSharding using TKey = TableKey<PathId>; - using TColumns = TableColumns<PathId, AlterVersion, Description, Sharding, AlterBody>; + using TColumns = TableColumns<PathId, AlterVersion, Description, Sharding, AlterBody, StandaloneSharding>; }; struct LoginKeys : Table<92> { diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp index 82e6a5549ec..918bf68d4a1 100644 --- a/ydb/core/tx/schemeshard/ut_olap.cpp +++ b/ydb/core/tx/schemeshard/ut_olap.cpp @@ -28,6 +28,17 @@ static const TString defaultStoreSchema = R"( } )"; +TString defaultTableSchema = R"( + Name: "ColumnTable" + ColumnShardCount: 1 + Schema { + Columns { Name: "timestamp" Type: "Timestamp" } + Columns { Name: "data" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + } +)"; + static const TVector<std::pair<TString, TTypeInfo>> defaultYdbSchema = { {"timestamp", TTypeInfo(NTypeIds::Timestamp) }, {"data", TTypeInfo(NTypeIds::Utf8) } @@ -359,7 +370,9 @@ Y_UNIT_TEST_SUITE(TOlap) { TestCreateColumnTable(runtime, ++txId, "/MyRoot/OlapStore/MyDir", tableSchema); env.TestWaitNotification(runtime, txId); + TestLsPathId(runtime, 4, NLs::PathStringEqual("/MyRoot/OlapStore/MyDir/ColumnTable")); + TestDropColumnTable(runtime, ++txId, "/MyRoot/OlapStore/MyDir", "ColumnTable"); env.TestWaitNotification(runtime, txId); @@ -377,6 +390,28 @@ Y_UNIT_TEST_SUITE(TOlap) { TestLsPathId(runtime, 2, NLs::PathStringEqual("")); } + Y_UNIT_TEST(CreateDropStandaloneTable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestMkDir(runtime, ++txId, "/MyRoot", "MyDir"); + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/MyDir", false, NLs::PathExist); + + TestCreateColumnTable(runtime, ++txId, "/MyRoot/MyDir", defaultTableSchema); + env.TestWaitNotification(runtime, txId); + + TestLsPathId(runtime, 3, NLs::PathStringEqual("/MyRoot/MyDir/ColumnTable")); + + TestDropColumnTable(runtime, ++txId, "/MyRoot/MyDir", "ColumnTable"); + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/MyDir/ColumnTable", false, NLs::PathNotExist); + TestLsPathId(runtime, 3, NLs::PathStringEqual("")); + } + Y_UNIT_TEST(CreateTableTtl) { TTestBasicRuntime runtime; TTestEnv env(runtime); diff --git a/ydb/core/tx/schemeshard/ut_olap_reboots.cpp b/ydb/core/tx/schemeshard/ut_olap_reboots.cpp index f07276278e9..a9e9c897e8b 100644 --- a/ydb/core/tx/schemeshard/ut_olap_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_olap_reboots.cpp @@ -79,6 +79,35 @@ Y_UNIT_TEST_SUITE(TOlapReboots) { }); } + Y_UNIT_TEST(CreateStandaloneTable) { + TTestWithReboots t(false); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + + { + TInactiveZone inactive(activeZone); + // no inactive initialization + } + + TestCreateColumnTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ColumnTable" + ColumnShardCount: 1 + Schema { + Columns { Name: "timestamp" Type: "Timestamp" } + Columns { Name: "data" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + + TestLs(runtime, "/MyRoot/ColumnTable", false, NLs::PathExist); + } + }); + } + Y_UNIT_TEST(CreateDropTable) { TTestWithReboots t(false); t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { @@ -119,6 +148,38 @@ Y_UNIT_TEST_SUITE(TOlapReboots) { }); } + Y_UNIT_TEST(CreateDropStandaloneTable) { + TTestWithReboots t(false); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + + { + TInactiveZone inactive(activeZone); + // no inactive initialization + } + + TestCreateColumnTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ColumnTable" + ColumnShardCount: 1 + Schema { + Columns { Name: "timestamp" Type: "Timestamp" } + Columns { Name: "data" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestDropColumnTable(runtime, ++t.TxId, "/MyRoot", "ColumnTable"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + + TestLs(runtime, "/MyRoot/ColumnTable", false, NLs::PathNotExist); + } + }); + } + Y_UNIT_TEST(CreateMultipleTables) { TTestWithReboots t(false); t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { @@ -163,6 +224,50 @@ Y_UNIT_TEST_SUITE(TOlapReboots) { }); } + Y_UNIT_TEST(CreateMultipleStandaloneTables) { + TTestWithReboots t(false); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + + { + TInactiveZone inactive(activeZone); + // no inactive initialization + } + + t.TestEnv->ReliablePropose(runtime, + CreateColumnTableRequest(t.TxId += 2, "/MyRoot", R"( + Name: "ColumnTable1" + ColumnShardCount: 1 + Schema { + Columns { Name: "timestamp" Type: "Timestamp" } + Columns { Name: "data" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + } + )"), + {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusAlreadyExists, NKikimrScheme::StatusMultipleModifications}); + t.TestEnv->ReliablePropose(runtime, + CreateColumnTableRequest(t.TxId - 1, "/MyRoot", R"( + Name: "ColumnTable2" + ColumnShardCount: 1 + Schema { + Columns { Name: "timestamp" Type: "Timestamp" } + Columns { Name: "data" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + } + )"), + {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusAlreadyExists, NKikimrScheme::StatusMultipleModifications}); + t.TestEnv->TestWaitNotification(runtime, {t.TxId - 1, t.TxId}); + + { + TInactiveZone inactive(activeZone); + + TestLs(runtime, "/MyRoot/ColumnTable1", false, NLs::PathExist); + TestLs(runtime, "/MyRoot/ColumnTable2", false, NLs::PathExist); + } + }); + } + Y_UNIT_TEST(DropMultipleTables) { TTestWithReboots t(false); t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { @@ -213,6 +318,55 @@ Y_UNIT_TEST_SUITE(TOlapReboots) { }); } + Y_UNIT_TEST(DropMultipleStandaloneTables) { + TTestWithReboots t(false); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + + { + TInactiveZone inactive(activeZone); + + TestCreateColumnTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ColumnTable1" + ColumnShardCount: 1 + Schema { + Columns { Name: "timestamp" Type: "Timestamp" } + Columns { Name: "data" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestCreateColumnTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ColumnTable2" + ColumnShardCount: 1 + Schema { + Columns { Name: "timestamp" Type: "Timestamp" } + Columns { Name: "data" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + t.TestEnv->ReliablePropose(runtime, + DropColumnTableRequest(t.TxId += 2, "/MyRoot", "ColumnTable1"), + {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications}); + t.TestEnv->ReliablePropose(runtime, + DropColumnTableRequest(t.TxId - 1, "/MyRoot", "ColumnTable2"), + {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications}); + t.TestEnv->TestWaitNotification(runtime, {t.TxId - 1, t.TxId}); + + { + TInactiveZone inactive(activeZone); + + TestLs(runtime, "/MyRoot/ColumnTable1", false, NLs::PathNotExist); + TestLs(runtime, "/MyRoot/ColumnTable2", false, NLs::PathNotExist); + } + }); + } + Y_UNIT_TEST(CreateDropStore) { TTestWithReboots t(false); t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 247e9e1f645..05803c18423 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -5861,6 +5861,11 @@ "ColumnId": 4, "ColumnName": "Sharding", "ColumnType": "String" + }, + { + "ColumnId": 5, + "ColumnName": "StandaloneSharding", + "ColumnType": "String" } ], "ColumnsDropped": [], @@ -5870,7 +5875,8 @@ 1, 2, 3, - 4 + 4, + 5 ], "RoomID": 0, "Codec": 0, @@ -5919,6 +5925,11 @@ "ColumnId": 5, "ColumnName": "AlterBody", "ColumnType": "String" + }, + { + "ColumnId": 6, + "ColumnName": "StandaloneSharding", + "ColumnType": "String" } ], "ColumnsDropped": [], @@ -5929,7 +5940,8 @@ 2, 3, 4, - 5 + 5, + 6 ], "RoomID": 0, "Codec": 0, |