aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/unified_agent_client/stream.cpp
blob: df0995b537db7355b60327ab1ddd7fbf74f193b8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#include "stream.h"

namespace NUnifiedAgent {
    namespace {
        // Default IStreamRecordConverter: wraps each chunk written to the stream
        // into a TClientMessage whose first field is a copy of the chunk's bytes.
        class TDefaultStreamRecordConverter : public IStreamRecordConverter {
        public:
            // stripTrailingNewLine: when true, Convert() removes a trailing "\n"
            // from the chunk before copying it into the message.
            // The constructor is explicit so that a bare bool can never be
            // implicitly converted into a converter object.
            explicit TDefaultStreamRecordConverter(bool stripTrailingNewLine)
                : StripTrailingNewLine(stripTrailingNewLine)
            {
            }

            // Builds a message from the `len` bytes starting at `buf`.
            // The bytes are viewed through a non-owning TStringBuf first, so the
            // only copy made here is the final TString construction.
            TClientMessage Convert(const void* buf, size_t len) const override {
                TStringBuf str(static_cast<const char*>(buf), len);
                if (StripTrailingNewLine) {
                    // Only shrinks the view; the caller's buffer is not modified.
                    str.ChopSuffix("\n");
                }
                return {
                    TString(str),
                    {} // second member of TClientMessage is initialized from empty braces
                };
            }

        private:
            // Fixed at construction; controls the trailing-newline handling in Convert().
            const bool StripTrailingNewLine;
        };

        // IOutputStream facade over a client session: every DoWrite() call is
        // converted into one message by the record converter and sent to the session.
        class TClientSessionAdapter: public IOutputStream {
        public:
            // session: session that receives the messages (the pointer is copied).
            // recordConverter: converter used for every written chunk; ownership
            // is transferred to the adapter.
            explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IStreamRecordConverter> recordConverter)
                : Session(session)
                , RecordConverter(std::move(recordConverter))
            {
            }

            // Converts the chunk [buf, buf + len) into a message and sends it.
            void DoWrite(const void* buf, size_t len) override {
                auto message = RecordConverter->Convert(buf, len);
                Session->Send(std::move(message));
            }

            // Intentionally a no-op: this adapter keeps no buffer of its own.
            void DoFlush() override {
            }

        private:
            TClientSessionPtr Session;
            THolder<IStreamRecordConverter> RecordConverter;
        };

        // Helper base that creates and stores a client together with a session
        // opened on it. Everything is protected: it is meant to be used only as
        // a base class.
        class TSessionHolder {
        protected:
            // Declaration order matters: Session's initializer calls into Client,
            // so Client must be declared (and therefore initialized) first.
            TClientPtr Client;
            TClientSessionPtr Session;

            // Creates the client from `parameters`, then opens a session on it
            // using `sessionParameters`.
            TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters)
                : Client(MakeClient(parameters))
                , Session(Client->CreateSession(sessionParameters))
            {
            }
        };

        class TAgentOutputStream: private TSessionHolder, public TClientSessionAdapter {
        public:
            TAgentOutputStream(const TClientParameters& parameters,
                const TSessionParameters& sessionParameters,
                THolder<IStreamRecordConverter> recordConverter)
                : TSessionHolder(parameters, sessionParameters)
                , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter))
            {
            }

            ~TAgentOutputStream() override {
                TSessionHolder::Session->Close();
            }
        };
    }

    // Creates the default record converter.
    // stripTrailingNewLine: passed to the converter's constructor; when true the
    // converter removes a trailing "\n" from each chunk.
    THolder<IStreamRecordConverter> MakeDefaultStreamRecordConverter(bool stripTrailingNewLine) {
        THolder<IStreamRecordConverter> converter =
            MakeHolder<TDefaultStreamRecordConverter>(stripTrailingNewLine);
        return converter;
    }

    // Creates an output stream backed by a freshly created client and session.
    // parameters / sessionParameters: used to create the client and the session.
    // recordConverter: converter applied to every written chunk; when null, the
    // default converter is used instead.
    THolder<IOutputStream> MakeOutputStream(const TClientParameters& parameters,
        const TSessionParameters& sessionParameters,
        THolder<IStreamRecordConverter> recordConverter)
    {
        const bool useDefaultConverter = !recordConverter;
        if (useDefaultConverter) {
            recordConverter = MakeDefaultStreamRecordConverter();
        }
        return MakeHolder<TAgentOutputStream>(
            parameters,
            sessionParameters,
            std::move(recordConverter));
    }
}