aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server
diff options
context:
space:
mode:
authorSergey Polovko <sergey@polovko.me>2022-02-10 16:47:03 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:03 +0300
commit2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5 (patch)
treeb83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/grpc/server
parent3e0b762a82514bac89c1dd6ea7211e381d8aa248 (diff)
downloadydb-2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5.tar.gz
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server')
-rw-r--r--library/cpp/grpc/server/actors/logger.cpp90
-rw-r--r--library/cpp/grpc/server/actors/logger.h22
-rw-r--r--library/cpp/grpc/server/actors/ya.make26
-rw-r--r--library/cpp/grpc/server/event_callback.h16
-rw-r--r--library/cpp/grpc/server/grpc_async_ctx_base.h60
-rw-r--r--library/cpp/grpc/server/grpc_counters.cpp88
-rw-r--r--library/cpp/grpc/server/grpc_counters.h24
-rw-r--r--library/cpp/grpc/server/grpc_request.cpp6
-rw-r--r--library/cpp/grpc/server/grpc_request.h92
-rw-r--r--library/cpp/grpc/server/grpc_request_base.h18
-rw-r--r--library/cpp/grpc/server/grpc_response.h178
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp8
-rw-r--r--library/cpp/grpc/server/grpc_server.h18
-rw-r--r--library/cpp/grpc/server/logger.h86
-rw-r--r--library/cpp/grpc/server/ut/grpc_response_ut.cpp176
-rw-r--r--library/cpp/grpc/server/ut/stream_adaptor_ut.cpp6
-rw-r--r--library/cpp/grpc/server/ut/ya.make10
-rw-r--r--library/cpp/grpc/server/ya.make8
18 files changed, 466 insertions, 466 deletions
diff --git a/library/cpp/grpc/server/actors/logger.cpp b/library/cpp/grpc/server/actors/logger.cpp
index 176675366a..d8b2042576 100644
--- a/library/cpp/grpc/server/actors/logger.cpp
+++ b/library/cpp/grpc/server/actors/logger.cpp
@@ -1,45 +1,45 @@
-#include "logger.h"
-
-namespace NGrpc {
-namespace {
-
-static_assert(
- ui16(TLOG_EMERG) == ui16(NActors::NLog::PRI_EMERG) &&
- ui16(TLOG_DEBUG) == ui16(NActors::NLog::PRI_DEBUG),
- "log levels in the library/log and library/cpp/actors don't match");
-
-class TActorSystemLogger final: public TLogger {
-public:
- TActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) noexcept
- : ActorSystem_{as}
- , Component_{component}
- {
- }
-
-protected:
- bool DoIsEnabled(ELogPriority p) const noexcept override {
- const auto* settings = static_cast<::NActors::NLog::TSettings*>(ActorSystem_.LoggerSettings());
- const auto priority = static_cast<::NActors::NLog::EPriority>(p);
-
- return settings && settings->Satisfies(priority, Component_, 0);
- }
-
- void DoWrite(ELogPriority p, const char* format, va_list args) noexcept override {
- Y_VERIFY_DEBUG(DoIsEnabled(p));
-
- const auto priority = static_cast<::NActors::NLog::EPriority>(p);
- ::NActors::MemLogAdapter(ActorSystem_, priority, Component_, format, args);
- }
-
-private:
- NActors::TActorSystem& ActorSystem_;
- NActors::NLog::EComponent Component_;
-};
-
-} // namespace
-
-TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) {
- return MakeIntrusive<TActorSystemLogger>(as, component);
-}
-
-} // namespace NGrpc
+#include "logger.h"
+
+namespace NGrpc {
+namespace {
+
+static_assert(
+ ui16(TLOG_EMERG) == ui16(NActors::NLog::PRI_EMERG) &&
+ ui16(TLOG_DEBUG) == ui16(NActors::NLog::PRI_DEBUG),
+ "log levels in the library/log and library/cpp/actors don't match");
+
+class TActorSystemLogger final: public TLogger {
+public:
+ TActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) noexcept
+ : ActorSystem_{as}
+ , Component_{component}
+ {
+ }
+
+protected:
+ bool DoIsEnabled(ELogPriority p) const noexcept override {
+ const auto* settings = static_cast<::NActors::NLog::TSettings*>(ActorSystem_.LoggerSettings());
+ const auto priority = static_cast<::NActors::NLog::EPriority>(p);
+
+ return settings && settings->Satisfies(priority, Component_, 0);
+ }
+
+ void DoWrite(ELogPriority p, const char* format, va_list args) noexcept override {
+ Y_VERIFY_DEBUG(DoIsEnabled(p));
+
+ const auto priority = static_cast<::NActors::NLog::EPriority>(p);
+ ::NActors::MemLogAdapter(ActorSystem_, priority, Component_, format, args);
+ }
+
+private:
+ NActors::TActorSystem& ActorSystem_;
+ NActors::NLog::EComponent Component_;
+};
+
+} // namespace
+
+TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) {
+ return MakeIntrusive<TActorSystemLogger>(as, component);
+}
+
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/actors/logger.h b/library/cpp/grpc/server/actors/logger.h
index c066a40add..abf9270f7b 100644
--- a/library/cpp/grpc/server/actors/logger.h
+++ b/library/cpp/grpc/server/actors/logger.h
@@ -1,11 +1,11 @@
-#pragma once
-
-#include <library/cpp/actors/core/actorsystem.h>
-#include <library/cpp/actors/core/log.h>
-#include <library/cpp/grpc/server/logger.h>
-
-namespace NGrpc {
-
-TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component);
-
-} // namespace NGrpc
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/grpc/server/logger.h>
+
+namespace NGrpc {
+
+TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component);
+
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/actors/ya.make b/library/cpp/grpc/server/actors/ya.make
index 072db84142..6c9d80aa45 100644
--- a/library/cpp/grpc/server/actors/ya.make
+++ b/library/cpp/grpc/server/actors/ya.make
@@ -1,13 +1,13 @@
-LIBRARY()
-
-OWNER(g:kikimr g:solomon)
-
-SRCS(
- logger.cpp
-)
-
-PEERDIR(
- library/cpp/actors/core
-)
-
-END()
+LIBRARY()
+
+OWNER(g:kikimr g:solomon)
+
+SRCS(
+ logger.cpp
+)
+
+PEERDIR(
+ library/cpp/actors/core
+)
+
+END()
diff --git a/library/cpp/grpc/server/event_callback.h b/library/cpp/grpc/server/event_callback.h
index 13d9bb46b2..d0b700b3c9 100644
--- a/library/cpp/grpc/server/event_callback.h
+++ b/library/cpp/grpc/server/event_callback.h
@@ -2,7 +2,7 @@
#include "grpc_server.h"
-namespace NGrpc {
+namespace NGrpc {
enum class EQueueEventStatus {
OK,
@@ -10,7 +10,7 @@ enum class EQueueEventStatus {
};
template<class TCallback>
-class TQueueEventCallback: public IQueueEvent {
+class TQueueEventCallback: public IQueueEvent {
public:
TQueueEventCallback(const TCallback& callback)
: Callback(callback)
@@ -33,9 +33,9 @@ private:
TCallback Callback;
};
-// Implementation of IQueueEvent that reduces allocations
+// Implementation of IQueueEvent that reduces allocations
template<class TSelf>
-class TQueueFixedEvent: private IQueueEvent {
+class TQueueFixedEvent: private IQueueEvent {
using TCallback = void (TSelf::*)(EQueueEventStatus);
public:
@@ -44,7 +44,7 @@ public:
, Callback(callback)
{ }
- IQueueEvent* Prepare() {
+ IQueueEvent* Prepare() {
Self->Ref();
return this;
}
@@ -65,16 +65,16 @@ private:
};
template<class TCallback>
-inline IQueueEvent* MakeQueueEventCallback(TCallback&& callback) {
+inline IQueueEvent* MakeQueueEventCallback(TCallback&& callback) {
return new TQueueEventCallback<TCallback>(std::forward<TCallback>(callback));
}
template<class T>
-inline IQueueEvent* MakeQueueEventCallback(T* self, void (T::*method)(EQueueEventStatus)) {
+inline IQueueEvent* MakeQueueEventCallback(T* self, void (T::*method)(EQueueEventStatus)) {
using TPtr = TIntrusivePtr<T>;
return MakeQueueEventCallback([self = TPtr(self), method] (EQueueEventStatus status) {
((*self).*method)(status);
});
}
-} // namespace NGrpc
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_async_ctx_base.h b/library/cpp/grpc/server/grpc_async_ctx_base.h
index 65341fa1ad..51356d4ce5 100644
--- a/library/cpp/grpc/server/grpc_async_ctx_base.h
+++ b/library/cpp/grpc/server/grpc_async_ctx_base.h
@@ -5,17 +5,17 @@
#include <util/generic/vector.h>
#include <util/generic/string.h>
#include <util/system/yassert.h>
-#include <util/generic/set.h>
+#include <util/generic/set.h>
#include <grpc++/server.h>
#include <grpc++/server_context.h>
#include <chrono>
-namespace NGrpc {
+namespace NGrpc {
template<typename TService>
-class TBaseAsyncContext: public ICancelableContext {
+class TBaseAsyncContext: public ICancelableContext {
public:
TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq)
: Service(service)
@@ -29,44 +29,44 @@ public:
TInstant Deadline() const {
// The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline
- // right before the request is getting to be send.
- // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
- //
+ // right before the request is getting to be send.
+ // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
+ //
// After this timeout calculated back to the deadline on the server side
// using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method).
// deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME
//
-
+
std::chrono::system_clock::time_point t = Context.deadline();
if (t == std::chrono::system_clock::time_point::max()) {
return TInstant::Max();
- }
+ }
auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t);
return TInstant::MicroSeconds(us.time_since_epoch().count());
- }
-
- TSet<TStringBuf> GetPeerMetaKeys() const {
- TSet<TStringBuf> keys;
- for (const auto& [key, _]: Context.client_metadata()) {
- keys.emplace(key.data(), key.size());
- }
- return keys;
- }
-
- TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const {
+ }
+
+ TSet<TStringBuf> GetPeerMetaKeys() const {
+ TSet<TStringBuf> keys;
+ for (const auto& [key, _]: Context.client_metadata()) {
+ keys.emplace(key.data(), key.size());
+ }
+ return keys;
+ }
+
+ TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const {
const auto& clientMetadata = Context.client_metadata();
- const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()});
- if (range.first == range.second) {
- return {};
- }
-
- TVector<TStringBuf> values;
- values.reserve(std::distance(range.first, range.second));
-
+ const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()});
+ if (range.first == range.second) {
+ return {};
+ }
+
+ TVector<TStringBuf> values;
+ values.reserve(std::distance(range.first, range.second));
+
for (auto it = range.first; it != range.second; ++it) {
- values.emplace_back(it->second.data(), it->second.size());
+ values.emplace_back(it->second.data(), it->second.size());
}
- return values;
+ return values;
}
grpc_compression_level GetCompressionLevel() const {
@@ -91,4 +91,4 @@ protected:
grpc::ServerContext Context;
};
-} // namespace NGrpc
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_counters.cpp b/library/cpp/grpc/server/grpc_counters.cpp
index bdd72b3292..fa96e0100b 100644
--- a/library/cpp/grpc/server/grpc_counters.cpp
+++ b/library/cpp/grpc/server/grpc_counters.cpp
@@ -1,45 +1,45 @@
#include "grpc_counters.h"
-
-namespace NGrpc {
-namespace {
-
-class TFakeCounterBlock final: public ICounterBlock {
-private:
- void CountNotOkRequest() override {
- }
-
- void CountNotOkResponse() override {
- }
-
- void CountNotAuthenticated() override {
- }
-
- void CountResourceExhausted() override {
- }
-
- void CountRequestBytes(ui32 /*requestSize*/) override {
- }
-
- void CountResponseBytes(ui32 /*responseSize*/) override {
- }
-
- void StartProcessing(ui32 /*requestSize*/) override {
- }
-
- void FinishProcessing(
- ui32 /*requestSize*/,
- ui32 /*responseSize*/,
- bool /*ok*/,
- ui32 /*status*/,
- TDuration /*requestDuration*/) override
- {
- }
-};
-
-} // namespace
-
-ICounterBlockPtr FakeCounterBlock() {
- return MakeIntrusive<TFakeCounterBlock>();
-}
-
-} // namespace NGrpc
+
+namespace NGrpc {
+namespace {
+
+class TFakeCounterBlock final: public ICounterBlock {
+private:
+ void CountNotOkRequest() override {
+ }
+
+ void CountNotOkResponse() override {
+ }
+
+ void CountNotAuthenticated() override {
+ }
+
+ void CountResourceExhausted() override {
+ }
+
+ void CountRequestBytes(ui32 /*requestSize*/) override {
+ }
+
+ void CountResponseBytes(ui32 /*responseSize*/) override {
+ }
+
+ void StartProcessing(ui32 /*requestSize*/) override {
+ }
+
+ void FinishProcessing(
+ ui32 /*requestSize*/,
+ ui32 /*responseSize*/,
+ bool /*ok*/,
+ ui32 /*status*/,
+ TDuration /*requestDuration*/) override
+ {
+ }
+};
+
+} // namespace
+
+ICounterBlockPtr FakeCounterBlock() {
+ return MakeIntrusive<TFakeCounterBlock>();
+}
+
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h
index a591beb84e..0b6c36c84c 100644
--- a/library/cpp/grpc/server/grpc_counters.h
+++ b/library/cpp/grpc/server/grpc_counters.h
@@ -1,10 +1,10 @@
#pragma once
-#include <library/cpp/monlib/dynamic_counters/percentile/percentile.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/percentile/percentile.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <util/generic/ptr.h>
-namespace NGrpc {
+namespace NGrpc {
struct ICounterBlock : public TThrRefBase {
virtual void CountNotOkRequest() = 0;
@@ -14,7 +14,7 @@ struct ICounterBlock : public TThrRefBase {
virtual void CountRequestBytes(ui32 requestSize) = 0;
virtual void CountResponseBytes(ui32 responseSize) = 0;
virtual void StartProcessing(ui32 requestSize) = 0;
- virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0;
+ virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0;
virtual void CountRequestsWithoutDatabase() {}
virtual void CountRequestsWithoutToken() {}
virtual void CountRequestWithoutTls() {}
@@ -126,11 +126,11 @@ public:
using TCounterBlockPtr = TIntrusivePtr<TCounterBlock>;
-/**
- * Creates new instance of ICounterBlock implementation which does nothing.
- *
- * @return new instance
- */
-ICounterBlockPtr FakeCounterBlock();
-
-} // namespace NGrpc
+/**
+ * Creates new instance of ICounterBlock implementation which does nothing.
+ *
+ * @return new instance
+ */
+ICounterBlockPtr FakeCounterBlock();
+
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_request.cpp b/library/cpp/grpc/server/grpc_request.cpp
index 60db2f230d..d18a32776f 100644
--- a/library/cpp/grpc/server/grpc_request.cpp
+++ b/library/cpp/grpc/server/grpc_request.cpp
@@ -1,10 +1,10 @@
#include "grpc_request.h"
-namespace NGrpc {
+namespace NGrpc {
const char* GRPC_USER_AGENT_HEADER = "user-agent";
-class TStreamAdaptor: public IStreamAdaptor {
+class TStreamAdaptor: public IStreamAdaptor {
public:
TStreamAdaptor()
: StreamIsReady_(true)
@@ -56,4 +56,4 @@ IStreamAdaptor::TPtr CreateStreamAdaptor() {
return std::make_unique<TStreamAdaptor>();
}
-} // namespace NGrpc
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index dd9041eec7..5bd8d3902b 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -4,19 +4,19 @@
#include <google/protobuf/arena.h>
#include <google/protobuf/message.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/logger/priority.h>
-#include "grpc_response.h"
+#include "grpc_response.h"
#include "event_callback.h"
-#include "grpc_async_ctx_base.h"
-#include "grpc_counters.h"
+#include "grpc_async_ctx_base.h"
+#include "grpc_counters.h"
#include "grpc_request_base.h"
#include "grpc_server.h"
-#include "logger.h"
+#include "logger.h"
+
+#include <util/system/hp_timer.h>
-#include <util/system/hp_timer.h>
-
#include <grpc++/server.h>
#include <grpc++/server_context.h>
#include <grpc++/support/async_stream.h>
@@ -24,7 +24,7 @@
#include <grpc++/support/byte_buffer.h>
#include <grpc++/impl/codegen/async_stream.h>
-namespace NGrpc {
+namespace NGrpc {
class IStreamAdaptor {
public:
@@ -57,7 +57,7 @@ public:
grpc::ServerCompletionQueue* cq,
TOnRequest cb,
TRequestCallback requestCallback,
- const char* name,
+ const char* name,
TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
@@ -67,10 +67,10 @@ public:
, RequestCallback_(requestCallback)
, StreamRequestCallback_(nullptr)
, Name_(name)
- , Logger_(std::move(logger))
+ , Logger_(std::move(logger))
, Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
- , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))
+ , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))
, StateFunc_(&TThis::SetRequestDone)
{
AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
@@ -85,7 +85,7 @@ public:
grpc::ServerCompletionQueue* cq,
TOnRequest cb,
TStreamRequestCallback requestCallback,
- const char* name,
+ const char* name,
TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
@@ -95,7 +95,7 @@ public:
, RequestCallback_(nullptr)
, StreamRequestCallback_(requestCallback)
, Name_(name)
- , Logger_(std::move(logger))
+ , Logger_(std::move(logger))
, Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
, StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context))
@@ -157,13 +157,13 @@ public:
TInstant Deadline() const override {
return TBaseAsyncContext<TService>::Deadline();
- }
-
- TSet<TStringBuf> GetPeerMetaKeys() const override {
- return TBaseAsyncContext<TService>::GetPeerMetaKeys();
- }
-
- TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override {
+ }
+
+ TSet<TStringBuf> GetPeerMetaKeys() const override {
+ return TBaseAsyncContext<TService>::GetPeerMetaKeys();
+ }
+
+ TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override {
return TBaseAsyncContext<TService>::GetPeerMetaValues(key);
}
@@ -233,10 +233,10 @@ private:
if (!Server_->IsShuttingDown()) {
if (RequestCallback_) {
MakeIntrusive<TThis>(
- Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
+ Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
} else {
MakeIntrusive<TThis>(
- Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
+ Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
}
}
}
@@ -257,20 +257,20 @@ private:
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
Y_VERIFY(this->Context.c_call());
- Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
+ Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
} else {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)",
this, Name_, makeResponseString().data(), this->Context.peer().c_str());
-
- // because of std::function cannot hold move-only captured object
- // we allocate shared object on heap to avoid message copy
- auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
- auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() {
+
+ // because of std::function cannot hold move-only captured object
+ // we allocate shared object on heap to avoid message copy
+ auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
+ auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)",
this, Name_, makeResponseString().data(), this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;
ResponseSize += sz;
- StreamWriter_->Write(*uResp, GetGRpcTag());
+ StreamWriter_->Write(*uResp, GetGRpcTag());
};
StreamAdaptor_->Enqueue(std::move(cb), false);
}
@@ -283,20 +283,20 @@ private:
this->Context.peer().c_str());
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
- Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
+ Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
} else {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_,
this->Context.peer().c_str());
-
- // because of std::function cannot hold move-only captured object
- // we allocate shared object on heap to avoid buffer copy
- auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
- auto cb = [this, uResp = std::move(uResp), sz]() {
+
+ // because of std::function cannot hold move-only captured object
+ // we allocate shared object on heap to avoid buffer copy
+ auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
+ auto cb = [this, uResp = std::move(uResp), sz]() {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
this, Name_, this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;
ResponseSize += sz;
- StreamWriter_->Write(*uResp, GetGRpcTag());
+ StreamWriter_->Write(*uResp, GetGRpcTag());
};
StreamAdaptor_->Enqueue(std::move(cb), false);
}
@@ -314,8 +314,8 @@ private:
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this,
Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
StateFunc_ = &TThis::SetFinishError;
- TOut resp;
- Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag());
+ TOut resp;
+ Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag());
} else {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
" (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
@@ -380,7 +380,7 @@ private:
}
auto maybeToken = GetPeerMetaValues(TStringBuf("x-ydb-auth-ticket"));
if (maybeToken.empty() || maybeToken[0].empty()) {
- TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}};
+ TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}};
Counters_->CountRequestsWithoutToken();
GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token "
"Name# %s data# %s peer# %s database# %s", this, Name_,
@@ -484,12 +484,12 @@ private:
TOnRequest Cb_;
TRequestCallback RequestCallback_;
TStreamRequestCallback StreamRequestCallback_;
- const char* const Name_;
+ const char* const Name_;
TLoggerPtr Logger_;
ICounterBlockPtr Counters_;
IGRpcRequestLimiterPtr RequestLimiter_;
- THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_;
+ THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_;
THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_;
TStateFunc StateFunc_;
TIn* Request_;
@@ -520,10 +520,10 @@ public:
typename TBase::TOnRequest cb,
typename TBase::TRequestCallback requestCallback,
const char* name,
- TLoggerPtr logger,
+ TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter = nullptr)
- : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)}
+ : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)}
{
}
@@ -533,11 +533,11 @@ public:
typename TBase::TOnRequest cb,
typename TBase::TStreamRequestCallback requestCallback,
const char* name,
- TLoggerPtr logger,
+ TLoggerPtr logger,
ICounterBlockPtr counters)
- : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr}
+ : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr}
{
}
};
-} // namespace NGrpc
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h
index b61cf553aa..fcfce1c181 100644
--- a/library/cpp/grpc/server/grpc_request_base.h
+++ b/library/cpp/grpc/server/grpc_request_base.h
@@ -9,7 +9,7 @@ namespace grpc {
class ByteBuffer;
}
-namespace NGrpc {
+namespace NGrpc {
extern const char* GRPC_USER_AGENT_HEADER;
@@ -30,7 +30,7 @@ struct TAuthState {
//! An interface that may be used to limit concurrency of requests
-class IGRpcRequestLimiter: public TThrRefBase {
+class IGRpcRequestLimiter: public TThrRefBase {
public:
virtual bool IncRequest() = 0;
virtual void DecRequest() = 0;
@@ -39,7 +39,7 @@ public:
using IGRpcRequestLimiterPtr = TIntrusivePtr<IGRpcRequestLimiter>;
//! State of current request
-class IRequestContextBase: public TThrRefBase {
+class IRequestContextBase: public TThrRefBase {
public:
enum class EFinishStatus {
OK,
@@ -72,12 +72,12 @@ public:
//! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise
virtual TInstant Deadline() const = 0;
-
- //! Returns available peer metadata keys
- virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0;
-
+
+ //! Returns available peer metadata keys
+ virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0;
+
//! Returns peer optional metavalue
- virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0;
+ virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0;
//! Returns request compression level
virtual grpc_compression_level GetCompressionLevel() const = 0;
@@ -113,4 +113,4 @@ public:
virtual bool SslServer() const = 0;
};
-} // namespace NGrpc
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_response.h b/library/cpp/grpc/server/grpc_response.h
index 47b22c28d0..8e9afe44d5 100644
--- a/library/cpp/grpc/server/grpc_response.h
+++ b/library/cpp/grpc/server/grpc_response.h
@@ -1,90 +1,90 @@
-#pragma once
-
-#include <grpc++/impl/codegen/byte_buffer.h>
-#include <grpc++/impl/codegen/proto_utils.h>
-
-#include <variant>
-
-namespace NGrpc {
-
-/**
- * Universal response that owns underlying message or buffer.
- */
-template <typename TMsg>
+#pragma once
+
+#include <grpc++/impl/codegen/byte_buffer.h>
+#include <grpc++/impl/codegen/proto_utils.h>
+
+#include <variant>
+
+namespace NGrpc {
+
+/**
+ * Universal response that owns underlying message or buffer.
+ */
+template <typename TMsg>
class TUniversalResponse: public TAtomicRefCount<TUniversalResponse<TMsg>>, public TMoveOnly {
- friend class grpc::SerializationTraits<TUniversalResponse<TMsg>>;
-
-public:
- explicit TUniversalResponse(NProtoBuf::Message* msg) noexcept
- : Data_{TMsg{}}
- {
- std::get<TMsg>(Data_).Swap(static_cast<TMsg*>(msg));
- }
-
- explicit TUniversalResponse(grpc::ByteBuffer* buffer) noexcept
- : Data_{grpc::ByteBuffer{}}
- {
- std::get<grpc::ByteBuffer>(Data_).Swap(buffer);
- }
-
-private:
- std::variant<TMsg, grpc::ByteBuffer> Data_;
-};
-
-/**
- * Universal response that only keeps reference to underlying message or buffer.
- */
-template <typename TMsg>
-class TUniversalResponseRef: private TMoveOnly {
- friend class grpc::SerializationTraits<TUniversalResponseRef<TMsg>>;
-
-public:
- explicit TUniversalResponseRef(const NProtoBuf::Message* msg)
- : Data_{msg}
- {
- }
-
- explicit TUniversalResponseRef(const grpc::ByteBuffer* buffer)
- : Data_{buffer}
- {
- }
-
-private:
- std::variant<const NProtoBuf::Message*, const grpc::ByteBuffer*> Data_;
-};
-
-} // namespace NGrpc
-
-namespace grpc {
-
-template <typename TMsg>
-class SerializationTraits<NGrpc::TUniversalResponse<TMsg>> {
-public:
- static Status Serialize(
- const NGrpc::TUniversalResponse<TMsg>& resp,
- ByteBuffer* buffer,
- bool* ownBuffer)
- {
- return std::visit([&](const auto& data) {
- using T = std::decay_t<decltype(data)>;
- return SerializationTraits<T>::Serialize(data, buffer, ownBuffer);
- }, resp.Data_);
- }
-};
-
-template <typename TMsg>
-class SerializationTraits<NGrpc::TUniversalResponseRef<TMsg>> {
-public:
- static Status Serialize(
- const NGrpc::TUniversalResponseRef<TMsg>& resp,
- ByteBuffer* buffer,
- bool* ownBuffer)
- {
- return std::visit([&](const auto* data) {
- using T = std::decay_t<std::remove_pointer_t<decltype(data)>>;
- return SerializationTraits<T>::Serialize(*data, buffer, ownBuffer);
- }, resp.Data_);
- }
-};
-
-} // namespace grpc
+ friend class grpc::SerializationTraits<TUniversalResponse<TMsg>>;
+
+public:
+ explicit TUniversalResponse(NProtoBuf::Message* msg) noexcept
+ : Data_{TMsg{}}
+ {
+ std::get<TMsg>(Data_).Swap(static_cast<TMsg*>(msg));
+ }
+
+ explicit TUniversalResponse(grpc::ByteBuffer* buffer) noexcept
+ : Data_{grpc::ByteBuffer{}}
+ {
+ std::get<grpc::ByteBuffer>(Data_).Swap(buffer);
+ }
+
+private:
+ std::variant<TMsg, grpc::ByteBuffer> Data_;
+};
+
+/**
+ * Universal response that only keeps reference to underlying message or buffer.
+ */
+template <typename TMsg>
+class TUniversalResponseRef: private TMoveOnly {
+ friend class grpc::SerializationTraits<TUniversalResponseRef<TMsg>>;
+
+public:
+ explicit TUniversalResponseRef(const NProtoBuf::Message* msg)
+ : Data_{msg}
+ {
+ }
+
+ explicit TUniversalResponseRef(const grpc::ByteBuffer* buffer)
+ : Data_{buffer}
+ {
+ }
+
+private:
+ std::variant<const NProtoBuf::Message*, const grpc::ByteBuffer*> Data_;
+};
+
+} // namespace NGrpc
+
+namespace grpc {
+
+template <typename TMsg>
+class SerializationTraits<NGrpc::TUniversalResponse<TMsg>> {
+public:
+ static Status Serialize(
+ const NGrpc::TUniversalResponse<TMsg>& resp,
+ ByteBuffer* buffer,
+ bool* ownBuffer)
+ {
+ return std::visit([&](const auto& data) {
+ using T = std::decay_t<decltype(data)>;
+ return SerializationTraits<T>::Serialize(data, buffer, ownBuffer);
+ }, resp.Data_);
+ }
+};
+
+template <typename TMsg>
+class SerializationTraits<NGrpc::TUniversalResponseRef<TMsg>> {
+public:
+ static Status Serialize(
+ const NGrpc::TUniversalResponseRef<TMsg>& resp,
+ ByteBuffer* buffer,
+ bool* ownBuffer)
+ {
+ return std::visit([&](const auto* data) {
+ using T = std::decay_t<std::remove_pointer_t<decltype(data)>>;
+ return SerializationTraits<T>::Serialize(*data, buffer, ownBuffer);
+ }, resp.Data_);
+ }
+};
+
+} // namespace grpc
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 4f4c7412fc..7437b7a8f5 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -15,7 +15,7 @@
#endif
-namespace NGrpc {
+namespace NGrpc {
using NThreading::TFuture;
@@ -82,7 +82,7 @@ void TGRpcServer::Start() {
service->SetGlobalLimiterHandle(&Limiter_);
}
- class TKeepAliveOption: public grpc::ServerBuilderOption {
+ class TKeepAliveOption: public grpc::ServerBuilderOption {
public:
TKeepAliveOption(int idle, int interval)
: Idle(idle)
@@ -153,7 +153,7 @@ void TGRpcServer::Start() {
size_t index = 0;
for (IGRpcServicePtr service : Services_) {
// TODO: provide something else for services instead of ServerCompletionQueue
- service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
+ service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
}
if (Options_.UseCompletionQueuePerThread) {
@@ -237,4 +237,4 @@ TString TGRpcServer::GetHost() const {
return Options_.Host;
}
-} // namespace NGrpc
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index 24c8caa78d..d6814a90a0 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -1,7 +1,7 @@
#pragma once
#include "grpc_request_base.h"
-#include "logger.h"
+#include "logger.h"
#include <library/cpp/threading/future/future.h>
@@ -17,7 +17,7 @@
#include <grpc++/grpc++.h>
-namespace NGrpc {
+namespace NGrpc {
constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
@@ -95,9 +95,9 @@ struct TServerOptions {
DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr);
- //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).
- DECLARE_FIELD(Logger, TLoggerPtr, nullptr);
-
+ //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).
+ DECLARE_FIELD(Logger, TLoggerPtr, nullptr);
+
#undef DECLARE_FIELD
};
@@ -161,11 +161,11 @@ private:
using TGlobalLimiter = TInFlightLimiterImpl<i64>;
-class IGRpcService: public TThrRefBase {
+class IGRpcService: public TThrRefBase {
public:
virtual grpc::Service* GetService() = 0;
virtual void StopService() noexcept = 0;
- virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;
+ virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;
virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
virtual bool IsUnsafeToShutdown() const = 0;
virtual size_t RequestsInProgress() const = 0;
@@ -178,7 +178,7 @@ public:
};
template<typename T>
-class TGrpcServiceBase: public IGRpcService {
+class TGrpcServiceBase: public IGRpcService {
public:
class TShutdownGuard {
using TOwner = TGrpcServiceBase<T>;
@@ -353,4 +353,4 @@ private:
TGlobalLimiter Limiter_;
};
-} // namespace NGrpc
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/logger.h b/library/cpp/grpc/server/logger.h
index 5e44d83d67..53af26be9c 100644
--- a/library/cpp/grpc/server/logger.h
+++ b/library/cpp/grpc/server/logger.h
@@ -1,43 +1,43 @@
-#pragma once
-
-#include <library/cpp/logger/priority.h>
-
-#include <util/generic/ptr.h>
-
-namespace NGrpc {
-
-class TLogger: public TThrRefBase {
-protected:
- TLogger() = default;
-
-public:
- [[nodiscard]]
- bool IsEnabled(ELogPriority priority) const noexcept {
- return DoIsEnabled(priority);
- }
-
- void Y_PRINTF_FORMAT(3, 4) Write(ELogPriority priority, const char* format, ...) noexcept {
- va_list args;
- va_start(args, format);
- DoWrite(priority, format, args);
- va_end(args);
- }
-
-protected:
- virtual bool DoIsEnabled(ELogPriority priority) const noexcept = 0;
- virtual void DoWrite(ELogPriority p, const char* format, va_list args) noexcept = 0;
-};
-
-using TLoggerPtr = TIntrusivePtr<TLogger>;
-
-#define GRPC_LOG_DEBUG(logger, format, ...) \
- if (logger && logger->IsEnabled(ELogPriority::TLOG_DEBUG)) { \
- logger->Write(ELogPriority::TLOG_DEBUG, format, __VA_ARGS__); \
- } else { }
-
-#define GRPC_LOG_INFO(logger, format, ...) \
- if (logger && logger->IsEnabled(ELogPriority::TLOG_INFO)) { \
- logger->Write(ELogPriority::TLOG_INFO, format, __VA_ARGS__); \
- } else { }
-
-} // namespace NGrpc
+#pragma once
+
+#include <library/cpp/logger/priority.h>
+
+#include <util/generic/ptr.h>
+
+namespace NGrpc {
+
+class TLogger: public TThrRefBase {
+protected:
+ TLogger() = default;
+
+public:
+ [[nodiscard]]
+ bool IsEnabled(ELogPriority priority) const noexcept {
+ return DoIsEnabled(priority);
+ }
+
+ void Y_PRINTF_FORMAT(3, 4) Write(ELogPriority priority, const char* format, ...) noexcept {
+ va_list args;
+ va_start(args, format);
+ DoWrite(priority, format, args);
+ va_end(args);
+ }
+
+protected:
+ virtual bool DoIsEnabled(ELogPriority priority) const noexcept = 0;
+ virtual void DoWrite(ELogPriority p, const char* format, va_list args) noexcept = 0;
+};
+
+using TLoggerPtr = TIntrusivePtr<TLogger>;
+
+#define GRPC_LOG_DEBUG(logger, format, ...) \
+ if (logger && logger->IsEnabled(ELogPriority::TLOG_DEBUG)) { \
+ logger->Write(ELogPriority::TLOG_DEBUG, format, __VA_ARGS__); \
+ } else { }
+
+#define GRPC_LOG_INFO(logger, format, ...) \
+ if (logger && logger->IsEnabled(ELogPriority::TLOG_INFO)) { \
+ logger->Write(ELogPriority::TLOG_INFO, format, __VA_ARGS__); \
+ } else { }
+
+} // namespace NGrpc
diff --git a/library/cpp/grpc/server/ut/grpc_response_ut.cpp b/library/cpp/grpc/server/ut/grpc_response_ut.cpp
index cb66478e94..8abc4e4e0e 100644
--- a/library/cpp/grpc/server/ut/grpc_response_ut.cpp
+++ b/library/cpp/grpc/server/ut/grpc_response_ut.cpp
@@ -1,88 +1,88 @@
-#include <library/cpp/grpc/server/grpc_response.h>
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <google/protobuf/duration.pb.h>
-#include <grpc++/impl/codegen/proto_utils.h>
-#include <grpc++/impl/grpc_library.h>
-
-static ::grpc::internal::GrpcLibraryInitializer grpcInitializer;
-
-using namespace NGrpc;
-
-using google::protobuf::Duration;
-
-Y_UNIT_TEST_SUITE(ResponseTest) {
-
- template <typename T>
- grpc::ByteBuffer Serialize(T resp) {
- grpc::ByteBuffer buf;
- bool ownBuf = false;
- grpc::Status status = grpc::SerializationTraits<T>::Serialize(resp, &buf, &ownBuf);
- UNIT_ASSERT(status.ok());
- return buf;
- }
-
- template <typename T>
- T Deserialize(grpc::ByteBuffer* buf) {
- T message;
- auto status = grpc::SerializationTraits<T>::Deserialize(buf, &message);
- UNIT_ASSERT(status.ok());
- return message;
- }
-
- Y_UNIT_TEST(UniversalResponseMsg) {
- Duration d1;
- d1.set_seconds(12345);
- d1.set_nanos(67890);
-
- auto buf = Serialize(TUniversalResponse<Duration>(&d1));
- Duration d2 = Deserialize<Duration>(&buf);
-
- UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345);
- UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890);
- }
-
- Y_UNIT_TEST(UniversalResponseBuf) {
- Duration d1;
- d1.set_seconds(123);
- d1.set_nanos(456);
-
- TString data = d1.SerializeAsString();
- grpc::Slice dataSlice{data.data(), data.size()};
- grpc::ByteBuffer dataBuf{&dataSlice, 1};
-
- auto buf = Serialize(TUniversalResponse<Duration>(&dataBuf));
- Duration d2 = Deserialize<Duration>(&buf);
-
- UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123);
- UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456);
- }
-
- Y_UNIT_TEST(UniversalResponseRefMsg) {
- Duration d1;
- d1.set_seconds(12345);
- d1.set_nanos(67890);
-
- auto buf = Serialize(TUniversalResponseRef<Duration>(&d1));
- Duration d2 = Deserialize<Duration>(&buf);
-
- UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345);
- UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890);
- }
-
- Y_UNIT_TEST(UniversalResponseRefBuf) {
- Duration d1;
- d1.set_seconds(123);
- d1.set_nanos(456);
-
- TString data = d1.SerializeAsString();
- grpc::Slice dataSlice{data.data(), data.size()};
- grpc::ByteBuffer dataBuf{&dataSlice, 1};
-
- auto buf = Serialize(TUniversalResponseRef<Duration>(&dataBuf));
- Duration d2 = Deserialize<Duration>(&buf);
-
- UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123);
- UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456);
- }
-}
+#include <library/cpp/grpc/server/grpc_response.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <google/protobuf/duration.pb.h>
+#include <grpc++/impl/codegen/proto_utils.h>
+#include <grpc++/impl/grpc_library.h>
+
+static ::grpc::internal::GrpcLibraryInitializer grpcInitializer;
+
+using namespace NGrpc;
+
+using google::protobuf::Duration;
+
+Y_UNIT_TEST_SUITE(ResponseTest) {
+
+ template <typename T>
+ grpc::ByteBuffer Serialize(T resp) {
+ grpc::ByteBuffer buf;
+ bool ownBuf = false;
+ grpc::Status status = grpc::SerializationTraits<T>::Serialize(resp, &buf, &ownBuf);
+ UNIT_ASSERT(status.ok());
+ return buf;
+ }
+
+ template <typename T>
+ T Deserialize(grpc::ByteBuffer* buf) {
+ T message;
+ auto status = grpc::SerializationTraits<T>::Deserialize(buf, &message);
+ UNIT_ASSERT(status.ok());
+ return message;
+ }
+
+ Y_UNIT_TEST(UniversalResponseMsg) {
+ Duration d1;
+ d1.set_seconds(12345);
+ d1.set_nanos(67890);
+
+ auto buf = Serialize(TUniversalResponse<Duration>(&d1));
+ Duration d2 = Deserialize<Duration>(&buf);
+
+ UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345);
+ UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890);
+ }
+
+ Y_UNIT_TEST(UniversalResponseBuf) {
+ Duration d1;
+ d1.set_seconds(123);
+ d1.set_nanos(456);
+
+ TString data = d1.SerializeAsString();
+ grpc::Slice dataSlice{data.data(), data.size()};
+ grpc::ByteBuffer dataBuf{&dataSlice, 1};
+
+ auto buf = Serialize(TUniversalResponse<Duration>(&dataBuf));
+ Duration d2 = Deserialize<Duration>(&buf);
+
+ UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123);
+ UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456);
+ }
+
+ Y_UNIT_TEST(UniversalResponseRefMsg) {
+ Duration d1;
+ d1.set_seconds(12345);
+ d1.set_nanos(67890);
+
+ auto buf = Serialize(TUniversalResponseRef<Duration>(&d1));
+ Duration d2 = Deserialize<Duration>(&buf);
+
+ UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345);
+ UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890);
+ }
+
+ Y_UNIT_TEST(UniversalResponseRefBuf) {
+ Duration d1;
+ d1.set_seconds(123);
+ d1.set_nanos(456);
+
+ TString data = d1.SerializeAsString();
+ grpc::Slice dataSlice{data.data(), data.size()};
+ grpc::ByteBuffer dataBuf{&dataSlice, 1};
+
+ auto buf = Serialize(TUniversalResponseRef<Duration>(&dataBuf));
+ Duration d2 = Deserialize<Duration>(&buf);
+
+ UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123);
+ UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456);
+ }
+}
diff --git a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
index 3457e98bf1..c34d3b8c2b 100644
--- a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
+++ b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
@@ -1,14 +1,14 @@
-#include <library/cpp/grpc/server/grpc_request.h>
+#include <library/cpp/grpc/server/grpc_request.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/testing/unittest/tests_data.h>
#include <util/system/thread.h>
#include <util/thread/pool.h>
-using namespace NGrpc;
+using namespace NGrpc;
// Here we emulate stream data producer
-class TOrderedProducer: public TThread {
+class TOrderedProducer: public TThread {
public:
TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp)
: TThread(&ThreadProc, this)
diff --git a/library/cpp/grpc/server/ut/ya.make b/library/cpp/grpc/server/ut/ya.make
index ff6c8fdb7b..feb3291af9 100644
--- a/library/cpp/grpc/server/ut/ya.make
+++ b/library/cpp/grpc/server/ut/ya.make
@@ -1,19 +1,19 @@
-UNITTEST_FOR(library/cpp/grpc/server)
+UNITTEST_FOR(library/cpp/grpc/server)
OWNER(
dcherednik
g:kikimr
)
-TIMEOUT(600)
-SIZE(MEDIUM)
+TIMEOUT(600)
+SIZE(MEDIUM)
PEERDIR(
- library/cpp/grpc/server
+ library/cpp/grpc/server
)
SRCS(
- grpc_response_ut.cpp
+ grpc_response_ut.cpp
stream_adaptor_ut.cpp
)
diff --git a/library/cpp/grpc/server/ya.make b/library/cpp/grpc/server/ya.make
index b0f262e5dc..356a1b6793 100644
--- a/library/cpp/grpc/server/ya.make
+++ b/library/cpp/grpc/server/ya.make
@@ -16,10 +16,10 @@ GENERATE_ENUM_SERIALIZATION(grpc_request_base.h)
PEERDIR(
contrib/libs/grpc
- library/cpp/monlib/dynamic_counters/percentile
+ library/cpp/monlib/dynamic_counters/percentile
)
END()
-
-RECURSE_FOR_TESTS(ut)
-
+
+RECURSE_FOR_TESTS(ut)
+