aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/core/tx/datashard/change_collector.h
blob: b8171933ea61940b1922d56279b0fdb909cc2284 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#pragma once

#include <ydb/core/engine/minikql/change_collector_iface.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>

namespace NKikimr {
namespace NDataShard {

class TDataShard;
struct TUserTable;

class IDataShardUserDb;

class IDataShardChangeGroupProvider {
protected:
    ~IDataShardChangeGroupProvider() = default;

public:
    virtual std::optional<ui64> GetCurrentChangeGroup() const = 0;
    virtual ui64 GetChangeGroup() = 0;
};

class TDataShardChangeGroupProvider final
    : public IDataShardChangeGroupProvider
{
public:
    // Note: for distributed transactions group is expected to be 0
    TDataShardChangeGroupProvider(TDataShard& self, NTable::TDatabase& db, std::optional<ui64> group = std::nullopt)
        : Self(self)
        , Db(db)
        , Group(group)
    { }

    std::optional<ui64> GetCurrentChangeGroup() const override {
        return Group;
    }

    ui64 GetChangeGroup() override;

private:
    TDataShard& Self;
    NTable::TDatabase& Db;
    std::optional<ui64> Group;
};

class IDataShardChangeCollector : public NMiniKQL::IChangeCollector {
public:
    // basic change record's info
    struct TChange {
        ui64 Order;
        ui64 Group;
        ui64 Step;
        ui64 TxId;
        TPathId PathId;
        ui64 BodySize;
        TPathId TableId;
        ui64 SchemaVersion;
        ui64 LockId = 0;
        ui64 LockOffset = 0;
    };

public:
    virtual void OnRestart() = 0;
    virtual bool NeedToReadKeys() const = 0;

    virtual void CommitLockChanges(ui64 lockId, const TRowVersion& writeVersion) = 0;

    virtual const TVector<TChange>& GetCollected() const = 0;
    virtual TVector<TChange>&& GetCollected() = 0;
};

IDataShardChangeCollector* CreateChangeCollector(
        TDataShard& dataShard,
        IDataShardUserDb& userDb,
        IDataShardChangeGroupProvider& groupProvider,
        NTable::TDatabase& db,
        const TUserTable& table);
IDataShardChangeCollector* CreateChangeCollector(
        TDataShard& dataShard,
        IDataShardUserDb& userDb,
        IDataShardChangeGroupProvider& groupProvider,
        NTable::TDatabase& db,
        ui64 tableId);

} // NDataShard
} // NKikimr