1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
#include "stream.h"
namespace NUnifiedAgent {
namespace {
// Default IStreamRecordConverter implementation: turns the raw bytes of one
// stream write into a TClientMessage, optionally dropping a trailing "\n".
class TDefaultStreamRecordConverter : public IStreamRecordConverter {
public:
    // stripTrailingNewLine: when true, Convert() chops a "\n" suffix off the
    // record text before building the message.
    // The constructor is explicit so that a bare bool can never be silently
    // converted into a converter object.
    explicit TDefaultStreamRecordConverter(bool stripTrailingNewLine)
        : StripTrailingNewLine(stripTrailingNewLine)
    {
    }

    // Builds a message from the `len` bytes starting at `buf`.
    // The bytes are copied into a TString, so the returned message does not
    // reference the caller's buffer.
    TClientMessage Convert(const void* buf, size_t len) const override {
        TStringBuf str(static_cast<const char*>(buf), len);
        if (StripTrailingNewLine) {
            // Only the view is shortened; the caller's buffer is not modified.
            str.ChopSuffix("\n");
        }
        // First field: the record text. Second field: left default/empty
        // (its meaning is defined by TClientMessage, not visible here).
        return {
            TString(str),
            {}
        };
    }

private:
    // Fixed at construction time.
    const bool StripTrailingNewLine;
};
// IOutputStream facade over a client session: every DoWrite() call is
// converted into one message and sent to the session.
class TClientSessionAdapter: public IOutputStream {
public:
    // session: the session that receives the messages.
    // recordConverter: turns written bytes into messages; ownership is taken.
    //                  It is dereferenced unconditionally in DoWrite(), so it
    //                  must not be null.
    explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IStreamRecordConverter> recordConverter)
        : Session(session)
        , RecordConverter(std::move(recordConverter))
    {
    }

    // Converts the written chunk into a message and hands it to the session.
    void DoWrite(const void* buf, size_t len) override {
        auto message = RecordConverter->Convert(buf, len);
        Session->Send(std::move(message));
    }

    // Nothing to do: this adapter keeps no buffer of its own, DoWrite()
    // forwards each chunk to the session right away.
    void DoFlush() override {
    }

private:
    TClientSessionPtr Session;
    THolder<IStreamRecordConverter> RecordConverter;
};
// Creates a client from the given parameters and opens one session on it.
// Everything is protected: the class is only usable as a base that exposes
// Client and Session to the derived class.
class TSessionHolder {
protected:
    // parameters: passed to MakeClient().
    // sessionParameters: passed to Client->CreateSession().
    TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters)
        : Client(MakeClient(parameters))
        , Session(Client->CreateSession(sessionParameters))
    {
    }

    // Declaration order matters: Session is created through Client, so Client
    // has to be declared (and therefore initialized) first.
    TClientPtr Client;
    TClientSessionPtr Session;
};
// Output stream that creates its own client and session and closes the
// session when destroyed.
//
// TSessionHolder is listed first among the bases on purpose: base classes are
// initialized in declaration order, and the TClientSessionAdapter base is
// constructed from the session that TSessionHolder creates.
class TAgentOutputStream: private TSessionHolder, public TClientSessionAdapter {
public:
    // parameters / sessionParameters: forwarded to TSessionHolder to create
    //                                 the client and the session.
    // recordConverter: forwarded to TClientSessionAdapter; ownership is taken.
    TAgentOutputStream(const TClientParameters& parameters,
                       const TSessionParameters& sessionParameters,
                       THolder<IStreamRecordConverter> recordConverter)
        : TSessionHolder(parameters, sessionParameters)
        , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter))
    {
    }

    // Closes the session owned by the TSessionHolder base. The name is
    // qualified because both bases declare a member called Session.
    ~TAgentOutputStream() override {
        TSessionHolder::Session->Close();
    }
};
}
// Creates the default record converter.
// stripTrailingNewLine: whether the converter chops a trailing "\n" off each
//                       record before building the message.
THolder<IStreamRecordConverter> MakeDefaultStreamRecordConverter(bool stripTrailingNewLine) {
    THolder<IStreamRecordConverter> converter =
        MakeHolder<TDefaultStreamRecordConverter>(stripTrailingNewLine);
    return converter;
}
// Creates an output stream backed by a newly created client and session.
// parameters / sessionParameters: used to create the client and its session.
// recordConverter: converts written bytes into messages; when null, the
//                  converter returned by MakeDefaultStreamRecordConverter()
//                  (called with its declared default argument) is used.
THolder<IOutputStream> MakeOutputStream(const TClientParameters& parameters,
                                        const TSessionParameters& sessionParameters,
                                        THolder<IStreamRecordConverter> recordConverter)
{
    THolder<IStreamRecordConverter> converter = recordConverter
        ? std::move(recordConverter)
        : MakeDefaultStreamRecordConverter();
    return MakeHolder<TAgentOutputStream>(parameters, sessionParameters, std::move(converter));
}
}
|