aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/watermark_tracker.cpp
blob: 8c359773b96bdbb07d88a3e89eeed0952a72bc3e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include "watermark_tracker.h"

#include<util/system/yassert.h>

namespace NKikimr {
namespace NMiniKQL {

// Creates a tracker from a watermark delay and a watermark granularity.
// Both values are stored as passed in. Granularity is used as a modulus in
// CalcNextEventWithWatermark(), so a zero granularity is rejected up front.
TWatermarkTracker::TWatermarkTracker(ui64 delay, ui64 granularity)
    : Delay_(delay)
    , Granularity_(granularity)
{
    // Fail fast: a zero step would lead to a modulo-by-zero later on.
    Y_ABORT_UNLESS(granularity > 0);
}

// Feeds the next event timestamp into the tracker.
//
// Returns std::nullopt while `ts` is still below the stored boundary
// (NextEventWithWatermark_). Once `ts` reaches that boundary, the boundary is
// advanced past `ts` and the result of CalcLastWatermark() is returned, which
// may itself be std::nullopt.
std::optional<ui64> TWatermarkTracker::HandleNextEventTime(ui64 ts) {
    const bool boundaryReached = ts >= NextEventWithWatermark_;

    // Common case: the event falls before the boundary, nothing to report.
    if (!Y_UNLIKELY(boundaryReached)) {
        return std::nullopt;
    }

    // Move the boundary first: CalcLastWatermark() reads the updated value.
    NextEventWithWatermark_ = CalcNextEventWithWatermark(ts);
    return CalcLastWatermark();
}

// Returns the smallest value strictly greater than `ts` that lies on the
// watermark grid, i.e. the smallest v > ts such that (v - Delay_) is a
// multiple of Granularity_ (grid points below Delay_ are included).
//
// Granularity_ must be non-zero (enforced by the constructor).
ui64 TWatermarkTracker::CalcNextEventWithWatermark(ui64 ts) {
    if (ts < Delay_) {
        // `ts - Delay_` would wrap around in unsigned arithmetic, and the
        // remainder of the wrapped value equals the true remainder only when
        // Granularity_ divides 2^64. Measure the distance up to Delay_ instead:
        // it is positive, so its remainder is well defined.
        const ui64 distanceToDelay = (Delay_ - ts) % Granularity_;

        // A zero remainder means `ts` already sits on the grid, so the next
        // point is one full step away; otherwise the next grid point is
        // exactly `distanceToDelay` ahead.
        return ts + (distanceToDelay == 0 ? Granularity_ : distanceToDelay);
    }

    // Regular case: step forward by whatever is left of the current grid cell.
    return ts + Granularity_ - (ts - Delay_) % Granularity_;
}

// Derives the watermark from the current boundary:
//   NextEventWithWatermark_ - Delay_ - Granularity_.
// Returns std::nullopt when that difference would drop below zero, because
// the unsigned subtraction would otherwise wrap around.
std::optional<ui64> TWatermarkTracker::CalcLastWatermark() {
    const auto totalLag = Delay_ + Granularity_;

    if (Y_UNLIKELY(NextEventWithWatermark_ < totalLag)) {
        // The boundary has not yet moved far enough to yield a watermark.
        return std::nullopt;
    }

    return NextEventWithWatermark_ - totalLag;
}

} // NMiniKQL
} // NKikimr