blob: 8c359773b96bdbb07d88a3e89eeed0952a72bc3e (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#include "watermark_tracker.h"
#include<util/system/yassert.h>
namespace NKikimr {
namespace NMiniKQL {
// Builds a tracker from the two values it stores: `delay` goes into Delay_
// and `granularity` into Granularity_.
TWatermarkTracker::TWatermarkTracker(ui64 delay, ui64 granularity)
    : Delay_(delay)
    , Granularity_(granularity)
{
    // Granularity_ is used as a modulus in CalcNextEventWithWatermark, so a
    // zero value would be a division by zero there; fail fast instead.
    Y_ABORT_UNLESS(granularity > 0);
}
// Feeds one event timestamp into the tracker.
//
// While `ts` is below NextEventWithWatermark_ nothing changes and an empty
// optional is returned. Once `ts` reaches or passes that threshold, the
// threshold is recomputed from `ts` and the result of CalcLastWatermark()
// (which may itself be empty) is returned.
std::optional<ui64> TWatermarkTracker::HandleNextEventTime(ui64 ts) {
    std::optional<ui64> watermark; // stays empty unless the threshold is crossed

    if (Y_UNLIKELY(ts >= NextEventWithWatermark_)) {
        // Order matters: CalcLastWatermark() reads the freshly updated threshold.
        NextEventWithWatermark_ = CalcNextEventWithWatermark(ts);
        watermark = CalcLastWatermark();
    }

    return watermark;
}
// Computes the timestamp threshold that follows `ts`.
//
// The result is `ts` advanced by between 1 and Granularity_ ticks. For
// ts >= Delay_ (and barring overflow of the addition) it is the smallest value
// greater than `ts` whose distance from Delay_ is a multiple of Granularity_.
ui64 TWatermarkTracker::CalcNextEventWithWatermark(ui64 ts) {
    // How far `ts` already is past the previous boundary of the
    // Delay_-shifted grid; always in [0, Granularity_).
    // NOTE(review): when ts < Delay_ this subtraction presumably wraps around
    // (ui64 looks like an unsigned type), so the remainder is taken of the
    // wrapped value rather than of the true negative difference -- confirm
    // that alignment before ts reaches Delay_ does not matter to callers.
    const ui64 pastBoundary = (ts - Delay_) % Granularity_;

    return ts + Granularity_ - pastBoundary;
}
// Derives the watermark that corresponds to the current
// NextEventWithWatermark_ threshold.
//
// Returns NextEventWithWatermark_ - Delay_ - Granularity_, or an empty
// optional when that difference would be negative (the threshold has not yet
// advanced past Delay_ + Granularity_).
std::optional<ui64> TWatermarkTracker::CalcLastWatermark() {
    // Protect from negative values.
    // The check is done by subtraction instead of forming Delay_ + Granularity_:
    // for large configured values that sum can wrap around (ui64 is presumably
    // an unsigned 64-bit integer), which would let the guard pass and yield a
    // bogus watermark. The second comparison is only evaluated once the first
    // has established NextEventWithWatermark_ >= Delay_, so it cannot wrap.
    const bool wouldBeNegative =
        NextEventWithWatermark_ < Delay_ ||
        NextEventWithWatermark_ - Delay_ < Granularity_;
    if (Y_UNLIKELY(wouldBeNegative)) {
        return std::nullopt;
    }
    return NextEventWithWatermark_ - Delay_ - Granularity_;
}
} // NMiniKQL
} // NKikimr
|