diff options
author | snaury <snaury@ydb.tech> | 2022-08-31 16:57:27 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-08-31 16:57:27 +0300 |
commit | 79a5c4b63f6732d9ba5358a7480feabf6b3c0916 (patch) | |
tree | 73908a76bac98aa2203bb63a6f519b85e2182321 | |
parent | 54b5eee3de36b32d1c04ac6db4aa7883e5221609 (diff) | |
download | ydb-79a5c4b63f6732d9ba5358a7480feabf6b3c0916.tar.gz |
Support migrated shards in a local table after rename
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__init.cpp | 29 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 38 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_schema.h | 5 |
4 files changed, 72 insertions, 4 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index f66a3875e6..2f478464b3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -18,6 +18,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { TDeque<TPathId> BlockStoreVolumesToClean; TVector<ui64> ExportsToResume; TVector<ui64> ImportsToResume; + bool Broken = false; explicit TTxInit(TSelf *self) : TBase(self) @@ -188,6 +189,16 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { TString name = rows.GetValue<Schema::Paths::Name>(); + if (pathId.LocalPathId == 0) { + // Skip special incompatibility marker + Y_VERIFY_S(parentPathId.LocalPathId == 0 && name == "/incompatible/", + "Unexpected row PathId# " << pathId << " ParentPathId# " << parentPathId << " Name# " << name); + if (!rows.Next()) { + return false; + } + continue; + } + TPathElement::EPathType pathType = (TPathElement::EPathType)rows.GetValue<Schema::Paths::PathType>(); TStepId stepCreated = rows.GetValueOrDefault<Schema::Paths::StepCreated>(InvalidStepId); @@ -560,6 +571,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { } } + // We need to sort partitions by PathId/PartitionId due to incompatible change 1 + std::sort(partitionsRows.begin(), partitionsRows.end()); + return true; } @@ -1281,6 +1295,17 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { return false;\ } + RETURN_IF_NO_PRECHARGED(Self->ReadSysValue(db, Schema::SysParam_MaxIncompatibleChange, Self->MaxIncompatibleChange)); + if (Self->MaxIncompatibleChange > Schema::MaxIncompatibleChangeSupported) { + LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxInit, unsupported changes detected: MaxIncompatibleChange = " << Self->MaxIncompatibleChange << + ", MaxIncompatibleChangeSupported = " << Schema::MaxIncompatibleChangeSupported << + ", restarting!"); + Self->BreakTabletAndRestart(ctx); + Broken = true; + return true; + } + { ui64 initStateVal = (ui64)TTenantInitState::InvalidState; RETURN_IF_NO_PRECHARGED(Self->ReadSysValue(db, Schema::SysParam_TenantInitState, initStateVal, (ui64)TTenantInitState::InvalidState)); @@ -4623,6 +4648,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { } void Complete(const TActorContext &ctx) override { + if (Broken) { + return; + } + auto delayPublications = OnComplete.ExtractPublicationsToSchemeBoard(); //there no Populator exist jet for (auto& [txId, pathIds] : Publications) { std::move(pathIds.begin(), pathIds.end(), std::back_inserter(delayPublications[txId])); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index d9b950c46f..d22c965885 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1367,6 +1367,27 @@ void TSchemeShard::TRwTxBase::Complete(const TActorContext &ctx) { DoComplete(ctx); } +void TSchemeShard::BumpIncompatibleChanges(NIceDb::TNiceDb& db, ui64 incompatibleChange) { + if (MaxIncompatibleChange < incompatibleChange) { + Y_VERIFY_S(incompatibleChange <= Schema::MaxIncompatibleChangeSupported, + "Attempting to bump incompatible changes to " << incompatibleChange << + ", but maximum supported change is " << Schema::MaxIncompatibleChangeSupported); + // We add a special path on the first incompatible change, which breaks + // all versions that don't know about incompatible changes. Newer + // versions will just skip this non-sensical entry. + if (MaxIncompatibleChange == 0) { + db.Table<Schema::Paths>().Key(0).Update( + NIceDb::TUpdate<Schema::Paths::ParentId>(0), + NIceDb::TUpdate<Schema::Paths::Name>("/incompatible/")); + } + // Persist a new maximum incompatible change, this will cause older + // versions to stop gracefully instead of working inconsistently. + db.Table<Schema::SysParams>().Key(Schema::SysParam_MaxIncompatibleChange).Update( + NIceDb::TUpdate<Schema::SysParams::Value>(ToString(incompatibleChange))); + MaxIncompatibleChange = incompatibleChange; + } +} + void TSchemeShard::PersistTableIndex(NIceDb::TNiceDb& db, const TPathId& pathId) { Y_VERIFY(PathsById.contains(pathId)); TPathElement::TPtr elemnt = PathsById.at(pathId); @@ -2085,14 +2106,20 @@ void TSchemeShard::PersistChannelsBinding(NIceDb::TNiceDb& db, const TShardIdx s void TSchemeShard::PersistTablePartitioning(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) { for (ui64 pi = 0; pi < tableInfo->GetPartitions().size(); ++pi) { const auto& partition = tableInfo->GetPartitions()[pi]; - if (IsLocalId(pathId)) { - Y_VERIFY(IsLocalId(partition.ShardIdx)); + if (IsLocalId(pathId) && IsLocalId(partition.ShardIdx)) { db.Table<Schema::TablePartitions>().Key(pathId.LocalPathId, pi).Update( NIceDb::TUpdate<Schema::TablePartitions::RangeEnd>(partition.EndOfRange), NIceDb::TUpdate<Schema::TablePartitions::DatashardIdx>(partition.ShardIdx.GetLocalId()), NIceDb::TUpdate<Schema::TablePartitions::LastCondErase>(partition.LastCondErase.GetValue()), NIceDb::TUpdate<Schema::TablePartitions::NextCondErase>(partition.NextCondErase.GetValue())); } else { + if (IsLocalId(pathId)) { + // Incompatible change 1: + // Store migrated shards of local tables in migrated table partitions + // This change is incompatible with older versions because partitions + // may no longer be in a single table and will require sorting at load time. + BumpIncompatibleChanges(db, 1); + } db.Table<Schema::MigratedTablePartitions>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Update( NIceDb::TUpdate<Schema::MigratedTablePartitions::RangeEnd>(partition.EndOfRange), NIceDb::TUpdate<Schema::MigratedTablePartitions::OwnerShardIdx>(partition.ShardIdx.GetOwnerId()), @@ -2124,12 +2151,15 @@ void TSchemeShard::PersistTablePartitioningDeletion(NIceDb::TNiceDb& db, const T void TSchemeShard::PersistTablePartitionCondErase(NIceDb::TNiceDb& db, const TPathId& pathId, ui64 id, const TTableInfo::TPtr tableInfo) { const auto& partition = tableInfo->GetPartitions()[id]; - if (IsLocalId(pathId)) { - Y_VERIFY(IsLocalId(partition.ShardIdx)); + if (IsLocalId(pathId) && IsLocalId(partition.ShardIdx)) { db.Table<Schema::TablePartitions>().Key(pathId.LocalPathId, id).Update( NIceDb::TUpdate<Schema::TablePartitions::LastCondErase>(partition.LastCondErase.GetValue()), NIceDb::TUpdate<Schema::TablePartitions::NextCondErase>(partition.NextCondErase.GetValue())); } else { + if (IsLocalId(pathId)) { + // Incompatible change 1 (see above) + BumpIncompatibleChanges(db, 1); + } db.Table<Schema::MigratedTablePartitions>().Key(pathId.OwnerId, pathId.LocalPathId, id).Update( NIceDb::TUpdate<Schema::MigratedTablePartitions::LastCondErase>(partition.LastCondErase.GetValue()), NIceDb::TUpdate<Schema::MigratedTablePartitions::NextCondErase>(partition.NextCondErase.GetValue())); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index b84eb3ed55..168998c349 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -177,6 +177,7 @@ public: TVector<TString> RootPathElements; + ui64 MaxIncompatibleChange = 0; THashMap<TPathId, TPathElement::TPtr> PathsById; TLocalPathId NextLocalPathId = 0; @@ -546,6 +547,9 @@ public: void IncrementPathDbRefCount(const TPathId& pathId, const TStringBuf& debug = TStringBuf()); void DecrementPathDbRefCount(const TPathId& pathId, const TStringBuf& debug = TStringBuf()); + // incompatible changes + void BumpIncompatibleChanges(NIceDb::TNiceDb& db, ui64 incompatibleChange); + // path void PersistPath(NIceDb::TNiceDb& db, const TPathId& pathId); void PersistRemovePath(NIceDb::TNiceDb& db, const TPathElement::TPtr path); diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 53a2a48af5..fce0f91dc3 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1720,6 +1720,11 @@ struct Schema : NIceDb::Schema { static constexpr ui64 SysParam_ParentDomainEffectiveACLVersion = 8; static constexpr ui64 SysParam_TenantInitState = 9; static constexpr ui64 SysParam_ServerlessStorageLastBillTime = 10; + static constexpr ui64 SysParam_MaxIncompatibleChange = 11; + + // List of incompatible changes: + // * Change 1: store migrated shards of local tables (e.g. after a rename) as a migrated record + static constexpr ui64 MaxIncompatibleChangeSupported = 1; }; } |