aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/client/grpc_client_low.h
diff options
context:
space:
mode:
authorAlexey Borzenkov <snaury@yandex-team.ru>2022-02-10 16:47:43 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:43 +0300
commit330c83f8c116bd45316397b179275e9d87007e7d (patch)
treec0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/grpc/client/grpc_client_low.h
parent22d92781ba2a10b7fb5b977b7d1a5c40ff53885f (diff)
downloadydb-330c83f8c116bd45316397b179275e9d87007e7d.tar.gz
Restoring authorship annotation for Alexey Borzenkov <snaury@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.h')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.h1234
1 files changed, 617 insertions, 617 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
index 5d0983f804..ab0a0627be 100644
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ b/library/cpp/grpc/client/grpc_client_low.h
@@ -52,125 +52,125 @@ public:
virtual void Destroy() = 0;
};
-// Implementation of IQueueClientEvent that reduces allocations
-template<class TSelf>
-class TQueueClientFixedEvent : private IQueueClientEvent {
- using TCallback = void (TSelf::*)(bool);
-
-public:
- TQueueClientFixedEvent(TSelf* self, TCallback callback)
- : Self(self)
- , Callback(callback)
- { }
-
- IQueueClientEvent* Prepare() {
- Self->Ref();
- return this;
- }
-
-private:
- bool Execute(bool ok) override {
- ((*Self).*Callback)(ok);
- return false;
- }
-
- void Destroy() override {
- Self->UnRef();
- }
-
-private:
- TSelf* const Self;
- TCallback const Callback;
-};
-
-class IQueueClientContext;
-using IQueueClientContextPtr = std::shared_ptr<IQueueClientContext>;
-
-// Provider of IQueueClientContext instances
-class IQueueClientContextProvider {
-public:
- virtual ~IQueueClientContextProvider() = default;
-
- virtual IQueueClientContextPtr CreateContext() = 0;
-};
-
-// Activity context for a low-level client
-class IQueueClientContext : public IQueueClientContextProvider {
-public:
- virtual ~IQueueClientContext() = default;
-
- //! Returns CompletionQueue associated with the client
- virtual grpc::CompletionQueue* CompletionQueue() = 0;
-
- //! Returns true if context has been cancelled
- virtual bool IsCancelled() const = 0;
-
- //! Tries to cancel context, calling all registered callbacks
- virtual bool Cancel() = 0;
-
- //! Subscribes callback to cancellation
- //
- // Note there's no way to unsubscribe, if subscription is temporary
- // make sure you create a new context with CreateContext and release
- // it as soon as it's no longer needed.
- virtual void SubscribeCancel(std::function<void()> callback) = 0;
-
- //! Subscribes callback to cancellation
- //
- // This alias is for compatibility with older code.
- void SubscribeStop(std::function<void()> callback) {
- SubscribeCancel(std::move(callback));
- }
-};
-
+// Implementation of IQueueClientEvent that reduces allocations
+template<class TSelf>
+class TQueueClientFixedEvent : private IQueueClientEvent {
+ using TCallback = void (TSelf::*)(bool);
+
+public:
+ TQueueClientFixedEvent(TSelf* self, TCallback callback)
+ : Self(self)
+ , Callback(callback)
+ { }
+
+ IQueueClientEvent* Prepare() {
+ Self->Ref();
+ return this;
+ }
+
+private:
+ bool Execute(bool ok) override {
+ ((*Self).*Callback)(ok);
+ return false;
+ }
+
+ void Destroy() override {
+ Self->UnRef();
+ }
+
+private:
+ TSelf* const Self;
+ TCallback const Callback;
+};
+
+class IQueueClientContext;
+using IQueueClientContextPtr = std::shared_ptr<IQueueClientContext>;
+
+// Provider of IQueueClientContext instances
+class IQueueClientContextProvider {
+public:
+ virtual ~IQueueClientContextProvider() = default;
+
+ virtual IQueueClientContextPtr CreateContext() = 0;
+};
+
+// Activity context for a low-level client
+class IQueueClientContext : public IQueueClientContextProvider {
+public:
+ virtual ~IQueueClientContext() = default;
+
+ //! Returns CompletionQueue associated with the client
+ virtual grpc::CompletionQueue* CompletionQueue() = 0;
+
+ //! Returns true if context has been cancelled
+ virtual bool IsCancelled() const = 0;
+
+ //! Tries to cancel context, calling all registered callbacks
+ virtual bool Cancel() = 0;
+
+ //! Subscribes callback to cancellation
+ //
+ // Note there's no way to unsubscribe, if subscription is temporary
+ // make sure you create a new context with CreateContext and release
+ // it as soon as it's no longer needed.
+ virtual void SubscribeCancel(std::function<void()> callback) = 0;
+
+ //! Subscribes callback to cancellation
+ //
+ // This alias is for compatibility with older code.
+ void SubscribeStop(std::function<void()> callback) {
+ SubscribeCancel(std::move(callback));
+ }
+};
+
// Represents grpc status and error message string
struct TGrpcStatus {
- TString Msg;
+ TString Msg;
TString Details;
- int GRpcStatusCode;
- bool InternalError;
-
- TGrpcStatus()
- : GRpcStatusCode(grpc::StatusCode::OK)
- , InternalError(false)
- { }
-
- TGrpcStatus(TString msg, int statusCode, bool internalError)
- : Msg(std::move(msg))
- , GRpcStatusCode(statusCode)
- , InternalError(internalError)
- { }
-
+ int GRpcStatusCode;
+ bool InternalError;
+
+ TGrpcStatus()
+ : GRpcStatusCode(grpc::StatusCode::OK)
+ , InternalError(false)
+ { }
+
+ TGrpcStatus(TString msg, int statusCode, bool internalError)
+ : Msg(std::move(msg))
+ , GRpcStatusCode(statusCode)
+ , InternalError(internalError)
+ { }
+
TGrpcStatus(grpc::StatusCode status, TString msg, TString details = {})
- : Msg(std::move(msg))
+ : Msg(std::move(msg))
, Details(std::move(details))
- , GRpcStatusCode(status)
- , InternalError(false)
- { }
-
- TGrpcStatus(const grpc::Status& status)
+ , GRpcStatusCode(status)
+ , InternalError(false)
+ { }
+
+ TGrpcStatus(const grpc::Status& status)
: TGrpcStatus(status.error_code(), TString(status.error_message()), TString(status.error_details()))
- { }
-
- TGrpcStatus& operator=(const grpc::Status& status) {
- Msg = TString(status.error_message());
+ { }
+
+ TGrpcStatus& operator=(const grpc::Status& status) {
+ Msg = TString(status.error_message());
Details = TString(status.error_details());
- GRpcStatusCode = status.error_code();
- InternalError = false;
- return *this;
- }
-
- static TGrpcStatus Internal(TString msg) {
- return { std::move(msg), -1, true };
- }
-
- bool Ok() const {
- return !InternalError && GRpcStatusCode == grpc::StatusCode::OK;
- }
+ GRpcStatusCode = status.error_code();
+ InternalError = false;
+ return *this;
+ }
+
+ static TGrpcStatus Internal(TString msg) {
+ return { std::move(msg), -1, true };
+ }
+
+ bool Ok() const {
+ return !InternalError && GRpcStatusCode == grpc::StatusCode::OK;
+ }
};
bool inline IsGRpcStatusGood(const TGrpcStatus& status) {
- return status.Ok();
+ return status.Ok();
}
// Response callback type - this callback will be called when request is finished
@@ -241,9 +241,9 @@ public:
{ }
~TSimpleRequestProcessor() {
- if (!Replied_ && Callback_) {
- Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
+ if (!Replied_ && Callback_) {
+ Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
+ Callback_ = nullptr; // free resources as early as possible
}
}
@@ -251,53 +251,53 @@ public:
{
std::unique_lock<std::mutex> guard(Mutex_);
LocalContext.reset();
- }
- TGrpcStatus status;
- if (ok) {
+ }
+ TGrpcStatus status;
+ if (ok) {
status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
}
Replied_ = true;
Callback_(std::move(status), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
+ Callback_ = nullptr; // free resources as early as possible
return false;
}
void Destroy() override {
- UnRef();
+ UnRef();
}
private:
- IQueueClientEvent* FinishedEvent() {
- Ref();
- return this;
+ IQueueClientEvent* FinishedEvent() {
+ Ref();
+ return this;
}
void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- Replied_ = true;
- Callback_(TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
- Callback_ = nullptr;
- return;
- }
+ auto context = provider->CreateContext();
+ if (!context) {
+ Replied_ = true;
+ Callback_(TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
+ Callback_ = nullptr;
+ return;
+ }
{
std::unique_lock<std::mutex> guard(Mutex_);
LocalContext = context;
Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
Reader_->Finish(&Reply_, &Status, FinishedEvent());
- }
- context->SubscribeStop([self = TPtr(this)] {
- self->Stop();
- });
- }
-
- void Stop() {
+ }
+ context->SubscribeStop([self = TPtr(this)] {
+ self->Stop();
+ });
+ }
+
+ void Stop() {
Context.TryCancel();
- }
-
- TResponseCallback<TResponse> Callback_;
+ }
+
+ TResponseCallback<TResponse> Callback_;
TResponse Reply_;
std::mutex Mutex_;
TAsyncReaderPtr Reader_;
@@ -387,49 +387,49 @@ private:
template<class TResponse>
class IStreamRequestReadProcessor : public TThrRefBase {
-public:
+public:
using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;
- using TReadCallback = std::function<void(TGrpcStatus&&)>;
-
- /**
- * Asynchronously cancel the request
- */
- virtual void Cancel() = 0;
-
- /**
+ using TReadCallback = std::function<void(TGrpcStatus&&)>;
+
+ /**
+ * Asynchronously cancel the request
+ */
+ virtual void Cancel() = 0;
+
+ /**
* Scheduled initial server metadata read from the stream
*/
virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0;
/**
- * Scheduled response read from the stream
- * Callback will be called with the status if it failed
- * Only one Read or Finish call may be active at a time
- */
- virtual void Read(TResponse* response, TReadCallback callback) = 0;
-
- /**
- * Stop reading and gracefully finish the stream
- * Only one Read or Finish call may be active at a time
- */
- virtual void Finish(TReadCallback callback) = 0;
-
- /**
- * Additional callback to be called when stream has finished
- */
- virtual void AddFinishedCallback(TReadCallback callback) = 0;
-};
-
+ * Scheduled response read from the stream
+ * Callback will be called with the status if it failed
+ * Only one Read or Finish call may be active at a time
+ */
+ virtual void Read(TResponse* response, TReadCallback callback) = 0;
+
+ /**
+ * Stop reading and gracefully finish the stream
+ * Only one Read or Finish call may be active at a time
+ */
+ virtual void Finish(TReadCallback callback) = 0;
+
+ /**
+ * Additional callback to be called when stream has finished
+ */
+ virtual void AddFinishedCallback(TReadCallback callback) = 0;
+};
+
template<class TRequest, class TResponse>
class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> {
public:
using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>;
- using TWriteCallback = std::function<void(TGrpcStatus&&)>;
+ using TWriteCallback = std::function<void(TGrpcStatus&&)>;
/**
* Scheduled request write to the stream
*/
- virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0;
+ virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0;
};
class TGRpcKeepAliveSocketMutator;
@@ -548,7 +548,7 @@ public:
{
std::unique_lock<std::mutex> guard(Mutex);
Cancelled = true;
- if (Started && !ReadFinished) {
+ if (Started && !ReadFinished) {
if (!ReadActive) {
ReadFinished = true;
}
@@ -640,31 +640,31 @@ public:
callback(std::move(status));
}
-
- void AddFinishedCallback(TReadCallback callback) override {
- Y_VERIFY(callback, "Unexpected empty callback");
-
- TGrpcStatus status;
-
+
+ void AddFinishedCallback(TReadCallback callback) override {
+ Y_VERIFY(callback, "Unexpected empty callback");
+
+ TGrpcStatus status;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- if (!Finished) {
- FinishedCallbacks.emplace_back().swap(callback);
- return;
- }
-
- if (FinishedOk) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- callback(std::move(status));
- }
-
+ if (!Finished) {
+ FinishedCallbacks.emplace_back().swap(callback);
+ return;
+ }
+
+ if (FinishedOk) {
+ status = Status;
+ } else if (Cancelled) {
+ status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+ }
+
+ callback(std::move(status));
+ }
+
private:
void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
auto context = provider->CreateContext();
@@ -727,8 +727,8 @@ private:
{
std::unique_lock<std::mutex> guard(Mutex);
- Started = true;
- if (!ok || Cancelled) {
+ Started = true;
+ if (!ok || Cancelled) {
ReadFinished = true;
Stream->Finish(&Status, OnFinishedTag.Prepare());
return;
@@ -743,7 +743,7 @@ private:
void OnFinished(bool ok) {
TGrpcStatus status;
std::vector<TReadCallback> finishedCallbacks;
- TReaderCallback startCallback;
+ TReaderCallback startCallback;
TReadCallback readCallback;
TReadCallback finishCallback;
@@ -756,19 +756,19 @@ private:
if (ok) {
status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
+ } else if (Cancelled) {
+ status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
} else {
status = TGrpcStatus::Internal("Unexpected error");
}
- finishedCallbacks.swap(FinishedCallbacks);
-
- if (Callback) {
- Y_VERIFY(!ReadActive);
- startCallback = std::move(Callback);
- Callback = nullptr;
- } else if (ReadActive) {
+ finishedCallbacks.swap(FinishedCallbacks);
+
+ if (Callback) {
+ Y_VERIFY(!ReadActive);
+ startCallback = std::move(Callback);
+ Callback = nullptr;
+ } else if (ReadActive) {
if (ReadCallback) {
readCallback = std::move(ReadCallback);
ReadCallback = nullptr;
@@ -780,18 +780,18 @@ private:
}
}
- for (auto& finishedCallback : finishedCallbacks) {
- auto statusCopy = status;
- finishedCallback(std::move(statusCopy));
- }
-
- if (startCallback) {
+ for (auto& finishedCallback : finishedCallbacks) {
+ auto statusCopy = status;
+ finishedCallback(std::move(statusCopy));
+ }
+
+ if (startCallback) {
+ if (status.Ok()) {
+ status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
+ }
+ startCallback(std::move(status), nullptr);
+ } else if (readCallback) {
if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
- }
- startCallback(std::move(status), nullptr);
- } else if (readCallback) {
- if (status.Ok()) {
status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
}
readCallback(std::move(status));
@@ -812,7 +812,7 @@ private:
TReadCallback FinishCallback;
std::vector<TReadCallback> FinishedCallbacks;
std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
- bool Started = false;
+ bool Started = false;
bool HasInitialMetadata = false;
bool ReadActive = false;
bool ReadFinished = false;
@@ -821,72 +821,72 @@ private:
bool FinishedOk = false;
};
-template<class TRequest, class TResponse>
+template<class TRequest, class TResponse>
using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>;
-
-template<class TStub, class TRequest, class TResponse>
+
+template<class TStub, class TRequest, class TResponse>
class TStreamRequestReadWriteProcessor
: public IStreamRequestReadWriteProcessor<TRequest, TResponse>
, public TGRpcRequestProcessorCommon {
-public:
+public:
using TSelf = TStreamRequestReadWriteProcessor;
using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>;
- using TPtr = TIntrusivePtr<TSelf>;
- using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>;
- using TReadCallback = typename TBase::TReadCallback;
- using TWriteCallback = typename TBase::TWriteCallback;
- using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>;
- using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*);
-
+ using TPtr = TIntrusivePtr<TSelf>;
+ using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>;
+ using TReadCallback = typename TBase::TReadCallback;
+ using TWriteCallback = typename TBase::TWriteCallback;
+ using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>;
+ using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*);
+
explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback)
- : ConnectedCallback(std::move(callback))
- {
- Y_VERIFY(ConnectedCallback, "Missing connected callback");
- }
-
- void Cancel() override {
- Context.TryCancel();
-
+ : ConnectedCallback(std::move(callback))
+ {
+ Y_VERIFY(ConnectedCallback, "Missing connected callback");
+ }
+
+ void Cancel() override {
+ Context.TryCancel();
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Cancelled = true;
- if (Started && !(ReadFinished && WriteFinished)) {
- if (!ReadActive) {
- ReadFinished = true;
- }
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (ReadFinished && WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- }
- }
-
- void Write(TRequest&& request, TWriteCallback callback) override {
- TGrpcStatus status;
-
+ Cancelled = true;
+ if (Started && !(ReadFinished && WriteFinished)) {
+ if (!ReadActive) {
+ ReadFinished = true;
+ }
+ if (!WriteActive) {
+ WriteFinished = true;
+ }
+ if (ReadFinished && WriteFinished) {
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ }
+ }
+ }
+ }
+
+ void Write(TRequest&& request, TWriteCallback callback) override {
+ TGrpcStatus status;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- if (Cancelled || ReadFinished || WriteFinished) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
- } else if (WriteActive) {
- auto& item = WriteQueue.emplace_back();
- item.Callback.swap(callback);
- item.Request.Swap(&request);
- } else {
- WriteActive = true;
- WriteCallback.swap(callback);
- Stream->Write(request, OnWriteDoneTag.Prepare());
- }
- }
-
- if (!status.Ok() && callback) {
- callback(std::move(status));
- }
- }
-
+ if (Cancelled || ReadFinished || WriteFinished) {
+ status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
+ } else if (WriteActive) {
+ auto& item = WriteQueue.emplace_back();
+ item.Callback.swap(callback);
+ item.Request.Swap(&request);
+ } else {
+ WriteActive = true;
+ WriteCallback.swap(callback);
+ Stream->Write(request, OnWriteDoneTag.Prepare());
+ }
+ }
+
+ if (!status.Ok() && callback) {
+ callback(std::move(status));
+ }
+ }
+
void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
TGrpcStatus status;
@@ -916,321 +916,321 @@ public:
callback(std::move(status));
}
- void Read(TResponse* message, TReadCallback callback) override {
- TGrpcStatus status;
-
+ void Read(TResponse* message, TReadCallback callback) override {
+ TGrpcStatus status;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- ReadCallback = std::move(callback);
- if (!ReadFinished) {
- Stream->Read(message, OnReadDoneTag.Prepare());
- }
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
-
- callback(std::move(status));
- }
-
- void Finish(TReadCallback callback) override {
- TGrpcStatus status;
-
+ Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ if (!Finished) {
+ ReadActive = true;
+ ReadCallback = std::move(callback);
+ if (!ReadFinished) {
+ Stream->Read(message, OnReadDoneTag.Prepare());
+ }
+ return;
+ }
+ if (FinishedOk) {
+ status = Status;
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+ }
+
+ if (status.Ok()) {
+ status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
+ }
+
+ callback(std::move(status));
+ }
+
+ void Finish(TReadCallback callback) override {
+ TGrpcStatus status;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- FinishCallback = std::move(callback);
- if (!ReadFinished) {
- ReadFinished = true;
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- callback(std::move(status));
- }
-
- void AddFinishedCallback(TReadCallback callback) override {
- Y_VERIFY(callback, "Unexpected empty callback");
-
- TGrpcStatus status;
-
+ Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ if (!Finished) {
+ ReadActive = true;
+ FinishCallback = std::move(callback);
+ if (!ReadFinished) {
+ ReadFinished = true;
+ if (!WriteActive) {
+ WriteFinished = true;
+ }
+ if (WriteFinished) {
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ }
+ }
+ return;
+ }
+ if (FinishedOk) {
+ status = Status;
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+ }
+
+ callback(std::move(status));
+ }
+
+ void AddFinishedCallback(TReadCallback callback) override {
+ Y_VERIFY(callback, "Unexpected empty callback");
+
+ TGrpcStatus status;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- if (!Finished) {
- FinishedCallbacks.emplace_back().swap(callback);
- return;
- }
-
- if (FinishedOk) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- callback(std::move(status));
- }
-
-private:
- template<typename> friend class TServiceConnection;
-
- void Start(TStub& stub, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- auto callback = std::move(ConnectedCallback);
- TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
- callback(std::move(status), nullptr);
- return;
- }
-
+ if (!Finished) {
+ FinishedCallbacks.emplace_back().swap(callback);
+ return;
+ }
+
+ if (FinishedOk) {
+ status = Status;
+ } else if (Cancelled) {
+ status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+ }
+
+ callback(std::move(status));
+ }
+
+private:
+ template<typename> friend class TServiceConnection;
+
+ void Start(TStub& stub, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
+ auto context = provider->CreateContext();
+ if (!context) {
+ auto callback = std::move(ConnectedCallback);
+ TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
+ callback(std::move(status), nullptr);
+ return;
+ }
+
{
std::unique_lock<std::mutex> guard(Mutex);
- LocalContext = context;
- Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare());
- }
-
- context->SubscribeStop([self = TPtr(this)] {
- self->Cancel();
- });
- }
-
-private:
- void OnConnected(bool ok) {
- TConnectedCallback callback;
-
+ LocalContext = context;
+ Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare());
+ }
+
+ context->SubscribeStop([self = TPtr(this)] {
+ self->Cancel();
+ });
+ }
+
+private:
+ void OnConnected(bool ok) {
+ TConnectedCallback callback;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Started = true;
- if (!ok || Cancelled) {
- ReadFinished = true;
- WriteFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- return;
- }
-
- callback = std::move(ConnectedCallback);
- ConnectedCallback = nullptr;
- }
-
- callback({ }, typename TBase::TPtr(this));
- }
-
- void OnReadDone(bool ok) {
- TGrpcStatus status;
- TReadCallback callback;
+ Started = true;
+ if (!ok || Cancelled) {
+ ReadFinished = true;
+ WriteFinished = true;
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ return;
+ }
+
+ callback = std::move(ConnectedCallback);
+ ConnectedCallback = nullptr;
+ }
+
+ callback({ }, typename TBase::TPtr(this));
+ }
+
+ void OnReadDone(bool ok) {
+ TGrpcStatus status;
+ TReadCallback callback;
std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
-
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(ReadActive, "Unexpected Read done callback");
- Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
-
- if (!ok || Cancelled || WriteFinished) {
- ReadFinished = true;
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- if (!ok) {
- // Keep ReadActive=true, so callback is called
- // after the call is finished with an error
- return;
- }
- }
-
- callback = std::move(ReadCallback);
- ReadCallback = nullptr;
- ReadActive = false;
+ Y_VERIFY(ReadActive, "Unexpected Read done callback");
+ Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
+
+ if (!ok || Cancelled || WriteFinished) {
+ ReadFinished = true;
+ if (!WriteActive) {
+ WriteFinished = true;
+ }
+ if (WriteFinished) {
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ }
+ if (!ok) {
+ // Keep ReadActive=true, so callback is called
+ // after the call is finished with an error
+ return;
+ }
+ }
+
+ callback = std::move(ReadCallback);
+ ReadCallback = nullptr;
+ ReadActive = false;
initialMetadata = InitialMetadata;
InitialMetadata = nullptr;
HasInitialMetadata = true;
- }
-
+ }
+
if (initialMetadata) {
GetInitialMetadata(initialMetadata);
}
- callback(std::move(status));
- }
-
- void OnWriteDone(bool ok) {
- TWriteCallback okCallback;
-
+ callback(std::move(status));
+ }
+
+ void OnWriteDone(bool ok) {
+ TWriteCallback okCallback;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(WriteActive, "Unexpected Write done callback");
- Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag");
-
- if (ok) {
- okCallback.swap(WriteCallback);
- } else if (WriteCallback) {
- // Put callback back on the queue until OnFinished
- auto& item = WriteQueue.emplace_front();
- item.Callback.swap(WriteCallback);
- }
-
- if (!ok || Cancelled) {
- WriteActive = false;
- WriteFinished = true;
- if (!ReadActive) {
- ReadFinished = true;
- }
- if (ReadFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
+ Y_VERIFY(WriteActive, "Unexpected Write done callback");
+ Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag");
+
+ if (ok) {
+ okCallback.swap(WriteCallback);
+ } else if (WriteCallback) {
+ // Put callback back on the queue until OnFinished
+ auto& item = WriteQueue.emplace_front();
+ item.Callback.swap(WriteCallback);
+ }
+
+ if (!ok || Cancelled) {
+ WriteActive = false;
+ WriteFinished = true;
+ if (!ReadActive) {
+ ReadFinished = true;
+ }
+ if (ReadFinished) {
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ }
} else if (!WriteQueue.empty()) {
- WriteCallback.swap(WriteQueue.front().Callback);
- Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare());
- WriteQueue.pop_front();
- } else {
- WriteActive = false;
- if (ReadFinished) {
- WriteFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- }
-
- if (okCallback) {
- okCallback(TGrpcStatus());
- }
- }
-
- void OnFinished(bool ok) {
- TGrpcStatus status;
+ WriteCallback.swap(WriteQueue.front().Callback);
+ Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare());
+ WriteQueue.pop_front();
+ } else {
+ WriteActive = false;
+ if (ReadFinished) {
+ WriteFinished = true;
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ }
+ }
+ }
+
+ if (okCallback) {
+ okCallback(TGrpcStatus());
+ }
+ }
+
+ void OnFinished(bool ok) {
+ TGrpcStatus status;
std::deque<TWriteItem> writesDropped;
std::vector<TReadCallback> finishedCallbacks;
- TConnectedCallback connectedCallback;
- TReadCallback readCallback;
- TReadCallback finishCallback;
-
+ TConnectedCallback connectedCallback;
+ TReadCallback readCallback;
+ TReadCallback finishCallback;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Finished = true;
- FinishedOk = ok;
- LocalContext.reset();
-
- if (ok) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
-
- writesDropped.swap(WriteQueue);
- finishedCallbacks.swap(FinishedCallbacks);
-
- if (ConnectedCallback) {
- Y_VERIFY(!ReadActive);
- connectedCallback = std::move(ConnectedCallback);
- ConnectedCallback = nullptr;
- } else if (ReadActive) {
- if (ReadCallback) {
- readCallback = std::move(ReadCallback);
- ReadCallback = nullptr;
- } else {
- finishCallback = std::move(FinishCallback);
- FinishCallback = nullptr;
- }
- ReadActive = false;
- }
- }
-
- for (auto& item : writesDropped) {
- if (item.Callback) {
- TGrpcStatus writeStatus = status;
- if (writeStatus.Ok()) {
- writeStatus = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
- }
- item.Callback(std::move(writeStatus));
- }
- }
-
- for (auto& finishedCallback : finishedCallbacks) {
- TGrpcStatus statusCopy = status;
- finishedCallback(std::move(statusCopy));
- }
-
- if (connectedCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
- }
- connectedCallback(std::move(status), nullptr);
- } else if (readCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
- readCallback(std::move(status));
- } else if (finishCallback) {
- finishCallback(std::move(status));
- }
- }
-
-private:
- struct TWriteItem {
- TWriteCallback Callback;
- TRequest Request;
- };
-
-private:
- using TFixedEvent = TQueueClientFixedEvent<TSelf>;
-
- TFixedEvent OnConnectedTag = { this, &TSelf::OnConnected };
- TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
- TFixedEvent OnWriteDoneTag = { this, &TSelf::OnWriteDone };
- TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
-
-private:
+ Finished = true;
+ FinishedOk = ok;
+ LocalContext.reset();
+
+ if (ok) {
+ status = Status;
+ } else if (Cancelled) {
+ status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+
+ writesDropped.swap(WriteQueue);
+ finishedCallbacks.swap(FinishedCallbacks);
+
+ if (ConnectedCallback) {
+ Y_VERIFY(!ReadActive);
+ connectedCallback = std::move(ConnectedCallback);
+ ConnectedCallback = nullptr;
+ } else if (ReadActive) {
+ if (ReadCallback) {
+ readCallback = std::move(ReadCallback);
+ ReadCallback = nullptr;
+ } else {
+ finishCallback = std::move(FinishCallback);
+ FinishCallback = nullptr;
+ }
+ ReadActive = false;
+ }
+ }
+
+ for (auto& item : writesDropped) {
+ if (item.Callback) {
+ TGrpcStatus writeStatus = status;
+ if (writeStatus.Ok()) {
+ writeStatus = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
+ }
+ item.Callback(std::move(writeStatus));
+ }
+ }
+
+ for (auto& finishedCallback : finishedCallbacks) {
+ TGrpcStatus statusCopy = status;
+ finishedCallback(std::move(statusCopy));
+ }
+
+ if (connectedCallback) {
+ if (status.Ok()) {
+ status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
+ }
+ connectedCallback(std::move(status), nullptr);
+ } else if (readCallback) {
+ if (status.Ok()) {
+ status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
+ }
+ readCallback(std::move(status));
+ } else if (finishCallback) {
+ finishCallback(std::move(status));
+ }
+ }
+
+private:
+ struct TWriteItem {
+ TWriteCallback Callback;
+ TRequest Request;
+ };
+
+private:
+ using TFixedEvent = TQueueClientFixedEvent<TSelf>;
+
+ TFixedEvent OnConnectedTag = { this, &TSelf::OnConnected };
+ TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
+ TFixedEvent OnWriteDoneTag = { this, &TSelf::OnWriteDone };
+ TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
+
+private:
std::mutex Mutex;
- TAsyncReaderWriterPtr Stream;
- TConnectedCallback ConnectedCallback;
- TReadCallback ReadCallback;
- TReadCallback FinishCallback;
+ TAsyncReaderWriterPtr Stream;
+ TConnectedCallback ConnectedCallback;
+ TReadCallback ReadCallback;
+ TReadCallback FinishCallback;
std::vector<TReadCallback> FinishedCallbacks;
std::deque<TWriteItem> WriteQueue;
- TWriteCallback WriteCallback;
+ TWriteCallback WriteCallback;
std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
- bool Started = false;
+ bool Started = false;
bool HasInitialMetadata = false;
- bool ReadActive = false;
- bool ReadFinished = false;
- bool WriteActive = false;
- bool WriteFinished = false;
- bool Finished = false;
- bool Cancelled = false;
- bool FinishedOk = false;
-};
-
+ bool ReadActive = false;
+ bool ReadFinished = false;
+ bool WriteActive = false;
+ bool WriteFinished = false;
+ bool Finished = false;
+ bool Cancelled = false;
+ bool FinishedOk = false;
+};
+
class TGRpcClientLow;
template<typename TGRpcService>
@@ -1245,9 +1245,9 @@ public:
template<typename TRequest, typename TResponse>
void DoRequest(const TRequest& request,
TResponseCallback<TResponse> callback,
- typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
+ typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
+ IQueueClientContextProvider* provider = nullptr)
{
auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
processor->ApplyMeta(metas);
@@ -1282,31 +1282,31 @@ public:
processor->ApplyMeta(metas);
processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_);
}
-
+
/*
* Start streaming response reading (one request, many responses)
*/
- template<typename TRequest, typename TResponse>
+ template<typename TRequest, typename TResponse>
void DoStreamRequest(const TRequest& request,
TStreamReaderCallback<TResponse> callback,
typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
+ const TCallMeta& metas = { },
+ IQueueClientContextProvider* provider = nullptr)
+ {
auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
+ processor->ApplyMeta(metas);
processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_);
- }
-
+ }
+
private:
TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci,
- IQueueClientContextProvider* provider)
+ IQueueClientContextProvider* provider)
: Stub_(TGRpcService::NewStub(ci))
- , Provider_(provider)
- {
- Y_VERIFY(Provider_, "Connection does not have a queue provider");
- }
-
+ , Provider_(provider)
+ {
+ Y_VERIFY(Provider_, "Connection does not have a queue provider");
+ }
+
TServiceConnection(TStubsHolder& holder,
IQueueClientContextProvider* provider)
: Stub_(holder.GetOrCreateStub<TStub>())
@@ -1316,47 +1316,47 @@ private:
}
std::shared_ptr<TStub> Stub_;
- IQueueClientContextProvider* Provider_;
+ IQueueClientContextProvider* Provider_;
};
class TGRpcClientLow
: public IQueueClientContextProvider
{
- class TContextImpl;
- friend class TContextImpl;
-
- enum ECqState : TAtomicBase {
- WORKING = 0,
- STOP_SILENT = 1,
- STOP_EXPLICIT = 2,
- };
-
+ class TContextImpl;
+ friend class TContextImpl;
+
+ enum ECqState : TAtomicBase {
+ WORKING = 0,
+ STOP_SILENT = 1,
+ STOP_EXPLICIT = 2,
+ };
+
public:
explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false);
~TGRpcClientLow();
- // Tries to stop all currently running requests (via their stop callbacks)
- // Will shutdown CQ and drain events once all requests have finished
- // No new requests may be started after this call
- void Stop(bool wait = false);
-
- // Waits until all currently running requests finish execution
- void WaitIdle();
-
- inline bool IsStopping() const {
- switch (GetCqState()) {
- case WORKING:
- return false;
- case STOP_SILENT:
- case STOP_EXPLICIT:
- return true;
- }
-
- Y_UNREACHABLE();
- }
-
- IQueueClientContextPtr CreateContext() override;
-
+ // Tries to stop all currently running requests (via their stop callbacks)
+ // Will shutdown CQ and drain events once all requests have finished
+ // No new requests may be started after this call
+ void Stop(bool wait = false);
+
+ // Waits until all currently running requests finish execution
+ void WaitIdle();
+
+ inline bool IsStopping() const {
+ switch (GetCqState()) {
+ case WORKING:
+ return false;
+ case STOP_SILENT:
+ case STOP_EXPLICIT:
+ return true;
+ }
+
+ Y_UNREACHABLE();
+ }
+
+ IQueueClientContextPtr CreateContext() override;
+
template<typename TGRpcService>
std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) {
return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this));
@@ -1367,32 +1367,32 @@ public:
return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this));
}
- // Tests only, not thread-safe
- void AddWorkerThreadForTest();
-
+ // Tests only, not thread-safe
+ void AddWorkerThreadForTest();
+
private:
using IThreadRef = std::unique_ptr<IThreadFactory::IThread>;
using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>;
void Init(size_t numWorkerThread);
- inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); }
- inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); }
-
- void StopInternal(bool silent);
- void WaitInternal();
-
- void ForgetContext(TContextImpl* context);
-
-private:
+ inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); }
+ inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); }
+
+ void StopInternal(bool silent);
+ void WaitInternal();
+
+ void ForgetContext(TContextImpl* context);
+
+private:
bool UseCompletionQueuePerThread_;
std::vector<CompletionQueueRef> CQS_;
std::vector<IThreadRef> WorkerThreads_;
- TAtomic CqState_ = -1;
-
+ TAtomic CqState_ = -1;
+
std::mutex Mtx_;
std::condition_variable ContextsEmpty_;
std::unordered_set<TContextImpl*> Contexts_;
-
+
std::mutex JoinMutex_;
};