1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
#pragma once
#include <ydb/core/engine/minikql/change_collector_iface.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
namespace NKikimr {
namespace NDataShard {
// Forward declarations: these types are only used by reference in this header,
// so their full definitions are not required here.
class TDataShard;
struct TUserTable;
class IDataShardUserDb;
// Interface for obtaining the change group used by change records.
//
// The destructor is protected and non-virtual, so objects cannot be
// destroyed through a pointer or reference to this interface.
class IDataShardChangeGroupProvider {
public:
    // Returns the change group that is currently set, or std::nullopt
    // when there is none. Does not modify the provider.
    virtual std::optional<ui64> GetCurrentChangeGroup() const = 0;

    // Returns a change group id.
    // NOTE(review): non-const, so implementations presumably may allocate
    // a group on demand - confirm against the implementation.
    virtual ui64 GetChangeGroup() = 0;

protected:
    ~IDataShardChangeGroupProvider() = default;
};
// Change group provider bound to a datashard and its local database.
//
// Holds references to both objects, so it must not outlive them.
class TDataShardChangeGroupProvider final
    : public IDataShardChangeGroupProvider
{
public:
    // `group` optionally presets the value reported by GetCurrentChangeGroup();
    // it defaults to std::nullopt.
    // Note: for distributed transactions group is expected to be 0
    TDataShardChangeGroupProvider(TDataShard& self, NTable::TDatabase& db, std::optional<ui64> group = std::nullopt)
        : Self(self)
        , Db(db)
        , Group(group)
    {
    }

    // Returns the stored group unchanged.
    std::optional<ui64> GetCurrentChangeGroup() const override {
        return Group;
    }

    // Defined out of line.
    // NOTE(review): presumably assigns a group when none is stored yet -
    // confirm against the definition.
    ui64 GetChangeGroup() override;

private:
    TDataShard& Self;
    NTable::TDatabase& Db;
    std::optional<ui64> Group;
};
// Datashard-specific extension of NMiniKQL::IChangeCollector that exposes
// the change records gathered so far.
class IDataShardChangeCollector : public NMiniKQL::IChangeCollector {
public:
    // basic change record's info
    //
    // Every scalar field has a zero default so that a default-initialized
    // TChange never carries indeterminate values. The struct remains an
    // aggregate, so existing brace initialization keeps working.
    struct TChange {
        ui64 Order = 0;
        ui64 Group = 0;
        ui64 Step = 0;
        ui64 TxId = 0;
        TPathId PathId;
        ui64 BodySize = 0;
        TPathId TableId;
        ui64 SchemaVersion = 0;
        ui64 LockId = 0;
        ui64 LockOffset = 0;
    };

public:
    // NOTE(review): presumably called when the enclosing transaction is
    // restarted so that collected state can be reset - confirm with implementations.
    virtual void OnRestart() = 0;

    // Tells the caller whether this collector needs keys to be read.
    virtual bool NeedToReadKeys() const = 0;

    // Commits changes associated with `lockId` using `writeVersion`.
    virtual void CommitLockChanges(ui64 lockId, const TRowVersion& writeVersion) = 0;

    // Read-only view of the collected change records.
    virtual const TVector<TChange>& GetCollected() const = 0;

    // Rvalue access to the collected change records, allowing the caller
    // to move them out of the collector.
    virtual TVector<TChange>&& GetCollected() = 0;
};
// Creates a change collector for the given user table description.
// NOTE(review): a raw pointer is returned; ownership (and whether nullptr
// is possible) is not visible from this header - confirm with the definition.
IDataShardChangeCollector* CreateChangeCollector(
    TDataShard& dataShard,
    IDataShardUserDb& userDb,
    IDataShardChangeGroupProvider& groupProvider,
    NTable::TDatabase& db,
    const TUserTable& table);
// Creates a change collector for the table identified by `tableId`.
// NOTE(review): a raw pointer is returned; ownership (and whether nullptr
// is possible, e.g. for an unknown table id) is not visible from this
// header - confirm with the definition.
IDataShardChangeCollector* CreateChangeCollector(
    TDataShard& dataShard,
    IDataShardUserDb& userDb,
    IDataShardChangeGroupProvider& groupProvider,
    NTable::TDatabase& db,
    ui64 tableId);
} // NDataShard
} // NKikimr
|