aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-08-31 16:57:27 +0300
committersnaury <snaury@ydb.tech>2022-08-31 16:57:27 +0300
commit79a5c4b63f6732d9ba5358a7480feabf6b3c0916 (patch)
tree73908a76bac98aa2203bb63a6f519b85e2182321
parent54b5eee3de36b32d1c04ac6db4aa7883e5221609 (diff)
downloadydb-79a5c4b63f6732d9ba5358a7480feabf6b3c0916.tar.gz
Support migrated shards in a local table after rename
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp29
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp38
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_schema.h5
4 files changed, 72 insertions, 4 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index f66a3875e6..2f478464b3 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -18,6 +18,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TDeque<TPathId> BlockStoreVolumesToClean;
TVector<ui64> ExportsToResume;
TVector<ui64> ImportsToResume;
+ bool Broken = false;
explicit TTxInit(TSelf *self)
: TBase(self)
@@ -188,6 +189,16 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TString name = rows.GetValue<Schema::Paths::Name>();
+ if (pathId.LocalPathId == 0) {
+ // Skip special incompatibility marker
+ Y_VERIFY_S(parentPathId.LocalPathId == 0 && name == "/incompatible/",
+ "Unexpected row PathId# " << pathId << " ParentPathId# " << parentPathId << " Name# " << name);
+ if (!rows.Next()) {
+ return false;
+ }
+ continue;
+ }
+
TPathElement::EPathType pathType = (TPathElement::EPathType)rows.GetValue<Schema::Paths::PathType>();
TStepId stepCreated = rows.GetValueOrDefault<Schema::Paths::StepCreated>(InvalidStepId);
@@ -560,6 +571,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
}
+ // We need to sort partitions by PathId/PartitionId due to incompatible change 1
+ std::sort(partitionsRows.begin(), partitionsRows.end());
+
return true;
}
@@ -1281,6 +1295,17 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
return false;\
}
+ RETURN_IF_NO_PRECHARGED(Self->ReadSysValue(db, Schema::SysParam_MaxIncompatibleChange, Self->MaxIncompatibleChange));
+ if (Self->MaxIncompatibleChange > Schema::MaxIncompatibleChangeSupported) {
+ LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "TTxInit, unsupported changes detected: MaxIncompatibleChange = " << Self->MaxIncompatibleChange <<
+ ", MaxIncompatibleChangeSupported = " << Schema::MaxIncompatibleChangeSupported <<
+ ", restarting!");
+ Self->BreakTabletAndRestart(ctx);
+ Broken = true;
+ return true;
+ }
+
{
ui64 initStateVal = (ui64)TTenantInitState::InvalidState;
RETURN_IF_NO_PRECHARGED(Self->ReadSysValue(db, Schema::SysParam_TenantInitState, initStateVal, (ui64)TTenantInitState::InvalidState));
@@ -4623,6 +4648,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
void Complete(const TActorContext &ctx) override {
+ if (Broken) {
+ return;
+ }
+
auto delayPublications = OnComplete.ExtractPublicationsToSchemeBoard(); //there no Populator exist jet
for (auto& [txId, pathIds] : Publications) {
std::move(pathIds.begin(), pathIds.end(), std::back_inserter(delayPublications[txId]));
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index d9b950c46f..d22c965885 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -1367,6 +1367,27 @@ void TSchemeShard::TRwTxBase::Complete(const TActorContext &ctx) {
DoComplete(ctx);
}
+void TSchemeShard::BumpIncompatibleChanges(NIceDb::TNiceDb& db, ui64 incompatibleChange) {
+ if (MaxIncompatibleChange < incompatibleChange) {
+ Y_VERIFY_S(incompatibleChange <= Schema::MaxIncompatibleChangeSupported,
+ "Attempting to bump incompatible changes to " << incompatibleChange <<
+ ", but maximum supported change is " << Schema::MaxIncompatibleChangeSupported);
+ // We add a special path on the first incompatible change, which breaks
+ // all versions that don't know about incompatible changes. Newer
+ // versions will just skip this non-sensical entry.
+ if (MaxIncompatibleChange == 0) {
+ db.Table<Schema::Paths>().Key(0).Update(
+ NIceDb::TUpdate<Schema::Paths::ParentId>(0),
+ NIceDb::TUpdate<Schema::Paths::Name>("/incompatible/"));
+ }
+ // Persist a new maximum incompatible change, this will cause older
+ // versions to stop gracefully instead of working inconsistently.
+ db.Table<Schema::SysParams>().Key(Schema::SysParam_MaxIncompatibleChange).Update(
+ NIceDb::TUpdate<Schema::SysParams::Value>(ToString(incompatibleChange)));
+ MaxIncompatibleChange = incompatibleChange;
+ }
+}
+
void TSchemeShard::PersistTableIndex(NIceDb::TNiceDb& db, const TPathId& pathId) {
Y_VERIFY(PathsById.contains(pathId));
TPathElement::TPtr elemnt = PathsById.at(pathId);
@@ -2085,14 +2106,20 @@ void TSchemeShard::PersistChannelsBinding(NIceDb::TNiceDb& db, const TShardIdx s
void TSchemeShard::PersistTablePartitioning(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) {
for (ui64 pi = 0; pi < tableInfo->GetPartitions().size(); ++pi) {
const auto& partition = tableInfo->GetPartitions()[pi];
- if (IsLocalId(pathId)) {
- Y_VERIFY(IsLocalId(partition.ShardIdx));
+ if (IsLocalId(pathId) && IsLocalId(partition.ShardIdx)) {
db.Table<Schema::TablePartitions>().Key(pathId.LocalPathId, pi).Update(
NIceDb::TUpdate<Schema::TablePartitions::RangeEnd>(partition.EndOfRange),
NIceDb::TUpdate<Schema::TablePartitions::DatashardIdx>(partition.ShardIdx.GetLocalId()),
NIceDb::TUpdate<Schema::TablePartitions::LastCondErase>(partition.LastCondErase.GetValue()),
NIceDb::TUpdate<Schema::TablePartitions::NextCondErase>(partition.NextCondErase.GetValue()));
} else {
+ if (IsLocalId(pathId)) {
+ // Incompatible change 1:
+ // Store migrated shards of local tables in migrated table partitions
+ // This change is incompatible with older versions because partitions
+ // may no longer be in a single table and will require sorting at load time.
+ BumpIncompatibleChanges(db, 1);
+ }
db.Table<Schema::MigratedTablePartitions>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Update(
NIceDb::TUpdate<Schema::MigratedTablePartitions::RangeEnd>(partition.EndOfRange),
NIceDb::TUpdate<Schema::MigratedTablePartitions::OwnerShardIdx>(partition.ShardIdx.GetOwnerId()),
@@ -2124,12 +2151,15 @@ void TSchemeShard::PersistTablePartitioningDeletion(NIceDb::TNiceDb& db, const T
void TSchemeShard::PersistTablePartitionCondErase(NIceDb::TNiceDb& db, const TPathId& pathId, ui64 id, const TTableInfo::TPtr tableInfo) {
const auto& partition = tableInfo->GetPartitions()[id];
- if (IsLocalId(pathId)) {
- Y_VERIFY(IsLocalId(partition.ShardIdx));
+ if (IsLocalId(pathId) && IsLocalId(partition.ShardIdx)) {
db.Table<Schema::TablePartitions>().Key(pathId.LocalPathId, id).Update(
NIceDb::TUpdate<Schema::TablePartitions::LastCondErase>(partition.LastCondErase.GetValue()),
NIceDb::TUpdate<Schema::TablePartitions::NextCondErase>(partition.NextCondErase.GetValue()));
} else {
+ if (IsLocalId(pathId)) {
+ // Incompatible change 1 (see above)
+ BumpIncompatibleChanges(db, 1);
+ }
db.Table<Schema::MigratedTablePartitions>().Key(pathId.OwnerId, pathId.LocalPathId, id).Update(
NIceDb::TUpdate<Schema::MigratedTablePartitions::LastCondErase>(partition.LastCondErase.GetValue()),
NIceDb::TUpdate<Schema::MigratedTablePartitions::NextCondErase>(partition.NextCondErase.GetValue()));
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index b84eb3ed55..168998c349 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -177,6 +177,7 @@ public:
TVector<TString> RootPathElements;
+ ui64 MaxIncompatibleChange = 0;
THashMap<TPathId, TPathElement::TPtr> PathsById;
TLocalPathId NextLocalPathId = 0;
@@ -546,6 +547,9 @@ public:
void IncrementPathDbRefCount(const TPathId& pathId, const TStringBuf& debug = TStringBuf());
void DecrementPathDbRefCount(const TPathId& pathId, const TStringBuf& debug = TStringBuf());
+ // incompatible changes
+ void BumpIncompatibleChanges(NIceDb::TNiceDb& db, ui64 incompatibleChange);
+
// path
void PersistPath(NIceDb::TNiceDb& db, const TPathId& pathId);
void PersistRemovePath(NIceDb::TNiceDb& db, const TPathElement::TPtr path);
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index 53a2a48af5..fce0f91dc3 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1720,6 +1720,11 @@ struct Schema : NIceDb::Schema {
static constexpr ui64 SysParam_ParentDomainEffectiveACLVersion = 8;
static constexpr ui64 SysParam_TenantInitState = 9;
static constexpr ui64 SysParam_ServerlessStorageLastBillTime = 10;
+ static constexpr ui64 SysParam_MaxIncompatibleChange = 11;
+
+ // List of incompatible changes:
+ // * Change 1: store migrated shards of local tables (e.g. after a rename) as a migrated record
+ static constexpr ui64 MaxIncompatibleChangeSupported = 1;
};
}