diff options
author | Sergey Polovko <sergey@polovko.me> | 2022-02-10 16:47:03 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:03 +0300 |
commit | 2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5 (patch) | |
tree | b83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/grpc/server | |
parent | 3e0b762a82514bac89c1dd6ea7211e381d8aa248 (diff) | |
download | ydb-2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5.tar.gz |
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server')
-rw-r--r-- | library/cpp/grpc/server/actors/logger.cpp | 90 | ||||
-rw-r--r-- | library/cpp/grpc/server/actors/logger.h | 22 | ||||
-rw-r--r-- | library/cpp/grpc/server/actors/ya.make | 26 | ||||
-rw-r--r-- | library/cpp/grpc/server/event_callback.h | 16 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_async_ctx_base.h | 60 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_counters.cpp | 88 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_counters.h | 24 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request.cpp | 6 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 92 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 18 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_response.h | 178 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 8 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 18 | ||||
-rw-r--r-- | library/cpp/grpc/server/logger.h | 86 | ||||
-rw-r--r-- | library/cpp/grpc/server/ut/grpc_response_ut.cpp | 176 | ||||
-rw-r--r-- | library/cpp/grpc/server/ut/stream_adaptor_ut.cpp | 6 | ||||
-rw-r--r-- | library/cpp/grpc/server/ut/ya.make | 10 | ||||
-rw-r--r-- | library/cpp/grpc/server/ya.make | 8 |
18 files changed, 466 insertions, 466 deletions
diff --git a/library/cpp/grpc/server/actors/logger.cpp b/library/cpp/grpc/server/actors/logger.cpp index 176675366a..d8b2042576 100644 --- a/library/cpp/grpc/server/actors/logger.cpp +++ b/library/cpp/grpc/server/actors/logger.cpp @@ -1,45 +1,45 @@ -#include "logger.h" - -namespace NGrpc { -namespace { - -static_assert( - ui16(TLOG_EMERG) == ui16(NActors::NLog::PRI_EMERG) && - ui16(TLOG_DEBUG) == ui16(NActors::NLog::PRI_DEBUG), - "log levels in the library/log and library/cpp/actors don't match"); - -class TActorSystemLogger final: public TLogger { -public: - TActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) noexcept - : ActorSystem_{as} - , Component_{component} - { - } - -protected: - bool DoIsEnabled(ELogPriority p) const noexcept override { - const auto* settings = static_cast<::NActors::NLog::TSettings*>(ActorSystem_.LoggerSettings()); - const auto priority = static_cast<::NActors::NLog::EPriority>(p); - - return settings && settings->Satisfies(priority, Component_, 0); - } - - void DoWrite(ELogPriority p, const char* format, va_list args) noexcept override { - Y_VERIFY_DEBUG(DoIsEnabled(p)); - - const auto priority = static_cast<::NActors::NLog::EPriority>(p); - ::NActors::MemLogAdapter(ActorSystem_, priority, Component_, format, args); - } - -private: - NActors::TActorSystem& ActorSystem_; - NActors::NLog::EComponent Component_; -}; - -} // namespace - -TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) { - return MakeIntrusive<TActorSystemLogger>(as, component); -} - -} // namespace NGrpc +#include "logger.h" + +namespace NGrpc { +namespace { + +static_assert( + ui16(TLOG_EMERG) == ui16(NActors::NLog::PRI_EMERG) && + ui16(TLOG_DEBUG) == ui16(NActors::NLog::PRI_DEBUG), + "log levels in the library/log and library/cpp/actors don't match"); + +class TActorSystemLogger final: public TLogger { +public: + TActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) noexcept + : ActorSystem_{as} + , Component_{component} + { + } + +protected: + bool DoIsEnabled(ELogPriority p) const noexcept override { + const auto* settings = static_cast<::NActors::NLog::TSettings*>(ActorSystem_.LoggerSettings()); + const auto priority = static_cast<::NActors::NLog::EPriority>(p); + + return settings && settings->Satisfies(priority, Component_, 0); + } + + void DoWrite(ELogPriority p, const char* format, va_list args) noexcept override { + Y_VERIFY_DEBUG(DoIsEnabled(p)); + + const auto priority = static_cast<::NActors::NLog::EPriority>(p); + ::NActors::MemLogAdapter(ActorSystem_, priority, Component_, format, args); + } + +private: + NActors::TActorSystem& ActorSystem_; + NActors::NLog::EComponent Component_; +}; + +} // namespace + +TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) { + return MakeIntrusive<TActorSystemLogger>(as, component); +} + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/actors/logger.h b/library/cpp/grpc/server/actors/logger.h index c066a40add..abf9270f7b 100644 --- a/library/cpp/grpc/server/actors/logger.h +++ b/library/cpp/grpc/server/actors/logger.h @@ -1,11 +1,11 @@ -#pragma once - -#include <library/cpp/actors/core/actorsystem.h> -#include <library/cpp/actors/core/log.h> -#include <library/cpp/grpc/server/logger.h> - -namespace NGrpc { - -TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component); - -} // namespace NGrpc +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/grpc/server/logger.h> + +namespace NGrpc { + +TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component); + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/actors/ya.make b/library/cpp/grpc/server/actors/ya.make index 072db84142..6c9d80aa45 100644 --- a/library/cpp/grpc/server/actors/ya.make +++ b/library/cpp/grpc/server/actors/ya.make @@ -1,13 +1,13 @@ -LIBRARY() - -OWNER(g:kikimr g:solomon) - -SRCS( - logger.cpp -) - -PEERDIR( - library/cpp/actors/core -) - -END() +LIBRARY() + +OWNER(g:kikimr g:solomon) + +SRCS( + logger.cpp +) + +PEERDIR( + library/cpp/actors/core +) + +END() diff --git a/library/cpp/grpc/server/event_callback.h b/library/cpp/grpc/server/event_callback.h index 13d9bb46b2..d0b700b3c9 100644 --- a/library/cpp/grpc/server/event_callback.h +++ b/library/cpp/grpc/server/event_callback.h @@ -2,7 +2,7 @@ #include "grpc_server.h" -namespace NGrpc { +namespace NGrpc { enum class EQueueEventStatus { OK, @@ -10,7 +10,7 @@ enum class EQueueEventStatus { }; template<class TCallback> -class TQueueEventCallback: public IQueueEvent { +class TQueueEventCallback: public IQueueEvent { public: TQueueEventCallback(const TCallback& callback) : Callback(callback) @@ -33,9 +33,9 @@ private: TCallback Callback; }; -// Implementation of IQueueEvent that reduces allocations +// Implementation of IQueueEvent that reduces allocations template<class TSelf> -class TQueueFixedEvent: private IQueueEvent { +class TQueueFixedEvent: private IQueueEvent { using TCallback = void (TSelf::*)(EQueueEventStatus); public: @@ -44,7 +44,7 @@ public: , Callback(callback) { } - IQueueEvent* Prepare() { + IQueueEvent* Prepare() { Self->Ref(); return this; } @@ -65,16 +65,16 @@ private: }; template<class TCallback> -inline IQueueEvent* MakeQueueEventCallback(TCallback&& callback) { +inline IQueueEvent* MakeQueueEventCallback(TCallback&& callback) { return new TQueueEventCallback<TCallback>(std::forward<TCallback>(callback)); } template<class T> -inline IQueueEvent* MakeQueueEventCallback(T* self, void (T::*method)(EQueueEventStatus)) { +inline IQueueEvent* MakeQueueEventCallback(T* self, void (T::*method)(EQueueEventStatus)) { using TPtr = TIntrusivePtr<T>; return MakeQueueEventCallback([self = TPtr(self), method] (EQueueEventStatus status) { ((*self).*method)(status); }); } -} // namespace NGrpc +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_async_ctx_base.h b/library/cpp/grpc/server/grpc_async_ctx_base.h index 65341fa1ad..51356d4ce5 100644 --- a/library/cpp/grpc/server/grpc_async_ctx_base.h +++ b/library/cpp/grpc/server/grpc_async_ctx_base.h @@ -5,17 +5,17 @@ #include <util/generic/vector.h> #include <util/generic/string.h> #include <util/system/yassert.h> -#include <util/generic/set.h> +#include <util/generic/set.h> #include <grpc++/server.h> #include <grpc++/server_context.h> #include <chrono> -namespace NGrpc { +namespace NGrpc { template<typename TService> -class TBaseAsyncContext: public ICancelableContext { +class TBaseAsyncContext: public ICancelableContext { public: TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq) : Service(service) @@ -29,44 +29,44 @@ public: TInstant Deadline() const { // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline - // right before the request is getting to be send. - // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md - // + // right before the request is getting to be send. + // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md + // // After this timeout calculated back to the deadline on the server side // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method). // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME // - + std::chrono::system_clock::time_point t = Context.deadline(); if (t == std::chrono::system_clock::time_point::max()) { return TInstant::Max(); - } + } auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t); return TInstant::MicroSeconds(us.time_since_epoch().count()); - } - - TSet<TStringBuf> GetPeerMetaKeys() const { - TSet<TStringBuf> keys; - for (const auto& [key, _]: Context.client_metadata()) { - keys.emplace(key.data(), key.size()); - } - return keys; - } - - TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const { + } + + TSet<TStringBuf> GetPeerMetaKeys() const { + TSet<TStringBuf> keys; + for (const auto& [key, _]: Context.client_metadata()) { + keys.emplace(key.data(), key.size()); + } + return keys; + } + + TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const { const auto& clientMetadata = Context.client_metadata(); - const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()}); - if (range.first == range.second) { - return {}; - } - - TVector<TStringBuf> values; - values.reserve(std::distance(range.first, range.second)); - + const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()}); + if (range.first == range.second) { + return {}; + } + + TVector<TStringBuf> values; + values.reserve(std::distance(range.first, range.second)); + for (auto it = range.first; it != range.second; ++it) { - values.emplace_back(it->second.data(), it->second.size()); + values.emplace_back(it->second.data(), it->second.size()); } - return values; + return values; } grpc_compression_level GetCompressionLevel() const { @@ -91,4 +91,4 @@ protected: grpc::ServerContext Context; }; -} // namespace NGrpc +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_counters.cpp b/library/cpp/grpc/server/grpc_counters.cpp index bdd72b3292..fa96e0100b 100644 --- a/library/cpp/grpc/server/grpc_counters.cpp +++ b/library/cpp/grpc/server/grpc_counters.cpp @@ -1,45 +1,45 @@ #include "grpc_counters.h" - -namespace NGrpc { -namespace { - -class TFakeCounterBlock final: public ICounterBlock { -private: - void CountNotOkRequest() override { - } - - void CountNotOkResponse() override { - } - - void CountNotAuthenticated() override { - } - - void CountResourceExhausted() override { - } - - void CountRequestBytes(ui32 /*requestSize*/) override { - } - - void CountResponseBytes(ui32 /*responseSize*/) override { - } - - void StartProcessing(ui32 /*requestSize*/) override { - } - - void FinishProcessing( - ui32 /*requestSize*/, - ui32 /*responseSize*/, - bool /*ok*/, - ui32 /*status*/, - TDuration /*requestDuration*/) override - { - } -}; - -} // namespace - -ICounterBlockPtr FakeCounterBlock() { - return MakeIntrusive<TFakeCounterBlock>(); -} - -} // namespace NGrpc + +namespace NGrpc { +namespace { + +class TFakeCounterBlock final: public ICounterBlock { +private: + void CountNotOkRequest() override { + } + + void CountNotOkResponse() override { + } + + void CountNotAuthenticated() override { + } + + void CountResourceExhausted() override { + } + + void CountRequestBytes(ui32 /*requestSize*/) override { + } + + void CountResponseBytes(ui32 /*responseSize*/) override { + } + + void StartProcessing(ui32 /*requestSize*/) override { + } + + void FinishProcessing( + ui32 /*requestSize*/, + ui32 /*responseSize*/, + bool /*ok*/, + ui32 /*status*/, + TDuration /*requestDuration*/) override + { + } +}; + +} // namespace + +ICounterBlockPtr FakeCounterBlock() { + return MakeIntrusive<TFakeCounterBlock>(); +} + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h index a591beb84e..0b6c36c84c 100644 --- a/library/cpp/grpc/server/grpc_counters.h +++ b/library/cpp/grpc/server/grpc_counters.h @@ -1,10 +1,10 @@ #pragma once -#include <library/cpp/monlib/dynamic_counters/percentile/percentile.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/percentile/percentile.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <util/generic/ptr.h> -namespace NGrpc { +namespace NGrpc { struct ICounterBlock : public TThrRefBase { virtual void CountNotOkRequest() = 0; @@ -14,7 +14,7 @@ struct ICounterBlock : public TThrRefBase { virtual void CountRequestBytes(ui32 requestSize) = 0; virtual void CountResponseBytes(ui32 responseSize) = 0; virtual void StartProcessing(ui32 requestSize) = 0; - virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0; + virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0; virtual void CountRequestsWithoutDatabase() {} virtual void CountRequestsWithoutToken() {} virtual void CountRequestWithoutTls() {} @@ -126,11 +126,11 @@ public: using TCounterBlockPtr = TIntrusivePtr<TCounterBlock>; -/** - * Creates new instance of ICounterBlock implementation which does nothing. - * - * @return new instance - */ -ICounterBlockPtr FakeCounterBlock(); - -} // namespace NGrpc +/** + * Creates new instance of ICounterBlock implementation which does nothing. + * + * @return new instance + */ +ICounterBlockPtr FakeCounterBlock(); + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request.cpp b/library/cpp/grpc/server/grpc_request.cpp index 60db2f230d..d18a32776f 100644 --- a/library/cpp/grpc/server/grpc_request.cpp +++ b/library/cpp/grpc/server/grpc_request.cpp @@ -1,10 +1,10 @@ #include "grpc_request.h" -namespace NGrpc { +namespace NGrpc { const char* GRPC_USER_AGENT_HEADER = "user-agent"; -class TStreamAdaptor: public IStreamAdaptor { +class TStreamAdaptor: public IStreamAdaptor { public: TStreamAdaptor() : StreamIsReady_(true) @@ -56,4 +56,4 @@ IStreamAdaptor::TPtr CreateStreamAdaptor() { return std::make_unique<TStreamAdaptor>(); } -} // namespace NGrpc +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index dd9041eec7..5bd8d3902b 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -4,19 +4,19 @@ #include <google/protobuf/arena.h> #include <google/protobuf/message.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/logger/priority.h> -#include "grpc_response.h" +#include "grpc_response.h" #include "event_callback.h" -#include "grpc_async_ctx_base.h" -#include "grpc_counters.h" +#include "grpc_async_ctx_base.h" +#include "grpc_counters.h" #include "grpc_request_base.h" #include "grpc_server.h" -#include "logger.h" +#include "logger.h" + +#include <util/system/hp_timer.h> -#include <util/system/hp_timer.h> - #include <grpc++/server.h> #include <grpc++/server_context.h> #include <grpc++/support/async_stream.h> @@ -24,7 +24,7 @@ #include <grpc++/support/byte_buffer.h> #include <grpc++/impl/codegen/async_stream.h> -namespace NGrpc { +namespace NGrpc { class IStreamAdaptor { public: @@ -57,7 +57,7 @@ public: grpc::ServerCompletionQueue* cq, TOnRequest cb, TRequestCallback requestCallback, - const char* name, + const char* name, TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) @@ -67,10 +67,10 @@ public: , RequestCallback_(requestCallback) , StreamRequestCallback_(nullptr) , Name_(name) - , Logger_(std::move(logger)) + , Logger_(std::move(logger)) , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) - , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context)) + , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context)) , StateFunc_(&TThis::SetRequestDone) { AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); @@ -85,7 +85,7 @@ public: grpc::ServerCompletionQueue* cq, TOnRequest cb, TStreamRequestCallback requestCallback, - const char* name, + const char* name, TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) @@ -95,7 +95,7 @@ public: , RequestCallback_(nullptr) , StreamRequestCallback_(requestCallback) , Name_(name) - , Logger_(std::move(logger)) + , Logger_(std::move(logger)) , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) @@ -157,13 +157,13 @@ public: TInstant Deadline() const override { return TBaseAsyncContext<TService>::Deadline(); - } - - TSet<TStringBuf> GetPeerMetaKeys() const override { - return TBaseAsyncContext<TService>::GetPeerMetaKeys(); - } - - TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override { + } + + TSet<TStringBuf> GetPeerMetaKeys() const override { + return TBaseAsyncContext<TService>::GetPeerMetaKeys(); + } + + TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override { return TBaseAsyncContext<TService>::GetPeerMetaValues(key); } @@ -233,10 +233,10 @@ private: if (!Server_->IsShuttingDown()) { if (RequestCallback_) { MakeIntrusive<TThis>( - Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); + Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } else { MakeIntrusive<TThis>( - Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); + Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } } } @@ -257,20 +257,20 @@ private: StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; Y_VERIFY(this->Context.c_call()); - Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); + Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); } else { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)", this, Name_, makeResponseString().data(), this->Context.peer().c_str()); - - // because of std::function cannot hold move-only captured object - // we allocate shared object on heap to avoid message copy - auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); - auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() { + + // because of std::function cannot hold move-only captured object + // we allocate shared object on heap to avoid message copy + auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); + auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)", this, Name_, makeResponseString().data(), this->Context.peer().c_str()); StateFunc_ = &TThis::NextReply; ResponseSize += sz; - StreamWriter_->Write(*uResp, GetGRpcTag()); + StreamWriter_->Write(*uResp, GetGRpcTag()); }; StreamAdaptor_->Enqueue(std::move(cb), false); } @@ -283,20 +283,20 @@ private: this->Context.peer().c_str()); StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; - Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); + Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); } else { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_, this->Context.peer().c_str()); - - // because of std::function cannot hold move-only captured object - // we allocate shared object on heap to avoid buffer copy - auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); - auto cb = [this, uResp = std::move(uResp), sz]() { + + // because of std::function cannot hold move-only captured object + // we allocate shared object on heap to avoid buffer copy + auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); + auto cb = [this, uResp = std::move(uResp), sz]() { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)", this, Name_, this->Context.peer().c_str()); StateFunc_ = &TThis::NextReply; ResponseSize += sz; - StreamWriter_->Write(*uResp, GetGRpcTag()); + StreamWriter_->Write(*uResp, GetGRpcTag()); }; StreamAdaptor_->Enqueue(std::move(cb), false); } @@ -314,8 +314,8 @@ private: GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); StateFunc_ = &TThis::SetFinishError; - TOut resp; - Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag()); + TOut resp; + Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag()); } else { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); @@ -380,7 +380,7 @@ private: } auto maybeToken = GetPeerMetaValues(TStringBuf("x-ydb-auth-ticket")); if (maybeToken.empty() || maybeToken[0].empty()) { - TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}}; + TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}}; Counters_->CountRequestsWithoutToken(); GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token " "Name# %s data# %s peer# %s database# %s", this, Name_, @@ -484,12 +484,12 @@ private: TOnRequest Cb_; TRequestCallback RequestCallback_; TStreamRequestCallback StreamRequestCallback_; - const char* const Name_; + const char* const Name_; TLoggerPtr Logger_; ICounterBlockPtr Counters_; IGRpcRequestLimiterPtr RequestLimiter_; - THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_; + THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_; THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_; TStateFunc StateFunc_; TIn* Request_; @@ -520,10 +520,10 @@ public: typename TBase::TOnRequest cb, typename TBase::TRequestCallback requestCallback, const char* name, - TLoggerPtr logger, + TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter = nullptr) - : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)} + : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)} { } @@ -533,11 +533,11 @@ public: typename TBase::TOnRequest cb, typename TBase::TStreamRequestCallback requestCallback, const char* name, - TLoggerPtr logger, + TLoggerPtr logger, ICounterBlockPtr counters) - : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr} + : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr} { } }; -} // namespace NGrpc +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index b61cf553aa..fcfce1c181 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -9,7 +9,7 @@ namespace grpc { class ByteBuffer; } -namespace NGrpc { +namespace NGrpc { extern const char* GRPC_USER_AGENT_HEADER; @@ -30,7 +30,7 @@ struct TAuthState { //! An interface that may be used to limit concurrency of requests -class IGRpcRequestLimiter: public TThrRefBase { +class IGRpcRequestLimiter: public TThrRefBase { public: virtual bool IncRequest() = 0; virtual void DecRequest() = 0; @@ -39,7 +39,7 @@ public: using IGRpcRequestLimiterPtr = TIntrusivePtr<IGRpcRequestLimiter>; //! State of current request -class IRequestContextBase: public TThrRefBase { +class IRequestContextBase: public TThrRefBase { public: enum class EFinishStatus { OK, @@ -72,12 +72,12 @@ public: //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise virtual TInstant Deadline() const = 0; - - //! Returns available peer metadata keys - virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0; - + + //! Returns available peer metadata keys + virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0; + //! Returns peer optional metavalue - virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0; + virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0; //! Returns request compression level virtual grpc_compression_level GetCompressionLevel() const = 0; @@ -113,4 +113,4 @@ public: virtual bool SslServer() const = 0; }; -} // namespace NGrpc +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_response.h b/library/cpp/grpc/server/grpc_response.h index 47b22c28d0..8e9afe44d5 100644 --- a/library/cpp/grpc/server/grpc_response.h +++ b/library/cpp/grpc/server/grpc_response.h @@ -1,90 +1,90 @@ -#pragma once - -#include <grpc++/impl/codegen/byte_buffer.h> -#include <grpc++/impl/codegen/proto_utils.h> - -#include <variant> - -namespace NGrpc { - -/** - * Universal response that owns underlying message or buffer. - */ -template <typename TMsg> +#pragma once + +#include <grpc++/impl/codegen/byte_buffer.h> +#include <grpc++/impl/codegen/proto_utils.h> + +#include <variant> + +namespace NGrpc { + +/** + * Universal response that owns underlying message or buffer. + */ +template <typename TMsg> class TUniversalResponse: public TAtomicRefCount<TUniversalResponse<TMsg>>, public TMoveOnly { - friend class grpc::SerializationTraits<TUniversalResponse<TMsg>>; - -public: - explicit TUniversalResponse(NProtoBuf::Message* msg) noexcept - : Data_{TMsg{}} - { - std::get<TMsg>(Data_).Swap(static_cast<TMsg*>(msg)); - } - - explicit TUniversalResponse(grpc::ByteBuffer* buffer) noexcept - : Data_{grpc::ByteBuffer{}} - { - std::get<grpc::ByteBuffer>(Data_).Swap(buffer); - } - -private: - std::variant<TMsg, grpc::ByteBuffer> Data_; -}; - -/** - * Universal response that only keeps reference to underlying message or buffer. - */ -template <typename TMsg> -class TUniversalResponseRef: private TMoveOnly { - friend class grpc::SerializationTraits<TUniversalResponseRef<TMsg>>; - -public: - explicit TUniversalResponseRef(const NProtoBuf::Message* msg) - : Data_{msg} - { - } - - explicit TUniversalResponseRef(const grpc::ByteBuffer* buffer) - : Data_{buffer} - { - } - -private: - std::variant<const NProtoBuf::Message*, const grpc::ByteBuffer*> Data_; -}; - -} // namespace NGrpc - -namespace grpc { - -template <typename TMsg> -class SerializationTraits<NGrpc::TUniversalResponse<TMsg>> { -public: - static Status Serialize( - const NGrpc::TUniversalResponse<TMsg>& resp, - ByteBuffer* buffer, - bool* ownBuffer) - { - return std::visit([&](const auto& data) { - using T = std::decay_t<decltype(data)>; - return SerializationTraits<T>::Serialize(data, buffer, ownBuffer); - }, resp.Data_); - } -}; - -template <typename TMsg> -class SerializationTraits<NGrpc::TUniversalResponseRef<TMsg>> { -public: - static Status Serialize( - const NGrpc::TUniversalResponseRef<TMsg>& resp, - ByteBuffer* buffer, - bool* ownBuffer) - { - return std::visit([&](const auto* data) { - using T = std::decay_t<std::remove_pointer_t<decltype(data)>>; - return SerializationTraits<T>::Serialize(*data, buffer, ownBuffer); - }, resp.Data_); - } -}; - -} // namespace grpc + friend class grpc::SerializationTraits<TUniversalResponse<TMsg>>; + +public: + explicit TUniversalResponse(NProtoBuf::Message* msg) noexcept + : Data_{TMsg{}} + { + std::get<TMsg>(Data_).Swap(static_cast<TMsg*>(msg)); + } + + explicit TUniversalResponse(grpc::ByteBuffer* buffer) noexcept + : Data_{grpc::ByteBuffer{}} + { + std::get<grpc::ByteBuffer>(Data_).Swap(buffer); + } + +private: + std::variant<TMsg, grpc::ByteBuffer> Data_; +}; + +/** + * Universal response that only keeps reference to underlying message or buffer. + */ +template <typename TMsg> +class TUniversalResponseRef: private TMoveOnly { + friend class grpc::SerializationTraits<TUniversalResponseRef<TMsg>>; + +public: + explicit TUniversalResponseRef(const NProtoBuf::Message* msg) + : Data_{msg} + { + } + + explicit TUniversalResponseRef(const grpc::ByteBuffer* buffer) + : Data_{buffer} + { + } + +private: + std::variant<const NProtoBuf::Message*, const grpc::ByteBuffer*> Data_; +}; + +} // namespace NGrpc + +namespace grpc { + +template <typename TMsg> +class SerializationTraits<NGrpc::TUniversalResponse<TMsg>> { +public: + static Status Serialize( + const NGrpc::TUniversalResponse<TMsg>& resp, + ByteBuffer* buffer, + bool* ownBuffer) + { + return std::visit([&](const auto& data) { + using T = std::decay_t<decltype(data)>; + return SerializationTraits<T>::Serialize(data, buffer, ownBuffer); + }, resp.Data_); + } +}; + +template <typename TMsg> +class SerializationTraits<NGrpc::TUniversalResponseRef<TMsg>> { +public: + static Status Serialize( + const NGrpc::TUniversalResponseRef<TMsg>& resp, + ByteBuffer* buffer, + bool* ownBuffer) + { + return std::visit([&](const auto* data) { + using T = std::decay_t<std::remove_pointer_t<decltype(data)>>; + return SerializationTraits<T>::Serialize(*data, buffer, ownBuffer); + }, resp.Data_); + } +}; + +} // namespace grpc diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 4f4c7412fc..7437b7a8f5 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -15,7 +15,7 @@ #endif -namespace NGrpc { +namespace NGrpc { using NThreading::TFuture; @@ -82,7 +82,7 @@ void TGRpcServer::Start() { service->SetGlobalLimiterHandle(&Limiter_); } - class TKeepAliveOption: public grpc::ServerBuilderOption { + class TKeepAliveOption: public grpc::ServerBuilderOption { public: TKeepAliveOption(int idle, int interval) : Idle(idle) @@ -153,7 +153,7 @@ void TGRpcServer::Start() { size_t index = 0; for (IGRpcServicePtr service : Services_) { // TODO: provide something else for services instead of ServerCompletionQueue - service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger); + service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger); } if (Options_.UseCompletionQueuePerThread) { @@ -237,4 +237,4 @@ TString TGRpcServer::GetHost() const { return Options_.Host; } -} // namespace NGrpc +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index 24c8caa78d..d6814a90a0 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -1,7 +1,7 @@ #pragma once #include "grpc_request_base.h" -#include "logger.h" +#include "logger.h" #include <library/cpp/threading/future/future.h> @@ -17,7 +17,7 @@ #include <grpc++/grpc++.h> -namespace NGrpc { +namespace NGrpc { constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000; @@ -95,9 +95,9 @@ struct TServerOptions { DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr); - //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled). - DECLARE_FIELD(Logger, TLoggerPtr, nullptr); - + //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled). + DECLARE_FIELD(Logger, TLoggerPtr, nullptr); + #undef DECLARE_FIELD }; @@ -161,11 +161,11 @@ private: using TGlobalLimiter = TInFlightLimiterImpl<i64>; -class IGRpcService: public TThrRefBase { +class IGRpcService: public TThrRefBase { public: virtual grpc::Service* GetService() = 0; virtual void StopService() noexcept = 0; - virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0; + virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0; virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0; virtual bool IsUnsafeToShutdown() const = 0; virtual size_t RequestsInProgress() const = 0; @@ -178,7 +178,7 @@ public: }; template<typename T> -class TGrpcServiceBase: public IGRpcService { +class TGrpcServiceBase: public IGRpcService { public: class TShutdownGuard { using TOwner = TGrpcServiceBase<T>; @@ -353,4 +353,4 @@ private: TGlobalLimiter Limiter_; }; -} // namespace NGrpc +} // namespace NGrpc diff --git a/library/cpp/grpc/server/logger.h b/library/cpp/grpc/server/logger.h index 5e44d83d67..53af26be9c 100644 --- a/library/cpp/grpc/server/logger.h +++ b/library/cpp/grpc/server/logger.h @@ -1,43 +1,43 @@ -#pragma once - -#include <library/cpp/logger/priority.h> - -#include <util/generic/ptr.h> - -namespace NGrpc { - -class TLogger: public TThrRefBase { -protected: - TLogger() = default; - -public: - [[nodiscard]] - bool IsEnabled(ELogPriority priority) const noexcept { - return DoIsEnabled(priority); - } - - void Y_PRINTF_FORMAT(3, 4) Write(ELogPriority priority, const char* format, ...) noexcept { - va_list args; - va_start(args, format); - DoWrite(priority, format, args); - va_end(args); - } - -protected: - virtual bool DoIsEnabled(ELogPriority priority) const noexcept = 0; - virtual void DoWrite(ELogPriority p, const char* format, va_list args) noexcept = 0; -}; - -using TLoggerPtr = TIntrusivePtr<TLogger>; - -#define GRPC_LOG_DEBUG(logger, format, ...) \ - if (logger && logger->IsEnabled(ELogPriority::TLOG_DEBUG)) { \ - logger->Write(ELogPriority::TLOG_DEBUG, format, __VA_ARGS__); \ - } else { } - -#define GRPC_LOG_INFO(logger, format, ...) \ - if (logger && logger->IsEnabled(ELogPriority::TLOG_INFO)) { \ - logger->Write(ELogPriority::TLOG_INFO, format, __VA_ARGS__); \ - } else { } - -} // namespace NGrpc +#pragma once + +#include <library/cpp/logger/priority.h> + +#include <util/generic/ptr.h> + +namespace NGrpc { + +class TLogger: public TThrRefBase { +protected: + TLogger() = default; + +public: + [[nodiscard]] + bool IsEnabled(ELogPriority priority) const noexcept { + return DoIsEnabled(priority); + } + + void Y_PRINTF_FORMAT(3, 4) Write(ELogPriority priority, const char* format, ...) noexcept { + va_list args; + va_start(args, format); + DoWrite(priority, format, args); + va_end(args); + } + +protected: + virtual bool DoIsEnabled(ELogPriority priority) const noexcept = 0; + virtual void DoWrite(ELogPriority p, const char* format, va_list args) noexcept = 0; +}; + +using TLoggerPtr = TIntrusivePtr<TLogger>; + +#define GRPC_LOG_DEBUG(logger, format, ...) \ + if (logger && logger->IsEnabled(ELogPriority::TLOG_DEBUG)) { \ + logger->Write(ELogPriority::TLOG_DEBUG, format, __VA_ARGS__); \ + } else { } + +#define GRPC_LOG_INFO(logger, format, ...) \ + if (logger && logger->IsEnabled(ELogPriority::TLOG_INFO)) { \ + logger->Write(ELogPriority::TLOG_INFO, format, __VA_ARGS__); \ + } else { } + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/ut/grpc_response_ut.cpp b/library/cpp/grpc/server/ut/grpc_response_ut.cpp index cb66478e94..8abc4e4e0e 100644 --- a/library/cpp/grpc/server/ut/grpc_response_ut.cpp +++ b/library/cpp/grpc/server/ut/grpc_response_ut.cpp @@ -1,88 +1,88 @@ -#include <library/cpp/grpc/server/grpc_response.h> -#include <library/cpp/testing/unittest/registar.h> - -#include <google/protobuf/duration.pb.h> -#include <grpc++/impl/codegen/proto_utils.h> -#include <grpc++/impl/grpc_library.h> - -static ::grpc::internal::GrpcLibraryInitializer grpcInitializer; - -using namespace NGrpc; - -using google::protobuf::Duration; - -Y_UNIT_TEST_SUITE(ResponseTest) { - - template <typename T> - grpc::ByteBuffer Serialize(T resp) { - grpc::ByteBuffer buf; - bool ownBuf = false; - grpc::Status status = grpc::SerializationTraits<T>::Serialize(resp, &buf, &ownBuf); - UNIT_ASSERT(status.ok()); - return buf; - } - - template <typename T> - T Deserialize(grpc::ByteBuffer* buf) { - T message; - auto status = grpc::SerializationTraits<T>::Deserialize(buf, &message); - UNIT_ASSERT(status.ok()); - return message; - } - - Y_UNIT_TEST(UniversalResponseMsg) { - Duration d1; - d1.set_seconds(12345); - d1.set_nanos(67890); - - auto buf = Serialize(TUniversalResponse<Duration>(&d1)); - Duration d2 = Deserialize<Duration>(&buf); - - UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345); - UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890); - } - - Y_UNIT_TEST(UniversalResponseBuf) { - Duration d1; - d1.set_seconds(123); - d1.set_nanos(456); - - TString data = d1.SerializeAsString(); - grpc::Slice dataSlice{data.data(), data.size()}; - grpc::ByteBuffer dataBuf{&dataSlice, 1}; - - auto buf = Serialize(TUniversalResponse<Duration>(&dataBuf)); - Duration d2 = Deserialize<Duration>(&buf); - - UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123); - UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456); - } - - Y_UNIT_TEST(UniversalResponseRefMsg) { - Duration d1; - d1.set_seconds(12345); - d1.set_nanos(67890); - - auto buf = Serialize(TUniversalResponseRef<Duration>(&d1)); - Duration d2 = Deserialize<Duration>(&buf); - - UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345); - UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890); - } - - Y_UNIT_TEST(UniversalResponseRefBuf) { - Duration d1; - d1.set_seconds(123); - d1.set_nanos(456); - - TString data = d1.SerializeAsString(); - grpc::Slice dataSlice{data.data(), data.size()}; - grpc::ByteBuffer dataBuf{&dataSlice, 1}; - - auto buf = Serialize(TUniversalResponseRef<Duration>(&dataBuf)); - Duration d2 = Deserialize<Duration>(&buf); - - UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123); - UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456); - } -} +#include <library/cpp/grpc/server/grpc_response.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <google/protobuf/duration.pb.h> +#include <grpc++/impl/codegen/proto_utils.h> +#include <grpc++/impl/grpc_library.h> + +static ::grpc::internal::GrpcLibraryInitializer grpcInitializer; + +using namespace NGrpc; + +using google::protobuf::Duration; + +Y_UNIT_TEST_SUITE(ResponseTest) { + + template <typename T> + grpc::ByteBuffer Serialize(T resp) { + grpc::ByteBuffer buf; + bool ownBuf = false; + grpc::Status status = grpc::SerializationTraits<T>::Serialize(resp, &buf, &ownBuf); + UNIT_ASSERT(status.ok()); + return buf; + } + + template <typename T> + T Deserialize(grpc::ByteBuffer* buf) { + T message; + auto status = grpc::SerializationTraits<T>::Deserialize(buf, &message); + UNIT_ASSERT(status.ok()); + return message; + } + + Y_UNIT_TEST(UniversalResponseMsg) { + Duration d1; + d1.set_seconds(12345); + d1.set_nanos(67890); + + auto buf = Serialize(TUniversalResponse<Duration>(&d1)); + Duration d2 = Deserialize<Duration>(&buf); + + UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345); + UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890); + } + + Y_UNIT_TEST(UniversalResponseBuf) { + Duration d1; + d1.set_seconds(123); + d1.set_nanos(456); + + TString data = d1.SerializeAsString(); + grpc::Slice dataSlice{data.data(), data.size()}; + grpc::ByteBuffer dataBuf{&dataSlice, 1}; + + auto buf = Serialize(TUniversalResponse<Duration>(&dataBuf)); + Duration d2 = Deserialize<Duration>(&buf); + + UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123); + UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456); + } + + Y_UNIT_TEST(UniversalResponseRefMsg) { + Duration d1; + d1.set_seconds(12345); + d1.set_nanos(67890); + + auto buf = Serialize(TUniversalResponseRef<Duration>(&d1)); + Duration d2 = Deserialize<Duration>(&buf); + + UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345); + UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890); + } + + Y_UNIT_TEST(UniversalResponseRefBuf) { + Duration d1; + d1.set_seconds(123); + d1.set_nanos(456); + + TString data = d1.SerializeAsString(); + grpc::Slice dataSlice{data.data(), data.size()}; + grpc::ByteBuffer dataBuf{&dataSlice, 1}; + + auto buf = Serialize(TUniversalResponseRef<Duration>(&dataBuf)); + Duration d2 = Deserialize<Duration>(&buf); + + UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123); + UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456); + } +} diff --git a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp index 3457e98bf1..c34d3b8c2b 100644 --- a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp +++ b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp @@ -1,14 +1,14 @@ -#include <library/cpp/grpc/server/grpc_request.h> +#include <library/cpp/grpc/server/grpc_request.h> #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/testing/unittest/tests_data.h> #include <util/system/thread.h> #include <util/thread/pool.h> -using namespace NGrpc; +using namespace NGrpc; // Here we emulate stream data producer -class TOrderedProducer: public TThread { +class TOrderedProducer: public TThread { public: TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp) : TThread(&ThreadProc, this) diff --git a/library/cpp/grpc/server/ut/ya.make b/library/cpp/grpc/server/ut/ya.make index ff6c8fdb7b..feb3291af9 100644 --- a/library/cpp/grpc/server/ut/ya.make +++ b/library/cpp/grpc/server/ut/ya.make @@ -1,19 +1,19 @@ -UNITTEST_FOR(library/cpp/grpc/server) +UNITTEST_FOR(library/cpp/grpc/server) OWNER( dcherednik g:kikimr ) -TIMEOUT(600) -SIZE(MEDIUM) +TIMEOUT(600) +SIZE(MEDIUM) PEERDIR( - library/cpp/grpc/server + library/cpp/grpc/server ) SRCS( - grpc_response_ut.cpp + grpc_response_ut.cpp stream_adaptor_ut.cpp ) diff --git a/library/cpp/grpc/server/ya.make b/library/cpp/grpc/server/ya.make index b0f262e5dc..356a1b6793 100644 --- a/library/cpp/grpc/server/ya.make +++ b/library/cpp/grpc/server/ya.make @@ -16,10 +16,10 @@ GENERATE_ENUM_SERIALIZATION(grpc_request_base.h) PEERDIR( contrib/libs/grpc - library/cpp/monlib/dynamic_counters/percentile + library/cpp/monlib/dynamic_counters/percentile ) END() - -RECURSE_FOR_TESTS(ut) - + +RECURSE_FOR_TESTS(ut) + |