1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
#pragma once
#include "consumer.h"
#include "scheduler.h"
#include "internals.h"
#include "persqueue_p.h"
#include "iconsumer_p.h"
#include <kikimr/persqueue/sdk/deprecated/cpp/v2/responses.h>
#include <kikimr/persqueue/sdk/deprecated/cpp/v2/persqueue.h>
#include <kikimr/persqueue/sdk/deprecated/cpp/v2/types.h>
#include <kikimr/yndx/api/grpc/persqueue.grpc.pb.h>
#include <library/cpp/threading/future/future.h>
#include <util/generic/hash.h>
#include <util/generic/hash_set.h>
#include <util/generic/vector.h>
#include <deque>
namespace NPersQueue {
// @brief consumer implementation which transparently retries all connectivity errors
class TRetryingConsumer: public IConsumerImpl, public std::enable_shared_from_this<TRetryingConsumer> {
// @brief locked partitions info
struct TLockInfo {
THashSet<ui64> Cookies; // related cookies
ui64 Gen = 0;
ui64 OriginalGen = 0; // original generation
ui64 ReadOffset = 0; // read offset specified by client
bool Locked = false;
};
struct TCookieInfo {
THashSet<TLockInfo*> Locks; // related locks
ui64 OriginalCookie = 0; // zero means invalid cookie
ui64 UserCookie = 0;
};
public:
TRetryingConsumer(const TConsumerSettings& settings, std::shared_ptr<void> destroyEventRef,
TIntrusivePtr<TPQLibPrivate> pqLib, TIntrusivePtr<ILogger> logger);
~TRetryingConsumer() noexcept override;
NThreading::TFuture<TConsumerCreateResponse> Start(TInstant deadline = TInstant::Max()) noexcept override;
using IConsumerImpl::GetNextMessage;
void GetNextMessage(NThreading::TPromise<TConsumerMessage>& promise) noexcept override;
void Commit(const TVector<ui64>& cookies) noexcept override;
void RequestPartitionStatus(const TString& topic, ui64 partition, ui64 generation) noexcept override;
NThreading::TFuture<TError> IsDead() noexcept override;
void Cancel() override;
NThreading::TFuture<void> Destroyed() noexcept override;
private:
void OnStartDeadline();
void OnConsumerDead(const TError& error);
void ScheduleReconnect();
void DoReconnect(TInstant deadline);
void StartProcessing(const NThreading::TFuture<TConsumerCreateResponse>& f);
void SubscribeDestroyed();
void Destroy(const TError& error);
void Destroy(const TString& description, NErrorCode::EErrorCode code = NErrorCode::ERROR);
void DoRequest();
void ProcessResponse(TConsumerMessage&& message);
void FastResponse(TConsumerMessage&& message);
void FastCommit(const TVector<ui64>& cookies);
void UpdateReadyToRead(const NPersQueue::TLockInfo& readyToRead, const TString& topic, ui32 partition, ui64 generation);
private:
TConsumerSettings Settings;
TIntrusivePtr<ILogger> Logger;
std::shared_ptr<IConsumerImpl> Consumer;
TString SessionId;
NThreading::TFuture<TConsumerCreateResponse> StartFuture;
NThreading::TPromise<TConsumerCreateResponse> StartPromise;
NThreading::TPromise<TError> IsDeadPromise;
NThreading::TPromise<void> ConsumerDestroyedPromise;
// requests which are waiting for response
std::deque<NThreading::TPromise<TConsumerMessage>> PendingRequests;
// ready messages for returning to clients immediately
std::deque<TConsumerMessage> ReadyResponses;
// active cookies
std::deque<TCookieInfo> Cookies;
THashMap<ui64, ui64> CommittingCookies;
// active data per topic, partition
THashMap<std::pair<TString, ui64>, TLockInfo> Locks; // topic, partition -> LockInfo
// number of unsuccessful retries after last error
ui64 GenCounter;
ui64 CookieCounter;
unsigned ReconnectionAttemptsDone;
// destroying process started
bool Stopping;
// reconnecting in process
bool Reconnecting;
};
}
|