blob: 5d57364b367c39e1ce43d5448f8ba60166d34ab4 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#include "watermark_tracker.h"
#include<util/system/yassert.h>
namespace NKikimr {
namespace NMiniKQL {
TWatermarkTracker::TWatermarkTracker(
ui64 delay,
ui64 granularity)
: Delay(delay)
, Granularity(granularity)
{
Y_ABORT_UNLESS(granularity > 0);
}
std::optional<ui64> TWatermarkTracker::HandleNextEventTime(ui64 ts) {
if (Y_UNLIKELY(ts >= NextEventWithWatermark)) {
NextEventWithWatermark = CalcNextEventWithWatermark(ts);
return CalcLastWatermark();
}
return std::nullopt;
}
ui64 TWatermarkTracker::CalcNextEventWithWatermark(ui64 ts) {
return ts + Granularity - (ts - Delay) % Granularity;
}
std::optional<ui64> TWatermarkTracker::CalcLastWatermark() {
if (Y_UNLIKELY(Delay + Granularity > NextEventWithWatermark)) {
// Protect from negative values
return std::nullopt;
}
return NextEventWithWatermark - Delay - Granularity;
}
} // NMiniKQL
} // NKikimr
|