1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
#pragma once
#include <kikimr/persqueue/sdk/deprecated/cpp/v2/types.h>
#include <kikimr/persqueue/sdk/deprecated/cpp/v2/responses.h>
#include "channel.h"
#include "scheduler.h"
#include "iconsumer_p.h"
#include "internals.h"
#include <kikimr/yndx/api/grpc/persqueue.grpc.pb.h>
#include <library/cpp/threading/future/future.h>
#include <deque>
#include <memory>
namespace NPersQueue {
/// Consumer implementation of IConsumerImpl that talks to the server over an
/// asynchronous bidirectional gRPC stream (TReadRequest out, TReadResponse in).
///
/// Only declarations live here; the behaviour of every method is defined
/// out of line and is not visible from this header.
class TConsumer: public IConsumerImpl, public std::enable_shared_from_this<TConsumer> {
public:
    friend class TPQLibPrivate;

    // --- IConsumerImpl overrides -------------------------------------------

    /// Starts the consumer.
    /// @param deadline time limit for the start (see also OnStartTimeout()).
    /// @return future carrying the TConsumerCreateResponse.
    NThreading::TFuture<TConsumerCreateResponse> Start(TInstant deadline) noexcept override;

    /// Commits the given cookies.
    void Commit(const TVector<ui64>& cookies) noexcept override;

    // Keep the base-class GetNextMessage overloads visible next to the override below.
    using IConsumerImpl::GetNextMessage;

    /// Requests the next message; the result is delivered through @p promise.
    void GetNextMessage(NThreading::TPromise<TConsumerMessage>& promise) noexcept override;

    /// Requests the status of one partition, identified by topic, partition and generation.
    void RequestPartitionStatus(const TString& topic, ui64 partition, ui64 generation) noexcept override;

    /// @return future carrying a TError (presumably fulfilled from IsDeadPromise
    ///         when the consumer dies - confirm in the implementation).
    NThreading::TFuture<TError> IsDead() noexcept override;

    // --- Construction / lifetime -------------------------------------------

    /// @param settings        consumer settings.
    /// @param cq              gRPC completion queue shared with the owner.
    /// @param promise         promise for the creation response.
    /// @param destroyEventRef opaque shared reference (NOTE(review): presumably
    ///                        forwarded to the IConsumerImpl base - confirm).
    /// @param pqLib           owning library object.
    /// @param logger          logger instance.
    TConsumer(const TConsumerSettings& settings, std::shared_ptr<grpc::CompletionQueue> cq,
              NThreading::TPromise<TConsumerCreateResponse> promise, std::shared_ptr<void> destroyEventRef, TIntrusivePtr<TPQLibPrivate> pqLib, TIntrusivePtr<ILogger> logger) noexcept;

    void Init() override;

    ~TConsumer() noexcept;

    /// Destroys the consumer without an explicit reason (see the protected
    /// overloads taking a TError / description).
    void Destroy() noexcept;

    /// Sets the channel to use, either from a channel holder or from ready channel info.
    void SetChannel(const TChannelHolder& channel) noexcept;
    void SetChannel(const TChannelInfo& channel) noexcept;

    /// Locks a partition of @p topic with the given generation and offsets.
    void Lock(const TString& topic, const ui32 partition, const ui64 generation, const ui64 readOffset, const ui64 commitOffset, const bool verifyReadOffset) noexcept;

    void OrderRead() noexcept;

    void Cancel() override;

public:
    using TStream = grpc::ClientAsyncReaderWriterInterface<TReadRequest, TReadResponse>;

    // Objects that must outlive the consumer until all the callbacks arrive at the CompletionQueue.
    // Reference-counted so that they can be held independently of the TConsumer instance.
    struct TRpcStuff: public TAtomicRefCount<TRpcStuff> {
        TReadResponse Response;
        std::shared_ptr<grpc::CompletionQueue> CQ;
        std::shared_ptr<grpc::Channel> Channel;
        std::unique_ptr<PersQueueService::Stub> Stub;
        grpc::ClientContext Context;
        std::unique_ptr<TStream> Stream;
    };

protected:
    // Handler classes that need access to the protected On*() callbacks and state.
    friend class TConsumerStreamCreated;
    friend class TConsumerReadDone;
    friend class TConsumerWriteDone;
    friend class TConsumerDestroyHandler;

    IHandlerPtr StreamCreatedHandler;
    IHandlerPtr ReadDoneHandler;
    IHandlerPtr WriteDoneHandler;

    void ProcessResponses();
    void Destroy(const TError& reason);
    void Destroy(const TString& description); // the same but with Code=ERROR

    // Callbacks for the stream events (NOTE(review): presumably invoked by the
    // friend handler classes above - confirm in the implementation).
    void OnStreamCreated();
    void OnReadDone();
    void OnWriteDone();

    void DoWrite();
    void DoStart(TInstant deadline);
    void OnStartTimeout();

    // NOTE: data members are kept in their original declaration order, since
    // that order is also the initialization order.
    TIntrusivePtr<TRpcStuff> RpcStuff;

    TChannelHolder ChannelHolder;
    TConsumerSettings Settings;

    TString SessionId;

    NThreading::TPromise<TConsumerCreateResponse> StartPromise;
    NThreading::TPromise<TError> IsDeadPromise;
    std::deque<NThreading::TPromise<TConsumerMessage>> MessagePromises;
    std::deque<NThreading::TFuture<TConsumerMessage>> MessageResponses;

    std::deque<TReadRequest> Requests;

    std::deque<std::pair<ui64, std::pair<ui32, ui64>>> ReadInfo; // cookie -> (count, size)
    ui32 UncommittedCount = 0;
    ui64 UncommittedSize = 0;

    ui64 MemoryUsage = 0;
    ui32 ReadsOrdered = 0;

    ui64 EstimateReadSize = 0;

    // In-class defaults keep the flags well-defined even if a constructor does not
    // mention them; a constructor mem-initializer still takes precedence.
    // NOTE(review): the out-of-line constructor is not visible from this header -
    // confirm that it sets the intended starting value of WriteInflight.
    bool WriteInflight = false;

    ui64 ProxyCookie = 0;

    TIntrusivePtr<ILogger> Logger;

    TError Error;

    bool IsDestroyed = false;
    bool IsDestroying = false;

    TIntrusivePtr<TScheduler::TCallbackHandler> StartDeadlineCallback;
};
}
|