aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/unified_agent_client/stream.cpp
diff options
context:
space:
mode:
authorlo-r-d <lo-r-d@yandex-team.com>2025-01-20 15:44:08 +0300
committerlo-r-d <lo-r-d@yandex-team.com>2025-01-20 15:59:28 +0300
commit13b3cd284513a5d3d21e207821e73f1dd58988b7 (patch)
tree727b844c2fd742165394c3e3cd4c1fbe0a3842ba /library/cpp/unified_agent_client/stream.cpp
parentf7afbbd9f19b8800552a7b50e6e9d0f7629ebfff (diff)
downloadydb-13b3cd284513a5d3d21e207821e73f1dd58988b7.tar.gz
Add UnifiedAgentWriterFactory
commit_hash:af6dedadd4d7fe292bcb7a8b6de366aff4e630b1
Diffstat (limited to 'library/cpp/unified_agent_client/stream.cpp')
-rw-r--r--library/cpp/unified_agent_client/stream.cpp89
1 files changed, 89 insertions, 0 deletions
diff --git a/library/cpp/unified_agent_client/stream.cpp b/library/cpp/unified_agent_client/stream.cpp
new file mode 100644
index 0000000000..df0995b537
--- /dev/null
+++ b/library/cpp/unified_agent_client/stream.cpp
@@ -0,0 +1,89 @@
+#include "stream.h"
+
+namespace NUnifiedAgent {
+ namespace {
+ class TDefaultStreamRecordConverter : public IStreamRecordConverter {
+ public:
+ TDefaultStreamRecordConverter(bool stripTrailingNewLine)
+ : StripTrailingNewLine(stripTrailingNewLine)
+ {
+ }
+
+ TClientMessage Convert(const void* buf, size_t len) const override {
+ TStringBuf str(static_cast<const char*>(buf), len);
+ if (StripTrailingNewLine) {
+ str.ChopSuffix("\n");
+ }
+ return {
+ TString(str),
+ {}
+ };
+ }
+
+ private:
+ const bool StripTrailingNewLine;
+ };
+
+ class TClientSessionAdapter: public IOutputStream {
+ public:
+ explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IStreamRecordConverter> recordConverter)
+ : Session(session)
+ , RecordConverter(std::move(recordConverter))
+ {
+ }
+
+ void DoWrite(const void* buf, size_t len) override {
+ Session->Send(RecordConverter->Convert(buf, len));
+ }
+
+ void DoFlush() override {
+ }
+
+ private:
+ TClientSessionPtr Session;
+ THolder<IStreamRecordConverter> RecordConverter;
+ };
+
+ class TSessionHolder {
+ protected:
+ TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters)
+ : Client(MakeClient(parameters))
+ , Session(Client->CreateSession(sessionParameters))
+ {
+ }
+
+ protected:
+ TClientPtr Client;
+ TClientSessionPtr Session;
+ };
+
+ class TAgentOutputStream: private TSessionHolder, public TClientSessionAdapter {
+ public:
+ TAgentOutputStream(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters,
+ THolder<IStreamRecordConverter> recordConverter)
+ : TSessionHolder(parameters, sessionParameters)
+ , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter))
+ {
+ }
+
+ ~TAgentOutputStream() override {
+ TSessionHolder::Session->Close();
+ }
+ };
+ }
+
+ THolder<IStreamRecordConverter> MakeDefaultStreamRecordConverter(bool stripTrailingNewLine) {
+ return MakeHolder<TDefaultStreamRecordConverter>(stripTrailingNewLine);
+ }
+
+ THolder<IOutputStream> MakeOutputStream(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters,
+ THolder<IStreamRecordConverter> recordConverter)
+ {
+ if (!recordConverter) {
+ recordConverter = MakeDefaultStreamRecordConverter();
+ }
+ return MakeHolder<TAgentOutputStream>(parameters, sessionParameters, std::move(recordConverter));
+ }
+}