aboutsummaryrefslogtreecommitdiffstats
path: root/kikimr/persqueue/sdk/deprecated/cpp/v2/impl/consumer.h
blob: f2f6f81e714e2b8c9d4d4a4dd28e32c4c910a295 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#pragma once

#include <kikimr/persqueue/sdk/deprecated/cpp/v2/types.h>
#include <kikimr/persqueue/sdk/deprecated/cpp/v2/responses.h>
#include "channel.h"
#include "scheduler.h"
#include "iconsumer_p.h"
#include "internals.h"
#include <kikimr/yndx/api/grpc/persqueue.grpc.pb.h>

#include <library/cpp/threading/future/future.h>

#include <deque>
#include <memory>

namespace NPersQueue {

class TConsumer: public IConsumerImpl, public std::enable_shared_from_this<TConsumer> {
public:
    friend class TPQLibPrivate;

    NThreading::TFuture<TConsumerCreateResponse> Start(TInstant deadline) noexcept override;

    void Commit(const TVector<ui64>& cookies) noexcept override;

    using IConsumerImpl::GetNextMessage;
    void GetNextMessage(NThreading::TPromise<TConsumerMessage>& promise) noexcept override;

    void RequestPartitionStatus(const TString& topic, ui64 partition, ui64 generation) noexcept override;

    NThreading::TFuture<TError> IsDead() noexcept override;

    TConsumer(const TConsumerSettings& settings, std::shared_ptr<grpc::CompletionQueue> cq,
              NThreading::TPromise<TConsumerCreateResponse> promise, std::shared_ptr<void> destroyEventRef, TIntrusivePtr<TPQLibPrivate> pqLib, TIntrusivePtr<ILogger> logger) noexcept;

    void Init() override;

    ~TConsumer() noexcept;

    void Destroy() noexcept;
    void SetChannel(const TChannelHolder& channel) noexcept;
    void SetChannel(const TChannelInfo& channel) noexcept;

    void Lock(const TString& topic, const ui32 partition, const ui64 generation, const ui64 readOffset, const ui64 commitOffset, const bool verifyReadOffset) noexcept;
    void OrderRead() noexcept;

    void Cancel() override;

public:
    using TStream = grpc::ClientAsyncReaderWriterInterface<TReadRequest, TReadResponse>;

    // objects that must live after destruction of producer untill all the callbacks arrive at CompletionQueue
    struct TRpcStuff: public TAtomicRefCount<TRpcStuff> {
        TReadResponse Response;
        std::shared_ptr<grpc::CompletionQueue> CQ;
        std::shared_ptr<grpc::Channel> Channel;
        std::unique_ptr<PersQueueService::Stub> Stub;
        grpc::ClientContext Context;
        std::unique_ptr<TStream> Stream;
    };

protected:
    friend class TConsumerStreamCreated;
    friend class TConsumerReadDone;
    friend class TConsumerWriteDone;
    friend class TConsumerDestroyHandler;

    IHandlerPtr StreamCreatedHandler;
    IHandlerPtr ReadDoneHandler;
    IHandlerPtr WriteDoneHandler;

    void ProcessResponses();
    void Destroy(const TError& reason);
    void Destroy(const TString& description); // the same but with Code=ERROR
    void OnStreamCreated();
    void OnReadDone();
    void OnWriteDone();

    void DoWrite();
    void DoStart(TInstant deadline);
    void OnStartTimeout();

    TIntrusivePtr<TRpcStuff> RpcStuff;

    TChannelHolder ChannelHolder;
    TConsumerSettings Settings;

    TString SessionId;

    NThreading::TPromise<TConsumerCreateResponse> StartPromise;
    NThreading::TPromise<TError> IsDeadPromise;
    std::deque<NThreading::TPromise<TConsumerMessage>> MessagePromises;
    std::deque<NThreading::TFuture<TConsumerMessage>> MessageResponses;

    std::deque<TReadRequest> Requests;


    std::deque<std::pair<ui64, std::pair<ui32, ui64>>> ReadInfo; //cookie -> (count, size)
    ui32 UncommittedCount = 0;
    ui64 UncommittedSize = 0;

    ui64 MemoryUsage = 0;
    ui32 ReadsOrdered = 0;

    ui64 EstimateReadSize = 0;

    bool WriteInflight;

    ui64 ProxyCookie = 0;

    TIntrusivePtr<ILogger> Logger;

    TError Error;

    bool IsDestroyed;
    bool IsDestroying;

    TIntrusivePtr<TScheduler::TCallbackHandler> StartDeadlineCallback;
};

}