diff options
author | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:17 +0300 |
commit | 4b11037e5a7d071c63e3c966199fe7102e6462e4 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/server/grpc_request.cpp | |
parent | 17e20fa084178ddcb16255f974dbde74fb93608b (diff) | |
download | ydb-4b11037e5a7d071c63e3c966199fe7102e6462e4.tar.gz |
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server/grpc_request.cpp')
-rw-r--r-- | library/cpp/grpc/server/grpc_request.cpp | 112 |
1 files changed, 56 insertions, 56 deletions
diff --git a/library/cpp/grpc/server/grpc_request.cpp b/library/cpp/grpc/server/grpc_request.cpp index 33264fe6f2..d18a32776f 100644 --- a/library/cpp/grpc/server/grpc_request.cpp +++ b/library/cpp/grpc/server/grpc_request.cpp @@ -1,59 +1,59 @@ -#include "grpc_request.h" - +#include "grpc_request.h" + namespace NGrpc { - -const char* GRPC_USER_AGENT_HEADER = "user-agent"; - + +const char* GRPC_USER_AGENT_HEADER = "user-agent"; + class TStreamAdaptor: public IStreamAdaptor { -public: - TStreamAdaptor() - : StreamIsReady_(true) - {} - - void Enqueue(std::function<void()>&& fn, bool urgent) override { - with_lock(Mtx_) { - if (!UrgentQueue_.empty() || !NormalQueue_.empty()) { - Y_VERIFY(!StreamIsReady_); - } - auto& queue = urgent ? UrgentQueue_ : NormalQueue_; - if (StreamIsReady_ && queue.empty()) { - StreamIsReady_ = false; - } else { - queue.push_back(std::move(fn)); - return; - } - } - fn(); - } - - size_t ProcessNext() override { - size_t left = 0; - std::function<void()> fn; - with_lock(Mtx_) { - Y_VERIFY(!StreamIsReady_); - auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_; - if (queue.empty()) { - // Both queues are empty - StreamIsReady_ = true; - } else { - fn = std::move(queue.front()); - queue.pop_front(); - left = UrgentQueue_.size() + NormalQueue_.size(); - } - } - if (fn) - fn(); - return left; - } -private: - bool StreamIsReady_; - TList<std::function<void()>> NormalQueue_; - TList<std::function<void()>> UrgentQueue_; - TMutex Mtx_; -}; - -IStreamAdaptor::TPtr CreateStreamAdaptor() { - return std::make_unique<TStreamAdaptor>(); -} - +public: + TStreamAdaptor() + : StreamIsReady_(true) + {} + + void Enqueue(std::function<void()>&& fn, bool urgent) override { + with_lock(Mtx_) { + if (!UrgentQueue_.empty() || !NormalQueue_.empty()) { + Y_VERIFY(!StreamIsReady_); + } + auto& queue = urgent ? UrgentQueue_ : NormalQueue_; + if (StreamIsReady_ && queue.empty()) { + StreamIsReady_ = false; + } else { + queue.push_back(std::move(fn)); + return; + } + } + fn(); + } + + size_t ProcessNext() override { + size_t left = 0; + std::function<void()> fn; + with_lock(Mtx_) { + Y_VERIFY(!StreamIsReady_); + auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_; + if (queue.empty()) { + // Both queues are empty + StreamIsReady_ = true; + } else { + fn = std::move(queue.front()); + queue.pop_front(); + left = UrgentQueue_.size() + NormalQueue_.size(); + } + } + if (fn) + fn(); + return left; + } +private: + bool StreamIsReady_; + TList<std::function<void()>> NormalQueue_; + TList<std::function<void()>> UrgentQueue_; + TMutex Mtx_; +}; + +IStreamAdaptor::TPtr CreateStreamAdaptor() { + return std::make_unique<TStreamAdaptor>(); +} + } // namespace NGrpc |