// path: root/ydb/core/tx/datashard/cdc_stream_scan.h
// blob: 3abee90ccb6f9a1e8e51cf15d6aa58f923a5b356
#pragma once

#include <library/cpp/actors/core/actor.h>

#include <ydb/core/scheme/scheme_pathid.h>
#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>

#include <util/generic/hash.h>
#include <util/generic/maybe.h>

namespace NKikimr::NDataShard {

// Bookkeeping container for CDC stream scans in the datashard namespace.
// State consists of one TScanInfo per TPathId (Scans) and an index from a
// ui64 tx id to a TPathId (TxIdToPathId). Only declarations are visible in
// this header; notes that go beyond the signatures are marked as assumptions
// and should be confirmed against the implementation file.
class TCdcStreamScanManager {
public:
    // Progress counters of a single scan; both start at zero.
    struct TStats {
        ui64 RowsProcessed = 0;
        ui64 BytesProcessed = 0;
    };

private:
    // Per-scan record stored as the mapped value of Scans.
    // NOTE: the type name is private, yet the public Get() overloads return
    // pointers to it and the public Persist* methods accept it by reference,
    // so code outside the class has to hold it via `auto` / deduced types.
    struct TScanInfo {
        // Row version passed to Add() as snapshotVersion -- presumably the
        // snapshot the scan reads at (confirm in the implementation).
        TRowVersion SnapshotVersion;
        // Defaults to 0; presumably filled in by Enqueue() -- TODO confirm.
        ui64 TxId = 0;
        // Defaults to 0; presumably filled in by Enqueue() -- TODO confirm.
        ui64 ScanId = 0;
        // Default-constructed until set; presumably filled in by Register()
        // -- TODO confirm.
        NActors::TActorId ActorId;
        // Empty by default. Presumably the last key already processed, used
        // to resume a scan -- TODO confirm.
        TMaybe<TSerializedCellVec> LastKey;
        // Row/byte counters of this scan.
        TStats Stats;
    };

public:
    // Presumably drops all in-memory state -- TODO confirm.
    void Reset();
    // Reads state through the given NiceDb wrapper. The meaning of the bool
    // result is not visible here; presumably false means the data is not
    // ready yet and the caller must retry -- TODO confirm.
    bool Load(NIceDb::TNiceDb& db);
    // Registers a scan for (tablePathId, streamPathId) with the given
    // snapshot version. Takes the raw NTable::TDatabase, unlike the Persist*
    // methods, which take NIceDb::TNiceDb.
    void Add(NTable::TDatabase& db, const TPathId& tablePathId, const TPathId& streamPathId, const TRowVersion& snapshotVersion);
    // Counterpart of Add() for the same (tablePathId, streamPathId) pair;
    // presumably removes both in-memory and persisted state -- TODO confirm.
    void Forget(NTable::TDatabase& db, const TPathId& tablePathId, const TPathId& streamPathId);

    // Associates txId and scanId with the scan of streamPathId.
    void Enqueue(const TPathId& streamPathId, ui64 txId, ui64 scanId);
    // Associates actorId with the scan identified by txId.
    void Register(ui64 txId, const NActors::TActorId& actorId);

    // Marks a scan as complete; the scan is addressed either by stream path
    // id or by tx id. What exactly is removed is not visible in this header.
    void Complete(const TPathId& streamPathId);
    void Complete(ui64 txId);

    // Lookup by stream path id. Returns a pointer, so presumably nullptr when
    // there is no such scan -- TODO confirm.
    TScanInfo* Get(const TPathId& streamPathId);
    const TScanInfo* Get(const TPathId& streamPathId) const;

    // Existence checks, by stream path id or by tx id.
    bool Has(const TPathId& streamPathId) const;
    bool Has(ui64 txId) const;

    // Presumably the number of entries in Scans -- TODO confirm.
    ui32 Size() const;

    // Persistence helpers operating on a NiceDb wrapper. Each one is keyed by
    // the (tablePathId, streamPathId) pair; PersistAdd and PersistProgress
    // additionally receive the scan record to write. The table schema they
    // touch is not visible in this header.
    void PersistAdd(NIceDb::TNiceDb& db,
        const TPathId& tablePathId, const TPathId& streamPathId, const TScanInfo& info);
    void PersistRemove(NIceDb::TNiceDb& db,
        const TPathId& tablePathId, const TPathId& streamPathId);
    void PersistProgress(NIceDb::TNiceDb& db,
        const TPathId& tablePathId, const TPathId& streamPathId, const TScanInfo& info);

private:
    // Scan records keyed by TPathId; presumably the stream path id, since
    // Get()/Has() look scans up by streamPathId -- TODO confirm.
    THashMap<TPathId, TScanInfo> Scans;
    // Index from tx id to the TPathId key of the corresponding scan; backs
    // the txId-based overloads (presumably) -- TODO confirm.
    THashMap<ui64, TPathId> TxIdToPathId;
};

}