1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
#pragma once
#include <library/cpp/actors/core/actor.h>
#include <ydb/core/scheme/scheme_pathid.h>
#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <util/generic/hash.h>
#include <util/generic/maybe.h>
namespace NKikimr::NDataShard {
class TCdcStreamScanManager {
public:
struct TStats {
ui64 RowsProcessed = 0;
ui64 BytesProcessed = 0;
};
private:
struct TScanInfo {
TRowVersion SnapshotVersion;
ui64 TxId = 0;
ui64 ScanId = 0;
NActors::TActorId ActorId;
TMaybe<TSerializedCellVec> LastKey;
TStats Stats;
};
public:
void Reset();
bool Load(NIceDb::TNiceDb& db);
void Add(NTable::TDatabase& db, const TPathId& tablePathId, const TPathId& streamPathId, const TRowVersion& snapshotVersion);
void Forget(NTable::TDatabase& db, const TPathId& tablePathId, const TPathId& streamPathId);
void Enqueue(const TPathId& streamPathId, ui64 txId, ui64 scanId);
void Register(ui64 txId, const NActors::TActorId& actorId);
void Complete(const TPathId& streamPathId);
void Complete(ui64 txId);
TScanInfo* Get(const TPathId& streamPathId);
const TScanInfo* Get(const TPathId& streamPathId) const;
bool Has(const TPathId& streamPathId) const;
bool Has(ui64 txId) const;
ui32 Size() const;
void PersistAdd(NIceDb::TNiceDb& db,
const TPathId& tablePathId, const TPathId& streamPathId, const TScanInfo& info);
void PersistRemove(NIceDb::TNiceDb& db,
const TPathId& tablePathId, const TPathId& streamPathId);
void PersistProgress(NIceDb::TNiceDb& db,
const TPathId& tablePathId, const TPathId& streamPathId, const TScanInfo& info);
private:
THashMap<TPathId, TScanInfo> Scans;
THashMap<ui64, TPathId> TxIdToPathId;
};
}
|