blob: 3d6af820271945de0e67276497c1e1ef5827412d (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
#pragma once
#include <ydb/core/protos/pqconfig.pb.h>
#include <library/cpp/string_utils/base64/base64.h>
#include <util/datetime/base.h>
namespace NKikimr::NDataStreams::V1 {
class TNextToken {
public:
static constexpr ui32 LIFETIME_MS = 300*1000;
TNextToken(const TString& nextToken)
: Valid{true}
{
try {
TString decoded;
Base64Decode(nextToken, decoded);
auto ok = Proto.ParseFromString(decoded);
if (ok) {
Valid = IsAlive(TInstant::Now().MilliSeconds());
} else {
Valid = false;
}
} catch (std::exception&) {
Valid = false;
}
}
TNextToken(const TString& streamArn, ui32 alreadyRead, ui32 maxResults, ui64 creationTimestamp)
: Valid{true}
{
Proto.SetStreamArn(streamArn);
Proto.SetAlreadyRead(alreadyRead);
Proto.SetMaxResults(maxResults);
Proto.SetCreationTimestamp(creationTimestamp);
}
TString Serialize() const {
TString data;
bool result = Proto.SerializeToString(&data);
Y_VERIFY(result);
TString encoded;
Base64Encode(data, encoded);
return encoded;
}
ui32 GetAlreadyRead() const {
return Proto.GetAlreadyRead();
}
TString GetStreamArn() const {
return Proto.GetStreamArn();
}
TString GetStreamName() const {
return Proto.GetStreamArn();
}
ui32 GetMaxResults() const {
return Proto.GetMaxResults();
}
ui32 GetCreationTimestamp() const {
return Proto.GetCreationTimestamp();
}
bool IsAlive(ui64 now) const {
return now >= Proto.GetCreationTimestamp() &&
(now - Proto.GetCreationTimestamp()) < LIFETIME_MS;
}
bool IsValid() const {
return Valid && Proto.GetStreamArn().size() > 0;
}
private:
bool Valid;
NKikimrPQ::TYdsNextToken Proto;
};
} // namespace NKikimr::NDataStreams::V1
|