aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/unified_agent_client/grpc_io.h
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 10:54:08 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 12:28:07 +0300
commitfc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch)
treec15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/unified_agent_client/grpc_io.h
parent8a749596d40e91c896a1907afcd108d9221fbde1 (diff)
downloadydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/unified_agent_client/grpc_io.h')
-rw-r--r--library/cpp/unified_agent_client/grpc_io.h141
1 files changed, 141 insertions, 0 deletions
diff --git a/library/cpp/unified_agent_client/grpc_io.h b/library/cpp/unified_agent_client/grpc_io.h
new file mode 100644
index 0000000000..5f368a5943
--- /dev/null
+++ b/library/cpp/unified_agent_client/grpc_io.h
@@ -0,0 +1,141 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/async_joiner.h>
+#include <library/cpp/unified_agent_client/f_maybe.h>
+
+#include <contrib/libs/grpc/include/grpcpp/alarm.h>
+#include <contrib/libs/grpc/include/grpc++/grpc++.h>
+
+#include <thread>
+
+struct grpc_cq_completion;
+
+namespace NUnifiedAgent {
+ enum class EIOStatus {
+ Ok,
+ Error
+ };
+
+ class IIOCallback {
+ public:
+ virtual ~IIOCallback() = default;
+
+ virtual IIOCallback* Ref() = 0;
+
+ virtual void OnIOCompleted(EIOStatus status) = 0;
+ };
+
+ template<typename TCallback, typename TCounter>
+ class TIOCallback: public IIOCallback {
+ public:
+ explicit TIOCallback(TCallback&& callback, TCounter* counter)
+ : Callback(std::move(callback))
+ , Counter(counter)
+ {
+ }
+
+ IIOCallback* Ref() override {
+ Counter->Ref();
+ return this;
+ }
+
+ void OnIOCompleted(EIOStatus status) override {
+ Callback(status);
+ Counter->UnRef();
+ }
+
+ private:
+ TCallback Callback;
+ TCounter* Counter;
+ };
+
+ template<typename TCallback, typename TCounter>
+ THolder<IIOCallback> MakeIOCallback(TCallback&& callback, TCounter* counter) {
+ return MakeHolder<TIOCallback<TCallback, TCounter>>(std::move(callback), counter);
+ }
+
+ template<typename TTarget, typename TCounter = TTarget>
+ THolder<IIOCallback> MakeIOCallback(TTarget* target, void (TTarget::*method)(EIOStatus),
+ TCounter* counter = nullptr)
+ {
+ return MakeIOCallback([target, method](EIOStatus status) { ((*target).*method)(status); },
+ counter ? counter : target);
+ }
+
+ class TGrpcNotification: private ::grpc::internal::CompletionQueueTag {
+ public:
+ TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback);
+
+ ~TGrpcNotification();
+
+ void Trigger();
+
+ private:
+ bool FinalizeResult(void** tag, bool* status) override;
+
+ private:
+ grpc::CompletionQueue& CompletionQueue;
+ THolder<IIOCallback> IOCallback;
+ THolder<grpc_cq_completion> Completion;
+ std::atomic<bool> InQueue;
+ };
+
+ class TGrpcTimer: private IIOCallback {
+ public:
+ TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback);
+
+ void Set(TInstant triggerTime);
+
+ void Cancel();
+
+ private:
+ IIOCallback* Ref() override;
+
+ void OnIOCompleted(EIOStatus status) override;
+
+ private:
+ grpc::CompletionQueue& CompletionQueue;
+ THolder<IIOCallback> IOCallback;
+ grpc::Alarm Alarm;
+ bool AlarmIsSet;
+ TFMaybe<TInstant> NextTriggerTime;
+ };
+
+ class TGrpcCompletionQueuePoller {
+ public:
+ explicit TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue);
+
+ void Start();
+
+ void Join();
+
+ private:
+ grpc::CompletionQueue& Queue;
+ std::thread Thread;
+ };
+
+ class TGrpcCompletionQueueHost {
+ public:
+ TGrpcCompletionQueueHost();
+
+ void Start();
+
+ void Stop();
+
+ inline grpc::CompletionQueue& GetCompletionQueue() noexcept {
+ return CompletionQueue;
+ }
+
+ private:
+ grpc::CompletionQueue CompletionQueue;
+ TGrpcCompletionQueuePoller Poller;
+ };
+
+ gpr_timespec InstantToTimespec(TInstant instant);
+
+ void EnsureGrpcConfigured();
+
+ void StartGrpcTracing();
+
+ void FinishGrpcTracing();
+}