blob: caa0b1fd5b87e01d0b461ba58489e7f55fb54e24 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
#pragma once
#include <ydb/core/yq/libs/checkpointing_common/defs.h>
#include <library/cpp/actors/core/actor.h>
namespace NYq {
// Plain statistics record kept alongside a pending checkpoint.
struct TPendingCheckpointStats {
    // Defaults to TInstant::Now(), evaluated when the record is constructed.
    // Declared const, so it cannot be reassigned afterwards (which also makes
    // the struct non-assignable).
    const TInstant CreatedAt = TInstant::Now();

    // Starts at zero. NOTE(review): unit is not visible in this header
    // (presumably bytes) - confirm against the code that updates it.
    ui64 StateSize{0};
};
// Bookkeeping for a single checkpoint that is still waiting for acknowledgements.
// Holds a set of actor ids plus a TPendingCheckpointStats record.
//
// NOTE(review): all member functions are only declared in this header; the
// behavioural notes below are inferred from names and signatures and should be
// confirmed against the implementation file.
class TPendingCheckpoint {
public:
    // toBeAcknowledged - actor ids taken by value; presumably the initial
    //                    content of NotYetAcknowledged (confirm in the .cpp).
    // stats            - optional initial statistics; a default-constructed
    //                    TPendingCheckpointStats is used when omitted.
    explicit TPendingCheckpoint(THashSet<NActors::TActorId> toBeAcknowledged, TPendingCheckpointStats stats = TPendingCheckpointStats());

    // Registers an acknowledgement coming from actorId.
    void Acknowledge(const NActors::TActorId& actorId);

    // Same as above, additionally passing the state size reported with the
    // acknowledgement. Presumably accumulated into Stats.StateSize - confirm.
    void Acknowledge(const NActors::TActorId& actorId, ui64 stateSize);

    // Presumably true once no acknowledgement is outstanding - confirm.
    [[nodiscard]] bool GotAllAcknowledges() const;

    // Presumably the number of actors that have not acknowledged yet - confirm.
    [[nodiscard]] size_t NotYetAcknowledgedCount() const;

    // Read-only access to the statistics record.
    [[nodiscard]] const TPendingCheckpointStats& GetStats() const;

private:
    // Declaration order of the two members is kept as is: it defines the
    // initialization order used by the out-of-line constructor.
    THashSet<NActors::TActorId> NotYetAcknowledged;
    TPendingCheckpointStats Stats;
};
// Bookkeeping for a restore-from-checkpoint operation that is still waiting
// for acknowledgements.
//
// NOTE(review): member functions are only declared here; the notes on their
// behaviour are inferred from names and signatures - confirm against the
// implementation file.
class TPendingRestoreCheckpoint {
public:
    // checkpointId       - id of the checkpoint involved in the restore.
    // commitAfterRestore - flag stored for the caller; its exact effect is not
    //                      visible in this header.
    // toBeAcknowledged   - actor ids taken by value; presumably the initial
    //                      content of NotYetAcknowledged (confirm in the .cpp).
    TPendingRestoreCheckpoint(TCheckpointId checkpointId, bool commitAfterRestore, THashSet<NActors::TActorId> toBeAcknowledged);

    // Registers an acknowledgement coming from actorId.
    void Acknowledge(const NActors::TActorId& actorId);

    // Presumably true once no acknowledgement is outstanding - confirm.
    [[nodiscard]] bool GotAllAcknowledges() const;

    // Presumably the number of actors that have not acknowledged yet - confirm.
    [[nodiscard]] size_t NotYetAcknowledgedCount() const;

public:
    // Publicly accessible data; no in-class initializers, so these are
    // expected to be set by the constructor defined elsewhere.
    TCheckpointId CheckpointId;
    bool CommitAfterRestore;

private:
    THashSet<NActors::TActorId> NotYetAcknowledged;
};
// State kept while a coordinator is being initialized: how many actors are
// involved, how many "new checkpoint coordinator" acks have been counted, and
// an optional checkpoint id.
//
// NOTE(review): the three member functions are only declared in this header;
// the notes on their behaviour are inferred from names and should be
// confirmed against the implementation file.
class TPendingInitCoordinator {
public:
    // actorsCount - stored unchanged in ActorsCount.
    explicit TPendingInitCoordinator(size_t actorsCount)
        : ActorsCount(actorsCount)
    {
    }

    // Presumably bumps NewCheckpointCoordinatorAcksGot - confirm.
    void OnNewCheckpointCoordinatorAck();

    // The two queries below are pure const predicates, so a discarded result is
    // almost certainly a mistake; they are marked [[nodiscard]] to match the
    // equivalent queries on TPendingCheckpoint / TPendingRestoreCheckpoint.

    // Presumably compares NewCheckpointCoordinatorAcksGot with ActorsCount - confirm.
    [[nodiscard]] bool AllNewCheckpointCoordinatorAcksProcessed() const;

    // Condition is defined in the implementation file.
    [[nodiscard]] bool CanInjectCheckpoint() const;

public:
    // Fixed at construction time.
    const size_t ActorsCount;

    // Ack counter, starts at zero.
    size_t NewCheckpointCoordinatorAcksGot = 0;

    // Empty by default; nothing in this header assigns it.
    TMaybe<TCheckpointId> CheckpointId;
};
} // namespace NYq
|