aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/services/persqueue_v1/grpc_pq_read.h
blob: 0eaf98c22a72ce7751f9b9aa7e4118a09300dc7d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#pragma once

#include "actors/read_session_actor.h"
#include "actors/direct_read_actor.h"

#include <ydb/core/client/server/grpc_base.h>
#include <ydb/core/persqueue/cluster_tracker.h>
#include <ydb/core/mind/address_classification/net_classifier.h>

#include <ydb/library/actors/core/actorsystem.h>

#include <util/generic/hash.h>
#include <util/system/mutex.h>

#include <type_traits>


namespace NKikimr {
namespace NGRpcProxy {
namespace V1 {



// Factory for the PQ read service actor. Takes the same arguments as the
// TPQReadService constructor declared below; the definition is outside this header,
// so presumably it just constructs a TPQReadService from them (confirm in the .cpp).
// The declaration does not show who owns the returned IActor*.
IActor* CreatePQReadService(const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache,
                            TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const ui32 maxSessions);

// Actor that receives the read-side gRPC request events listed in StateFunc.
// For streaming read requests it creates one TReadSessionActor per request and
// remembers it in Sessions (see HandleStreamPQReadRequest at the bottom of this
// header). All non-template methods are defined outside this header.
class TPQReadService : public NActors::TActorBootstrapped<TPQReadService> {
public:
    // Same argument list as CreatePQReadService(). The definition is not visible here;
    // presumably the arguments initialise the SchemeCache, NewSchemeCache, Counters
    // and MaxSessions members — confirm in the .cpp.
    TPQReadService(const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache,
                   TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const ui32 maxSessions);

    ~TPQReadService()
    {}

    // Bootstrap entry point of the TActorBootstrapped base; defined outside this header.
    void Bootstrap(const TActorContext& ctx);

private:
    // Produces the cookie under which a newly created session is stored in Sessions.
    ui64 NextCookie();

    // When this returns true, HandleStreamPQReadRequest rejects the request with OVERLOAD.
    // NOTE(review): presumably compares the number of sessions with MaxSessions — confirm in the .cpp.
    bool TooMuchSessions();
    TString AvailableLocalCluster();

    // Event dispatch table: every event type listed here is routed to the matching
    // Handle() overload. There is no default branch, so other event types fall
    // through the switch without any handling.
    STFUNC(StateFunc) {
        switch (ev->GetTypeRewrite()) {
            HFunc(NGRpcService::TEvStreamTopicReadRequest, Handle);
            HFunc(NGRpcService::TEvStreamTopicDirectReadRequest, Handle);
            HFunc(NGRpcService::TEvStreamPQMigrationReadRequest, Handle);
            HFunc(NGRpcService::TEvCommitOffsetRequest, Handle);
            HFunc(NGRpcService::TEvPQReadInfoRequest, Handle);
            HFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle);
            HFunc(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate, Handle);
            HFunc(TEvPQProxy::TEvSessionDead, Handle);
        }
    }

    // Common implementation for streaming read requests; ReadRequest selects the
    // protocol flavour. Defined at the bottom of this header.
    // NOTE(review): presumably called from the streaming-read Handle() overloads — confirm in the .cpp.
    template <typename ReadRequest>
    void HandleStreamPQReadRequest(typename ReadRequest::TPtr& ev, const TActorContext& ctx);

private:
    // Handlers for the events registered in StateFunc; all defined outside this header.
    void Handle(NGRpcService::TEvStreamTopicReadRequest::TPtr& ev, const TActorContext& ctx);
    void Handle(NGRpcService::TEvStreamTopicDirectReadRequest::TPtr& ev, const TActorContext& ctx);
    void Handle(NGRpcService::TEvStreamPQMigrationReadRequest::TPtr& ev, const TActorContext& ctx);
    void Handle(NGRpcService::TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx);
    void Handle(NGRpcService::TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx);
    void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev, const TActorContext& ctx);
    void Handle(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate::TPtr& ev, const TActorContext& ctx);

    void Handle(TEvPQProxy::TEvSessionDead::TPtr& ev, const TActorContext& ctx);

private:
    // Actor ids handed over unchanged to every TReadSessionActor that gets created.
    NActors::TActorId SchemeCache;
    NActors::TActorId NewSchemeCache;

