diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-07-08 22:26:11 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-07-08 22:26:11 +0300 |
commit | 7a7d303e197aa7e4f43c61cc289d8652df38ab43 (patch) | |
tree | 9b8a0f60395d2f421663ee44ef683565aa0c032a | |
parent | e8b271f5c48247858dd767751576b052de6d9fa4 (diff) | |
download | ydb-7a7d303e197aa7e4f43c61cc289d8652df38ab43.tar.gz |
Re-run TTL task properly in case of disconnect
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp | 119 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_info_types.h | 10 |
4 files changed, 73 insertions, 59 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp b/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp index f4f5db4ba7..1b5c31d258 100644 --- a/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp @@ -7,6 +7,29 @@ namespace NSchemeShard { using namespace NTabletFlatExecutor; +namespace { + + std::pair<TTableInfo::TPtr, TShardIdx> ResolveInfo(const TSchemeShard* self, TTabletId tabletId) { + const auto shardIdx = self->GetShardIdx(tabletId); + if (!self->ShardInfos.contains(shardIdx)) { + return std::make_pair(nullptr, InvalidShardIdx); + } + + const auto& pathId = self->ShardInfos.at(shardIdx).PathId; + if (!self->TTLEnabledTables.contains(pathId)) { + return std::make_pair(nullptr, InvalidShardIdx); + } + + auto tableInfo = self->TTLEnabledTables.at(pathId); + if (!tableInfo->IsTTLEnabled()) { + return std::make_pair(nullptr, InvalidShardIdx); + } + + return std::make_pair(tableInfo, shardIdx); + } + +} // anonymous + struct TSchemeShard::TTxRunConditionalErase: public TSchemeShard::TRwTxBase { TTableInfo::TPtr TableInfo; THashMap<TTabletId, NKikimrTxDataShard::TEvConditionalEraseRowsRequest> RunOnTablets; @@ -55,28 +78,7 @@ struct TSchemeShard::TTxRunConditionalErase: public TSchemeShard::TRwTxBase { return; } - const auto& partitions = tableInfo->GetPartitions(); - const auto& shardToPartition = tableInfo->GetShard2PartitionIdx(); - THashSet<TShardIdx> reschedule; - - for (const auto& shardIdx : tableInfo->GetInFlightCondErase()) { - Y_VERIFY(shardToPartition.contains(shardIdx)); - const ui64 partitionIdx = shardToPartition.at(shardIdx); - - Y_VERIFY(partitionIdx < partitions.size()); - const auto& tableShardInfo = partitions.at(partitionIdx); - - if (!DoExecuteOnShard(tableInfo, tableShardInfo, ctx)) { - reschedule.insert(shardIdx); - } - } - - for (const auto& shardIdx : reschedule) { - tableInfo->RescheduleCondErase(shardIdx); - } - - const auto& sysSettings = tableInfo->TTLSettings().GetEnabled().GetSysSettings(); - const auto maxInFlight = sysSettings.GetMaxShardsInFlight(); + const auto maxInFlight = tableInfo->TTLSettings().GetEnabled().GetSysSettings().GetMaxShardsInFlight(); while (true) { if (maxInFlight && tableInfo->GetInFlightCondErase().size() >= maxInFlight) { @@ -182,6 +184,19 @@ struct TSchemeShard::TTxRunConditionalErase: public TSchemeShard::TRwTxBase { const auto& tabletId = kv.first; auto& request = kv.second; + auto [tableInfo, shardIdx] = ResolveInfo(Self, tabletId); + if (!tableInfo || shardIdx == InvalidShardIdx) { + Y_VERIFY_DEBUG(false, "Unreachable"); + continue; + } + + auto& inFlight = tableInfo->GetInFlightCondErase(); + auto it = inFlight.find(shardIdx); + if (it == inFlight.end()) { + Y_VERIFY_DEBUG(false, "Unreachable"); + continue; + } + auto ev = MakeHolder<TEvDataShard::TEvConditionalEraseRowsRequest>(); ev->Record = std::move(request); @@ -189,7 +204,7 @@ struct TSchemeShard::TTxRunConditionalErase: public TSchemeShard::TRwTxBase { << ", tabletId: " << tabletId << ", request: " << ev->Record.ShortDebugString() << ", at schemeshard: " << Self->TabletID()); - Self->PipeClientCache->Send(ctx, ui64(tabletId), ev.Release()); + it->second = Self->PipeClientCache->Send(ctx, ui64(tabletId), ev.Release()); } if (!TableInfo) { @@ -331,42 +346,16 @@ struct TSchemeShard::TTxScheduleConditionalErase : public TTransactionBase<TSche const auto& record = Ev->Get()->Record; const TTabletId tabletId(record.GetTabletID()); - const TShardIdx shardIdx = Self->GetShardIdx(tabletId); + auto [tableInfo, shardIdx] = ResolveInfo(Self, tabletId); - if (!Self->ShardInfos.contains(shardIdx)) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info" + if (!tableInfo || shardIdx == InvalidShardIdx) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve info" << ": tabletId: " << tabletId << ", at schemeshard: " << Self->TabletID()); return true; } - const TShardInfo& shardInfo = Self->ShardInfos.at(shardIdx); - const TPathId& tableId = shardInfo.PathId; - - if (!Self->TTLEnabledTables.contains(tableId)) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTL is not enabled for table #P2" - << ": tabletId: " << tabletId - << ", tableId: " << tableId - << ", at schemeshard: " << Self->TabletID()); - return true; - } - - TTableInfo::TPtr tableInfo = Self->TTLEnabledTables.at(tableId); - if (!tableInfo->IsTTLEnabled()) { - LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTL is not enabled for table #P3" - << ": tabletId: " << tabletId - << ", tableId: " << tableId - << ", at schemeshard: " << Self->TabletID()); - return true; - } - - const auto& inFlight = tableInfo->GetInFlightCondErase(); - if (!inFlight.contains(shardIdx)) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Shard idx mismatch" - << ", tableId: " << tableId - << ", shardIdx: " << shardIdx - << ", in-flight shards: [" << JoinSeq(",", inFlight) << "]" - << ", at schemeshard: " << Self->TabletID()); + if (!tableInfo->GetInFlightCondErase().contains(shardIdx)) { return true; } @@ -413,9 +402,7 @@ struct TSchemeShard::TTxScheduleConditionalErase : public TTransactionBase<TSche const auto& partitions = tableInfo->GetPartitions(); Y_VERIFY(partitionIdx < partitions.size()); - const auto& tableShardInfo = partitions.at(partitionIdx); - - const auto& lag = tableShardInfo.LastCondEraseLag; + const auto& lag = partitions.at(partitionIdx).LastCondEraseLag; if (lag) { Self->TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].DecrementFor(lag->Seconds()); @@ -427,6 +414,10 @@ struct TSchemeShard::TTxScheduleConditionalErase : public TTransactionBase<TSche NIceDb::TNiceDb db(txc.DB); tableInfo->ScheduleNextCondErase(shardIdx, now, next); + + Y_VERIFY(Self->ShardInfos.contains(shardIdx)); + const TPathId& tableId = Self->ShardInfos.at(shardIdx).PathId; + Self->PersistTablePartitionCondErase(db, tableId, partitionIdx, tableInfo); if (AppData(ctx)->FeatureFlags.GetEnableSystemViews()) { @@ -470,5 +461,21 @@ ITransaction* TSchemeShard::CreateTxScheduleConditionalErase(TEvDataShard::TEvCo return new TTxScheduleConditionalErase(this, ev); } +void TSchemeShard::ConditionalEraseHandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) { + auto [tableInfo, shardIdx] = ResolveInfo(this, tabletId); + if (!tableInfo || shardIdx == InvalidShardIdx) { + return; + } + + const auto& inFlight = tableInfo->GetInFlightCondErase(); + auto it = inFlight.find(shardIdx); + if (it == inFlight.end() || it->second != clientId) { + return; + } + + tableInfo->RescheduleCondErase(shardIdx); + Execute(new TTxRunConditionalErase(this, tableInfo), ctx); +} + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 061abc0251..115ae12872 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4665,6 +4665,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TAc << ", at schemeshard: " << TabletID()); BorrowedCompactionHandleDisconnect(tabletId, clientId); + ConditionalEraseHandleDisconnect(tabletId, clientId, ctx); RestartPipeTx(tabletId, ctx); } @@ -4703,6 +4704,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TAc } BorrowedCompactionHandleDisconnect(tabletId, clientId); + ConditionalEraseHandleDisconnect(tabletId, clientId, ctx); RestartPipeTx(tabletId, ctx); } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 600943a9b7..4a434e59a3 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -950,6 +950,7 @@ public: void ScheduleConditionalEraseRun(const TActorContext& ctx); void Handle(TEvPrivate::TEvRunConditionalErase::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvConditionalEraseRowsResponse::TPtr& ev, const TActorContext& ctx); + void ConditionalEraseHandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx); void Handle(NSysView::TEvSysView::TEvGetPartitionStats::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index b870aeb00a..b2e6a23e92 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -455,7 +455,7 @@ private: TPartitionsVec Partitions; THashMap<TShardIdx, ui64> Shard2PartitionIdx; // shardIdx -> index in Partitions TPriorityQueue<TPartitionsVec::iterator, TVector<TPartitionsVec::iterator>, TSortByNextCondErase> CondEraseSchedule; - THashSet<TShardIdx> InFlightCondErase; + THashMap<TShardIdx, TActorId> InFlightCondErase; // shard to pipe client mutable TMaybe<ui32> TTLColumnId; THashSet<TOperationId> SplitOpsInFlight; THashMap<TOperationId, TVector<TShardIdx>> ShardsInSplitMergeByOpId; @@ -734,7 +734,11 @@ public: return CondEraseSchedule.top(); } - const THashSet<TShardIdx>& GetInFlightCondErase() const { + const auto& GetInFlightCondErase() const { + return InFlightCondErase; + } + + auto& GetInFlightCondErase() { return InFlightCondErase; } @@ -742,7 +746,7 @@ public: const auto* shardInfo = GetScheduledCondEraseShard(); Y_VERIFY(shardInfo && shardIdx == shardInfo->ShardIdx); - InFlightCondErase.insert(shardIdx); + InFlightCondErase[shardIdx] = TActorId(); CondEraseSchedule.pop(); } |