aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:17 +0300
commit4b11037e5a7d071c63e3c966199fe7102e6462e4 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc
parent17e20fa084178ddcb16255f974dbde74fb93608b (diff)
downloadydb-4b11037e5a7d071c63e3c966199fe7102e6462e4.tar.gz
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.cpp414
-rw-r--r--library/cpp/grpc/client/grpc_client_low.h1122
-rw-r--r--library/cpp/grpc/client/grpc_common.h88
-rw-r--r--library/cpp/grpc/client/ya.make4
-rw-r--r--library/cpp/grpc/server/event_callback.cpp2
-rw-r--r--library/cpp/grpc/server/event_callback.h144
-rw-r--r--library/cpp/grpc/server/grpc_async_ctx_base.h122
-rw-r--r--library/cpp/grpc/server/grpc_counters.cpp2
-rw-r--r--library/cpp/grpc/server/grpc_counters.h134
-rw-r--r--library/cpp/grpc/server/grpc_request.cpp112
-rw-r--r--library/cpp/grpc/server/grpc_request.h696
-rw-r--r--library/cpp/grpc/server/grpc_request_base.h152
-rw-r--r--library/cpp/grpc/server/grpc_response.h2
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp290
-rw-r--r--library/cpp/grpc/server/grpc_server.h404
-rw-r--r--library/cpp/grpc/server/ut/stream_adaptor_ut.cpp230
-rw-r--r--library/cpp/grpc/server/ut/ya.make32
-rw-r--r--library/cpp/grpc/server/ya.make42
18 files changed, 1996 insertions, 1996 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index ee9e997fa7..73cc908ef8 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -1,164 +1,164 @@
-#include "grpc_client_low.h"
-#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
-#include <contrib/libs/grpc/include/grpc/support/log.h>
-
-#include <library/cpp/containers/stack_vector/stack_vec.h>
-
-#include <util/string/printf.h>
+#include "grpc_client_low.h"
+#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
+#include <contrib/libs/grpc/include/grpc/support/log.h>
+
+#include <library/cpp/containers/stack_vector/stack_vec.h>
+
+#include <util/string/printf.h>
#include <util/system/thread.h>
#include <util/random/random.h>
-
-#if !defined(_WIN32) && !defined(_WIN64)
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#endif
-
+
+#if !defined(_WIN32) && !defined(_WIN64)
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#endif
+
namespace NGrpc {
-
-void EnableGRpcTracing() {
- grpc_tracer_set_enabled("tcp", true);
- grpc_tracer_set_enabled("client_channel", true);
- grpc_tracer_set_enabled("channel", true);
- grpc_tracer_set_enabled("api", true);
- grpc_tracer_set_enabled("connectivity_state", true);
- grpc_tracer_set_enabled("handshaker", true);
- grpc_tracer_set_enabled("http", true);
- grpc_tracer_set_enabled("http2_stream_state", true);
- grpc_tracer_set_enabled("op_failure", true);
- grpc_tracer_set_enabled("timer", true);
- gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
-}
-
-class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
-public:
- TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
- : Idle_(idle)
- , Count_(count)
- , Interval_(interval)
- {
- grpc_socket_mutator_init(this, &VTable);
- }
-private:
- static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
- return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
- }
-
- template<typename TVal>
- bool SetOption(int fd, int level, int optname, const TVal& value) {
- return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
- }
- bool SetOption(int fd) {
- if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
- Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
- return false;
- }
-#ifdef _linux_
- if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
- return false;
- }
- if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
- return false;
- }
- if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
- return false;
- }
-#endif
- return true;
- }
- static bool Mutate(int fd, grpc_socket_mutator* mutator) {
- auto self = Cast(mutator);
- return self->SetOption(fd);
- }
- static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
- const auto* selfA = Cast(a);
- const auto* selfB = Cast(b);
- auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
- auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
- return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
- }
- static void Destroy(grpc_socket_mutator* mutator) {
- delete Cast(mutator);
- }
-
- static grpc_socket_mutator_vtable VTable;
- const int Idle_;
- const int Count_;
- const int Interval_;
-};
-
-grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
- {
- &TGRpcKeepAliveSocketMutator::Mutate,
- &TGRpcKeepAliveSocketMutator::Compare,
- &TGRpcKeepAliveSocketMutator::Destroy
- };
-
+
+void EnableGRpcTracing() {
+ grpc_tracer_set_enabled("tcp", true);
+ grpc_tracer_set_enabled("client_channel", true);
+ grpc_tracer_set_enabled("channel", true);
+ grpc_tracer_set_enabled("api", true);
+ grpc_tracer_set_enabled("connectivity_state", true);
+ grpc_tracer_set_enabled("handshaker", true);
+ grpc_tracer_set_enabled("http", true);
+ grpc_tracer_set_enabled("http2_stream_state", true);
+ grpc_tracer_set_enabled("op_failure", true);
+ grpc_tracer_set_enabled("timer", true);
+ gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
+}
+
+class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
+public:
+ TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
+ : Idle_(idle)
+ , Count_(count)
+ , Interval_(interval)
+ {
+ grpc_socket_mutator_init(this, &VTable);
+ }
+private:
+ static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
+ return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
+ }
+
+ template<typename TVal>
+ bool SetOption(int fd, int level, int optname, const TVal& value) {
+ return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
+ }
+ bool SetOption(int fd) {
+ if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
+ Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+#ifdef _linux_
+ if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
+ Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+ if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
+ Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+ if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
+ Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+#endif
+ return true;
+ }
+ static bool Mutate(int fd, grpc_socket_mutator* mutator) {
+ auto self = Cast(mutator);
+ return self->SetOption(fd);
+ }
+ static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
+ const auto* selfA = Cast(a);
+ const auto* selfB = Cast(b);
+ auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
+ auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
+ return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
+ }
+ static void Destroy(grpc_socket_mutator* mutator) {
+ delete Cast(mutator);
+ }
+
+ static grpc_socket_mutator_vtable VTable;
+ const int Idle_;
+ const int Count_;
+ const int Interval_;
+};
+
+grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
+ {
+ &TGRpcKeepAliveSocketMutator::Mutate,
+ &TGRpcKeepAliveSocketMutator::Compare,
+ &TGRpcKeepAliveSocketMutator::Destroy
+ };
+
TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
- : TcpKeepAliveSettings_(tcpKeepAliveSettings)
+ : TcpKeepAliveSettings_(tcpKeepAliveSettings)
, ExpireTime_(expireTime)
, UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
-{}
-
-void TChannelPool::GetStubsHolderLocked(
- const TString& channelId,
- const TGRpcClientConfig& config,
- std::function<void(TStubsHolder&)> cb)
-{
- {
- std::shared_lock readGuard(RWMutex_);
- const auto it = Pool_.find(channelId);
- if (it != Pool_.end()) {
+{}
+
+void TChannelPool::GetStubsHolderLocked(
+ const TString& channelId,
+ const TGRpcClientConfig& config,
+ std::function<void(TStubsHolder&)> cb)
+{
+ {
+ std::shared_lock readGuard(RWMutex_);
+ const auto it = Pool_.find(channelId);
+ if (it != Pool_.end()) {
if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
- return cb(it->second);
- }
- }
- }
- {
- std::unique_lock writeGuard(RWMutex_);
- {
- auto it = Pool_.find(channelId);
- if (it != Pool_.end()) {
- if (!it->second.IsChannelBroken()) {
+ return cb(it->second);
+ }
+ }
+ }
+ {
+ std::unique_lock writeGuard(RWMutex_);
+ {
+ auto it = Pool_.find(channelId);
+ if (it != Pool_.end()) {
+ if (!it->second.IsChannelBroken()) {
EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
auto now = Now();
LastUsedQueue_.emplace(now, channelId);
it->second.SetLastUseTime(now);
- return cb(it->second);
- } else {
- // This channel can't be used. Remove from pool to create new one
+ return cb(it->second);
+ } else {
+ // This channel can't be used. Remove from pool to create new one
EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
- Pool_.erase(it);
- }
- }
- }
- TGRpcKeepAliveSocketMutator* mutator = nullptr;
- // will be destroyed inside grpc
- if (TcpKeepAliveSettings_.Enabled) {
- mutator = new TGRpcKeepAliveSocketMutator(
- TcpKeepAliveSettings_.Idle,
- TcpKeepAliveSettings_.Count,
- TcpKeepAliveSettings_.Interval
- );
- }
- cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
+ Pool_.erase(it);
+ }
+ }
+ }
+ TGRpcKeepAliveSocketMutator* mutator = nullptr;
+ // will be destroyed inside grpc
+ if (TcpKeepAliveSettings_.Enabled) {
+ mutator = new TGRpcKeepAliveSocketMutator(
+ TcpKeepAliveSettings_.Idle,
+ TcpKeepAliveSettings_.Count,
+ TcpKeepAliveSettings_.Interval
+ );
+ }
+ cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
- }
-}
-
-void TChannelPool::DeleteChannel(const TString& channelId) {
- std::unique_lock writeLock(RWMutex_);
+ }
+}
+
+void TChannelPool::DeleteChannel(const TString& channelId) {
+ std::unique_lock writeLock(RWMutex_);
auto poolIt = Pool_.find(channelId);
if (poolIt != Pool_.end()) {
EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
Pool_.erase(poolIt);
}
-}
-
+}
+
void TChannelPool::DeleteExpiredStubsHolders() {
std::unique_lock writeLock(RWMutex_);
auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
@@ -392,7 +392,7 @@ private:
// Some children are stored inline, others are in a set
std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
- std::unordered_set<TContextImpl*> Children;
+ std::unordered_set<TContextImpl*> Children;
// Single callback is stored without extra allocations
TStackVec<TCallback, 1> Callbacks;
@@ -404,10 +404,10 @@ private:
TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
: UseCompletionQueuePerThread_(useCompletionQueuePerThread)
{
- Init(numWorkerThread);
-}
-
-void TGRpcClientLow::Init(size_t numWorkerThread) {
+ Init(numWorkerThread);
+}
+
+void TGRpcClientLow::Init(size_t numWorkerThread) {
SetCqState(WORKING);
if (UseCompletionQueuePerThread_) {
for (size_t i = 0; i < numWorkerThread; i++) {
@@ -425,9 +425,9 @@ void TGRpcClientLow::Init(size_t numWorkerThread) {
PullEvents(cq);
}).Release());
}
- }
-}
-
+ }
+}
+
void TGRpcClientLow::AddWorkerThreadForTest() {
if (UseCompletionQueuePerThread_) {
CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
@@ -453,17 +453,17 @@ void TGRpcClientLow::Stop(bool wait) {
if (wait) {
WaitInternal();
- }
-}
-
+ }
+}
+
void TGRpcClientLow::StopInternal(bool silent) {
bool shutdown;
TVector<TContextImpl::TContextPtr> cancelQueue;
- {
- std::unique_lock<std::mutex> guard(Mtx_);
-
+ {
+ std::unique_lock<std::mutex> guard(Mtx_);
+
auto allowStateChange = [&]() {
switch (GetCqState()) {
case WORKING:
@@ -484,7 +484,7 @@ void TGRpcClientLow::StopInternal(bool silent) {
SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
- if (!silent && !Contexts_.empty()) {
+ if (!silent && !Contexts_.empty()) {
cancelQueue.reserve(Contexts_.size());
for (auto* ptr : Contexts_) {
// N.B. some contexts may be stuck in destructors
@@ -494,7 +494,7 @@ void TGRpcClientLow::StopInternal(bool silent) {
}
}
- shutdown = Contexts_.empty();
+ shutdown = Contexts_.empty();
}
for (auto& context : cancelQueue) {
@@ -506,62 +506,62 @@ void TGRpcClientLow::StopInternal(bool silent) {
for (auto& cq : CQS_) {
cq->Shutdown();
}
- }
+ }
}
void TGRpcClientLow::WaitInternal() {
- std::unique_lock<std::mutex> guard(JoinMutex_);
-
- for (auto& ti : WorkerThreads_) {
- ti->Join();
- }
-}
-
+ std::unique_lock<std::mutex> guard(JoinMutex_);
+
+ for (auto& ti : WorkerThreads_) {
+ ti->Join();
+ }
+}
+
void TGRpcClientLow::WaitIdle() {
- std::unique_lock<std::mutex> guard(Mtx_);
-
- while (!Contexts_.empty()) {
- ContextsEmpty_.wait(guard);
- }
-}
-
+ std::unique_lock<std::mutex> guard(Mtx_);
+
+ while (!Contexts_.empty()) {
+ ContextsEmpty_.wait(guard);
+ }
+}
+
std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
- std::unique_lock<std::mutex> guard(Mtx_);
-
- auto allowCreateContext = [&]() {
- switch (GetCqState()) {
- case WORKING:
- return true;
- case STOP_SILENT:
- case STOP_EXPLICIT:
- return false;
+ std::unique_lock<std::mutex> guard(Mtx_);
+
+ auto allowCreateContext = [&]() {
+ switch (GetCqState()) {
+ case WORKING:
+ return true;
+ case STOP_SILENT:
+ case STOP_EXPLICIT:
+ return false;
}
- Y_UNREACHABLE();
- };
-
- if (!allowCreateContext()) {
- // New context creation is forbidden
- return nullptr;
- }
-
- auto context = std::make_shared<TContextImpl>();
- Contexts_.insert(context.get());
- context->Owner = this;
- if (UseCompletionQueuePerThread_) {
- context->CQ = CQS_[RandomNumber(CQS_.size())].get();
- } else {
- context->CQ = CQS_[0].get();
- }
- return context;
+ Y_UNREACHABLE();
+ };
+
+ if (!allowCreateContext()) {
+ // New context creation is forbidden
+ return nullptr;
+ }
+
+ auto context = std::make_shared<TContextImpl>();
+ Contexts_.insert(context.get());
+ context->Owner = this;
+ if (UseCompletionQueuePerThread_) {
+ context->CQ = CQS_[RandomNumber(CQS_.size())].get();
+ } else {
+ context->CQ = CQS_[0].get();
+ }
+ return context;
}
void TGRpcClientLow::ForgetContext(TContextImpl* context) {
bool shutdown = false;
- {
- std::unique_lock<std::mutex> guard(Mtx_);
-
+ {
+ std::unique_lock<std::mutex> guard(Mtx_);
+
if (!Contexts_.erase(context)) {
Y_FAIL("Unexpected ForgetContext(%p)", context);
}
@@ -571,7 +571,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
shutdown = true;
}
- ContextsEmpty_.notify_all();
+ ContextsEmpty_.notify_all();
}
}
@@ -580,7 +580,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
for (auto& cq : CQS_) {
cq->Shutdown();
}
- }
-}
-
+ }
+}
+
} // namespace NGRpc
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
index e452be45a1..ab0a0627be 100644
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ b/library/cpp/grpc/client/grpc_client_low.h
@@ -1,57 +1,57 @@
-#pragma once
-
-#include "grpc_common.h"
-
+#pragma once
+
+#include "grpc_common.h"
+
#include <util/thread/factory.h>
-#include <grpc++/grpc++.h>
+#include <grpc++/grpc++.h>
#include <grpc++/support/async_stream.h>
#include <grpc++/support/async_unary_call.h>
-
-#include <deque>
-#include <typeindex>
-#include <typeinfo>
+
+#include <deque>
+#include <typeindex>
+#include <typeinfo>
#include <variant>
-#include <vector>
-#include <unordered_map>
-#include <unordered_set>
-#include <mutex>
-#include <shared_mutex>
-
-/*
- * This file contains low level logic for grpc
- * This file should not be used in high level code without special reason
- */
+#include <vector>
+#include <unordered_map>
+#include <unordered_set>
+#include <mutex>
+#include <shared_mutex>
+
+/*
+ * This file contains low level logic for grpc
+ * This file should not be used in high level code without special reason
+ */
namespace NGrpc {
-
-const size_t DEFAULT_NUM_THREADS = 2;
-
-////////////////////////////////////////////////////////////////////////////////
-
-void EnableGRpcTracing();
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TTcpKeepAliveSettings {
- bool Enabled;
- size_t Idle;
- size_t Count;
- size_t Interval;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-// Common interface used to execute action from grpc cq routine
-class IQueueClientEvent {
-public:
- virtual ~IQueueClientEvent() = default;
-
+
+const size_t DEFAULT_NUM_THREADS = 2;
+
+////////////////////////////////////////////////////////////////////////////////
+
+void EnableGRpcTracing();
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TTcpKeepAliveSettings {
+ bool Enabled;
+ size_t Idle;
+ size_t Count;
+ size_t Interval;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Common interface used to execute action from grpc cq routine
+class IQueueClientEvent {
+public:
+ virtual ~IQueueClientEvent() = default;
+
//! Execute an action defined by implementation
- virtual bool Execute(bool ok) = 0;
-
- //! Finish and destroy event
- virtual void Destroy() = 0;
-};
-
+ virtual bool Execute(bool ok) = 0;
+
+ //! Finish and destroy event
+ virtual void Destroy() = 0;
+};
+
// Implementation of IQueueClientEvent that reduces allocations
template<class TSelf>
class TQueueClientFixedEvent : private IQueueClientEvent {
@@ -123,8 +123,8 @@ public:
}
};
-// Represents grpc status and error message string
-struct TGrpcStatus {
+// Represents grpc status and error message string
+struct TGrpcStatus {
TString Msg;
TString Details;
int GRpcStatusCode;
@@ -167,36 +167,36 @@ struct TGrpcStatus {
bool Ok() const {
return !InternalError && GRpcStatusCode == grpc::StatusCode::OK;
}
-};
-
-bool inline IsGRpcStatusGood(const TGrpcStatus& status) {
+};
+
+bool inline IsGRpcStatusGood(const TGrpcStatus& status) {
return status.Ok();
-}
-
+}
+
// Response callback type - this callback will be called when request is finished
-// (or after getting each chunk in case of streaming mode)
-template<typename TResponse>
-using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>;
-
+// (or after getting each chunk in case of streaming mode)
+template<typename TResponse>
+using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>;
+
template<typename TResponse>
using TAdvancedResponseCallback = std::function<void (const grpc::ClientContext&, TGrpcStatus&&, TResponse&&)>;
-// Call associated metadata
-struct TCallMeta {
- std::shared_ptr<grpc::CallCredentials> CallCredentials;
- std::vector<std::pair<TString, TString>> Aux;
+// Call associated metadata
+struct TCallMeta {
+ std::shared_ptr<grpc::CallCredentials> CallCredentials;
+ std::vector<std::pair<TString, TString>> Aux;
std::variant<TDuration, TInstant> Timeout; // timeout as duration from now or time point in future
-};
-
-class TGRpcRequestProcessorCommon {
-protected:
- void ApplyMeta(const TCallMeta& meta) {
- for (const auto& rec : meta.Aux) {
- Context.AddMetadata(rec.first, rec.second);
- }
- if (meta.CallCredentials) {
- Context.set_credentials(meta.CallCredentials);
- }
+};
+
+class TGRpcRequestProcessorCommon {
+protected:
+ void ApplyMeta(const TCallMeta& meta) {
+ for (const auto& rec : meta.Aux) {
+ Context.AddMetadata(rec.first, rec.second);
+ }
+ if (meta.CallCredentials) {
+ Context.set_credentials(meta.CallCredentials);
+ }
if (const TDuration* timeout = std::get_if<TDuration>(&meta.Timeout)) {
if (*timeout) {
auto deadline = gpr_time_add(
@@ -208,10 +208,10 @@ protected:
if (*deadline) {
Context.set_deadline(gpr_time_from_micros(deadline->MicroSeconds(), GPR_CLOCK_MONOTONIC));
}
- }
- }
+ }
+ }
- void GetInitialMetadata(std::unordered_multimap<TString, TString>* metadata) {
+ void GetInitialMetadata(std::unordered_multimap<TString, TString>* metadata) {
for (const auto& [key, value] : Context.GetServerInitialMetadata()) {
metadata->emplace(
TString(key.begin(), key.end()),
@@ -220,61 +220,61 @@ protected:
}
}
- grpc::Status Status;
- grpc::ClientContext Context;
- std::shared_ptr<IQueueClientContext> LocalContext;
-};
-
-template<typename TStub, typename TRequest, typename TResponse>
-class TSimpleRequestProcessor
- : public TThrRefBase
- , public IQueueClientEvent
- , public TGRpcRequestProcessorCommon {
- using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
- template<typename> friend class TServiceConnection;
-public:
- using TPtr = TIntrusivePtr<TSimpleRequestProcessor>;
- using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);
-
+ grpc::Status Status;
+ grpc::ClientContext Context;
+ std::shared_ptr<IQueueClientContext> LocalContext;
+};
+
+template<typename TStub, typename TRequest, typename TResponse>
+class TSimpleRequestProcessor
+ : public TThrRefBase
+ , public IQueueClientEvent
+ , public TGRpcRequestProcessorCommon {
+ using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
+ template<typename> friend class TServiceConnection;
+public:
+ using TPtr = TIntrusivePtr<TSimpleRequestProcessor>;
+ using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);
+
explicit TSimpleRequestProcessor(TResponseCallback<TResponse>&& callback)
: Callback_(std::move(callback))
- { }
-
- ~TSimpleRequestProcessor() {
+ { }
+
+ ~TSimpleRequestProcessor() {
if (!Replied_ && Callback_) {
Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
Callback_ = nullptr; // free resources as early as possible
- }
- }
-
- bool Execute(bool ok) override {
- {
- std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext.reset();
+ }
+ }
+
+ bool Execute(bool ok) override {
+ {
+ std::unique_lock<std::mutex> guard(Mutex_);
+ LocalContext.reset();
}
TGrpcStatus status;
if (ok) {
- status = Status;
+ status = Status;
} else {
status = TGrpcStatus::Internal("Unexpected error");
- }
- Replied_ = true;
- Callback_(std::move(status), std::move(Reply_));
+ }
+ Replied_ = true;
+ Callback_(std::move(status), std::move(Reply_));
Callback_ = nullptr; // free resources as early as possible
- return false;
- }
-
- void Destroy() override {
+ return false;
+ }
+
+ void Destroy() override {
UnRef();
- }
-
-private:
+ }
+
+private:
IQueueClientEvent* FinishedEvent() {
Ref();
return this;
- }
-
- void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
+ }
+
+ void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
auto context = provider->CreateContext();
if (!context) {
Replied_ = true;
@@ -282,11 +282,11 @@ private:
Callback_ = nullptr;
return;
}
- {
- std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext = context;
- Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
- Reader_->Finish(&Reply_, &Status, FinishedEvent());
+ {
+ std::unique_lock<std::mutex> guard(Mutex_);
+ LocalContext = context;
+ Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
+ Reader_->Finish(&Reply_, &Status, FinishedEvent());
}
context->SubscribeStop([self = TPtr(this)] {
self->Stop();
@@ -294,17 +294,17 @@ private:
}
void Stop() {
- Context.TryCancel();
+ Context.TryCancel();
}
TResponseCallback<TResponse> Callback_;
- TResponse Reply_;
- std::mutex Mutex_;
- TAsyncReaderPtr Reader_;
-
- bool Replied_ = false;
-};
-
+ TResponse Reply_;
+ std::mutex Mutex_;
+ TAsyncReaderPtr Reader_;
+
+ bool Replied_ = false;
+};
+
template<typename TStub, typename TRequest, typename TResponse>
class TAdvancedRequestProcessor
: public TThrRefBase
@@ -328,8 +328,8 @@ public:
}
bool Execute(bool ok) override {
- {
- std::unique_lock<std::mutex> guard(Mutex_);
+ {
+ std::unique_lock<std::mutex> guard(Mutex_);
LocalContext.reset();
}
TGrpcStatus status;
@@ -362,8 +362,8 @@ private:
Callback_ = nullptr;
return;
}
- {
- std::unique_lock<std::mutex> guard(Mutex_);
+ {
+ std::unique_lock<std::mutex> guard(Mutex_);
LocalContext = context;
Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
Reader_->Finish(&Reply_, &Status, FinishedEvent());
@@ -379,16 +379,16 @@ private:
TAdvancedResponseCallback<TResponse> Callback_;
TResponse Reply_;
- std::mutex Mutex_;
+ std::mutex Mutex_;
TAsyncReaderPtr Reader_;
bool Replied_ = false;
};
-template<class TResponse>
-class IStreamRequestReadProcessor : public TThrRefBase {
+template<class TResponse>
+class IStreamRequestReadProcessor : public TThrRefBase {
public:
- using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;
+ using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;
using TReadCallback = std::function<void(TGrpcStatus&&)>;
/**
@@ -399,7 +399,7 @@ public:
/**
* Scheduled initial server metadata read from the stream
*/
- virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0;
+ virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0;
/**
* Scheduled response read from the stream
@@ -420,72 +420,72 @@ public:
virtual void AddFinishedCallback(TReadCallback callback) = 0;
};
-template<class TRequest, class TResponse>
-class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> {
-public:
- using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>;
+template<class TRequest, class TResponse>
+class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> {
+public:
+ using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>;
using TWriteCallback = std::function<void(TGrpcStatus&&)>;
-
- /**
- * Scheduled request write to the stream
- */
+
+ /**
+ * Scheduled request write to the stream
+ */
virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0;
-};
-
-class TGRpcKeepAliveSocketMutator;
-
-// Class to hold stubs allocated on channel.
-// It is poor documented part of grpc. See KIKIMR-6109 and comment to this commit
-
-// Stub holds shared_ptr<ChannelInterface>, so we can destroy this holder even if
-// request processor using stub
-class TStubsHolder : public TNonCopyable {
- using TypeInfoRef = std::reference_wrapper<const std::type_info>;
-
- struct THasher {
- std::size_t operator()(TypeInfoRef code) const {
- return code.get().hash_code();
- }
- };
-
- struct TEqualTo {
- bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const {
- return lhs.get() == rhs.get();
- }
- };
-public:
- TStubsHolder(std::shared_ptr<grpc::ChannelInterface> channel)
- : ChannelInterface_(channel)
- {}
-
- // Returns true if channel can't be used to perform request now
- bool IsChannelBroken() const {
- auto state = ChannelInterface_->GetState(false);
- return state == GRPC_CHANNEL_SHUTDOWN ||
- state == GRPC_CHANNEL_TRANSIENT_FAILURE;
- }
-
- template<typename TStub>
- std::shared_ptr<TStub> GetOrCreateStub() {
- const auto& stubId = typeid(TStub);
- {
- std::shared_lock readGuard(RWMutex_);
- const auto it = Stubs_.find(stubId);
- if (it != Stubs_.end()) {
- return std::static_pointer_cast<TStub>(it->second);
- }
- }
- {
- std::unique_lock writeGuard(RWMutex_);
- auto it = Stubs_.emplace(stubId, nullptr);
- if (!it.second) {
- return std::static_pointer_cast<TStub>(it.first->second);
- } else {
- it.first->second = std::make_shared<TStub>(ChannelInterface_);
- return std::static_pointer_cast<TStub>(it.first->second);
- }
- }
- }
+};
+
+class TGRpcKeepAliveSocketMutator;
+
+// Class to hold stubs allocated on channel.
+// It is poor documented part of grpc. See KIKIMR-6109 and comment to this commit
+
+// Stub holds shared_ptr<ChannelInterface>, so we can destroy this holder even if
+// request processor using stub
+class TStubsHolder : public TNonCopyable {
+ using TypeInfoRef = std::reference_wrapper<const std::type_info>;
+
+ struct THasher {
+ std::size_t operator()(TypeInfoRef code) const {
+ return code.get().hash_code();
+ }
+ };
+
+ struct TEqualTo {
+ bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const {
+ return lhs.get() == rhs.get();
+ }
+ };
+public:
+ TStubsHolder(std::shared_ptr<grpc::ChannelInterface> channel)
+ : ChannelInterface_(channel)
+ {}
+
+ // Returns true if channel can't be used to perform request now
+ bool IsChannelBroken() const {
+ auto state = ChannelInterface_->GetState(false);
+ return state == GRPC_CHANNEL_SHUTDOWN ||
+ state == GRPC_CHANNEL_TRANSIENT_FAILURE;
+ }
+
+ template<typename TStub>
+ std::shared_ptr<TStub> GetOrCreateStub() {
+ const auto& stubId = typeid(TStub);
+ {
+ std::shared_lock readGuard(RWMutex_);
+ const auto it = Stubs_.find(stubId);
+ if (it != Stubs_.end()) {
+ return std::static_pointer_cast<TStub>(it->second);
+ }
+ }
+ {
+ std::unique_lock writeGuard(RWMutex_);
+ auto it = Stubs_.emplace(stubId, nullptr);
+ if (!it.second) {
+ return std::static_pointer_cast<TStub>(it.first->second);
+ } else {
+ it.first->second = std::make_shared<TStub>(ChannelInterface_);
+ return std::static_pointer_cast<TStub>(it.first->second);
+ }
+ }
+ }
const TInstant& GetLastUseTime() const {
return LastUsed_;
@@ -494,76 +494,76 @@ public:
void SetLastUseTime(const TInstant& time) {
LastUsed_ = time;
}
-private:
+private:
TInstant LastUsed_ = Now();
- std::shared_mutex RWMutex_;
- std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_;
- std::shared_ptr<grpc::ChannelInterface> ChannelInterface_;
-};
-
-class TChannelPool {
-public:
+ std::shared_mutex RWMutex_;
+ std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_;
+ std::shared_ptr<grpc::ChannelInterface> ChannelInterface_;
+};
+
+class TChannelPool {
+public:
TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime = TDuration::Minutes(6));
- //Allows to CreateStub from TStubsHolder under lock
- //The callback will be called just during GetStubsHolderLocked call
- void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb);
- void DeleteChannel(const TString& channelId);
+ //Allows to CreateStub from TStubsHolder under lock
+ //The callback will be called just during GetStubsHolderLocked call
+ void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb);
+ void DeleteChannel(const TString& channelId);
void DeleteExpiredStubsHolders();
-private:
- std::shared_mutex RWMutex_;
- std::unordered_map<TString, TStubsHolder> Pool_;
+private:
+ std::shared_mutex RWMutex_;
+ std::unordered_map<TString, TStubsHolder> Pool_;
std::multimap<TInstant, TString> LastUsedQueue_;
- TTcpKeepAliveSettings TcpKeepAliveSettings_;
+ TTcpKeepAliveSettings TcpKeepAliveSettings_;
TDuration ExpireTime_;
TDuration UpdateReUseTime_;
void EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId);
-};
-
-template<class TResponse>
-using TStreamReaderCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadProcessor<TResponse>::TPtr)>;
-
-template<typename TStub, typename TRequest, typename TResponse>
-class TStreamRequestReadProcessor
- : public IStreamRequestReadProcessor<TResponse>
- , public TGRpcRequestProcessorCommon {
- template<typename> friend class TServiceConnection;
-public:
- using TSelf = TStreamRequestReadProcessor;
- using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncReader<TResponse>>;
- using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*, void*);
- using TReaderCallback = TStreamReaderCallback<TResponse>;
- using TPtr = TIntrusivePtr<TSelf>;
- using TBase = IStreamRequestReadProcessor<TResponse>;
- using TReadCallback = typename TBase::TReadCallback;
-
- explicit TStreamRequestReadProcessor(TReaderCallback&& callback)
- : Callback(std::move(callback))
- {
- Y_VERIFY(Callback, "Missing connected callback");
- }
-
- void Cancel() override {
- Context.TryCancel();
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Cancelled = true;
+};
+
+template<class TResponse>
+using TStreamReaderCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadProcessor<TResponse>::TPtr)>;
+
+template<typename TStub, typename TRequest, typename TResponse>
+class TStreamRequestReadProcessor
+ : public IStreamRequestReadProcessor<TResponse>
+ , public TGRpcRequestProcessorCommon {
+ template<typename> friend class TServiceConnection;
+public:
+ using TSelf = TStreamRequestReadProcessor;
+ using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncReader<TResponse>>;
+ using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*, void*);
+ using TReaderCallback = TStreamReaderCallback<TResponse>;
+ using TPtr = TIntrusivePtr<TSelf>;
+ using TBase = IStreamRequestReadProcessor<TResponse>;
+ using TReadCallback = typename TBase::TReadCallback;
+
+ explicit TStreamRequestReadProcessor(TReaderCallback&& callback)
+ : Callback(std::move(callback))
+ {
+ Y_VERIFY(Callback, "Missing connected callback");
+ }
+
+ void Cancel() override {
+ Context.TryCancel();
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+ Cancelled = true;
if (Started && !ReadFinished) {
- if (!ReadActive) {
- ReadFinished = true;
- }
- if (ReadFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- }
- }
-
- void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
+ if (!ReadActive) {
+ ReadFinished = true;
+ }
+ if (ReadFinished) {
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ }
+ }
+ }
+ }
+
+ void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
if (!Finished && !HasInitialMetadata) {
ReadActive = true;
@@ -588,66 +588,66 @@ public:
callback(std::move(status));
}
- void Read(TResponse* message, TReadCallback callback) override {
- TGrpcStatus status;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- ReadCallback = std::move(callback);
- if (!ReadFinished) {
- Stream->Read(message, OnReadDoneTag.Prepare());
- }
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
-
- callback(std::move(status));
- }
-
- void Finish(TReadCallback callback) override {
- TGrpcStatus status;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- FinishCallback = std::move(callback);
- if (!ReadFinished) {
- ReadFinished = true;
- }
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- callback(std::move(status));
- }
+ void Read(TResponse* message, TReadCallback callback) override {
+ TGrpcStatus status;
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+ Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ if (!Finished) {
+ ReadActive = true;
+ ReadCallback = std::move(callback);
+ if (!ReadFinished) {
+ Stream->Read(message, OnReadDoneTag.Prepare());
+ }
+ return;
+ }
+ if (FinishedOk) {
+ status = Status;
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+ }
+
+ if (status.Ok()) {
+ status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
+ }
+
+ callback(std::move(status));
+ }
+
+ void Finish(TReadCallback callback) override {
+ TGrpcStatus status;
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+ Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ if (!Finished) {
+ ReadActive = true;
+ FinishCallback = std::move(callback);
+ if (!ReadFinished) {
+ ReadFinished = true;
+ }
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ return;
+ }
+ if (FinishedOk) {
+ status = Status;
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+ }
+
+ callback(std::move(status));
+ }
void AddFinishedCallback(TReadCallback callback) override {
Y_VERIFY(callback, "Unexpected empty callback");
TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
if (!Finished) {
FinishedCallbacks.emplace_back().swap(callback);
return;
@@ -665,103 +665,103 @@ public:
callback(std::move(status));
}
-private:
- void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- auto callback = std::move(Callback);
- TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
- callback(std::move(status), nullptr);
- return;
- }
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- LocalContext = context;
- Stream = (stub.*asyncRequest)(&Context, request, context->CompletionQueue(), OnStartDoneTag.Prepare());
- }
-
- context->SubscribeStop([self = TPtr(this)] {
- self->Cancel();
- });
- }
-
- void OnReadDone(bool ok) {
- TGrpcStatus status;
- TReadCallback callback;
- std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(ReadActive, "Unexpected Read done callback");
- Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
-
- if (!ok || Cancelled) {
- ReadFinished = true;
-
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- if (!ok) {
- // Keep ReadActive=true, so callback is called
- // after the call is finished with an error
- return;
- }
- }
-
- callback = std::move(ReadCallback);
- ReadCallback = nullptr;
- ReadActive = false;
+private:
+ void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
+ auto context = provider->CreateContext();
+ if (!context) {
+ auto callback = std::move(Callback);
+ TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
+ callback(std::move(status), nullptr);
+ return;
+ }
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+ LocalContext = context;
+ Stream = (stub.*asyncRequest)(&Context, request, context->CompletionQueue(), OnStartDoneTag.Prepare());
+ }
+
+ context->SubscribeStop([self = TPtr(this)] {
+ self->Cancel();
+ });
+ }
+
+ void OnReadDone(bool ok) {
+ TGrpcStatus status;
+ TReadCallback callback;
+ std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+ Y_VERIFY(ReadActive, "Unexpected Read done callback");
+ Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
+
+ if (!ok || Cancelled) {
+ ReadFinished = true;
+
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ if (!ok) {
+ // Keep ReadActive=true, so callback is called
+ // after the call is finished with an error
+ return;
+ }
+ }
+
+ callback = std::move(ReadCallback);
+ ReadCallback = nullptr;
+ ReadActive = false;
initialMetadata = InitialMetadata;
InitialMetadata = nullptr;
HasInitialMetadata = true;
- }
-
+ }
+
if (initialMetadata) {
GetInitialMetadata(initialMetadata);
}
- callback(std::move(status));
- }
-
- void OnStartDone(bool ok) {
- TReaderCallback callback;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ callback(std::move(status));
+ }
+
+ void OnStartDone(bool ok) {
+ TReaderCallback callback;
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
Started = true;
if (!ok || Cancelled) {
- ReadFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- return;
- }
- callback = std::move(Callback);
- Callback = nullptr;
- }
-
- callback({ }, typename TBase::TPtr(this));
- }
-
- void OnFinished(bool ok) {
- TGrpcStatus status;
- std::vector<TReadCallback> finishedCallbacks;
+ ReadFinished = true;
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ return;
+ }
+ callback = std::move(Callback);
+ Callback = nullptr;
+ }
+
+ callback({ }, typename TBase::TPtr(this));
+ }
+
+ void OnFinished(bool ok) {
+ TGrpcStatus status;
+ std::vector<TReadCallback> finishedCallbacks;
TReaderCallback startCallback;
- TReadCallback readCallback;
- TReadCallback finishCallback;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
-
- Finished = true;
- FinishedOk = ok;
- LocalContext.reset();
-
- if (ok) {
- status = Status;
+ TReadCallback readCallback;
+ TReadCallback finishCallback;
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+
+ Finished = true;
+ FinishedOk = ok;
+ LocalContext.reset();
+
+ if (ok) {
+ status = Status;
} else if (Cancelled) {
status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
-
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+
finishedCallbacks.swap(FinishedCallbacks);
if (Callback) {
@@ -769,68 +769,68 @@ private:
startCallback = std::move(Callback);
Callback = nullptr;
} else if (ReadActive) {
- if (ReadCallback) {
- readCallback = std::move(ReadCallback);
- ReadCallback = nullptr;
- } else {
- finishCallback = std::move(FinishCallback);
- FinishCallback = nullptr;
- }
- ReadActive = false;
- }
- }
-
+ if (ReadCallback) {
+ readCallback = std::move(ReadCallback);
+ ReadCallback = nullptr;
+ } else {
+ finishCallback = std::move(FinishCallback);
+ FinishCallback = nullptr;
+ }
+ ReadActive = false;
+ }
+ }
+
for (auto& finishedCallback : finishedCallbacks) {
auto statusCopy = status;
finishedCallback(std::move(statusCopy));
}
if (startCallback) {
- if (status.Ok()) {
+ if (status.Ok()) {
status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
}
startCallback(std::move(status), nullptr);
} else if (readCallback) {
if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
- readCallback(std::move(status));
- } else if (finishCallback) {
- finishCallback(std::move(status));
- }
- }
-
- TReaderCallback Callback;
- TAsyncReaderPtr Stream;
- using TFixedEvent = TQueueClientFixedEvent<TSelf>;
- std::mutex Mutex;
- TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
- TFixedEvent OnStartDoneTag = { this, &TSelf::OnStartDone };
- TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
-
- TReadCallback ReadCallback;
- TReadCallback FinishCallback;
- std::vector<TReadCallback> FinishedCallbacks;
- std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
+ status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
+ }
+ readCallback(std::move(status));
+ } else if (finishCallback) {
+ finishCallback(std::move(status));
+ }
+ }
+
+ TReaderCallback Callback;
+ TAsyncReaderPtr Stream;
+ using TFixedEvent = TQueueClientFixedEvent<TSelf>;
+ std::mutex Mutex;
+ TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
+ TFixedEvent OnStartDoneTag = { this, &TSelf::OnStartDone };
+ TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
+
+ TReadCallback ReadCallback;
+ TReadCallback FinishCallback;
+ std::vector<TReadCallback> FinishedCallbacks;
+ std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
bool Started = false;
bool HasInitialMetadata = false;
- bool ReadActive = false;
- bool ReadFinished = false;
- bool Finished = false;
- bool Cancelled = false;
- bool FinishedOk = false;
-};
-
+ bool ReadActive = false;
+ bool ReadFinished = false;
+ bool Finished = false;
+ bool Cancelled = false;
+ bool FinishedOk = false;
+};
+
template<class TRequest, class TResponse>
-using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>;
+using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>;
template<class TStub, class TRequest, class TResponse>
-class TStreamRequestReadWriteProcessor
- : public IStreamRequestReadWriteProcessor<TRequest, TResponse>
- , public TGRpcRequestProcessorCommon {
+class TStreamRequestReadWriteProcessor
+ : public IStreamRequestReadWriteProcessor<TRequest, TResponse>
+ , public TGRpcRequestProcessorCommon {
public:
- using TSelf = TStreamRequestReadWriteProcessor;
- using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>;
+ using TSelf = TStreamRequestReadWriteProcessor;
+ using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>;
using TPtr = TIntrusivePtr<TSelf>;
using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>;
using TReadCallback = typename TBase::TReadCallback;
@@ -838,7 +838,7 @@ public:
using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>;
using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*);
- explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback)
+ explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback)
: ConnectedCallback(std::move(callback))
{
Y_VERIFY(ConnectedCallback, "Missing connected callback");
@@ -847,8 +847,8 @@ public:
void Cancel() override {
Context.TryCancel();
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
Cancelled = true;
if (Started && !(ReadFinished && WriteFinished)) {
if (!ReadActive) {
@@ -867,8 +867,8 @@ public:
void Write(TRequest&& request, TWriteCallback callback) override {
TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
if (Cancelled || ReadFinished || WriteFinished) {
status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
} else if (WriteActive) {
@@ -887,11 +887,11 @@ public:
}
}
- void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
+ void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
if (!Finished && !HasInitialMetadata) {
ReadActive = true;
@@ -919,8 +919,8 @@ public:
void Read(TResponse* message, TReadCallback callback) override {
TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
if (!Finished) {
ReadActive = true;
@@ -947,8 +947,8 @@ public:
void Finish(TReadCallback callback) override {
TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
if (!Finished) {
ReadActive = true;
@@ -979,8 +979,8 @@ public:
TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
if (!Finished) {
FinishedCallbacks.emplace_back().swap(callback);
return;
@@ -1010,8 +1010,8 @@ private:
return;
}
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
LocalContext = context;
Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare());
}
@@ -1025,8 +1025,8 @@ private:
void OnConnected(bool ok) {
TConnectedCallback callback;
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
Started = true;
if (!ok || Cancelled) {
ReadFinished = true;
@@ -1045,10 +1045,10 @@ private:
void OnReadDone(bool ok) {
TGrpcStatus status;
TReadCallback callback;
- std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
+ std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
Y_VERIFY(ReadActive, "Unexpected Read done callback");
Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
@@ -1085,8 +1085,8 @@ private:
void OnWriteDone(bool ok) {
TWriteCallback okCallback;
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
Y_VERIFY(WriteActive, "Unexpected Write done callback");
Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag");
@@ -1107,7 +1107,7 @@ private:
if (ReadFinished) {
Stream->Finish(&Status, OnFinishedTag.Prepare());
}
- } else if (!WriteQueue.empty()) {
+ } else if (!WriteQueue.empty()) {
WriteCallback.swap(WriteQueue.front().Callback);
Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare());
WriteQueue.pop_front();
@@ -1127,14 +1127,14 @@ private:
void OnFinished(bool ok) {
TGrpcStatus status;
- std::deque<TWriteItem> writesDropped;
- std::vector<TReadCallback> finishedCallbacks;
+ std::deque<TWriteItem> writesDropped;
+ std::vector<TReadCallback> finishedCallbacks;
TConnectedCallback connectedCallback;
TReadCallback readCallback;
TReadCallback finishCallback;
- {
- std::unique_lock<std::mutex> guard(Mutex);
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
Finished = true;
FinishedOk = ok;
LocalContext.reset();
@@ -1211,15 +1211,15 @@ private:
TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
private:
- std::mutex Mutex;
+ std::mutex Mutex;
TAsyncReaderWriterPtr Stream;
TConnectedCallback ConnectedCallback;
TReadCallback ReadCallback;
TReadCallback FinishCallback;
- std::vector<TReadCallback> FinishedCallbacks;
- std::deque<TWriteItem> WriteQueue;
+ std::vector<TReadCallback> FinishedCallbacks;
+ std::deque<TWriteItem> WriteQueue;
TWriteCallback WriteCallback;
- std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
+ std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
bool Started = false;
bool HasInitialMetadata = false;
bool ReadActive = false;
@@ -1231,30 +1231,30 @@ private:
bool FinishedOk = false;
};
-class TGRpcClientLow;
-
-template<typename TGRpcService>
-class TServiceConnection {
- using TStub = typename TGRpcService::Stub;
- friend class TGRpcClientLow;
-
-public:
- /*
- * Start simple request
- */
- template<typename TRequest, typename TResponse>
- void DoRequest(const TRequest& request,
+class TGRpcClientLow;
+
+template<typename TGRpcService>
+class TServiceConnection {
+ using TStub = typename TGRpcService::Stub;
+ friend class TGRpcClientLow;
+
+public:
+ /*
+ * Start simple request
+ */
+ template<typename TRequest, typename TResponse>
+ void DoRequest(const TRequest& request,
TResponseCallback<TResponse> callback,
typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
+ const TCallMeta& metas = { },
IQueueClientContextProvider* provider = nullptr)
- {
- auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
- processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
- }
-
- /*
+ {
+ auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
+ processor->ApplyMeta(metas);
+ processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
+ }
+
+ /*
* Start simple request
*/
template<typename TRequest, typename TResponse>
@@ -1270,58 +1270,58 @@ public:
}
/*
- * Start bidirectional streamming
- */
- template<typename TRequest, typename TResponse>
- void DoStreamRequest(TStreamConnectedCallback<TRequest, TResponse> callback,
- typename TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
- auto processor = MakeIntrusive<TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
- processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_);
- }
-
- /*
- * Start streaming response reading (one request, many responses)
- */
+ * Start bidirectional streamming
+ */
+ template<typename TRequest, typename TResponse>
+ void DoStreamRequest(TStreamConnectedCallback<TRequest, TResponse> callback,
+ typename TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
+ const TCallMeta& metas = { },
+ IQueueClientContextProvider* provider = nullptr)
+ {
+ auto processor = MakeIntrusive<TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>>(std::move(callback));
+ processor->ApplyMeta(metas);
+ processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_);
+ }
+
+ /*
+ * Start streaming response reading (one request, many responses)
+ */
template<typename TRequest, typename TResponse>
- void DoStreamRequest(const TRequest& request,
- TStreamReaderCallback<TResponse> callback,
- typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
+ void DoStreamRequest(const TRequest& request,
+ TStreamReaderCallback<TResponse> callback,
+ typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
const TCallMeta& metas = { },
IQueueClientContextProvider* provider = nullptr)
{
- auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback));
+ auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback));
processor->ApplyMeta(metas);
- processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_);
+ processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_);
}
-private:
- TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci,
+private:
+ TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci,
+ IQueueClientContextProvider* provider)
+ : Stub_(TGRpcService::NewStub(ci))
+ , Provider_(provider)
+ {
+ Y_VERIFY(Provider_, "Connection does not have a queue provider");
+ }
+
+ TServiceConnection(TStubsHolder& holder,
IQueueClientContextProvider* provider)
- : Stub_(TGRpcService::NewStub(ci))
+ : Stub_(holder.GetOrCreateStub<TStub>())
, Provider_(provider)
{
Y_VERIFY(Provider_, "Connection does not have a queue provider");
}
- TServiceConnection(TStubsHolder& holder,
- IQueueClientContextProvider* provider)
- : Stub_(holder.GetOrCreateStub<TStub>())
- , Provider_(provider)
- {
- Y_VERIFY(Provider_, "Connection does not have a queue provider");
- }
-
- std::shared_ptr<TStub> Stub_;
+ std::shared_ptr<TStub> Stub_;
IQueueClientContextProvider* Provider_;
-};
-
-class TGRpcClientLow
+};
+
+class TGRpcClientLow
: public IQueueClientContextProvider
-{
+{
class TContextImpl;
friend class TContextImpl;
@@ -1331,10 +1331,10 @@ class TGRpcClientLow
STOP_EXPLICIT = 2,
};
-public:
+public:
explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false);
- ~TGRpcClientLow();
-
+ ~TGRpcClientLow();
+
// Tries to stop all currently running requests (via their stop callbacks)
// Will shutdown CQ and drain events once all requests have finished
// No new requests may be started after this call
@@ -1357,24 +1357,24 @@ public:
IQueueClientContextPtr CreateContext() override;
- template<typename TGRpcService>
- std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) {
- return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this));
- }
-
- template<typename TGRpcService>
- std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) {
- return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this));
- }
-
+ template<typename TGRpcService>
+ std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) {
+ return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this));
+ }
+
+ template<typename TGRpcService>
+ std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) {
+ return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this));
+ }
+
// Tests only, not thread-safe
void AddWorkerThreadForTest();
-private:
+private:
using IThreadRef = std::unique_ptr<IThreadFactory::IThread>;
using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>;
- void Init(size_t numWorkerThread);
-
+ void Init(size_t numWorkerThread);
+
inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); }
inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); }
@@ -1385,15 +1385,15 @@ private:
private:
bool UseCompletionQueuePerThread_;
- std::vector<CompletionQueueRef> CQS_;
- std::vector<IThreadRef> WorkerThreads_;
+ std::vector<CompletionQueueRef> CQS_;
+ std::vector<IThreadRef> WorkerThreads_;
TAtomic CqState_ = -1;
- std::mutex Mtx_;
- std::condition_variable ContextsEmpty_;
- std::unordered_set<TContextImpl*> Contexts_;
+ std::mutex Mtx_;
+ std::condition_variable ContextsEmpty_;
+ std::unordered_set<TContextImpl*> Contexts_;
+
+ std::mutex JoinMutex_;
+};
- std::mutex JoinMutex_;
-};
-
} // namespace NGRpc
diff --git a/library/cpp/grpc/client/grpc_common.h b/library/cpp/grpc/client/grpc_common.h
index ac62e8b331..ffcdafe045 100644
--- a/library/cpp/grpc/client/grpc_common.h
+++ b/library/cpp/grpc/client/grpc_common.h
@@ -1,53 +1,53 @@
-#pragma once
-
-#include <grpc++/grpc++.h>
+#pragma once
+
+#include <grpc++/grpc++.h>
#include <grpc++/resource_quota.h>
-
+
#include <util/datetime/base.h>
-#include <unordered_map>
-#include <util/generic/string.h>
-
-constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
-
+#include <unordered_map>
+#include <util/generic/string.h>
+
+constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
+
namespace NGrpc {
-
-struct TGRpcClientConfig {
- TString Locator; // format host:port
- TDuration Timeout = TDuration::Max(); // request timeout
- ui64 MaxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT; // Max request and response size
+
+struct TGRpcClientConfig {
+ TString Locator; // format host:port
+ TDuration Timeout = TDuration::Max(); // request timeout
+ ui64 MaxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT; // Max request and response size
ui64 MaxInboundMessageSize = 0; // overrides MaxMessageSize for incoming requests
ui64 MaxOutboundMessageSize = 0; // overrides MaxMessageSize for outgoing requests
- ui32 MaxInFlight = 0;
+ ui32 MaxInFlight = 0;
bool EnableSsl = false;
TString SslCaCert; //Implicitly enables Ssl if not empty
grpc_compression_algorithm CompressionAlgoritm = GRPC_COMPRESS_NONE;
ui64 MemQuota = 0;
- std::unordered_map<TString, TString> StringChannelParams;
- std::unordered_map<TString, int> IntChannelParams;
+ std::unordered_map<TString, TString> StringChannelParams;
+ std::unordered_map<TString, int> IntChannelParams;
TString LoadBalancingPolicy = { };
TString SslTargetNameOverride = { };
-
- TGRpcClientConfig() = default;
- TGRpcClientConfig(const TGRpcClientConfig&) = default;
- TGRpcClientConfig(TGRpcClientConfig&&) = default;
- TGRpcClientConfig& operator=(const TGRpcClientConfig&) = default;
- TGRpcClientConfig& operator=(TGRpcClientConfig&&) = default;
-
- TGRpcClientConfig(const TString& locator, TDuration timeout = TDuration::Max(),
+
+ TGRpcClientConfig() = default;
+ TGRpcClientConfig(const TGRpcClientConfig&) = default;
+ TGRpcClientConfig(TGRpcClientConfig&&) = default;
+ TGRpcClientConfig& operator=(const TGRpcClientConfig&) = default;
+ TGRpcClientConfig& operator=(TGRpcClientConfig&&) = default;
+
+ TGRpcClientConfig(const TString& locator, TDuration timeout = TDuration::Max(),
ui64 maxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT, ui32 maxInFlight = 0, TString caCert = "",
grpc_compression_algorithm compressionAlgorithm = GRPC_COMPRESS_NONE, bool enableSsl = false)
- : Locator(locator)
- , Timeout(timeout)
- , MaxMessageSize(maxMessageSize)
- , MaxInFlight(maxInFlight)
+ : Locator(locator)
+ , Timeout(timeout)
+ , MaxMessageSize(maxMessageSize)
+ , MaxInFlight(maxInFlight)
, EnableSsl(enableSsl)
- , SslCaCert(caCert)
+ , SslCaCert(caCert)
, CompressionAlgoritm(compressionAlgorithm)
- {}
-};
-
-inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRpcClientConfig& config, grpc_socket_mutator* mutator = nullptr){
- grpc::ChannelArguments args;
+ {}
+};
+
+inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRpcClientConfig& config, grpc_socket_mutator* mutator = nullptr){
+ grpc::ChannelArguments args;
args.SetMaxReceiveMessageSize(config.MaxInboundMessageSize ? config.MaxInboundMessageSize : config.MaxMessageSize);
args.SetMaxSendMessageSize(config.MaxOutboundMessageSize ? config.MaxOutboundMessageSize : config.MaxMessageSize);
args.SetCompressionAlgorithm(config.CompressionAlgoritm);
@@ -65,9 +65,9 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp
quota.Resize(config.MemQuota);
args.SetResourceQuota(quota);
}
- if (mutator) {
- args.SetSocketMutator(mutator);
- }
+ if (mutator) {
+ args.SetSocketMutator(mutator);
+ }
if (!config.LoadBalancingPolicy.empty()) {
args.SetLoadBalancingPolicyName(config.LoadBalancingPolicy);
}
@@ -75,10 +75,10 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp
args.SetSslTargetNameOverride(config.SslTargetNameOverride);
}
if (config.EnableSsl || config.SslCaCert) {
- return grpc::CreateCustomChannel(config.Locator, grpc::SslCredentials(grpc::SslCredentialsOptions{config.SslCaCert, "", ""}), args);
- } else {
- return grpc::CreateCustomChannel(config.Locator, grpc::InsecureChannelCredentials(), args);
- }
-}
-
+ return grpc::CreateCustomChannel(config.Locator, grpc::SslCredentials(grpc::SslCredentialsOptions{config.SslCaCert, "", ""}), args);
+ } else {
+ return grpc::CreateCustomChannel(config.Locator, grpc::InsecureChannelCredentials(), args);
+ }
+}
+
} // namespace NGRpc
diff --git a/library/cpp/grpc/client/ya.make b/library/cpp/grpc/client/ya.make
index 11f36aa94f..a4e74b067c 100644
--- a/library/cpp/grpc/client/ya.make
+++ b/library/cpp/grpc/client/ya.make
@@ -6,11 +6,11 @@ OWNER(
)
SRCS(
- grpc_client_low.cpp
+ grpc_client_low.cpp
)
PEERDIR(
- contrib/libs/grpc
+ contrib/libs/grpc
)
END()
diff --git a/library/cpp/grpc/server/event_callback.cpp b/library/cpp/grpc/server/event_callback.cpp
index 559a472807..f423836bd6 100644
--- a/library/cpp/grpc/server/event_callback.cpp
+++ b/library/cpp/grpc/server/event_callback.cpp
@@ -1 +1 @@
-#include "event_callback.h"
+#include "event_callback.h"
diff --git a/library/cpp/grpc/server/event_callback.h b/library/cpp/grpc/server/event_callback.h
index c0e16ee504..d0b700b3c9 100644
--- a/library/cpp/grpc/server/event_callback.h
+++ b/library/cpp/grpc/server/event_callback.h
@@ -1,80 +1,80 @@
-#pragma once
-
-#include "grpc_server.h"
-
+#pragma once
+
+#include "grpc_server.h"
+
namespace NGrpc {
-
-enum class EQueueEventStatus {
- OK,
- ERROR
-};
-
-template<class TCallback>
+
+enum class EQueueEventStatus {
+ OK,
+ ERROR
+};
+
+template<class TCallback>
class TQueueEventCallback: public IQueueEvent {
-public:
- TQueueEventCallback(const TCallback& callback)
- : Callback(callback)
- {}
-
- TQueueEventCallback(TCallback&& callback)
- : Callback(std::move(callback))
- {}
-
- bool Execute(bool ok) override {
- Callback(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR);
- return false;
- }
-
- void DestroyRequest() override {
- delete this;
- }
-
-private:
- TCallback Callback;
-};
-
+public:
+ TQueueEventCallback(const TCallback& callback)
+ : Callback(callback)
+ {}
+
+ TQueueEventCallback(TCallback&& callback)
+ : Callback(std::move(callback))
+ {}
+
+ bool Execute(bool ok) override {
+ Callback(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR);
+ return false;
+ }
+
+ void DestroyRequest() override {
+ delete this;
+ }
+
+private:
+ TCallback Callback;
+};
+
// Implementation of IQueueEvent that reduces allocations
-template<class TSelf>
+template<class TSelf>
class TQueueFixedEvent: private IQueueEvent {
- using TCallback = void (TSelf::*)(EQueueEventStatus);
-
-public:
- TQueueFixedEvent(TSelf* self, TCallback callback)
- : Self(self)
- , Callback(callback)
- { }
-
+ using TCallback = void (TSelf::*)(EQueueEventStatus);
+
+public:
+ TQueueFixedEvent(TSelf* self, TCallback callback)
+ : Self(self)
+ , Callback(callback)
+ { }
+
IQueueEvent* Prepare() {
- Self->Ref();
- return this;
- }
-
-private:
- bool Execute(bool ok) override {
- ((*Self).*Callback)(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR);
- return false;
- }
-
- void DestroyRequest() override {
- Self->UnRef();
- }
-
-private:
- TSelf* const Self;
- TCallback const Callback;
-};
-
-template<class TCallback>
+ Self->Ref();
+ return this;
+ }
+
+private:
+ bool Execute(bool ok) override {
+ ((*Self).*Callback)(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR);
+ return false;
+ }
+
+ void DestroyRequest() override {
+ Self->UnRef();
+ }
+
+private:
+ TSelf* const Self;
+ TCallback const Callback;
+};
+
+template<class TCallback>
inline IQueueEvent* MakeQueueEventCallback(TCallback&& callback) {
- return new TQueueEventCallback<TCallback>(std::forward<TCallback>(callback));
-}
-
-template<class T>
+ return new TQueueEventCallback<TCallback>(std::forward<TCallback>(callback));
+}
+
+template<class T>
inline IQueueEvent* MakeQueueEventCallback(T* self, void (T::*method)(EQueueEventStatus)) {
- using TPtr = TIntrusivePtr<T>;
- return MakeQueueEventCallback([self = TPtr(self), method] (EQueueEventStatus status) {
- ((*self).*method)(status);
- });
-}
-
+ using TPtr = TIntrusivePtr<T>;
+ return MakeQueueEventCallback([self = TPtr(self), method] (EQueueEventStatus status) {
+ ((*self).*method)(status);
+ });
+}
+
} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_async_ctx_base.h b/library/cpp/grpc/server/grpc_async_ctx_base.h
index d170e2ef34..51356d4ce5 100644
--- a/library/cpp/grpc/server/grpc_async_ctx_base.h
+++ b/library/cpp/grpc/server/grpc_async_ctx_base.h
@@ -1,48 +1,48 @@
-#pragma once
-
-#include "grpc_server.h"
-
-#include <util/generic/vector.h>
-#include <util/generic/string.h>
-#include <util/system/yassert.h>
+#pragma once
+
+#include "grpc_server.h"
+
+#include <util/generic/vector.h>
+#include <util/generic/string.h>
+#include <util/system/yassert.h>
#include <util/generic/set.h>
-
-#include <grpc++/server.h>
-#include <grpc++/server_context.h>
-
-#include <chrono>
-
+
+#include <grpc++/server.h>
+#include <grpc++/server_context.h>
+
+#include <chrono>
+
namespace NGrpc {
-
-template<typename TService>
+
+template<typename TService>
class TBaseAsyncContext: public ICancelableContext {
-public:
- TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq)
- : Service(service)
- , CQ(cq)
- {
- }
-
- TString GetPeerName() const {
- return TString(Context.peer());
- }
-
- TInstant Deadline() const {
- // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline
+public:
+ TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq)
+ : Service(service)
+ , CQ(cq)
+ {
+ }
+
+ TString GetPeerName() const {
+ return TString(Context.peer());
+ }
+
+ TInstant Deadline() const {
+ // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline
// right before the request is getting to be send.
// 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
//
- // After this timeout calculated back to the deadline on the server side
- // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method).
- // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME
- //
-
- std::chrono::system_clock::time_point t = Context.deadline();
- if (t == std::chrono::system_clock::time_point::max()) {
- return TInstant::Max();
+ // After this timeout calculated back to the deadline on the server side
+ // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method).
+ // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME
+ //
+
+ std::chrono::system_clock::time_point t = Context.deadline();
+ if (t == std::chrono::system_clock::time_point::max()) {
+ return TInstant::Max();
}
- auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t);
- return TInstant::MicroSeconds(us.time_since_epoch().count());
+ auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t);
+ return TInstant::MicroSeconds(us.time_since_epoch().count());
}
TSet<TStringBuf> GetPeerMetaKeys() const {
@@ -54,7 +54,7 @@ public:
}
TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const {
- const auto& clientMetadata = Context.client_metadata();
+ const auto& clientMetadata = Context.client_metadata();
const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()});
if (range.first == range.second) {
return {};
@@ -63,32 +63,32 @@ public:
TVector<TStringBuf> values;
values.reserve(std::distance(range.first, range.second));
- for (auto it = range.first; it != range.second; ++it) {
+ for (auto it = range.first; it != range.second; ++it) {
values.emplace_back(it->second.data(), it->second.size());
- }
+ }
return values;
- }
-
+ }
+
grpc_compression_level GetCompressionLevel() const {
return Context.compression_level();
}
- void Shutdown() override {
- // Shutdown may only be called after request has started successfully
- if (Context.c_call())
- Context.TryCancel();
- }
-
-protected:
- //! The means of communication with the gRPC runtime for an asynchronous
- //! server.
- typename TService::TCurrentGRpcService::AsyncService* const Service;
- //! The producer-consumer queue where for asynchronous server notifications.
- grpc::ServerCompletionQueue* const CQ;
- //! Context for the rpc, allowing to tweak aspects of it such as the use
- //! of compression, authentication, as well as to send metadata back to the
- //! client.
- grpc::ServerContext Context;
-};
-
+ void Shutdown() override {
+ // Shutdown may only be called after request has started successfully
+ if (Context.c_call())
+ Context.TryCancel();
+ }
+
+protected:
+ //! The means of communication with the gRPC runtime for an asynchronous
+ //! server.
+ typename TService::TCurrentGRpcService::AsyncService* const Service;
+ //! The producer-consumer queue where for asynchronous server notifications.
+ grpc::ServerCompletionQueue* const CQ;
+ //! Context for the rpc, allowing to tweak aspects of it such as the use
+ //! of compression, authentication, as well as to send metadata back to the
+ //! client.
+ grpc::ServerContext Context;
+};
+
} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_counters.cpp b/library/cpp/grpc/server/grpc_counters.cpp
index f3a0b2b5e9..fa96e0100b 100644
--- a/library/cpp/grpc/server/grpc_counters.cpp
+++ b/library/cpp/grpc/server/grpc_counters.cpp
@@ -1,4 +1,4 @@
-#include "grpc_counters.h"
+#include "grpc_counters.h"
namespace NGrpc {
namespace {
diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h
index 25b7c28369..0b6c36c84c 100644
--- a/library/cpp/grpc/server/grpc_counters.h
+++ b/library/cpp/grpc/server/grpc_counters.h
@@ -1,11 +1,11 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/monlib/dynamic_counters/percentile/percentile.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <util/generic/ptr.h>
-
+#include <util/generic/ptr.h>
+
namespace NGrpc {
-
+
struct ICounterBlock : public TThrRefBase {
virtual void CountNotOkRequest() = 0;
virtual void CountNotOkResponse() = 0;
@@ -15,9 +15,9 @@ struct ICounterBlock : public TThrRefBase {
virtual void CountResponseBytes(ui32 responseSize) = 0;
virtual void StartProcessing(ui32 requestSize) = 0;
virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0;
- virtual void CountRequestsWithoutDatabase() {}
- virtual void CountRequestsWithoutToken() {}
- virtual void CountRequestWithoutTls() {}
+ virtual void CountRequestsWithoutDatabase() {}
+ virtual void CountRequestsWithoutToken() {}
+ virtual void CountRequestWithoutTls() {}
virtual TIntrusivePtr<ICounterBlock> Clone() { return this; }
virtual void UseDatabase(const TString& database) { Y_UNUSED(database); }
@@ -26,62 +26,62 @@ struct ICounterBlock : public TThrRefBase {
using ICounterBlockPtr = TIntrusivePtr<ICounterBlock>;
class TCounterBlock final : public ICounterBlock {
- NMonitoring::TDynamicCounters::TCounterPtr TotalCounter;
- NMonitoring::TDynamicCounters::TCounterPtr InflyCounter;
- NMonitoring::TDynamicCounters::TCounterPtr NotOkRequestCounter;
- NMonitoring::TDynamicCounters::TCounterPtr NotOkResponseCounter;
- NMonitoring::TDynamicCounters::TCounterPtr RequestBytes;
- NMonitoring::TDynamicCounters::TCounterPtr InflyRequestBytes;
- NMonitoring::TDynamicCounters::TCounterPtr ResponseBytes;
- NMonitoring::TDynamicCounters::TCounterPtr NotAuthenticated;
- NMonitoring::TDynamicCounters::TCounterPtr ResourceExhausted;
+ NMonitoring::TDynamicCounters::TCounterPtr TotalCounter;
+ NMonitoring::TDynamicCounters::TCounterPtr InflyCounter;
+ NMonitoring::TDynamicCounters::TCounterPtr NotOkRequestCounter;
+ NMonitoring::TDynamicCounters::TCounterPtr NotOkResponseCounter;
+ NMonitoring::TDynamicCounters::TCounterPtr RequestBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr InflyRequestBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr ResponseBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr NotAuthenticated;
+ NMonitoring::TDynamicCounters::TCounterPtr ResourceExhausted;
bool Percentile = false;
NMonitoring::TPercentileTracker<4, 512, 15> RequestHistMs;
- std::array<NMonitoring::TDynamicCounters::TCounterPtr, 2> GRpcStatusCounters;
-
-public:
- TCounterBlock(NMonitoring::TDynamicCounters::TCounterPtr totalCounter,
- NMonitoring::TDynamicCounters::TCounterPtr inflyCounter,
- NMonitoring::TDynamicCounters::TCounterPtr notOkRequestCounter,
- NMonitoring::TDynamicCounters::TCounterPtr notOkResponseCounter,
- NMonitoring::TDynamicCounters::TCounterPtr requestBytes,
- NMonitoring::TDynamicCounters::TCounterPtr inflyRequestBytes,
+ std::array<NMonitoring::TDynamicCounters::TCounterPtr, 2> GRpcStatusCounters;
+
+public:
+ TCounterBlock(NMonitoring::TDynamicCounters::TCounterPtr totalCounter,
+ NMonitoring::TDynamicCounters::TCounterPtr inflyCounter,
+ NMonitoring::TDynamicCounters::TCounterPtr notOkRequestCounter,
+ NMonitoring::TDynamicCounters::TCounterPtr notOkResponseCounter,
+ NMonitoring::TDynamicCounters::TCounterPtr requestBytes,
+ NMonitoring::TDynamicCounters::TCounterPtr inflyRequestBytes,
NMonitoring::TDynamicCounters::TCounterPtr responseBytes,
- NMonitoring::TDynamicCounters::TCounterPtr notAuthenticated,
- NMonitoring::TDynamicCounters::TCounterPtr resourceExhausted,
+ NMonitoring::TDynamicCounters::TCounterPtr notAuthenticated,
+ NMonitoring::TDynamicCounters::TCounterPtr resourceExhausted,
TIntrusivePtr<NMonitoring::TDynamicCounters> group)
- : TotalCounter(std::move(totalCounter))
- , InflyCounter(std::move(inflyCounter))
- , NotOkRequestCounter(std::move(notOkRequestCounter))
- , NotOkResponseCounter(std::move(notOkResponseCounter))
- , RequestBytes(std::move(requestBytes))
- , InflyRequestBytes(std::move(inflyRequestBytes))
- , ResponseBytes(std::move(responseBytes))
- , NotAuthenticated(std::move(notAuthenticated))
- , ResourceExhausted(std::move(resourceExhausted))
+ : TotalCounter(std::move(totalCounter))
+ , InflyCounter(std::move(inflyCounter))
+ , NotOkRequestCounter(std::move(notOkRequestCounter))
+ , NotOkResponseCounter(std::move(notOkResponseCounter))
+ , RequestBytes(std::move(requestBytes))
+ , InflyRequestBytes(std::move(inflyRequestBytes))
+ , ResponseBytes(std::move(responseBytes))
+ , NotAuthenticated(std::move(notAuthenticated))
+ , ResourceExhausted(std::move(resourceExhausted))
{
if (group) {
RequestHistMs.Initialize(group, "event", "request", "ms", {0.5f, 0.9f, 0.99f, 0.999f, 1.0f});
Percentile = true;
}
}
-
+
void CountNotOkRequest() override {
- NotOkRequestCounter->Inc();
- }
-
+ NotOkRequestCounter->Inc();
+ }
+
void CountNotOkResponse() override {
- NotOkResponseCounter->Inc();
- }
-
+ NotOkResponseCounter->Inc();
+ }
+
void CountNotAuthenticated() override {
- NotAuthenticated->Inc();
- }
-
+ NotAuthenticated->Inc();
+ }
+
void CountResourceExhausted() override {
- ResourceExhausted->Inc();
- }
-
+ ResourceExhausted->Inc();
+ }
+
void CountRequestBytes(ui32 requestSize) override {
*RequestBytes += requestSize;
}
@@ -91,27 +91,27 @@ public:
}
void StartProcessing(ui32 requestSize) override {
- TotalCounter->Inc();
- InflyCounter->Inc();
- *RequestBytes += requestSize;
- *InflyRequestBytes += requestSize;
- }
-
+ TotalCounter->Inc();
+ InflyCounter->Inc();
+ *RequestBytes += requestSize;
+ *InflyRequestBytes += requestSize;
+ }
+
void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status,
TDuration requestDuration) override
{
Y_UNUSED(status);
- InflyCounter->Dec();
- *InflyRequestBytes -= requestSize;
- *ResponseBytes += responseSize;
- if (!ok) {
- NotOkResponseCounter->Inc();
- }
+ InflyCounter->Dec();
+ *InflyRequestBytes -= requestSize;
+ *ResponseBytes += responseSize;
+ if (!ok) {
+ NotOkResponseCounter->Inc();
+ }
if (Percentile) {
RequestHistMs.Increment(requestDuration.MilliSeconds());
}
- }
+ }
ICounterBlockPtr Clone() override {
return this;
@@ -122,10 +122,10 @@ public:
RequestHistMs.Update();
}
}
-};
-
-using TCounterBlockPtr = TIntrusivePtr<TCounterBlock>;
-
+};
+
+using TCounterBlockPtr = TIntrusivePtr<TCounterBlock>;
+
/**
* Creates new instance of ICounterBlock implementation which does nothing.
*
diff --git a/library/cpp/grpc/server/grpc_request.cpp b/library/cpp/grpc/server/grpc_request.cpp
index 33264fe6f2..d18a32776f 100644
--- a/library/cpp/grpc/server/grpc_request.cpp
+++ b/library/cpp/grpc/server/grpc_request.cpp
@@ -1,59 +1,59 @@
-#include "grpc_request.h"
-
+#include "grpc_request.h"
+
namespace NGrpc {
-
-const char* GRPC_USER_AGENT_HEADER = "user-agent";
-
+
+const char* GRPC_USER_AGENT_HEADER = "user-agent";
+
class TStreamAdaptor: public IStreamAdaptor {
-public:
- TStreamAdaptor()
- : StreamIsReady_(true)
- {}
-
- void Enqueue(std::function<void()>&& fn, bool urgent) override {
- with_lock(Mtx_) {
- if (!UrgentQueue_.empty() || !NormalQueue_.empty()) {
- Y_VERIFY(!StreamIsReady_);
- }
- auto& queue = urgent ? UrgentQueue_ : NormalQueue_;
- if (StreamIsReady_ && queue.empty()) {
- StreamIsReady_ = false;
- } else {
- queue.push_back(std::move(fn));
- return;
- }
- }
- fn();
- }
-
- size_t ProcessNext() override {
- size_t left = 0;
- std::function<void()> fn;
- with_lock(Mtx_) {
- Y_VERIFY(!StreamIsReady_);
- auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_;
- if (queue.empty()) {
- // Both queues are empty
- StreamIsReady_ = true;
- } else {
- fn = std::move(queue.front());
- queue.pop_front();
- left = UrgentQueue_.size() + NormalQueue_.size();
- }
- }
- if (fn)
- fn();
- return left;
- }
-private:
- bool StreamIsReady_;
- TList<std::function<void()>> NormalQueue_;
- TList<std::function<void()>> UrgentQueue_;
- TMutex Mtx_;
-};
-
-IStreamAdaptor::TPtr CreateStreamAdaptor() {
- return std::make_unique<TStreamAdaptor>();
-}
-
+public:
+ TStreamAdaptor()
+ : StreamIsReady_(true)
+ {}
+
+ void Enqueue(std::function<void()>&& fn, bool urgent) override {
+ with_lock(Mtx_) {
+ if (!UrgentQueue_.empty() || !NormalQueue_.empty()) {
+ Y_VERIFY(!StreamIsReady_);
+ }
+ auto& queue = urgent ? UrgentQueue_ : NormalQueue_;
+ if (StreamIsReady_ && queue.empty()) {
+ StreamIsReady_ = false;
+ } else {
+ queue.push_back(std::move(fn));
+ return;
+ }
+ }
+ fn();
+ }
+
+ size_t ProcessNext() override {
+ size_t left = 0;
+ std::function<void()> fn;
+ with_lock(Mtx_) {
+ Y_VERIFY(!StreamIsReady_);
+ auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_;
+ if (queue.empty()) {
+ // Both queues are empty
+ StreamIsReady_ = true;
+ } else {
+ fn = std::move(queue.front());
+ queue.pop_front();
+ left = UrgentQueue_.size() + NormalQueue_.size();
+ }
+ }
+ if (fn)
+ fn();
+ return left;
+ }
+private:
+ bool StreamIsReady_;
+ TList<std::function<void()>> NormalQueue_;
+ TList<std::function<void()>> UrgentQueue_;
+ TMutex Mtx_;
+};
+
+IStreamAdaptor::TPtr CreateStreamAdaptor() {
+ return std::make_unique<TStreamAdaptor>();
+}
+
} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index 2841069d62..5bd8d3902b 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -1,118 +1,118 @@
-#pragma once
-
+#pragma once
+
#include <google/protobuf/text_format.h>
#include <google/protobuf/arena.h>
#include <google/protobuf/message.h>
-
+
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/logger/priority.h>
-
+
#include "grpc_response.h"
-#include "event_callback.h"
+#include "event_callback.h"
#include "grpc_async_ctx_base.h"
#include "grpc_counters.h"
-#include "grpc_request_base.h"
-#include "grpc_server.h"
+#include "grpc_request_base.h"
+#include "grpc_server.h"
#include "logger.h"
-
+
#include <util/system/hp_timer.h>
-#include <grpc++/server.h>
-#include <grpc++/server_context.h>
+#include <grpc++/server.h>
+#include <grpc++/server_context.h>
#include <grpc++/support/async_stream.h>
#include <grpc++/support/async_unary_call.h>
-#include <grpc++/support/byte_buffer.h>
-#include <grpc++/impl/codegen/async_stream.h>
-
+#include <grpc++/support/byte_buffer.h>
+#include <grpc++/impl/codegen/async_stream.h>
+
namespace NGrpc {
-
-class IStreamAdaptor {
-public:
- using TPtr = std::unique_ptr<IStreamAdaptor>;
- virtual void Enqueue(std::function<void()>&& fn, bool urgent) = 0;
- virtual size_t ProcessNext() = 0;
- virtual ~IStreamAdaptor() = default;
-};
-
-IStreamAdaptor::TPtr CreateStreamAdaptor();
-
-///////////////////////////////////////////////////////////////////////////////
+
+class IStreamAdaptor {
+public:
+ using TPtr = std::unique_ptr<IStreamAdaptor>;
+ virtual void Enqueue(std::function<void()>&& fn, bool urgent) = 0;
+ virtual size_t ProcessNext() = 0;
+ virtual ~IStreamAdaptor() = default;
+};
+
+IStreamAdaptor::TPtr CreateStreamAdaptor();
+
+///////////////////////////////////////////////////////////////////////////////
template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter, typename TOutProtoPrinter>
class TGRpcRequestImpl
- : public TBaseAsyncContext<TService>
- , public IQueueEvent
- , public IRequestContextBase
-{
+ : public TBaseAsyncContext<TService>
+ , public IQueueEvent
+ , public IRequestContextBase
+{
using TThis = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>;
-public:
- using TOnRequest = std::function<void (IRequestContextBase* ctx)>;
- using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
- grpc::ServerAsyncResponseWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
- using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
- grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
-
+public:
+ using TOnRequest = std::function<void (IRequestContextBase* ctx)>;
+ using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
+ grpc::ServerAsyncResponseWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
+ using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
+ grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
+
TGRpcRequestImpl(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- TOnRequest cb,
- TRequestCallback requestCallback,
+ typename TService::TCurrentGRpcService::AsyncService* service,
+ grpc::ServerCompletionQueue* cq,
+ TOnRequest cb,
+ TRequestCallback requestCallback,
const char* name,
TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
- : TBaseAsyncContext<TService>(service, cq)
- , Server_(server)
- , Cb_(cb)
- , RequestCallback_(requestCallback)
- , StreamRequestCallback_(nullptr)
- , Name_(name)
+ : TBaseAsyncContext<TService>(service, cq)
+ , Server_(server)
+ , Cb_(cb)
+ , RequestCallback_(requestCallback)
+ , StreamRequestCallback_(nullptr)
+ , Name_(name)
, Logger_(std::move(logger))
- , Counters_(std::move(counters))
+ , Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
, Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))
, StateFunc_(&TThis::SetRequestDone)
- {
- AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
- Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
- Y_VERIFY(Request_);
+ {
+ AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
+ Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
+ Y_VERIFY(Request_);
GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_);
- FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
- }
-
+ FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
+ }
+
TGRpcRequestImpl(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- TOnRequest cb,
- TStreamRequestCallback requestCallback,
+ typename TService::TCurrentGRpcService::AsyncService* service,
+ grpc::ServerCompletionQueue* cq,
+ TOnRequest cb,
+ TStreamRequestCallback requestCallback,
const char* name,
TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
- : TBaseAsyncContext<TService>(service, cq)
- , Server_(server)
- , Cb_(cb)
- , RequestCallback_(nullptr)
- , StreamRequestCallback_(requestCallback)
- , Name_(name)
+ : TBaseAsyncContext<TService>(service, cq)
+ , Server_(server)
+ , Cb_(cb)
+ , RequestCallback_(nullptr)
+ , StreamRequestCallback_(requestCallback)
+ , Name_(name)
, Logger_(std::move(logger))
- , Counters_(std::move(counters))
+ , Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
- , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context))
+ , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context))
, StateFunc_(&TThis::SetRequestDone)
- {
- AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
- Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
- Y_VERIFY(Request_);
+ {
+ AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
+ Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
+ Y_VERIFY(Request_);
GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_);
- FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
- StreamAdaptor_ = CreateStreamAdaptor();
- }
-
- TAsyncFinishResult GetFinishFuture() override {
- return FinishPromise_.GetFuture();
- }
-
+ FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
+ StreamAdaptor_ = CreateStreamAdaptor();
+ }
+
+ TAsyncFinishResult GetFinishFuture() override {
+ return FinishPromise_.GetFuture();
+ }
+
TString GetPeer() const override {
return TString(this->Context.peer());
}
@@ -121,7 +121,7 @@ public:
return Server_->SslServer();
}
- void Run() {
+ void Run() {
// Start request unless server is shutting down
if (auto guard = Server_->ProtectShutdown()) {
Ref(); //For grpc c runtime
@@ -135,28 +135,28 @@ public:
(&this->Context, Request_,
reinterpret_cast<grpc::ServerAsyncWriter<TOut>*>(StreamWriter_.Get()), this->CQ, this->CQ, GetGRpcTag());
}
- }
- }
-
+ }
+ }
+
~TGRpcRequestImpl() {
- // No direct dtor call allowed
- Y_ASSERT(RefCount() == 0);
- }
-
- bool Execute(bool ok) override {
- return (this->*StateFunc_)(ok);
- }
-
- void DestroyRequest() override {
+ // No direct dtor call allowed
+ Y_ASSERT(RefCount() == 0);
+ }
+
+ bool Execute(bool ok) override {
+ return (this->*StateFunc_)(ok);
+ }
+
+ void DestroyRequest() override {
if (RequestRegistered_) {
Server_->DeregisterRequestCtx(this);
RequestRegistered_ = false;
}
- UnRef();
- }
-
- TInstant Deadline() const override {
- return TBaseAsyncContext<TService>::Deadline();
+ UnRef();
+ }
+
+ TInstant Deadline() const override {
+ return TBaseAsyncContext<TService>::Deadline();
}
TSet<TStringBuf> GetPeerMetaKeys() const override {
@@ -164,299 +164,299 @@ public:
}
TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override {
- return TBaseAsyncContext<TService>::GetPeerMetaValues(key);
- }
-
+ return TBaseAsyncContext<TService>::GetPeerMetaValues(key);
+ }
+
grpc_compression_level GetCompressionLevel() const override {
return TBaseAsyncContext<TService>::GetCompressionLevel();
}
- //! Get pointer to the request's message.
- const NProtoBuf::Message* GetRequest() const override {
- return Request_;
- }
-
- TAuthState& GetAuthState() override {
- return AuthState_;
- }
-
+ //! Get pointer to the request's message.
+ const NProtoBuf::Message* GetRequest() const override {
+ return Request_;
+ }
+
+ TAuthState& GetAuthState() override {
+ return AuthState_;
+ }
+
void Reply(NProtoBuf::Message* resp, ui32 status) override {
ResponseStatus = status;
- WriteDataOk(resp);
- }
-
+ WriteDataOk(resp);
+ }
+
void Reply(grpc::ByteBuffer* resp, ui32 status) override {
ResponseStatus = status;
- WriteByteDataOk(resp);
- }
-
+ WriteByteDataOk(resp);
+ }
+
void ReplyError(grpc::StatusCode code, const TString& msg) override {
- FinishGrpcStatus(code, msg, false);
- }
-
- void ReplyUnauthenticated(const TString& in) override {
- const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in;
- FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false);
- }
-
- void SetNextReplyCallback(TOnNextReply&& cb) override {
- NextReplyCb_ = cb;
- }
-
+ FinishGrpcStatus(code, msg, false);
+ }
+
+ void ReplyUnauthenticated(const TString& in) override {
+ const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in;
+ FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false);
+ }
+
+ void SetNextReplyCallback(TOnNextReply&& cb) override {
+ NextReplyCb_ = cb;
+ }
+
void AddTrailingMetadata(const TString& key, const TString& value) override {
- this->Context.AddTrailingMetadata(key, value);
- }
-
- void FinishStreamingOk() override {
- GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (enqueued)", this, Name_,
- this->Context.peer().c_str());
- auto cb = [this]() {
- StateFunc_ = &TThis::SetFinishDone;
- GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_,
- this->Context.peer().c_str());
-
- StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), false);
- }
-
- google::protobuf::Arena* GetArena() override {
- return &Arena_;
- }
-
+ this->Context.AddTrailingMetadata(key, value);
+ }
+
+ void FinishStreamingOk() override {
+ GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (enqueued)", this, Name_,
+ this->Context.peer().c_str());
+ auto cb = [this]() {
+ StateFunc_ = &TThis::SetFinishDone;
+ GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_,
+ this->Context.peer().c_str());
+
+ StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag());
+ };
+ StreamAdaptor_->Enqueue(std::move(cb), false);
+ }
+
+ google::protobuf::Arena* GetArena() override {
+ return &Arena_;
+ }
+
void UseDatabase(const TString& database) override {
Counters_->UseDatabase(database);
}
-private:
- void Clone() {
- if (!Server_->IsShuttingDown()) {
- if (RequestCallback_) {
+private:
+ void Clone() {
+ if (!Server_->IsShuttingDown()) {
+ if (RequestCallback_) {
MakeIntrusive<TThis>(
Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
- } else {
+ } else {
MakeIntrusive<TThis>(
Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
- }
- }
- }
-
- void WriteDataOk(NProtoBuf::Message* resp) {
- auto makeResponseString = [&] {
- TString x;
+ }
+ }
+ }
+
+ void WriteDataOk(NProtoBuf::Message* resp) {
+ auto makeResponseString = [&] {
+ TString x;
TOutProtoPrinter printer;
- printer.SetSingleLineMode(true);
- printer.PrintToString(*resp, &x);
- return x;
- };
-
- auto sz = (size_t)resp->ByteSize();
- if (Writer_) {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_,
- makeResponseString().data(), this->Context.peer().c_str());
+ printer.SetSingleLineMode(true);
+ printer.PrintToString(*resp, &x);
+ return x;
+ };
+
+ auto sz = (size_t)resp->ByteSize();
+ if (Writer_) {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_,
+ makeResponseString().data(), this->Context.peer().c_str());
StateFunc_ = &TThis::SetFinishDone;
- ResponseSize = sz;
- Y_VERIFY(this->Context.c_call());
+ ResponseSize = sz;
+ Y_VERIFY(this->Context.c_call());
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
- } else {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)",
- this, Name_, makeResponseString().data(), this->Context.peer().c_str());
+ } else {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)",
+ this, Name_, makeResponseString().data(), this->Context.peer().c_str());
// because of std::function cannot hold move-only captured object
// we allocate shared object on heap to avoid message copy
auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)",
- this, Name_, makeResponseString().data(), this->Context.peer().c_str());
- StateFunc_ = &TThis::NextReply;
- ResponseSize += sz;
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)",
+ this, Name_, makeResponseString().data(), this->Context.peer().c_str());
+ StateFunc_ = &TThis::NextReply;
+ ResponseSize += sz;
StreamWriter_->Write(*uResp, GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), false);
- }
- }
-
- void WriteByteDataOk(grpc::ByteBuffer* resp) {
- auto sz = resp->Length();
- if (Writer_) {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
- this->Context.peer().c_str());
+ };
+ StreamAdaptor_->Enqueue(std::move(cb), false);
+ }
+ }
+
+ void WriteByteDataOk(grpc::ByteBuffer* resp) {
+ auto sz = resp->Length();
+ if (Writer_) {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
+ this->Context.peer().c_str());
StateFunc_ = &TThis::SetFinishDone;
- ResponseSize = sz;
+ ResponseSize = sz;
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
- } else {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_,
- this->Context.peer().c_str());
+ } else {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_,
+ this->Context.peer().c_str());
// because of std::function cannot hold move-only captured object
// we allocate shared object on heap to avoid buffer copy
auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
auto cb = [this, uResp = std::move(uResp), sz]() {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
- this, Name_, this->Context.peer().c_str());
- StateFunc_ = &TThis::NextReply;
- ResponseSize += sz;
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
+ this, Name_, this->Context.peer().c_str());
+ StateFunc_ = &TThis::NextReply;
+ ResponseSize += sz;
StreamWriter_->Write(*uResp, GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), false);
- }
- }
-
- void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, bool urgent) {
- Y_VERIFY(code != grpc::OK);
- if (code == grpc::StatusCode::UNAUTHENTICATED) {
- Counters_->CountNotAuthenticated();
- } else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) {
- Counters_->CountResourceExhausted();
- }
-
- if (Writer_) {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this,
- Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
- StateFunc_ = &TThis::SetFinishError;
+ };
+ StreamAdaptor_->Enqueue(std::move(cb), false);
+ }
+ }
+
+ void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, bool urgent) {
+ Y_VERIFY(code != grpc::OK);
+ if (code == grpc::StatusCode::UNAUTHENTICATED) {
+ Counters_->CountNotAuthenticated();
+ } else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) {
+ Counters_->CountResourceExhausted();
+ }
+
+ if (Writer_) {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this,
+ Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
+ StateFunc_ = &TThis::SetFinishError;
TOut resp;
Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag());
- } else {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
- " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
- auto cb = [this, code, msg]() {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
- " (pushed to grpc)", this, Name_, msg.c_str(),
- this->Context.peer().c_str(), (int)code);
- StateFunc_ = &TThis::SetFinishError;
- StreamWriter_->Finish(grpc::Status(code, msg), GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), urgent);
- }
- }
-
- bool SetRequestDone(bool ok) {
- auto makeRequestString = [&] {
- TString resp;
- if (ok) {
+ } else {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
+ " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
+ auto cb = [this, code, msg]() {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
+ " (pushed to grpc)", this, Name_, msg.c_str(),
+ this->Context.peer().c_str(), (int)code);
+ StateFunc_ = &TThis::SetFinishError;
+ StreamWriter_->Finish(grpc::Status(code, msg), GetGRpcTag());
+ };
+ StreamAdaptor_->Enqueue(std::move(cb), urgent);
+ }
+ }
+
+ bool SetRequestDone(bool ok) {
+ auto makeRequestString = [&] {
+ TString resp;
+ if (ok) {
TInProtoPrinter printer;
- printer.SetSingleLineMode(true);
- printer.PrintToString(*Request_, &resp);
- } else {
- resp = "<not ok>";
- }
- return resp;
- };
+ printer.SetSingleLineMode(true);
+ printer.PrintToString(*Request_, &resp);
+ } else {
+ resp = "<not ok>";
+ }
+ return resp;
+ };
GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_,
ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str());
-
- if (this->Context.c_call() == nullptr) {
+
+ if (this->Context.c_call() == nullptr) {
Y_VERIFY(!ok);
- // One ref by OnFinishTag, grpc will not call this tag if no request received
- UnRef();
+ // One ref by OnFinishTag, grpc will not call this tag if no request received
+ UnRef();
} else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) {
// Request cannot be registered due to shutdown
// It's unsafe to continue, so drop this request without processing
GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_);
this->Context.TryCancel();
return false;
- }
-
- Clone(); // TODO: Request pool?
- if (!ok) {
- Counters_->CountNotOkRequest();
- return false;
- }
-
+ }
+
+ Clone(); // TODO: Request pool?
+ if (!ok) {
+ Counters_->CountNotOkRequest();
+ return false;
+ }
+
if (IncRequest()) {
- // Adjust counters.
- RequestSize = Request_->ByteSize();
- Counters_->StartProcessing(RequestSize);
+ // Adjust counters.
+ RequestSize = Request_->ByteSize();
+ Counters_->StartProcessing(RequestSize);
RequestTimer.Reset();
-
- if (!SslServer()) {
- Counters_->CountRequestWithoutTls();
- }
-
- //TODO: Move this in to grpc_request_proxy
+
+ if (!SslServer()) {
+ Counters_->CountRequestWithoutTls();
+ }
+
+ //TODO: Move this in to grpc_request_proxy
auto maybeDatabase = GetPeerMetaValues(TStringBuf("x-ydb-database"));
- if (maybeDatabase.empty()) {
- Counters_->CountRequestsWithoutDatabase();
- }
+ if (maybeDatabase.empty()) {
+ Counters_->CountRequestsWithoutDatabase();
+ }
auto maybeToken = GetPeerMetaValues(TStringBuf("x-ydb-auth-ticket"));
- if (maybeToken.empty() || maybeToken[0].empty()) {
+ if (maybeToken.empty() || maybeToken[0].empty()) {
TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}};
- Counters_->CountRequestsWithoutToken();
- GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token "
- "Name# %s data# %s peer# %s database# %s", this, Name_,
- makeRequestString().data(), this->Context.peer().c_str(), db.c_str());
- }
-
- // Handle current request.
- Cb_(this);
- } else {
- //This request has not been counted
- SkipUpdateCountersOnError = true;
- FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", true);
- }
- return true;
- }
-
- bool NextReply(bool ok) {
- auto logCb = [this, ok](int left) {
- GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_,
- ok ? "true" : "false", this->Context.peer().c_str(), left);
- };
-
- if (!ok) {
- logCb(-1);
+ Counters_->CountRequestsWithoutToken();
+ GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token "
+ "Name# %s data# %s peer# %s database# %s", this, Name_,
+ makeRequestString().data(), this->Context.peer().c_str(), db.c_str());
+ }
+
+ // Handle current request.
+ Cb_(this);
+ } else {
+ //This request has not been counted
+ SkipUpdateCountersOnError = true;
+ FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", true);
+ }
+ return true;
+ }
+
+ bool NextReply(bool ok) {
+ auto logCb = [this, ok](int left) {
+ GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_,
+ ok ? "true" : "false", this->Context.peer().c_str(), left);
+ };
+
+ if (!ok) {
+ logCb(-1);
DecRequest();
- Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
- TDuration::Seconds(RequestTimer.Passed()));
- return false;
- }
-
- Ref(); // To prevent destroy during this call in case of execution Finish
- size_t left = StreamAdaptor_->ProcessNext();
- logCb(left);
- if (NextReplyCb_) {
- NextReplyCb_(left);
- }
- // Now it is safe to destroy even if Finish was called
- UnRef();
- return true;
- }
-
- bool SetFinishDone(bool ok) {
+ Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
+ TDuration::Seconds(RequestTimer.Passed()));
+ return false;
+ }
+
+ Ref(); // To prevent destroy during this call in case of execution Finish
+ size_t left = StreamAdaptor_->ProcessNext();
+ logCb(left);
+ if (NextReplyCb_) {
+ NextReplyCb_(left);
+ }
+ // Now it is safe to destroy even if Finish was called
+ UnRef();
+ return true;
+ }
+
+ bool SetFinishDone(bool ok) {
GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_,
- ok ? "true" : "false", this->Context.peer().c_str());
- //PrintBackTrace();
+ ok ? "true" : "false", this->Context.peer().c_str());
+ //PrintBackTrace();
DecRequest();
Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
TDuration::Seconds(RequestTimer.Passed()));
- return false;
- }
-
- bool SetFinishError(bool ok) {
+ return false;
+ }
+
+ bool SetFinishError(bool ok) {
GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_,
- ok ? "true" : "false", this->Context.peer().c_str());
- if (!SkipUpdateCountersOnError) {
+ ok ? "true" : "false", this->Context.peer().c_str());
+ if (!SkipUpdateCountersOnError) {
DecRequest();
Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
TDuration::Seconds(RequestTimer.Passed()));
- }
- return false;
- }
-
- // Returns pointer to IQueueEvent to pass into grpc c runtime
- // Implicit C style cast from this to void* is wrong due to multiple inheritance
- void* GetGRpcTag() {
- return static_cast<IQueueEvent*>(this);
- }
-
- void OnFinish(EQueueEventStatus evStatus) {
- if (this->Context.IsCancelled()) {
- FinishPromise_.SetValue(EFinishStatus::CANCEL);
- } else {
- FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR);
- }
- }
-
+ }
+ return false;
+ }
+
+ // Returns pointer to IQueueEvent to pass into grpc c runtime
+ // Implicit C style cast from this to void* is wrong due to multiple inheritance
+ void* GetGRpcTag() {
+ return static_cast<IQueueEvent*>(this);
+ }
+
+ void OnFinish(EQueueEventStatus evStatus) {
+ if (this->Context.IsCancelled()) {
+ FinishPromise_.SetValue(EFinishStatus::CANCEL);
+ } else {
+ FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR);
+ }
+ }
+
bool IncRequest() {
if (!Server_->IncRequest())
return false;
@@ -480,36 +480,36 @@ private:
}
using TStateFunc = bool (TThis::*)(bool);
- TService* Server_;
- TOnRequest Cb_;
- TRequestCallback RequestCallback_;
- TStreamRequestCallback StreamRequestCallback_;
+ TService* Server_;
+ TOnRequest Cb_;
+ TRequestCallback RequestCallback_;
+ TStreamRequestCallback StreamRequestCallback_;
const char* const Name_;
TLoggerPtr Logger_;
ICounterBlockPtr Counters_;
IGRpcRequestLimiterPtr RequestLimiter_;
-
+
THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_;
- THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_;
- TStateFunc StateFunc_;
- TIn* Request_;
-
- google::protobuf::Arena Arena_;
- TOnNextReply NextReplyCb_;
- ui32 RequestSize = 0;
- ui32 ResponseSize = 0;
+ THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_;
+ TStateFunc StateFunc_;
+ TIn* Request_;
+
+ google::protobuf::Arena Arena_;
+ TOnNextReply NextReplyCb_;
+ ui32 RequestSize = 0;
+ ui32 ResponseSize = 0;
ui32 ResponseStatus = 0;
THPTimer RequestTimer;
- TAuthState AuthState_ = 0;
+ TAuthState AuthState_ = 0;
bool RequestRegistered_ = false;
-
+
using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>;
TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish };
- NThreading::TPromise<EFinishStatus> FinishPromise_;
- bool SkipUpdateCountersOnError = false;
- IStreamAdaptor::TPtr StreamAdaptor_;
-};
-
+ NThreading::TPromise<EFinishStatus> FinishPromise_;
+ bool SkipUpdateCountersOnError = false;
+ IStreamAdaptor::TPtr StreamAdaptor_;
+};
+
template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer>
class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter> {
using TBase = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>;
diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h
index 506221dd98..fcfce1c181 100644
--- a/library/cpp/grpc/server/grpc_request_base.h
+++ b/library/cpp/grpc/server/grpc_request_base.h
@@ -1,33 +1,33 @@
-#pragma once
-
+#pragma once
+
#include <google/protobuf/message.h>
#include <library/cpp/threading/future/future.h>
-
+
#include <grpc++/server_context.h>
-namespace grpc {
-class ByteBuffer;
-}
-
+namespace grpc {
+class ByteBuffer;
+}
+
namespace NGrpc {
-
-extern const char* GRPC_USER_AGENT_HEADER;
-
-struct TAuthState {
- enum EAuthState {
- AS_NOT_PERFORMED,
- AS_OK,
- AS_FAIL,
- AS_UNAVAILABLE
- };
- TAuthState(bool needAuth)
- : NeedAuth(needAuth)
- , State(AS_NOT_PERFORMED)
- {}
- bool NeedAuth;
- EAuthState State;
-};
-
+
+extern const char* GRPC_USER_AGENT_HEADER;
+
+struct TAuthState {
+ enum EAuthState {
+ AS_NOT_PERFORMED,
+ AS_OK,
+ AS_FAIL,
+ AS_UNAVAILABLE
+ };
+ TAuthState(bool needAuth)
+ : NeedAuth(needAuth)
+ , State(AS_NOT_PERFORMED)
+ {}
+ bool NeedAuth;
+ EAuthState State;
+};
+
//! An interface that may be used to limit concurrency of requests
class IGRpcRequestLimiter: public TThrRefBase {
@@ -38,79 +38,79 @@ public:
using IGRpcRequestLimiterPtr = TIntrusivePtr<IGRpcRequestLimiter>;
-//! State of current request
+//! State of current request
class IRequestContextBase: public TThrRefBase {
-public:
- enum class EFinishStatus {
- OK,
- ERROR,
- CANCEL
- };
- using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>;
-
- using TOnNextReply = std::function<void (size_t left)>;
-
- //! Get pointer to the request's message.
- virtual const NProtoBuf::Message* GetRequest() const = 0;
-
- //! Get current auth state
- virtual TAuthState& GetAuthState() = 0;
-
- //! Send common response (The request shoult be created for protobuf response type)
- //! Implementation can swap protobuf message
+public:
+ enum class EFinishStatus {
+ OK,
+ ERROR,
+ CANCEL
+ };
+ using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>;
+
+ using TOnNextReply = std::function<void (size_t left)>;
+
+ //! Get pointer to the request's message.
+ virtual const NProtoBuf::Message* GetRequest() const = 0;
+
+ //! Get current auth state
+ virtual TAuthState& GetAuthState() = 0;
+
+ //! Send common response (The request shoult be created for protobuf response type)
+ //! Implementation can swap protobuf message
virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0;
-
+
//! Send serialised response (The request shoult be created for bytes response type)
- //! Implementation can swap ByteBuffer
+ //! Implementation can swap ByteBuffer
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0;
-
- //! Send grpc UNAUTHENTICATED status
- virtual void ReplyUnauthenticated(const TString& in) = 0;
-
+
+ //! Send grpc UNAUTHENTICATED status
+ virtual void ReplyUnauthenticated(const TString& in) = 0;
+
//! Send grpc error
virtual void ReplyError(grpc::StatusCode code, const TString& msg) = 0;
- //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise
- virtual TInstant Deadline() const = 0;
+ //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise
+ virtual TInstant Deadline() const = 0;
//! Returns available peer metadata keys
virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0;
- //! Returns peer optional metavalue
+ //! Returns peer optional metavalue
virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0;
-
+
//! Returns request compression level
virtual grpc_compression_level GetCompressionLevel() const = 0;
- //! Returns protobuf arena allocator associated with current request
- //! Lifetime of the arena is lifetime of the context
- virtual google::protobuf::Arena* GetArena() = 0;
-
- //! Add trailing metadata in to grpc context
- //! The metadata will be send at the time of rpc finish
- virtual void AddTrailingMetadata(const TString& key, const TString& value) = 0;
-
+ //! Returns protobuf arena allocator associated with current request
+ //! Lifetime of the arena is lifetime of the context
+ virtual google::protobuf::Arena* GetArena() = 0;
+
+ //! Add trailing metadata in to grpc context
+ //! The metadata will be send at the time of rpc finish
+ virtual void AddTrailingMetadata(const TString& key, const TString& value) = 0;
+
//! Use validated database name for counters
virtual void UseDatabase(const TString& database) = 0;
- // Streaming part
-
- //! Set callback. The callback will be called when response deliverid to the client
- //! after that we can call Reply again in streaming mode. Yes, GRpc says there is only one
- //! reply in flight
- virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0;
-
- //! Finish streaming reply
- virtual void FinishStreamingOk() = 0;
-
- //! Returns future to get cancel of finish notification
- virtual TAsyncFinishResult GetFinishFuture() = 0;
+ // Streaming part
+
+ //! Set callback. The callback will be called when response deliverid to the client
+ //! after that we can call Reply again in streaming mode. Yes, GRpc says there is only one
+ //! reply in flight
+ virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0;
+
+ //! Finish streaming reply
+ virtual void FinishStreamingOk() = 0;
+
+ //! Returns future to get cancel of finish notification
+ virtual TAsyncFinishResult GetFinishFuture() = 0;
//! Returns peer address
virtual TString GetPeer() const = 0;
//! Returns true if server is using ssl
virtual bool SslServer() const = 0;
-};
-
+};
+
} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_response.h b/library/cpp/grpc/server/grpc_response.h
index 53a3195982..8e9afe44d5 100644
--- a/library/cpp/grpc/server/grpc_response.h
+++ b/library/cpp/grpc/server/grpc_response.h
@@ -11,7 +11,7 @@ namespace NGrpc {
* Universal response that owns underlying message or buffer.
*/
template <typename TMsg>
-class TUniversalResponse: public TAtomicRefCount<TUniversalResponse<TMsg>>, public TMoveOnly {
+class TUniversalResponse: public TAtomicRefCount<TUniversalResponse<TMsg>>, public TMoveOnly {
friend class grpc::SerializationTraits<TUniversalResponse<TMsg>>;
public:
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 3e68b26e1c..7437b7a8f5 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -1,12 +1,12 @@
-#include "grpc_server.h"
-
-#include <util/string/join.h>
-#include <util/generic/yexception.h>
-#include <util/system/thread.h>
-
-#include <grpc++/resource_quota.h>
+#include "grpc_server.h"
+
+#include <util/string/join.h>
+#include <util/generic/yexception.h>
+#include <util/system/thread.h>
+
+#include <grpc++/resource_quota.h>
#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
-
+
#if !defined(_WIN32) && !defined(_WIN64)
#include <sys/socket.h>
@@ -16,9 +16,9 @@
#endif
namespace NGrpc {
-
-using NThreading::TFuture;
-
+
+using NThreading::TFuture;
+
static void PullEvents(grpc::ServerCompletionQueue* cq) {
TThread::SetCurrentThreadName("grpc_server");
while (true) {
@@ -37,33 +37,33 @@ static void PullEvents(grpc::ServerCompletionQueue* cq) {
}
}
-TGRpcServer::TGRpcServer(const TServerOptions& opts)
- : Options_(opts)
- , Limiter_(Options_.MaxGlobalRequestInFlight)
- {}
-
-TGRpcServer::~TGRpcServer() {
- Y_VERIFY(Ts.empty());
- Services_.clear();
-}
-
-void TGRpcServer::AddService(IGRpcServicePtr service) {
- Services_.push_back(service);
-}
-
-void TGRpcServer::Start() {
+TGRpcServer::TGRpcServer(const TServerOptions& opts)
+ : Options_(opts)
+ , Limiter_(Options_.MaxGlobalRequestInFlight)
+ {}
+
+TGRpcServer::~TGRpcServer() {
+ Y_VERIFY(Ts.empty());
+ Services_.clear();
+}
+
+void TGRpcServer::AddService(IGRpcServicePtr service) {
+ Services_.push_back(service);
+}
+
+void TGRpcServer::Start() {
TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695
- using grpc::ServerBuilder;
- using grpc::ResourceQuota;
- ServerBuilder builder;
+ using grpc::ServerBuilder;
+ using grpc::ResourceQuota;
+ ServerBuilder builder;
auto credentials = grpc::InsecureServerCredentials();
if (Options_.SslData) {
- grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;
- keycert.cert_chain = std::move(Options_.SslData->Cert);
- keycert.private_key = std::move(Options_.SslData->Key);
- grpc::SslServerCredentialsOptions sslOps;
- sslOps.pem_root_certs = std::move(Options_.SslData->Root);
- sslOps.pem_key_cert_pairs.push_back(keycert);
+ grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;
+ keycert.cert_chain = std::move(Options_.SslData->Cert);
+ keycert.private_key = std::move(Options_.SslData->Key);
+ grpc::SslServerCredentialsOptions sslOps;
+ sslOps.pem_root_certs = std::move(Options_.SslData->Root);
+ sslOps.pem_key_cert_pairs.push_back(keycert);
credentials = grpc::SslServerCredentials(sslOps);
}
if (Options_.ExternalListener) {
@@ -72,58 +72,58 @@ void TGRpcServer::Start() {
credentials
));
} else {
- builder.AddListeningPort(server_address, credentials);
- }
- builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
- builder.SetMaxSendMessageSize(Options_.MaxMessageSize);
- for (IGRpcServicePtr service : Services_) {
+ builder.AddListeningPort(server_address, credentials);
+ }
+ builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
+ builder.SetMaxSendMessageSize(Options_.MaxMessageSize);
+ for (IGRpcServicePtr service : Services_) {
service->SetServerOptions(Options_);
- builder.RegisterService(service->GetService());
- service->SetGlobalLimiterHandle(&Limiter_);
- }
-
+ builder.RegisterService(service->GetService());
+ service->SetGlobalLimiterHandle(&Limiter_);
+ }
+
class TKeepAliveOption: public grpc::ServerBuilderOption {
- public:
- TKeepAliveOption(int idle, int interval)
- : Idle(idle)
- , Interval(interval)
- , KeepAliveEnabled(true)
- {}
-
- TKeepAliveOption()
- : Idle(0)
- , Interval(0)
- , KeepAliveEnabled(false)
- {}
-
- void UpdateArguments(grpc::ChannelArguments *args) override {
- args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);
- args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
- if (KeepAliveEnabled) {
- args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
- args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
- args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000);
- args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000);
- args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000);
- }
- }
-
- void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
- {}
- private:
- const int Idle;
- const int Interval;
- const bool KeepAliveEnabled;
- };
-
- if (Options_.KeepAliveEnable) {
- builder.SetOption(std::make_unique<TKeepAliveOption>(
- Options_.KeepAliveIdleTimeoutTriggerSec,
- Options_.KeepAliveProbeIntervalSec));
- } else {
- builder.SetOption(std::make_unique<TKeepAliveOption>());
- }
-
+ public:
+ TKeepAliveOption(int idle, int interval)
+ : Idle(idle)
+ , Interval(interval)
+ , KeepAliveEnabled(true)
+ {}
+
+ TKeepAliveOption()
+ : Idle(0)
+ , Interval(0)
+ , KeepAliveEnabled(false)
+ {}
+
+ void UpdateArguments(grpc::ChannelArguments *args) override {
+ args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);
+ args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
+ if (KeepAliveEnabled) {
+ args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
+ args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
+ args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000);
+ args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000);
+ args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000);
+ }
+ }
+
+ void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
+ {}
+ private:
+ const int Idle;
+ const int Interval;
+ const bool KeepAliveEnabled;
+ };
+
+ if (Options_.KeepAliveEnable) {
+ builder.SetOption(std::make_unique<TKeepAliveOption>(
+ Options_.KeepAliveIdleTimeoutTriggerSec,
+ Options_.KeepAliveProbeIntervalSec));
+ } else {
+ builder.SetOption(std::make_unique<TKeepAliveOption>());
+ }
+
if (Options_.UseCompletionQueuePerThread) {
for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
CQS_.push_back(builder.AddCompletionQueue());
@@ -132,30 +132,30 @@ void TGRpcServer::Start() {
CQS_.push_back(builder.AddCompletionQueue());
}
- if (Options_.GRpcMemoryQuotaBytes) {
+ if (Options_.GRpcMemoryQuotaBytes) {
// See details KIKIMR-6932
- /*
- grpc::ResourceQuota quota("memory_bound");
- quota.Resize(Options_.GRpcMemoryQuotaBytes);
-
- builder.SetResourceQuota(quota);
- */
+ /*
+ grpc::ResourceQuota quota("memory_bound");
+ quota.Resize(Options_.GRpcMemoryQuotaBytes);
+
+ builder.SetResourceQuota(quota);
+ */
Cerr << "GRpc memory quota temporarily disabled due to issues with grpc quoter" << Endl;
- }
+ }
Options_.ServerBuilderMutator(builder);
builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel);
-
- Server_ = builder.BuildAndStart();
- if (!Server_) {
+
+ Server_ = builder.BuildAndStart();
+ if (!Server_) {
ythrow yexception() << "can't start grpc server on " << server_address;
- }
+ }
size_t index = 0;
- for (IGRpcServicePtr service : Services_) {
+ for (IGRpcServicePtr service : Services_) {
// TODO: provide something else for services instead of ServerCompletionQueue
service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
- }
-
+ }
+
if (Options_.UseCompletionQueuePerThread) {
for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
auto* cq = &CQS_[i];
@@ -170,71 +170,71 @@ void TGRpcServer::Start() {
PullEvents(cq->get());
}));
}
- }
+ }
if (Options_.ExternalListener) {
Options_.ExternalListener->Start();
}
-}
-
-void TGRpcServer::Stop() {
- for (auto& service : Services_) {
- service->StopService();
- }
-
- auto now = TInstant::Now();
-
- if (Server_) {
- i64 sec = Options_.GRpcShutdownDeadline.Seconds();
- Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
- i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
- Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
- }
-
+}
+
+void TGRpcServer::Stop() {
+ for (auto& service : Services_) {
+ service->StopService();
+ }
+
+ auto now = TInstant::Now();
+
+ if (Server_) {
+ i64 sec = Options_.GRpcShutdownDeadline.Seconds();
+ Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
+ i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
+ Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
+ }
+
for (ui64 attempt = 0; ; ++attempt) {
bool unsafe = false;
- size_t infly = 0;
- for (auto& service : Services_) {
+ size_t infly = 0;
+ for (auto& service : Services_) {
unsafe |= service->IsUnsafeToShutdown();
infly += service->RequestsInProgress();
- }
-
+ }
+
if (!unsafe && !infly)
- break;
+ break;
- auto spent = (TInstant::Now() - now).SecondsFloat();
+ auto spent = (TInstant::Now() - now).SecondsFloat();
if (attempt % 300 == 0) {
// don't log too much
Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl;
}
if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat())
- break;
- Sleep(TDuration::MilliSeconds(10));
- }
-
- // Always shutdown the completion queue after the server.
+ break;
+ Sleep(TDuration::MilliSeconds(10));
+ }
+
+ // Always shutdown the completion queue after the server.
for (auto& cq : CQS_) {
cq->Shutdown();
- }
-
- for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
- (*ti)->Join();
- }
-
- Ts.clear();
+ }
+
+ for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
+ (*ti)->Join();
+ }
+
+ Ts.clear();
if (Options_.ExternalListener) {
Options_.ExternalListener->Stop();
}
-}
-
-ui16 TGRpcServer::GetPort() const {
- return Options_.Port;
-}
-
-TString TGRpcServer::GetHost() const {
- return Options_.Host;
-}
-
+}
+
+ui16 TGRpcServer::GetPort() const {
+ return Options_.Port;
+}
+
+TString TGRpcServer::GetHost() const {
+ return Options_.Host;
+}
+
} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index 0a4123d84e..d6814a90a0 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -1,32 +1,32 @@
-#pragma once
-
+#pragma once
+
#include "grpc_request_base.h"
#include "logger.h"
#include <library/cpp/threading/future/future.h>
-
-#include <util/generic/ptr.h>
-#include <util/generic/string.h>
-#include <util/generic/vector.h>
+
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
#include <util/generic/maybe.h>
-#include <util/generic/queue.h>
-#include <util/generic/hash_set.h>
-#include <util/system/types.h>
-#include <util/system/mutex.h>
+#include <util/generic/queue.h>
+#include <util/generic/hash_set.h>
+#include <util/system/types.h>
+#include <util/system/mutex.h>
#include <util/thread/factory.h>
-
-#include <grpc++/grpc++.h>
-
+
+#include <grpc++/grpc++.h>
+
namespace NGrpc {
-
-constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
-
-struct TSslData {
- TString Cert;
- TString Key;
- TString Root;
-};
-
+
+constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
+
+struct TSslData {
+ TString Cert;
+ TString Key;
+ TString Root;
+};
+
struct IExternalListener
: public TThrRefBase
{
@@ -36,55 +36,55 @@ struct IExternalListener
virtual void Stop() = 0;
};
-//! Server's options.
-struct TServerOptions {
-#define DECLARE_FIELD(name, type, default) \
- type name{default}; \
- inline TServerOptions& Set##name(const type& value) { \
- name = value; \
- return *this; \
- }
-
- //! Hostname of server to bind to.
- DECLARE_FIELD(Host, TString, "[::]");
- //! Service port.
- DECLARE_FIELD(Port, ui16, 0);
-
- //! Number of worker threads.
- DECLARE_FIELD(WorkerThreads, size_t, 2);
-
+//! Server's options.
+struct TServerOptions {
+#define DECLARE_FIELD(name, type, default) \
+ type name{default}; \
+ inline TServerOptions& Set##name(const type& value) { \
+ name = value; \
+ return *this; \
+ }
+
+ //! Hostname of server to bind to.
+ DECLARE_FIELD(Host, TString, "[::]");
+ //! Service port.
+ DECLARE_FIELD(Port, ui16, 0);
+
+ //! Number of worker threads.
+ DECLARE_FIELD(WorkerThreads, size_t, 2);
+
//! Create one completion queue per thread
DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
- //! Memory quota size for grpc server in bytes. Zero means unlimited.
- DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0);
-
- //! How long to wait until pending rpcs are forcefully terminated.
- DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30));
-
- //! In/Out message size limit
- DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT);
-
- //! Use GRpc keepalive
- DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>());
-
- //! GRPC_ARG_KEEPALIVE_TIME_MS setting
+ //! Memory quota size for grpc server in bytes. Zero means unlimited.
+ DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0);
+
+ //! How long to wait until pending rpcs are forcefully terminated.
+ DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30));
+
+ //! In/Out message size limit
+ DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT);
+
+ //! Use GRpc keepalive
+ DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>());
+
+ //! GRPC_ARG_KEEPALIVE_TIME_MS setting
DECLARE_FIELD(KeepAliveIdleTimeoutTriggerSec, int, 0);
- //! Deprecated, ths option ignored. Will be removed soon.
+ //! Deprecated, ths option ignored. Will be removed soon.
DECLARE_FIELD(KeepAliveMaxProbeCount, int, 0);
- //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting
+ //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting
DECLARE_FIELD(KeepAliveProbeIntervalSec, int, 0);
- //! Max number of requests processing by services (global limit for grpc server)
- DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000);
-
- //! SSL server data
- DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>());
-
- //! GRPC auth
- DECLARE_FIELD(UseAuth, bool, false);
+ //! Max number of requests processing by services (global limit for grpc server)
+ DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000);
+
+ //! SSL server data
+ DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>());
+
+ //! GRPC auth
+ DECLARE_FIELD(UseAuth, bool, false);
//! Default compression level. Used when no compression options provided by client.
// Mapping to particular compression algorithm depends on client.
@@ -98,75 +98,75 @@ struct TServerOptions {
//! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).
DECLARE_FIELD(Logger, TLoggerPtr, nullptr);
-#undef DECLARE_FIELD
-};
-
-class IQueueEvent {
-public:
- virtual ~IQueueEvent() = default;
-
+#undef DECLARE_FIELD
+};
+
+class IQueueEvent {
+public:
+ virtual ~IQueueEvent() = default;
+
//! Execute an action defined by implementation.
- virtual bool Execute(bool ok) = 0;
-
- //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also
- // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does
- // not implement in flight management.
- virtual void Process() {}
-
- //! Finish and destroy request.
- virtual void DestroyRequest() = 0;
-};
-
-class ICancelableContext {
-public:
- virtual void Shutdown() = 0;
- virtual ~ICancelableContext() = default;
-};
-
+ virtual bool Execute(bool ok) = 0;
+
+ //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also
+ // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does
+ // not implement in flight management.
+ virtual void Process() {}
+
+ //! Finish and destroy request.
+ virtual void DestroyRequest() = 0;
+};
+
+class ICancelableContext {
+public:
+ virtual void Shutdown() = 0;
+ virtual ~ICancelableContext() = default;
+};
+
template <class TLimit>
class TInFlightLimiterImpl {
-public:
+public:
explicit TInFlightLimiterImpl(const TLimit& limit)
- : Limit_(limit)
- {}
-
- bool Inc() {
- i64 newVal;
- i64 prev;
- do {
- prev = AtomicGet(CurInFlightReqs_);
- Y_VERIFY(prev >= 0);
- if (Limit_ && prev > Limit_) {
- return false;
- }
- newVal = prev + 1;
- } while (!AtomicCas(&CurInFlightReqs_, newVal, prev));
- return true;
- }
-
- void Dec() {
- i64 newVal = AtomicDecrement(CurInFlightReqs_);
- Y_VERIFY(newVal >= 0);
- }
-
- i64 GetCurrentInFlight() const {
- return AtomicGet(CurInFlightReqs_);
- }
-
-private:
+ : Limit_(limit)
+ {}
+
+ bool Inc() {
+ i64 newVal;
+ i64 prev;
+ do {
+ prev = AtomicGet(CurInFlightReqs_);
+ Y_VERIFY(prev >= 0);
+ if (Limit_ && prev > Limit_) {
+ return false;
+ }
+ newVal = prev + 1;
+ } while (!AtomicCas(&CurInFlightReqs_, newVal, prev));
+ return true;
+ }
+
+ void Dec() {
+ i64 newVal = AtomicDecrement(CurInFlightReqs_);
+ Y_VERIFY(newVal >= 0);
+ }
+
+ i64 GetCurrentInFlight() const {
+ return AtomicGet(CurInFlightReqs_);
+ }
+
+private:
const TLimit Limit_;
- TAtomic CurInFlightReqs_ = 0;
-};
-
+ TAtomic CurInFlightReqs_ = 0;
+};
+
using TGlobalLimiter = TInFlightLimiterImpl<i64>;
class IGRpcService: public TThrRefBase {
-public:
- virtual grpc::Service* GetService() = 0;
- virtual void StopService() noexcept = 0;
+public:
+ virtual grpc::Service* GetService() = 0;
+ virtual void StopService() noexcept = 0;
virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;
- virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
+ virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
virtual bool IsUnsafeToShutdown() const = 0;
virtual size_t RequestsInProgress() const = 0;
@@ -175,11 +175,11 @@ public:
* service to inspect server options and initialize accordingly.
*/
virtual void SetServerOptions(const TServerOptions& options) = 0;
-};
-
-template<typename T>
+};
+
+template<typename T>
class TGrpcServiceBase: public IGRpcService {
-public:
+public:
class TShutdownGuard {
using TOwner = TGrpcServiceBase<T>;
friend class TGrpcServiceBase<T>;
@@ -232,20 +232,20 @@ public:
};
public:
- using TCurrentGRpcService = T;
-
- void StopService() noexcept override {
- with_lock(Lock_) {
+ using TCurrentGRpcService = T;
+
+ void StopService() noexcept override {
+ with_lock(Lock_) {
AtomicSet(ShuttingDown_, 1);
- // Send TryCansel to event (can be send after finishing).
- // Actual dtors will be called from grpc thread, so deadlock impossible
- for (auto* request : Requests_) {
- request->Shutdown();
- }
- }
- }
-
+ // Send TryCansel to event (can be send after finishing).
+ // Actual dtors will be called from grpc thread, so deadlock impossible
+ for (auto* request : Requests_) {
+ request->Shutdown();
+ }
+ }
+ }
+
TShutdownGuard ProtectShutdown() noexcept {
AtomicIncrement(GuardCount_);
if (IsShuttingDown()) {
@@ -261,35 +261,35 @@ public:
}
size_t RequestsInProgress() const override {
- size_t c = 0;
- with_lock(Lock_) {
- c = Requests_.size();
- }
- return c;
- }
-
+ size_t c = 0;
+ with_lock(Lock_) {
+ c = Requests_.size();
+ }
+ return c;
+ }
+
void SetServerOptions(const TServerOptions& options) override {
SslServer_ = bool(options.SslData);
NeedAuth_ = options.UseAuth;
- }
-
- void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {}
-
- //! Check if the server is going to shut down.
- bool IsShuttingDown() const {
- return AtomicGet(ShuttingDown_);
- }
-
+ }
+
+ void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {}
+
+ //! Check if the server is going to shut down.
+ bool IsShuttingDown() const {
+ return AtomicGet(ShuttingDown_);
+ }
+
bool SslServer() const {
return SslServer_;
}
- bool NeedAuth() const {
- return NeedAuth_;
- }
-
+ bool NeedAuth() const {
+ return NeedAuth_;
+ }
+
bool RegisterRequestCtx(ICancelableContext* req) {
- with_lock(Lock_) {
+ with_lock(Lock_) {
auto r = Requests_.emplace(req);
Y_VERIFY(r.second, "Ctx already registered");
@@ -298,59 +298,59 @@ public:
Requests_.erase(r.first);
return false;
}
- }
+ }
return true;
- }
-
- void DeregisterRequestCtx(ICancelableContext* req) {
- with_lock(Lock_) {
+ }
+
+ void DeregisterRequestCtx(ICancelableContext* req) {
+ with_lock(Lock_) {
Y_VERIFY(Requests_.erase(req), "Ctx is not registered");
- }
- }
-
-protected:
- using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService;
- TGrpcAsyncService Service_;
-
+ }
+ }
+
+protected:
+ using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService;
+ TGrpcAsyncService Service_;
+
TGrpcAsyncService* GetService() override {
- return &Service_;
- }
-
-private:
- TAtomic ShuttingDown_ = 0;
+ return &Service_;
+ }
+
+private:
+ TAtomic ShuttingDown_ = 0;
TAtomic GuardCount_ = 0;
-
+
bool SslServer_ = false;
- bool NeedAuth_ = false;
-
- THashSet<ICancelableContext*> Requests_;
- TAdaptiveLock Lock_;
-};
-
+ bool NeedAuth_ = false;
+
+ THashSet<ICancelableContext*> Requests_;
+ TAdaptiveLock Lock_;
+};
+
class TGRpcServer {
-public:
- using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
- TGRpcServer(const TServerOptions& opts);
- ~TGRpcServer();
- void AddService(IGRpcServicePtr service);
- void Start();
- // Send stop to registred services and call Shutdown on grpc server
- // This method MUST be called before destroying TGRpcServer
- void Stop();
- ui16 GetPort() const;
- TString GetHost() const;
-
-private:
+public:
+ using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
+ TGRpcServer(const TServerOptions& opts);
+ ~TGRpcServer();
+ void AddService(IGRpcServicePtr service);
+ void Start();
+ // Send stop to registred services and call Shutdown on grpc server
+ // This method MUST be called before destroying TGRpcServer
+ void Stop();
+ ui16 GetPort() const;
+ TString GetHost() const;
+
+private:
using IThreadRef = TAutoPtr<IThreadFactory::IThread>;
-
- const TServerOptions Options_;
- std::unique_ptr<grpc::Server> Server_;
+
+ const TServerOptions Options_;
+ std::unique_ptr<grpc::Server> Server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
TVector<IThreadRef> Ts;
-
+
TVector<IGRpcServicePtr> Services_;
- TGlobalLimiter Limiter_;
-};
-
+ TGlobalLimiter Limiter_;
+};
+
} // namespace NGrpc
diff --git a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
index 0840c77176..c34d3b8c2b 100644
--- a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
+++ b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
@@ -1,121 +1,121 @@
#include <library/cpp/grpc/server/grpc_request.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/testing/unittest/tests_data.h>
-
-#include <util/system/thread.h>
-#include <util/thread/pool.h>
-
+
+#include <util/system/thread.h>
+#include <util/thread/pool.h>
+
using namespace NGrpc;
-
-// Here we emulate stream data producer
+
+// Here we emulate stream data producer
class TOrderedProducer: public TThread {
-public:
- TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp)
- : TThread(&ThreadProc, this)
- , Adaptor_(adaptor)
- , Max_(max)
- , WithSleep_(withSleep)
- , ConsumerOp_(std::move(consumerOp))
- {}
-
- static void* ThreadProc(void* _this) {
- SetCurrentThreadName("OrderedProducerThread");
- static_cast<TOrderedProducer*>(_this)->Exec();
- return nullptr;
- }
-
- void Exec() {
- for (ui64 i = 0; i < Max_; i++) {
- auto cb = [i, this]() mutable {
- ConsumerOp_(i);
- };
- Adaptor_->Enqueue(std::move(cb), false);
- if (WithSleep_ && (i % 256 == 0)) {
- Sleep(TDuration::MilliSeconds(10));
- }
- }
- }
-
-private:
- IStreamAdaptor* Adaptor_;
- const ui64 Max_;
- const bool WithSleep_;
- std::function<void(ui64)> ConsumerOp_;
-};
-
-Y_UNIT_TEST_SUITE(StreamAdaptor) {
- static void OrderingTest(size_t threads, bool withSleep) {
-
- auto adaptor = CreateStreamAdaptor();
-
- const i64 max = 10000;
-
- // Here we will emulate grpc stream (NextReply call after writing)
+public:
+ TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp)
+ : TThread(&ThreadProc, this)
+ , Adaptor_(adaptor)
+ , Max_(max)
+ , WithSleep_(withSleep)
+ , ConsumerOp_(std::move(consumerOp))
+ {}
+
+ static void* ThreadProc(void* _this) {
+ SetCurrentThreadName("OrderedProducerThread");
+ static_cast<TOrderedProducer*>(_this)->Exec();
+ return nullptr;
+ }
+
+ void Exec() {
+ for (ui64 i = 0; i < Max_; i++) {
+ auto cb = [i, this]() mutable {
+ ConsumerOp_(i);
+ };
+ Adaptor_->Enqueue(std::move(cb), false);
+ if (WithSleep_ && (i % 256 == 0)) {
+ Sleep(TDuration::MilliSeconds(10));
+ }
+ }
+ }
+
+private:
+ IStreamAdaptor* Adaptor_;
+ const ui64 Max_;
+ const bool WithSleep_;
+ std::function<void(ui64)> ConsumerOp_;
+};
+
+Y_UNIT_TEST_SUITE(StreamAdaptor) {
+ static void OrderingTest(size_t threads, bool withSleep) {
+
+ auto adaptor = CreateStreamAdaptor();
+
+ const i64 max = 10000;
+
+ // Here we will emulate grpc stream (NextReply call after writing)
std::unique_ptr<IThreadPool> consumerQueue(new TThreadPool(TThreadPool::TParams().SetBlocking(false).SetCatching(false)));
- // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue)
- consumerQueue->Start(threads, 1);
-
- // Non atomic!!! Stream adaptor must protect us
- ui64 curVal = 0;
-
- // Used just to wait in the main thread
- TAtomic finished = false;
- auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) {
- // Check no reordering inside stream adaptor
- // and no simultanious consumer Op call
- UNIT_ASSERT_VALUES_EQUAL(curVal, i);
- curVal++;
- // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext
- // so compare here and set after
- bool tmp = curVal == max;
- bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() {
- // Additional check the value still same
- // run under tsan makes sure no consumer Op call before we call ProcessNext
- UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1);
- ptr->ProcessNext();
- // Reordering after ProcessNext is possible, so check tmp and set finished to true
- if (tmp)
- AtomicSet(finished, true);
- });
- UNIT_ASSERT(res);
- };
-
- TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp));
-
- producer.Start();
- producer.Join();
-
- while (!AtomicGet(finished))
- {
- Sleep(TDuration::MilliSeconds(100));
- }
-
- consumerQueue->Stop();
-
- UNIT_ASSERT_VALUES_EQUAL(curVal, max);
- }
-
- Y_UNIT_TEST(OrderingOneThread) {
- OrderingTest(1, false);
- }
-
- Y_UNIT_TEST(OrderingTwoThreads) {
- OrderingTest(2, false);
- }
-
- Y_UNIT_TEST(OrderingManyThreads) {
- OrderingTest(10, false);
- }
-
- Y_UNIT_TEST(OrderingOneThreadWithSleep) {
- OrderingTest(1, true);
- }
-
- Y_UNIT_TEST(OrderingTwoThreadsWithSleep) {
- OrderingTest(2, true);
- }
-
- Y_UNIT_TEST(OrderingManyThreadsWithSleep) {
- OrderingTest(10, true);
- }
-}
+ // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue)
+ consumerQueue->Start(threads, 1);
+
+ // Non atomic!!! Stream adaptor must protect us
+ ui64 curVal = 0;
+
+ // Used just to wait in the main thread
+ TAtomic finished = false;
+ auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) {
+ // Check no reordering inside stream adaptor
+ // and no simultanious consumer Op call
+ UNIT_ASSERT_VALUES_EQUAL(curVal, i);
+ curVal++;
+ // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext
+ // so compare here and set after
+ bool tmp = curVal == max;
+ bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() {
+ // Additional check the value still same
+ // run under tsan makes sure no consumer Op call before we call ProcessNext
+ UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1);
+ ptr->ProcessNext();
+ // Reordering after ProcessNext is possible, so check tmp and set finished to true
+ if (tmp)
+ AtomicSet(finished, true);
+ });
+ UNIT_ASSERT(res);
+ };
+
+ TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp));
+
+ producer.Start();
+ producer.Join();
+
+ while (!AtomicGet(finished))
+ {
+ Sleep(TDuration::MilliSeconds(100));
+ }
+
+ consumerQueue->Stop();
+
+ UNIT_ASSERT_VALUES_EQUAL(curVal, max);
+ }
+
+ Y_UNIT_TEST(OrderingOneThread) {
+ OrderingTest(1, false);
+ }
+
+ Y_UNIT_TEST(OrderingTwoThreads) {
+ OrderingTest(2, false);
+ }
+
+ Y_UNIT_TEST(OrderingManyThreads) {
+ OrderingTest(10, false);
+ }
+
+ Y_UNIT_TEST(OrderingOneThreadWithSleep) {
+ OrderingTest(1, true);
+ }
+
+ Y_UNIT_TEST(OrderingTwoThreadsWithSleep) {
+ OrderingTest(2, true);
+ }
+
+ Y_UNIT_TEST(OrderingManyThreadsWithSleep) {
+ OrderingTest(10, true);
+ }
+}
diff --git a/library/cpp/grpc/server/ut/ya.make b/library/cpp/grpc/server/ut/ya.make
index 2f7fa1afc2..feb3291af9 100644
--- a/library/cpp/grpc/server/ut/ya.make
+++ b/library/cpp/grpc/server/ut/ya.make
@@ -1,21 +1,21 @@
UNITTEST_FOR(library/cpp/grpc/server)
-
-OWNER(
- dcherednik
- g:kikimr
-)
-
+
+OWNER(
+ dcherednik
+ g:kikimr
+)
+
TIMEOUT(600)
SIZE(MEDIUM)
-
-PEERDIR(
+
+PEERDIR(
library/cpp/grpc/server
-)
-
-SRCS(
+)
+
+SRCS(
grpc_response_ut.cpp
- stream_adaptor_ut.cpp
-)
-
-END()
-
+ stream_adaptor_ut.cpp
+)
+
+END()
+
diff --git a/library/cpp/grpc/server/ya.make b/library/cpp/grpc/server/ya.make
index 75499b926a..356a1b6793 100644
--- a/library/cpp/grpc/server/ya.make
+++ b/library/cpp/grpc/server/ya.make
@@ -1,25 +1,25 @@
-LIBRARY()
-
-OWNER(
- dcherednik
- g:kikimr
-)
-
-SRCS(
- event_callback.cpp
- grpc_request.cpp
- grpc_server.cpp
- grpc_counters.cpp
-)
-
-GENERATE_ENUM_SERIALIZATION(grpc_request_base.h)
-
-PEERDIR(
- contrib/libs/grpc
+LIBRARY()
+
+OWNER(
+ dcherednik
+ g:kikimr
+)
+
+SRCS(
+ event_callback.cpp
+ grpc_request.cpp
+ grpc_server.cpp
+ grpc_counters.cpp
+)
+
+GENERATE_ENUM_SERIALIZATION(grpc_request_base.h)
+
+PEERDIR(
+ contrib/libs/grpc
library/cpp/monlib/dynamic_counters/percentile
-)
-
-END()
+)
+
+END()
RECURSE_FOR_TESTS(ut)