diff options
author | lo-r-d <lo-r-d@yandex-team.com> | 2025-01-20 15:44:08 +0300 |
---|---|---|
committer | lo-r-d <lo-r-d@yandex-team.com> | 2025-01-20 15:59:28 +0300 |
commit | 13b3cd284513a5d3d21e207821e73f1dd58988b7 (patch) | |
tree | 727b844c2fd742165394c3e3cd4c1fbe0a3842ba /library/cpp/unified_agent_client/stream.cpp | |
parent | f7afbbd9f19b8800552a7b50e6e9d0f7629ebfff (diff) | |
download | ydb-13b3cd284513a5d3d21e207821e73f1dd58988b7.tar.gz |
Add UnifiedAgentWriterFactory
commit_hash:af6dedadd4d7fe292bcb7a8b6de366aff4e630b1
Diffstat (limited to 'library/cpp/unified_agent_client/stream.cpp')
-rw-r--r-- | library/cpp/unified_agent_client/stream.cpp | 89 |
1 files changed, 89 insertions, 0 deletions
diff --git a/library/cpp/unified_agent_client/stream.cpp b/library/cpp/unified_agent_client/stream.cpp new file mode 100644 index 0000000000..df0995b537 --- /dev/null +++ b/library/cpp/unified_agent_client/stream.cpp @@ -0,0 +1,89 @@ +#include "stream.h" + +namespace NUnifiedAgent { + namespace { + class TDefaultStreamRecordConverter : public IStreamRecordConverter { + public: + TDefaultStreamRecordConverter(bool stripTrailingNewLine) + : StripTrailingNewLine(stripTrailingNewLine) + { + } + + TClientMessage Convert(const void* buf, size_t len) const override { + TStringBuf str(static_cast<const char*>(buf), len); + if (StripTrailingNewLine) { + str.ChopSuffix("\n"); + } + return { + TString(str), + {} + }; + } + + private: + const bool StripTrailingNewLine; + }; + + class TClientSessionAdapter: public IOutputStream { + public: + explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IStreamRecordConverter> recordConverter) + : Session(session) + , RecordConverter(std::move(recordConverter)) + { + } + + void DoWrite(const void* buf, size_t len) override { + Session->Send(RecordConverter->Convert(buf, len)); + } + + void DoFlush() override { + } + + private: + TClientSessionPtr Session; + THolder<IStreamRecordConverter> RecordConverter; + }; + + class TSessionHolder { + protected: + TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters) + : Client(MakeClient(parameters)) + , Session(Client->CreateSession(sessionParameters)) + { + } + + protected: + TClientPtr Client; + TClientSessionPtr Session; + }; + + class TAgentOutputStream: private TSessionHolder, public TClientSessionAdapter { + public: + TAgentOutputStream(const TClientParameters& parameters, + const TSessionParameters& sessionParameters, + THolder<IStreamRecordConverter> recordConverter) + : TSessionHolder(parameters, sessionParameters) + , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter)) + { + } + + ~TAgentOutputStream() override { + TSessionHolder::Session->Close(); + } + }; + } + + THolder<IStreamRecordConverter> MakeDefaultStreamRecordConverter(bool stripTrailingNewLine) { + return MakeHolder<TDefaultStreamRecordConverter>(stripTrailingNewLine); + } + + THolder<IOutputStream> MakeOutputStream(const TClientParameters& parameters, + const TSessionParameters& sessionParameters, + THolder<IStreamRecordConverter> recordConverter) + { + if (!recordConverter) { + recordConverter = MakeDefaultStreamRecordConverter(); + } + return MakeHolder<TAgentOutputStream>(parameters, sessionParameters, std::move(recordConverter)); + } +} |