aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ilnaz <ilnaz@ydb.tech> 2022-07-08 22:26:11 +0300
committer ilnaz <ilnaz@ydb.tech> 2022-07-08 22:26:11 +0300
commit 7a7d303e197aa7e4f43c61cc289d8652df38ab43 (patch)
tree 9b8a0f60395d2f421663ee44ef683565aa0c032a
parent e8b271f5c48247858dd767751576b052de6d9fa4 (diff)
download ydb-7a7d303e197aa7e4f43c61cc289d8652df38ab43.tar.gz
Re-run TTL task properly in case of disconnect
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp 119
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_impl.cpp 2
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_impl.h 1
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_info_types.h 10
4 files changed, 73 insertions, 59 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp b/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp
index f4f5db4ba7..1b5c31d258 100644
--- a/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp
@@ -7,6 +7,29 @@ namespace NSchemeShard {
using namespace NTabletFlatExecutor;
+namespace {
+
+ std::pair<TTableInfo::TPtr, TShardIdx> ResolveInfo(const TSchemeShard* self, TTabletId tabletId) { // Resolves tabletId to {table info, shard index}; any failed lookup below yields {nullptr, InvalidShardIdx}.
+ const auto shardIdx = self->GetShardIdx(tabletId);
+ if (!self->ShardInfos.contains(shardIdx)) { // no entry in ShardInfos for this shard index
+ return std::make_pair(nullptr, InvalidShardIdx);
+ }
+
+ const auto& pathId = self->ShardInfos.at(shardIdx).PathId;
+ if (!self->TTLEnabledTables.contains(pathId)) { // the shard's path is not listed in TTLEnabledTables
+ return std::make_pair(nullptr, InvalidShardIdx);
+ }
+
+ auto tableInfo = self->TTLEnabledTables.at(pathId);
+ if (!tableInfo->IsTTLEnabled()) { // table is listed, but IsTTLEnabled() returns false
+ return std::make_pair(nullptr, InvalidShardIdx);
+ }
+
+ return std::make_pair(tableInfo, shardIdx); // all three lookups succeeded
+ }
+
+} // anonymous
+
struct TSchemeShard::TTxRunConditionalErase: public TSchemeShard::TRwTxBase {
TTableInfo::TPtr TableInfo;
THashMap<TTabletId, NKikimrTxDataShard::TEvConditionalEraseRowsRequest> RunOnTablets;
@@ -55,28 +78,7 @@ struct TSchemeShard::TTxRunConditionalErase: public TSchemeShard::TRwTxBase {
return;
}
- const auto& partitions = tableInfo->GetPartitions();
- const auto& shardToPartition = tableInfo->GetShard2PartitionIdx();
- THashSet<TShardIdx> reschedule;
-
- for (const auto& shardIdx : tableInfo->GetInFlightCondErase()) {
- Y_VERIFY(shardToPartition.contains(shardIdx));
- const ui64 partitionIdx = shardToPartition.at(shardIdx);
-
- Y_VERIFY(partitionIdx < partitions.size());
- const auto& tableShardInfo = partitions.at(partitionIdx);
-
- if (!DoExecuteOnShard(tableInfo, tableShardInfo, ctx)) {
- reschedule.insert(shardIdx);
- }
- }
-
- for (const auto& shardIdx : reschedule) {
- tableInfo->RescheduleCondErase(shardIdx);
- }
-
- const auto& sysSettings = tableInfo->TTLSettings().GetEnabled().GetSysSettings();
- const auto maxInFlight = sysSettings.GetMaxShardsInFlight();
+ const auto maxInFlight = tableInfo->TTLSettings().GetEnabled().GetSysSettings().GetMaxShardsInFlight();
while (true) {
if (maxInFlight && tableInfo->GetInFlightCondErase().size() >= maxInFlight) {
@@ -182,6 +184,19 @@ struct TSchemeShard::TTxRunConditionalErase: public TSchemeShard::TRwTxBase {
const auto& tabletId = kv.first;
auto& request = kv.second;
+ auto [tableInfo, shardIdx] = ResolveInfo(Self, tabletId);
+ if (!tableInfo || shardIdx == InvalidShardIdx) {
+ Y_VERIFY_DEBUG(false, "Unreachable");
+ continue;
+ }
+
+ auto& inFlight = tableInfo->GetInFlightCondErase();
+ auto it = inFlight.find(shardIdx);
+ if (it == inFlight.end()) {
+ Y_VERIFY_DEBUG(false, "Unreachable");
+ continue;
+ }
+
auto ev = MakeHolder<TEvDataShard::TEvConditionalEraseRowsRequest>();
ev->Record = std::move(request);
@@ -189,7 +204,7 @@ struct TSchemeShard::TTxRunConditionalErase: public TSchemeShard::TRwTxBase {
<< ", tabletId: " << tabletId
<< ", request: " << ev->Record.ShortDebugString()
<< ", at schemeshard: " << Self->TabletID());
- Self->PipeClientCache->Send(ctx, ui64(tabletId), ev.Release());
+ it->second = Self->PipeClientCache->Send(ctx, ui64(tabletId), ev.Release());
}
if (!TableInfo) {
@@ -331,42 +346,16 @@ struct TSchemeShard::TTxScheduleConditionalErase : public TTransactionBase<TSche
const auto& record = Ev->Get()->Record;
const TTabletId tabletId(record.GetTabletID());
- const TShardIdx shardIdx = Self->GetShardIdx(tabletId);
+ auto [tableInfo, shardIdx] = ResolveInfo(Self, tabletId);
- if (!Self->ShardInfos.contains(shardIdx)) {
- LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info"
+ if (!tableInfo || shardIdx == InvalidShardIdx) {
+ LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve info"
<< ": tabletId: " << tabletId
<< ", at schemeshard: " << Self->TabletID());
return true;
}
- const TShardInfo& shardInfo = Self->ShardInfos.at(shardIdx);
- const TPathId& tableId = shardInfo.PathId;
-
- if (!Self->TTLEnabledTables.contains(tableId)) {
- LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTL is not enabled for table #P2"
- << ": tabletId: " << tabletId
- << ", tableId: " << tableId
- << ", at schemeshard: " << Self->TabletID());
- return true;
- }
-
- TTableInfo::TPtr tableInfo = Self->TTLEnabledTables.at(tableId);
- if (!tableInfo->IsTTLEnabled()) {
- LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTL is not enabled for table #P3"
- << ": tabletId: " << tabletId
- << ", tableId: " << tableId
- << ", at schemeshard: " << Self->TabletID());
- return true;
- }
-
- const auto& inFlight = tableInfo->GetInFlightCondErase();
- if (!inFlight.contains(shardIdx)) {
- LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Shard idx mismatch"
- << ", tableId: " << tableId
- << ", shardIdx: " << shardIdx
- << ", in-flight shards: [" << JoinSeq(",", inFlight) << "]"
- << ", at schemeshard: " << Self->TabletID());
+ if (!tableInfo->GetInFlightCondErase().contains(shardIdx)) {
return true;
}
@@ -413,9 +402,7 @@ struct TSchemeShard::TTxScheduleConditionalErase : public TTransactionBase<TSche
const auto& partitions = tableInfo->GetPartitions();
Y_VERIFY(partitionIdx < partitions.size());
- const auto& tableShardInfo = partitions.at(partitionIdx);
-
- const auto& lag = tableShardInfo.LastCondEraseLag;
+ const auto& lag = partitions.at(partitionIdx).LastCondEraseLag;
if (lag) {
Self->TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].DecrementFor(lag->Seconds());
@@ -427,6 +414,10 @@ struct TSchemeShard::TTxScheduleConditionalErase : public TTransactionBase<TSche
NIceDb::TNiceDb db(txc.DB);
tableInfo->ScheduleNextCondErase(shardIdx, now, next);
+
+ Y_VERIFY(Self->ShardInfos.contains(shardIdx));
+ const TPathId& tableId = Self->ShardInfos.at(shardIdx).PathId;
+
Self->PersistTablePartitionCondErase(db, tableId, partitionIdx, tableInfo);
if (AppData(ctx)->FeatureFlags.GetEnableSystemViews()) {
@@ -470,5 +461,21 @@ ITransaction* TSchemeShard::CreateTxScheduleConditionalErase(TEvDataShard::TEvCo
return new TTxScheduleConditionalErase(this, ev);
}
+void TSchemeShard::ConditionalEraseHandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) { // Re-runs conditional erase for tabletId's shard when clientId is the pipe client recorded for its in-flight request; otherwise does nothing.
+ auto [tableInfo, shardIdx] = ResolveInfo(this, tabletId);
+ if (!tableInfo || shardIdx == InvalidShardIdx) { // ResolveInfo could not map the tablet to a TTL-enabled table
+ return;
+ }
+
+ const auto& inFlight = tableInfo->GetInFlightCondErase();
+ auto it = inFlight.find(shardIdx);
+ if (it == inFlight.end() || it->second != clientId) { // shard is not in flight, or a different pipe client is recorded for it
+ return;
+ }
+
+ tableInfo->RescheduleCondErase(shardIdx); // NOTE(review): presumably moves the shard from in-flight back to the schedule; confirm in TTableInfo
+ Execute(new TTxRunConditionalErase(this, tableInfo), ctx); // run a new TTxRunConditionalErase transaction for this table
+}
+
} // NSchemeShard
} // NKikimr
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 061abc0251..115ae12872 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -4665,6 +4665,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TAc
<< ", at schemeshard: " << TabletID());
BorrowedCompactionHandleDisconnect(tabletId, clientId);
+ ConditionalEraseHandleDisconnect(tabletId, clientId, ctx);
RestartPipeTx(tabletId, ctx);
}
@@ -4703,6 +4704,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TAc
}
BorrowedCompactionHandleDisconnect(tabletId, clientId);
+ ConditionalEraseHandleDisconnect(tabletId, clientId, ctx);
RestartPipeTx(tabletId, ctx);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 600943a9b7..4a434e59a3 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -950,6 +950,7 @@ public:
void ScheduleConditionalEraseRun(const TActorContext& ctx);
void Handle(TEvPrivate::TEvRunConditionalErase::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvConditionalEraseRowsResponse::TPtr& ev, const TActorContext& ctx);
+ void ConditionalEraseHandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx);
void Handle(NSysView::TEvSysView::TEvGetPartitionStats::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index b870aeb00a..b2e6a23e92 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -455,7 +455,7 @@ private:
TPartitionsVec Partitions;
THashMap<TShardIdx, ui64> Shard2PartitionIdx; // shardIdx -> index in Partitions
TPriorityQueue<TPartitionsVec::iterator, TVector<TPartitionsVec::iterator>, TSortByNextCondErase> CondEraseSchedule;
- THashSet<TShardIdx> InFlightCondErase;
+ THashMap<TShardIdx, TActorId> InFlightCondErase; // shard to pipe client
mutable TMaybe<ui32> TTLColumnId;
THashSet<TOperationId> SplitOpsInFlight;
THashMap<TOperationId, TVector<TShardIdx>> ShardsInSplitMergeByOpId;
@@ -734,7 +734,11 @@ public:
return CondEraseSchedule.top();
}
- const THashSet<TShardIdx>& GetInFlightCondErase() const {
+ const auto& GetInFlightCondErase() const {
+ return InFlightCondErase;
+ }
+
+ auto& GetInFlightCondErase() {
return InFlightCondErase;
}
@@ -742,7 +746,7 @@ public:
const auto* shardInfo = GetScheduledCondEraseShard();
Y_VERIFY(shardInfo && shardIdx == shardInfo->ShardIdx);
- InFlightCondErase.insert(shardIdx);
+ InFlightCondErase[shardIdx] = TActorId();
CondEraseSchedule.pop();
}