aboutsummaryrefslogtreecommitdiffstats
path: root/yt/yql/plugin/native/progress_merger.cpp
blob: 3a0a02604ee6d9fd487611b37bccbe61aaf1891c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include "progress_merger.h"


namespace NYT::NYqlPlugin {

//////////////////////////////////////////////////////////////////////////////

TNodeProgress::TNodeProgress(const NYql::TOperationProgress& p)
    : TNodeProgressBase(p)
{}

void TNodeProgress::Serialize(::NYson::TYsonWriter& writer) const
{
    writer.OnBeginMap();
    {
        writer.OnKeyedItem("category");
        writer.OnStringScalar(Progress_.Category);

        writer.OnKeyedItem("state");
        writer.OnStringScalar(ToString(Progress_.State));

        writer.OnKeyedItem("remoteId");
        writer.OnStringScalar(Progress_.RemoteId);

        writer.OnKeyedItem("stages");
        writer.OnBeginMap();
        for (size_t index = 0; index < Stages_.size(); index++) {
            writer.OnKeyedItem(ToString(index));
            writer.OnBeginMap();
            {
                writer.OnKeyedItem(Stages_[index].first);
                writer.OnStringScalar(Stages_[index].second.ToString());
            }
            writer.OnEndMap();
        }
        writer.OnEndMap();

        if (Progress_.Counters) {
            writer.OnKeyedItem("completed");
            writer.OnUint64Scalar(Progress_.Counters->Completed);

            writer.OnKeyedItem("running");
            writer.OnUint64Scalar(Progress_.Counters->Running);

            writer.OnKeyedItem("total");
            writer.OnUint64Scalar(Progress_.Counters->Total);

            writer.OnKeyedItem("aborted");
            writer.OnUint64Scalar(Progress_.Counters->Aborted);

            writer.OnKeyedItem("failed");
            writer.OnUint64Scalar(Progress_.Counters->Failed);

            writer.OnKeyedItem("lost");
            writer.OnUint64Scalar(Progress_.Counters->Lost);

            writer.OnKeyedItem("pending");
            writer.OnUint64Scalar(Progress_.Counters->Pending);
        }

        writer.OnKeyedItem("startedAt");
        writer.OnStringScalar(StartedAt_.ToString());

        if (FinishedAt_ != TInstant::Max()) {
            writer.OnKeyedItem("finishedAt");
            writer.OnStringScalar(FinishedAt_.ToString());
        }
    }
    writer.OnEndMap();
}

//////////////////////////////////////////////////////////////////////////////

void TProgressMerger::MergeWith(const NYql::TOperationProgress& progress)
{
    auto in = NodesMap_.emplace(progress.Id, progress);
    if (!in.second) {
        in.first->second.MergeWith(progress);
    }
    HasChanges_ = true;
}

void TProgressMerger::AbortAllUnfinishedNodes()
{
    for (auto& node: NodesMap_) {
        if (node.second.IsUnfinished()) {
            node.second.Abort();
            HasChanges_ = true;
        }
    }
}

TString TProgressMerger::ToYsonString()
{
    TStringStream yson;
    ::NYson::TYsonWriter writer(&yson);

    writer.OnBeginMap();
    for (auto& node: NodesMap_) {
        writer.OnKeyedItem(ToString(node.first));
        node.second.Serialize(writer);
    }
    writer.OnEndMap();
    HasChanges_ = false;

    return yson.Str();
}

bool TProgressMerger::HasChangesSinceLastFlush() const
{
    return HasChanges_;
}

//////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NYqlPlugin