aboutsummaryrefslogtreecommitdiffstats
path: root/kikimr/persqueue/sdk/deprecated/cpp/v2/impl/compat_producer.h
blob: e5287d46944c41b85257068ded1210a1a4cd60a2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#pragma once

#include "iproducer_p.h"

#include <kikimr/public/sdk/cpp/client/ydb_persqueue/persqueue.h>

#include <memory>
#include <queue>
#include <utility>

namespace NPersQueue {


/**
 * Compatibility producer:
 *      wraps old "interface" around the new one
 */

class TYdbSdkCompatibilityProducer: public IProducerImpl, public std::enable_shared_from_this<TYdbSdkCompatibilityProducer> {

    // Bundle of the pieces a Write() call supplies: the commit promise, the
    // optional explicit sequence number and the payload. Instances are stored
    // in the ToWrite / ToAck queues below.
    struct TMsgData {
        NThreading::TPromise<TProducerCommitResponse> Promise;
        TMaybe<TProducerSeqNo> SeqNo;
        TData Data;

        TMsgData() = default;

        // Arguments are taken by value and moved into the members, so a caller
        // passing temporaries (or std::move-ing its arguments) pays no second copy.
        TMsgData(NThreading::TPromise<TProducerCommitResponse> promise, TMaybe<TProducerSeqNo> seqNo, TData data)
            : Promise(std::move(promise))
            , SeqNo(std::move(seqNo))
            , Data(std::move(data))
        {
        }
    };

    // Write session of the new (NYdb) SDK that this class wraps.
    std::shared_ptr<NYdb::NPersQueue::IWriteSession> WriteSession;
    // NOTE(review): presumably the promise behind the future returned by IsDead() - confirm in the .cpp.
    NThreading::TPromise<TError> IsDeadPromise;
    // NOTE(review): presumably the pending "next event" future used by
    // SubscribeToNextEvent() / DoProcessNextEvent() - confirm in the .cpp.
    NThreading::TFuture<void> NextEvent;

    // Continuation token of the new SDK's write session; empty when none is held.
    TMaybe<NYdb::NPersQueue::TContinuationToken> ContToken;
    // Going by the names: messages waiting to be written and messages waiting
    // for an acknowledgement. The queue handling itself lives in the .cpp.
    std::queue<TMsgData> ToWrite;
    std::queue<TMsgData> ToAck;
    // NOTE(review): presumably guards the mutable state above - confirm in the .cpp.
    TSpinLock Lock;
    // In-class initializer so the flag never holds an indeterminate value,
    // independent of what the constructor's initializer list does.
    bool Closed = false;

    void DoProcessNextEvent();
    void SubscribeToNextEvent();

    // Shared implementation helper with an optional sequence number; the two
    // public Write() overloads differ only in whether a seqNo is supplied.
    void WriteImpl(NThreading::TPromise<TProducerCommitResponse>& promise, TMaybe<TProducerSeqNo> seqNo, TData data) noexcept;

    void NotifyClient(NErrorCode::EErrorCode code, const TString& reason);

public:
    // Keep the base-class Write() overloads visible next to the overrides below.
    using IProducerImpl::Write;

    NThreading::TFuture<TProducerCreateResponse> Start(TInstant deadline = TInstant::Max()) noexcept override;
    NThreading::TFuture<TError> IsDead() noexcept override;

    // Write without an explicit sequence number.
    void Write(NThreading::TPromise<TProducerCommitResponse>& promise, TData data) noexcept override;
    // Write with an explicit sequence number.
    void Write(NThreading::TPromise<TProducerCommitResponse>& promise, TProducerSeqNo seqNo, TData data) noexcept override;

    void Cancel() override;
    void Init() noexcept override;

    ~TYdbSdkCompatibilityProducer();

    TYdbSdkCompatibilityProducer(const TProducerSettings& settings, NYdb::NPersQueue::TPersQueueClient& persQueueClient,
                    std::shared_ptr<void> destroyEventRef, TIntrusivePtr<TPQLibPrivate> pqLib);

};


}   // namespace NPersQueue