aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl.h
blob: 6bdfe749c911c6244c3e08d48b8b05a6ed82696c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
#ifndef AWS_MQTT_PRIVATE_CLIENT_IMPL_H
#define AWS_MQTT_PRIVATE_CLIENT_IMPL_H

/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/mqtt/client.h>

#include <aws/mqtt/private/fixed_header.h>
#include <aws/mqtt/private/topic_tree.h>

#include <aws/common/hash_table.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>

#include <aws/io/channel.h>
#include <aws/io/channel_bootstrap.h>
#include <aws/io/message_pool.h>
#include <aws/io/socket.h>
#include <aws/io/tls_channel_handler.h>

/*
 * Invokes the callback stored in (client_ptr)->callback, if that field is non-NULL, passing client_ptr and the
 * userdata stored in the field named <callback>_ud. Expands to a single statement that does nothing when the
 * callback is unset.
 *
 * Note: client_ptr is expanded several times, so the argument must not have side effects.
 */
#define MQTT_CLIENT_CALL_CALLBACK(client_ptr, callback)                                                                \
    do {                                                                                                               \
        if ((client_ptr)->callback) {                                                                                  \
            (client_ptr)->callback((client_ptr), (client_ptr)->callback##_ud);                                         \
        }                                                                                                              \
    } while (false)
/*
 * Same as the no-argument callback helper, but forwards extra arguments: if (client_ptr)->callback is non-NULL it
 * is called as callback(client_ptr, <extra args...>, <callback>_ud). At least one extra argument is required,
 * because __VA_ARGS__ is followed by a comma in the expansion.
 *
 * Note: client_ptr is expanded several times, so the argument must not have side effects.
 */
#define MQTT_CLIENT_CALL_CALLBACK_ARGS(client_ptr, callback, ...)                                                      \
    do {                                                                                                               \
        if ((client_ptr)->callback) {                                                                                  \
            (client_ptr)->callback((client_ptr), __VA_ARGS__, (client_ptr)->callback##_ud);                            \
        }                                                                                                              \
    } while (false)

/*
 * Debug aid: asserts that (object)->synced_data.lock is currently locked.
 *
 * When ASSERT_LOCK_HELD is non-zero, the macro attempts aws_mutex_try_lock() on the lock and asserts that the
 * attempt fails (AWS_OP_ERR). The thread's last error is read before the attempt and re-raised afterwards, so the
 * check does not change the error value the surrounding code observes.
 * When ASSERT_LOCK_HELD is zero or undefined, the macro expands to nothing.
 *
 * The enabled form is wrapped in do { ... } while (false) so that it behaves as a single statement; a bare brace
 * block followed by ';' would break an if/else around the macro. Always write a trailing semicolon after it.
 */
#if ASSERT_LOCK_HELD
#    define ASSERT_SYNCED_DATA_LOCK_HELD(object)                                                                       \
        do {                                                                                                           \
            int cached_error = aws_last_error();                                                                       \
            AWS_ASSERT(aws_mutex_try_lock(&(object)->synced_data.lock) == AWS_OP_ERR);                                 \
            aws_raise_error(cached_error);                                                                             \
        } while (false)
#else
#    define ASSERT_SYNCED_DATA_LOCK_HELD(object)
#endif

/* States a client connection can be in. Enumerator values are spelled out so they stay stable if entries move. */
enum aws_mqtt_client_connection_state {
    AWS_MQTT_CLIENT_STATE_CONNECTING = 0,
    AWS_MQTT_CLIENT_STATE_CONNECTED = 1,
    AWS_MQTT_CLIENT_STATE_RECONNECTING = 2,
    AWS_MQTT_CLIENT_STATE_DISCONNECTING = 3,
    AWS_MQTT_CLIENT_STATE_DISCONNECTED = 4,
};

/* Result codes for a request's send callback: keep waiting, finished, or failed. */
enum aws_mqtt_client_request_state {
    AWS_MQTT_CLIENT_REQUEST_ONGOING = 0,
    AWS_MQTT_CLIENT_REQUEST_COMPLETE = 1,
    AWS_MQTT_CLIENT_REQUEST_ERROR = 2,
};

/**
 * Contains some simple statistics about the current state of the connection's queue of operations.
 * Every counter is an aws_atomic_var.
 */
struct aws_mqtt_connection_operation_statistics_impl {
    /**
     * Total number of operations submitted to the connection that have not yet been completed. Unacked operations
     * are a subset of this.
     */
    struct aws_atomic_var incomplete_operation_count_atomic;

    /**
     * Total packet size of operations submitted to the connection that have not yet been completed. Unacked
     * operations are a subset of this.
     */
    struct aws_atomic_var incomplete_operation_size_atomic;

    /**
     * Total number of operations that have been sent to the server and are waiting for a corresponding ACK before
     * they can be completed.
     */
    struct aws_atomic_var unacked_operation_count_atomic;

    /**
     * Total packet size of operations that have been sent to the server and are waiting for a corresponding ACK
     * before they can be completed.
     */
    struct aws_atomic_var unacked_operation_size_atomic;
};

/**
 * Callback that sends (or re-sends) the packet belonging to a request.
 *
 * Called with is_first_attempt set to true when the request packet is first attempted to be sent.
 * Called with is_first_attempt set to false after the timeout, if a matching ack packet hasn't arrived.
 *
 * Return AWS_MQTT_CLIENT_REQUEST_ONGOING to check on the task later.
 * Return AWS_MQTT_CLIENT_REQUEST_COMPLETE to consider the request complete.
 * Return AWS_MQTT_CLIENT_REQUEST_ERROR to cancel the task and report an error to the caller.
 */
typedef enum aws_mqtt_client_request_state(
    aws_mqtt_send_request_fn)(uint16_t packet_id, bool is_first_attempt, void *userdata);

/**
 * Called when the operation statistics change.
 *
 * \param[in] connection  The connection whose statistics changed
 * \param[in] userdata    Opaque pointer supplied when the handler was registered
 */
typedef void(aws_mqtt_on_operation_statistics_fn)(struct aws_mqtt_client_connection *connection, void *userdata);

/*
 * Bit flags describing how an operation currently contributes to the connection's statistics.
 * The flags are distinct bits and may be OR-ed together.
 */
enum aws_mqtt_operation_statistic_state_flags {
    /* No contribution: the operation is not counted in any statistic */
    AWS_MQTT_OSS_NONE = 0x0,

    /* Bit 0: the operation is counted in the "incomplete operation" statistics */
    AWS_MQTT_OSS_INCOMPLETE = 0x1,

    /* Bit 1: the operation is counted in the "unacked operation" statistics */
    AWS_MQTT_OSS_UNACKED = 0x2,
};

/*
 * One outstanding operation on a connection, identified by its packet id.
 */
struct aws_mqtt_request {
    /* Intrusive list hook. NOTE(review): presumably links the request into the connection's ongoing or pending
     * request list - confirm against the implementation. */
    struct aws_linked_list_node list_node;

    struct aws_allocator *allocator;
    /* The connection this request belongs to */
    struct aws_mqtt_client_connection *connection;

    /* Channel task associated with this request. NOTE(review): presumably the task that drives sending - confirm. */
    struct aws_channel_task outgoing_task;

    /* How this operation is currently affecting the statistics of the connection */
    enum aws_mqtt_operation_statistic_state_flags statistic_state_flags;
    /* The encoded size of the packet - used for operation statistics tracking */
    uint64_t packet_size;

    /* Packet (message) identifier assigned to this request */
    uint16_t packet_id;
    /* NOTE(review): presumably the inverse of the noRetry argument of mqtt_create_request() - confirm. */
    bool retryable;
    /* NOTE(review): presumably set once the first send attempt has been made - confirm. */
    bool initiated;
    /* Callback that sends the packet, plus its userdata */
    aws_mqtt_send_request_fn *send_request;
    void *send_request_ud;
    /* Callback invoked when the request completes, plus its userdata */
    aws_mqtt_op_complete_fn *on_complete;
    void *on_complete_ud;
};

/*
 * Task object used to reestablish a broken connection. A connection refers to it through its reconnect_task member.
 */
struct aws_mqtt_reconnect_task {
    struct aws_task task;
    /* Atomic variable; NOTE(review): presumably holds the pointer to the owning connection - confirm. */
    struct aws_atomic_var connection_ptr;
    struct aws_allocator *allocator;
};

/*
 * Per-SUBSCRIBE bookkeeping.
 * The lifetime of this struct is from subscribe -> suback.
 */
struct subscribe_task_arg {

    struct aws_mqtt_client_connection *connection;

    /* List of pointers to struct subscribe_task_topic */
    struct aws_array_list topics;

    /* Packet to populate */
    struct aws_mqtt_packet_subscribe subscribe;

    /* true if the transaction was committed to the topic tree; false means a retry is required */
    bool tree_updated;

    /* SUBACK callbacks. NOTE(review): presumably only one of multi/single is set per subscribe call - confirm. */
    struct {
        aws_mqtt_suback_multi_fn *multi;
        aws_mqtt_suback_fn *single;
    } on_suback;
    /* Userdata passed to the suback callback */
    void *on_suback_ud;
};

/*
 * Per-topic subscription record (reference counted).
 * The lifetime of this struct is the same as the lifetime of the subscription.
 */
struct subscribe_task_topic {
    struct aws_mqtt_client_connection *connection;

    /* The subscription request for this topic */
    struct aws_mqtt_topic_subscription request;
    /* Topic filter string. NOTE(review): ownership is not visible in this header - confirm who frees it. */
    struct aws_string *filter;
    /* NOTE(review): presumably true for a local-only subscription that is not sent to the server - confirm. */
    bool is_local;

    struct aws_ref_count ref_count;
};

/*
 * Full state of one MQTT client connection.
 *
 * Members are grouped by access rule:
 *  - thread_data: only the event-loop thread may touch it.
 *  - synced_data: any thread may touch it, but only while holding synced_data.lock (atomics excepted).
 */
struct aws_mqtt_client_connection {

    struct aws_allocator *allocator;
    struct aws_ref_count ref_count;
    struct aws_mqtt_client *client;

    /* Channel handler information */
    struct aws_channel_handler handler;
    struct aws_channel_slot *slot;

    /* The host information, changed by user when state is AWS_MQTT_CLIENT_STATE_DISCONNECTED */
    struct aws_string *host_name;
    uint16_t port;
    struct aws_tls_connection_options tls_options;
    struct aws_socket_options socket_options;
    struct aws_http_proxy_config *http_proxy_config;
    struct aws_event_loop *loop;

    /* Connect parameters */
    struct aws_byte_buf client_id;
    bool clean_session;
    uint16_t keep_alive_time_secs;
    uint64_t ping_timeout_ns;
    uint64_t operation_timeout_ns;
    struct aws_string *username;
    struct aws_string *password;
    /* Will message settings: topic, QoS, retain flag and payload */
    struct {
        struct aws_byte_buf topic;
        enum aws_mqtt_qos qos;
        bool retain;
        struct aws_byte_buf payload;
    } will;
    /* Reconnect timing; current/min/max are expressed in seconds */
    struct {
        uint64_t current_sec; /* seconds */
        uint64_t min_sec;     /* seconds */
        uint64_t max_sec;     /* seconds */

        /*
         * Invariant: this is always zero except when the current MQTT channel has received a successful connack
         * and is not yet shutdown.  During that interval, it is the timestamp the connack was received.
         */
        uint64_t channel_successful_connack_timestamp_ns;
    } reconnect_timeouts;

    /* User connection callbacks; each callback X has a matching X_ud userdata pointer */
    aws_mqtt_client_on_connection_complete_fn *on_connection_complete;
    void *on_connection_complete_ud;
    aws_mqtt_client_on_connection_interrupted_fn *on_interrupted;
    void *on_interrupted_ud;
    aws_mqtt_client_on_connection_resumed_fn *on_resumed;
    void *on_resumed_ud;
    aws_mqtt_client_on_connection_closed_fn *on_closed;
    void *on_closed_ud;
    aws_mqtt_client_publish_received_fn *on_any_publish;
    void *on_any_publish_ud;
    aws_mqtt_client_on_disconnect_fn *on_disconnect;
    void *on_disconnect_ud;
    aws_mqtt_on_operation_statistics_fn *on_any_operation_statistics;
    void *on_any_operation_statistics_ud;

    /* Connection tasks. */
    struct aws_mqtt_reconnect_task *reconnect_task;
    struct aws_channel_task ping_task;

    /**
     * Number of times this connection has successfully CONNACK-ed, used
     * to ensure on_connection_complete is sent on the first completed
     * CONNECT/CONNACK cycle
     */
    size_t connection_count;
    bool use_tls; /* Only used by main thread */

    /* Only the event-loop thread may touch this data */
    struct {
        /* If an incomplete packet arrives, store the data here. */
        struct aws_byte_buf pending_packet;

        bool waiting_on_ping_response;

        /* Keeps track of all open subscriptions */
        /* TODO: The subscriptions live with the connection object. So if the connection disconnects from one
         * endpoint and connects to another endpoint, the subscription tree will still be the same as before. */
        struct aws_mqtt_topic_tree subscriptions;

        /**
         * List of all requests waiting for response.
         */
        struct aws_linked_list ongoing_requests_list;
    } thread_data;

    /* Any thread may touch this data, but the lock must be held (unless it's an atomic) */
    struct {
        /* Note: never fire a user callback with the lock held. */
        struct aws_mutex lock;

        /* The state of the connection */
        enum aws_mqtt_client_connection_state state;

        /**
         * Memory pool for all aws_mqtt_request.
         */
        struct aws_memory_pool requests_pool;

        /**
         * Stores all requests that are not completed, including the pending requests.
         *
         * hash table from uint16_t (packet_id) to aws_mqtt_outstanding_request
         * NOTE(review): no type named aws_mqtt_outstanding_request is visible in this header; presumably the
         * values are struct aws_mqtt_request - confirm.
         */
        struct aws_hash_table outstanding_requests_table;

        /**
         * List of all requests that cannot be scheduled until the connection comes online.
         */
        struct aws_linked_list pending_requests_list;

        /**
         * Remember the last packet ID assigned.
         * Helps us find the next free ID faster.
         */
        uint16_t packet_id;

    } synced_data;

    /* Websocket settings: handshake transformer/validator callbacks with their userdata, and an enabled flag */
    struct {
        aws_mqtt_transform_websocket_handshake_fn *handshake_transformer;
        void *handshake_transformer_ud;
        aws_mqtt_validate_websocket_handshake_fn *handshake_validator;
        void *handshake_validator_ud;
        bool enabled;

        struct aws_http_message *handshake_request;
    } websocket;

    /**
     * Statistics tracking operational state
     */
    struct aws_mqtt_connection_operation_statistics_impl operation_statistics_impl;
};

/* Returns a pointer to a channel handler vtable. NOTE(review): presumably the vtable used by the MQTT client's
 * channel handler - the definition is not visible in this header, confirm. */
struct aws_channel_handler_vtable *aws_mqtt_get_client_channel_vtable(void);

/*
 * Helper for getting a message object for a packet.
 *
 * connection: the connection the packet belongs to
 * header:     fixed header of the packet
 *
 * NOTE(review): the failure result (presumably NULL) and how the message is sized are not visible here - confirm.
 */
struct aws_io_message *mqtt_get_message_for_packet(
    struct aws_mqtt_client_connection *connection,
    struct aws_mqtt_fixed_header *header);

/* Lock / unlock helpers for a connection's synced data. NOTE(review): presumably these acquire and release
 * connection->synced_data.lock - the definitions are not visible in this header, confirm. */
void mqtt_connection_lock_synced_data(struct aws_mqtt_client_connection *connection);
void mqtt_connection_unlock_synced_data(struct aws_mqtt_client_connection *connection);

/*
 * Sets the state of the connection.
 * Note: needs to be called with the synced data lock held.
 */
void mqtt_connection_set_state(
    struct aws_mqtt_client_connection *connection,
    enum aws_mqtt_client_connection_state state);

/**
 * This function registers a new outstanding request and returns the message identifier to use (or 0 on error).
 * send_request will be called from request_timeout_task if everything succeeds. It is not called on error.
 * on_complete will be called once the request has completed, either in success or error.
 * noRetry is true for packets that will never be retried or offline queued.
 * packet_size is the size of the packet for this request. NOTE(review): presumably the encoded size used for
 * operation statistics tracking - confirm.
 */
AWS_MQTT_API uint16_t mqtt_create_request(
    struct aws_mqtt_client_connection *connection,
    aws_mqtt_send_request_fn *send_request,
    void *send_request_ud,
    aws_mqtt_op_complete_fn *on_complete,
    void *on_complete_ud,
    bool noRetry,
    uint64_t packet_size);

/*
 * Call when an ack packet comes back from the server.
 *
 * connection: the connection the request was made on
 * error_code: result to report for the request
 * packet_id:  identifier of the request being completed
 */
AWS_MQTT_API void mqtt_request_complete(
    struct aws_mqtt_client_connection *connection,
    int error_code,
    uint16_t packet_id);

/*
 * Call to close the connection with an error code.
 *
 * connection: the connection to close
 * error_code: the error code to close the connection with
 */
AWS_MQTT_API void mqtt_disconnect_impl(struct aws_mqtt_client_connection *connection, int error_code);

/* Creates the task used to reestablish a broken connection.
 * NOTE(review): presumably the task is stored in connection->reconnect_task - confirm against the definition. */
AWS_MQTT_API void aws_create_reconnect_task(struct aws_mqtt_client_connection *connection);

/**
 * Sets the callback to call whenever the operation statistics change.
 *
 * \param[in] connection                  The connection object
 * \param[in] on_operation_statistics     The function to call when the operation statistics change (pass NULL to unset)
 * \param[in] on_operation_statistics_ud  Userdata for on_operation_statistics
 *
 * \returns an int status code. NOTE(review): presumably AWS_OP_SUCCESS / AWS_OP_ERR - confirm.
 */
AWS_MQTT_API int aws_mqtt_client_connection_set_on_operation_statistics_handler(
    struct aws_mqtt_client_connection *connection,
    aws_mqtt_on_operation_statistics_fn *on_operation_statistics,
    void *on_operation_statistics_ud);

/*
 * Sends a PINGREQ packet to the server to keep the connection alive. This is not exported and should never
 * be called directly. This function is driven by the timeout values passed to aws_mqtt_client_connect().
 * If a PINGRESP is not received within a reasonable period of time, the connection will be closed.
 *
 * \param[in] connection   The connection to ping on
 *
 * \returns AWS_OP_SUCCESS if the connection is open and the PINGREQ is sent or queued to send,
 *              otherwise AWS_OP_ERR and aws_last_error() is set.
 */
int aws_mqtt_client_connection_ping(struct aws_mqtt_client_connection *connection);

/**
 * Changes the operation statistics for the passed-in aws_mqtt_request. Used for tracking
 * whether operations have been completed or not.
 *
 * NOTE: This function acquires the synced data lock! Do NOT call it with the synced data lock already
 * held, or the function will deadlock trying to get the lock.
 *
 * @param connection The connection whose operations are being tracked
 * @param request The request to change the state of
 * @param new_state_flags The new state to use
 */
void aws_mqtt_connection_statistics_change_operation_statistic_state(
    struct aws_mqtt_client_connection *connection,
    struct aws_mqtt_request *request,
    enum aws_mqtt_operation_statistic_state_flags new_state_flags);

#endif /* AWS_MQTT_PRIVATE_CLIENT_IMPL_H */