aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server
diff options
context:
space:
mode:
authorilnurkh <ilnurkh@yandex-team.com>2023-10-09 23:39:40 +0300
committerilnurkh <ilnurkh@yandex-team.com>2023-10-09 23:57:14 +0300
commite601ca03f859335d57ecff2e5aa6af234b6052ed (patch)
treede519a847e58a1b3993fcbfe05ff44cc946a3e24 /library/cpp/grpc/server
parentbbf2b6878af3854815a2c0ecb07a687071787639 (diff)
downloadydb-e601ca03f859335d57ecff2e5aa6af234b6052ed.tar.gz
Y_VERIFY->Y_ABORT_UNLESS at ^l
https://clubs.at.yandex-team.ru/arcadia/29404
Diffstat (limited to 'library/cpp/grpc/server')
-rw-r--r--library/cpp/grpc/server/grpc_request.cpp4
-rw-r--r--library/cpp/grpc/server/grpc_request.h22
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp4
-rw-r--r--library/cpp/grpc/server/grpc_server.h10
4 files changed, 20 insertions, 20 deletions
diff --git a/library/cpp/grpc/server/grpc_request.cpp b/library/cpp/grpc/server/grpc_request.cpp
index d18a32776f2..7030992173d 100644
--- a/library/cpp/grpc/server/grpc_request.cpp
+++ b/library/cpp/grpc/server/grpc_request.cpp
@@ -13,7 +13,7 @@ public:
void Enqueue(std::function<void()>&& fn, bool urgent) override {
with_lock(Mtx_) {
if (!UrgentQueue_.empty() || !NormalQueue_.empty()) {
- Y_VERIFY(!StreamIsReady_);
+ Y_ABORT_UNLESS(!StreamIsReady_);
}
auto& queue = urgent ? UrgentQueue_ : NormalQueue_;
if (StreamIsReady_ && queue.empty()) {
@@ -30,7 +30,7 @@ public:
size_t left = 0;
std::function<void()> fn;
with_lock(Mtx_) {
- Y_VERIFY(!StreamIsReady_);
+ Y_ABORT_UNLESS(!StreamIsReady_);
auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_;
if (queue.empty()) {
// Both queues are empty
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index caf47e26771..5c4cf7c2b8c 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -76,7 +76,7 @@ public:
{
AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
- Y_VERIFY(Request_);
+ Y_ABORT_UNLESS(Request_);
GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_);
FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
}
@@ -104,7 +104,7 @@ public:
{
AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
- Y_VERIFY(Request_);
+ Y_ABORT_UNLESS(Request_);
GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_);
FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
StreamAdaptor_ = CreateStreamAdaptor();
@@ -157,7 +157,7 @@ public:
}
void DestroyRequest() override {
- Y_VERIFY(!CallInProgress_, "Unexpected DestroyRequest while another grpc call is still in progress");
+ Y_ABORT_UNLESS(!CallInProgress_, "Unexpected DestroyRequest while another grpc call is still in progress");
RequestDestroyed_ = true;
if (RequestRegistered_) {
Server_->DeregisterRequestCtx(this);
@@ -262,16 +262,16 @@ private:
}
void OnBeforeCall() {
- Y_VERIFY(!RequestDestroyed_, "Cannot start grpc calls after request is already destroyed");
- Y_VERIFY(!Finished_, "Cannot start grpc calls after request is finished");
+ Y_ABORT_UNLESS(!RequestDestroyed_, "Cannot start grpc calls after request is already destroyed");
+ Y_ABORT_UNLESS(!Finished_, "Cannot start grpc calls after request is finished");
bool wasInProgress = std::exchange(CallInProgress_, true);
- Y_VERIFY(!wasInProgress, "Another grpc call is already in progress");
+ Y_ABORT_UNLESS(!wasInProgress, "Another grpc call is already in progress");
}
void OnAfterCall() {
- Y_VERIFY(!RequestDestroyed_, "Finished grpc call after request is already destroyed");
+ Y_ABORT_UNLESS(!RequestDestroyed_, "Finished grpc call after request is already destroyed");
bool wasInProgress = std::exchange(CallInProgress_, false);
- Y_VERIFY(wasInProgress, "Finished grpc call that was not in progress");
+ Y_ABORT_UNLESS(wasInProgress, "Finished grpc call that was not in progress");
}
void WriteDataOk(NProtoBuf::Message* resp, ui32 status) {
@@ -290,7 +290,7 @@ private:
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
ResponseStatus = status;
- Y_VERIFY(this->Context.c_call());
+ Y_ABORT_UNLESS(this->Context.c_call());
OnBeforeCall();
Finished_ = true;
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
@@ -346,7 +346,7 @@ private:
}
void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, const TString& details, bool urgent) {
- Y_VERIFY(code != grpc::OK);
+ Y_ABORT_UNLESS(code != grpc::OK);
if (code == grpc::StatusCode::UNAUTHENTICATED) {
Counters_->CountNotAuthenticated();
} else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) {
@@ -395,7 +395,7 @@ private:
ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str());
if (this->Context.c_call() == nullptr) {
- Y_VERIFY(!ok);
+ Y_ABORT_UNLESS(!ok);
// One ref by OnFinishTag, grpc will not call this tag if no request received
UnRef();
} else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) {
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 9ba7d296591..a38d4c9da6c 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -44,7 +44,7 @@ TGRpcServer::TGRpcServer(const TServerOptions& opts)
{}
TGRpcServer::~TGRpcServer() {
- Y_VERIFY(Ts.empty());
+ Y_ABORT_UNLESS(Ts.empty());
Services_.clear();
}
@@ -193,7 +193,7 @@ void TGRpcServer::Stop() {
if (Server_) {
i64 sec = Options_.GRpcShutdownDeadline.Seconds();
- Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
+ Y_ABORT_UNLESS(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
}
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index fc1826b9228..2d746de99b3 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -155,7 +155,7 @@ public:
i64 prev;
do {
prev = AtomicGet(CurInFlightReqs_);
- Y_VERIFY(prev >= 0);
+ Y_ABORT_UNLESS(prev >= 0);
if (Limit_ && prev > Limit_) {
return false;
}
@@ -166,7 +166,7 @@ public:
void Dec() {
i64 newVal = AtomicDecrement(CurInFlightReqs_);
- Y_VERIFY(newVal >= 0);
+ Y_ABORT_UNLESS(newVal >= 0);
}
i64 GetCurrentInFlight() const {
@@ -334,18 +334,18 @@ public:
}
auto r = shard.Requests_.emplace(req);
- Y_VERIFY(r.second, "Ctx already registered");
+ Y_ABORT_UNLESS(r.second, "Ctx already registered");
}
return true;
}
void DeregisterRequestCtx(ICancelableContext* req) {
- Y_VERIFY(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index");
+ Y_ABORT_UNLESS(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index");
auto& shard = Shards_[req->ShardIndex];
with_lock(shard.Lock_) {
- Y_VERIFY(shard.Requests_.erase(req), "Ctx is not registered");
+ Y_ABORT_UNLESS(shard.Requests_.erase(req), "Ctx is not registered");
}
}