aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/core/ymq/http/http.h
blob: 44103cdb1c9724ec46100a652e9ee97b5e010a74 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#pragma once

#include "params.h"
#include "types.h"

#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/sqs.pb.h>

#include <ydb/library/http_proxy/authorization/signature.h>
#include <ydb/core/ymq/base/counters.h>

#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/http/server/http.h>

#include <util/generic/buffer.h>
#include <util/generic/maybe.h>
#include <library/cpp/cgiparam/cgiparam.h> 

namespace NKikimr::NSQS {

class TAsyncHttpServer;
class THttpRequest;

class THttpRequest : public TRequestReplier {
public:
    THttpRequest(TAsyncHttpServer* p);
    ~THttpRequest();

    void SendResponse(const TSqsHttpResponse& r);

    const TString& GetRequestId() const {
        return RequestId_;
    }

    const TAsyncHttpServer* GetServer() const {
        return Parent_;
    }

private:
    bool DoReply(const TReplyParams& p) override;

    void WriteResponse(const TReplyParams& replyParams, const TSqsHttpResponse& response);

    TString LogHttpRequestResponseCommonInfoString();
    TString LogHttpRequestResponseDebugInfoString(const TReplyParams& replyParams, const TSqsHttpResponse& response);
    void LogHttpRequestResponse(const TReplyParams& replyParams, const TSqsHttpResponse& response);

private:
    template<typename T>
    void CopyCredentials(T* const request, const NKikimrConfig::TSqsConfig& config) {
        if (SecurityToken_ && !config.GetYandexCloudMode()) {
            // it's also TVM-compatible due to universal TicketParser
            request->MutableCredentials()->SetOAuthToken(SecurityToken_);
        }
    }

    TString GetRequestPathPart(TStringBuf path, size_t partIdx) const;
    TString ExtractQueueNameFromPath(const TStringBuf path);
    TString ExtractAccountNameFromPath(const TStringBuf path);

    ui64 CalculateRequestSizeInBytes(const THttpInput& input, const ui64 contentLength) const;
    void ExtractQueueAndAccountNames(const TStringBuf path);

    TString HttpHeadersLogString(const THttpInput& input);
    void ParseHeaders(const THttpInput& input);
    void ParseAuthorization(const TString& value);
    void ParseRequest(THttpInput& input);
    void ParseCgiParameters(const TCgiParameters& params);
    void ParsePrivateRequestPathPrefix(const TStringBuf& path);

    bool SetupRequest();

    void SetupChangeMessageVisibility(TChangeMessageVisibilityRequest* const req);
    void SetupChangeMessageVisibilityBatch(TChangeMessageVisibilityBatchRequest* const req);
    void SetupCreateQueue(TCreateQueueRequest* const req);
    void SetupCreateUser(TCreateUserRequest* const req);
    void SetupGetQueueAttributes(TGetQueueAttributesRequest* const req);
    void SetupGetQueueUrl(TGetQueueUrlRequest* const req);
    void SetupDeleteMessage(TDeleteMessageRequest* const req);
    void SetupDeleteMessageBatch(TDeleteMessageBatchRequest* const req);
    void SetupDeleteQueue(TDeleteQueueRequest* const req);
    void SetupListPermissions(TListPermissionsRequest* const req);
    void SetupListDeadLetterSourceQueues(TListDeadLetterSourceQueuesRequest* const req);
    void SetupPrivateDeleteQueueBatch(TDeleteQueueBatchRequest* const req);
    void SetupPrivatePurgeQueueBatch(TPurgeQueueBatchRequest* const req);
    void SetupPrivateGetQueueAttributesBatch(TGetQueueAttributesBatchRequest* const req);
    void SetupDeleteUser(TDeleteUserRequest* const req);
    void SetupListQueues(TListQueuesRequest* const req);
    void SetupPrivateCountQueues(TCountQueuesRequest* const req);
    void SetupListUsers(TListUsersRequest* const req);
    void SetupModifyPermissions(TModifyPermissionsRequest* const req);
    void SetupReceiveMessage(TReceiveMessageRequest* const req);
    void SetupSendMessage(TSendMessageRequest* const req);
    void SetupSendMessageBatch(TSendMessageBatchRequest* const req);
    void SetupPurgeQueue(TPurgeQueueRequest* const req);
    void SetupSetQueueAttributes(TSetQueueAttributesRequest* const req);

    void ExtractSourceAddressFromSocket();

    void GenerateRequestId(const TString& sourceReqId);

    THttpActionCounters* GetActionCounters() const;

    // Checks whether request is ping and then starts ping actor.
    // If request is ping, returns true, otherwise - false.
    bool SetupPing(const TReplyParams& p);

private:
    TAsyncHttpServer* const Parent_;
    TIntrusivePtr<THttpUserCounters> UserCounters_;

    TParameters QueryParams_;
    EAction Action_ = EAction::Unknown;
    TString UserName_;
    TString AccountName_;
    TString QueueName_;
    TString SecurityToken_;
    TString IamToken_;
    TString FolderId_;
    TString ApiMethod_;

    THolder<TAwsRequestSignV4> AwsSignature_;

    TMaybe<TBuffer> InputData;
    TString HttpMethod;
    TMaybe<TSqsHttpResponse> Response_;
    TString RequestId_;

    // Source values parsed from headers
    TString SourceAddress_;

    ui64 RequestSizeInBytes_ = 0;

    bool IsPrivateRequest_ = false; // Has "/private" path prefix
    TInstant StartTime_ = TInstant::Now();
};

class TAsyncHttpServer
    : public THttpServer
    , public THttpServer::ICallBack
{
    friend THttpRequest;

public:
    TAsyncHttpServer(const NKikimrConfig::TSqsConfig& config);
    ~TAsyncHttpServer();

    void Initialize(
            NActors::TActorSystem* as,
            TIntrusivePtr<NMonitoring::TDynamicCounters> sqsCounters,
            TIntrusivePtr<NMonitoring::TDynamicCounters> ymqCounters,
            ui32 poolId);

    void Start();

    NActors::TActorSystem* GetActorSystem() const {
        return ActorSystem_;
    }

private:
    // THttpServer::ICallback
    TClientRequest* CreateClient() override;
    void OnException() override;
    static THttpServerOptions MakeHttpServerOptions(const NKikimrConfig::TSqsConfig& config);

    void UpdateConnectionsCountCounter();

private:
    const NKikimrConfig::TSqsConfig Config;
    NActors::TActorSystem* ActorSystem_ = nullptr;
    TIntrusivePtr<THttpCounters> HttpCounters_; // http subsystem counters
    THolder<TCloudAuthCounters> CloudAuthCounters_; // cloud_auth subsystem counters
    TIntrusivePtr<TUserCounters> AggregatedUserCounters_; // aggregated counters for user in core subsystem
    ui32 PoolId_ = 0;
};

} // namespace NKikimr::NSQS