diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 10:54:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 12:28:07 +0300 |
commit | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch) | |
tree | c15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/unified_agent_client/grpc_io.h | |
parent | 8a749596d40e91c896a1907afcd108d9221fbde1 (diff) | |
download | ydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz |
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/unified_agent_client/grpc_io.h')
-rw-r--r-- | library/cpp/unified_agent_client/grpc_io.h | 141 |
1 files changed, 141 insertions, 0 deletions
diff --git a/library/cpp/unified_agent_client/grpc_io.h b/library/cpp/unified_agent_client/grpc_io.h new file mode 100644 index 0000000000..5f368a5943 --- /dev/null +++ b/library/cpp/unified_agent_client/grpc_io.h @@ -0,0 +1,141 @@ +#pragma once + +#include <library/cpp/unified_agent_client/async_joiner.h> +#include <library/cpp/unified_agent_client/f_maybe.h> + +#include <contrib/libs/grpc/include/grpcpp/alarm.h> +#include <contrib/libs/grpc/include/grpc++/grpc++.h> + +#include <thread> + +struct grpc_cq_completion; + +namespace NUnifiedAgent { + enum class EIOStatus { + Ok, + Error + }; + + class IIOCallback { + public: + virtual ~IIOCallback() = default; + + virtual IIOCallback* Ref() = 0; + + virtual void OnIOCompleted(EIOStatus status) = 0; + }; + + template<typename TCallback, typename TCounter> + class TIOCallback: public IIOCallback { + public: + explicit TIOCallback(TCallback&& callback, TCounter* counter) + : Callback(std::move(callback)) + , Counter(counter) + { + } + + IIOCallback* Ref() override { + Counter->Ref(); + return this; + } + + void OnIOCompleted(EIOStatus status) override { + Callback(status); + Counter->UnRef(); + } + + private: + TCallback Callback; + TCounter* Counter; + }; + + template<typename TCallback, typename TCounter> + THolder<IIOCallback> MakeIOCallback(TCallback&& callback, TCounter* counter) { + return MakeHolder<TIOCallback<TCallback, TCounter>>(std::move(callback), counter); + } + + template<typename TTarget, typename TCounter = TTarget> + THolder<IIOCallback> MakeIOCallback(TTarget* target, void (TTarget::*method)(EIOStatus), + TCounter* counter = nullptr) + { + return MakeIOCallback([target, method](EIOStatus status) { ((*target).*method)(status); }, + counter ? counter : target); + } + + class TGrpcNotification: private ::grpc::internal::CompletionQueueTag { + public: + TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback); + + ~TGrpcNotification(); + + void Trigger(); + + private: + bool FinalizeResult(void** tag, bool* status) override; + + private: + grpc::CompletionQueue& CompletionQueue; + THolder<IIOCallback> IOCallback; + THolder<grpc_cq_completion> Completion; + std::atomic<bool> InQueue; + }; + + class TGrpcTimer: private IIOCallback { + public: + TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback); + + void Set(TInstant triggerTime); + + void Cancel(); + + private: + IIOCallback* Ref() override; + + void OnIOCompleted(EIOStatus status) override; + + private: + grpc::CompletionQueue& CompletionQueue; + THolder<IIOCallback> IOCallback; + grpc::Alarm Alarm; + bool AlarmIsSet; + TFMaybe<TInstant> NextTriggerTime; + }; + + class TGrpcCompletionQueuePoller { + public: + explicit TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue); + + void Start(); + + void Join(); + + private: + grpc::CompletionQueue& Queue; + std::thread Thread; + }; + + class TGrpcCompletionQueueHost { + public: + TGrpcCompletionQueueHost(); + + void Start(); + + void Stop(); + + inline grpc::CompletionQueue& GetCompletionQueue() noexcept { + return CompletionQueue; + } + + private: + grpc::CompletionQueue CompletionQueue; + TGrpcCompletionQueuePoller Poller; + }; + + gpr_timespec InstantToTimespec(TInstant instant); + + void EnsureGrpcConfigured(); + + void StartGrpcTracing(); + + void FinishGrpcTracing(); +} |