diff options
| author | Sergey Polovko <[email protected]> | 2022-02-10 16:47:03 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:47:03 +0300 | 
| commit | 2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5 (patch) | |
| tree | b83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/grpc/server | |
| parent | 3e0b762a82514bac89c1dd6ea7211e381d8aa248 (diff) | |
Restoring authorship annotation for Sergey Polovko <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server')
| -rw-r--r-- | library/cpp/grpc/server/actors/logger.cpp | 90 | ||||
| -rw-r--r-- | library/cpp/grpc/server/actors/logger.h | 22 | ||||
| -rw-r--r-- | library/cpp/grpc/server/actors/ya.make | 26 | ||||
| -rw-r--r-- | library/cpp/grpc/server/event_callback.h | 16 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_async_ctx_base.h | 60 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_counters.cpp | 88 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_counters.h | 24 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_request.cpp | 6 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 92 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 18 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_response.h | 178 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 8 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 18 | ||||
| -rw-r--r-- | library/cpp/grpc/server/logger.h | 86 | ||||
| -rw-r--r-- | library/cpp/grpc/server/ut/grpc_response_ut.cpp | 176 | ||||
| -rw-r--r-- | library/cpp/grpc/server/ut/stream_adaptor_ut.cpp | 6 | ||||
| -rw-r--r-- | library/cpp/grpc/server/ut/ya.make | 10 | ||||
| -rw-r--r-- | library/cpp/grpc/server/ya.make | 8 | 
18 files changed, 466 insertions, 466 deletions
| diff --git a/library/cpp/grpc/server/actors/logger.cpp b/library/cpp/grpc/server/actors/logger.cpp index 176675366a5..d8b20425764 100644 --- a/library/cpp/grpc/server/actors/logger.cpp +++ b/library/cpp/grpc/server/actors/logger.cpp @@ -1,45 +1,45 @@ -#include "logger.h"  -  -namespace NGrpc {  -namespace {  -  -static_assert(  -        ui16(TLOG_EMERG) == ui16(NActors::NLog::PRI_EMERG) &&  -        ui16(TLOG_DEBUG) == ui16(NActors::NLog::PRI_DEBUG),  -        "log levels in the library/log and library/cpp/actors don't match");  -  -class TActorSystemLogger final: public TLogger {  -public:  -    TActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) noexcept  -        : ActorSystem_{as}  -        , Component_{component}  -    {  -    }  -  -protected:  -    bool DoIsEnabled(ELogPriority p) const noexcept override {  -        const auto* settings = static_cast<::NActors::NLog::TSettings*>(ActorSystem_.LoggerSettings());  -        const auto priority = static_cast<::NActors::NLog::EPriority>(p);  -  -        return settings && settings->Satisfies(priority, Component_, 0);  -    }  -  -    void DoWrite(ELogPriority p, const char* format, va_list args) noexcept override {  -        Y_VERIFY_DEBUG(DoIsEnabled(p));  -  -        const auto priority = static_cast<::NActors::NLog::EPriority>(p);  -        ::NActors::MemLogAdapter(ActorSystem_, priority, Component_, format, args);  -    }  -  -private:  -    NActors::TActorSystem& ActorSystem_;  -    NActors::NLog::EComponent Component_;  -};  -  -} // namespace  -  -TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) {  -    return MakeIntrusive<TActorSystemLogger>(as, component);  -}  -  -} // namespace NGrpc  +#include "logger.h" + +namespace NGrpc { +namespace { + +static_assert( +        ui16(TLOG_EMERG) == ui16(NActors::NLog::PRI_EMERG) && +        ui16(TLOG_DEBUG) == ui16(NActors::NLog::PRI_DEBUG), +        "log levels in the library/log and library/cpp/actors don't match"); + +class TActorSystemLogger final: public TLogger { +public: +    TActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) noexcept +        : ActorSystem_{as} +        , Component_{component} +    { +    } + +protected: +    bool DoIsEnabled(ELogPriority p) const noexcept override { +        const auto* settings = static_cast<::NActors::NLog::TSettings*>(ActorSystem_.LoggerSettings()); +        const auto priority = static_cast<::NActors::NLog::EPriority>(p); + +        return settings && settings->Satisfies(priority, Component_, 0); +    } + +    void DoWrite(ELogPriority p, const char* format, va_list args) noexcept override { +        Y_VERIFY_DEBUG(DoIsEnabled(p)); + +        const auto priority = static_cast<::NActors::NLog::EPriority>(p); +        ::NActors::MemLogAdapter(ActorSystem_, priority, Component_, format, args); +    } + +private: +    NActors::TActorSystem& ActorSystem_; +    NActors::NLog::EComponent Component_; +}; + +} // namespace + +TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) { +    return MakeIntrusive<TActorSystemLogger>(as, component); +} + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/actors/logger.h b/library/cpp/grpc/server/actors/logger.h index c066a40add1..abf9270f7b0 100644 --- a/library/cpp/grpc/server/actors/logger.h +++ b/library/cpp/grpc/server/actors/logger.h @@ -1,11 +1,11 @@ -#pragma once  -  -#include <library/cpp/actors/core/actorsystem.h>  -#include <library/cpp/actors/core/log.h>  -#include <library/cpp/grpc/server/logger.h>  -  -namespace NGrpc {  -  -TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component);  -  -} // namespace NGrpc  +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/grpc/server/logger.h> + +namespace NGrpc { + +TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component); + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/actors/ya.make b/library/cpp/grpc/server/actors/ya.make index 072db841424..6c9d80aa458 100644 --- a/library/cpp/grpc/server/actors/ya.make +++ b/library/cpp/grpc/server/actors/ya.make @@ -1,13 +1,13 @@ -LIBRARY()  -  -OWNER(g:kikimr g:solomon)  -  -SRCS(  -    logger.cpp  -)  -  -PEERDIR(  -    library/cpp/actors/core  -)  -  -END()  +LIBRARY() + +OWNER(g:kikimr g:solomon) + +SRCS( +    logger.cpp +) + +PEERDIR( +    library/cpp/actors/core +) + +END() diff --git a/library/cpp/grpc/server/event_callback.h b/library/cpp/grpc/server/event_callback.h index 13d9bb46b24..d0b700b3c92 100644 --- a/library/cpp/grpc/server/event_callback.h +++ b/library/cpp/grpc/server/event_callback.h @@ -2,7 +2,7 @@  #include "grpc_server.h" -namespace NGrpc {  +namespace NGrpc {  enum class EQueueEventStatus {      OK, @@ -10,7 +10,7 @@ enum class EQueueEventStatus {  };  template<class TCallback> -class TQueueEventCallback: public IQueueEvent {  +class TQueueEventCallback: public IQueueEvent {  public:      TQueueEventCallback(const TCallback& callback)          : Callback(callback) @@ -33,9 +33,9 @@ private:      TCallback Callback;  }; -// Implementation of IQueueEvent that reduces allocations  +// Implementation of IQueueEvent that reduces allocations  template<class TSelf> -class TQueueFixedEvent: private IQueueEvent {  +class TQueueFixedEvent: private IQueueEvent {      using TCallback = void (TSelf::*)(EQueueEventStatus);  public: @@ -44,7 +44,7 @@ public:          , Callback(callback)      { } -    IQueueEvent* Prepare() {  +    IQueueEvent* Prepare() {          Self->Ref();          return this;      } @@ -65,16 +65,16 @@ private:  };  template<class TCallback> -inline IQueueEvent* MakeQueueEventCallback(TCallback&& callback) {  +inline IQueueEvent* MakeQueueEventCallback(TCallback&& callback) {      return new TQueueEventCallback<TCallback>(std::forward<TCallback>(callback));  }  template<class T> -inline IQueueEvent* MakeQueueEventCallback(T* self, void (T::*method)(EQueueEventStatus)) {  +inline IQueueEvent* MakeQueueEventCallback(T* self, void (T::*method)(EQueueEventStatus)) {      using TPtr = TIntrusivePtr<T>;      return MakeQueueEventCallback([self = TPtr(self), method] (EQueueEventStatus status) {          ((*self).*method)(status);      });  } -} // namespace NGrpc  +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_async_ctx_base.h b/library/cpp/grpc/server/grpc_async_ctx_base.h index 65341fa1ad1..51356d4ce5a 100644 --- a/library/cpp/grpc/server/grpc_async_ctx_base.h +++ b/library/cpp/grpc/server/grpc_async_ctx_base.h @@ -5,17 +5,17 @@  #include <util/generic/vector.h>  #include <util/generic/string.h>  #include <util/system/yassert.h> -#include <util/generic/set.h>  +#include <util/generic/set.h>  #include <grpc++/server.h>  #include <grpc++/server_context.h>  #include <chrono> -namespace NGrpc {  +namespace NGrpc {  template<typename TService> -class TBaseAsyncContext: public ICancelableContext {  +class TBaseAsyncContext: public ICancelableContext {  public:      TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq)          : Service(service) @@ -29,44 +29,44 @@ public:      TInstant Deadline() const {          // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline -        // right before the request is getting to be send.  -        // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md  -        //  +        // right before the request is getting to be send. +        // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md +        //          // After this timeout calculated back to the deadline on the server side          // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method).          // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME          // -  +          std::chrono::system_clock::time_point t = Context.deadline();          if (t == std::chrono::system_clock::time_point::max()) {              return TInstant::Max(); -        }  +        }          auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t);          return TInstant::MicroSeconds(us.time_since_epoch().count()); -    }  -  -    TSet<TStringBuf> GetPeerMetaKeys() const {  -        TSet<TStringBuf> keys;  -        for (const auto& [key, _]: Context.client_metadata()) {  -            keys.emplace(key.data(), key.size());  -        }  -        return keys;  -    }  -  -    TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const {  +    } + +    TSet<TStringBuf> GetPeerMetaKeys() const { +        TSet<TStringBuf> keys; +        for (const auto& [key, _]: Context.client_metadata()) { +            keys.emplace(key.data(), key.size()); +        } +        return keys; +    } + +    TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const {          const auto& clientMetadata = Context.client_metadata(); -        const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()});  -        if (range.first == range.second) {  -            return {};  -        }  -  -        TVector<TStringBuf> values;  -        values.reserve(std::distance(range.first, range.second));  -  +        const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()}); +        if (range.first == range.second) { +            return {}; +        } + +        TVector<TStringBuf> values; +        values.reserve(std::distance(range.first, range.second)); +          for (auto it = range.first; it != range.second; ++it) { -            values.emplace_back(it->second.data(), it->second.size());  +            values.emplace_back(it->second.data(), it->second.size());          } -        return values;  +        return values;      }      grpc_compression_level GetCompressionLevel() const { @@ -91,4 +91,4 @@ protected:      grpc::ServerContext Context;  }; -} // namespace NGrpc  +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_counters.cpp b/library/cpp/grpc/server/grpc_counters.cpp index bdd72b32928..fa96e0100b9 100644 --- a/library/cpp/grpc/server/grpc_counters.cpp +++ b/library/cpp/grpc/server/grpc_counters.cpp @@ -1,45 +1,45 @@  #include "grpc_counters.h" -  -namespace NGrpc {  -namespace {  -  -class TFakeCounterBlock final: public ICounterBlock {  -private:  -    void CountNotOkRequest() override {  -    }  -  -    void CountNotOkResponse() override {  -    }  -  -    void CountNotAuthenticated() override {  -    }  -  -    void CountResourceExhausted() override {  -    }  -  -    void CountRequestBytes(ui32 /*requestSize*/) override {  -    }  -  -    void CountResponseBytes(ui32 /*responseSize*/) override {  -    }  -  -    void StartProcessing(ui32 /*requestSize*/) override {  -    }  -  -    void FinishProcessing(  -            ui32 /*requestSize*/,  -            ui32 /*responseSize*/,  -            bool /*ok*/,  -            ui32 /*status*/,  -            TDuration /*requestDuration*/) override  -    {  -    }  -};  -  -} // namespace  -  -ICounterBlockPtr FakeCounterBlock() {  -    return MakeIntrusive<TFakeCounterBlock>();  -}  -  -} // namespace NGrpc  + +namespace NGrpc { +namespace { + +class TFakeCounterBlock final: public ICounterBlock { +private: +    void CountNotOkRequest() override { +    } + +    void CountNotOkResponse() override { +    } + +    void CountNotAuthenticated() override { +    } + +    void CountResourceExhausted() override { +    } + +    void CountRequestBytes(ui32 /*requestSize*/) override { +    } + +    void CountResponseBytes(ui32 /*responseSize*/) override { +    } + +    void StartProcessing(ui32 /*requestSize*/) override { +    } + +    void FinishProcessing( +            ui32 /*requestSize*/, +            ui32 /*responseSize*/, +            bool /*ok*/, +            ui32 /*status*/, +            TDuration /*requestDuration*/) override +    { +    } +}; + +} // namespace + +ICounterBlockPtr FakeCounterBlock() { +    return MakeIntrusive<TFakeCounterBlock>(); +} + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h index a591beb84e0..0b6c36c84cc 100644 --- a/library/cpp/grpc/server/grpc_counters.h +++ b/library/cpp/grpc/server/grpc_counters.h @@ -1,10 +1,10 @@  #pragma once -#include <library/cpp/monlib/dynamic_counters/percentile/percentile.h>  -#include <library/cpp/monlib/dynamic_counters/counters.h>  +#include <library/cpp/monlib/dynamic_counters/percentile/percentile.h> +#include <library/cpp/monlib/dynamic_counters/counters.h>  #include <util/generic/ptr.h> -namespace NGrpc {  +namespace NGrpc {  struct ICounterBlock : public TThrRefBase {      virtual void CountNotOkRequest() = 0; @@ -14,7 +14,7 @@ struct ICounterBlock : public TThrRefBase {      virtual void CountRequestBytes(ui32 requestSize) = 0;      virtual void CountResponseBytes(ui32 responseSize) = 0;      virtual void StartProcessing(ui32 requestSize) = 0; -    virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0;  +    virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0;      virtual void CountRequestsWithoutDatabase() {}      virtual void CountRequestsWithoutToken() {}      virtual void CountRequestWithoutTls() {} @@ -126,11 +126,11 @@ public:  using TCounterBlockPtr = TIntrusivePtr<TCounterBlock>; -/**  - * Creates new instance of ICounterBlock implementation which does nothing.  - *  - * @return new instance  - */  -ICounterBlockPtr FakeCounterBlock();  -  -} // namespace NGrpc  +/** + * Creates new instance of ICounterBlock implementation which does nothing. + * + * @return new instance + */ +ICounterBlockPtr FakeCounterBlock(); + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request.cpp b/library/cpp/grpc/server/grpc_request.cpp index 60db2f230d3..d18a32776f2 100644 --- a/library/cpp/grpc/server/grpc_request.cpp +++ b/library/cpp/grpc/server/grpc_request.cpp @@ -1,10 +1,10 @@  #include "grpc_request.h" -namespace NGrpc {  +namespace NGrpc {  const char* GRPC_USER_AGENT_HEADER = "user-agent"; -class TStreamAdaptor: public IStreamAdaptor {  +class TStreamAdaptor: public IStreamAdaptor {  public:      TStreamAdaptor()          : StreamIsReady_(true) @@ -56,4 +56,4 @@ IStreamAdaptor::TPtr CreateStreamAdaptor() {      return std::make_unique<TStreamAdaptor>();  } -} // namespace NGrpc  +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index dd9041eec7f..5bd8d3902b5 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -4,19 +4,19 @@  #include <google/protobuf/arena.h>  #include <google/protobuf/message.h> -#include <library/cpp/monlib/dynamic_counters/counters.h>  +#include <library/cpp/monlib/dynamic_counters/counters.h>  #include <library/cpp/logger/priority.h> -#include "grpc_response.h"  +#include "grpc_response.h"  #include "event_callback.h" -#include "grpc_async_ctx_base.h"  -#include "grpc_counters.h"  +#include "grpc_async_ctx_base.h" +#include "grpc_counters.h"  #include "grpc_request_base.h"  #include "grpc_server.h" -#include "logger.h"  +#include "logger.h" + +#include <util/system/hp_timer.h> -#include <util/system/hp_timer.h>  -   #include <grpc++/server.h>  #include <grpc++/server_context.h>  #include <grpc++/support/async_stream.h> @@ -24,7 +24,7 @@  #include <grpc++/support/byte_buffer.h>  #include <grpc++/impl/codegen/async_stream.h> -namespace NGrpc {  +namespace NGrpc {  class IStreamAdaptor {  public: @@ -57,7 +57,7 @@ public:                   grpc::ServerCompletionQueue* cq,                   TOnRequest cb,                   TRequestCallback requestCallback, -                 const char* name,  +                 const char* name,                   TLoggerPtr logger,                   ICounterBlockPtr counters,                   IGRpcRequestLimiterPtr limiter) @@ -67,10 +67,10 @@ public:          , RequestCallback_(requestCallback)          , StreamRequestCallback_(nullptr)          , Name_(name) -        , Logger_(std::move(logger))  +        , Logger_(std::move(logger))          , Counters_(std::move(counters))          , RequestLimiter_(std::move(limiter)) -        , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))  +        , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))          , StateFunc_(&TThis::SetRequestDone)      {          AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); @@ -85,7 +85,7 @@ public:                   grpc::ServerCompletionQueue* cq,                   TOnRequest cb,                   TStreamRequestCallback requestCallback, -                 const char* name,  +                 const char* name,                   TLoggerPtr logger,                   ICounterBlockPtr counters,                   IGRpcRequestLimiterPtr limiter) @@ -95,7 +95,7 @@ public:          , RequestCallback_(nullptr)          , StreamRequestCallback_(requestCallback)          , Name_(name) -        , Logger_(std::move(logger))  +        , Logger_(std::move(logger))          , Counters_(std::move(counters))          , RequestLimiter_(std::move(limiter))          , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) @@ -157,13 +157,13 @@ public:      TInstant Deadline() const override {          return TBaseAsyncContext<TService>::Deadline(); -    }  -  -    TSet<TStringBuf> GetPeerMetaKeys() const override {  -        return TBaseAsyncContext<TService>::GetPeerMetaKeys();  -    }  -  -    TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override {  +    } + +    TSet<TStringBuf> GetPeerMetaKeys() const override { +        return TBaseAsyncContext<TService>::GetPeerMetaKeys(); +    } + +    TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override {          return TBaseAsyncContext<TService>::GetPeerMetaValues(key);      } @@ -233,10 +233,10 @@ private:          if (!Server_->IsShuttingDown()) {              if (RequestCallback_) {                  MakeIntrusive<TThis>( -                    Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();  +                    Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();              } else {                  MakeIntrusive<TThis>( -                    Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();  +                    Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();              }          }      } @@ -257,20 +257,20 @@ private:              StateFunc_ = &TThis::SetFinishDone;              ResponseSize = sz;              Y_VERIFY(this->Context.c_call()); -            Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());  +            Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());          } else {              GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)",                  this, Name_, makeResponseString().data(), this->Context.peer().c_str()); -  -            // because of std::function cannot hold move-only captured object  -            // we allocate shared object on heap to avoid message copy  -            auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);  -            auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() {  + +            // because of std::function cannot hold move-only captured object +            // we allocate shared object on heap to avoid message copy +            auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); +            auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() {                  GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)",                      this, Name_, makeResponseString().data(), this->Context.peer().c_str());                  StateFunc_ = &TThis::NextReply;                  ResponseSize += sz; -                StreamWriter_->Write(*uResp, GetGRpcTag());  +                StreamWriter_->Write(*uResp, GetGRpcTag());              };              StreamAdaptor_->Enqueue(std::move(cb), false);          } @@ -283,20 +283,20 @@ private:                  this->Context.peer().c_str());              StateFunc_ = &TThis::SetFinishDone;              ResponseSize = sz; -            Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());  +            Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());          } else {              GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_,                  this->Context.peer().c_str()); -  -            // because of std::function cannot hold move-only captured object  -            // we allocate shared object on heap to avoid buffer copy  -            auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);  -            auto cb = [this, uResp = std::move(uResp), sz]() {  + +            // because of std::function cannot hold move-only captured object +            // we allocate shared object on heap to avoid buffer copy +            auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); +            auto cb = [this, uResp = std::move(uResp), sz]() {                  GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",                      this, Name_, this->Context.peer().c_str());                  StateFunc_ = &TThis::NextReply;                  ResponseSize += sz; -                StreamWriter_->Write(*uResp, GetGRpcTag());  +                StreamWriter_->Write(*uResp, GetGRpcTag());              };              StreamAdaptor_->Enqueue(std::move(cb), false);          } @@ -314,8 +314,8 @@ private:              GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this,                  Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);              StateFunc_ = &TThis::SetFinishError; -            TOut resp;  -            Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag());  +            TOut resp; +            Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag());          } else {              GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"                                      " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); @@ -380,7 +380,7 @@ private:              }              auto maybeToken = GetPeerMetaValues(TStringBuf("x-ydb-auth-ticket"));              if (maybeToken.empty() || maybeToken[0].empty()) { -                TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}};  +                TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}};                  Counters_->CountRequestsWithoutToken();                  GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token "                      "Name# %s data# %s peer# %s database# %s", this, Name_, @@ -484,12 +484,12 @@ private:      TOnRequest Cb_;      TRequestCallback RequestCallback_;      TStreamRequestCallback StreamRequestCallback_; -    const char* const Name_;  +    const char* const Name_;      TLoggerPtr Logger_;      ICounterBlockPtr Counters_;      IGRpcRequestLimiterPtr RequestLimiter_; -    THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_;  +    THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_;      THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_;      TStateFunc StateFunc_;      TIn* Request_; @@ -520,10 +520,10 @@ public:                   typename TBase::TOnRequest cb,                   typename TBase::TRequestCallback requestCallback,                   const char* name, -                 TLoggerPtr logger,  +                 TLoggerPtr logger,                   ICounterBlockPtr counters,                   IGRpcRequestLimiterPtr limiter = nullptr) -        : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)}  +        : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)}      {      } @@ -533,11 +533,11 @@ public:                   typename TBase::TOnRequest cb,                   typename TBase::TStreamRequestCallback requestCallback,                   const char* name, -                 TLoggerPtr logger,  +                 TLoggerPtr logger,                   ICounterBlockPtr counters) -        : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr}  +        : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr}      {      }  }; -} // namespace NGrpc  +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index b61cf553aa7..fcfce1c181a 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -9,7 +9,7 @@ namespace grpc {  class ByteBuffer;  } -namespace NGrpc {  +namespace NGrpc {  extern const char* GRPC_USER_AGENT_HEADER; @@ -30,7 +30,7 @@ struct TAuthState {  //! An interface that may be used to limit concurrency of requests -class IGRpcRequestLimiter: public TThrRefBase {  +class IGRpcRequestLimiter: public TThrRefBase {  public:      virtual bool IncRequest() = 0;      virtual void DecRequest() = 0; @@ -39,7 +39,7 @@ public:  using IGRpcRequestLimiterPtr = TIntrusivePtr<IGRpcRequestLimiter>;  //! State of current request -class IRequestContextBase: public TThrRefBase {  +class IRequestContextBase: public TThrRefBase {  public:      enum class EFinishStatus {          OK, @@ -72,12 +72,12 @@ public:      //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise      virtual TInstant Deadline() const = 0; -  -    //! Returns available peer metadata keys  -    virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0;  -  + +    //! Returns available peer metadata keys +    virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0; +      //! Returns peer optional metavalue -    virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0;  +    virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0;      //! Returns request compression level      virtual grpc_compression_level GetCompressionLevel() const = 0; @@ -113,4 +113,4 @@ public:      virtual bool SslServer() const = 0;  }; -} // namespace NGrpc  +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_response.h b/library/cpp/grpc/server/grpc_response.h index 47b22c28d09..8e9afe44d53 100644 --- a/library/cpp/grpc/server/grpc_response.h +++ b/library/cpp/grpc/server/grpc_response.h @@ -1,90 +1,90 @@ -#pragma once  -  -#include <grpc++/impl/codegen/byte_buffer.h>  -#include <grpc++/impl/codegen/proto_utils.h>  -  -#include <variant>  -  -namespace NGrpc {  -  -/**  - * Universal response that owns underlying message or buffer.  - */  -template <typename TMsg>  +#pragma once + +#include <grpc++/impl/codegen/byte_buffer.h> +#include <grpc++/impl/codegen/proto_utils.h> + +#include <variant> + +namespace NGrpc { + +/** + * Universal response that owns underlying message or buffer. + */ +template <typename TMsg>  class TUniversalResponse: public TAtomicRefCount<TUniversalResponse<TMsg>>, public TMoveOnly { -    friend class grpc::SerializationTraits<TUniversalResponse<TMsg>>;  -  -public:  -    explicit TUniversalResponse(NProtoBuf::Message* msg) noexcept  -        : Data_{TMsg{}}  -    {  -        std::get<TMsg>(Data_).Swap(static_cast<TMsg*>(msg));  -    }  -  -    explicit TUniversalResponse(grpc::ByteBuffer* buffer) noexcept  -        : Data_{grpc::ByteBuffer{}}  -    {  -        std::get<grpc::ByteBuffer>(Data_).Swap(buffer);  -    }  -  -private:  -    std::variant<TMsg, grpc::ByteBuffer> Data_;  -};  -  -/**  - * Universal response that only keeps reference to underlying message or buffer.  - */  -template <typename TMsg>  -class TUniversalResponseRef: private TMoveOnly {  -    friend class grpc::SerializationTraits<TUniversalResponseRef<TMsg>>;  -  -public:  -    explicit TUniversalResponseRef(const NProtoBuf::Message* msg)  -        : Data_{msg}  -    {  -    }  -  -    explicit TUniversalResponseRef(const grpc::ByteBuffer* buffer)  -        : Data_{buffer}  -    {  -    }  -  -private:  -    std::variant<const NProtoBuf::Message*, const grpc::ByteBuffer*> Data_;  -};  -  -} // namespace NGrpc  -  -namespace grpc {  -  -template <typename TMsg>  -class SerializationTraits<NGrpc::TUniversalResponse<TMsg>> {  -public:  -    static Status Serialize(  -            const NGrpc::TUniversalResponse<TMsg>& resp,  -            ByteBuffer* buffer,  -            bool* ownBuffer)  -    {  -        return std::visit([&](const auto& data) {  -            using T = std::decay_t<decltype(data)>;  -            return SerializationTraits<T>::Serialize(data, buffer, ownBuffer);  -        }, resp.Data_);  -    }  -};  -  -template <typename TMsg>  -class SerializationTraits<NGrpc::TUniversalResponseRef<TMsg>> {  -public:  -    static Status Serialize(  -        const NGrpc::TUniversalResponseRef<TMsg>& resp,  -        ByteBuffer* buffer,  -        bool* ownBuffer)  -    {  -        return std::visit([&](const auto* data) {  -            using T = std::decay_t<std::remove_pointer_t<decltype(data)>>;  -            return SerializationTraits<T>::Serialize(*data, buffer, ownBuffer);  -        }, resp.Data_);  -    }  -};  -  -} // namespace grpc  +    friend class grpc::SerializationTraits<TUniversalResponse<TMsg>>; + +public: +    explicit TUniversalResponse(NProtoBuf::Message* msg) noexcept +        : Data_{TMsg{}} +    { +        std::get<TMsg>(Data_).Swap(static_cast<TMsg*>(msg)); +    } + +    explicit TUniversalResponse(grpc::ByteBuffer* buffer) noexcept +        : Data_{grpc::ByteBuffer{}} +    { +        std::get<grpc::ByteBuffer>(Data_).Swap(buffer); +    } + +private: +    std::variant<TMsg, grpc::ByteBuffer> Data_; +}; + +/** + * Universal response that only keeps reference to underlying message or buffer. + */ +template <typename TMsg> +class TUniversalResponseRef: private TMoveOnly { +    friend class grpc::SerializationTraits<TUniversalResponseRef<TMsg>>; + +public: +    explicit TUniversalResponseRef(const NProtoBuf::Message* msg) +        : Data_{msg} +    { +    } + +    explicit TUniversalResponseRef(const grpc::ByteBuffer* buffer) +        : Data_{buffer} +    { +    } + +private: +    std::variant<const NProtoBuf::Message*, const grpc::ByteBuffer*> Data_; +}; + +} // namespace NGrpc + +namespace grpc { + +template <typename TMsg> +class SerializationTraits<NGrpc::TUniversalResponse<TMsg>> { +public: +    static Status Serialize( +            const NGrpc::TUniversalResponse<TMsg>& resp, +            ByteBuffer* buffer, +            bool* ownBuffer) +    { +        return std::visit([&](const auto& data) { +            using T = std::decay_t<decltype(data)>; +            return SerializationTraits<T>::Serialize(data, buffer, ownBuffer); +        }, resp.Data_); +    } +}; + +template <typename TMsg> +class SerializationTraits<NGrpc::TUniversalResponseRef<TMsg>> { +public: +    static Status Serialize( +        const NGrpc::TUniversalResponseRef<TMsg>& resp, +        ByteBuffer* buffer, +        bool* ownBuffer) +    { +        return std::visit([&](const auto* data) { +            using T = std::decay_t<std::remove_pointer_t<decltype(data)>>; +            return SerializationTraits<T>::Serialize(*data, buffer, ownBuffer); +        }, resp.Data_); +    } +}; + +} // namespace grpc diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 4f4c7412fcd..7437b7a8f5e 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -15,7 +15,7 @@  #endif -namespace NGrpc {  +namespace NGrpc {  using NThreading::TFuture; @@ -82,7 +82,7 @@ void TGRpcServer::Start() {          service->SetGlobalLimiterHandle(&Limiter_);      } -    class TKeepAliveOption: public grpc::ServerBuilderOption {  +    class TKeepAliveOption: public grpc::ServerBuilderOption {      public:          TKeepAliveOption(int idle, int interval)              : Idle(idle) @@ -153,7 +153,7 @@ void TGRpcServer::Start() {      size_t index = 0;      for (IGRpcServicePtr service : Services_) {          // TODO: provide something else for services instead of ServerCompletionQueue -        service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);  +        service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);      }      if (Options_.UseCompletionQueuePerThread) { @@ -237,4 +237,4 @@ TString TGRpcServer::GetHost() const {      return Options_.Host;  } -} // namespace NGrpc  +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index 24c8caa78d1..d6814a90a0d 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -1,7 +1,7 @@  #pragma once  #include "grpc_request_base.h" -#include "logger.h"  +#include "logger.h"  #include <library/cpp/threading/future/future.h> @@ -17,7 +17,7 @@  #include <grpc++/grpc++.h> -namespace NGrpc {  +namespace NGrpc {  constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000; @@ -95,9 +95,9 @@ struct TServerOptions {      DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr); -    //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).  -    DECLARE_FIELD(Logger, TLoggerPtr, nullptr);  -  +    //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled). +    DECLARE_FIELD(Logger, TLoggerPtr, nullptr); +  #undef DECLARE_FIELD  }; @@ -161,11 +161,11 @@ private:  using TGlobalLimiter = TInFlightLimiterImpl<i64>; -class IGRpcService: public TThrRefBase {  +class IGRpcService: public TThrRefBase {  public:      virtual grpc::Service* GetService() = 0;      virtual void StopService() noexcept = 0; -    virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;  +    virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;      virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;      virtual bool IsUnsafeToShutdown() const = 0;      virtual size_t RequestsInProgress() const = 0; @@ -178,7 +178,7 @@ public:  };  template<typename T> -class TGrpcServiceBase: public IGRpcService {  +class TGrpcServiceBase: public IGRpcService {  public:      class TShutdownGuard {          using TOwner = TGrpcServiceBase<T>; @@ -353,4 +353,4 @@ private:      TGlobalLimiter Limiter_;  }; -} // namespace NGrpc  +} // namespace NGrpc diff --git a/library/cpp/grpc/server/logger.h b/library/cpp/grpc/server/logger.h index 5e44d83d67b..53af26be9c5 100644 --- a/library/cpp/grpc/server/logger.h +++ b/library/cpp/grpc/server/logger.h @@ -1,43 +1,43 @@ -#pragma once  -  -#include <library/cpp/logger/priority.h>  -  -#include <util/generic/ptr.h>  -  -namespace NGrpc {  -  -class TLogger: public TThrRefBase {  -protected:  -    TLogger() = default;  -  -public:  -    [[nodiscard]]  -    bool IsEnabled(ELogPriority priority) const noexcept {  -        return DoIsEnabled(priority);  -    }  -  -    void Y_PRINTF_FORMAT(3, 4) Write(ELogPriority priority, const char* format, ...) noexcept {  -        va_list args;  -        va_start(args, format);  -        DoWrite(priority, format, args);  -        va_end(args);  -    }  -  -protected:  -    virtual bool DoIsEnabled(ELogPriority priority) const noexcept = 0;  -    virtual void DoWrite(ELogPriority p, const char* format, va_list args) noexcept  = 0;  -};  -  -using TLoggerPtr = TIntrusivePtr<TLogger>;  -  -#define GRPC_LOG_DEBUG(logger, format, ...) \  -    if (logger && logger->IsEnabled(ELogPriority::TLOG_DEBUG)) { \  -        logger->Write(ELogPriority::TLOG_DEBUG, format, __VA_ARGS__); \  -    } else { }  -  -#define GRPC_LOG_INFO(logger, format, ...) \  -    if (logger && logger->IsEnabled(ELogPriority::TLOG_INFO)) { \  -        logger->Write(ELogPriority::TLOG_INFO, format, __VA_ARGS__); \  -    } else { }  -  -} // namespace NGrpc  +#pragma once + +#include <library/cpp/logger/priority.h> + +#include <util/generic/ptr.h> + +namespace NGrpc { + +class TLogger: public TThrRefBase { +protected: +    TLogger() = default; + +public: +    [[nodiscard]] +    bool IsEnabled(ELogPriority priority) const noexcept { +        return DoIsEnabled(priority); +    } + +    void Y_PRINTF_FORMAT(3, 4) Write(ELogPriority priority, const char* format, ...) noexcept { +        va_list args; +        va_start(args, format); +        DoWrite(priority, format, args); +        va_end(args); +    } + +protected: +    virtual bool DoIsEnabled(ELogPriority priority) const noexcept = 0; +    virtual void DoWrite(ELogPriority p, const char* format, va_list args) noexcept  = 0; +}; + +using TLoggerPtr = TIntrusivePtr<TLogger>; + +#define GRPC_LOG_DEBUG(logger, format, ...) \ +    if (logger && logger->IsEnabled(ELogPriority::TLOG_DEBUG)) { \ +        logger->Write(ELogPriority::TLOG_DEBUG, format, __VA_ARGS__); \ +    } else { } + +#define GRPC_LOG_INFO(logger, format, ...) \ +    if (logger && logger->IsEnabled(ELogPriority::TLOG_INFO)) { \ +        logger->Write(ELogPriority::TLOG_INFO, format, __VA_ARGS__); \ +    } else { } + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/ut/grpc_response_ut.cpp b/library/cpp/grpc/server/ut/grpc_response_ut.cpp index cb66478e940..8abc4e4e0ec 100644 --- a/library/cpp/grpc/server/ut/grpc_response_ut.cpp +++ b/library/cpp/grpc/server/ut/grpc_response_ut.cpp @@ -1,88 +1,88 @@ -#include <library/cpp/grpc/server/grpc_response.h>  -#include <library/cpp/testing/unittest/registar.h>  -  -#include <google/protobuf/duration.pb.h>  -#include <grpc++/impl/codegen/proto_utils.h>  -#include <grpc++/impl/grpc_library.h>  -  -static ::grpc::internal::GrpcLibraryInitializer grpcInitializer;  -  -using namespace NGrpc;  -  -using google::protobuf::Duration;  -  -Y_UNIT_TEST_SUITE(ResponseTest) {  -  -    template <typename T>  -    grpc::ByteBuffer Serialize(T resp) {  -        grpc::ByteBuffer buf;  -        bool ownBuf = false;  -        grpc::Status status = grpc::SerializationTraits<T>::Serialize(resp, &buf, &ownBuf);  -        UNIT_ASSERT(status.ok());  -        return buf;  -    }  -  -    template <typename T>  -    T Deserialize(grpc::ByteBuffer* buf) {  -        T message;  -        auto status = grpc::SerializationTraits<T>::Deserialize(buf, &message);  -        UNIT_ASSERT(status.ok());  -        return message;  -    }  -  -    Y_UNIT_TEST(UniversalResponseMsg) {  -        Duration d1;  -        d1.set_seconds(12345);  -        d1.set_nanos(67890);  -  -        auto buf = Serialize(TUniversalResponse<Duration>(&d1));  -        Duration d2 = Deserialize<Duration>(&buf);  -  -        UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345);  -        UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890);  -    }  -  -    Y_UNIT_TEST(UniversalResponseBuf) {  -        Duration d1;  -        d1.set_seconds(123);  -        d1.set_nanos(456);  -  -        TString data = d1.SerializeAsString();  -        grpc::Slice dataSlice{data.data(), data.size()};  -        grpc::ByteBuffer dataBuf{&dataSlice, 1};  -  -        auto buf = Serialize(TUniversalResponse<Duration>(&dataBuf));  -        Duration d2 = Deserialize<Duration>(&buf);  -  -        UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123);  -        UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456);  -    }  -  -    Y_UNIT_TEST(UniversalResponseRefMsg) {  -        Duration d1;  -        d1.set_seconds(12345);  -        d1.set_nanos(67890);  -  -        auto buf = Serialize(TUniversalResponseRef<Duration>(&d1));  -        Duration d2 = Deserialize<Duration>(&buf);  -  -        UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345);  -        UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890);  -    }  -  -    Y_UNIT_TEST(UniversalResponseRefBuf) {  -        Duration d1;  -        d1.set_seconds(123);  -        d1.set_nanos(456);  -  -        TString data = d1.SerializeAsString();  -        grpc::Slice dataSlice{data.data(), data.size()};  -        grpc::ByteBuffer dataBuf{&dataSlice, 1};  -  -        auto buf = Serialize(TUniversalResponseRef<Duration>(&dataBuf));  -        Duration d2 = Deserialize<Duration>(&buf);  -  -        UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123);  -        UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456);  -    }  -}  +#include <library/cpp/grpc/server/grpc_response.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <google/protobuf/duration.pb.h> +#include <grpc++/impl/codegen/proto_utils.h> +#include <grpc++/impl/grpc_library.h> + +static ::grpc::internal::GrpcLibraryInitializer grpcInitializer; + +using namespace NGrpc; + +using google::protobuf::Duration; + +Y_UNIT_TEST_SUITE(ResponseTest) { + +    template <typename T> +    grpc::ByteBuffer Serialize(T resp) { +        grpc::ByteBuffer buf; +        bool ownBuf = false; +        grpc::Status status = grpc::SerializationTraits<T>::Serialize(resp, &buf, &ownBuf); +        UNIT_ASSERT(status.ok()); +        return buf; +    } + +    template <typename T> +    T Deserialize(grpc::ByteBuffer* buf) { +        T message; +        auto status = grpc::SerializationTraits<T>::Deserialize(buf, &message); +        UNIT_ASSERT(status.ok()); +        return message; +    } + +    Y_UNIT_TEST(UniversalResponseMsg) { +        Duration d1; +        d1.set_seconds(12345); +        d1.set_nanos(67890); + +        auto buf = Serialize(TUniversalResponse<Duration>(&d1)); +        Duration d2 = Deserialize<Duration>(&buf); + +        UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345); +        UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890); +    } + +    Y_UNIT_TEST(UniversalResponseBuf) { +        Duration d1; +        d1.set_seconds(123); +        d1.set_nanos(456); + +        TString data = d1.SerializeAsString(); +        grpc::Slice dataSlice{data.data(), data.size()}; +        grpc::ByteBuffer dataBuf{&dataSlice, 1}; + +        auto buf = Serialize(TUniversalResponse<Duration>(&dataBuf)); +        Duration d2 = Deserialize<Duration>(&buf); + +        UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123); +        UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456); +    } + +    Y_UNIT_TEST(UniversalResponseRefMsg) { +        Duration d1; +        d1.set_seconds(12345); +        d1.set_nanos(67890); + +        auto buf = Serialize(TUniversalResponseRef<Duration>(&d1)); +        Duration d2 = Deserialize<Duration>(&buf); + +        UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345); +        UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890); +    } + +    Y_UNIT_TEST(UniversalResponseRefBuf) { +        Duration d1; +        d1.set_seconds(123); +        d1.set_nanos(456); + +        TString data = d1.SerializeAsString(); +        grpc::Slice dataSlice{data.data(), data.size()}; +        grpc::ByteBuffer dataBuf{&dataSlice, 1}; + +        auto buf = Serialize(TUniversalResponseRef<Duration>(&dataBuf)); +        Duration d2 = Deserialize<Duration>(&buf); + +        UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123); +        UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456); +    } +} diff --git a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp index 3457e98bf18..c34d3b8c2bf 100644 --- a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp +++ b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp @@ -1,14 +1,14 @@ -#include <library/cpp/grpc/server/grpc_request.h>  +#include <library/cpp/grpc/server/grpc_request.h>  #include <library/cpp/testing/unittest/registar.h>  #include <library/cpp/testing/unittest/tests_data.h>  #include <util/system/thread.h>  #include <util/thread/pool.h> -using namespace NGrpc;  +using namespace NGrpc;  // Here we emulate stream data producer -class TOrderedProducer: public TThread {  +class TOrderedProducer: public TThread {  public:      TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp)          : TThread(&ThreadProc, this) diff --git a/library/cpp/grpc/server/ut/ya.make b/library/cpp/grpc/server/ut/ya.make index ff6c8fdb7b7..feb3291af92 100644 --- a/library/cpp/grpc/server/ut/ya.make +++ b/library/cpp/grpc/server/ut/ya.make @@ -1,19 +1,19 @@ -UNITTEST_FOR(library/cpp/grpc/server)  +UNITTEST_FOR(library/cpp/grpc/server)  OWNER(      dcherednik      g:kikimr  ) -TIMEOUT(600)  -SIZE(MEDIUM)  +TIMEOUT(600) +SIZE(MEDIUM)  PEERDIR( -    library/cpp/grpc/server  +    library/cpp/grpc/server  )  SRCS( -    grpc_response_ut.cpp  +    grpc_response_ut.cpp      stream_adaptor_ut.cpp  ) diff --git a/library/cpp/grpc/server/ya.make b/library/cpp/grpc/server/ya.make index b0f262e5dc6..356a1b6793d 100644 --- a/library/cpp/grpc/server/ya.make +++ b/library/cpp/grpc/server/ya.make @@ -16,10 +16,10 @@ GENERATE_ENUM_SERIALIZATION(grpc_request_base.h)  PEERDIR(      contrib/libs/grpc -    library/cpp/monlib/dynamic_counters/percentile  +    library/cpp/monlib/dynamic_counters/percentile  )  END() -  -RECURSE_FOR_TESTS(ut)  -  + +RECURSE_FOR_TESTS(ut) + | 
