diff options
| author | Daniil Cherednik <[email protected]> | 2022-02-10 16:50:16 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:50:16 +0300 | 
| commit | 17e20fa084178ddcb16255f974dbde74fb93608b (patch) | |
| tree | 39605336c0b4d33928df69a256102c515fdf6ff5 /library/cpp/grpc | |
| parent | 97df5ca7413550bf233fc6c7210e292fca0a51af (diff) | |
Restoring authorship annotation for Daniil Cherednik <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc')
| -rw-r--r-- | library/cpp/grpc/client/grpc_client_low.cpp | 414 | ||||
| -rw-r--r-- | library/cpp/grpc/client/grpc_client_low.h | 1122 | ||||
| -rw-r--r-- | library/cpp/grpc/client/grpc_common.h | 88 | ||||
| -rw-r--r-- | library/cpp/grpc/client/ya.make | 4 | ||||
| -rw-r--r-- | library/cpp/grpc/server/event_callback.cpp | 2 | ||||
| -rw-r--r-- | library/cpp/grpc/server/event_callback.h | 144 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_async_ctx_base.h | 122 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_counters.cpp | 2 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_counters.h | 134 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_request.cpp | 112 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 696 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 152 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_response.h | 2 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 290 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 404 | ||||
| -rw-r--r-- | library/cpp/grpc/server/ut/stream_adaptor_ut.cpp | 230 | ||||
| -rw-r--r-- | library/cpp/grpc/server/ut/ya.make | 32 | ||||
| -rw-r--r-- | library/cpp/grpc/server/ya.make | 42 | 
18 files changed, 1996 insertions, 1996 deletions
| diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp index 73cc908ef82..ee9e997fa78 100644 --- a/library/cpp/grpc/client/grpc_client_low.cpp +++ b/library/cpp/grpc/client/grpc_client_low.cpp @@ -1,164 +1,164 @@ -#include "grpc_client_low.h" -#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h> -#include <contrib/libs/grpc/include/grpc/support/log.h> - -#include <library/cpp/containers/stack_vector/stack_vec.h> - -#include <util/string/printf.h> +#include "grpc_client_low.h"  +#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>  +#include <contrib/libs/grpc/include/grpc/support/log.h>  +  +#include <library/cpp/containers/stack_vector/stack_vec.h>  +  +#include <util/string/printf.h>   #include <util/system/thread.h>  #include <util/random/random.h> - -#if !defined(_WIN32) && !defined(_WIN64) -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#endif - +  +#if !defined(_WIN32) && !defined(_WIN64)  +#include <sys/types.h>  +#include <sys/socket.h>  +#include <netinet/in.h>  +#include <netinet/tcp.h>  +#endif  +   namespace NGrpc { - -void EnableGRpcTracing() { -    grpc_tracer_set_enabled("tcp", true); -    grpc_tracer_set_enabled("client_channel", true); -    grpc_tracer_set_enabled("channel", true); -    grpc_tracer_set_enabled("api", true); -    grpc_tracer_set_enabled("connectivity_state", true); -    grpc_tracer_set_enabled("handshaker", true); -    grpc_tracer_set_enabled("http", true); -    grpc_tracer_set_enabled("http2_stream_state", true); -    grpc_tracer_set_enabled("op_failure", true); -    grpc_tracer_set_enabled("timer", true); -    gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); -} - -class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator { -public: -    TGRpcKeepAliveSocketMutator(int idle, int count, int interval) -        : Idle_(idle) -        , Count_(count) -        , Interval_(interval) -    { -        grpc_socket_mutator_init(this, &VTable); -    } -private: -    static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) { -        return static_cast<TGRpcKeepAliveSocketMutator*>(mutator); -    } - -    template<typename TVal> -    bool SetOption(int fd, int level, int optname, const TVal& value) { -        return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0; -    } -    bool SetOption(int fd) { -        if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) { -            Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl; -            return false; -        } -#ifdef _linux_ -        if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) { -            Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl; -            return false; -        } -        if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) { -            Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl; -            return false; -        } -        if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) { -            Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl; -            return false; -        } -#endif -        return true; -    } -    static bool Mutate(int fd, grpc_socket_mutator* mutator) { -        auto self = Cast(mutator); -        return self->SetOption(fd); -    } -    static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) { -        const auto* selfA = Cast(a); -        const auto* selfB = Cast(b); -        auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_); -        auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_); -        return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0; -    } -    static void Destroy(grpc_socket_mutator* mutator) { -        delete Cast(mutator); -    } - -    static grpc_socket_mutator_vtable VTable; -    const int Idle_; -    const int Count_; -    const int Interval_; -}; - -grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable = -    { -        &TGRpcKeepAliveSocketMutator::Mutate, -        &TGRpcKeepAliveSocketMutator::Compare, -        &TGRpcKeepAliveSocketMutator::Destroy -    }; - +  +void EnableGRpcTracing() {  +    grpc_tracer_set_enabled("tcp", true);  +    grpc_tracer_set_enabled("client_channel", true);  +    grpc_tracer_set_enabled("channel", true);  +    grpc_tracer_set_enabled("api", true);  +    grpc_tracer_set_enabled("connectivity_state", true);  +    grpc_tracer_set_enabled("handshaker", true);  +    grpc_tracer_set_enabled("http", true);  +    grpc_tracer_set_enabled("http2_stream_state", true);  +    grpc_tracer_set_enabled("op_failure", true);  +    grpc_tracer_set_enabled("timer", true);  +    gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);  +}  +  +class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {  +public:  +    TGRpcKeepAliveSocketMutator(int idle, int count, int interval)  +        : Idle_(idle)  +        , Count_(count)  +        , Interval_(interval)  +    {  +        grpc_socket_mutator_init(this, &VTable);  +    }  +private:  +    static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {  +        return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);  +    }  +  +    template<typename TVal>  +    bool SetOption(int fd, int level, int optname, const TVal& value) {  +        return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;  +    }  +    bool SetOption(int fd) {  +        if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {  +            Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;  +            return false;  +        }  +#ifdef _linux_  +        if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {  +            Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;  +            return false;  +        }  +        if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {  +            Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;  +            return false;  +        }  +        if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {  +            Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;  +            return false;  +        }  +#endif  +        return true;  +    }  +    static bool Mutate(int fd, grpc_socket_mutator* mutator) {  +        auto self = Cast(mutator);  +        return self->SetOption(fd);  +    }  +    static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {  +        const auto* selfA = Cast(a);  +        const auto* selfB = Cast(b);  +        auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);  +        auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);  +        return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;  +    }  +    static void Destroy(grpc_socket_mutator* mutator) {  +        delete Cast(mutator);  +    }  +  +    static grpc_socket_mutator_vtable VTable;  +    const int Idle_;  +    const int Count_;  +    const int Interval_;  +};  +  +grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =  +    {  +        &TGRpcKeepAliveSocketMutator::Mutate,  +        &TGRpcKeepAliveSocketMutator::Compare,  +        &TGRpcKeepAliveSocketMutator::Destroy  +    };  +   TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime) -    : TcpKeepAliveSettings_(tcpKeepAliveSettings) +    : TcpKeepAliveSettings_(tcpKeepAliveSettings)       , ExpireTime_(expireTime)      , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20)) -{} - -void TChannelPool::GetStubsHolderLocked( -    const TString& channelId, -    const TGRpcClientConfig& config, -    std::function<void(TStubsHolder&)> cb) -{ -    { -        std::shared_lock readGuard(RWMutex_); -        const auto it = Pool_.find(channelId); -        if (it != Pool_.end()) { +{}  +  +void TChannelPool::GetStubsHolderLocked(  +    const TString& channelId,  +    const TGRpcClientConfig& config,  +    std::function<void(TStubsHolder&)> cb)  +{  +    {  +        std::shared_lock readGuard(RWMutex_);  +        const auto it = Pool_.find(channelId);  +        if (it != Pool_.end()) {               if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) { -                return cb(it->second); -            } -        } -    } -    { -        std::unique_lock writeGuard(RWMutex_); -        { -            auto it = Pool_.find(channelId); -            if (it != Pool_.end()) { -                if (!it->second.IsChannelBroken()) { +                return cb(it->second);  +            }  +        }  +    }  +    {  +        std::unique_lock writeGuard(RWMutex_);  +        {  +            auto it = Pool_.find(channelId);  +            if (it != Pool_.end()) {  +                if (!it->second.IsChannelBroken()) {                       EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);                      auto now = Now();                      LastUsedQueue_.emplace(now, channelId);                      it->second.SetLastUseTime(now); -                    return cb(it->second); -                } else { -                    // This channel can't be used. Remove from pool to create new one +                    return cb(it->second);  +                } else {  +                    // This channel can't be used. Remove from pool to create new one                       EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); -                    Pool_.erase(it); -                } -            } -        } -        TGRpcKeepAliveSocketMutator* mutator = nullptr; -        // will be destroyed inside grpc -        if (TcpKeepAliveSettings_.Enabled) { -            mutator = new TGRpcKeepAliveSocketMutator( -                TcpKeepAliveSettings_.Idle, -                TcpKeepAliveSettings_.Count, -                TcpKeepAliveSettings_.Interval -            ); -        } -        cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second); +                    Pool_.erase(it);  +                }  +            }  +        }  +        TGRpcKeepAliveSocketMutator* mutator = nullptr;  +        // will be destroyed inside grpc  +        if (TcpKeepAliveSettings_.Enabled) {  +            mutator = new TGRpcKeepAliveSocketMutator(  +                TcpKeepAliveSettings_.Idle,  +                TcpKeepAliveSettings_.Count,  +                TcpKeepAliveSettings_.Interval  +            );  +        }  +        cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);           LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId); -    } -} - -void TChannelPool::DeleteChannel(const TString& channelId) { -    std::unique_lock writeLock(RWMutex_); +    }  +}  +  +void TChannelPool::DeleteChannel(const TString& channelId) {  +    std::unique_lock writeLock(RWMutex_);       auto poolIt = Pool_.find(channelId);      if (poolIt != Pool_.end()) {          EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);          Pool_.erase(poolIt);      } -} - +}  +   void TChannelPool::DeleteExpiredStubsHolders() {      std::unique_lock writeLock(RWMutex_);      auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_); @@ -392,7 +392,7 @@ private:      // Some children are stored inline, others are in a set      std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } }; -    std::unordered_set<TContextImpl*> Children; +    std::unordered_set<TContextImpl*> Children;       // Single callback is stored without extra allocations      TStackVec<TCallback, 1> Callbacks; @@ -404,10 +404,10 @@ private:  TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)      : UseCompletionQueuePerThread_(useCompletionQueuePerThread)  { -    Init(numWorkerThread); -} - -void TGRpcClientLow::Init(size_t numWorkerThread) { +    Init(numWorkerThread);  +}  +  +void TGRpcClientLow::Init(size_t numWorkerThread) {       SetCqState(WORKING);      if (UseCompletionQueuePerThread_) {          for (size_t i = 0; i < numWorkerThread; i++) { @@ -425,9 +425,9 @@ void TGRpcClientLow::Init(size_t numWorkerThread) {                  PullEvents(cq);              }).Release());          } -    } -} - +    }  +}  +   void TGRpcClientLow::AddWorkerThreadForTest() {      if (UseCompletionQueuePerThread_) {          CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); @@ -453,17 +453,17 @@ void TGRpcClientLow::Stop(bool wait) {      if (wait) {          WaitInternal(); -    } -} - +    }  +}  +   void TGRpcClientLow::StopInternal(bool silent) {      bool shutdown;      TVector<TContextImpl::TContextPtr> cancelQueue; -    { -        std::unique_lock<std::mutex> guard(Mtx_); - +    {  +        std::unique_lock<std::mutex> guard(Mtx_);  +           auto allowStateChange = [&]() {              switch (GetCqState()) {                  case WORKING: @@ -484,7 +484,7 @@ void TGRpcClientLow::StopInternal(bool silent) {          SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT); -        if (!silent && !Contexts_.empty()) { +        if (!silent && !Contexts_.empty()) {               cancelQueue.reserve(Contexts_.size());              for (auto* ptr : Contexts_) {                  // N.B. some contexts may be stuck in destructors @@ -494,7 +494,7 @@ void TGRpcClientLow::StopInternal(bool silent) {              }          } -        shutdown = Contexts_.empty(); +        shutdown = Contexts_.empty();       }      for (auto& context : cancelQueue) { @@ -506,62 +506,62 @@ void TGRpcClientLow::StopInternal(bool silent) {          for (auto& cq : CQS_) {              cq->Shutdown();          } -    } +    }   }  void TGRpcClientLow::WaitInternal() { -    std::unique_lock<std::mutex> guard(JoinMutex_); - -    for (auto& ti : WorkerThreads_) { -        ti->Join(); -    } -} - +    std::unique_lock<std::mutex> guard(JoinMutex_);  +  +    for (auto& ti : WorkerThreads_) {  +        ti->Join();  +    }  +}  +   void TGRpcClientLow::WaitIdle() { -    std::unique_lock<std::mutex> guard(Mtx_); - -    while (!Contexts_.empty()) { -        ContextsEmpty_.wait(guard); -    } -} - +    std::unique_lock<std::mutex> guard(Mtx_);  +  +    while (!Contexts_.empty()) {  +        ContextsEmpty_.wait(guard);  +    }  +}  +   std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() { -    std::unique_lock<std::mutex> guard(Mtx_); - -    auto allowCreateContext = [&]() { -        switch (GetCqState()) { -            case WORKING: -                return true; -            case STOP_SILENT: -            case STOP_EXPLICIT: -                return false; +    std::unique_lock<std::mutex> guard(Mtx_);  +  +    auto allowCreateContext = [&]() {  +        switch (GetCqState()) {  +            case WORKING:  +                return true;  +            case STOP_SILENT:  +            case STOP_EXPLICIT:  +                return false;           } -        Y_UNREACHABLE(); -    }; - -    if (!allowCreateContext()) { -        // New context creation is forbidden -        return nullptr; -    } - -    auto context = std::make_shared<TContextImpl>(); -    Contexts_.insert(context.get()); -    context->Owner = this; -    if (UseCompletionQueuePerThread_) { -        context->CQ = CQS_[RandomNumber(CQS_.size())].get(); -    } else { -        context->CQ = CQS_[0].get(); -    } -    return context; +        Y_UNREACHABLE();  +    };  +  +    if (!allowCreateContext()) {  +        // New context creation is forbidden  +        return nullptr;  +    } +  +    auto context = std::make_shared<TContextImpl>();  +    Contexts_.insert(context.get());  +    context->Owner = this;  +    if (UseCompletionQueuePerThread_) {  +        context->CQ = CQS_[RandomNumber(CQS_.size())].get();  +    } else {  +        context->CQ = CQS_[0].get();  +    }  +    return context;   }  void TGRpcClientLow::ForgetContext(TContextImpl* context) {      bool shutdown = false; -    { -        std::unique_lock<std::mutex> guard(Mtx_); - +    {  +        std::unique_lock<std::mutex> guard(Mtx_);  +           if (!Contexts_.erase(context)) {              Y_FAIL("Unexpected ForgetContext(%p)", context);          } @@ -571,7 +571,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {                  shutdown = true;              } -            ContextsEmpty_.notify_all(); +            ContextsEmpty_.notify_all();           }      } @@ -580,7 +580,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {          for (auto& cq : CQS_) {              cq->Shutdown();          } -    } -} - +    }  +}  +   } // namespace NGRpc diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h index ab0a0627be0..e452be45a1c 100644 --- a/library/cpp/grpc/client/grpc_client_low.h +++ b/library/cpp/grpc/client/grpc_client_low.h @@ -1,57 +1,57 @@ -#pragma once - -#include "grpc_common.h" - +#pragma once  +  +#include "grpc_common.h"  +   #include <util/thread/factory.h> -#include <grpc++/grpc++.h> +#include <grpc++/grpc++.h>   #include <grpc++/support/async_stream.h>  #include <grpc++/support/async_unary_call.h> - -#include <deque> -#include <typeindex> -#include <typeinfo> +  +#include <deque>  +#include <typeindex>  +#include <typeinfo>   #include <variant> -#include <vector> -#include <unordered_map> -#include <unordered_set> -#include <mutex> -#include <shared_mutex> - -/* - * This file contains low level logic for grpc - * This file should not be used in high level code without special reason - */ +#include <vector>  +#include <unordered_map>  +#include <unordered_set>  +#include <mutex>  +#include <shared_mutex>  +  +/*  + * This file contains low level logic for grpc  + * This file should not be used in high level code without special reason  + */   namespace NGrpc { - -const size_t DEFAULT_NUM_THREADS = 2; - -//////////////////////////////////////////////////////////////////////////////// - -void EnableGRpcTracing(); - -//////////////////////////////////////////////////////////////////////////////// - -struct TTcpKeepAliveSettings { -    bool Enabled; -    size_t Idle; -    size_t Count; -    size_t Interval; -}; - -//////////////////////////////////////////////////////////////////////////////// - -// Common interface used to execute action from grpc cq routine -class IQueueClientEvent { -public: -    virtual ~IQueueClientEvent() = default; - +  +const size_t DEFAULT_NUM_THREADS = 2;  +  +////////////////////////////////////////////////////////////////////////////////  +  +void EnableGRpcTracing();  +  +////////////////////////////////////////////////////////////////////////////////  +  +struct TTcpKeepAliveSettings {  +    bool Enabled;  +    size_t Idle;  +    size_t Count;  +    size_t Interval;  +};  +  +////////////////////////////////////////////////////////////////////////////////  +  +// Common interface used to execute action from grpc cq routine  +class IQueueClientEvent {  +public:  +    virtual ~IQueueClientEvent() = default;  +       //! Execute an action defined by implementation -    virtual bool Execute(bool ok) = 0; - -    //! Finish and destroy event -    virtual void Destroy() = 0; -}; - +    virtual bool Execute(bool ok) = 0;  +  +    //! Finish and destroy event  +    virtual void Destroy() = 0;  +};  +   // Implementation of IQueueClientEvent that reduces allocations  template<class TSelf>  class TQueueClientFixedEvent : private IQueueClientEvent { @@ -123,8 +123,8 @@ public:      }  }; -// Represents grpc status and error message string -struct TGrpcStatus { +// Represents grpc status and error message string  +struct TGrpcStatus {       TString Msg;      TString Details;      int GRpcStatusCode; @@ -167,36 +167,36 @@ struct TGrpcStatus {      bool Ok() const {          return !InternalError && GRpcStatusCode == grpc::StatusCode::OK;      } -}; - -bool inline IsGRpcStatusGood(const TGrpcStatus& status) { +};  +  +bool inline IsGRpcStatusGood(const TGrpcStatus& status) {       return status.Ok(); -} - +}  +   // Response callback type - this callback will be called when request is finished -//  (or after getting each chunk in case of streaming mode) -template<typename TResponse> -using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>; - +//  (or after getting each chunk in case of streaming mode)  +template<typename TResponse>  +using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>;  +   template<typename TResponse>  using TAdvancedResponseCallback = std::function<void (const grpc::ClientContext&, TGrpcStatus&&, TResponse&&)>; -// Call associated metadata -struct TCallMeta { -    std::shared_ptr<grpc::CallCredentials> CallCredentials; -    std::vector<std::pair<TString, TString>> Aux; +// Call associated metadata  +struct TCallMeta {  +    std::shared_ptr<grpc::CallCredentials> CallCredentials;  +    std::vector<std::pair<TString, TString>> Aux;       std::variant<TDuration, TInstant> Timeout; // timeout as duration from now or time point in future -}; - -class TGRpcRequestProcessorCommon { -protected: -    void ApplyMeta(const TCallMeta& meta) { -        for (const auto& rec : meta.Aux) { -            Context.AddMetadata(rec.first, rec.second); -        } -        if (meta.CallCredentials) { -            Context.set_credentials(meta.CallCredentials); -        } +};  +  +class TGRpcRequestProcessorCommon {  +protected:  +    void ApplyMeta(const TCallMeta& meta) {  +        for (const auto& rec : meta.Aux) {  +            Context.AddMetadata(rec.first, rec.second);  +        }  +        if (meta.CallCredentials) {  +            Context.set_credentials(meta.CallCredentials);  +        }           if (const TDuration* timeout = std::get_if<TDuration>(&meta.Timeout)) {              if (*timeout) {                  auto deadline = gpr_time_add( @@ -208,10 +208,10 @@ protected:              if (*deadline) {                  Context.set_deadline(gpr_time_from_micros(deadline->MicroSeconds(), GPR_CLOCK_MONOTONIC));              } -        } -    } +        }  +    }  -    void GetInitialMetadata(std::unordered_multimap<TString, TString>* metadata) { +    void GetInitialMetadata(std::unordered_multimap<TString, TString>* metadata) {           for (const auto& [key, value] : Context.GetServerInitialMetadata()) {              metadata->emplace(                  TString(key.begin(), key.end()), @@ -220,61 +220,61 @@ protected:          }      } -    grpc::Status Status; -    grpc::ClientContext Context; -    std::shared_ptr<IQueueClientContext> LocalContext; -}; - -template<typename TStub, typename TRequest, typename TResponse> -class TSimpleRequestProcessor -    : public TThrRefBase -    , public IQueueClientEvent -    , public TGRpcRequestProcessorCommon { -    using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>; -    template<typename> friend class TServiceConnection; -public: -    using TPtr = TIntrusivePtr<TSimpleRequestProcessor>; -    using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*); - +    grpc::Status Status;  +    grpc::ClientContext Context;  +    std::shared_ptr<IQueueClientContext> LocalContext;  +};  +  +template<typename TStub, typename TRequest, typename TResponse>  +class TSimpleRequestProcessor  +    : public TThrRefBase  +    , public IQueueClientEvent  +    , public TGRpcRequestProcessorCommon {  +    using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;  +    template<typename> friend class TServiceConnection;  +public:  +    using TPtr = TIntrusivePtr<TSimpleRequestProcessor>;  +    using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);  +       explicit TSimpleRequestProcessor(TResponseCallback<TResponse>&& callback)          : Callback_(std::move(callback)) -    { } - -    ~TSimpleRequestProcessor() { +    { }  +  +    ~TSimpleRequestProcessor() {           if (!Replied_ && Callback_) {              Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));              Callback_ = nullptr; // free resources as early as possible -        } -    } - -    bool Execute(bool ok) override { -        { -            std::unique_lock<std::mutex> guard(Mutex_); -            LocalContext.reset(); +        }  +    }  +  +    bool Execute(bool ok) override {  +        {  +            std::unique_lock<std::mutex> guard(Mutex_);  +            LocalContext.reset();           }          TGrpcStatus status;          if (ok) { -            status = Status; +            status = Status;           } else {              status = TGrpcStatus::Internal("Unexpected error"); -        } -        Replied_ = true; -        Callback_(std::move(status), std::move(Reply_)); +        }  +        Replied_ = true;  +        Callback_(std::move(status), std::move(Reply_));           Callback_ = nullptr; // free resources as early as possible -        return false; -    } - -    void Destroy() override { +        return false;  +    }  +  +    void Destroy() override {           UnRef(); -    } - -private: +    }  +  +private:       IQueueClientEvent* FinishedEvent() {          Ref();          return this; -    } - -    void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) { +    }  +  +    void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {           auto context = provider->CreateContext();          if (!context) {              Replied_ = true; @@ -282,11 +282,11 @@ private:              Callback_ = nullptr;              return;          } -        { -            std::unique_lock<std::mutex> guard(Mutex_); -            LocalContext = context; -            Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue()); -            Reader_->Finish(&Reply_, &Status, FinishedEvent()); +        {  +            std::unique_lock<std::mutex> guard(Mutex_);  +            LocalContext = context;  +            Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());  +            Reader_->Finish(&Reply_, &Status, FinishedEvent());           }          context->SubscribeStop([self = TPtr(this)] {              self->Stop(); @@ -294,17 +294,17 @@ private:      }      void Stop() { -        Context.TryCancel(); +        Context.TryCancel();       }      TResponseCallback<TResponse> Callback_; -    TResponse Reply_; -    std::mutex Mutex_; -    TAsyncReaderPtr Reader_; - -    bool Replied_ = false; -}; - +    TResponse Reply_;  +    std::mutex Mutex_;  +    TAsyncReaderPtr Reader_;  +  +    bool Replied_ = false;  +};  +   template<typename TStub, typename TRequest, typename TResponse>  class TAdvancedRequestProcessor      : public TThrRefBase @@ -328,8 +328,8 @@ public:      }      bool Execute(bool ok) override { -        { -            std::unique_lock<std::mutex> guard(Mutex_); +        {  +            std::unique_lock<std::mutex> guard(Mutex_);               LocalContext.reset();          }          TGrpcStatus status; @@ -362,8 +362,8 @@ private:              Callback_ = nullptr;              return;          } -        { -            std::unique_lock<std::mutex> guard(Mutex_); +        {  +            std::unique_lock<std::mutex> guard(Mutex_);               LocalContext = context;              Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());              Reader_->Finish(&Reply_, &Status, FinishedEvent()); @@ -379,16 +379,16 @@ private:      TAdvancedResponseCallback<TResponse> Callback_;      TResponse Reply_; -    std::mutex Mutex_; +    std::mutex Mutex_;       TAsyncReaderPtr Reader_;      bool Replied_ = false;  }; -template<class TResponse> -class IStreamRequestReadProcessor : public TThrRefBase { +template<class TResponse>  +class IStreamRequestReadProcessor : public TThrRefBase {   public: -    using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>; +    using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;       using TReadCallback = std::function<void(TGrpcStatus&&)>;      /** @@ -399,7 +399,7 @@ public:      /**       * Scheduled initial server metadata read from the stream       */ -    virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0; +    virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0;       /**       * Scheduled response read from the stream @@ -420,72 +420,72 @@ public:      virtual void AddFinishedCallback(TReadCallback callback) = 0;  }; -template<class TRequest, class TResponse> -class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> { -public: -    using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>; +template<class TRequest, class TResponse>  +class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> {  +public:  +    using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>;       using TWriteCallback = std::function<void(TGrpcStatus&&)>; - -    /** -     * Scheduled request write to the stream -     */ +  +    /**  +     * Scheduled request write to the stream  +     */       virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0; -}; - -class TGRpcKeepAliveSocketMutator; - -// Class to hold stubs allocated on channel. -// It is poor documented part of grpc. See KIKIMR-6109 and comment to this commit - -// Stub holds shared_ptr<ChannelInterface>, so we can destroy this holder even if -// request processor using stub -class TStubsHolder : public TNonCopyable { -    using TypeInfoRef = std::reference_wrapper<const std::type_info>; - -    struct THasher { -        std::size_t operator()(TypeInfoRef code) const { -            return code.get().hash_code(); -        } -    }; - -    struct TEqualTo { -        bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const { -            return lhs.get() == rhs.get(); -        } -    }; -public: -    TStubsHolder(std::shared_ptr<grpc::ChannelInterface> channel) -        : ChannelInterface_(channel) -    {} - -    // Returns true if channel can't be used to perform request now -    bool IsChannelBroken() const { -        auto state = ChannelInterface_->GetState(false); -        return state == GRPC_CHANNEL_SHUTDOWN || -            state == GRPC_CHANNEL_TRANSIENT_FAILURE; -    } - -    template<typename TStub> -    std::shared_ptr<TStub> GetOrCreateStub() { -        const auto& stubId = typeid(TStub); -        { -            std::shared_lock readGuard(RWMutex_); -            const auto it = Stubs_.find(stubId); -            if (it != Stubs_.end()) { -                return std::static_pointer_cast<TStub>(it->second); -            } -        } -        { -            std::unique_lock writeGuard(RWMutex_); -            auto it = Stubs_.emplace(stubId, nullptr); -            if (!it.second) { -                return std::static_pointer_cast<TStub>(it.first->second); -            } else { -                it.first->second = std::make_shared<TStub>(ChannelInterface_); -                return std::static_pointer_cast<TStub>(it.first->second); -            } -        } -    } +};  +  +class TGRpcKeepAliveSocketMutator;  +  +// Class to hold stubs allocated on channel.  +// It is poor documented part of grpc. See KIKIMR-6109 and comment to this commit  +  +// Stub holds shared_ptr<ChannelInterface>, so we can destroy this holder even if  +// request processor using stub  +class TStubsHolder : public TNonCopyable {  +    using TypeInfoRef = std::reference_wrapper<const std::type_info>;  +  +    struct THasher {  +        std::size_t operator()(TypeInfoRef code) const {  +            return code.get().hash_code();  +        }  +    };  +  +    struct TEqualTo {  +        bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const {  +            return lhs.get() == rhs.get();  +        }  +    };  +public:  +    TStubsHolder(std::shared_ptr<grpc::ChannelInterface> channel)  +        : ChannelInterface_(channel)  +    {}  +  +    // Returns true if channel can't be used to perform request now  +    bool IsChannelBroken() const {  +        auto state = ChannelInterface_->GetState(false);  +        return state == GRPC_CHANNEL_SHUTDOWN ||  +            state == GRPC_CHANNEL_TRANSIENT_FAILURE;  +    }  +  +    template<typename TStub>  +    std::shared_ptr<TStub> GetOrCreateStub() {  +        const auto& stubId = typeid(TStub);  +        {  +            std::shared_lock readGuard(RWMutex_);  +            const auto it = Stubs_.find(stubId);  +            if (it != Stubs_.end()) {  +                return std::static_pointer_cast<TStub>(it->second);  +            }  +        }  +        {  +            std::unique_lock writeGuard(RWMutex_);  +            auto it = Stubs_.emplace(stubId, nullptr);  +            if (!it.second) {  +                return std::static_pointer_cast<TStub>(it.first->second);  +            } else {  +                it.first->second = std::make_shared<TStub>(ChannelInterface_);  +                return std::static_pointer_cast<TStub>(it.first->second);  +            }  +        }  +    }       const TInstant& GetLastUseTime() const {          return LastUsed_; @@ -494,76 +494,76 @@ public:      void SetLastUseTime(const TInstant& time) {          LastUsed_ = time;      } -private: +private:       TInstant LastUsed_ = Now(); -    std::shared_mutex RWMutex_; -    std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_; -    std::shared_ptr<grpc::ChannelInterface> ChannelInterface_; -}; - -class TChannelPool { -public: +    std::shared_mutex RWMutex_;  +    std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_;  +    std::shared_ptr<grpc::ChannelInterface> ChannelInterface_;  +};  +  +class TChannelPool {  +public:       TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime = TDuration::Minutes(6)); -    //Allows to CreateStub from TStubsHolder under lock -    //The callback will be called just during GetStubsHolderLocked call -    void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb); -    void DeleteChannel(const TString& channelId); +    //Allows to CreateStub from TStubsHolder under lock  +    //The callback will be called just during GetStubsHolderLocked call  +    void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb);  +    void DeleteChannel(const TString& channelId);       void DeleteExpiredStubsHolders(); -private: -    std::shared_mutex RWMutex_; -    std::unordered_map<TString, TStubsHolder> Pool_; +private:  +    std::shared_mutex RWMutex_;  +    std::unordered_map<TString, TStubsHolder> Pool_;       std::multimap<TInstant, TString> LastUsedQueue_; -    TTcpKeepAliveSettings TcpKeepAliveSettings_; +    TTcpKeepAliveSettings TcpKeepAliveSettings_;       TDuration ExpireTime_;      TDuration UpdateReUseTime_;      void EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId); -}; - -template<class TResponse> -using TStreamReaderCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadProcessor<TResponse>::TPtr)>; - -template<typename TStub, typename TRequest, typename TResponse> -class TStreamRequestReadProcessor -    : public IStreamRequestReadProcessor<TResponse> -    , public TGRpcRequestProcessorCommon { -    template<typename> friend class TServiceConnection; -public: -    using TSelf = TStreamRequestReadProcessor; -    using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncReader<TResponse>>; -    using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*, void*); -    using TReaderCallback = TStreamReaderCallback<TResponse>; -    using TPtr = TIntrusivePtr<TSelf>; -    using TBase = IStreamRequestReadProcessor<TResponse>; -    using TReadCallback = typename TBase::TReadCallback; - -    explicit TStreamRequestReadProcessor(TReaderCallback&& callback) -        : Callback(std::move(callback)) -    { -        Y_VERIFY(Callback, "Missing connected callback"); -    } - -    void Cancel() override { -        Context.TryCancel(); - -        { -            std::unique_lock<std::mutex> guard(Mutex); -            Cancelled = true; +};  +  +template<class TResponse>  +using TStreamReaderCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadProcessor<TResponse>::TPtr)>;  +  +template<typename TStub, typename TRequest, typename TResponse>  +class TStreamRequestReadProcessor  +    : public IStreamRequestReadProcessor<TResponse>  +    , public TGRpcRequestProcessorCommon {  +    template<typename> friend class TServiceConnection;  +public:  +    using TSelf = TStreamRequestReadProcessor;  +    using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncReader<TResponse>>;  +    using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*, void*);  +    using TReaderCallback = TStreamReaderCallback<TResponse>;  +    using TPtr = TIntrusivePtr<TSelf>;  +    using TBase = IStreamRequestReadProcessor<TResponse>;  +    using TReadCallback = typename TBase::TReadCallback;  +  +    explicit TStreamRequestReadProcessor(TReaderCallback&& callback)  +        : Callback(std::move(callback))  +    {  +        Y_VERIFY(Callback, "Missing connected callback");  +    }  +  +    void Cancel() override {  +        Context.TryCancel();  +  +        {  +            std::unique_lock<std::mutex> guard(Mutex);  +            Cancelled = true;               if (Started && !ReadFinished) { -                if (!ReadActive) { -                    ReadFinished = true; -                } -                if (ReadFinished) { -                    Stream->Finish(&Status, OnFinishedTag.Prepare()); -                } -            } -        } -    } - -    void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override { +                if (!ReadActive) {  +                    ReadFinished = true;  +                }  +                if (ReadFinished) {  +                    Stream->Finish(&Status, OnFinishedTag.Prepare());  +                }  +            }  +        }  +    }  +  +    void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {           TGrpcStatus status; -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");              if (!Finished && !HasInitialMetadata) {                  ReadActive = true; @@ -588,66 +588,66 @@ public:          callback(std::move(status));      } -    void Read(TResponse* message, TReadCallback callback) override { -        TGrpcStatus status; - -        { -            std::unique_lock<std::mutex> guard(Mutex); -            Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); -            if (!Finished) { -                ReadActive = true; -                ReadCallback = std::move(callback); -                if (!ReadFinished) { -                    Stream->Read(message, OnReadDoneTag.Prepare()); -                } -                return; -            } -            if (FinishedOk) { -                status = Status; -            } else { -                status = TGrpcStatus::Internal("Unexpected error"); -            } -        } - -        if (status.Ok()) { -            status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); -        } - -        callback(std::move(status)); -    } - -    void Finish(TReadCallback callback) override { -        TGrpcStatus status; - -        { -            std::unique_lock<std::mutex> guard(Mutex); -            Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); -            if (!Finished) { -                ReadActive = true; -                FinishCallback = std::move(callback); -                if (!ReadFinished) { -                    ReadFinished = true; -                } -                Stream->Finish(&Status, OnFinishedTag.Prepare()); -                return; -            } -            if (FinishedOk) { -                status = Status; -            } else { -                status = TGrpcStatus::Internal("Unexpected error"); -            } -        } - -        callback(std::move(status)); -    } +    void Read(TResponse* message, TReadCallback callback) override {  +        TGrpcStatus status;  +  +        {  +            std::unique_lock<std::mutex> guard(Mutex);  +            Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");  +            if (!Finished) {  +                ReadActive = true;  +                ReadCallback = std::move(callback);  +                if (!ReadFinished) {  +                    Stream->Read(message, OnReadDoneTag.Prepare());  +                }  +                return;  +            }  +            if (FinishedOk) {  +                status = Status;  +            } else {  +                status = TGrpcStatus::Internal("Unexpected error");  +            }  +        }  +  +        if (status.Ok()) {  +            status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");  +        }  +  +        callback(std::move(status));  +    }  +  +    void Finish(TReadCallback callback) override {  +        TGrpcStatus status;  +  +        {  +            std::unique_lock<std::mutex> guard(Mutex);  +            Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");  +            if (!Finished) {  +                ReadActive = true;  +                FinishCallback = std::move(callback);  +                if (!ReadFinished) {  +                    ReadFinished = true;  +                }  +                Stream->Finish(&Status, OnFinishedTag.Prepare());  +                return;  +            }  +            if (FinishedOk) {  +                status = Status;  +            } else {  +                status = TGrpcStatus::Internal("Unexpected error");  +            }  +        }  +  +        callback(std::move(status));  +    }       void AddFinishedCallback(TReadCallback callback) override {          Y_VERIFY(callback, "Unexpected empty callback");          TGrpcStatus status; -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               if (!Finished) {                  FinishedCallbacks.emplace_back().swap(callback);                  return; @@ -665,103 +665,103 @@ public:          callback(std::move(status));      } -private: -    void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) { -        auto context = provider->CreateContext(); -        if (!context) { -            auto callback = std::move(Callback); -            TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down"); -            callback(std::move(status), nullptr); -            return; -        } - -        { -            std::unique_lock<std::mutex> guard(Mutex); -            LocalContext = context; -            Stream = (stub.*asyncRequest)(&Context, request, context->CompletionQueue(), OnStartDoneTag.Prepare()); -        } - -        context->SubscribeStop([self = TPtr(this)] { -            self->Cancel(); -        }); -    } - -    void OnReadDone(bool ok) { -        TGrpcStatus status; -        TReadCallback callback; -        std::unordered_multimap<TString, TString>* initialMetadata = nullptr; - -        { -            std::unique_lock<std::mutex> guard(Mutex); -            Y_VERIFY(ReadActive, "Unexpected Read done callback"); -            Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag"); - -            if (!ok || Cancelled) { -                ReadFinished = true; - -                Stream->Finish(&Status, OnFinishedTag.Prepare()); -                if (!ok) { -                    // Keep ReadActive=true, so callback is called -                    // after the call is finished with an error -                    return; -                } -            } - -            callback = std::move(ReadCallback); -            ReadCallback = nullptr; -            ReadActive = false; +private:  +    void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {  +        auto context = provider->CreateContext();  +        if (!context) {  +            auto callback = std::move(Callback);  +            TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");  +            callback(std::move(status), nullptr);  +            return;  +        }  +  +        {  +            std::unique_lock<std::mutex> guard(Mutex);  +            LocalContext = context;  +            Stream = (stub.*asyncRequest)(&Context, request, context->CompletionQueue(), OnStartDoneTag.Prepare());  +        }  +  +        context->SubscribeStop([self = TPtr(this)] {  +            self->Cancel();  +        });  +    }  +  +    void OnReadDone(bool ok) {  +        TGrpcStatus status;  +        TReadCallback callback;  +        std::unordered_multimap<TString, TString>* initialMetadata = nullptr;  +  +        {  +            std::unique_lock<std::mutex> guard(Mutex);  +            Y_VERIFY(ReadActive, "Unexpected Read done callback");  +            Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");  +  +            if (!ok || Cancelled) {  +                ReadFinished = true;  +  +                Stream->Finish(&Status, OnFinishedTag.Prepare());  +                if (!ok) {  +                    // Keep ReadActive=true, so callback is called  +                    // after the call is finished with an error  +                    return;  +                }  +            }  +  +            callback = std::move(ReadCallback);  +            ReadCallback = nullptr;  +            ReadActive = false;               initialMetadata = InitialMetadata;              InitialMetadata = nullptr;              HasInitialMetadata = true; -        } - +        }  +           if (initialMetadata) {              GetInitialMetadata(initialMetadata);          } -        callback(std::move(status)); -    } - -    void OnStartDone(bool ok) { -        TReaderCallback callback; - -        { -            std::unique_lock<std::mutex> guard(Mutex); +        callback(std::move(status));  +    }  +  +    void OnStartDone(bool ok) {  +        TReaderCallback callback;  +  +        {  +            std::unique_lock<std::mutex> guard(Mutex);               Started = true;              if (!ok || Cancelled) { -                ReadFinished = true; -                Stream->Finish(&Status, OnFinishedTag.Prepare()); -                return; -            } -            callback = std::move(Callback); -            Callback = nullptr; -        } - -        callback({ }, typename TBase::TPtr(this)); -    } - -    void OnFinished(bool ok) { -        TGrpcStatus status; -        std::vector<TReadCallback> finishedCallbacks; +                ReadFinished = true;  +                Stream->Finish(&Status, OnFinishedTag.Prepare());  +                return;  +            }  +            callback = std::move(Callback);  +            Callback = nullptr;  +        }  +  +        callback({ }, typename TBase::TPtr(this));  +    }  +  +    void OnFinished(bool ok) {  +        TGrpcStatus status;  +        std::vector<TReadCallback> finishedCallbacks;           TReaderCallback startCallback; -        TReadCallback readCallback; -        TReadCallback finishCallback; - -        { -            std::unique_lock<std::mutex> guard(Mutex); - -            Finished = true; -            FinishedOk = ok; -            LocalContext.reset(); - -            if (ok) { -                status = Status; +        TReadCallback readCallback;  +        TReadCallback finishCallback;  +  +        {  +            std::unique_lock<std::mutex> guard(Mutex);  +  +            Finished = true;  +            FinishedOk = ok;  +            LocalContext.reset();  +  +            if (ok) {  +                status = Status;               } else if (Cancelled) {                  status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); -            } else { -                status = TGrpcStatus::Internal("Unexpected error"); -            } - +            } else {  +                status = TGrpcStatus::Internal("Unexpected error");  +            }  +               finishedCallbacks.swap(FinishedCallbacks);              if (Callback) { @@ -769,68 +769,68 @@ private:                  startCallback = std::move(Callback);                  Callback = nullptr;              } else if (ReadActive) { -                if (ReadCallback) { -                    readCallback = std::move(ReadCallback); -                    ReadCallback = nullptr; -                } else { -                    finishCallback = std::move(FinishCallback); -                    FinishCallback = nullptr; -                } -                ReadActive = false; -            } -        } - +                if (ReadCallback) {  +                    readCallback = std::move(ReadCallback);  +                    ReadCallback = nullptr;  +                } else {  +                    finishCallback = std::move(FinishCallback);  +                    FinishCallback = nullptr;  +                }  +                ReadActive = false;  +            }  +        }  +           for (auto& finishedCallback : finishedCallbacks) {              auto statusCopy = status;              finishedCallback(std::move(statusCopy));          }          if (startCallback) { -            if (status.Ok()) { +            if (status.Ok()) {                   status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");              }              startCallback(std::move(status), nullptr);          } else if (readCallback) {              if (status.Ok()) { -                status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); -            } -            readCallback(std::move(status)); -        } else if (finishCallback) { -            finishCallback(std::move(status)); -        } -    } - -    TReaderCallback Callback; -    TAsyncReaderPtr Stream; -    using TFixedEvent = TQueueClientFixedEvent<TSelf>; -    std::mutex Mutex; -    TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone }; -    TFixedEvent OnStartDoneTag = { this, &TSelf::OnStartDone }; -    TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished }; - -    TReadCallback ReadCallback; -    TReadCallback FinishCallback; -    std::vector<TReadCallback> FinishedCallbacks; -    std::unordered_multimap<TString, TString>* InitialMetadata = nullptr; +                status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");  +            }  +            readCallback(std::move(status));  +        } else if (finishCallback) {  +            finishCallback(std::move(status));  +        }  +    }  +  +    TReaderCallback Callback;  +    TAsyncReaderPtr Stream;  +    using TFixedEvent = TQueueClientFixedEvent<TSelf>;  +    std::mutex Mutex;  +    TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };  +    TFixedEvent OnStartDoneTag = { this, &TSelf::OnStartDone };  +    TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };  +  +    TReadCallback ReadCallback;  +    TReadCallback FinishCallback;  +    std::vector<TReadCallback> FinishedCallbacks;  +    std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;       bool Started = false;      bool HasInitialMetadata = false; -    bool ReadActive = false; -    bool ReadFinished = false; -    bool Finished = false; -    bool Cancelled = false; -    bool FinishedOk = false; -}; - +    bool ReadActive = false;  +    bool ReadFinished = false;  +    bool Finished = false;  +    bool Cancelled = false;  +    bool FinishedOk = false;  +};  +   template<class TRequest, class TResponse> -using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>; +using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>;   template<class TStub, class TRequest, class TResponse> -class TStreamRequestReadWriteProcessor -    : public IStreamRequestReadWriteProcessor<TRequest, TResponse> -    , public TGRpcRequestProcessorCommon { +class TStreamRequestReadWriteProcessor  +    : public IStreamRequestReadWriteProcessor<TRequest, TResponse>  +    , public TGRpcRequestProcessorCommon {   public: -    using TSelf = TStreamRequestReadWriteProcessor; -    using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>; +    using TSelf = TStreamRequestReadWriteProcessor;  +    using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>;       using TPtr = TIntrusivePtr<TSelf>;      using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>;      using TReadCallback = typename TBase::TReadCallback; @@ -838,7 +838,7 @@ public:      using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>;      using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*); -    explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback) +    explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback)           : ConnectedCallback(std::move(callback))      {          Y_VERIFY(ConnectedCallback, "Missing connected callback"); @@ -847,8 +847,8 @@ public:      void Cancel() override {          Context.TryCancel(); -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               Cancelled = true;              if (Started && !(ReadFinished && WriteFinished)) {                  if (!ReadActive) { @@ -867,8 +867,8 @@ public:      void Write(TRequest&& request, TWriteCallback callback) override {          TGrpcStatus status; -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               if (Cancelled || ReadFinished || WriteFinished) {                  status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");              } else if (WriteActive) { @@ -887,11 +887,11 @@ public:          }      } -    void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override { +    void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {           TGrpcStatus status; -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");              if (!Finished && !HasInitialMetadata) {                  ReadActive = true; @@ -919,8 +919,8 @@ public:      void Read(TResponse* message, TReadCallback callback) override {          TGrpcStatus status; -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");              if (!Finished) {                  ReadActive = true; @@ -947,8 +947,8 @@ public:      void Finish(TReadCallback callback) override {          TGrpcStatus status; -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");              if (!Finished) {                  ReadActive = true; @@ -979,8 +979,8 @@ public:          TGrpcStatus status; -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               if (!Finished) {                  FinishedCallbacks.emplace_back().swap(callback);                  return; @@ -1010,8 +1010,8 @@ private:              return;          } -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               LocalContext = context;              Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare());          } @@ -1025,8 +1025,8 @@ private:      void OnConnected(bool ok) {          TConnectedCallback callback; -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               Started = true;              if (!ok || Cancelled) {                  ReadFinished = true; @@ -1045,10 +1045,10 @@ private:      void OnReadDone(bool ok) {          TGrpcStatus status;          TReadCallback callback; -        std::unordered_multimap<TString, TString>* initialMetadata = nullptr; +        std::unordered_multimap<TString, TString>* initialMetadata = nullptr;  -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               Y_VERIFY(ReadActive, "Unexpected Read done callback");              Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag"); @@ -1085,8 +1085,8 @@ private:      void OnWriteDone(bool ok) {          TWriteCallback okCallback; -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               Y_VERIFY(WriteActive, "Unexpected Write done callback");              Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag"); @@ -1107,7 +1107,7 @@ private:                  if (ReadFinished) {                      Stream->Finish(&Status, OnFinishedTag.Prepare());                  } -            } else if (!WriteQueue.empty()) { +            } else if (!WriteQueue.empty()) {                   WriteCallback.swap(WriteQueue.front().Callback);                  Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare());                  WriteQueue.pop_front(); @@ -1127,14 +1127,14 @@ private:      void OnFinished(bool ok) {          TGrpcStatus status; -        std::deque<TWriteItem> writesDropped; -        std::vector<TReadCallback> finishedCallbacks; +        std::deque<TWriteItem> writesDropped;  +        std::vector<TReadCallback> finishedCallbacks;           TConnectedCallback connectedCallback;          TReadCallback readCallback;          TReadCallback finishCallback; -        { -            std::unique_lock<std::mutex> guard(Mutex); +        {  +            std::unique_lock<std::mutex> guard(Mutex);               Finished = true;              FinishedOk = ok;              LocalContext.reset(); @@ -1211,15 +1211,15 @@ private:      TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };  private: -    std::mutex Mutex; +    std::mutex Mutex;       TAsyncReaderWriterPtr Stream;      TConnectedCallback ConnectedCallback;      TReadCallback ReadCallback;      TReadCallback FinishCallback; -    std::vector<TReadCallback> FinishedCallbacks; -    std::deque<TWriteItem> WriteQueue; +    std::vector<TReadCallback> FinishedCallbacks;  +    std::deque<TWriteItem> WriteQueue;       TWriteCallback WriteCallback; -    std::unordered_multimap<TString, TString>* InitialMetadata = nullptr; +    std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;       bool Started = false;      bool HasInitialMetadata = false;      bool ReadActive = false; @@ -1231,30 +1231,30 @@ private:      bool FinishedOk = false;  }; -class TGRpcClientLow; - -template<typename TGRpcService> -class TServiceConnection { -    using TStub = typename TGRpcService::Stub; -    friend class TGRpcClientLow; - -public: -    /* -     * Start simple request -     */ -    template<typename TRequest, typename TResponse> -    void DoRequest(const TRequest& request, +class TGRpcClientLow;  +  +template<typename TGRpcService>  +class TServiceConnection {  +    using TStub = typename TGRpcService::Stub;  +    friend class TGRpcClientLow;  +  +public:  +    /*  +     * Start simple request  +     */  +    template<typename TRequest, typename TResponse>  +    void DoRequest(const TRequest& request,                      TResponseCallback<TResponse> callback,                     typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, -                   const TCallMeta& metas = { }, +                   const TCallMeta& metas = { },                      IQueueClientContextProvider* provider = nullptr) -    { -        auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback)); -        processor->ApplyMeta(metas); -        processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_); -    } - -    /* +    {  +        auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));  +        processor->ApplyMeta(metas);  +        processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);  +    }  +  +    /*        * Start simple request       */      template<typename TRequest, typename TResponse> @@ -1270,58 +1270,58 @@ public:      }      /* -     * Start bidirectional streamming -     */ -    template<typename TRequest, typename TResponse> -    void DoStreamRequest(TStreamConnectedCallback<TRequest, TResponse> callback, -                         typename TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, -                         const TCallMeta& metas = { }, -                         IQueueClientContextProvider* provider = nullptr) -    { -        auto processor = MakeIntrusive<TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>>(std::move(callback)); -        processor->ApplyMeta(metas); -        processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_); -    } - -    /* -     * Start streaming response reading (one request, many responses) -     */ +     * Start bidirectional streamming  +     */  +    template<typename TRequest, typename TResponse>  +    void DoStreamRequest(TStreamConnectedCallback<TRequest, TResponse> callback,  +                         typename TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,  +                         const TCallMeta& metas = { },  +                         IQueueClientContextProvider* provider = nullptr)  +    {  +        auto processor = MakeIntrusive<TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>>(std::move(callback));  +        processor->ApplyMeta(metas);  +        processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_);  +    }  + +    /*  +     * Start streaming response reading (one request, many responses)  +     */       template<typename TRequest, typename TResponse> -    void DoStreamRequest(const TRequest& request, -                         TStreamReaderCallback<TResponse> callback, -                         typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, +    void DoStreamRequest(const TRequest& request,  +                         TStreamReaderCallback<TResponse> callback,  +                         typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,                            const TCallMeta& metas = { },                           IQueueClientContextProvider* provider = nullptr)      { -        auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback)); +        auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback));           processor->ApplyMeta(metas); -        processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_); +        processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_);       } -private: -    TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci, -                       IQueueClientContextProvider* provider) -        : Stub_(TGRpcService::NewStub(ci)) -        , Provider_(provider) -    { -        Y_VERIFY(Provider_, "Connection does not have a queue provider"); -    } - -    TServiceConnection(TStubsHolder& holder, +private:  +    TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci,                          IQueueClientContextProvider* provider) -        : Stub_(holder.GetOrCreateStub<TStub>()) +        : Stub_(TGRpcService::NewStub(ci))           , Provider_(provider)      {          Y_VERIFY(Provider_, "Connection does not have a queue provider");      } -    std::shared_ptr<TStub> Stub_; +    TServiceConnection(TStubsHolder& holder,  +                       IQueueClientContextProvider* provider)  +        : Stub_(holder.GetOrCreateStub<TStub>())  +        , Provider_(provider)  +    {  +        Y_VERIFY(Provider_, "Connection does not have a queue provider");  +    }  +  +    std::shared_ptr<TStub> Stub_;       IQueueClientContextProvider* Provider_; -}; - -class TGRpcClientLow +};  +  +class TGRpcClientLow       :  public IQueueClientContextProvider -{ +{       class TContextImpl;      friend class TContextImpl; @@ -1331,10 +1331,10 @@ class TGRpcClientLow          STOP_EXPLICIT = 2,      }; -public: +public:       explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false); -    ~TGRpcClientLow(); - +    ~TGRpcClientLow();  +       // Tries to stop all currently running requests (via their stop callbacks)      // Will shutdown CQ and drain events once all requests have finished      // No new requests may be started after this call @@ -1357,24 +1357,24 @@ public:      IQueueClientContextPtr CreateContext() override; -    template<typename TGRpcService> -    std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) { -        return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this)); -    } - -    template<typename TGRpcService> -    std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) { -        return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this)); -    } - +    template<typename TGRpcService>  +    std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) {  +        return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this));  +    }  +  +    template<typename TGRpcService>  +    std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) {  +        return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this));  +    }  +       // Tests only, not thread-safe      void AddWorkerThreadForTest(); -private: +private:       using IThreadRef = std::unique_ptr<IThreadFactory::IThread>;      using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>; -    void Init(size_t numWorkerThread); - +    void Init(size_t numWorkerThread);  +       inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); }      inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); } @@ -1385,15 +1385,15 @@ private:  private:      bool UseCompletionQueuePerThread_; -    std::vector<CompletionQueueRef> CQS_; -    std::vector<IThreadRef> WorkerThreads_; +    std::vector<CompletionQueueRef> CQS_;  +    std::vector<IThreadRef> WorkerThreads_;       TAtomic CqState_ = -1; -    std::mutex Mtx_; -    std::condition_variable ContextsEmpty_; -    std::unordered_set<TContextImpl*> Contexts_; - -    std::mutex JoinMutex_; -}; +    std::mutex Mtx_;  +    std::condition_variable ContextsEmpty_;  +    std::unordered_set<TContextImpl*> Contexts_;  +    std::mutex JoinMutex_;  +};  +   } // namespace NGRpc diff --git a/library/cpp/grpc/client/grpc_common.h b/library/cpp/grpc/client/grpc_common.h index ffcdafe0458..ac62e8b3315 100644 --- a/library/cpp/grpc/client/grpc_common.h +++ b/library/cpp/grpc/client/grpc_common.h @@ -1,53 +1,53 @@ -#pragma once - -#include <grpc++/grpc++.h> +#pragma once  +  +#include <grpc++/grpc++.h>   #include <grpc++/resource_quota.h> - +   #include <util/datetime/base.h> -#include <unordered_map> -#include <util/generic/string.h> - -constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000; - +#include <unordered_map>  +#include <util/generic/string.h>  +  +constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;  +   namespace NGrpc { - -struct TGRpcClientConfig { -    TString Locator; // format host:port -    TDuration Timeout = TDuration::Max(); // request timeout -    ui64 MaxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT; // Max request and response size +  +struct TGRpcClientConfig {  +    TString Locator; // format host:port  +    TDuration Timeout = TDuration::Max(); // request timeout  +    ui64 MaxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT; // Max request and response size       ui64 MaxInboundMessageSize = 0; // overrides MaxMessageSize for incoming requests      ui64 MaxOutboundMessageSize = 0; // overrides MaxMessageSize for outgoing requests -    ui32 MaxInFlight = 0; +    ui32 MaxInFlight = 0;       bool EnableSsl = false;      TString SslCaCert;  //Implicitly enables Ssl if not empty      grpc_compression_algorithm CompressionAlgoritm = GRPC_COMPRESS_NONE;      ui64 MemQuota = 0; -    std::unordered_map<TString, TString> StringChannelParams; -    std::unordered_map<TString, int> IntChannelParams; +    std::unordered_map<TString, TString> StringChannelParams;  +    std::unordered_map<TString, int> IntChannelParams;       TString LoadBalancingPolicy = { };      TString SslTargetNameOverride = { }; - -    TGRpcClientConfig() = default; -    TGRpcClientConfig(const TGRpcClientConfig&) = default; -    TGRpcClientConfig(TGRpcClientConfig&&) = default; -    TGRpcClientConfig& operator=(const TGRpcClientConfig&) = default; -    TGRpcClientConfig& operator=(TGRpcClientConfig&&) = default; - -    TGRpcClientConfig(const TString& locator, TDuration timeout = TDuration::Max(), +  +    TGRpcClientConfig() = default;  +    TGRpcClientConfig(const TGRpcClientConfig&) = default;  +    TGRpcClientConfig(TGRpcClientConfig&&) = default;  +    TGRpcClientConfig& operator=(const TGRpcClientConfig&) = default;  +    TGRpcClientConfig& operator=(TGRpcClientConfig&&) = default;  +  +    TGRpcClientConfig(const TString& locator, TDuration timeout = TDuration::Max(),               ui64 maxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT, ui32 maxInFlight = 0, TString caCert = "",              grpc_compression_algorithm compressionAlgorithm = GRPC_COMPRESS_NONE, bool enableSsl = false) -        : Locator(locator) -        , Timeout(timeout) -        , MaxMessageSize(maxMessageSize) -        , MaxInFlight(maxInFlight) +        : Locator(locator)  +        , Timeout(timeout)  +        , MaxMessageSize(maxMessageSize)  +        , MaxInFlight(maxInFlight)           , EnableSsl(enableSsl) -        , SslCaCert(caCert) +        , SslCaCert(caCert)           , CompressionAlgoritm(compressionAlgorithm) -    {} -}; - -inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRpcClientConfig& config, grpc_socket_mutator* mutator = nullptr){ -    grpc::ChannelArguments args; +    {}  +};  +  +inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRpcClientConfig& config, grpc_socket_mutator* mutator = nullptr){  +    grpc::ChannelArguments args;       args.SetMaxReceiveMessageSize(config.MaxInboundMessageSize ? config.MaxInboundMessageSize : config.MaxMessageSize);      args.SetMaxSendMessageSize(config.MaxOutboundMessageSize ? config.MaxOutboundMessageSize : config.MaxMessageSize);      args.SetCompressionAlgorithm(config.CompressionAlgoritm); @@ -65,9 +65,9 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp          quota.Resize(config.MemQuota);          args.SetResourceQuota(quota);      } -    if (mutator) { -        args.SetSocketMutator(mutator); -    } +    if (mutator) {  +        args.SetSocketMutator(mutator);  +    }       if (!config.LoadBalancingPolicy.empty()) {          args.SetLoadBalancingPolicyName(config.LoadBalancingPolicy);      } @@ -75,10 +75,10 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp          args.SetSslTargetNameOverride(config.SslTargetNameOverride);      }      if (config.EnableSsl || config.SslCaCert) { -        return grpc::CreateCustomChannel(config.Locator, grpc::SslCredentials(grpc::SslCredentialsOptions{config.SslCaCert, "", ""}), args); -    } else { -        return grpc::CreateCustomChannel(config.Locator, grpc::InsecureChannelCredentials(), args); -    } -} - +        return grpc::CreateCustomChannel(config.Locator, grpc::SslCredentials(grpc::SslCredentialsOptions{config.SslCaCert, "", ""}), args);  +    } else {  +        return grpc::CreateCustomChannel(config.Locator, grpc::InsecureChannelCredentials(), args);  +    }  +}  +   } // namespace NGRpc diff --git a/library/cpp/grpc/client/ya.make b/library/cpp/grpc/client/ya.make index a4e74b067cf..11f36aa94f5 100644 --- a/library/cpp/grpc/client/ya.make +++ b/library/cpp/grpc/client/ya.make @@ -6,11 +6,11 @@ OWNER(  )  SRCS( -    grpc_client_low.cpp +    grpc_client_low.cpp   )  PEERDIR( -    contrib/libs/grpc +    contrib/libs/grpc   )  END() diff --git a/library/cpp/grpc/server/event_callback.cpp b/library/cpp/grpc/server/event_callback.cpp index f423836bd61..559a4728077 100644 --- a/library/cpp/grpc/server/event_callback.cpp +++ b/library/cpp/grpc/server/event_callback.cpp @@ -1 +1 @@ -#include "event_callback.h" +#include "event_callback.h"  diff --git a/library/cpp/grpc/server/event_callback.h b/library/cpp/grpc/server/event_callback.h index d0b700b3c92..c0e16ee5043 100644 --- a/library/cpp/grpc/server/event_callback.h +++ b/library/cpp/grpc/server/event_callback.h @@ -1,80 +1,80 @@ -#pragma once - -#include "grpc_server.h" - +#pragma once  +  +#include "grpc_server.h"  +   namespace NGrpc { - -enum class EQueueEventStatus { -    OK, -    ERROR -}; - -template<class TCallback> +  +enum class EQueueEventStatus {  +    OK,  +    ERROR  +};  +  +template<class TCallback>   class TQueueEventCallback: public IQueueEvent { -public: -    TQueueEventCallback(const TCallback& callback) -        : Callback(callback) -    {} - -    TQueueEventCallback(TCallback&& callback) -        : Callback(std::move(callback)) -    {} - -    bool Execute(bool ok) override { -        Callback(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR); -        return false; -    } - -    void DestroyRequest() override { -        delete this; -    } - -private: -    TCallback Callback; -}; - +public:  +    TQueueEventCallback(const TCallback& callback)  +        : Callback(callback)  +    {}  +  +    TQueueEventCallback(TCallback&& callback)  +        : Callback(std::move(callback))  +    {}  +  +    bool Execute(bool ok) override {  +        Callback(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR);  +        return false;  +    }  +  +    void DestroyRequest() override {  +        delete this;  +    }  +  +private:  +    TCallback Callback;  +};  +   // Implementation of IQueueEvent that reduces allocations -template<class TSelf> +template<class TSelf>   class TQueueFixedEvent: private IQueueEvent { -    using TCallback = void (TSelf::*)(EQueueEventStatus); - -public: -    TQueueFixedEvent(TSelf* self, TCallback callback) -        : Self(self) -        , Callback(callback) -    { } - +    using TCallback = void (TSelf::*)(EQueueEventStatus);  +  +public:  +    TQueueFixedEvent(TSelf* self, TCallback callback)  +        : Self(self)  +        , Callback(callback)  +    { }  +       IQueueEvent* Prepare() { -        Self->Ref(); -        return this; -    } - -private: -    bool Execute(bool ok) override { -        ((*Self).*Callback)(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR); -        return false; -    } - -    void DestroyRequest() override { -        Self->UnRef(); -    } - -private: -    TSelf* const Self; -    TCallback const Callback; -}; - -template<class TCallback> +        Self->Ref();  +        return this;  +    }  +  +private:  +    bool Execute(bool ok) override {  +        ((*Self).*Callback)(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR);  +        return false;  +    }  +  +    void DestroyRequest() override {  +        Self->UnRef();  +    }  +  +private:  +    TSelf* const Self;  +    TCallback const Callback;  +};  +  +template<class TCallback>   inline IQueueEvent* MakeQueueEventCallback(TCallback&& callback) { -    return new TQueueEventCallback<TCallback>(std::forward<TCallback>(callback)); -} - -template<class T> +    return new TQueueEventCallback<TCallback>(std::forward<TCallback>(callback));  +}  +  +template<class T>   inline IQueueEvent* MakeQueueEventCallback(T* self, void (T::*method)(EQueueEventStatus)) { -    using TPtr = TIntrusivePtr<T>; -    return MakeQueueEventCallback([self = TPtr(self), method] (EQueueEventStatus status) { -        ((*self).*method)(status); -    }); -} - +    using TPtr = TIntrusivePtr<T>;  +    return MakeQueueEventCallback([self = TPtr(self), method] (EQueueEventStatus status) {  +        ((*self).*method)(status);  +    });  +}  +   } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_async_ctx_base.h b/library/cpp/grpc/server/grpc_async_ctx_base.h index 51356d4ce5a..d170e2ef340 100644 --- a/library/cpp/grpc/server/grpc_async_ctx_base.h +++ b/library/cpp/grpc/server/grpc_async_ctx_base.h @@ -1,48 +1,48 @@ -#pragma once - -#include "grpc_server.h" - -#include <util/generic/vector.h> -#include <util/generic/string.h> -#include <util/system/yassert.h> +#pragma once  +  +#include "grpc_server.h"  +  +#include <util/generic/vector.h>  +#include <util/generic/string.h>  +#include <util/system/yassert.h>   #include <util/generic/set.h> - -#include <grpc++/server.h> -#include <grpc++/server_context.h> - -#include <chrono> - +  +#include <grpc++/server.h>  +#include <grpc++/server_context.h>  +  +#include <chrono>  +   namespace NGrpc { - -template<typename TService> +  +template<typename TService>   class TBaseAsyncContext: public ICancelableContext { -public: -    TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq) -        : Service(service) -        , CQ(cq) -    { -    } - -    TString GetPeerName() const { -        return TString(Context.peer()); -    } - -    TInstant Deadline() const { -        // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline +public:  +    TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq)  +        : Service(service)  +        , CQ(cq)  +    {  +    }  +  +    TString GetPeerName() const {  +        return TString(Context.peer());  +    }  +  +    TInstant Deadline() const {  +        // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline           // right before the request is getting to be send.          // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md          // -        // After this timeout calculated back to the deadline on the server side -        // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method). -        // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME -        // - -        std::chrono::system_clock::time_point t = Context.deadline(); -        if (t == std::chrono::system_clock::time_point::max()) { -            return TInstant::Max(); +        // After this timeout calculated back to the deadline on the server side  +        // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method).  +        // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME  +        //  + +        std::chrono::system_clock::time_point t = Context.deadline();  +        if (t == std::chrono::system_clock::time_point::max()) {  +            return TInstant::Max();           } -        auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t); -        return TInstant::MicroSeconds(us.time_since_epoch().count()); +        auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t);  +        return TInstant::MicroSeconds(us.time_since_epoch().count());       }      TSet<TStringBuf> GetPeerMetaKeys() const { @@ -54,7 +54,7 @@ public:      }      TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const { -        const auto& clientMetadata = Context.client_metadata(); +        const auto& clientMetadata = Context.client_metadata();           const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()});          if (range.first == range.second) {              return {}; @@ -63,32 +63,32 @@ public:          TVector<TStringBuf> values;          values.reserve(std::distance(range.first, range.second)); -        for (auto it = range.first; it != range.second; ++it) { +        for (auto it = range.first; it != range.second; ++it) {               values.emplace_back(it->second.data(), it->second.size()); -        } +        }           return values; -    } - +    }  +       grpc_compression_level GetCompressionLevel() const {          return Context.compression_level();      } -    void Shutdown() override { -        // Shutdown may only be called after request has started successfully -        if (Context.c_call()) -            Context.TryCancel(); -    } - -protected: -    //! The means of communication with the gRPC runtime for an asynchronous -    //! server. -    typename TService::TCurrentGRpcService::AsyncService* const Service; -    //! The producer-consumer queue where for asynchronous server notifications. -    grpc::ServerCompletionQueue* const CQ; -    //! Context for the rpc, allowing to tweak aspects of it such as the use -    //! of compression, authentication, as well as to send metadata back to the -    //! client. -    grpc::ServerContext Context; -}; - +    void Shutdown() override {  +        // Shutdown may only be called after request has started successfully  +        if (Context.c_call())  +            Context.TryCancel();  +    }  +  +protected:  +    //! The means of communication with the gRPC runtime for an asynchronous  +    //! server.  +    typename TService::TCurrentGRpcService::AsyncService* const Service;  +    //! The producer-consumer queue where for asynchronous server notifications.  +    grpc::ServerCompletionQueue* const CQ;  +    //! Context for the rpc, allowing to tweak aspects of it such as the use  +    //! of compression, authentication, as well as to send metadata back to the  +    //! client.  +    grpc::ServerContext Context;  +};  +   } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_counters.cpp b/library/cpp/grpc/server/grpc_counters.cpp index fa96e0100b9..f3a0b2b5e98 100644 --- a/library/cpp/grpc/server/grpc_counters.cpp +++ b/library/cpp/grpc/server/grpc_counters.cpp @@ -1,4 +1,4 @@ -#include "grpc_counters.h" +#include "grpc_counters.h"   namespace NGrpc {  namespace { diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h index 0b6c36c84cc..25b7c28369c 100644 --- a/library/cpp/grpc/server/grpc_counters.h +++ b/library/cpp/grpc/server/grpc_counters.h @@ -1,11 +1,11 @@ -#pragma once - +#pragma once  +   #include <library/cpp/monlib/dynamic_counters/percentile/percentile.h>  #include <library/cpp/monlib/dynamic_counters/counters.h> -#include <util/generic/ptr.h> - +#include <util/generic/ptr.h>  +   namespace NGrpc { - +   struct ICounterBlock : public TThrRefBase {      virtual void CountNotOkRequest() = 0;      virtual void CountNotOkResponse() = 0; @@ -15,9 +15,9 @@ struct ICounterBlock : public TThrRefBase {      virtual void CountResponseBytes(ui32 responseSize) = 0;      virtual void StartProcessing(ui32 requestSize) = 0;      virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0; -    virtual void CountRequestsWithoutDatabase() {} -    virtual void CountRequestsWithoutToken() {} -    virtual void CountRequestWithoutTls() {} +    virtual void CountRequestsWithoutDatabase() {}  +    virtual void CountRequestsWithoutToken() {}  +    virtual void CountRequestWithoutTls() {}       virtual TIntrusivePtr<ICounterBlock> Clone() { return this; }      virtual void UseDatabase(const TString& database) { Y_UNUSED(database); } @@ -26,62 +26,62 @@ struct ICounterBlock : public TThrRefBase {  using ICounterBlockPtr = TIntrusivePtr<ICounterBlock>;  class TCounterBlock final : public ICounterBlock { -    NMonitoring::TDynamicCounters::TCounterPtr TotalCounter; -    NMonitoring::TDynamicCounters::TCounterPtr InflyCounter; -    NMonitoring::TDynamicCounters::TCounterPtr NotOkRequestCounter; -    NMonitoring::TDynamicCounters::TCounterPtr NotOkResponseCounter; -    NMonitoring::TDynamicCounters::TCounterPtr RequestBytes; -    NMonitoring::TDynamicCounters::TCounterPtr InflyRequestBytes; -    NMonitoring::TDynamicCounters::TCounterPtr ResponseBytes; -    NMonitoring::TDynamicCounters::TCounterPtr NotAuthenticated; -    NMonitoring::TDynamicCounters::TCounterPtr ResourceExhausted; +    NMonitoring::TDynamicCounters::TCounterPtr TotalCounter;  +    NMonitoring::TDynamicCounters::TCounterPtr InflyCounter;  +    NMonitoring::TDynamicCounters::TCounterPtr NotOkRequestCounter;  +    NMonitoring::TDynamicCounters::TCounterPtr NotOkResponseCounter;  +    NMonitoring::TDynamicCounters::TCounterPtr RequestBytes;  +    NMonitoring::TDynamicCounters::TCounterPtr InflyRequestBytes;  +    NMonitoring::TDynamicCounters::TCounterPtr ResponseBytes;  +    NMonitoring::TDynamicCounters::TCounterPtr NotAuthenticated;  +    NMonitoring::TDynamicCounters::TCounterPtr ResourceExhausted;       bool Percentile = false;      NMonitoring::TPercentileTracker<4, 512, 15> RequestHistMs; -    std::array<NMonitoring::TDynamicCounters::TCounterPtr, 2>  GRpcStatusCounters; - -public: -    TCounterBlock(NMonitoring::TDynamicCounters::TCounterPtr totalCounter, -            NMonitoring::TDynamicCounters::TCounterPtr inflyCounter, -            NMonitoring::TDynamicCounters::TCounterPtr notOkRequestCounter, -            NMonitoring::TDynamicCounters::TCounterPtr notOkResponseCounter, -            NMonitoring::TDynamicCounters::TCounterPtr requestBytes, -            NMonitoring::TDynamicCounters::TCounterPtr inflyRequestBytes, +    std::array<NMonitoring::TDynamicCounters::TCounterPtr, 2>  GRpcStatusCounters;  +  +public:  +    TCounterBlock(NMonitoring::TDynamicCounters::TCounterPtr totalCounter,  +            NMonitoring::TDynamicCounters::TCounterPtr inflyCounter,  +            NMonitoring::TDynamicCounters::TCounterPtr notOkRequestCounter,  +            NMonitoring::TDynamicCounters::TCounterPtr notOkResponseCounter,  +            NMonitoring::TDynamicCounters::TCounterPtr requestBytes,  +            NMonitoring::TDynamicCounters::TCounterPtr inflyRequestBytes,               NMonitoring::TDynamicCounters::TCounterPtr responseBytes, -            NMonitoring::TDynamicCounters::TCounterPtr notAuthenticated, -            NMonitoring::TDynamicCounters::TCounterPtr resourceExhausted, +            NMonitoring::TDynamicCounters::TCounterPtr notAuthenticated,  +            NMonitoring::TDynamicCounters::TCounterPtr resourceExhausted,               TIntrusivePtr<NMonitoring::TDynamicCounters> group) -        : TotalCounter(std::move(totalCounter)) -        , InflyCounter(std::move(inflyCounter)) -        , NotOkRequestCounter(std::move(notOkRequestCounter)) -        , NotOkResponseCounter(std::move(notOkResponseCounter)) -        , RequestBytes(std::move(requestBytes)) -        , InflyRequestBytes(std::move(inflyRequestBytes)) -        , ResponseBytes(std::move(responseBytes)) -        , NotAuthenticated(std::move(notAuthenticated)) -        , ResourceExhausted(std::move(resourceExhausted)) +        : TotalCounter(std::move(totalCounter))  +        , InflyCounter(std::move(inflyCounter))  +        , NotOkRequestCounter(std::move(notOkRequestCounter))  +        , NotOkResponseCounter(std::move(notOkResponseCounter))  +        , RequestBytes(std::move(requestBytes))  +        , InflyRequestBytes(std::move(inflyRequestBytes))  +        , ResponseBytes(std::move(responseBytes))  +        , NotAuthenticated(std::move(notAuthenticated))  +        , ResourceExhausted(std::move(resourceExhausted))       {          if (group) {              RequestHistMs.Initialize(group, "event", "request", "ms", {0.5f, 0.9f, 0.99f, 0.999f, 1.0f});              Percentile = true;          }      } - +       void CountNotOkRequest() override { -        NotOkRequestCounter->Inc(); -    } - +        NotOkRequestCounter->Inc();  +    }  +       void CountNotOkResponse() override { -        NotOkResponseCounter->Inc(); -    } - +        NotOkResponseCounter->Inc();  +    }  +       void CountNotAuthenticated() override { -        NotAuthenticated->Inc(); -    } - +        NotAuthenticated->Inc();  +    }  +       void CountResourceExhausted() override { -        ResourceExhausted->Inc(); -    } - +        ResourceExhausted->Inc();  +    }  +       void CountRequestBytes(ui32 requestSize) override {          *RequestBytes += requestSize;      } @@ -91,27 +91,27 @@ public:      }      void StartProcessing(ui32 requestSize) override { -        TotalCounter->Inc(); -        InflyCounter->Inc(); -        *RequestBytes += requestSize; -        *InflyRequestBytes += requestSize; -    } - +        TotalCounter->Inc();  +        InflyCounter->Inc();  +        *RequestBytes += requestSize;  +        *InflyRequestBytes += requestSize;  +    }  +       void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status,          TDuration requestDuration) override      {          Y_UNUSED(status); -        InflyCounter->Dec(); -        *InflyRequestBytes -= requestSize; -        *ResponseBytes += responseSize; -        if (!ok) { -            NotOkResponseCounter->Inc(); -        } +        InflyCounter->Dec();  +        *InflyRequestBytes -= requestSize;  +        *ResponseBytes += responseSize;  +        if (!ok) {  +            NotOkResponseCounter->Inc();  +        }           if (Percentile) {              RequestHistMs.Increment(requestDuration.MilliSeconds());          } -    } +    }       ICounterBlockPtr Clone() override {          return this; @@ -122,10 +122,10 @@ public:              RequestHistMs.Update();          }      } -}; - -using TCounterBlockPtr = TIntrusivePtr<TCounterBlock>; - +};  +  +using TCounterBlockPtr = TIntrusivePtr<TCounterBlock>;  +   /**   * Creates new instance of ICounterBlock implementation which does nothing.   * diff --git a/library/cpp/grpc/server/grpc_request.cpp b/library/cpp/grpc/server/grpc_request.cpp index d18a32776f2..33264fe6f29 100644 --- a/library/cpp/grpc/server/grpc_request.cpp +++ b/library/cpp/grpc/server/grpc_request.cpp @@ -1,59 +1,59 @@ -#include "grpc_request.h" - +#include "grpc_request.h"  +   namespace NGrpc { - -const char* GRPC_USER_AGENT_HEADER = "user-agent"; - +  +const char* GRPC_USER_AGENT_HEADER = "user-agent";  +   class TStreamAdaptor: public IStreamAdaptor { -public: -    TStreamAdaptor() -        : StreamIsReady_(true) -    {} - -    void Enqueue(std::function<void()>&& fn, bool urgent) override { -        with_lock(Mtx_) { -            if (!UrgentQueue_.empty() || !NormalQueue_.empty()) { -                Y_VERIFY(!StreamIsReady_); -            } -            auto& queue = urgent ? UrgentQueue_ : NormalQueue_; -            if (StreamIsReady_ && queue.empty()) { -                StreamIsReady_ = false; -            } else { -                queue.push_back(std::move(fn)); -                return; -            } -        } -        fn(); -    } - -    size_t ProcessNext() override { -        size_t left = 0; -        std::function<void()> fn; -        with_lock(Mtx_) { -            Y_VERIFY(!StreamIsReady_); -            auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_; -            if (queue.empty()) { -                // Both queues are empty -                StreamIsReady_ = true; -            } else { -                fn = std::move(queue.front()); -                queue.pop_front(); -                left = UrgentQueue_.size() + NormalQueue_.size(); -            } -        } -        if (fn) -            fn(); -        return left; -    } -private: -    bool StreamIsReady_; -    TList<std::function<void()>> NormalQueue_; -    TList<std::function<void()>> UrgentQueue_; -    TMutex Mtx_; -}; - -IStreamAdaptor::TPtr CreateStreamAdaptor() { -    return std::make_unique<TStreamAdaptor>(); -} - +public:  +    TStreamAdaptor()  +        : StreamIsReady_(true)  +    {}  +  +    void Enqueue(std::function<void()>&& fn, bool urgent) override {  +        with_lock(Mtx_) {  +            if (!UrgentQueue_.empty() || !NormalQueue_.empty()) {  +                Y_VERIFY(!StreamIsReady_);  +            }  +            auto& queue = urgent ? UrgentQueue_ : NormalQueue_;  +            if (StreamIsReady_ && queue.empty()) {  +                StreamIsReady_ = false;  +            } else {  +                queue.push_back(std::move(fn));  +                return;  +            }  +        }  +        fn();  +    }  +  +    size_t ProcessNext() override {  +        size_t left = 0;  +        std::function<void()> fn;  +        with_lock(Mtx_) {  +            Y_VERIFY(!StreamIsReady_);  +            auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_;  +            if (queue.empty()) {  +                // Both queues are empty  +                StreamIsReady_ = true;  +            } else {  +                fn = std::move(queue.front());  +                queue.pop_front();  +                left = UrgentQueue_.size() + NormalQueue_.size();  +            }  +        }  +        if (fn)  +            fn();  +        return left;  +    }  +private:  +    bool StreamIsReady_;  +    TList<std::function<void()>> NormalQueue_;  +    TList<std::function<void()>> UrgentQueue_;  +    TMutex Mtx_;  +};  +  +IStreamAdaptor::TPtr CreateStreamAdaptor() {  +    return std::make_unique<TStreamAdaptor>();  +}  +   } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index 5bd8d3902b5..2841069d62f 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -1,118 +1,118 @@ -#pragma once - +#pragma once  +   #include <google/protobuf/text_format.h>  #include <google/protobuf/arena.h>  #include <google/protobuf/message.h> - +   #include <library/cpp/monlib/dynamic_counters/counters.h>  #include <library/cpp/logger/priority.h> - +   #include "grpc_response.h" -#include "event_callback.h" +#include "event_callback.h"   #include "grpc_async_ctx_base.h"  #include "grpc_counters.h" -#include "grpc_request_base.h" -#include "grpc_server.h" +#include "grpc_request_base.h"  +#include "grpc_server.h"   #include "logger.h" - +   #include <util/system/hp_timer.h> -#include <grpc++/server.h> -#include <grpc++/server_context.h> +#include <grpc++/server.h>  +#include <grpc++/server_context.h>   #include <grpc++/support/async_stream.h>  #include <grpc++/support/async_unary_call.h> -#include <grpc++/support/byte_buffer.h> -#include <grpc++/impl/codegen/async_stream.h> - +#include <grpc++/support/byte_buffer.h>  +#include <grpc++/impl/codegen/async_stream.h>  +   namespace NGrpc { - -class IStreamAdaptor { -public: -    using TPtr = std::unique_ptr<IStreamAdaptor>; -    virtual void Enqueue(std::function<void()>&& fn, bool urgent) = 0; -    virtual size_t ProcessNext() = 0; -    virtual ~IStreamAdaptor() = default; -}; - -IStreamAdaptor::TPtr CreateStreamAdaptor(); - -/////////////////////////////////////////////////////////////////////////////// +  +class IStreamAdaptor {  +public:  +    using TPtr = std::unique_ptr<IStreamAdaptor>;  +    virtual void Enqueue(std::function<void()>&& fn, bool urgent) = 0;  +    virtual size_t ProcessNext() = 0;  +    virtual ~IStreamAdaptor() = default;  +};  +  +IStreamAdaptor::TPtr CreateStreamAdaptor();  +  +///////////////////////////////////////////////////////////////////////////////   template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter, typename TOutProtoPrinter>  class TGRpcRequestImpl -    : public TBaseAsyncContext<TService> -    , public IQueueEvent -    , public IRequestContextBase -{ +    : public TBaseAsyncContext<TService>  +    , public IQueueEvent  +    , public IRequestContextBase  +{       using TThis = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; -public: -    using TOnRequest = std::function<void (IRequestContextBase* ctx)>; -    using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, -        grpc::ServerAsyncResponseWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); -    using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, -        grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); - +public:  +    using TOnRequest = std::function<void (IRequestContextBase* ctx)>;  +    using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,  +        grpc::ServerAsyncResponseWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);  +    using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,  +        grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);  +       TGRpcRequestImpl(TService* server, -                 typename TService::TCurrentGRpcService::AsyncService* service, -                 grpc::ServerCompletionQueue* cq, -                 TOnRequest cb, -                 TRequestCallback requestCallback, +                 typename TService::TCurrentGRpcService::AsyncService* service,  +                 grpc::ServerCompletionQueue* cq,  +                 TOnRequest cb,  +                 TRequestCallback requestCallback,                    const char* name,                   TLoggerPtr logger,                   ICounterBlockPtr counters,                   IGRpcRequestLimiterPtr limiter) -        : TBaseAsyncContext<TService>(service, cq) -        , Server_(server) -        , Cb_(cb) -        , RequestCallback_(requestCallback) -        , StreamRequestCallback_(nullptr) -        , Name_(name) +        : TBaseAsyncContext<TService>(service, cq)  +        , Server_(server)  +        , Cb_(cb)  +        , RequestCallback_(requestCallback)  +        , StreamRequestCallback_(nullptr)  +        , Name_(name)           , Logger_(std::move(logger)) -        , Counters_(std::move(counters)) +        , Counters_(std::move(counters))           , RequestLimiter_(std::move(limiter))          , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))          , StateFunc_(&TThis::SetRequestDone) -    { -        AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); -        Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); -        Y_VERIFY(Request_); +    {  +        AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);  +        Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);  +        Y_VERIFY(Request_);           GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_); -        FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); -    } - +        FinishPromise_ = NThreading::NewPromise<EFinishStatus>();  +    }  +       TGRpcRequestImpl(TService* server, -                 typename TService::TCurrentGRpcService::AsyncService* service, -                 grpc::ServerCompletionQueue* cq, -                 TOnRequest cb, -                 TStreamRequestCallback requestCallback, +                 typename TService::TCurrentGRpcService::AsyncService* service,  +                 grpc::ServerCompletionQueue* cq,  +                 TOnRequest cb,  +                 TStreamRequestCallback requestCallback,                    const char* name,                   TLoggerPtr logger,                   ICounterBlockPtr counters,                   IGRpcRequestLimiterPtr limiter) -        : TBaseAsyncContext<TService>(service, cq) -        , Server_(server) -        , Cb_(cb) -        , RequestCallback_(nullptr) -        , StreamRequestCallback_(requestCallback) -        , Name_(name) +        : TBaseAsyncContext<TService>(service, cq)  +        , Server_(server)  +        , Cb_(cb)  +        , RequestCallback_(nullptr)  +        , StreamRequestCallback_(requestCallback)  +        , Name_(name)           , Logger_(std::move(logger)) -        , Counters_(std::move(counters)) +        , Counters_(std::move(counters))           , RequestLimiter_(std::move(limiter)) -        , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) +        , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context))           , StateFunc_(&TThis::SetRequestDone) -    { -        AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); -        Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); -        Y_VERIFY(Request_); +    {  +        AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);  +        Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);  +        Y_VERIFY(Request_);           GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_); -        FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); -        StreamAdaptor_ = CreateStreamAdaptor(); -    } - -    TAsyncFinishResult GetFinishFuture() override { -        return FinishPromise_.GetFuture(); -    } - +        FinishPromise_ = NThreading::NewPromise<EFinishStatus>();  +        StreamAdaptor_ = CreateStreamAdaptor();  +    }  +  +    TAsyncFinishResult GetFinishFuture() override {  +        return FinishPromise_.GetFuture();  +    }  +       TString GetPeer() const override {          return TString(this->Context.peer());      } @@ -121,7 +121,7 @@ public:          return Server_->SslServer();      } -    void Run() { +    void Run() {           // Start request unless server is shutting down          if (auto guard = Server_->ProtectShutdown()) {              Ref(); //For grpc c runtime @@ -135,28 +135,28 @@ public:                          (&this->Context, Request_,                          reinterpret_cast<grpc::ServerAsyncWriter<TOut>*>(StreamWriter_.Get()), this->CQ, this->CQ, GetGRpcTag());              } -        } -    } - +        }  +    }  +       ~TGRpcRequestImpl() { -        // No direct dtor call allowed -        Y_ASSERT(RefCount() == 0); -    } - -    bool Execute(bool ok) override { -        return (this->*StateFunc_)(ok); -    } - -    void DestroyRequest() override { +        // No direct dtor call allowed  +        Y_ASSERT(RefCount() == 0);  +    }  +  +    bool Execute(bool ok) override {  +        return (this->*StateFunc_)(ok);  +    }  +  +    void DestroyRequest() override {           if (RequestRegistered_) {              Server_->DeregisterRequestCtx(this);              RequestRegistered_ = false;          } -        UnRef(); -    } - -    TInstant Deadline() const override { -        return TBaseAsyncContext<TService>::Deadline(); +        UnRef();  +    }  +  +    TInstant Deadline() const override {  +        return TBaseAsyncContext<TService>::Deadline();       }      TSet<TStringBuf> GetPeerMetaKeys() const override { @@ -164,299 +164,299 @@ public:      }      TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override { -        return TBaseAsyncContext<TService>::GetPeerMetaValues(key); -    } - +        return TBaseAsyncContext<TService>::GetPeerMetaValues(key);  +    }  +       grpc_compression_level GetCompressionLevel() const override {          return TBaseAsyncContext<TService>::GetCompressionLevel();      } -    //! Get pointer to the request's message. -    const NProtoBuf::Message* GetRequest() const override { -        return Request_; -    } - -    TAuthState& GetAuthState() override { -        return AuthState_; -    } - +    //! Get pointer to the request's message.  +    const NProtoBuf::Message* GetRequest() const override {  +        return Request_;  +    }  +  +    TAuthState& GetAuthState() override {  +        return AuthState_;  +    }  +       void Reply(NProtoBuf::Message* resp, ui32 status) override {          ResponseStatus = status; -        WriteDataOk(resp); -    } - +        WriteDataOk(resp);  +    }  +       void Reply(grpc::ByteBuffer* resp, ui32 status) override {          ResponseStatus = status; -        WriteByteDataOk(resp); -    } - +        WriteByteDataOk(resp);  +    }  +       void ReplyError(grpc::StatusCode code, const TString& msg) override { -        FinishGrpcStatus(code, msg, false); -    } - -    void ReplyUnauthenticated(const TString& in) override { -        const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in; -        FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false); -    } - -    void SetNextReplyCallback(TOnNextReply&& cb) override { -        NextReplyCb_ = cb; -    } - +        FinishGrpcStatus(code, msg, false);  +    } + +    void ReplyUnauthenticated(const TString& in) override {  +        const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in;  +        FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false);  +    }  +  +    void SetNextReplyCallback(TOnNextReply&& cb) override {  +        NextReplyCb_ = cb;  +    }  +       void AddTrailingMetadata(const TString& key, const TString& value) override { -        this->Context.AddTrailingMetadata(key, value); -    } - -    void FinishStreamingOk() override { -        GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (enqueued)", this, Name_, -                  this->Context.peer().c_str()); -        auto cb = [this]() { -            StateFunc_ = &TThis::SetFinishDone; -            GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_, -                      this->Context.peer().c_str()); - -            StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag()); -        }; -        StreamAdaptor_->Enqueue(std::move(cb), false); -    } - -    google::protobuf::Arena* GetArena() override { -        return &Arena_; -    } - +        this->Context.AddTrailingMetadata(key, value);  +    }  +  +    void FinishStreamingOk() override {  +        GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (enqueued)", this, Name_,  +                  this->Context.peer().c_str());  +        auto cb = [this]() {  +            StateFunc_ = &TThis::SetFinishDone;  +            GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_,  +                      this->Context.peer().c_str());  +  +            StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag());  +        };  +        StreamAdaptor_->Enqueue(std::move(cb), false);  +    }  +  +    google::protobuf::Arena* GetArena() override {  +        return &Arena_;  +    }  +       void UseDatabase(const TString& database) override {          Counters_->UseDatabase(database);      } -private: -    void Clone() { -        if (!Server_->IsShuttingDown()) { -            if (RequestCallback_) { +private:  +    void Clone() {  +        if (!Server_->IsShuttingDown()) {  +            if (RequestCallback_) {                   MakeIntrusive<TThis>(                      Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); -            } else { +            } else {                   MakeIntrusive<TThis>(                      Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); -            } -        } -    } - -    void WriteDataOk(NProtoBuf::Message* resp) { -        auto makeResponseString = [&] { -            TString x; +            }  +        }  +    }  +  +    void WriteDataOk(NProtoBuf::Message* resp) {  +        auto makeResponseString = [&] {  +            TString x;               TOutProtoPrinter printer; -            printer.SetSingleLineMode(true); -            printer.PrintToString(*resp, &x); -            return x; -        }; - -        auto sz = (size_t)resp->ByteSize(); -        if (Writer_) { -            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_, -                makeResponseString().data(), this->Context.peer().c_str()); +            printer.SetSingleLineMode(true);  +            printer.PrintToString(*resp, &x);  +            return x;  +        };  +  +        auto sz = (size_t)resp->ByteSize();  +        if (Writer_) {  +            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_,  +                makeResponseString().data(), this->Context.peer().c_str());               StateFunc_ = &TThis::SetFinishDone; -            ResponseSize = sz; -            Y_VERIFY(this->Context.c_call()); +            ResponseSize = sz;  +            Y_VERIFY(this->Context.c_call());               Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); -        } else { -            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)", -                this, Name_, makeResponseString().data(), this->Context.peer().c_str()); +        } else {  +            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)",  +                this, Name_, makeResponseString().data(), this->Context.peer().c_str());               // because of std::function cannot hold move-only captured object              // we allocate shared object on heap to avoid message copy              auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);              auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() { -                GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)", -                    this, Name_, makeResponseString().data(), this->Context.peer().c_str()); -                StateFunc_ = &TThis::NextReply; -                ResponseSize += sz; +                GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)",  +                    this, Name_, makeResponseString().data(), this->Context.peer().c_str());  +                StateFunc_ = &TThis::NextReply;  +                ResponseSize += sz;                   StreamWriter_->Write(*uResp, GetGRpcTag()); -            }; -            StreamAdaptor_->Enqueue(std::move(cb), false); -        } -    } - -    void WriteByteDataOk(grpc::ByteBuffer* resp) { -        auto sz = resp->Length(); -        if (Writer_) { -            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_, -                this->Context.peer().c_str()); +            };  +            StreamAdaptor_->Enqueue(std::move(cb), false);  +        }  +    }  +  +    void WriteByteDataOk(grpc::ByteBuffer* resp) {  +        auto sz = resp->Length();  +        if (Writer_) {  +            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,  +                this->Context.peer().c_str());               StateFunc_ = &TThis::SetFinishDone; -            ResponseSize = sz; +            ResponseSize = sz;               Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); -        } else { -            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_, -                this->Context.peer().c_str()); +        } else {  +            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_,  +                this->Context.peer().c_str());               // because of std::function cannot hold move-only captured object              // we allocate shared object on heap to avoid buffer copy              auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);              auto cb = [this, uResp = std::move(uResp), sz]() { -                GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)", -                    this, Name_, this->Context.peer().c_str()); -                StateFunc_ = &TThis::NextReply; -                ResponseSize += sz; +                GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",  +                    this, Name_, this->Context.peer().c_str());  +                StateFunc_ = &TThis::NextReply;  +                ResponseSize += sz;                   StreamWriter_->Write(*uResp, GetGRpcTag()); -            }; -            StreamAdaptor_->Enqueue(std::move(cb), false); -        } -    } - -    void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, bool urgent) { -        Y_VERIFY(code != grpc::OK); -        if (code == grpc::StatusCode::UNAUTHENTICATED) { -            Counters_->CountNotAuthenticated(); -        } else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) { -            Counters_->CountResourceExhausted(); -        } - -        if (Writer_) { -            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this, -                Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); -            StateFunc_ = &TThis::SetFinishError; +            };  +            StreamAdaptor_->Enqueue(std::move(cb), false);  +        }  +    }  +  +    void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, bool urgent) {  +        Y_VERIFY(code != grpc::OK);  +        if (code == grpc::StatusCode::UNAUTHENTICATED) {  +            Counters_->CountNotAuthenticated();  +        } else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) {  +            Counters_->CountResourceExhausted();  +        }  +  +        if (Writer_) {  +            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this,  +                Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);  +            StateFunc_ = &TThis::SetFinishError;               TOut resp;              Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag()); -        } else { -            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" -                                    " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); -            auto cb = [this, code, msg]() { -                GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" -                                        " (pushed to grpc)", this, Name_, msg.c_str(), -                               this->Context.peer().c_str(), (int)code); -                StateFunc_ = &TThis::SetFinishError; -                StreamWriter_->Finish(grpc::Status(code, msg), GetGRpcTag()); -            }; -            StreamAdaptor_->Enqueue(std::move(cb), urgent); -        } -    } - -    bool SetRequestDone(bool ok) { -        auto makeRequestString = [&] { -            TString resp; -            if (ok) { +        } else {  +            GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"  +                                    " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);  +            auto cb = [this, code, msg]() {  +                GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"  +                                        " (pushed to grpc)", this, Name_, msg.c_str(),  +                               this->Context.peer().c_str(), (int)code);  +                StateFunc_ = &TThis::SetFinishError;  +                StreamWriter_->Finish(grpc::Status(code, msg), GetGRpcTag());  +            };  +            StreamAdaptor_->Enqueue(std::move(cb), urgent);  +        }  +    }  +  +    bool SetRequestDone(bool ok) {  +        auto makeRequestString = [&] {  +            TString resp;  +            if (ok) {                   TInProtoPrinter printer; -                printer.SetSingleLineMode(true); -                printer.PrintToString(*Request_, &resp); -            } else { -                resp = "<not ok>"; -            } -            return resp; -        }; +                printer.SetSingleLineMode(true);  +                printer.PrintToString(*Request_, &resp);  +            } else {  +                resp = "<not ok>";  +            }  +            return resp;  +        };           GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_,              ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str()); - -        if (this->Context.c_call() == nullptr) { +  +        if (this->Context.c_call() == nullptr) {               Y_VERIFY(!ok); -            // One ref by OnFinishTag, grpc will not call this tag if no request received -            UnRef(); +            // One ref by OnFinishTag, grpc will not call this tag if no request received  +            UnRef();           } else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) {              // Request cannot be registered due to shutdown              // It's unsafe to continue, so drop this request without processing              GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_);              this->Context.TryCancel();              return false; -        } - -        Clone(); // TODO: Request pool? -        if (!ok) { -            Counters_->CountNotOkRequest(); -            return false; -        } - +        }  +  +        Clone(); // TODO: Request pool?  +        if (!ok) {  +            Counters_->CountNotOkRequest();  +            return false;  +        }  +           if (IncRequest()) { -            // Adjust counters. -            RequestSize = Request_->ByteSize(); -            Counters_->StartProcessing(RequestSize); +            // Adjust counters.  +            RequestSize = Request_->ByteSize();  +            Counters_->StartProcessing(RequestSize);               RequestTimer.Reset(); - -            if (!SslServer()) { -                Counters_->CountRequestWithoutTls(); -            } - -            //TODO: Move this in to grpc_request_proxy +  +            if (!SslServer()) {  +                Counters_->CountRequestWithoutTls();  +            }  +  +            //TODO: Move this in to grpc_request_proxy               auto maybeDatabase = GetPeerMetaValues(TStringBuf("x-ydb-database")); -            if (maybeDatabase.empty()) { -                Counters_->CountRequestsWithoutDatabase(); -            } +            if (maybeDatabase.empty()) {  +                Counters_->CountRequestsWithoutDatabase();  +            }               auto maybeToken = GetPeerMetaValues(TStringBuf("x-ydb-auth-ticket")); -            if (maybeToken.empty() || maybeToken[0].empty()) { +            if (maybeToken.empty() || maybeToken[0].empty()) {                   TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}}; -                Counters_->CountRequestsWithoutToken(); -                GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token " -                    "Name# %s data# %s peer# %s database# %s", this, Name_, -                    makeRequestString().data(), this->Context.peer().c_str(), db.c_str()); -            } - -            // Handle current request. -            Cb_(this); -        } else { -            //This request has not been counted -            SkipUpdateCountersOnError = true; -            FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", true); -        } -        return true; -    } - -    bool NextReply(bool ok) { -        auto logCb = [this, ok](int left) { -            GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_, -                ok ? "true" : "false", this->Context.peer().c_str(), left); -        }; - -        if (!ok) { -            logCb(-1); +                Counters_->CountRequestsWithoutToken();  +                GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token "  +                    "Name# %s data# %s peer# %s database# %s", this, Name_,  +                    makeRequestString().data(), this->Context.peer().c_str(), db.c_str());  +            }  +  +            // Handle current request.  +            Cb_(this);  +        } else {  +            //This request has not been counted  +            SkipUpdateCountersOnError = true;  +            FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", true);  +        }  +        return true;  +    }  +  +    bool NextReply(bool ok) {  +        auto logCb = [this, ok](int left) {  +            GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_,  +                ok ? "true" : "false", this->Context.peer().c_str(), left);  +        };  +  +        if (!ok) {  +            logCb(-1);               DecRequest(); -            Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, -                TDuration::Seconds(RequestTimer.Passed())); -            return false; -        } - -        Ref();  // To prevent destroy during this call in case of execution Finish -        size_t left = StreamAdaptor_->ProcessNext(); -        logCb(left); -        if (NextReplyCb_) { -            NextReplyCb_(left); -        } -        // Now it is safe to destroy even if Finish was called -        UnRef(); -        return true; -    } - -    bool SetFinishDone(bool ok) { +            Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,  +                TDuration::Seconds(RequestTimer.Passed()));  +            return false;  +        }  +  +        Ref();  // To prevent destroy during this call in case of execution Finish  +        size_t left = StreamAdaptor_->ProcessNext();  +        logCb(left);  +        if (NextReplyCb_) {  +            NextReplyCb_(left);  +        }  +        // Now it is safe to destroy even if Finish was called  +        UnRef();  +        return true;  +    }  +  +    bool SetFinishDone(bool ok) {           GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_, -            ok ? "true" : "false", this->Context.peer().c_str()); -        //PrintBackTrace(); +            ok ? "true" : "false", this->Context.peer().c_str());  +        //PrintBackTrace();           DecRequest();          Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,              TDuration::Seconds(RequestTimer.Passed())); -        return false; -    } - -    bool SetFinishError(bool ok) { +        return false;  +    }  +  +    bool SetFinishError(bool ok) {           GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_, -            ok ? "true" : "false", this->Context.peer().c_str()); -        if (!SkipUpdateCountersOnError) { +            ok ? "true" : "false", this->Context.peer().c_str());  +        if (!SkipUpdateCountersOnError) {               DecRequest();              Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,                  TDuration::Seconds(RequestTimer.Passed())); -        } -        return false; -    } - -    // Returns pointer to IQueueEvent to pass into grpc c runtime -    // Implicit C style cast from this to void* is wrong due to multiple inheritance -    void* GetGRpcTag() { -        return static_cast<IQueueEvent*>(this); -    } - -    void OnFinish(EQueueEventStatus evStatus) { -        if (this->Context.IsCancelled()) { -            FinishPromise_.SetValue(EFinishStatus::CANCEL); -        } else { -            FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR); -        } -    } - +        }  +        return false;  +    }  +  +    // Returns pointer to IQueueEvent to pass into grpc c runtime  +    // Implicit C style cast from this to void* is wrong due to multiple inheritance  +    void* GetGRpcTag() {  +        return static_cast<IQueueEvent*>(this);  +    }  +  +    void OnFinish(EQueueEventStatus evStatus) {  +        if (this->Context.IsCancelled()) {  +            FinishPromise_.SetValue(EFinishStatus::CANCEL);  +        } else {  +            FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR);  +        }  +    }  +       bool IncRequest() {          if (!Server_->IncRequest())              return false; @@ -480,36 +480,36 @@ private:      }      using TStateFunc = bool (TThis::*)(bool); -    TService* Server_; -    TOnRequest Cb_; -    TRequestCallback RequestCallback_; -    TStreamRequestCallback StreamRequestCallback_; +    TService* Server_;  +    TOnRequest Cb_;  +    TRequestCallback RequestCallback_;  +    TStreamRequestCallback StreamRequestCallback_;       const char* const Name_;      TLoggerPtr Logger_;      ICounterBlockPtr Counters_;      IGRpcRequestLimiterPtr RequestLimiter_; - +       THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_; -    THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_; -    TStateFunc StateFunc_; -    TIn* Request_; - -    google::protobuf::Arena Arena_; -    TOnNextReply NextReplyCb_; -    ui32 RequestSize = 0; -    ui32 ResponseSize = 0; +    THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_;  +    TStateFunc StateFunc_;  +    TIn* Request_;  +  +    google::protobuf::Arena Arena_;  +    TOnNextReply NextReplyCb_;  +    ui32 RequestSize = 0;  +    ui32 ResponseSize = 0;       ui32 ResponseStatus = 0;      THPTimer RequestTimer; -    TAuthState AuthState_ = 0; +    TAuthState AuthState_ = 0;       bool RequestRegistered_ = false; - +       using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>;      TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish }; -    NThreading::TPromise<EFinishStatus> FinishPromise_; -    bool SkipUpdateCountersOnError = false; -    IStreamAdaptor::TPtr StreamAdaptor_; -}; - +    NThreading::TPromise<EFinishStatus> FinishPromise_;  +    bool SkipUpdateCountersOnError = false;  +    IStreamAdaptor::TPtr StreamAdaptor_;  +};  +   template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer>  class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter> {      using TBase = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index fcfce1c181a..506221dd988 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -1,33 +1,33 @@ -#pragma once - +#pragma once  +   #include <google/protobuf/message.h>  #include <library/cpp/threading/future/future.h> - +   #include <grpc++/server_context.h> -namespace grpc { -class ByteBuffer; -} - +namespace grpc {  +class ByteBuffer;  +}  +   namespace NGrpc { - -extern const char* GRPC_USER_AGENT_HEADER; - -struct TAuthState { -    enum EAuthState { -        AS_NOT_PERFORMED, -        AS_OK, -        AS_FAIL, -        AS_UNAVAILABLE -    }; -    TAuthState(bool needAuth) -        : NeedAuth(needAuth) -        , State(AS_NOT_PERFORMED) -    {} -    bool NeedAuth; -    EAuthState State; -}; - +  +extern const char* GRPC_USER_AGENT_HEADER;  +  +struct TAuthState {  +    enum EAuthState {  +        AS_NOT_PERFORMED,  +        AS_OK,  +        AS_FAIL,  +        AS_UNAVAILABLE  +    };  +    TAuthState(bool needAuth)  +        : NeedAuth(needAuth)  +        , State(AS_NOT_PERFORMED)  +    {}  +    bool NeedAuth;  +    EAuthState State;  +};  +   //! An interface that may be used to limit concurrency of requests  class IGRpcRequestLimiter: public TThrRefBase { @@ -38,79 +38,79 @@ public:  using IGRpcRequestLimiterPtr = TIntrusivePtr<IGRpcRequestLimiter>; -//! State of current request +//! State of current request   class IRequestContextBase: public TThrRefBase { -public: -    enum class EFinishStatus { -        OK, -        ERROR, -        CANCEL -    }; -    using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>; - -    using TOnNextReply = std::function<void (size_t left)>; - -    //! Get pointer to the request's message. -    virtual const NProtoBuf::Message* GetRequest() const = 0; - -    //! Get current auth state -    virtual TAuthState& GetAuthState() = 0; - -    //! Send common response (The request shoult be created for protobuf response type) -    //! Implementation can swap protobuf message +public:  +    enum class EFinishStatus {  +        OK,  +        ERROR,  +        CANCEL  +    };  +    using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>;  +  +    using TOnNextReply = std::function<void (size_t left)>;  +  +    //! Get pointer to the request's message.  +    virtual const NProtoBuf::Message* GetRequest() const = 0;  +  +    //! Get current auth state  +    virtual TAuthState& GetAuthState() = 0;  +  +    //! Send common response (The request shoult be created for protobuf response type)  +    //! Implementation can swap protobuf message       virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0; - +       //! Send serialised response (The request shoult be created for bytes response type) -    //! Implementation can swap ByteBuffer +    //! Implementation can swap ByteBuffer       virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0; - -    //! Send grpc UNAUTHENTICATED status -    virtual void ReplyUnauthenticated(const TString& in) = 0; - +  +    //! Send grpc UNAUTHENTICATED status  +    virtual void ReplyUnauthenticated(const TString& in) = 0;  +       //! Send grpc error      virtual void ReplyError(grpc::StatusCode code, const TString& msg) = 0; -    //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise -    virtual TInstant Deadline() const = 0; +    //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise  +    virtual TInstant Deadline() const = 0;       //! Returns available peer metadata keys      virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0; -    //! Returns peer optional metavalue +    //! Returns peer optional metavalue       virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0; - +       //! Returns request compression level      virtual grpc_compression_level GetCompressionLevel() const = 0; -    //! Returns protobuf arena allocator associated with current request -    //! Lifetime of the arena is lifetime of the context -    virtual google::protobuf::Arena* GetArena() = 0; - -    //! Add trailing metadata in to grpc context -    //! The metadata will be send at the time of rpc finish -    virtual void AddTrailingMetadata(const TString& key, const TString& value) = 0; - +    //! Returns protobuf arena allocator associated with current request  +    //! Lifetime of the arena is lifetime of the context  +    virtual google::protobuf::Arena* GetArena() = 0;  +  +    //! Add trailing metadata in to grpc context  +    //! The metadata will be send at the time of rpc finish  +    virtual void AddTrailingMetadata(const TString& key, const TString& value) = 0;  +       //! Use validated database name for counters      virtual void UseDatabase(const TString& database) = 0; -    // Streaming part - -    //! Set callback. The callback will be called when response deliverid to the client -    //! after that we can call Reply again in streaming mode. Yes, GRpc says there is only one -    //! reply in flight -    virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0; - -    //! Finish streaming reply -    virtual void FinishStreamingOk() = 0; - -    //! Returns future to get cancel of finish notification -    virtual TAsyncFinishResult GetFinishFuture() = 0; +    // Streaming part  +  +    //! Set callback. The callback will be called when response deliverid to the client  +    //! after that we can call Reply again in streaming mode. Yes, GRpc says there is only one  +    //! reply in flight  +    virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0;  +  +    //! Finish streaming reply  +    virtual void FinishStreamingOk() = 0;  +  +    //! Returns future to get cancel of finish notification  +    virtual TAsyncFinishResult GetFinishFuture() = 0;       //! Returns peer address      virtual TString GetPeer() const = 0;      //! Returns true if server is using ssl      virtual bool SslServer() const = 0; -}; - +};  +   } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_response.h b/library/cpp/grpc/server/grpc_response.h index 8e9afe44d53..53a3195982d 100644 --- a/library/cpp/grpc/server/grpc_response.h +++ b/library/cpp/grpc/server/grpc_response.h @@ -11,7 +11,7 @@ namespace NGrpc {   * Universal response that owns underlying message or buffer.   */  template <typename TMsg> -class TUniversalResponse: public TAtomicRefCount<TUniversalResponse<TMsg>>, public TMoveOnly { +class TUniversalResponse: public TAtomicRefCount<TUniversalResponse<TMsg>>, public TMoveOnly {       friend class grpc::SerializationTraits<TUniversalResponse<TMsg>>;  public: diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 7437b7a8f5e..3e68b26e1c2 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -1,12 +1,12 @@ -#include "grpc_server.h" - -#include <util/string/join.h> -#include <util/generic/yexception.h> -#include <util/system/thread.h> - -#include <grpc++/resource_quota.h> +#include "grpc_server.h"  +  +#include <util/string/join.h>  +#include <util/generic/yexception.h>  +#include <util/system/thread.h>  +  +#include <grpc++/resource_quota.h>   #include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h> - +   #if !defined(_WIN32) && !defined(_WIN64)  #include <sys/socket.h> @@ -16,9 +16,9 @@  #endif  namespace NGrpc { - -using NThreading::TFuture; - +  +using NThreading::TFuture;  +   static void PullEvents(grpc::ServerCompletionQueue* cq) {      TThread::SetCurrentThreadName("grpc_server");      while (true) { @@ -37,33 +37,33 @@ static void PullEvents(grpc::ServerCompletionQueue* cq) {      }  } -TGRpcServer::TGRpcServer(const TServerOptions& opts) -    : Options_(opts) -    , Limiter_(Options_.MaxGlobalRequestInFlight) -    {} - -TGRpcServer::~TGRpcServer() { -    Y_VERIFY(Ts.empty()); -    Services_.clear(); -} - -void TGRpcServer::AddService(IGRpcServicePtr service) { -    Services_.push_back(service); -} - -void TGRpcServer::Start() { +TGRpcServer::TGRpcServer(const TServerOptions& opts)  +    : Options_(opts)  +    , Limiter_(Options_.MaxGlobalRequestInFlight)  +    {}  +  +TGRpcServer::~TGRpcServer() {  +    Y_VERIFY(Ts.empty());  +    Services_.clear();  +}  +  +void TGRpcServer::AddService(IGRpcServicePtr service) {  +    Services_.push_back(service);  +}  +  +void TGRpcServer::Start() {       TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695 -    using grpc::ServerBuilder; -    using grpc::ResourceQuota; -    ServerBuilder builder; +    using grpc::ServerBuilder;  +    using grpc::ResourceQuota;  +    ServerBuilder builder;       auto credentials = grpc::InsecureServerCredentials();      if (Options_.SslData) { -        grpc::SslServerCredentialsOptions::PemKeyCertPair keycert; -        keycert.cert_chain = std::move(Options_.SslData->Cert); -        keycert.private_key = std::move(Options_.SslData->Key); -        grpc::SslServerCredentialsOptions sslOps; -        sslOps.pem_root_certs = std::move(Options_.SslData->Root); -        sslOps.pem_key_cert_pairs.push_back(keycert); +        grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;  +        keycert.cert_chain = std::move(Options_.SslData->Cert);  +        keycert.private_key = std::move(Options_.SslData->Key);  +        grpc::SslServerCredentialsOptions sslOps;  +        sslOps.pem_root_certs = std::move(Options_.SslData->Root);  +        sslOps.pem_key_cert_pairs.push_back(keycert);           credentials = grpc::SslServerCredentials(sslOps);      }      if (Options_.ExternalListener) { @@ -72,58 +72,58 @@ void TGRpcServer::Start() {              credentials          ));      } else { -        builder.AddListeningPort(server_address, credentials); -    } -    builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize); -    builder.SetMaxSendMessageSize(Options_.MaxMessageSize); -    for (IGRpcServicePtr service : Services_) { +        builder.AddListeningPort(server_address, credentials);  +    }  +    builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);  +    builder.SetMaxSendMessageSize(Options_.MaxMessageSize);  +    for (IGRpcServicePtr service : Services_) {           service->SetServerOptions(Options_); -        builder.RegisterService(service->GetService()); -        service->SetGlobalLimiterHandle(&Limiter_); -    } - +        builder.RegisterService(service->GetService());  +        service->SetGlobalLimiterHandle(&Limiter_);  +    }  +       class TKeepAliveOption: public grpc::ServerBuilderOption { -    public: -        TKeepAliveOption(int idle, int interval) -            : Idle(idle) -            , Interval(interval) -            , KeepAliveEnabled(true) -        {} - -        TKeepAliveOption() -            : Idle(0) -            , Interval(0) -            , KeepAliveEnabled(false) -       {} - -       void UpdateArguments(grpc::ChannelArguments *args) override { -            args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0); -            args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000); -            if (KeepAliveEnabled) { -                args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); -                args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); -                args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000); -                args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000); -                args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000); -            } -        } - -        void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override -        {} -    private: -        const int Idle; -        const int Interval; -        const bool KeepAliveEnabled; -    }; - -    if (Options_.KeepAliveEnable) { -        builder.SetOption(std::make_unique<TKeepAliveOption>( -            Options_.KeepAliveIdleTimeoutTriggerSec, -            Options_.KeepAliveProbeIntervalSec)); -    } else { -        builder.SetOption(std::make_unique<TKeepAliveOption>()); -    } - +    public:  +        TKeepAliveOption(int idle, int interval)  +            : Idle(idle)  +            , Interval(interval)  +            , KeepAliveEnabled(true)  +        {}  +  +        TKeepAliveOption()  +            : Idle(0)  +            , Interval(0)  +            , KeepAliveEnabled(false)  +       {}  +  +       void UpdateArguments(grpc::ChannelArguments *args) override {  +            args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);  +            args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);  +            if (KeepAliveEnabled) {  +                args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);  +                args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);  +                args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000);  +                args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000);  +                args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000);  +            }  +        }  +  +        void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override  +        {}  +    private:  +        const int Idle;  +        const int Interval;  +        const bool KeepAliveEnabled;  +    };  +  +    if (Options_.KeepAliveEnable) {  +        builder.SetOption(std::make_unique<TKeepAliveOption>(  +            Options_.KeepAliveIdleTimeoutTriggerSec,  +            Options_.KeepAliveProbeIntervalSec));  +    } else {  +        builder.SetOption(std::make_unique<TKeepAliveOption>());  +    }  +       if (Options_.UseCompletionQueuePerThread) {          for (size_t i = 0; i < Options_.WorkerThreads; ++i) {              CQS_.push_back(builder.AddCompletionQueue()); @@ -132,30 +132,30 @@ void TGRpcServer::Start() {          CQS_.push_back(builder.AddCompletionQueue());      } -    if (Options_.GRpcMemoryQuotaBytes) { +    if (Options_.GRpcMemoryQuotaBytes) {           // See details KIKIMR-6932 -        /* -        grpc::ResourceQuota quota("memory_bound"); -        quota.Resize(Options_.GRpcMemoryQuotaBytes); - -        builder.SetResourceQuota(quota); -        */ +        /*  +        grpc::ResourceQuota quota("memory_bound");  +        quota.Resize(Options_.GRpcMemoryQuotaBytes);  +  +        builder.SetResourceQuota(quota);  +        */           Cerr << "GRpc memory quota temporarily disabled due to issues with grpc quoter" << Endl; -    } +    }       Options_.ServerBuilderMutator(builder);      builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel); - -    Server_ = builder.BuildAndStart(); -    if (!Server_) { +  +    Server_ = builder.BuildAndStart();  +    if (!Server_) {           ythrow yexception() << "can't start grpc server on " << server_address; -    } +    }       size_t index = 0; -    for (IGRpcServicePtr service : Services_) { +    for (IGRpcServicePtr service : Services_) {           // TODO: provide something else for services instead of ServerCompletionQueue          service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger); -    } - +    }  +       if (Options_.UseCompletionQueuePerThread) {          for (size_t i = 0; i < Options_.WorkerThreads; ++i) {              auto* cq = &CQS_[i]; @@ -170,71 +170,71 @@ void TGRpcServer::Start() {                  PullEvents(cq->get());              }));          } -    } +    }       if (Options_.ExternalListener) {          Options_.ExternalListener->Start();      } -} - -void TGRpcServer::Stop() { -    for (auto& service : Services_) { -        service->StopService(); -    } - -    auto now = TInstant::Now(); - -    if (Server_) { -        i64 sec = Options_.GRpcShutdownDeadline.Seconds(); -        Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>()); -        i32 nanosecOfSec =  Options_.GRpcShutdownDeadline.NanoSecondsOfSecond(); -        Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN}); -    } - +}  +  +void TGRpcServer::Stop() {  +    for (auto& service : Services_) {  +        service->StopService();  +    }  +  +    auto now = TInstant::Now();  +  +    if (Server_) {  +        i64 sec = Options_.GRpcShutdownDeadline.Seconds();  +        Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());  +        i32 nanosecOfSec =  Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();  +        Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});  +    }  +       for (ui64 attempt = 0; ; ++attempt) {          bool unsafe = false; -        size_t infly = 0; -        for (auto& service : Services_) { +        size_t infly = 0;  +        for (auto& service : Services_) {               unsafe |= service->IsUnsafeToShutdown();              infly += service->RequestsInProgress(); -        } - +        }  +           if (!unsafe && !infly) -            break; +            break;  -        auto spent = (TInstant::Now() - now).SecondsFloat(); +        auto spent = (TInstant::Now() - now).SecondsFloat();           if (attempt % 300 == 0) {              // don't log too much              Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" <<  Endl;          }          if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat()) -            break; -        Sleep(TDuration::MilliSeconds(10)); -    } - -    // Always shutdown the completion queue after the server. +            break;  +        Sleep(TDuration::MilliSeconds(10));  +    }  +  +    // Always shutdown the completion queue after the server.       for (auto& cq : CQS_) {          cq->Shutdown(); -    } - -    for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) { -        (*ti)->Join(); -    } - -    Ts.clear(); +    }  +  +    for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {  +        (*ti)->Join();  +    }  +  +    Ts.clear();       if (Options_.ExternalListener) {          Options_.ExternalListener->Stop();      } -} - -ui16 TGRpcServer::GetPort() const { -    return Options_.Port; -} - -TString TGRpcServer::GetHost() const { -    return Options_.Host; -} - +}  +  +ui16 TGRpcServer::GetPort() const {  +    return Options_.Port;  +}  +  +TString TGRpcServer::GetHost() const {  +    return Options_.Host;  +}  +   } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index d6814a90a0d..0a4123d84e1 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -1,32 +1,32 @@ -#pragma once - +#pragma once  +   #include "grpc_request_base.h"  #include "logger.h"  #include <library/cpp/threading/future/future.h> - -#include <util/generic/ptr.h> -#include <util/generic/string.h> -#include <util/generic/vector.h> +  +#include <util/generic/ptr.h>  +#include <util/generic/string.h>  +#include <util/generic/vector.h>   #include <util/generic/maybe.h> -#include <util/generic/queue.h> -#include <util/generic/hash_set.h> -#include <util/system/types.h> -#include <util/system/mutex.h> +#include <util/generic/queue.h>  +#include <util/generic/hash_set.h>  +#include <util/system/types.h>  +#include <util/system/mutex.h>   #include <util/thread/factory.h> - -#include <grpc++/grpc++.h> - +  +#include <grpc++/grpc++.h>  +   namespace NGrpc { - -constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000; - -struct TSslData { -    TString Cert; -    TString Key; -    TString Root; -}; - +  +constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;  +  +struct TSslData {  +    TString Cert;  +    TString Key;  +    TString Root;  +};  +   struct IExternalListener      : public TThrRefBase  { @@ -36,55 +36,55 @@ struct IExternalListener      virtual void Stop() = 0;  }; -//! Server's options. -struct TServerOptions { -#define DECLARE_FIELD(name, type, default) \ -    type name{default}; \ -    inline TServerOptions& Set##name(const type& value) { \ -        name = value; \ -        return *this; \ -    } - -    //! Hostname of server to bind to. -    DECLARE_FIELD(Host, TString, "[::]"); -    //! Service port. -    DECLARE_FIELD(Port, ui16, 0); - -    //! Number of worker threads. -    DECLARE_FIELD(WorkerThreads, size_t, 2); - +//! Server's options.  +struct TServerOptions {  +#define DECLARE_FIELD(name, type, default) \  +    type name{default}; \  +    inline TServerOptions& Set##name(const type& value) { \  +        name = value; \  +        return *this; \  +    }  +  +    //! Hostname of server to bind to.  +    DECLARE_FIELD(Host, TString, "[::]");  +    //! Service port.  +    DECLARE_FIELD(Port, ui16, 0);  +  +    //! Number of worker threads.  +    DECLARE_FIELD(WorkerThreads, size_t, 2);  +       //! Create one completion queue per thread      DECLARE_FIELD(UseCompletionQueuePerThread, bool, false); -    //! Memory quota size for grpc server in bytes. Zero means unlimited. -    DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0); - -    //! How long to wait until pending rpcs are forcefully terminated. -    DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30)); - -    //! In/Out message size limit -    DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT); - -    //! Use GRpc keepalive -    DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>()); - -    //! GRPC_ARG_KEEPALIVE_TIME_MS setting +    //! Memory quota size for grpc server in bytes. Zero means unlimited.  +    DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0);  +  +    //! How long to wait until pending rpcs are forcefully terminated.  +    DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30));  +  +    //! In/Out message size limit  +    DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT);  +  +    //! Use GRpc keepalive  +    DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>());  + +    //! GRPC_ARG_KEEPALIVE_TIME_MS setting       DECLARE_FIELD(KeepAliveIdleTimeoutTriggerSec, int, 0); -    //! Deprecated, ths option ignored. Will be removed soon. +    //! Deprecated, ths option ignored. Will be removed soon.       DECLARE_FIELD(KeepAliveMaxProbeCount, int, 0); -    //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting +    //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting       DECLARE_FIELD(KeepAliveProbeIntervalSec, int, 0); -    //! Max number of requests processing by services (global limit for grpc server) -    DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000); - -    //! SSL server data -    DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>()); - -    //! GRPC auth -    DECLARE_FIELD(UseAuth, bool, false); +    //! Max number of requests processing by services (global limit for grpc server)  +    DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000);  +  +    //! SSL server data  +    DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>());  +  +    //! GRPC auth  +    DECLARE_FIELD(UseAuth, bool, false);       //! Default compression level. Used when no compression options provided by client.      //  Mapping to particular compression algorithm depends on client. @@ -98,75 +98,75 @@ struct TServerOptions {      //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).      DECLARE_FIELD(Logger, TLoggerPtr, nullptr); -#undef DECLARE_FIELD -}; - -class IQueueEvent { -public: -    virtual ~IQueueEvent() = default; - +#undef DECLARE_FIELD  +};  +  +class IQueueEvent {  +public:  +    virtual ~IQueueEvent() = default;  +       //! Execute an action defined by implementation. -    virtual bool Execute(bool ok) = 0; - -    //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also -    //  used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does -    //  not implement in flight management. -    virtual void Process() {} - -    //! Finish and destroy request. -    virtual void DestroyRequest() = 0; -}; - -class ICancelableContext { -public: -    virtual void Shutdown() = 0; -    virtual ~ICancelableContext() = default; -}; - +    virtual bool Execute(bool ok) = 0;  +  +    //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also  +    //  used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does  +    //  not implement in flight management.  +    virtual void Process() {}  +  +    //! Finish and destroy request.  +    virtual void DestroyRequest() = 0;  +};  +  +class ICancelableContext {  +public:  +    virtual void Shutdown() = 0;  +    virtual ~ICancelableContext() = default;  +};  +   template <class TLimit>  class TInFlightLimiterImpl { -public: +public:       explicit TInFlightLimiterImpl(const TLimit& limit) -        : Limit_(limit) -    {} - -    bool Inc() { -        i64 newVal; -        i64 prev; -        do { -            prev = AtomicGet(CurInFlightReqs_); -            Y_VERIFY(prev >= 0); -            if (Limit_ && prev > Limit_) { -                return false; -            } -            newVal = prev + 1; -        } while (!AtomicCas(&CurInFlightReqs_, newVal, prev)); -        return true; -    } - -    void Dec() { -        i64 newVal = AtomicDecrement(CurInFlightReqs_); -        Y_VERIFY(newVal >= 0); -    } - -    i64 GetCurrentInFlight() const { -        return AtomicGet(CurInFlightReqs_); -    } - -private: +        : Limit_(limit)  +    {}  +  +    bool Inc() {  +        i64 newVal;  +        i64 prev;  +        do {  +            prev = AtomicGet(CurInFlightReqs_);  +            Y_VERIFY(prev >= 0);  +            if (Limit_ && prev > Limit_) {  +                return false;  +            }  +            newVal = prev + 1;  +        } while (!AtomicCas(&CurInFlightReqs_, newVal, prev));  +        return true;  +    }  +  +    void Dec() {  +        i64 newVal = AtomicDecrement(CurInFlightReqs_);  +        Y_VERIFY(newVal >= 0);  +    }  +  +    i64 GetCurrentInFlight() const {  +        return AtomicGet(CurInFlightReqs_);  +    }  +  +private:       const TLimit Limit_; -    TAtomic CurInFlightReqs_ = 0; -}; - +    TAtomic CurInFlightReqs_ = 0;  +};  +   using TGlobalLimiter = TInFlightLimiterImpl<i64>;  class IGRpcService: public TThrRefBase { -public: -    virtual grpc::Service* GetService() = 0; -    virtual void StopService() noexcept = 0; +public:  +    virtual grpc::Service* GetService() = 0;  +    virtual void StopService() noexcept = 0;       virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0; -    virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0; +    virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;       virtual bool IsUnsafeToShutdown() const = 0;      virtual size_t RequestsInProgress() const = 0; @@ -175,11 +175,11 @@ public:       * service to inspect server options and initialize accordingly.       */      virtual void SetServerOptions(const TServerOptions& options) = 0; -}; - -template<typename T> +};  +  +template<typename T>   class TGrpcServiceBase: public IGRpcService { -public: +public:       class TShutdownGuard {          using TOwner = TGrpcServiceBase<T>;          friend class TGrpcServiceBase<T>; @@ -232,20 +232,20 @@ public:      };  public: -    using TCurrentGRpcService = T; - -    void StopService() noexcept override { -        with_lock(Lock_) { +    using TCurrentGRpcService = T;  +  +    void StopService() noexcept override {  +        with_lock(Lock_) {               AtomicSet(ShuttingDown_, 1); -            // Send TryCansel to event (can be send after finishing). -            // Actual dtors will be called from grpc thread, so deadlock impossible -            for (auto* request : Requests_) { -                request->Shutdown(); -            } -        } -    } - +            // Send TryCansel to event (can be send after finishing).  +            // Actual dtors will be called from grpc thread, so deadlock impossible  +            for (auto* request : Requests_) {  +                request->Shutdown();  +            }  +        }  +    }  +       TShutdownGuard ProtectShutdown() noexcept {          AtomicIncrement(GuardCount_);          if (IsShuttingDown()) { @@ -261,35 +261,35 @@ public:      }      size_t RequestsInProgress() const override { -        size_t c = 0; -        with_lock(Lock_) { -            c = Requests_.size(); -        } -        return c; -    } - +        size_t c = 0;  +        with_lock(Lock_) {  +            c = Requests_.size();  +        }  +        return c;  +    }  +       void SetServerOptions(const TServerOptions& options) override {          SslServer_ = bool(options.SslData);          NeedAuth_ = options.UseAuth; -    } - -    void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {} - -    //! Check if the server is going to shut down. -    bool IsShuttingDown() const { -       return AtomicGet(ShuttingDown_); -    } - +    }  +  +    void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {}  +  +    //! Check if the server is going to shut down.  +    bool IsShuttingDown() const {  +       return AtomicGet(ShuttingDown_);  +    }  +       bool SslServer() const {          return SslServer_;      } -    bool NeedAuth() const { -        return NeedAuth_; -    } - +    bool NeedAuth() const {  +        return NeedAuth_;  +    }  +       bool RegisterRequestCtx(ICancelableContext* req) { -        with_lock(Lock_) { +        with_lock(Lock_) {               auto r = Requests_.emplace(req);              Y_VERIFY(r.second, "Ctx already registered"); @@ -298,59 +298,59 @@ public:                  Requests_.erase(r.first);                  return false;              } -        } +        }           return true; -    } - -    void DeregisterRequestCtx(ICancelableContext* req) { -        with_lock(Lock_) { +    }  +  +    void DeregisterRequestCtx(ICancelableContext* req) {  +        with_lock(Lock_) {               Y_VERIFY(Requests_.erase(req), "Ctx is not registered"); -        } -    } - -protected: -    using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService; -    TGrpcAsyncService Service_; - +        }  +    }  +  +protected:  +    using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService;  +    TGrpcAsyncService Service_;  +       TGrpcAsyncService* GetService() override { -        return &Service_; -    } - -private: -    TAtomic ShuttingDown_ = 0; +        return &Service_;  +    }  +  +private:  +    TAtomic ShuttingDown_ = 0;       TAtomic GuardCount_ = 0; - +       bool SslServer_ = false; -    bool NeedAuth_ = false; - -    THashSet<ICancelableContext*> Requests_; -    TAdaptiveLock Lock_; -}; - +    bool NeedAuth_ = false;  +  +    THashSet<ICancelableContext*> Requests_;  +    TAdaptiveLock Lock_;  +};  +   class TGRpcServer { -public: -    using IGRpcServicePtr = TIntrusivePtr<IGRpcService>; -    TGRpcServer(const TServerOptions& opts); -    ~TGRpcServer(); -    void AddService(IGRpcServicePtr service); -    void Start(); -    // Send stop to registred services and call Shutdown on grpc server -    // This method MUST be called before destroying TGRpcServer -    void Stop(); -    ui16 GetPort() const; -    TString GetHost() const; - -private: +public:  +    using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;  +    TGRpcServer(const TServerOptions& opts);  +    ~TGRpcServer();  +    void AddService(IGRpcServicePtr service);  +    void Start();  +    // Send stop to registred services and call Shutdown on grpc server  +    // This method MUST be called before destroying TGRpcServer  +    void Stop();  +    ui16 GetPort() const;  +    TString GetHost() const;  +  +private:       using IThreadRef = TAutoPtr<IThreadFactory::IThread>; - -    const TServerOptions Options_; -    std::unique_ptr<grpc::Server> Server_; +  +    const TServerOptions Options_;  +    std::unique_ptr<grpc::Server> Server_;       std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;      TVector<IThreadRef> Ts; - +       TVector<IGRpcServicePtr> Services_; -    TGlobalLimiter Limiter_; -}; - +    TGlobalLimiter Limiter_;  +};  +   } // namespace NGrpc diff --git a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp index c34d3b8c2bf..0840c77176d 100644 --- a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp +++ b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp @@ -1,121 +1,121 @@  #include <library/cpp/grpc/server/grpc_request.h>  #include <library/cpp/testing/unittest/registar.h>  #include <library/cpp/testing/unittest/tests_data.h> - -#include <util/system/thread.h> -#include <util/thread/pool.h> - +  +#include <util/system/thread.h>  +#include <util/thread/pool.h>  +   using namespace NGrpc; - -// Here we emulate stream data producer +  +// Here we emulate stream data producer   class TOrderedProducer: public TThread { -public: -    TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp) -        : TThread(&ThreadProc, this) -        , Adaptor_(adaptor) -        , Max_(max) -        , WithSleep_(withSleep) -        , ConsumerOp_(std::move(consumerOp)) -    {} - -    static void* ThreadProc(void* _this) { -        SetCurrentThreadName("OrderedProducerThread"); -        static_cast<TOrderedProducer*>(_this)->Exec(); -        return nullptr; -    } - -    void Exec() { -        for (ui64 i = 0; i < Max_; i++) { -            auto cb = [i, this]() mutable { -                ConsumerOp_(i); -            }; -            Adaptor_->Enqueue(std::move(cb), false); -            if (WithSleep_ && (i % 256 == 0)) { -                Sleep(TDuration::MilliSeconds(10)); -            } -        } -    } - -private: -    IStreamAdaptor* Adaptor_; -    const ui64 Max_; -    const bool WithSleep_; -    std::function<void(ui64)> ConsumerOp_; -}; - -Y_UNIT_TEST_SUITE(StreamAdaptor) { -    static void OrderingTest(size_t threads, bool withSleep) { - -        auto adaptor = CreateStreamAdaptor(); - -        const i64 max = 10000; - -        // Here we will emulate grpc stream (NextReply call after writing) +public:  +    TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp)  +        : TThread(&ThreadProc, this)  +        , Adaptor_(adaptor)  +        , Max_(max)  +        , WithSleep_(withSleep)  +        , ConsumerOp_(std::move(consumerOp))  +    {}  +  +    static void* ThreadProc(void* _this) {  +        SetCurrentThreadName("OrderedProducerThread");  +        static_cast<TOrderedProducer*>(_this)->Exec();  +        return nullptr;  +    }  +  +    void Exec() {  +        for (ui64 i = 0; i < Max_; i++) {  +            auto cb = [i, this]() mutable {  +                ConsumerOp_(i);  +            };  +            Adaptor_->Enqueue(std::move(cb), false);  +            if (WithSleep_ && (i % 256 == 0)) {  +                Sleep(TDuration::MilliSeconds(10));  +            }  +        }  +    }  +  +private:  +    IStreamAdaptor* Adaptor_;  +    const ui64 Max_;  +    const bool WithSleep_;  +    std::function<void(ui64)> ConsumerOp_;  +};  +  +Y_UNIT_TEST_SUITE(StreamAdaptor) {  +    static void OrderingTest(size_t threads, bool withSleep) {  +  +        auto adaptor = CreateStreamAdaptor();  +  +        const i64 max = 10000;  +  +        // Here we will emulate grpc stream (NextReply call after writing)           std::unique_ptr<IThreadPool> consumerQueue(new TThreadPool(TThreadPool::TParams().SetBlocking(false).SetCatching(false))); -        // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue) -        consumerQueue->Start(threads, 1); - -        // Non atomic!!! Stream adaptor must protect us -        ui64 curVal = 0; - -        // Used just to wait in the main thread -        TAtomic finished = false; -        auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) { -            // Check no reordering inside stream adaptor -            // and no simultanious consumer Op call -            UNIT_ASSERT_VALUES_EQUAL(curVal, i); -            curVal++; -            // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext -            // so compare here and set after -            bool tmp = curVal == max; -            bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() { -                // Additional check the value still same -                // run under tsan makes sure no consumer Op call before we call ProcessNext -                UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1); -                ptr->ProcessNext(); -                // Reordering after ProcessNext is possible, so check tmp and set finished to true -                if (tmp) -                    AtomicSet(finished, true); -            }); -            UNIT_ASSERT(res); -        }; - -        TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp)); - -        producer.Start(); -        producer.Join(); - -        while (!AtomicGet(finished)) -        { -            Sleep(TDuration::MilliSeconds(100)); -        } - -        consumerQueue->Stop(); - -        UNIT_ASSERT_VALUES_EQUAL(curVal, max); -    } - -    Y_UNIT_TEST(OrderingOneThread) { -        OrderingTest(1, false); -    } - -    Y_UNIT_TEST(OrderingTwoThreads) { -        OrderingTest(2, false); -    } - -    Y_UNIT_TEST(OrderingManyThreads) { -        OrderingTest(10, false); -    } - -    Y_UNIT_TEST(OrderingOneThreadWithSleep) { -        OrderingTest(1, true); -    } - -    Y_UNIT_TEST(OrderingTwoThreadsWithSleep) { -        OrderingTest(2, true); -    } - -    Y_UNIT_TEST(OrderingManyThreadsWithSleep) { -        OrderingTest(10, true); -    } -} +        // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue)  +        consumerQueue->Start(threads, 1);  +  +        // Non atomic!!! Stream adaptor must protect us  +        ui64 curVal = 0;  +  +        // Used just to wait in the main thread  +        TAtomic finished = false;  +        auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) {  +            // Check no reordering inside stream adaptor  +            // and no simultanious consumer Op call  +            UNIT_ASSERT_VALUES_EQUAL(curVal, i);  +            curVal++;  +            // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext  +            // so compare here and set after  +            bool tmp = curVal == max;  +            bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() {  +                // Additional check the value still same  +                // run under tsan makes sure no consumer Op call before we call ProcessNext  +                UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1);  +                ptr->ProcessNext();  +                // Reordering after ProcessNext is possible, so check tmp and set finished to true  +                if (tmp)  +                    AtomicSet(finished, true);  +            });  +            UNIT_ASSERT(res);  +        };  +  +        TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp));  +  +        producer.Start();  +        producer.Join();  +  +        while (!AtomicGet(finished))  +        {  +            Sleep(TDuration::MilliSeconds(100));  +        }  +  +        consumerQueue->Stop();  +  +        UNIT_ASSERT_VALUES_EQUAL(curVal, max);  +    }  +  +    Y_UNIT_TEST(OrderingOneThread) {  +        OrderingTest(1, false);  +    }  +  +    Y_UNIT_TEST(OrderingTwoThreads) {  +        OrderingTest(2, false);  +    }  +  +    Y_UNIT_TEST(OrderingManyThreads) {  +        OrderingTest(10, false);  +    }  +  +    Y_UNIT_TEST(OrderingOneThreadWithSleep) {  +        OrderingTest(1, true);  +    }  +  +    Y_UNIT_TEST(OrderingTwoThreadsWithSleep) {  +        OrderingTest(2, true);  +    }  +  +    Y_UNIT_TEST(OrderingManyThreadsWithSleep) {  +        OrderingTest(10, true);  +    }  +}  diff --git a/library/cpp/grpc/server/ut/ya.make b/library/cpp/grpc/server/ut/ya.make index feb3291af92..2f7fa1afc2d 100644 --- a/library/cpp/grpc/server/ut/ya.make +++ b/library/cpp/grpc/server/ut/ya.make @@ -1,21 +1,21 @@  UNITTEST_FOR(library/cpp/grpc/server) - -OWNER( -    dcherednik -    g:kikimr -) - +  +OWNER(  +    dcherednik  +    g:kikimr  +)  +   TIMEOUT(600)  SIZE(MEDIUM) - -PEERDIR( +  +PEERDIR(       library/cpp/grpc/server -) - -SRCS( +)  +  +SRCS(       grpc_response_ut.cpp -    stream_adaptor_ut.cpp -) - -END() - +    stream_adaptor_ut.cpp  +)  +  +END()  +  diff --git a/library/cpp/grpc/server/ya.make b/library/cpp/grpc/server/ya.make index 356a1b6793d..75499b926a0 100644 --- a/library/cpp/grpc/server/ya.make +++ b/library/cpp/grpc/server/ya.make @@ -1,25 +1,25 @@ -LIBRARY() - -OWNER( -    dcherednik -    g:kikimr -) - -SRCS( -    event_callback.cpp -    grpc_request.cpp -    grpc_server.cpp -    grpc_counters.cpp -) - -GENERATE_ENUM_SERIALIZATION(grpc_request_base.h) - -PEERDIR( -    contrib/libs/grpc +LIBRARY()  +  +OWNER(  +    dcherednik  +    g:kikimr  +)  +  +SRCS(  +    event_callback.cpp  +    grpc_request.cpp  +    grpc_server.cpp  +    grpc_counters.cpp  +)  +  +GENERATE_ENUM_SERIALIZATION(grpc_request_base.h)  +  +PEERDIR(  +    contrib/libs/grpc       library/cpp/monlib/dynamic_counters/percentile -) - -END() +)  +  +END()   RECURSE_FOR_TESTS(ut) | 
