aboutsummaryrefslogtreecommitdiffstats
path: root/kikimr/yndx/persqueue/msgbus_server/read_session_info.h
blob: f92572677d8184d84cc3184650169c3c9a0bb83f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#pragma once

#include <ydb/core/client/server/msgbus_server_pq_read_session_info.h>

#include <kikimr/yndx/grpc_services/persqueue/grpc_pq_actor.h>


namespace NKikimr {
namespace NMsgBusProxy {

class TPersQueueGetReadSessionsInfoWorkerWithPQv0 : public IPersQueueGetReadSessionsInfoWorker {
public:
    using TBase = IPersQueueGetReadSessionsInfoWorker;
    using TBase::TBase;
    using TBase::SendStatusRequest;

    STFUNC(StateFunc) override {
        switch (ev->GetTypeRewrite()) {
            HFunc(NGRpcProxy::TEvPQProxy::TEvReadSessionStatusResponse, HandleStatusResponse<NGRpcProxy::TEvPQProxy::TEvReadSessionStatusResponse>);
            HFunc(NGRpcProxy::V1::TEvPQProxy::TEvReadSessionStatusResponse, HandleStatusResponse<NGRpcProxy::V1::TEvPQProxy::TEvReadSessionStatusResponse>);
            HFunc(TEvents::TEvUndelivered, Undelivered);
            HFunc(TEvInterconnect::TEvNodeDisconnected, Disconnected);
        }
    }

private:
    void SendStatusRequest(const TString& sessionName, TActorId actorId, const TActorContext& ctx) override;
};

class TPersQueueGetReadSessionsInfoWorkerWithPQv0Factory : public IPersQueueGetReadSessionsInfoWorkerFactory {
public:
    THolder<IPersQueueGetReadSessionsInfoWorker> Create(
        const TActorId& parentId,
        const THashMap<TString, TActorId>& readSessions,
        std::shared_ptr<const TPersQueueBaseRequestProcessor::TNodesInfo> nodesInfo
    ) const override {
        return MakeHolder<TPersQueueGetReadSessionsInfoWorkerWithPQv0>(parentId, readSessions, nodesInfo);
    }
};

} // namespace NMsgBusProxy
} // namespace NKikimr