aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-11-22 16:39:03 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-11-22 20:07:59 +0300
commit2a8db7ea366cd6757bd39bebbda36ef00cc19844 (patch)
treef7a59ce8c6a368661b66f755b9eadf23f6be13e2
parent9542b1bc4af77566d86105846eca1dc2f4638405 (diff)
downloadydb-2a8db7ea366cd6757bd39bebbda36ef00cc19844.tar.gz
AFL_ENSURE usage for store context
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_state.cpp2
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_common.h2
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp10
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h46
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp22
5 files changed, 41 insertions, 41 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.cpp b/ydb/core/kqp/compute_actor/kqp_compute_state.cpp
index 651c23f6c3..967e678957 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_state.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_state.cpp
@@ -102,7 +102,7 @@ TString TShardState::GetAddress() const {
TShardState::TShardState(const ui64 tabletId)
: TabletId(tabletId)
{
- AFL_VERIFY(TabletId);
+ AFL_ENSURE(TabletId);
}
}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_common.h b/ydb/core/kqp/compute_actor/kqp_scan_common.h
index 2ed16e1367..784cfca953 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_common.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_common.h
@@ -29,7 +29,7 @@ struct TScannedDataStats {
void CompleteShard(TShardState::TPtr state) {
auto it = ReadShardInfo.find(state->TabletId);
- AFL_VERIFY(it != ReadShardInfo.end());
+ AFL_ENSURE(it != ReadShardInfo.end());
auto& [currentRows, currentBytes] = it->second;
TotalReadRows += currentRows;
TotalReadBytes += currentBytes;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
index a8b1edeb9a..a5ebda9aeb 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
@@ -9,23 +9,23 @@ TShardState::TPtr TInFlightShards::Put(TShardState&& state) {
MutableStatistics(state.TabletId).MutableStatistics(0).SetStartInstant(Now());
TShardState::TPtr result = std::make_shared<TShardState>(std::move(state));
- AFL_VERIFY(Shards.emplace(result->TabletId, result).second);
+ AFL_ENSURE(Shards.emplace(result->TabletId, result).second);
return result;
}
std::vector<std::unique_ptr<TComputeTaskData>> TShardScannerInfo::OnReceiveData(TEvKqpCompute::TEvScanData& data, const std::shared_ptr<TShardScannerInfo>& selfPtr) {
if (!data.Finished) {
- AFL_VERIFY(!NeedAck);
+ AFL_ENSURE(!NeedAck);
NeedAck = true;
} else {
Finished = true;
}
if (data.IsEmpty()) {
- AFL_VERIFY(data.Finished);
+ AFL_ENSURE(data.Finished);
return {};
}
- AFL_VERIFY(ActorId);
- AFL_VERIFY(!DataChunksInFlightCount);
+ AFL_ENSURE(ActorId);
+ AFL_ENSURE(!DataChunksInFlightCount);
std::vector<std::unique_ptr<TComputeTaskData>> result;
if (data.SplittedBatches.size() > 1) {
ui32 idx = 0;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
index ff62c983fc..d3e6821273 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
@@ -36,10 +36,10 @@ private:
return;
}
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "scan_ack")("actor_id", ActorId);
- AFL_VERIFY(NeedAck);
+ AFL_ENSURE(NeedAck);
NeedAck = false;
- AFL_VERIFY(ActorId);
- AFL_VERIFY(!DataChunksInFlightCount);
+ AFL_ENSURE(ActorId);
+ AFL_ENSURE(!DataChunksInFlightCount);
ui32 flags = IEventHandle::FlagTrackDelivery;
if (!TracingStarted) {
flags |= IEventHandle::FlagSubscribeOnSession;
@@ -81,7 +81,7 @@ public:
void Start(const TActorId& actorId) {
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "start_scanner")("actor_id", actorId);
- AFL_VERIFY(!ActorId);
+ AFL_ENSURE(!ActorId);
ActorId = actorId;
DoAck();
}
@@ -90,7 +90,7 @@ public:
void FinishWaitSendData() {
--DataChunksInFlightCount;
- AFL_VERIFY(DataChunksInFlightCount >= 0);
+ AFL_ENSURE(DataChunksInFlightCount >= 0);
if (!DataChunksInFlightCount && !!ActorId) {
DoAck();
}
@@ -105,7 +105,7 @@ private:
const std::optional<ui32> ComputeShardId;
public:
ui32 GetRowsCount() const {
- AFL_VERIFY(Event);
+ AFL_ENSURE(Event);
return Event->GetRowsCount();
}
@@ -125,7 +125,7 @@ public:
}
void Finish() {
- AFL_VERIFY(!Finished);
+ AFL_ENSURE(!Finished);
Finished = true;
Info->FinishWaitSendData();
}
@@ -163,8 +163,8 @@ public:
}
void OnAckReceived(const ui64 freeSpace) {
- AFL_VERIFY(!FreeSpace);
- AFL_VERIFY(freeSpace);
+ AFL_ENSURE(!FreeSpace);
+ AFL_ENSURE(freeSpace);
FreeSpace = freeSpace;
SendData();
}
@@ -210,7 +210,7 @@ public:
bool OnComputeAck(const TActorId& computeActorId, const ui64 freeSpace) {
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "ack")("compute_actor_id", computeActorId);
auto it = ComputeActorsById.find(computeActorId);
- AFL_VERIFY(it != ComputeActorsById.end());
+ AFL_ENSURE(it != ComputeActorsById.end());
if (it->second->IsFree()) {
return false;
}
@@ -233,7 +233,7 @@ public:
}
UndefinedShardTaskData.emplace_back(std::move(sendTask));
} else {
- AFL_VERIFY(*computeShardId < ComputeActors.size());
+ AFL_ENSURE(*computeShardId < ComputeActors.size());
ComputeActors[*computeShardId].AddDataToSend(std::move(sendTask));
}
}
@@ -276,7 +276,7 @@ public:
const std::shared_ptr<TShardState>& GetShardStateVerified(const ui64 tabletId) const {
auto it = Shards.find(tabletId);
- AFL_VERIFY(it != Shards.end());
+ AFL_ENSURE(it != Shards.end())("tablet_id", tabletId);
return it->second;
}
@@ -294,27 +294,27 @@ public:
if (!state) {
return;
}
- AFL_VERIFY(state->State == NComputeActor::EShardState::Starting)("state", state->State);
+ AFL_ENSURE(state->State == NComputeActor::EShardState::Starting)("state", state->State);
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "RegisterScannerActor")("actor_id", scanActorId)
("state", state->State)("tabletId", state->TabletId);
- AFL_VERIFY(!state->ActorId);
+ AFL_ENSURE(!state->ActorId);
state->State = NComputeActor::EShardState::Running;
state->ActorId = scanActorId;
state->ResetRetry();
- AFL_VERIFY(ShardsByActorId.emplace(scanActorId, state).second);
+ AFL_ENSURE(ShardsByActorId.emplace(scanActorId, state).second);
GetShardScannerVerified(tabletId)->Start(scanActorId);
}
void StartScanner(TShardState& state) {
- AFL_VERIFY(state.State == NComputeActor::EShardState::Initial);
- AFL_VERIFY(state.TabletId);
- AFL_VERIFY(!state.ActorId);
+ AFL_ENSURE(state.State == NComputeActor::EShardState::Initial)("state", state.State);
+ AFL_ENSURE(state.TabletId);
+ AFL_ENSURE(!state.ActorId);
state.State = NComputeActor::EShardState::Starting;
auto newScanner = std::make_shared<TShardScannerInfo>(state, ExternalObjectsProvider);
- AFL_VERIFY(ShardScanners.emplace(state.TabletId, newScanner).second);
+ AFL_ENSURE(ShardScanners.emplace(state.TabletId, newScanner).second);
}
void StopScanner(const ui64 tabletId, const bool stopShard = true) {
@@ -322,7 +322,7 @@ public:
auto& state = GetShardStateVerified(tabletId);
const auto actorId = state->ActorId;
if (actorId) {
- AFL_VERIFY(ShardsByActorId.erase(actorId));
+ AFL_ENSURE(ShardsByActorId.erase(actorId));
}
if (state->State != NComputeActor::EShardState::Initial) {
state->RetryAttempt++;
@@ -332,19 +332,19 @@ public:
state->SubscribedOnTablet = false;
auto it = ShardScanners.find(tabletId);
- AFL_VERIFY(it != ShardScanners.end());
+ AFL_ENSURE(it != ShardScanners.end());
it->second->Stop(true, "");
ShardScanners.erase(it);
}
if (stopShard) {
- AFL_VERIFY(Shards.erase(tabletId));
+ AFL_ENSURE(Shards.erase(tabletId));
}
}
const std::shared_ptr<TShardScannerInfo>& GetShardScannerVerified(const ui64 tabletId) const {
auto it = ShardScanners.find(tabletId);
- AFL_VERIFY(it != ShardScanners.end());
+ AFL_ENSURE(it != ShardScanners.end());
return it->second;
}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
index d1b57b7937..da29486bd7 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
@@ -38,8 +38,8 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps
, InFlightComputes(ComputeActorIds)
{
Y_UNUSED(traceId);
- YQL_ENSURE(!Meta.GetReads().empty());
- YQL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView);
+ AFL_ENSURE(!Meta.GetReads().empty());
+ AFL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView);
ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "META:" << meta.DebugString();
KeyColumnTypes.reserve(Meta.GetKeyColumnTypes().size());
for (size_t i = 0; i < Meta.KeyColumnTypesSize(); i++) {
@@ -117,7 +117,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
if (!state) {
return;
}
- AFL_VERIFY(state->State == EShardState::Running)("state", state->State)("actor_id", state->ActorId)("ev_sender", ev->Sender);
+ AFL_ENSURE(state->State == EShardState::Running)("state", state->State)("actor_id", state->ActorId)("ev_sender", ev->Sender);
TInstant startTime = TActivationContext::Now();
if (ev->Get()->Finished) {
@@ -202,14 +202,14 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet
if (!InFlightShards.IsActive()) {
return;
}
- YQL_ENSURE(!PendingResolveShards.empty());
+ AFL_ENSURE(!PendingResolveShards.empty());
auto state = std::move(PendingResolveShards.front());
PendingResolveShards.pop_front();
ResolveNextShard();
Y_ABORT_UNLESS(!InFlightShards.GetShardScanner(state.TabletId));
- YQL_ENSURE(state.State == EShardState::Resolving);
+ AFL_ENSURE(state.State == EShardState::Resolving);
CA_LOG_D("Received TEvResolveKeySetResult update for table '" << ScanDataMeta.TablePath << "'");
auto* request = ev->Get()->Request.Get();
@@ -309,7 +309,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet
}
}
- YQL_ENSURE(!newShards.empty());
+ AFL_ENSURE(!newShards.empty());
for (int i = newShards.ysize() - 1; i >= 0; --i) {
PendingShards.emplace_front(std::move(newShards[i]));
@@ -324,7 +324,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet
PendingShards.front().LastKey = std::move(readShard.LastKey);
}
- YQL_ENSURE(!PendingShards.empty());
+ AFL_ENSURE(!PendingShards.empty());
}
StartTableScan();
}
@@ -446,7 +446,7 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData
Counters->ScanQueryRateLimitLatency->Collect(latency.MilliSeconds());
}
- YQL_ENSURE(state->ActorId == ev->Sender, "expected: " << state->ActorId << ", got: " << ev->Sender);
+ AFL_ENSURE(state->ActorId == ev->Sender)("expected", state->ActorId)("got", ev->Sender);
state->LastKey = std::move(msg.LastKey);
const ui64 rowsCount = msg.GetRowsCount();
@@ -458,7 +458,7 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData
<< ";tablet_id=" << state->TabletId);
auto shardScanner = InFlightShards.GetShardScannerVerified(state->TabletId);
auto tasksForCompute = shardScanner->OnReceiveData(msg, shardScanner);
- AFL_VERIFY(tasksForCompute.size() == 1 || tasksForCompute.size() == 0 || tasksForCompute.size() == ComputeActorIds.size())("size", tasksForCompute.size())("compute_size", ComputeActorIds.size());
+ AFL_ENSURE(tasksForCompute.size() == 1 || tasksForCompute.size() == 0 || tasksForCompute.size() == ComputeActorIds.size())("size", tasksForCompute.size())("compute_size", ComputeActorIds.size());
for (auto&& i : tasksForCompute) {
const std::optional<ui32> computeShardId = i->GetComputeShardId();
InFlightComputes.OnReceiveData(computeShardId, std::move(i));
@@ -477,7 +477,7 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData
}
void TKqpScanFetcherActor::ProcessScanData() {
- YQL_ENSURE(!PendingScanData.empty());
+ AFL_ENSURE(!PendingScanData.empty());
auto ev = std::move(PendingScanData.front().first);
auto enqueuedAt = std::move(PendingScanData.front().second);
@@ -487,7 +487,7 @@ void TKqpScanFetcherActor::ProcessScanData() {
if (!state)
return;
- AFL_VERIFY(state->State == EShardState::Running || state->State == EShardState::PostRunning);
+ AFL_ENSURE(state->State == EShardState::Running || state->State == EShardState::PostRunning)("state", state->State);
ProcessPendingScanDataItem(ev, enqueuedAt);
}