blob: 3a0a02604ee6d9fd487611b37bccbe61aaf1891c (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
#include "progress_merger.h"
namespace NYT::NYqlPlugin {
//////////////////////////////////////////////////////////////////////////////
TNodeProgress::TNodeProgress(const NYql::TOperationProgress& p)
: TNodeProgressBase(p)
{}
void TNodeProgress::Serialize(::NYson::TYsonWriter& writer) const
{
writer.OnBeginMap();
{
writer.OnKeyedItem("category");
writer.OnStringScalar(Progress_.Category);
writer.OnKeyedItem("state");
writer.OnStringScalar(ToString(Progress_.State));
writer.OnKeyedItem("remoteId");
writer.OnStringScalar(Progress_.RemoteId);
writer.OnKeyedItem("stages");
writer.OnBeginMap();
for (size_t index = 0; index < Stages_.size(); index++) {
writer.OnKeyedItem(ToString(index));
writer.OnBeginMap();
{
writer.OnKeyedItem(Stages_[index].first);
writer.OnStringScalar(Stages_[index].second.ToString());
}
writer.OnEndMap();
}
writer.OnEndMap();
if (Progress_.Counters) {
writer.OnKeyedItem("completed");
writer.OnUint64Scalar(Progress_.Counters->Completed);
writer.OnKeyedItem("running");
writer.OnUint64Scalar(Progress_.Counters->Running);
writer.OnKeyedItem("total");
writer.OnUint64Scalar(Progress_.Counters->Total);
writer.OnKeyedItem("aborted");
writer.OnUint64Scalar(Progress_.Counters->Aborted);
writer.OnKeyedItem("failed");
writer.OnUint64Scalar(Progress_.Counters->Failed);
writer.OnKeyedItem("lost");
writer.OnUint64Scalar(Progress_.Counters->Lost);
writer.OnKeyedItem("pending");
writer.OnUint64Scalar(Progress_.Counters->Pending);
}
writer.OnKeyedItem("startedAt");
writer.OnStringScalar(StartedAt_.ToString());
if (FinishedAt_ != TInstant::Max()) {
writer.OnKeyedItem("finishedAt");
writer.OnStringScalar(FinishedAt_.ToString());
}
}
writer.OnEndMap();
}
//////////////////////////////////////////////////////////////////////////////
void TProgressMerger::MergeWith(const NYql::TOperationProgress& progress)
{
auto in = NodesMap_.emplace(progress.Id, progress);
if (!in.second) {
in.first->second.MergeWith(progress);
}
HasChanges_ = true;
}
void TProgressMerger::AbortAllUnfinishedNodes()
{
for (auto& node: NodesMap_) {
if (node.second.IsUnfinished()) {
node.second.Abort();
HasChanges_ = true;
}
}
}
TString TProgressMerger::ToYsonString()
{
TStringStream yson;
::NYson::TYsonWriter writer(&yson);
writer.OnBeginMap();
for (auto& node: NodesMap_) {
writer.OnKeyedItem(ToString(node.first));
node.second.Serialize(writer);
}
writer.OnEndMap();
HasChanges_ = false;
return yson.Str();
}
bool TProgressMerger::HasChangesSinceLastFlush() const
{
return HasChanges_;
}
//////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NYqlPlugin
|