aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/core/progress_merger/progress_merger.cpp
blob: 9005b689f181810415f2cd5ca2a6700765df6e6a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#include "progress_merger.h"


namespace NYql::NProgressMerger {

//////////////////////////////////////////////////////////////////////////////
// TNodeProgressBase
//////////////////////////////////////////////////////////////////////////////

TNodeProgressBase::TNodeProgressBase(const TOperationProgress& p)
    : Progress_(p)
    , StartedAt_(TInstant::Now())
    , FinishedAt_(p.State == EState::Finished ? StartedAt_ : TInstant::Max())
    , Dirty_(true)
{
    if (!p.Stage.first.empty()) {
        Stages_.emplace_back(p.Stage);
    }
}

TNodeProgressBase::TNodeProgressBase(
    const TOperationProgress& p,
    TInstant startedAt,
    TInstant finishedAt,
    const TVector<TOperationProgress::TStage>& stages)
    : Progress_(p)
    , StartedAt_(startedAt)
    , FinishedAt_(finishedAt)
    , Stages_(stages)
    , Dirty_(true)
{}


bool TNodeProgressBase::MergeWith(const TOperationProgress& p) {
    bool dirty = false;

    // (1) remote id
    if (!p.RemoteId.empty() && p.RemoteId != Progress_.RemoteId) {
        Progress_.RemoteId = p.RemoteId;
        dirty = true;
    }

    // (2) state
    if (p.State != Progress_.State) {
        Progress_.State = p.State;
        dirty = true;
    }

    // (3) counters
    if (p.Counters && (!Progress_.Counters || *p.Counters != *Progress_.Counters)) {
        Progress_.Counters = p.Counters;
        dirty = true;
    }

    // (4) finished time
    if (Progress_.State == EState::Finished) {
        FinishedAt_ = TInstant::Now();
        dirty = true;
    }

    // (5) stage
    if (!p.Stage.first.empty() &&  Progress_.Stage != p.Stage) {
        Progress_.Stage = p.Stage;
        Stages_.push_back(p.Stage);
        dirty = true;
    }

    // (6) remote data
    if (!p.RemoteData.empty() && p.RemoteData != Progress_.RemoteData) {
        Progress_.RemoteData = p.RemoteData;
        dirty = true;
    }
    return Dirty_ = dirty;
}

void TNodeProgressBase::Abort() {
    Progress_.State = EState::Aborted;
    FinishedAt_ = TInstant::Now();
    Dirty_ = true;
}

bool TNodeProgressBase::IsUnfinished() const {
    return Progress_.State == EState::Started ||
            Progress_.State == EState::InProgress;
}

bool TNodeProgressBase::IsDirty() const {
    return Dirty_;
}

void TNodeProgressBase::SetDirty(bool dirty) {
    Dirty_ = dirty;
}

} // namespace NYql::NProgressMerger