aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc
diff options
context:
space:
mode:
authorilnurkh <ilnurkh@yandex-team.com>2023-10-09 23:39:40 +0300
committerilnurkh <ilnurkh@yandex-team.com>2023-10-09 23:57:14 +0300
commite601ca03f859335d57ecff2e5aa6af234b6052ed (patch)
treede519a847e58a1b3993fcbfe05ff44cc946a3e24 /library/cpp/grpc
parentbbf2b6878af3854815a2c0ecb07a687071787639 (diff)
downloadydb-e601ca03f859335d57ecff2e5aa6af234b6052ed.tar.gz
Y_VERIFY->Y_ABORT_UNLESS at ^l
https://clubs.at.yandex-team.ru/arcadia/29404
Diffstat (limited to 'library/cpp/grpc')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.cpp10
-rw-r--r--library/cpp/grpc/client/grpc_client_low.h40
-rw-r--r--library/cpp/grpc/server/grpc_request.cpp4
-rw-r--r--library/cpp/grpc/server/grpc_request.h22
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp4
-rw-r--r--library/cpp/grpc/server/grpc_server.h10
6 files changed, 45 insertions, 45 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index 5794e1ecb1d..d06bbb2a7d2 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -176,7 +176,7 @@ void TChannelPool::DeleteExpiredStubsHolders() {
void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
- Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
+ Y_ABORT_UNLESS(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
LastUsedQueue_.erase(pos);
}
@@ -209,7 +209,7 @@ class TGRpcClientLow::TContextImpl final
public:
~TContextImpl() override {
- Y_VERIFY(CountChildren() == 0,
+ Y_ABORT_UNLESS(CountChildren() == 0,
"Destructor called with non-empty children");
if (Parent) {
@@ -239,7 +239,7 @@ public:
std::unique_lock<std::mutex> guard(Mutex);
auto removed = RemoveChild(child);
- Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child);
+ Y_ABORT_UNLESS(removed, "Unexpected ForgetContext(%p)", child);
}
IQueueClientContextPtr CreateContext() override {
@@ -266,7 +266,7 @@ public:
}
grpc::CompletionQueue* CompletionQueue() override {
- Y_VERIFY(Owner, "Uninitialized context");
+ Y_ABORT_UNLESS(Owner, "Uninitialized context");
return CQ;
}
@@ -326,7 +326,7 @@ public:
}
void SubscribeCancel(TCallback callback) override {
- Y_VERIFY(callback, "SubscribeCancel called with an empty callback");
+ Y_ABORT_UNLESS(callback, "SubscribeCancel called with an empty callback");
{
std::unique_lock<std::mutex> guard(Mutex);
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
index d9a061035d4..e3f786242db 100644
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ b/library/cpp/grpc/client/grpc_client_low.h
@@ -555,7 +555,7 @@ public:
explicit TStreamRequestReadProcessor(TReaderCallback&& callback)
: Callback(std::move(callback))
{
- Y_VERIFY(Callback, "Missing connected callback");
+ Y_ABORT_UNLESS(Callback, "Missing connected callback");
}
void Cancel() override {
@@ -580,7 +580,7 @@ public:
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
if (!Finished && !HasInitialMetadata) {
ReadActive = true;
ReadCallback = std::move(callback);
@@ -609,7 +609,7 @@ public:
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
if (!Finished) {
ReadActive = true;
ReadCallback = std::move(callback);
@@ -637,7 +637,7 @@ public:
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
if (!Finished) {
ReadActive = true;
FinishCallback = std::move(callback);
@@ -658,7 +658,7 @@ public:
}
void AddFinishedCallback(TReadCallback callback) override {
- Y_VERIFY(callback, "Unexpected empty callback");
+ Y_ABORT_UNLESS(callback, "Unexpected empty callback");
TGrpcStatus status;
@@ -709,8 +709,8 @@ private:
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(ReadActive, "Unexpected Read done callback");
- Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
+ Y_ABORT_UNLESS(ReadActive, "Unexpected Read done callback");
+ Y_ABORT_UNLESS(!ReadFinished, "Unexpected ReadFinished flag");
if (!ok || Cancelled) {
ReadFinished = true;
@@ -781,7 +781,7 @@ private:
finishedCallbacks.swap(FinishedCallbacks);
if (Callback) {
- Y_VERIFY(!ReadActive);
+ Y_ABORT_UNLESS(!ReadActive);
startCallback = std::move(Callback);
Callback = nullptr;
} else if (ReadActive) {
@@ -857,7 +857,7 @@ public:
explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback)
: ConnectedCallback(std::move(callback))
{
- Y_VERIFY(ConnectedCallback, "Missing connected callback");
+ Y_ABORT_UNLESS(ConnectedCallback, "Missing connected callback");
}
void Cancel() override {
@@ -908,7 +908,7 @@ public:
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
if (!Finished && !HasInitialMetadata) {
ReadActive = true;
ReadCallback = std::move(callback);
@@ -937,7 +937,7 @@ public:
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
if (!Finished) {
ReadActive = true;
ReadCallback = std::move(callback);
@@ -965,7 +965,7 @@ public:
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
if (!Finished) {
ReadActive = true;
FinishCallback = std::move(callback);
@@ -991,7 +991,7 @@ public:
}
void AddFinishedCallback(TReadCallback callback) override {
- Y_VERIFY(callback, "Unexpected empty callback");
+ Y_ABORT_UNLESS(callback, "Unexpected empty callback");
TGrpcStatus status;
@@ -1065,8 +1065,8 @@ private:
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(ReadActive, "Unexpected Read done callback");
- Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
+ Y_ABORT_UNLESS(ReadActive, "Unexpected Read done callback");
+ Y_ABORT_UNLESS(!ReadFinished, "Unexpected ReadFinished flag");
if (!ok || Cancelled || WriteFinished) {
ReadFinished = true;
@@ -1103,8 +1103,8 @@ private:
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(WriteActive, "Unexpected Write done callback");
- Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag");
+ Y_ABORT_UNLESS(WriteActive, "Unexpected Write done callback");
+ Y_ABORT_UNLESS(!WriteFinished, "Unexpected WriteFinished flag");
if (ok) {
okCallback.swap(WriteCallback);
@@ -1167,7 +1167,7 @@ private:
finishedCallbacks.swap(FinishedCallbacks);
if (ConnectedCallback) {
- Y_VERIFY(!ReadActive);
+ Y_ABORT_UNLESS(!ReadActive);
connectedCallback = std::move(ConnectedCallback);
ConnectedCallback = nullptr;
} else if (ReadActive) {
@@ -1320,7 +1320,7 @@ private:
: Stub_(TGRpcService::NewStub(ci))
, Provider_(provider)
{
- Y_VERIFY(Provider_, "Connection does not have a queue provider");
+ Y_ABORT_UNLESS(Provider_, "Connection does not have a queue provider");
}
TServiceConnection(TStubsHolder& holder,
@@ -1328,7 +1328,7 @@ private:
: Stub_(holder.GetOrCreateStub<TStub>())
, Provider_(provider)
{
- Y_VERIFY(Provider_, "Connection does not have a queue provider");
+ Y_ABORT_UNLESS(Provider_, "Connection does not have a queue provider");
}
std::shared_ptr<TStub> Stub_;
diff --git a/library/cpp/grpc/server/grpc_request.cpp b/library/cpp/grpc/server/grpc_request.cpp
index d18a32776f2..7030992173d 100644
--- a/library/cpp/grpc/server/grpc_request.cpp
+++ b/library/cpp/grpc/server/grpc_request.cpp
@@ -13,7 +13,7 @@ public:
void Enqueue(std::function<void()>&& fn, bool urgent) override {
with_lock(Mtx_) {
if (!UrgentQueue_.empty() || !NormalQueue_.empty()) {
- Y_VERIFY(!StreamIsReady_);
+ Y_ABORT_UNLESS(!StreamIsReady_);
}
auto& queue = urgent ? UrgentQueue_ : NormalQueue_;
if (StreamIsReady_ && queue.empty()) {
@@ -30,7 +30,7 @@ public:
size_t left = 0;
std::function<void()> fn;
with_lock(Mtx_) {
- Y_VERIFY(!StreamIsReady_);
+ Y_ABORT_UNLESS(!StreamIsReady_);
auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_;
if (queue.empty()) {
// Both queues are empty
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index caf47e26771..5c4cf7c2b8c 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -76,7 +76,7 @@ public:
{
AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
- Y_VERIFY(Request_);
+ Y_ABORT_UNLESS(Request_);
GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_);
FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
}
@@ -104,7 +104,7 @@ public:
{
AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
- Y_VERIFY(Request_);
+ Y_ABORT_UNLESS(Request_);
GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_);
FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
StreamAdaptor_ = CreateStreamAdaptor();
@@ -157,7 +157,7 @@ public:
}
void DestroyRequest() override {
- Y_VERIFY(!CallInProgress_, "Unexpected DestroyRequest while another grpc call is still in progress");
+ Y_ABORT_UNLESS(!CallInProgress_, "Unexpected DestroyRequest while another grpc call is still in progress");
RequestDestroyed_ = true;
if (RequestRegistered_) {
Server_->DeregisterRequestCtx(this);
@@ -262,16 +262,16 @@ private:
}
void OnBeforeCall() {
- Y_VERIFY(!RequestDestroyed_, "Cannot start grpc calls after request is already destroyed");
- Y_VERIFY(!Finished_, "Cannot start grpc calls after request is finished");
+ Y_ABORT_UNLESS(!RequestDestroyed_, "Cannot start grpc calls after request is already destroyed");
+ Y_ABORT_UNLESS(!Finished_, "Cannot start grpc calls after request is finished");
bool wasInProgress = std::exchange(CallInProgress_, true);
- Y_VERIFY(!wasInProgress, "Another grpc call is already in progress");
+ Y_ABORT_UNLESS(!wasInProgress, "Another grpc call is already in progress");
}
void OnAfterCall() {
- Y_VERIFY(!RequestDestroyed_, "Finished grpc call after request is already destroyed");
+ Y_ABORT_UNLESS(!RequestDestroyed_, "Finished grpc call after request is already destroyed");
bool wasInProgress = std::exchange(CallInProgress_, false);
- Y_VERIFY(wasInProgress, "Finished grpc call that was not in progress");
+ Y_ABORT_UNLESS(wasInProgress, "Finished grpc call that was not in progress");
}
void WriteDataOk(NProtoBuf::Message* resp, ui32 status) {
@@ -290,7 +290,7 @@ private:
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
ResponseStatus = status;
- Y_VERIFY(this->Context.c_call());
+ Y_ABORT_UNLESS(this->Context.c_call());
OnBeforeCall();
Finished_ = true;
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
@@ -346,7 +346,7 @@ private:
}
void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, const TString& details, bool urgent) {
- Y_VERIFY(code != grpc::OK);
+ Y_ABORT_UNLESS(code != grpc::OK);
if (code == grpc::StatusCode::UNAUTHENTICATED) {
Counters_->CountNotAuthenticated();
} else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) {
@@ -395,7 +395,7 @@ private:
ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str());
if (this->Context.c_call() == nullptr) {
- Y_VERIFY(!ok);
+ Y_ABORT_UNLESS(!ok);
// One ref by OnFinishTag, grpc will not call this tag if no request received
UnRef();
} else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) {
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 9ba7d296591..a38d4c9da6c 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -44,7 +44,7 @@ TGRpcServer::TGRpcServer(const TServerOptions& opts)
{}
TGRpcServer::~TGRpcServer() {
- Y_VERIFY(Ts.empty());
+ Y_ABORT_UNLESS(Ts.empty());
Services_.clear();
}
@@ -193,7 +193,7 @@ void TGRpcServer::Stop() {
if (Server_) {
i64 sec = Options_.GRpcShutdownDeadline.Seconds();
- Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
+ Y_ABORT_UNLESS(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
}
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index fc1826b9228..2d746de99b3 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -155,7 +155,7 @@ public:
i64 prev;
do {
prev = AtomicGet(CurInFlightReqs_);
- Y_VERIFY(prev >= 0);
+ Y_ABORT_UNLESS(prev >= 0);
if (Limit_ && prev > Limit_) {
return false;
}
@@ -166,7 +166,7 @@ public:
void Dec() {
i64 newVal = AtomicDecrement(CurInFlightReqs_);
- Y_VERIFY(newVal >= 0);
+ Y_ABORT_UNLESS(newVal >= 0);
}
i64 GetCurrentInFlight() const {
@@ -334,18 +334,18 @@ public:
}
auto r = shard.Requests_.emplace(req);
- Y_VERIFY(r.second, "Ctx already registered");
+ Y_ABORT_UNLESS(r.second, "Ctx already registered");
}
return true;
}
void DeregisterRequestCtx(ICancelableContext* req) {
- Y_VERIFY(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index");
+ Y_ABORT_UNLESS(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index");
auto& shard = Shards_[req->ShardIndex];
with_lock(shard.Lock_) {
- Y_VERIFY(shard.Requests_.erase(req), "Ctx is not registered");
+ Y_ABORT_UNLESS(shard.Requests_.erase(req), "Ctx is not registered");
}
}