// path: ydb/core/tx/datashard/change_record.h (relative to the repository root)
// blob: ae3ea886634b00a0c2d8c5991a92decbd676c9bb
#pragma once

#include "datashard_user_table.h"

#include <library/cpp/json/json_value.h>

#include <ydb/core/base/pathid.h>
#include <ydb/core/scheme/scheme_tablecell.h>

#include <util/generic/maybe.h>
#include <util/generic/string.h>

// Forward declaration only: this header refers to the type solely by reference
// (see TChangeRecord::SerializeToProto), so its full definition is not needed here.
// NOTE(review): presumably the protobuf-generated message type — confirm.
namespace NKikimrChangeExchange {
    class TChangeRecord;
}

namespace NKikimr::NDataShard {

// Declared ahead of TChangeRecord, which names it as a friend; defined below.
class TChangeRecordBuilder;

/// A single change record produced by a datashard.
///
/// The class exposes read-only accessors and serialization entry points; all
/// fields are private and are populated through the friend class
/// TChangeRecordBuilder. Member functions that are only declared here are
/// defined out of line.
class TChangeRecord {
    friend class TChangeRecordBuilder;

public:
    /// Discriminates what the record describes.
    enum class EKind: ui8 {
        AsyncIndex,
        CdcDataChange,
        CdcHeartbeat,
    };

    /// Extra parameters consumed by SerializeToDynamoDBStreamsJson().
    struct TAwsJsonOptions {
        TString AwsRegion;
        // Value-initialized (zero) so the field is never read uninitialized.
        // NOTE(review): confirm that zero is an acceptable "unset" value for this enum.
        NKikimrSchemeOp::ECdcStreamMode StreamMode{};
        ui64 ShardId = 0;
    };

public:
    // Plain accessors: each one returns the corresponding private field.
    ui64 GetOrder() const { return Order; }
    ui64 GetGroup() const { return Group; }
    ui64 GetStep() const { return Step; }
    ui64 GetTxId() const { return TxId; }
    ui64 GetLockId() const { return LockId; }
    ui64 GetLockOffset() const { return LockOffset; }
    const TPathId& GetPathId() const { return PathId; }
    EKind GetKind() const { return Kind; }
    const TString& GetBody() const { return Body; }

    const TPathId& GetTableId() const { return TableId; }
    ui64 GetSchemaVersion() const { return SchemaVersion; }

    // Serialization entry points (defined out of line); each takes its
    // destination as a non-const reference parameter.
    void SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const;
    void SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const;
    void SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const;

    // Derived properties (defined out of line).
    // NOTE(review): GetKey()/GetPartitionKey() presumably fill the mutable
    // Key/PartitionKey members below on first use — confirm in the .cpp.
    TConstArrayRef<TCell> GetKey() const;
    i64 GetSeqNo() const;
    TString GetPartitionKey() const;
    TInstant GetApproximateCreationDateTime() const;
    bool IsBroadcast() const;

    // Textual representation; Out() is also what the stream-output
    // specialization at the end of this header forwards to.
    TString ToString() const;
    void Out(IOutputStream& out) const;

private:
    ui64 Order = Max<ui64>();
    ui64 Group = 0;
    ui64 Step = 0;
    ui64 TxId = 0;
    ui64 LockId = 0;
    ui64 LockOffset = 0;
    TPathId PathId;
    // Explicit default so that GetKind() on a default-constructed record is
    // well defined. TChangeRecordBuilder's constructor takes the kind as an
    // argument and presumably overwrites this value (definition not in this header).
    EKind Kind = EKind::AsyncIndex;
    TString Body;

    // Explicit default so that GetSchemaVersion() never reads an indeterminate
    // value when the builder's WithSchemaVersion() was not called.
    ui64 SchemaVersion = 0;
    TPathId TableId;
    TUserTable::TCPtr Schema;

    // Mutable so that const member functions can assign them.
    mutable TMaybe<TOwnedCellVec> Key;
    mutable TMaybe<TString> PartitionKey;

}; // TChangeRecord

/// Fluent builder for TChangeRecord.
///
/// Every With*() member returns a reference to the builder, so calls can be
/// chained. The definitions live out of line; judging by the names, each one
/// presumably stores its argument into the matching field of the record being
/// assembled — confirm against the implementation.
class TChangeRecordBuilder {
    using TSelf = TChangeRecordBuilder;
    using EKind = TChangeRecord::EKind;

public:
    /// The record kind has no With*() setter; it is supplied at construction.
    explicit TChangeRecordBuilder(EKind kind);

    // Ordering / transaction attributes.
    TSelf& WithOrder(ui64 order);
    TSelf& WithGroup(ui64 group);
    TSelf& WithStep(ui64 step);
    TSelf& WithTxId(ui64 txId);

    // Lock attributes.
    TSelf& WithLockId(ui64 lockId);
    TSelf& WithLockOffset(ui64 lockOffset);

    // Path and table/schema attributes.
    TSelf& WithPathId(const TPathId& pathId);
    TSelf& WithTableId(const TPathId& tableId);
    TSelf& WithSchemaVersion(ui64 version);
    TSelf& WithSchema(TUserTable::TCPtr schema);

    // Record payload; overloads for lvalue and rvalue strings.
    TSelf& WithBody(const TString& body);
    TSelf& WithBody(TString&& body);

    /// Returns the assembled record as an rvalue reference.
    /// NOTE(review): presumably refers to the internal Record member, in which
    /// case the result must be consumed while the builder is alive — confirm.
    TChangeRecord&& Build();

private:
    TChangeRecord Record;

}; // TChangeRecordBuilder

}

Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TChangeRecord, out, value) {
    return value.Out(out);
}