    // NOTE(review): presumably the counter behind NextCookie() — confirm in the .cpp.
    TAtomic LastCookie = 0;

    // Read session actors created by HandleStreamPQReadRequest, keyed by session cookie.
    THashMap<ui64, TActorId> Sessions;

    // Passed to every TReadSessionActor that gets created.
    TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;

    // NOTE(review): presumably the limit enforced by TooMuchSessions() — confirm in the .cpp.
    ui32 MaxSessions;
    // While HaveClusters is set, new read sessions are rejected with INITIALIZING
    // as long as either Clusters or LocalCluster is empty.
    TVector<TString> Clusters;
    TString LocalCluster;

    NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; // Detects client's datacenter by IP. May be null
    std::shared_ptr<NPersQueue::TTopicNamesConverterFactory> TopicConverterFactory;
    // Must be non-null by the time a read session is created (enforced with Y_ABORT_UNLESS).
    std::unique_ptr<NPersQueue::TTopicsListController> TopicsHandler;
    // Has no in-class initializer; whatever sets it lives outside this header.
    bool HaveClusters;
};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// template methods implementation

// Builds a server-side error message for a streaming read session.
//
// The message type depends on UseMigrationProtocol:
//   true  -> PersQueue::V1::MigrationStreamingReadServerMessage
//   false -> Topic::StreamReadMessage::FromServer
//
// One issue is appended (filled by FillIssue from `code` and `errorReason`) and the
// status is set to the result of ConvertPersQueueInternalCodeToStatus(code).
template <bool UseMigrationProtocol>
auto FillReadResponse(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode code) {
    using TResponse = std::conditional_t<UseMigrationProtocol,
                                         PersQueue::V1::MigrationStreamingReadServerMessage,
                                         Topic::StreamReadMessage::FromServer>;

    TResponse response;

    auto* issue = response.add_issues();
    FillIssue(issue, code, errorReason);

    response.set_status(ConvertPersQueueInternalCodeToStatus(code));

    return response;
}

// Declaration only; the definition is outside this header. Takes the same arguments
// as FillReadResponse and returns a Topic::StreamDirectReadMessage::FromServer —
// presumably the direct-read counterpart of FillReadResponse (confirm in the .cpp).
Topic::StreamDirectReadMessage::FromServer FillDirectReadResponse(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode code);


template <typename ReadRequest>
void TPQReadService::HandleStreamPQReadRequest(typename ReadRequest::TPtr& ev, const TActorContext& ctx) {
    constexpr bool UseMigrationProtocol = std::is_same_v<ReadRequest, NGRpcService::TEvStreamPQMigrationReadRequest>;

    LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new grpc connection");

    if (TooMuchSessions()) {
        LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, "new grpc connection failed - too much sessions");
        ev->Get()->GetStreamCtx()->Attach(ctx.SelfID);
        ev->Get()->GetStreamCtx()->WriteAndFinish(
            FillReadResponse<UseMigrationProtocol>("proxy overloaded", PersQueue::ErrorCode::OVERLOAD), grpc::Status::OK); //CANCELLED
        return;
    }
    if (HaveClusters && (Clusters.empty() || LocalCluster.empty())) {
        LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, "new grpc connection failed - cluster is not known yet");

        ev->Get()->GetStreamCtx()->Attach(ctx.SelfID);
        ev->Get()->GetStreamCtx()->WriteAndFinish(
            FillReadResponse<UseMigrationProtocol>("cluster initializing", PersQueue::ErrorCode::INITIALIZING), grpc::Status::OK); //CANCELLED
        // TODO: Inc SLI Errors
        return;
    } else {

        Y_ABORT_UNLESS(TopicsHandler != nullptr);
        const ui64 cookie = NextCookie();

        LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new session created cookie " << cookie);

        auto ip = ev->Get()->GetStreamCtx()->GetPeerName();

        TActorId worker = ctx.Register(new TReadSessionActor<UseMigrationProtocol>(
                ev->Release().Release(), cookie, SchemeCache, NewSchemeCache, Counters,
                DatacenterClassifier ? DatacenterClassifier->ClassifyAddress(NAddressClassifier::ExtractAddress(ip)) : "unknown",
                *TopicsHandler
        ));

        Sessions[cookie] = worker;
    }
}


}
}
}