diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-22 16:39:03 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-22 20:07:59 +0300 |
commit | 2a8db7ea366cd6757bd39bebbda36ef00cc19844 (patch) | |
tree | f7a59ce8c6a368661b66f755b9eadf23f6be13e2 | |
parent | 9542b1bc4af77566d86105846eca1dc2f4638405 (diff) | |
download | ydb-2a8db7ea366cd6757bd39bebbda36ef00cc19844.tar.gz |
AFL_ENSURE usage for store context
5 files changed, 41 insertions, 41 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.cpp b/ydb/core/kqp/compute_actor/kqp_compute_state.cpp index 651c23f6c3..967e678957 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_state.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_state.cpp @@ -102,7 +102,7 @@ TString TShardState::GetAddress() const { TShardState::TShardState(const ui64 tabletId) : TabletId(tabletId) { - AFL_VERIFY(TabletId); + AFL_ENSURE(TabletId); } } diff --git a/ydb/core/kqp/compute_actor/kqp_scan_common.h b/ydb/core/kqp/compute_actor/kqp_scan_common.h index 2ed16e1367..784cfca953 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_common.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_common.h @@ -29,7 +29,7 @@ struct TScannedDataStats { void CompleteShard(TShardState::TPtr state) { auto it = ReadShardInfo.find(state->TabletId); - AFL_VERIFY(it != ReadShardInfo.end()); + AFL_ENSURE(it != ReadShardInfo.end()); auto& [currentRows, currentBytes] = it->second; TotalReadRows += currentRows; TotalReadBytes += currentBytes; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp index a8b1edeb9a..a5ebda9aeb 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp @@ -9,23 +9,23 @@ TShardState::TPtr TInFlightShards::Put(TShardState&& state) { MutableStatistics(state.TabletId).MutableStatistics(0).SetStartInstant(Now()); TShardState::TPtr result = std::make_shared<TShardState>(std::move(state)); - AFL_VERIFY(Shards.emplace(result->TabletId, result).second); + AFL_ENSURE(Shards.emplace(result->TabletId, result).second); return result; } std::vector<std::unique_ptr<TComputeTaskData>> TShardScannerInfo::OnReceiveData(TEvKqpCompute::TEvScanData& data, const std::shared_ptr<TShardScannerInfo>& selfPtr) { if (!data.Finished) { - AFL_VERIFY(!NeedAck); + AFL_ENSURE(!NeedAck); NeedAck = true; } else { Finished = true; } if (data.IsEmpty()) { - AFL_VERIFY(data.Finished); + AFL_ENSURE(data.Finished); return {}; } - AFL_VERIFY(ActorId); - AFL_VERIFY(!DataChunksInFlightCount); + AFL_ENSURE(ActorId); + AFL_ENSURE(!DataChunksInFlightCount); std::vector<std::unique_ptr<TComputeTaskData>> result; if (data.SplittedBatches.size() > 1) { ui32 idx = 0; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h index ff62c983fc..d3e6821273 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h @@ -36,10 +36,10 @@ private: return; } AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "scan_ack")("actor_id", ActorId); - AFL_VERIFY(NeedAck); + AFL_ENSURE(NeedAck); NeedAck = false; - AFL_VERIFY(ActorId); - AFL_VERIFY(!DataChunksInFlightCount); + AFL_ENSURE(ActorId); + AFL_ENSURE(!DataChunksInFlightCount); ui32 flags = IEventHandle::FlagTrackDelivery; if (!TracingStarted) { flags |= IEventHandle::FlagSubscribeOnSession; @@ -81,7 +81,7 @@ public: void Start(const TActorId& actorId) { AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "start_scanner")("actor_id", actorId); - AFL_VERIFY(!ActorId); + AFL_ENSURE(!ActorId); ActorId = actorId; DoAck(); } @@ -90,7 +90,7 @@ public: void FinishWaitSendData() { --DataChunksInFlightCount; - AFL_VERIFY(DataChunksInFlightCount >= 0); + AFL_ENSURE(DataChunksInFlightCount >= 0); if (!DataChunksInFlightCount && !!ActorId) { DoAck(); } @@ -105,7 +105,7 @@ private: const std::optional<ui32> ComputeShardId; public: ui32 GetRowsCount() const { - AFL_VERIFY(Event); + AFL_ENSURE(Event); return Event->GetRowsCount(); } @@ -125,7 +125,7 @@ public: } void Finish() { - AFL_VERIFY(!Finished); + AFL_ENSURE(!Finished); Finished = true; Info->FinishWaitSendData(); } @@ -163,8 +163,8 @@ public: } void OnAckReceived(const ui64 freeSpace) { - AFL_VERIFY(!FreeSpace); - AFL_VERIFY(freeSpace); + AFL_ENSURE(!FreeSpace); + AFL_ENSURE(freeSpace); FreeSpace = freeSpace; SendData(); } @@ -210,7 +210,7 @@ public: bool OnComputeAck(const TActorId& computeActorId, const ui64 freeSpace) { AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "ack")("compute_actor_id", computeActorId); auto it = ComputeActorsById.find(computeActorId); - AFL_VERIFY(it != ComputeActorsById.end()); + AFL_ENSURE(it != ComputeActorsById.end()); if (it->second->IsFree()) { return false; } @@ -233,7 +233,7 @@ public: } UndefinedShardTaskData.emplace_back(std::move(sendTask)); } else { - AFL_VERIFY(*computeShardId < ComputeActors.size()); + AFL_ENSURE(*computeShardId < ComputeActors.size()); ComputeActors[*computeShardId].AddDataToSend(std::move(sendTask)); } } @@ -276,7 +276,7 @@ public: const std::shared_ptr<TShardState>& GetShardStateVerified(const ui64 tabletId) const { auto it = Shards.find(tabletId); - AFL_VERIFY(it != Shards.end()); + AFL_ENSURE(it != Shards.end())("tablet_id", tabletId); return it->second; } @@ -294,27 +294,27 @@ public: if (!state) { return; } - AFL_VERIFY(state->State == NComputeActor::EShardState::Starting)("state", state->State); + AFL_ENSURE(state->State == NComputeActor::EShardState::Starting)("state", state->State); AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "RegisterScannerActor")("actor_id", scanActorId) ("state", state->State)("tabletId", state->TabletId); - AFL_VERIFY(!state->ActorId); + AFL_ENSURE(!state->ActorId); state->State = NComputeActor::EShardState::Running; state->ActorId = scanActorId; state->ResetRetry(); - AFL_VERIFY(ShardsByActorId.emplace(scanActorId, state).second); + AFL_ENSURE(ShardsByActorId.emplace(scanActorId, state).second); GetShardScannerVerified(tabletId)->Start(scanActorId); } void StartScanner(TShardState& state) { - AFL_VERIFY(state.State == NComputeActor::EShardState::Initial); - AFL_VERIFY(state.TabletId); - AFL_VERIFY(!state.ActorId); + AFL_ENSURE(state.State == NComputeActor::EShardState::Initial)("state", state.State); + AFL_ENSURE(state.TabletId); + AFL_ENSURE(!state.ActorId); state.State = NComputeActor::EShardState::Starting; auto newScanner = std::make_shared<TShardScannerInfo>(state, ExternalObjectsProvider); - AFL_VERIFY(ShardScanners.emplace(state.TabletId, newScanner).second); + AFL_ENSURE(ShardScanners.emplace(state.TabletId, newScanner).second); } void StopScanner(const ui64 tabletId, const bool stopShard = true) { @@ -322,7 +322,7 @@ public: auto& state = GetShardStateVerified(tabletId); const auto actorId = state->ActorId; if (actorId) { - AFL_VERIFY(ShardsByActorId.erase(actorId)); + AFL_ENSURE(ShardsByActorId.erase(actorId)); } if (state->State != NComputeActor::EShardState::Initial) { state->RetryAttempt++; @@ -332,19 +332,19 @@ public: state->SubscribedOnTablet = false; auto it = ShardScanners.find(tabletId); - AFL_VERIFY(it != ShardScanners.end()); + AFL_ENSURE(it != ShardScanners.end()); it->second->Stop(true, ""); ShardScanners.erase(it); } if (stopShard) { - AFL_VERIFY(Shards.erase(tabletId)); + AFL_ENSURE(Shards.erase(tabletId)); } } const std::shared_ptr<TShardScannerInfo>& GetShardScannerVerified(const ui64 tabletId) const { auto it = ShardScanners.find(tabletId); - AFL_VERIFY(it != ShardScanners.end()); + AFL_ENSURE(it != ShardScanners.end()); return it->second; } diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index d1b57b7937..da29486bd7 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -38,8 +38,8 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps , InFlightComputes(ComputeActorIds) { Y_UNUSED(traceId); - YQL_ENSURE(!Meta.GetReads().empty()); - YQL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView); + AFL_ENSURE(!Meta.GetReads().empty()); + AFL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView); ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "META:" << meta.DebugString(); KeyColumnTypes.reserve(Meta.GetKeyColumnTypes().size()); for (size_t i = 0; i < Meta.KeyColumnTypesSize(); i++) { @@ -117,7 +117,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) { if (!state) { return; } - AFL_VERIFY(state->State == EShardState::Running)("state", state->State)("actor_id", state->ActorId)("ev_sender", ev->Sender); + AFL_ENSURE(state->State == EShardState::Running)("state", state->State)("actor_id", state->ActorId)("ev_sender", ev->Sender); TInstant startTime = TActivationContext::Now(); if (ev->Get()->Finished) { @@ -202,14 +202,14 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet if (!InFlightShards.IsActive()) { return; } - YQL_ENSURE(!PendingResolveShards.empty()); + AFL_ENSURE(!PendingResolveShards.empty()); auto state = std::move(PendingResolveShards.front()); PendingResolveShards.pop_front(); ResolveNextShard(); Y_ABORT_UNLESS(!InFlightShards.GetShardScanner(state.TabletId)); - YQL_ENSURE(state.State == EShardState::Resolving); + AFL_ENSURE(state.State == EShardState::Resolving); CA_LOG_D("Received TEvResolveKeySetResult update for table '" << ScanDataMeta.TablePath << "'"); auto* request = ev->Get()->Request.Get(); @@ -309,7 +309,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet } } - YQL_ENSURE(!newShards.empty()); + AFL_ENSURE(!newShards.empty()); for (int i = newShards.ysize() - 1; i >= 0; --i) { PendingShards.emplace_front(std::move(newShards[i])); @@ -324,7 +324,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet PendingShards.front().LastKey = std::move(readShard.LastKey); } - YQL_ENSURE(!PendingShards.empty()); + AFL_ENSURE(!PendingShards.empty()); } StartTableScan(); } @@ -446,7 +446,7 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData Counters->ScanQueryRateLimitLatency->Collect(latency.MilliSeconds()); } - YQL_ENSURE(state->ActorId == ev->Sender, "expected: " << state->ActorId << ", got: " << ev->Sender); + AFL_ENSURE(state->ActorId == ev->Sender)("expected", state->ActorId)("got", ev->Sender); state->LastKey = std::move(msg.LastKey); const ui64 rowsCount = msg.GetRowsCount(); @@ -458,7 +458,7 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData << ";tablet_id=" << state->TabletId); auto shardScanner = InFlightShards.GetShardScannerVerified(state->TabletId); auto tasksForCompute = shardScanner->OnReceiveData(msg, shardScanner); - AFL_VERIFY(tasksForCompute.size() == 1 || tasksForCompute.size() == 0 || tasksForCompute.size() == ComputeActorIds.size())("size", tasksForCompute.size())("compute_size", ComputeActorIds.size()); + AFL_ENSURE(tasksForCompute.size() == 1 || tasksForCompute.size() == 0 || tasksForCompute.size() == ComputeActorIds.size())("size", tasksForCompute.size())("compute_size", ComputeActorIds.size()); for (auto&& i : tasksForCompute) { const std::optional<ui32> computeShardId = i->GetComputeShardId(); InFlightComputes.OnReceiveData(computeShardId, std::move(i)); @@ -477,7 +477,7 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData } void TKqpScanFetcherActor::ProcessScanData() { - YQL_ENSURE(!PendingScanData.empty()); + AFL_ENSURE(!PendingScanData.empty()); auto ev = std::move(PendingScanData.front().first); auto enqueuedAt = std::move(PendingScanData.front().second); @@ -487,7 +487,7 @@ void TKqpScanFetcherActor::ProcessScanData() { if (!state) return; - AFL_VERIFY(state->State == EShardState::Running || state->State == EShardState::PostRunning); + AFL_ENSURE(state->State == EShardState::Running || state->State == EShardState::PostRunning)("state", state->State); ProcessPendingScanDataItem(ev, enqueuedAt); } |