aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/watermark_tracker.cpp
blob: 5d57364b367c39e1ce43d5448f8ba60166d34ab4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include "watermark_tracker.h"

#include<util/system/yassert.h>

namespace NKikimr {
namespace NMiniKQL {

// Builds a tracker from the two tuning values it stores:
//   delay       - saved in Delay and subtracted when a watermark is computed;
//   granularity - saved in Granularity, the step between watermark boundaries.
// Aborts the process if granularity is zero, since it is later used as a
// modulo divisor.
TWatermarkTracker::TWatermarkTracker(ui64 delay, ui64 granularity)
    : Delay(delay)
    , Granularity(granularity)
{
    // A zero step would make `% Granularity` undefined.
    Y_ABORT_UNLESS(granularity > 0);
}

// Feeds one event timestamp into the tracker.
//
// While `ts` stays below the stored NextEventWithWatermark threshold nothing
// changes and std::nullopt is returned. Once `ts` reaches or passes the
// threshold, the threshold is recomputed from `ts` and the result of
// CalcLastWatermark() is returned (which may itself be std::nullopt).
std::optional<ui64> TWatermarkTracker::HandleNextEventTime(ui64 ts) {
    // Common path: the event is still before the next boundary.
    if (!Y_UNLIKELY(ts >= NextEventWithWatermark)) {
        return std::nullopt;
    }

    // Boundary reached: advance it first, the watermark is derived from it.
    NextEventWithWatermark = CalcNextEventWithWatermark(ts);
    return CalcLastWatermark();
}

// Returns the first boundary strictly greater than `ts` on the grid
// Delay + k * Granularity (k may be negative, i.e. boundaries below Delay
// are on the grid too). The result always lies in (ts, ts + Granularity].
//
// Granularity is non-zero (the constructor aborts otherwise), so the modulo
// operations below are well defined.
//
// NOTE: the final addition is unchecked; for `ts` within Granularity of the
// ui64 maximum it wraps around, exactly as the previous implementation did.
ui64 TWatermarkTracker::CalcNextEventWithWatermark(ui64 ts) {
    // Distance from the nearest grid point at or below `ts` up to `ts`.
    ui64 offset = 0;
    if (ts >= Delay) {
        offset = (ts - Delay) % Granularity;
    } else {
        // `ts - Delay` would wrap around in unsigned arithmetic and the
        // remainder of the wrapped value is only correct when Granularity
        // divides 2^64. Measure the distance up to Delay instead and mirror it.
        const ui64 distanceToDelay = (Delay - ts) % Granularity;
        offset = distanceToDelay == 0 ? 0 : Granularity - distanceToDelay;
    }

    // offset < Granularity, so the step is in [1, Granularity].
    return ts + (Granularity - offset);
}

std::optional<ui64> TWatermarkTracker::CalcLastWatermark() {
    if (Y_UNLIKELY(Delay + Granularity > NextEventWithWatermark)) {
        // Protect from negative values
        return std::nullopt;
    }
    return NextEventWithWatermark - Delay - Granularity;
}

} // NMiniKQL
} // NKikimr