diff options
| author | Alexander Smirnov <[email protected]> | 2025-01-21 22:25:25 +0000 |
|---|---|---|
| committer | Alexander Smirnov <[email protected]> | 2025-01-21 22:25:25 +0000 |
| commit | f64d8ea3633ca6e8062dd5cd795e72f107353854 (patch) | |
| tree | 504421e3c33bca6b195d2edc1d0e2dafe781b8b5 /library/cpp | |
| parent | caa23328d636860e580cd9523e82a57df709968a (diff) | |
| parent | e6da128e02a23efc5e5168df795c037b49a7bde7 (diff) | |
Merge pull request #13603 from ydb-platform/merge-libs-250121-0019
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/unified_agent_client/stream.cpp | 89 | ||||
| -rw-r--r-- | library/cpp/unified_agent_client/stream.h | 18 | ||||
| -rw-r--r-- | library/cpp/unified_agent_client/ya.make | 1 |
3 files changed, 108 insertions, 0 deletions
diff --git a/library/cpp/unified_agent_client/stream.cpp b/library/cpp/unified_agent_client/stream.cpp new file mode 100644 index 00000000000..df0995b537d --- /dev/null +++ b/library/cpp/unified_agent_client/stream.cpp @@ -0,0 +1,89 @@ +#include "stream.h" + +namespace NUnifiedAgent { + namespace { + class TDefaultStreamRecordConverter : public IStreamRecordConverter { + public: + TDefaultStreamRecordConverter(bool stripTrailingNewLine) + : StripTrailingNewLine(stripTrailingNewLine) + { + } + + TClientMessage Convert(const void* buf, size_t len) const override { + TStringBuf str(static_cast<const char*>(buf), len); + if (StripTrailingNewLine) { + str.ChopSuffix("\n"); + } + return { + TString(str), + {} + }; + } + + private: + const bool StripTrailingNewLine; + }; + + class TClientSessionAdapter: public IOutputStream { + public: + explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IStreamRecordConverter> recordConverter) + : Session(session) + , RecordConverter(std::move(recordConverter)) + { + } + + void DoWrite(const void* buf, size_t len) override { + Session->Send(RecordConverter->Convert(buf, len)); + } + + void DoFlush() override { + } + + private: + TClientSessionPtr Session; + THolder<IStreamRecordConverter> RecordConverter; + }; + + class TSessionHolder { + protected: + TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters) + : Client(MakeClient(parameters)) + , Session(Client->CreateSession(sessionParameters)) + { + } + + protected: + TClientPtr Client; + TClientSessionPtr Session; + }; + + class TAgentOutputStream: private TSessionHolder, public TClientSessionAdapter { + public: + TAgentOutputStream(const TClientParameters& parameters, + const TSessionParameters& sessionParameters, + THolder<IStreamRecordConverter> recordConverter) + : TSessionHolder(parameters, sessionParameters) + , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter)) + { + } + + ~TAgentOutputStream() override { + TSessionHolder::Session->Close(); + } + }; + } + + THolder<IStreamRecordConverter> MakeDefaultStreamRecordConverter(bool stripTrailingNewLine) { + return MakeHolder<TDefaultStreamRecordConverter>(stripTrailingNewLine); + } + + THolder<IOutputStream> MakeOutputStream(const TClientParameters& parameters, + const TSessionParameters& sessionParameters, + THolder<IStreamRecordConverter> recordConverter) + { + if (!recordConverter) { + recordConverter = MakeDefaultStreamRecordConverter(); + } + return MakeHolder<TAgentOutputStream>(parameters, sessionParameters, std::move(recordConverter)); + } +} diff --git a/library/cpp/unified_agent_client/stream.h b/library/cpp/unified_agent_client/stream.h new file mode 100644 index 00000000000..5f65d9fb2e0 --- /dev/null +++ b/library/cpp/unified_agent_client/stream.h @@ -0,0 +1,18 @@ +#pragma once + +#include <library/cpp/unified_agent_client/client.h> + +namespace NUnifiedAgent { + class IStreamRecordConverter { + public: + virtual ~IStreamRecordConverter() = default; + + virtual TClientMessage Convert(const void* buf, size_t len) const = 0; + }; + + THolder<IStreamRecordConverter> MakeDefaultStreamRecordConverter(bool stripTrailingNewLine = true); + + THolder<IOutputStream> MakeOutputStream(const TClientParameters& parameters, + const TSessionParameters& sessionParameters = {}, + THolder<IStreamRecordConverter> recordConverter = {}); +} diff --git a/library/cpp/unified_agent_client/ya.make b/library/cpp/unified_agent_client/ya.make index 6771471974f..eadd9e94364 100644 --- a/library/cpp/unified_agent_client/ya.make +++ b/library/cpp/unified_agent_client/ya.make @@ -16,6 +16,7 @@ SRCS( clock.cpp duration_counter.cpp logger.cpp + stream.cpp throttling.cpp proto_weighing.cpp GLOBAL registrar.cpp |
