diff options
| author | robot-contrib <[email protected]> | 2025-05-15 11:55:04 +0300 |
|---|---|---|
| committer | robot-contrib <[email protected]> | 2025-05-15 12:10:01 +0300 |
| commit | 5569fa802a72982a60a1a53697dc2e535c2c13e8 (patch) | |
| tree | e66068d6db87bab720a248f9e5ca2f3da0fde1f9 /contrib | |
| parent | f9bf93a1d46559e7c9f99d23ae31befd03476139 (diff) | |
Update contrib/restricted/aws/aws-c-mqtt to 0.13.0
commit_hash:900d7ae27d5925a212ae7faea8bcdad8f42ef947
Diffstat (limited to 'contrib')
32 files changed, 6111 insertions, 87 deletions
diff --git a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.copyrights.report b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.copyrights.report index fb8c16b3fff..7edd55c4b9a 100644 --- a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.copyrights.report +++ b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.copyrights.report @@ -50,10 +50,14 @@ BELONGS ya.make include/aws/mqtt/private/client_impl_shared.h [5:5] include/aws/mqtt/private/fixed_header.h [5:5] include/aws/mqtt/private/mqtt311_decoder.h [5:5] + include/aws/mqtt/private/mqtt311_listener.h [2:2] include/aws/mqtt/private/mqtt_client_test_helper.h [4:4] include/aws/mqtt/private/mqtt_subscription_set.h [2:2] include/aws/mqtt/private/packets.h [5:5] - include/aws/mqtt/private/shared_constants.h [2:2] + include/aws/mqtt/private/request-response/protocol_adapter.h [5:5] + include/aws/mqtt/private/request-response/request_response_subscription_set.h [5:5] + include/aws/mqtt/private/request-response/subscription_manager.h [5:5] + include/aws/mqtt/private/shared.h [2:2] include/aws/mqtt/private/topic_tree.h [5:5] include/aws/mqtt/private/v5/mqtt5_callbacks.h [2:2] include/aws/mqtt/private/v5/mqtt5_client_impl.h [5:5] @@ -64,6 +68,7 @@ BELONGS ya.make include/aws/mqtt/private/v5/mqtt5_topic_alias.h [2:2] include/aws/mqtt/private/v5/mqtt5_utils.h [5:5] include/aws/mqtt/private/v5/rate_limiters.h [2:2] + include/aws/mqtt/request-response/request_response_client.h [5:5] include/aws/mqtt/v5/mqtt5_client.h [5:5] include/aws/mqtt/v5/mqtt5_listener.h [2:2] include/aws/mqtt/v5/mqtt5_packet_storage.h [5:5] @@ -74,9 +79,14 @@ BELONGS ya.make source/fixed_header.c [2:2] source/mqtt.c [2:2] source/mqtt311_decoder.c [2:2] + source/mqtt311_listener.c [2:2] source/mqtt_subscription_set.c [2:2] source/packets.c [2:2] - source/shared_constants.c [2:2] + source/request-response/protocol_adapter.c [2:2] + source/request-response/request_response_client.c [2:2] + source/request-response/request_response_subscription_set.c [2:2] + source/request-response/subscription_manager.c [2:2] + source/shared.c [2:2] source/topic_tree.c [2:2] source/v5/mqtt5_callbacks.c [2:2] source/v5/mqtt5_client.c [2:2] diff --git a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.licenses.report b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.licenses.report index 6fc0570ab0d..051c89ca1a9 100644 --- a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.licenses.report +++ b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.licenses.report @@ -73,7 +73,7 @@ KEEP Apache-2.0 6c901454b872854c0dea3ec06b67701a BELONGS ya.make License text: \## License - This library is licensed under the Apache 2.0 License. + This library is licensed under the Apache 2.0 License. Scancode info: Original SPDX id: Apache-2.0 Score : 100.00 @@ -99,10 +99,14 @@ BELONGS ya.make include/aws/mqtt/private/client_impl_shared.h [6:6] include/aws/mqtt/private/fixed_header.h [6:6] include/aws/mqtt/private/mqtt311_decoder.h [6:6] + include/aws/mqtt/private/mqtt311_listener.h [3:3] include/aws/mqtt/private/mqtt_client_test_helper.h [5:5] include/aws/mqtt/private/mqtt_subscription_set.h [3:3] include/aws/mqtt/private/packets.h [6:6] - include/aws/mqtt/private/shared_constants.h [3:3] + include/aws/mqtt/private/request-response/protocol_adapter.h [6:6] + include/aws/mqtt/private/request-response/request_response_subscription_set.h [6:6] + include/aws/mqtt/private/request-response/subscription_manager.h [6:6] + include/aws/mqtt/private/shared.h [3:3] include/aws/mqtt/private/topic_tree.h [6:6] include/aws/mqtt/private/v5/mqtt5_callbacks.h [3:3] include/aws/mqtt/private/v5/mqtt5_client_impl.h [6:6] @@ -113,6 +117,7 @@ BELONGS ya.make include/aws/mqtt/private/v5/mqtt5_topic_alias.h [3:3] include/aws/mqtt/private/v5/mqtt5_utils.h [6:6] include/aws/mqtt/private/v5/rate_limiters.h [3:3] + include/aws/mqtt/request-response/request_response_client.h [6:6] include/aws/mqtt/v5/mqtt5_client.h [6:6] include/aws/mqtt/v5/mqtt5_listener.h [3:3] include/aws/mqtt/v5/mqtt5_packet_storage.h [6:6] @@ -123,9 +128,14 @@ BELONGS ya.make source/fixed_header.c [3:3] source/mqtt.c [3:3] source/mqtt311_decoder.c [3:3] + source/mqtt311_listener.c [3:3] source/mqtt_subscription_set.c [3:3] source/packets.c [3:3] - source/shared_constants.c [3:3] + source/request-response/protocol_adapter.c [3:3] + source/request-response/request_response_client.c [3:3] + source/request-response/request_response_subscription_set.c [3:3] + source/request-response/subscription_manager.c [3:3] + source/shared.c [3:3] source/topic_tree.c [3:3] source/v5/mqtt5_callbacks.c [3:3] source/v5/mqtt5_client.c [3:3] diff --git a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/licenses.list.txt b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/licenses.list.txt index fff5fc6ca1f..05e74d27d6c 100644 --- a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/licenses.list.txt +++ b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/licenses.list.txt @@ -208,7 +208,7 @@ ====================Apache-2.0==================== ## License -This library is licensed under the Apache 2.0 License. +This library is licensed under the Apache 2.0 License. ====================Apache-2.0==================== diff --git a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/override.nix b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/override.nix index f02bd667fdb..9b4a3b26076 100644 --- a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/override.nix +++ b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/override.nix @@ -1,10 +1,10 @@ pkgs: attrs: with pkgs; with attrs; rec { - version = "0.10.4"; + version = "0.13.0"; src = fetchFromGitHub { owner = "awslabs"; repo = "aws-c-mqtt"; rev = "v${version}"; - hash = "sha256-i+ssZzHC8MPfyOaRqvjq0z7w772BJqIA6BwntW1fRek="; + hash = "sha256-mDtQ5H8PUExil2y70LaUPKx/PNqTscUnY6CQoYNfKY4="; }; } diff --git a/contrib/restricted/aws/aws-c-mqtt/README.md b/contrib/restricted/aws/aws-c-mqtt/README.md index 2f99277e399..6a6a7bf582f 100644 --- a/contrib/restricted/aws/aws-c-mqtt/README.md +++ b/contrib/restricted/aws/aws-c-mqtt/README.md @@ -4,13 +4,13 @@ C99 implementation of the MQTT 3.1.1 and MQTT 5 specifications. ## License -This library is licensed under the Apache 2.0 License. +This library is licensed under the Apache 2.0 License. ## Usage ### Building -CMake 3.1+ is required to build. +CMake 3.9+ is required to build. `<install-path>` must be an absolute path in the following instructions. @@ -203,6 +203,6 @@ the wire. For QoS 1, as soon as PUBACK comes back. For QoS 2, PUBCOMP. `topic` a ```c int aws_mqtt_client_connection_ping(struct aws_mqtt_client_connection *connection); ``` -Sends a PINGREQ packet to the server. +Sends a PINGREQ packet to the server. [aws-c-io]: https://github.com/awslabs/aws-c-io diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/exports.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/exports.h index d87f4760f0a..b5d83e928fa 100644 --- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/exports.h +++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/exports.h @@ -4,7 +4,7 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ -#if defined(USE_WINDOWS_DLL_SEMANTICS) || defined(WIN32) +#if defined(AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined(_WIN32) # ifdef AWS_MQTT_USE_IMPORT_EXPORT # ifdef AWS_MQTT_EXPORTS # define AWS_MQTT_API __declspec(dllexport) @@ -15,13 +15,13 @@ # define AWS_MQTT_API # endif /* USE_IMPORT_EXPORT */ -#else /* defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32) */ -# if ((__GNUC__ >= 4) || defined(__clang__)) && defined(AWS_MQTT_USE_IMPORT_EXPORT) && defined(AWS_MQTT_EXPORTS) +#else /* defined (AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined (_WIN32) */ +# if defined(AWS_MQTT_USE_IMPORT_EXPORT) && defined(AWS_MQTT_EXPORTS) # define AWS_MQTT_API __attribute__((visibility("default"))) # else # define AWS_MQTT_API -# endif /* __GNUC__ >= 4 || defined(__clang__) */ +# endif -#endif /* defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32) */ +#endif /* defined (AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined (_WIN32) */ #endif /* AWS_MQTT_EXPORTS_H */ diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/mqtt.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/mqtt.h index 38510c8c80e..d71d96a5d55 100644 --- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/mqtt.h +++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/mqtt.h @@ -82,6 +82,17 @@ enum aws_mqtt_error { AWS_ERROR_MQTT_CONNECTION_RESUBSCRIBE_NO_TOPICS, AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE, AWS_ERROR_MQTT_ACK_REASON_CODE_FAILURE, + AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE, + AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, + AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, + AWS_ERROR_MQTT_REQUEST_RESPONSE_NO_SUBSCRIPTION_CAPACITY, + AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE, + AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR, + AWS_ERROR_MQTT_REQUEST_RESPONSE_PUBLISH_FAILURE, + AWS_ERROR_MQTT_REUQEST_RESPONSE_STREAM_ALREADY_ACTIVATED, + AWS_ERROR_MQTT_REQUEST_RESPONSE_MODELED_SERVICE_ERROR, + AWS_ERROR_MQTT_REQUEST_RESPONSE_PAYLOAD_PARSE_ERROR, + AWS_ERROR_MQTT_REQUEST_RESPONSE_INVALID_RESPONSE_PATH, AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID), }; @@ -94,6 +105,7 @@ enum aws_mqtt_log_subject { AWS_LS_MQTT5_CLIENT, AWS_LS_MQTT5_CANARY, AWS_LS_MQTT5_TO_MQTT3_ADAPTER, + AWS_LS_MQTT_REQUEST_RESPONSE, }; /** Function called on cleanup of a userdata. */ diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl.h index 1d0dd67a0c2..ab9822c3b0b 100644 --- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl.h +++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl.h @@ -11,6 +11,7 @@ #include <aws/mqtt/private/client_impl_shared.h> #include <aws/mqtt/private/fixed_header.h> #include <aws/mqtt/private/mqtt311_decoder.h> +#include <aws/mqtt/private/mqtt311_listener.h> #include <aws/mqtt/private/topic_tree.h> #include <aws/common/hash_table.h> @@ -153,6 +154,25 @@ struct aws_mqtt_reconnect_task { struct aws_allocator *allocator; }; +struct request_timeout_wrapper; + +/* used for timeout task */ +struct request_timeout_task_arg { + uint16_t packet_id; + struct aws_mqtt_client_connection_311_impl *connection; + struct request_timeout_wrapper *task_arg_wrapper; +}; + +/* + * We want the timeout task to be able to destroy the forward reference from the operation's task arg structure + * to the timeout task. But the operation task arg structures don't have any data structure in common. So to allow + * the timeout to refer back to a zero-able forward pointer, we wrap a pointer to the timeout task and embed it + * in every operation's task arg that needs to create a timeout. + */ +struct request_timeout_wrapper { + struct request_timeout_task_arg *timeout_task_arg; +}; + /* The lifetime of this struct is from subscribe -> suback */ struct subscribe_task_arg { @@ -172,6 +192,9 @@ struct subscribe_task_arg { aws_mqtt_suback_fn *single; } on_suback; void *on_suback_ud; + + struct request_timeout_wrapper timeout_wrapper; + uint64_t timeout_duration_in_ns; }; /* The lifetime of this struct is the same as the lifetime of the subscription */ @@ -255,6 +278,9 @@ struct aws_mqtt_client_connection_311_impl { aws_mqtt_on_operation_statistics_fn *on_any_operation_statistics; void *on_any_operation_statistics_ud; + /* listener callbacks */ + struct aws_mqtt311_callback_set_manager callback_manager; + /* Connection tasks. */ struct aws_mqtt_reconnect_task *reconnect_task; struct aws_channel_task ping_task; @@ -425,4 +451,32 @@ void aws_mqtt_connection_statistics_change_operation_statistic_state( AWS_MQTT_API const struct aws_mqtt_client_connection_packet_handlers *aws_mqtt311_get_default_packet_handlers(void); +AWS_MQTT_API uint16_t aws_mqtt_client_connection_311_unsubscribe( + struct aws_mqtt_client_connection_311_impl *connection, + const struct aws_byte_cursor *topic_filter, + aws_mqtt_op_complete_fn *on_unsuback, + void *on_unsuback_ud, + uint64_t timeout_ns); + +AWS_MQTT_API uint16_t aws_mqtt_client_connection_311_subscribe( + struct aws_mqtt_client_connection_311_impl *connection, + const struct aws_byte_cursor *topic_filter, + enum aws_mqtt_qos qos, + aws_mqtt_client_publish_received_fn *on_publish, + void *on_publish_ud, + aws_mqtt_userdata_cleanup_fn *on_ud_cleanup, + aws_mqtt_suback_fn *on_suback, + void *on_suback_ud, + uint64_t timeout_ns); + +AWS_MQTT_API uint16_t aws_mqtt_client_connection_311_publish( + struct aws_mqtt_client_connection_311_impl *connection, + const struct aws_byte_cursor *topic, + enum aws_mqtt_qos qos, + bool retain, + const struct aws_byte_cursor *payload, + aws_mqtt_op_complete_fn *on_complete, + void *userdata, + uint64_t timeout_ns); + #endif /* AWS_MQTT_PRIVATE_CLIENT_IMPL_H */ diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl_shared.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl_shared.h index d244cfe7c66..248b4658e60 100644 --- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl_shared.h +++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl_shared.h @@ -10,6 +10,20 @@ struct aws_mqtt_client_connection; +/* + * Internal enum that indicates what type of struct the underlying impl pointer actually is. We use this + * to safely interact with private APIs on the implementation or extract the adapted 5 client directly, as + * necessary. + */ +enum aws_mqtt311_impl_type { + + /* 311 connection impl can be cast to `struct aws_mqtt_client_connection_311_impl` */ + AWS_MQTT311_IT_311_CONNECTION, + + /* 311 connection impl can be cast to `struct aws_mqtt_client_connection_5_impl`*/ + AWS_MQTT311_IT_5_ADAPTER, +}; + struct aws_mqtt_client_connection_vtable { struct aws_mqtt_client_connection *(*acquire_fn)(void *impl); @@ -107,6 +121,10 @@ struct aws_mqtt_client_connection_vtable { void *userdata); int (*get_stats_fn)(void *impl, struct aws_mqtt_connection_operation_statistics *stats); + + enum aws_mqtt311_impl_type (*get_impl_type)(const void *impl); + + struct aws_event_loop *(*get_event_loop)(const void *impl); }; struct aws_mqtt_client_connection { @@ -114,10 +132,16 @@ struct aws_mqtt_client_connection { void *impl; }; +AWS_MQTT_API enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type( + const struct aws_mqtt_client_connection *connection); + AWS_MQTT_API uint64_t aws_mqtt_hash_uint16_t(const void *item); AWS_MQTT_API bool aws_mqtt_compare_uint16_t_eq(const void *a, const void *b); AWS_MQTT_API bool aws_mqtt_byte_cursor_hash_equality(const void *a, const void *b); +AWS_MQTT_API struct aws_event_loop *aws_mqtt_client_connection_get_event_loop( + const struct aws_mqtt_client_connection *connection); + #endif /* AWS_MQTT_PRIVATE_CLIENT_IMPL_SHARED_H */ diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/mqtt311_listener.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/mqtt311_listener.h new file mode 100644 index 00000000000..c66e4f246ec --- /dev/null +++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/mqtt311_listener.h @@ -0,0 +1,204 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#ifndef AWS_MQTT_MQTT311_LISTENER_H +#define AWS_MQTT_MQTT311_LISTENER_H + +#include <aws/mqtt/mqtt.h> + +#include <aws/common/rw_lock.h> +#include <aws/mqtt/client.h> + +AWS_PUSH_SANE_WARNING_LEVEL + +/** + * Callback signature for when an mqtt311 listener has completely destroyed itself. + */ +typedef void(aws_mqtt311_listener_termination_completion_fn)(void *complete_ctx); + +/** + * A record that tracks MQTT311 client connection callbacks which can be dynamically injected via a listener. + * + * All the callbacks that are supported here are invoked only on the 311 connection's event loop. With the + * add/remove callback set also on the event loop, everything is correctly serialized without data races. + * + * If binding additional callbacks, they must only be invoked from the connection's event loop. + * + * We only listen to connection-success because the only connection-level event we care about is a failure + * to rejoin a session (which invalidates all subscriptions that were considered valid) + */ +struct aws_mqtt311_callback_set { + + /* Called from s_packet_handler_publish which is event-loop invoked */ + aws_mqtt_client_publish_received_fn *publish_received_handler; + + /* Called from s_packet_handler_connack which is event-loop invoked */ + aws_mqtt_client_on_connection_success_fn *connection_success_handler; + + /* Called from s_mqtt_client_shutdown which is event-loop invoked */ + aws_mqtt_client_on_connection_interrupted_fn *connection_interrupted_handler; + + /* Called from s_mqtt_client_shutdown which is event-loop invoked */ + aws_mqtt_client_on_disconnect_fn *disconnect_handler; + + void *user_data; +}; + +/** + * An internal type for managing chains of callbacks attached to an mqtt311 client connection. Supports chains for + * lifecycle events and incoming publish packet handling. + * + * Assumed to be owned and used only by an MQTT311 client connection. + */ +struct aws_mqtt311_callback_set_manager { + struct aws_allocator *allocator; + + struct aws_mqtt_client_connection *connection; + + struct aws_linked_list callback_set_entries; + + uint64_t next_callback_set_entry_id; +}; + +/** + * Configuration options for MQTT311 listener objects. + */ +struct aws_mqtt311_listener_config { + + /** + * MQTT311 client connection to listen to events on + */ + struct aws_mqtt_client_connection *connection; + + /** + * Callbacks to invoke when events occur on the MQTT311 client connection + */ + struct aws_mqtt311_callback_set listener_callbacks; + + /** + * Listener destruction is asynchronous and thus requires a termination callback and associated user data + * to notify the user that the listener has been fully destroyed and no further events will be received. + */ + aws_mqtt311_listener_termination_completion_fn *termination_callback; + void *termination_callback_user_data; +}; + +AWS_EXTERN_C_BEGIN + +/** + * Creates a new MQTT311 listener object. For as long as the listener lives, incoming publishes and lifecycle events + * will be forwarded to the callbacks configured on the listener. + * + * @param allocator allocator to use + * @param config listener configuration + * @return a new aws_mqtt311_listener object + */ +AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_new( + struct aws_allocator *allocator, + struct aws_mqtt311_listener_config *config); + +/** + * Adds a reference to an mqtt311 listener. + * + * @param listener listener to add a reference to + * @return the listener object + */ +AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_acquire(struct aws_mqtt311_listener *listener); + +/** + * Removes a reference to an mqtt311 listener. When the reference count drops to zero, the listener's asynchronous + * destruction will be started. + * + * @param listener listener to remove a reference from + * @return NULL + */ +AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_release(struct aws_mqtt311_listener *listener); + +/** + * Initializes a callback set manager + */ +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_init( + struct aws_mqtt311_callback_set_manager *manager, + struct aws_allocator *allocator, + struct aws_mqtt_client_connection *connection); + +/** + * Cleans up a callback set manager. + * + * aws_mqtt311_callback_set_manager_init must have been previously called or this will crash. + */ +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_clean_up(struct aws_mqtt311_callback_set_manager *manager); + +/** + * Adds a callback set to the front of the handler chain. Returns an integer id that can be used to selectively + * remove the callback set from the manager. + * + * May only be called on the client's event loop thread. + */ +AWS_MQTT_API +uint64_t aws_mqtt311_callback_set_manager_push_front( + struct aws_mqtt311_callback_set_manager *manager, + struct aws_mqtt311_callback_set *callback_set); + +/** + * Removes a callback set from the handler chain. + * + * May only be called on the client's event loop thread. + */ +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_remove( + struct aws_mqtt311_callback_set_manager *manager, + uint64_t callback_set_id); + +/** + * Walks the incoming publish handler chain for an MQTT311 connection, invoking each in sequence. + * + * May only be called on the client's event loop thread. + */ +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_on_publish_received( + struct aws_mqtt311_callback_set_manager *manager, + const struct aws_byte_cursor *topic, + const struct aws_byte_cursor *payload, + bool dup, + enum aws_mqtt_qos qos, + bool retain); + +/** + * Invokes a connection success event on each listener in the manager's collection of callback sets. + * + * May only be called on the client's event loop thread. + */ +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_on_connection_success( + struct aws_mqtt311_callback_set_manager *manager, + enum aws_mqtt_connect_return_code return_code, + bool rejoined_session); + +/** + * Invokes a connection interrupted event on each listener in the manager's collection of callback sets. + * + * May only be called on the client's event loop thread. + */ +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_on_connection_interrupted( + struct aws_mqtt311_callback_set_manager *manager, + int error_code); + +/** + * Invokes a disconnection event on each listener in the manager's collection of callback sets. + * + * May only be called on the client's event loop thread. + */ +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_on_disconnect(struct aws_mqtt311_callback_set_manager *manager); + +AWS_EXTERN_C_END + +AWS_POP_SANE_WARNING_LEVEL + +#endif /* AWS_MQTT_MQTT311_LISTENER_H */ diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/protocol_adapter.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/protocol_adapter.h new file mode 100644 index 00000000000..adcc5e1abad --- /dev/null +++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/protocol_adapter.h @@ -0,0 +1,211 @@ +#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_PROTOCOL_ADAPTER_H +#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_PROTOCOL_ADAPTER_H + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/mqtt/exports.h> +#include <aws/mqtt/mqtt.h> + +#include <aws/common/byte_buf.h> + +struct aws_allocator; +struct aws_event_loop; +struct aws_mqtt_client_connection; +struct aws_mqtt5_client; +struct aws_mqtt_rr_incoming_publish_event; + +/* + * The request-response protocol adapter is a translation layer that sits between the request-response native client + * implementation and a protocol client capable of subscribing, unsubscribing, and publishing MQTT messages. + * Valid protocol clients include the CRT MQTT5 client, the CRT MQTT311 client, and an eventstream RPC connection + * that belongs to a Greengrass IPC client. Each of these protocol clients has a different (or even implicit) + * contract for carrying out pub-sub operations. The protocol adapter abstracts these details with a simple, + * minimal interface based on the requirements identified in the request-response design documents. + */ + +/* + * Minimal MQTT subscribe options + */ +struct aws_protocol_adapter_subscribe_options { + struct aws_byte_cursor topic_filter; + uint32_t ack_timeout_seconds; +}; + +/* + * Minimal MQTT unsubscribe options + */ +struct aws_protocol_adapter_unsubscribe_options { + struct aws_byte_cursor topic_filter; + uint32_t ack_timeout_seconds; +}; + +/* + * Minimal MQTT publish options + */ +struct aws_protocol_adapter_publish_options { + struct aws_byte_cursor topic; + struct aws_byte_cursor payload; + uint32_t ack_timeout_seconds; + + /* + * Invoked on success/failure of the publish itself. Our implementations use QoS1 which means that success + * will be on puback receipt. + */ + void (*completion_callback_fn)(int, void *); + + /* + * User data to pass in when invoking the completion callback + */ + void *user_data; +}; + +/* + * Describes the type of subscription event (relative to a topic filter) + */ +enum aws_protocol_adapter_subscription_event_type { + AWS_PASET_SUBSCRIBE, + AWS_PASET_UNSUBSCRIBE, +}; + +/* + * An event emitted by the protocol adapter when a subscribe or unsubscribe is completed by the adapted protocol + * client. + */ +struct aws_protocol_adapter_subscription_event { + struct aws_byte_cursor topic_filter; + enum aws_protocol_adapter_subscription_event_type event_type; + int error_code; + bool retryable; +}; + +enum aws_protocol_adapter_connection_event_type { + AWS_PACET_CONNECTED, + AWS_PACET_DISCONNECTED, +}; + +/* + * An event emitted by the protocol adapter whenever the protocol client's connection status changes + */ +struct aws_protocol_adapter_connection_event { + enum aws_protocol_adapter_connection_event_type event_type; + bool joined_session; +}; + +typedef void(aws_protocol_adapter_subscription_event_fn)( + const struct aws_protocol_adapter_subscription_event *event, + void *user_data); + +typedef void(aws_protocol_adapter_incoming_publish_fn)( + const struct aws_mqtt_rr_incoming_publish_event *publish, + void *user_data); + +typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data); + +typedef void(aws_protocol_adapter_connection_event_fn)( + const struct aws_protocol_adapter_connection_event *event, + void *user_data); + +/* + * Set of callbacks invoked by the protocol adapter. These must all be set. + */ +struct aws_mqtt_protocol_adapter_options { + aws_protocol_adapter_subscription_event_fn *subscription_event_callback; + aws_protocol_adapter_incoming_publish_fn *incoming_publish_callback; + aws_protocol_adapter_terminate_callback_fn *terminate_callback; + aws_protocol_adapter_connection_event_fn *connection_event_callback; + + /* + * User data to pass into all singleton protocol adapter callbacks. Likely either the request-response client + * or the subscription manager component of the request-response client. + */ + void *user_data; +}; + +struct aws_mqtt_protocol_adapter_vtable { + + void (*aws_mqtt_protocol_adapter_destroy_fn)(void *); + + int (*aws_mqtt_protocol_adapter_subscribe_fn)(void *, struct aws_protocol_adapter_subscribe_options *); + + int (*aws_mqtt_protocol_adapter_unsubscribe_fn)(void *, struct aws_protocol_adapter_unsubscribe_options *); + + int (*aws_mqtt_protocol_adapter_publish_fn)(void *, struct aws_protocol_adapter_publish_options *); + + bool (*aws_mqtt_protocol_adapter_is_connected_fn)(void *); + + struct aws_event_loop *(*aws_mqtt_protocol_adapter_get_event_loop_fn)(void *); +}; + +struct aws_mqtt_protocol_adapter { + const struct aws_mqtt_protocol_adapter_vtable *vtable; + void *impl; +}; + +AWS_EXTERN_C_BEGIN + +/* + * Creates a new request-response protocol adapter from an MQTT311 client + */ +AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311( + struct aws_allocator *allocator, + struct aws_mqtt_protocol_adapter_options *options, + struct aws_mqtt_client_connection *connection); + +/* + * Creates a new request-response protocol adapter from an MQTT5 client + */ +AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5( + struct aws_allocator *allocator, + struct aws_mqtt_protocol_adapter_options *options, + struct aws_mqtt5_client *client); + +/* + * Destroys a request-response protocol adapter. Destruction is an asynchronous process and the caller must + * wait for the termination callback to be invoked before assuming that no further callbacks will be invoked. + */ +AWS_MQTT_API void aws_mqtt_protocol_adapter_destroy(struct aws_mqtt_protocol_adapter *adapter); + +/* + * Asks the adapted protocol client to perform an MQTT subscribe operation + */ +AWS_MQTT_API int aws_mqtt_protocol_adapter_subscribe( + struct aws_mqtt_protocol_adapter *adapter, + struct aws_protocol_adapter_subscribe_options *options); + +/* + * Asks the adapted protocol client to perform an MQTT unsubscribe operation + */ +AWS_MQTT_API int aws_mqtt_protocol_adapter_unsubscribe( + struct aws_mqtt_protocol_adapter *adapter, + struct aws_protocol_adapter_unsubscribe_options *options); + +/* + * Asks the adapted protocol client to perform an MQTT publish operation + */ +AWS_MQTT_API int aws_mqtt_protocol_adapter_publish( + struct aws_mqtt_protocol_adapter *adapter, + struct aws_protocol_adapter_publish_options *options); + +/* + * Synchronously checks the connection state of the adapted protocol client. May only be called from the + * protocol client's event loop. + */ +AWS_MQTT_API bool aws_mqtt_protocol_adapter_is_connected(struct aws_mqtt_protocol_adapter *adapter); + +/* + * Returns the event loop that the protocol client is bound to. + */ +AWS_MQTT_API struct aws_event_loop *aws_mqtt_protocol_adapter_get_event_loop(struct aws_mqtt_protocol_adapter *adapter); + +AWS_MQTT_API const char *aws_protocol_adapter_subscription_event_type_to_c_str( + enum aws_protocol_adapter_subscription_event_type type); + +AWS_MQTT_API const char *aws_protocol_adapter_connection_event_type_to_c_str( + enum aws_protocol_adapter_connection_event_type type); + +AWS_EXTERN_C_END + +#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_PROTOCOL_ADAPTER_H */ diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/request_response_subscription_set.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/request_response_subscription_set.h new file mode 100644 index 00000000000..e24c220b6c8 --- /dev/null +++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/request_response_subscription_set.h @@ -0,0 +1,140 @@ +#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H +#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/common/byte_buf.h> +#include <aws/common/hash_table.h> +#include <aws/common/linked_list.h> +#include <aws/mqtt/exports.h> + +struct aws_mqtt_rr_incoming_publish_event; + +/* + * Handles subscriptions for request-response client. + * Lifetime of this struct is bound to request-response client. + */ +struct aws_request_response_subscriptions { + struct aws_allocator *allocator; + + /* + * Map from cursor (topic filter) -> list of streaming operations using that filter + * + * We don't garbage collect this table over the course of normal client operation. We only clean it up + * when the client is shutting down. + */ + struct aws_hash_table streaming_operation_subscription_lists; + + /* + * Map from cursor (topic filter with wildcards) -> list of streaming operations using that filter + * + * We don't garbage collect this table over the course of normal client operation. We only clean it up + * when the client is shutting down. + */ + struct aws_hash_table streaming_operation_wildcards_subscription_lists; + + /* + * Map from cursor (topic) -> request response path (topic, correlation token json path) + */ + struct aws_hash_table request_response_paths; +}; + +/* + * This is the (key and) value in stream subscriptions tables. + */ +struct aws_rr_operation_list_topic_filter_entry { + struct aws_allocator *allocator; + + struct aws_byte_cursor topic_filter_cursor; + struct aws_byte_buf topic_filter; + + struct aws_linked_list operations; +}; + +/* + * Value in request subscriptions table. + */ +struct aws_rr_response_path_entry { + struct aws_allocator *allocator; + + size_t ref_count; + + struct aws_byte_cursor topic_cursor; + struct aws_byte_buf topic; + + struct aws_byte_buf correlation_token_json_path; +}; + +/* + * Callback type for matched stream subscriptions. + */ +typedef void(aws_mqtt_stream_operation_subscription_match_fn)( + const struct aws_linked_list *operations, + const struct aws_byte_cursor *topic_filter, + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + void *user_data); + +/* + * Callback type for matched request subscriptions. + */ +typedef void(aws_mqtt_request_operation_subscription_match_fn)( + struct aws_rr_response_path_entry *entry, + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + void *user_data); + +AWS_EXTERN_C_BEGIN + +/* + * Initialize internal state of a provided request-response subscriptions structure. + */ +AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_init( + struct aws_request_response_subscriptions *subscriptions, + struct aws_allocator *allocator); + +/* + * Clean up internals of a provided request-response subscriptions structure. + */ +AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_clean_up( + struct aws_request_response_subscriptions *subscriptions); + +/* + * Add a subscription for stream operations. + * If subscription with the same topic filter is already added, previously created + * aws_rr_operation_list_topic_filter_entry instance is returned. + */ +AWS_MQTT_API struct aws_rr_operation_list_topic_filter_entry * + aws_mqtt_request_response_client_subscriptions_add_stream_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter); + +/* + * Add a subscription for request operation. + */ +AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_add_request_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter, + const struct aws_byte_cursor *correlation_token_json_path); + +/* + * Remove a subscription for a given request operation. + */ +AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_remove_request_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter); + +/* + * Call specified callbacks for all stream and request operations with subscriptions matching a provided publish event. + */ +AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_handle_incoming_publish( + const struct aws_request_response_subscriptions *subscriptions, + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match, + aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match, + void *user_data); + +AWS_EXTERN_C_END + +#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_SET_H */ diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/subscription_manager.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/subscription_manager.h new file mode 100644 index 00000000000..da70b4c9aad --- /dev/null +++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/subscription_manager.h @@ -0,0 +1,264 @@ +#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_MANAGER_H +#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_MANAGER_H + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/mqtt/mqtt.h> + +#include <aws/common/hash_table.h> + +struct aws_mqtt_protocol_adapter; +struct aws_protocol_adapter_connection_event; +struct aws_protocol_adapter_subscription_event; + +/* + * Describes a change to the state of a request operation subscription + */ +enum aws_rr_subscription_event_type { + + /* + * A request subscription subscribe succeeded + */ + ARRSET_REQUEST_SUBSCRIBE_SUCCESS, + + /* + * A request subscription subscribe failed + */ + ARRSET_REQUEST_SUBSCRIBE_FAILURE, + + /* + * A previously successful request subscription has ended. + * + * Under normal circumstances this can happen when + * + * (1) failure to rejoin a session + */ + ARRSET_REQUEST_SUBSCRIPTION_ENDED, + + /* + * A streaming subscription subscribe succeeded + */ + ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, + + /* + * The protocol client failed to rejoin a session containing a previously-established streaming subscription + */ + ARRSET_STREAMING_SUBSCRIPTION_LOST, + + /* + * A streaming subscription subscribe attempt resulted in an error or reason code that the client has determined + * will result in indefinite failures to subscribe. In this case, we stop attempting to resubscribe. + * + * Situations that can lead to this: + * (1) Permission failures + * (2) Invalid topic filter + */ + ARRSET_STREAMING_SUBSCRIPTION_HALTED, + + /* + * A subscription has lost its last listener and can be purged + * + * This event is global; operation_id will always be zero. + */ + ARRSET_SUBSCRIPTION_EMPTY, + + /* + * A subscription has been unsubscribed from + * + * This event is global; operation_id will always be zero. + */ + ARRSET_UNSUBSCRIBE_COMPLETE, +}; + +struct aws_rr_subscription_status_event { + enum aws_rr_subscription_event_type type; + struct aws_byte_cursor topic_filter; + uint64_t operation_id; +}; + +/* + * Invariant: despite being on the same thread, these callbacks must be queued as cross-thread tasks on the native + * request-response client. This allows us to iterate internal collections without worrying about external + * callers disrupting things by invoking APIs back on us. + */ +typedef void( + aws_rr_subscription_status_event_callback_fn)(const struct aws_rr_subscription_status_event *event, void *userdata); + +struct aws_rr_subscription_manager_options { + + /* + * Maximum number of request-response subscriptions allowed. Must be at least two. + */ + size_t max_request_response_subscriptions; + + /* + * Maximum number of streaming subscriptions allowed. + */ + size_t max_streaming_subscriptions; + + /* + * Ack timeout to use for all subscribe and unsubscribe operations + */ + uint32_t operation_timeout_seconds; + + aws_rr_subscription_status_event_callback_fn *subscription_status_callback; + void *userdata; +}; + +/* + * The subscription manager works with the request-response client to handle subscriptions in an eager manner. + * Subscription purges are checked with every client service call. Unsubscribe failures don't trigger anything special, + * we'll just try again next time we look for subscription space. Subscribes are attempted on idle subscriptions + * that still need them, either in response to a new operation listener or a connection resumption event. + * + * We only allow one subscribe or unsubscribe to be outstanding at once for a given topic. If an operation requires a + * subscription while an unsubscribe is in progress the operation is blocked until the unsubscribe resolves. + * + * These invariants are dropped during shutdown. In that case, we immediately send unsubscribes for everything + * that is not already unsubscribing. + */ +struct aws_rr_subscription_manager { + struct aws_allocator *allocator; + + struct aws_rr_subscription_manager_options config; + + /* non-owning reference; the client is responsible for destroying this asynchronously (listener detachment) */ + struct aws_mqtt_protocol_adapter *protocol_adapter; + + /* &aws_rr_subscription_record.topic_filter_cursor -> aws_rr_subscription_record * */ + struct aws_hash_table subscriptions; + + bool is_protocol_client_connected; +}; + +enum aws_rr_subscription_type { + ARRST_EVENT_STREAM, + ARRST_REQUEST_RESPONSE, +}; + +struct aws_rr_acquire_subscription_options { + struct aws_byte_cursor *topic_filters; + size_t topic_filter_count; + + uint64_t operation_id; + enum aws_rr_subscription_type type; +}; + +struct aws_rr_release_subscription_options { + struct aws_byte_cursor *topic_filters; + size_t topic_filter_count; + + uint64_t operation_id; +}; + +enum aws_acquire_subscription_result_type { + + /* + * All requested subscriptions already exist and are active. The operation can proceed to the next stage. + */ + AASRT_SUBSCRIBED, + + /* + * The requested subscriptions now exist but at least one is not yet active. The operation must wait for subscribes + * to complete as success or failure. + */ + AASRT_SUBSCRIBING, + + /* + * At least one subscription does not exist and there is no room for it currently. Room may open up in the future, + * so the operation should wait. + */ + AASRT_BLOCKED, + + /* + * At least one subscription does not exist and there is no room for it. Unless an event stream subscription gets + * closed, no room will be available in the future. The operation should be failed. + */ + AASRT_NO_CAPACITY, + + /* + * An internal failure occurred while trying to establish subscriptions. The operation should be failed. + */ + AASRT_FAILURE +}; + +AWS_EXTERN_C_BEGIN + +/* + * Initializes a subscription manager. Every native request-response client owns a single subscription manager. + */ +AWS_MQTT_API void aws_rr_subscription_manager_init( + struct aws_rr_subscription_manager *manager, + struct aws_allocator *allocator, + struct aws_mqtt_protocol_adapter *protocol_adapter, + const struct aws_rr_subscription_manager_options *options); + +/* + * Cleans up a subscription manager. This is done early in the native request-response client shutdown process. + * After this API is called, no other subscription manager APIs will be called by the request-response client (during + * the rest of the asynchronous shutdown process). + */ +AWS_MQTT_API void aws_rr_subscription_manager_clean_up(struct aws_rr_subscription_manager *manager); + +/* + * Requests the the subscription manager unsubscribe from all currently-unused subscriptions + */ +AWS_MQTT_API void aws_rr_subscription_manager_purge_unused(struct aws_rr_subscription_manager *manager); + +/* + * Signals to the subscription manager that the native request-response client is processing an operation that + * needs a subscription to a particular topic. Return value indicates to the request-response client how it should + * proceed with processing the operation. + */ +AWS_MQTT_API enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_subscription( + struct aws_rr_subscription_manager *manager, + const struct aws_rr_acquire_subscription_options *options); + +/* + * Signals to the subscription manager that the native request-response client operation no longer + * needs a subscription to a particular topic. + */ +AWS_MQTT_API void aws_rr_subscription_manager_release_subscription( + struct aws_rr_subscription_manager *manager, + const struct aws_rr_release_subscription_options *options); + +/* + * Notifies the subscription manager of a subscription status event. Invoked by the native request-response client + * that owns the subscription manager. The native request-response client also owns the protocol adapter that + * the subscription event originates from, so the control flow looks like: + * + * [Subscribe] + * subscription manager -> protocol adapter Subscribe -> protocol client Subscribe -> network... + * + * [Result] + * protocol client Suback/Timeout/Error -> protocol adapter -> native request-response client -> + * subscription manager (this API) + */ +AWS_MQTT_API void aws_rr_subscription_manager_on_protocol_adapter_subscription_event( + struct aws_rr_subscription_manager *manager, + const struct aws_protocol_adapter_subscription_event *event); + +/* + * Notifies the subscription manager of a connection status event. Invoked by the native request-response client + * that owns the subscription manager. The native request-response client also owns the protocol adapter that + * the connection event originates from. The control flow looks like: + * + * protocol client connect/disconnect -> protocol adapter -> native request-response client -> + * Subscription manager (this API) + */ +AWS_MQTT_API void aws_rr_subscription_manager_on_protocol_adapter_connection_event( + struct aws_rr_subscription_manager *manager, + const struct aws_protocol_adapter_connection_event *event); + +/* + * Checks subscription manager options for validity. + */ +AWS_MQTT_API bool aws_rr_subscription_manager_are_options_valid( + const struct aws_rr_subscription_manager_options *options); + +AWS_EXTERN_C_END + +#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_MANAGER_H */ diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/shared_constants.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/shared.h index 0a835942a5e..a1165d8effd 100644 --- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/shared_constants.h +++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/shared.h @@ -8,6 +8,8 @@ #include <aws/mqtt/mqtt.h> +#include <aws/mqtt/private/request-response/subscription_manager.h> + AWS_EXTERN_C_BEGIN AWS_MQTT_API extern const struct aws_byte_cursor *g_websocket_handshake_default_path; diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/request-response/request_response_client.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/request-response/request_response_client.h new file mode 100644 index 00000000000..e9a75e11966 --- /dev/null +++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/request-response/request_response_client.h @@ -0,0 +1,290 @@ +#ifndef AWS_MQTT_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H +#define AWS_MQTT_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/mqtt/mqtt.h> + +struct aws_event_loop; +struct aws_mqtt_request_response_client; +struct aws_mqtt_client_connection; +struct aws_mqtt5_client; + +/* + * A response path is a pair of values - MQTT topic and a JSON path - that describe where a response to + * an MQTT-based request may arrive. For a given request type, there may be multiple response paths and each + * one is associated with a separate JSON schema for the response body. + */ +struct aws_mqtt_request_operation_response_path { + + /* + * MQTT topic that a response may arrive on. + */ + struct aws_byte_cursor topic; + + /* + * JSON path for finding correlation tokens within payloads that arrive on this path's topic. + */ + struct aws_byte_cursor correlation_token_json_path; +}; + +/* + * An event emitted by a streaming operation's subscription. + */ +struct aws_mqtt_rr_incoming_publish_event { + struct aws_byte_cursor payload; + struct aws_byte_cursor topic; + /* Below are MQTT optional fields. For MQTT3, they will always be empty, as MQTT3 does not support them. For MQTT5, + * they will be set if they are present in a packet. */ + const struct aws_byte_cursor *content_type; + size_t user_property_count; + const struct aws_mqtt5_user_property *user_properties; + /* Even though this field is supposed to be used by MQTT broker to determine if a message-to-be-sent is expired, + * certain services use this field to specify client-side timeouts. */ + const uint32_t *message_expiry_interval_seconds; +}; + +/* + * Callback signature for request-response completion. + * + * Invariants: + * If error_code is non-zero then publish_event will be NULL. + * If publish_event is not NULL, then error_code will be 0. + */ +typedef void(aws_mqtt_request_operation_completion_fn)( + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + int error_code, + void *user_data); + +/* + * Configuration options for a request-response operation. + */ +struct aws_mqtt_request_operation_options { + + /* + * Set of topic filters that should be subscribed to in order to cover all possible response paths. Sometimes + * we can use wildcards to cut down on the subscriptions needed; sometimes we can't. + */ + struct aws_byte_cursor *subscription_topic_filters; + size_t subscription_topic_filter_count; + + /* + * Set of all possible response paths associated with this request type + */ + struct aws_mqtt_request_operation_response_path *response_paths; + size_t response_path_count; + + /* + * Topic to publish the request to once response subscriptions have been established. + */ + struct aws_byte_cursor publish_topic; + + /* + * Payload to publish in order to initiate the request + */ + struct aws_byte_cursor serialized_request; + + /* + * Correlation token embedded in the request that must be found in a response message. This can be the + * empty cursor to support certain services which don't use correlation tokens. In this case, the client + * only allows one request at a time to use the associated subscriptions; no concurrency is possible. There + * are some optimizations we could make here but for now, they're not worth the complexity. + */ + struct aws_byte_cursor correlation_token; + + /* + * Callback (and associated user data) to invoke when the request is completed. + */ + aws_mqtt_request_operation_completion_fn *completion_callback; + void *user_data; +}; + +/* + * Describes a change to the state of a streaming operation subscription + */ +enum aws_rr_streaming_subscription_event_type { + + /* + * The streaming operation is successfully subscribed to its topic (filter) + */ + ARRSSET_SUBSCRIPTION_ESTABLISHED, + + /* + * The streaming operation has temporarily lost its subscription to its topic (filter) + */ + ARRSSET_SUBSCRIPTION_LOST, + + /* + * The streaming operation has entered a terminal state where it has given up trying to subscribe + * to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy). + */ + ARRSSET_SUBSCRIPTION_HALTED, +}; + +/* + * Callback signature for when the subscription status of a streaming operation changes. + */ +typedef void(aws_mqtt_streaming_operation_subscription_status_fn)( + enum aws_rr_streaming_subscription_event_type status, + int error_code, + void *user_data); + +/* + * Callback signature for when a publish arrives that matches a streaming operation's subscription + */ +typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)( + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + void *user_data); + +/* + * Callback signature for when a streaming operation is fully destroyed and no more events will be emitted. + */ +typedef void(aws_mqtt_streaming_operation_terminated_fn)(void *user_data); + +/* + * Configuration options for a streaming operation. + */ +struct aws_mqtt_streaming_operation_options { + + /* + * Topic filter that the streaming operation should listen on + */ + struct aws_byte_cursor topic_filter; + + /* + * Callback for subscription status events + */ + aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback; + + /* + * Callback for publish messages that match the operation's topic filter + */ + aws_mqtt_streaming_operation_incoming_publish_fn *incoming_publish_callback; + + /* + * Callback for streaming operation final shutdown + */ + aws_mqtt_streaming_operation_terminated_fn *terminated_callback; + + /* + * Data passed to all streaming operation callbacks + */ + void *user_data; +}; + +typedef void(aws_mqtt_request_response_client_initialized_callback_fn)(void *user_data); +typedef void(aws_mqtt_request_response_client_terminated_callback_fn)(void *user_data); + +/* + * Request-response client configuration options + */ +struct aws_mqtt_request_response_client_options { + + /* + * Maximum number of subscriptions that the client will concurrently use for request-response operations + */ + size_t max_request_response_subscriptions; + + /* + * Maximum number of subscriptions that the client will concurrently use for streaming operations + */ + size_t max_streaming_subscriptions; + + /* + * Duration, in seconds, that a request-response operation will wait for completion before giving up + */ + uint32_t operation_timeout_seconds; + + /* + * Request-response client initialization is asynchronous. This callback is invoked when the client is fully + * initialized. + * + * Do not bind the initialized callback; it exists mostly for tests and should not be exposed + */ + aws_mqtt_request_response_client_initialized_callback_fn *initialized_callback; + + /* + * Callback invoked when the client's asynchronous destruction process has fully completed. + */ + aws_mqtt_request_response_client_terminated_callback_fn *terminated_callback; + + /* + * Arbitrary data to pass to the client callbacks + */ + void *user_data; +}; + +AWS_EXTERN_C_BEGIN + +/* + * Create a new request-response client that uses an MQTT311 client. + */ +AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client( + struct aws_allocator *allocator, + struct aws_mqtt_client_connection *client, + const struct aws_mqtt_request_response_client_options *options); + +/* + * Create a new request-response client that uses an MQTT5 client. + */ +AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client( + struct aws_allocator *allocator, + struct aws_mqtt5_client *client, + const struct aws_mqtt_request_response_client_options *options); + +/* + * Add a reference to a request-response client + */ +AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire( + struct aws_mqtt_request_response_client *client); + +/* + * Remove a reference from a request-response client + */ +AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release( + struct aws_mqtt_request_response_client *client); + +/* + * Submits a request operation to the client + */ +AWS_MQTT_API int aws_mqtt_request_response_client_submit_request( + struct aws_mqtt_request_response_client *client, + const struct aws_mqtt_request_operation_options *request_options); + +/* + * Creates a new streaming operation. Streaming operations start in an inactive state and must be + * activated before its subscription can be established. + */ +AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_streaming_operation( + struct aws_mqtt_request_response_client *client, + const struct aws_mqtt_streaming_operation_options *streaming_options); + +/* + * Returns the event loop used by the request-response client's protocol client + */ +AWS_MQTT_API struct aws_event_loop *aws_mqtt_request_response_client_get_event_loop( + struct aws_mqtt_request_response_client *client); + +/* + * Initiates a streaming operation's subscription process. + */ +AWS_MQTT_API int aws_mqtt_rr_client_operation_activate(struct aws_mqtt_rr_client_operation *operation); + +/* + * Add a reference to a streaming operation. + */ +AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_acquire( + struct aws_mqtt_rr_client_operation *operation); + +/* + * Remove a reference from a streaming operation + */ +AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_release( + struct aws_mqtt_rr_client_operation *operation); + +AWS_EXTERN_C_END + +#endif /* AWS_MQTT_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H */ diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/v5/mqtt5_client.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/v5/mqtt5_client.h index d04d29814b0..5cac371ce88 100644 --- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/v5/mqtt5_client.h +++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/v5/mqtt5_client.h @@ -341,6 +341,7 @@ struct aws_mqtt5_publish_completion_options { aws_mqtt5_publish_completion_fn *completion_callback; void *completion_user_data; + /** Overrides the client's ack timeout with this value, for this operation only */ uint32_t ack_timeout_seconds_override; }; @@ -351,6 +352,7 @@ struct aws_mqtt5_subscribe_completion_options { aws_mqtt5_subscribe_completion_fn *completion_callback; void *completion_user_data; + /** Overrides the client's ack timeout with this value, for this operation only */ uint32_t ack_timeout_seconds_override; }; @@ -361,6 +363,7 @@ struct aws_mqtt5_unsubscribe_completion_options { aws_mqtt5_unsubscribe_completion_fn *completion_callback; void *completion_user_data; + /** Overrides the client's ack timeout with this value, for this operation only */ uint32_t ack_timeout_seconds_override; }; diff --git a/contrib/restricted/aws/aws-c-mqtt/source/client.c b/contrib/restricted/aws/aws-c-mqtt/source/client.c index c332c4da864..5a3b309cb66 100644 --- a/contrib/restricted/aws/aws-c-mqtt/source/client.c +++ b/contrib/restricted/aws/aws-c-mqtt/source/client.c @@ -7,7 +7,7 @@ #include <aws/mqtt/private/client_impl.h> #include <aws/mqtt/private/mqtt_client_test_helper.h> #include <aws/mqtt/private/packets.h> -#include <aws/mqtt/private/shared_constants.h> +#include <aws/mqtt/private/shared.h> #include <aws/mqtt/private/topic_tree.h> #include <aws/http/proxy.h> @@ -93,25 +93,6 @@ void mqtt_connection_set_state( connection->synced_data.state = state; } -struct request_timeout_wrapper; - -/* used for timeout task */ -struct request_timeout_task_arg { - uint16_t packet_id; - struct aws_mqtt_client_connection_311_impl *connection; - struct request_timeout_wrapper *task_arg_wrapper; -}; - -/* - * We want the timeout task to be able to destroy the forward reference from the operation's task arg structure - * to the timeout task. But the operation task arg structures don't have any data structure in common. So to allow - * the timeout to refer back to a zero-able forward pointer, we wrap a pointer to the timeout task and embed it - * in every operation's task arg that needs to create a timeout. - */ -struct request_timeout_wrapper { - struct request_timeout_task_arg *timeout_task_arg; -}; - static void s_request_timeout(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) { (void)channel_task; struct request_timeout_task_arg *timeout_task_arg = arg; @@ -139,8 +120,14 @@ static void s_request_timeout(struct aws_channel_task *channel_task, void *arg, static struct request_timeout_task_arg *s_schedule_timeout_task( struct aws_mqtt_client_connection_311_impl *connection, - uint16_t packet_id) { - /* schedule a timeout task to run, in case server consider the publish is not received */ + uint16_t packet_id, + uint64_t timeout_duration_in_ns) { + + if (timeout_duration_in_ns == UINT64_MAX || timeout_duration_in_ns == 0 || packet_id == 0) { + return NULL; + } + + /* schedule a timeout task to run, in case server never sends us an ack */ struct aws_channel_task *request_timeout_task = NULL; struct request_timeout_task_arg *timeout_task_arg = NULL; if (!aws_mem_acquire_many( @@ -161,7 +148,7 @@ static struct request_timeout_task_arg *s_schedule_timeout_task( aws_mem_release(connection->allocator, timeout_task_arg); return NULL; } - timestamp = aws_add_u64_saturating(timestamp, connection->operation_timeout_ns); + timestamp = aws_add_u64_saturating(timestamp, timeout_duration_in_ns); aws_channel_schedule_task_future(connection->slot->channel, request_timeout_task, timestamp); return timeout_task_arg; } @@ -259,6 +246,7 @@ static void s_mqtt_client_shutdown( (void)channel; struct aws_mqtt_client_connection_311_impl *connection = user_data; + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection->loop)); AWS_LOGF_TRACE( AWS_LS_MQTT_CLIENT, "id=%p: Channel has been shutdown with error code %d", (void *)connection, error_code); @@ -404,6 +392,7 @@ static void s_mqtt_client_shutdown( "id=%p: Connection interrupted, calling callback and attempting reconnect", (void *)connection); MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_interrupted, error_code); + aws_mqtt311_callback_set_manager_on_connection_interrupted(&connection->callback_manager, error_code); /* In case user called disconnect from the on_interrupted callback */ bool stop_reconnect; @@ -442,6 +431,7 @@ static void s_mqtt_client_shutdown( (void *)connection); MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect); MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_closed, NULL); + aws_mqtt311_callback_set_manager_on_disconnect(&connection->callback_manager); break; case AWS_MQTT_CLIENT_STATE_DISCONNECTING: AWS_LOGF_DEBUG( @@ -450,6 +440,7 @@ static void s_mqtt_client_shutdown( (void *)connection); MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect); MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_closed, NULL); + aws_mqtt311_callback_set_manager_on_disconnect(&connection->callback_manager); break; case AWS_MQTT_CLIENT_STATE_CONNECTING: AWS_LOGF_TRACE( @@ -801,6 +792,8 @@ static void s_mqtt_client_connection_destroy_final(struct aws_mqtt_client_connec termination_handler_user_data = connection->on_termination_ud; } + aws_mqtt311_callback_set_manager_clean_up(&connection->callback_manager); + /* If the reconnect_task isn't freed, free it */ if (connection->reconnect_task) { aws_mem_release(connection->reconnect_task->allocator, connection->reconnect_task); @@ -1370,6 +1363,27 @@ error: return AWS_OP_ERR; } +struct mqtt_on_websocket_setup_task_arg { + struct aws_allocator *allocator; + struct aws_task task; + struct aws_mqtt_client_connection_311_impl *connection; + int error_code; +}; + +static void s_on_websocket_setup_task_fn(struct aws_task *task, void *userdata, enum aws_task_status status) { + (void)task; + (void)status; + + struct mqtt_on_websocket_setup_task_arg *on_websocket_setup_task_arg = userdata; + struct aws_mqtt_client_connection_311_impl *connection = on_websocket_setup_task_arg->connection; + int error_code = on_websocket_setup_task_arg->error_code; + + aws_mem_release(on_websocket_setup_task_arg->allocator, on_websocket_setup_task_arg); + + struct aws_websocket_on_connection_setup_data websocket_setup = {.error_code = error_code}; + s_on_websocket_setup(&websocket_setup, connection); +} + static void s_websocket_handshake_transform_complete( struct aws_http_message *handshake_request, int error_code, @@ -1424,9 +1438,21 @@ static void s_websocket_handshake_transform_complete( return; error:; - /* Proceed to next step, telling it that we failed. */ - struct aws_websocket_on_connection_setup_data websocket_setup = {.error_code = error_code}; - s_on_websocket_setup(&websocket_setup, connection); + /* Proceed to next step, telling it that we failed. + * s_on_websocket_setup will shutdown MQTT connection, and this MUST happen on the MQTT connection's event loop. */ + struct mqtt_on_websocket_setup_task_arg *on_websocket_setup_task_arg = + aws_mem_calloc(connection->allocator, 1, sizeof(struct mqtt_on_websocket_setup_task_arg)); + on_websocket_setup_task_arg->allocator = connection->allocator; + /* NOTE: No need in acquiring MQTT connection ref counter, as it is already acquired at the start of connection + * process. s_on_websocket_setup will release it. */ + on_websocket_setup_task_arg->connection = connection; + on_websocket_setup_task_arg->error_code = error_code; + aws_task_init( + &on_websocket_setup_task_arg->task, + s_on_websocket_setup_task_fn, + (void *)on_websocket_setup_task_arg, + "on_websocket_setup_task"); + aws_event_loop_schedule_task_now(connection->loop, &on_websocket_setup_task_arg->task); } /******************************************************************************* @@ -1894,6 +1920,20 @@ static enum aws_mqtt_client_request_state s_subscribe_send(uint16_t packet_id, b aws_mem_release(message->allocator, message); } + /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion + * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly + * fire the on_completion callbacks. */ + struct request_timeout_task_arg *timeout_task_arg = + s_schedule_timeout_task(task_arg->connection, packet_id, task_arg->timeout_duration_in_ns); + if (timeout_task_arg) { + /* + * Set up mutual references between the operation task args and the timeout task args. Whoever runs first + * "wins", does its logic, and then breaks the connection between the two. + */ + task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg; + timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper; + } + if (!task_arg->tree_updated) { aws_mqtt_topic_tree_transaction_commit(&task_arg->connection->thread_data.subscriptions, &transaction); task_arg->tree_updated = true; @@ -1959,6 +1999,17 @@ static void s_subscribe_complete( error_code, task_arg->on_suback_ud); } + + /* + * If we have a forward pointer to a timeout task, then that means the timeout task has not run yet. So we should + * follow it and zero out the back pointer to us, because we're going away now. The timeout task will run later + * and be harmless (even vs. future operations with the same packet id) because it only cancels if it has a back + * pointer. + */ + if (task_arg->timeout_wrapper.timeout_task_arg) { + task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL; + } + for (size_t i = 0; i < list_len; i++) { aws_array_list_get_at(&task_arg->topics, &topic, i); s_task_topic_release(topic); @@ -1991,6 +2042,7 @@ static uint16_t s_aws_mqtt_client_connection_311_subscribe_multiple( task_arg->connection = connection; task_arg->on_suback.multi = on_suback; task_arg->on_suback_ud = on_suback_ud; + task_arg->timeout_duration_in_ns = connection->operation_timeout_ns; const size_t num_topics = aws_array_list_length(topic_filters); @@ -2129,23 +2181,33 @@ static void s_subscribe_single_complete( error_code, task_arg->on_suback_ud); } + + /* + * If we have a forward pointer to a timeout task, then that means the timeout task has not run yet. So we should + * follow it and zero out the back pointer to us, because we're going away now. The timeout task will run later + * and be harmless (even vs. future operations with the same packet id) because it only cancels if it has a back + * pointer. + */ + if (task_arg->timeout_wrapper.timeout_task_arg) { + task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL; + } + s_task_topic_release(topic); aws_array_list_clean_up(&task_arg->topics); aws_mqtt_packet_subscribe_clean_up(&task_arg->subscribe); aws_mem_release(task_arg->connection->allocator, task_arg); } -static uint16_t s_aws_mqtt_client_connection_311_subscribe( - void *impl, +uint16_t aws_mqtt_client_connection_311_subscribe( + struct aws_mqtt_client_connection_311_impl *connection, const struct aws_byte_cursor *topic_filter, enum aws_mqtt_qos qos, aws_mqtt_client_publish_received_fn *on_publish, void *on_publish_ud, aws_mqtt_userdata_cleanup_fn *on_ud_cleanup, aws_mqtt_suback_fn *on_suback, - void *on_suback_ud) { - - struct aws_mqtt_client_connection_311_impl *connection = impl; + void *on_suback_ud, + uint64_t timeout_ns) { AWS_PRECONDITION(connection); @@ -2174,6 +2236,7 @@ static uint16_t s_aws_mqtt_client_connection_311_subscribe( task_arg->connection = connection; task_arg->on_suback.single = on_suback; task_arg->on_suback_ud = on_suback_ud; + task_arg->timeout_duration_in_ns = timeout_ns; /* It stores the pointer */ aws_array_list_init_static(&task_arg->topics, task_topic_storage, 1, sizeof(void *)); @@ -2249,6 +2312,29 @@ handle_error: return 0; } +static uint16_t s_aws_mqtt_client_connection_311_subscribe( + void *impl, + const struct aws_byte_cursor *topic_filter, + enum aws_mqtt_qos qos, + aws_mqtt_client_publish_received_fn *on_publish, + void *on_publish_ud, + aws_mqtt_userdata_cleanup_fn *on_ud_cleanup, + aws_mqtt_suback_fn *on_suback, + void *on_suback_ud) { + + struct aws_mqtt_client_connection_311_impl *connection = impl; + return aws_mqtt_client_connection_311_subscribe( + connection, + topic_filter, + qos, + on_publish, + on_publish_ud, + on_ud_cleanup, + on_suback, + on_suback_ud, + connection->operation_timeout_ns); +} + /******************************************************************************* * Resubscribe ******************************************************************************/ @@ -2353,6 +2439,20 @@ static enum aws_mqtt_client_request_state s_resubscribe_send( aws_mem_release(message->allocator, message); } + /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion + * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly + * fire the on_completion callbacks. */ + struct request_timeout_task_arg *timeout_task_arg = + s_schedule_timeout_task(task_arg->connection, packet_id, task_arg->timeout_duration_in_ns); + if (timeout_task_arg) { + /* + * Set up mutual references between the operation task args and the timeout task args. Whoever runs first + * "wins", does its logic, and then breaks the connection between the two. + */ + task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg; + timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper; + } + return AWS_MQTT_CLIENT_REQUEST_ONGOING; handle_error: @@ -2416,6 +2516,16 @@ static void s_resubscribe_complete( clean_up: + /* + * If we have a forward pointer to a timeout task, then that means the timeout task has not run yet. So we should + * follow it and zero out the back pointer to us, because we're going away now. The timeout task will run later + * and be harmless (even vs. future operations with the same packet id) because it only cancels if it has a back + * pointer. + */ + if (task_arg->timeout_wrapper.timeout_task_arg) { + task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL; + } + /* We need to cleanup the subscribe_task_topics, since they are not inserted into the topic tree by resubscribe. We * take the ownership to clean it up */ for (size_t i = 0; i < list_len; i++) { @@ -2445,6 +2555,7 @@ static uint16_t s_aws_mqtt_311_resubscribe_existing_topics( task_arg->connection = connection; task_arg->on_suback.multi = on_suback; task_arg->on_suback_ud = on_suback_ud; + task_arg->timeout_duration_in_ns = connection->operation_timeout_ns; /* Calculate the size of the packet. * The fixed header is 2 bytes and the packet ID is 2 bytes @@ -2505,6 +2616,7 @@ struct unsubscribe_task_arg { void *on_unsuback_ud; struct request_timeout_wrapper timeout_wrapper; + uint64_t timeout_duration_in_ns; }; static enum aws_mqtt_client_request_state s_unsubscribe_send( @@ -2596,18 +2708,17 @@ static enum aws_mqtt_client_request_state s_unsubscribe_send( /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly * fire the on_completion callbacks. */ - struct request_timeout_task_arg *timeout_task_arg = s_schedule_timeout_task(task_arg->connection, packet_id); - if (!timeout_task_arg) { - return AWS_MQTT_CLIENT_REQUEST_ERROR; + struct request_timeout_task_arg *timeout_task_arg = + s_schedule_timeout_task(task_arg->connection, packet_id, task_arg->timeout_duration_in_ns); + if (timeout_task_arg) { + /* + * Set up mutual references between the operation task args and the timeout task args. Whoever runs first + * "wins", does its logic, and then breaks the connection between the two. + */ + task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg; + timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper; } - /* - * Set up mutual references between the operation task args and the timeout task args. Whoever runs first - * "wins", does its logic, and then breaks the connection between the two. - */ - task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg; - timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper; - if (!task_arg->tree_updated) { aws_mqtt_topic_tree_transaction_commit(&task_arg->connection->thread_data.subscriptions, &transaction); task_arg->tree_updated = true; @@ -2650,7 +2761,6 @@ static void s_unsubscribe_complete( */ if (task_arg->timeout_wrapper.timeout_task_arg) { task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL; - task_arg->timeout_wrapper.timeout_task_arg = NULL; } if (task_arg->on_unsuback) { @@ -2662,13 +2772,12 @@ static void s_unsubscribe_complete( aws_mem_release(task_arg->connection->allocator, task_arg); } -static uint16_t s_aws_mqtt_client_connection_311_unsubscribe( - void *impl, +uint16_t aws_mqtt_client_connection_311_unsubscribe( + struct aws_mqtt_client_connection_311_impl *connection, const struct aws_byte_cursor *topic_filter, aws_mqtt_op_complete_fn *on_unsuback, - void *on_unsuback_ud) { - - struct aws_mqtt_client_connection_311_impl *connection = impl; + void *on_unsuback_ud, + uint64_t timeout_ns) { AWS_PRECONDITION(connection); @@ -2688,6 +2797,7 @@ static uint16_t s_aws_mqtt_client_connection_311_unsubscribe( task_arg->filter = aws_byte_cursor_from_string(task_arg->filter_string); task_arg->on_unsuback = on_unsuback; task_arg->on_unsuback_ud = on_unsuback_ud; + task_arg->timeout_duration_in_ns = timeout_ns; /* Calculate the size of the unsubscribe packet. * The fixed header is always 2 bytes, the packet ID is always 2 bytes @@ -2723,6 +2833,18 @@ handle_error: return 0; } +static uint16_t s_aws_mqtt_client_connection_311_unsubscribe( + void *impl, + const struct aws_byte_cursor *topic_filter, + aws_mqtt_op_complete_fn *on_unsuback, + void *on_unsuback_ud) { + + struct aws_mqtt_client_connection_311_impl *connection = impl; + + return aws_mqtt_client_connection_311_unsubscribe( + connection, topic_filter, on_unsuback, on_unsuback_ud, connection->operation_timeout_ns); +} + /******************************************************************************* * Publish ******************************************************************************/ @@ -2742,6 +2864,7 @@ struct publish_task_arg { aws_mqtt_op_complete_fn *on_complete; void *userdata; + uint64_t timeout_duration_in_ns; struct request_timeout_wrapper timeout_wrapper; }; @@ -2876,15 +2999,13 @@ static enum aws_mqtt_client_request_state s_publish_send(uint16_t packet_id, boo goto write_payload_chunk; } } - if (!is_qos_0 && connection->operation_timeout_ns != UINT64_MAX) { - /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion - * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly - * fire fire the on_completion callbacks. */ - struct request_timeout_task_arg *timeout_task_arg = s_schedule_timeout_task(connection, packet_id); - if (!timeout_task_arg) { - return AWS_MQTT_CLIENT_REQUEST_ERROR; - } + /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion + * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly + * fire fire the on_completion callbacks. */ + struct request_timeout_task_arg *timeout_task_arg = + s_schedule_timeout_task(connection, packet_id, task_arg->timeout_duration_in_ns); + if (timeout_task_arg != NULL) { /* * Set up mutual references between the operation task args and the timeout task args. Whoever runs first * "wins", does its logic, and then breaks the connection between the two. @@ -2921,7 +3042,6 @@ static void s_publish_complete( */ if (task_arg->timeout_wrapper.timeout_task_arg != NULL) { task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL; - task_arg->timeout_wrapper.timeout_task_arg = NULL; } aws_byte_buf_clean_up(&task_arg->payload_buf); @@ -2929,16 +3049,15 @@ static void s_publish_complete( aws_mem_release(connection->allocator, task_arg); } -static uint16_t s_aws_mqtt_client_connection_311_publish( - void *impl, +uint16_t aws_mqtt_client_connection_311_publish( + struct aws_mqtt_client_connection_311_impl *connection, const struct aws_byte_cursor *topic, enum aws_mqtt_qos qos, bool retain, const struct aws_byte_cursor *payload, aws_mqtt_op_complete_fn *on_complete, - void *userdata) { - - struct aws_mqtt_client_connection_311_impl *connection = impl; + void *userdata, + uint64_t timeout_ns) { AWS_PRECONDITION(connection); @@ -2962,6 +3081,7 @@ static uint16_t s_aws_mqtt_client_connection_311_publish( arg->topic = aws_byte_cursor_from_string(arg->topic_string); arg->qos = qos; arg->retain = retain; + arg->timeout_duration_in_ns = timeout_ns; struct aws_byte_cursor payload_cursor; AWS_ZERO_STRUCT(payload_cursor); @@ -3019,6 +3139,21 @@ handle_error: return 0; } +static uint16_t s_aws_mqtt_client_connection_311_publish( + void *impl, + const struct aws_byte_cursor *topic, + enum aws_mqtt_qos qos, + bool retain, + const struct aws_byte_cursor *payload, + aws_mqtt_op_complete_fn *on_complete, + void *userdata) { + + struct aws_mqtt_client_connection_311_impl *connection = impl; + + return aws_mqtt_client_connection_311_publish( + connection, topic, qos, retain, payload, on_complete, userdata, connection->operation_timeout_ns); +} + /******************************************************************************* * Ping ******************************************************************************/ @@ -3220,6 +3355,18 @@ static void s_aws_mqtt_client_connection_311_release(void *impl) { aws_ref_count_release(&connection->ref_count); } +static enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_311_get_impl(const void *impl) { + (void)impl; + + return AWS_MQTT311_IT_311_CONNECTION; +} + +static struct aws_event_loop *s_aws_mqtt_client_connection_311_get_event_loop(const void *impl) { + const struct aws_mqtt_client_connection_311_impl *connection = impl; + + return connection->loop; +} + static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_311_vtable = { .acquire_fn = s_aws_mqtt_client_connection_311_acquire, .release_fn = s_aws_mqtt_client_connection_311_release, @@ -3243,6 +3390,8 @@ static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_311 .unsubscribe_fn = s_aws_mqtt_client_connection_311_unsubscribe, .publish_fn = s_aws_mqtt_client_connection_311_publish, .get_stats_fn = s_aws_mqtt_client_connection_311_get_stats, + .get_impl_type = s_aws_mqtt_client_connection_311_get_impl, + .get_event_loop = s_aws_mqtt_client_connection_311_get_event_loop, }; static struct aws_mqtt_client_connection_vtable *s_aws_mqtt_client_connection_311_vtable_ptr = @@ -3344,6 +3493,8 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt connection->handler.vtable = aws_mqtt_get_client_channel_vtable(); connection->handler.impl = connection; + aws_mqtt311_callback_set_manager_init(&connection->callback_manager, connection->allocator, &connection->base); + return &connection->base; failed_init_outstanding_requests_table: diff --git a/contrib/restricted/aws/aws-c-mqtt/source/client_channel_handler.c b/contrib/restricted/aws/aws-c-mqtt/source/client_channel_handler.c index 2719f41c6c3..0fe344ec53b 100644 --- a/contrib/restricted/aws/aws-c-mqtt/source/client_channel_handler.c +++ b/contrib/restricted/aws/aws-c-mqtt/source/client_channel_handler.c @@ -263,6 +263,9 @@ static int s_packet_handler_connack(struct aws_byte_cursor message_cursor, void MQTT_CLIENT_CALL_CALLBACK_ARGS( connection, on_connection_success, connack.connect_return_code, connack.session_present); + aws_mqtt311_callback_set_manager_on_connection_success( + &connection->callback_manager, connack.connect_return_code, connack.session_present); + AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: connection callback completed", (void *)connection); s_update_next_ping_time(connection); @@ -291,6 +294,9 @@ static int s_packet_handler_publish(struct aws_byte_cursor message_cursor, void MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_any_publish, &publish.topic_name, &publish.payload, dup, qos, retain); + aws_mqtt311_callback_set_manager_on_publish_received( + &connection->callback_manager, &publish.topic_name, &publish.payload, dup, qos, retain); + AWS_LOGF_TRACE( AWS_LS_MQTT_CLIENT, "id=%p: publish received with msg id=%" PRIu16 " dup=%d qos=%d retain=%d payload-size=%zu topic=" PRInSTR, diff --git a/contrib/restricted/aws/aws-c-mqtt/source/client_impl_shared.c b/contrib/restricted/aws/aws-c-mqtt/source/client_impl_shared.c index 6f65eb88e34..019adc5c7b9 100644 --- a/contrib/restricted/aws/aws-c-mqtt/source/client_impl_shared.c +++ b/contrib/restricted/aws/aws-c-mqtt/source/client_impl_shared.c @@ -204,6 +204,11 @@ int aws_mqtt_client_connection_get_stats( return (*connection->vtable->get_stats_fn)(connection->impl, stats); } +enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type( + const struct aws_mqtt_client_connection *connection) { + return (*connection->vtable->get_impl_type)(connection->impl); +} + uint64_t aws_mqtt_hash_uint16_t(const void *item) { return *(uint16_t *)item; } @@ -218,3 +223,7 @@ bool aws_mqtt_byte_cursor_hash_equality(const void *a, const void *b) { return aws_byte_cursor_eq(a_cursor, b_cursor); } + +struct aws_event_loop *aws_mqtt_client_connection_get_event_loop(const struct aws_mqtt_client_connection *connection) { + return (*connection->vtable->get_event_loop)(connection->impl); +} diff --git a/contrib/restricted/aws/aws-c-mqtt/source/mqtt.c b/contrib/restricted/aws/aws-c-mqtt/source/mqtt.c index 9caeaa9bbaa..392dda20a5c 100644 --- a/contrib/restricted/aws/aws-c-mqtt/source/mqtt.c +++ b/contrib/restricted/aws/aws-c-mqtt/source/mqtt.c @@ -236,6 +236,39 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter) AWS_DEFINE_ERROR_INFO_MQTT( AWS_ERROR_MQTT_ACK_REASON_CODE_FAILURE, "MQTT ack packet received with a failing reason code"), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE, + "MQTT operation returned a failing reason code"), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, + "Request operation failed due to client shut down"), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, + "Request operation failed due to timeout"), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_NO_SUBSCRIPTION_CAPACITY, + "Streaming request operation failed because there was no space for the subscription"), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE, + "Request operation failed because the associated subscribe failed synchronously"), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR, + "Request operation failed due to a non-specific internal error within the client."), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_PUBLISH_FAILURE, + "Request-response operation failed because the associated publish failed synchronously."), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REUQEST_RESPONSE_STREAM_ALREADY_ACTIVATED, + "Streaming operation activation failed because the operation had already been activated."), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_MODELED_SERVICE_ERROR, + "Request-response operation failed with a modeled service error."), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_PAYLOAD_PARSE_ERROR, + "Request-response operation failed due to an inability to parse the payload."), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_INVALID_RESPONSE_PATH, + "Request-response operation failed due to arrival on an unknown response path"), }; /* clang-format on */ #undef AWS_DEFINE_ERROR_INFO_MQTT @@ -254,6 +287,7 @@ static struct aws_error_info_list s_error_list = { DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_CLIENT, "mqtt5-client", "MQTT5 client and connections"), DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_CANARY, "mqtt5-canary", "MQTT5 canary logging"), DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_TO_MQTT3_ADAPTER, "mqtt5-to-mqtt3-adapter", "MQTT5-To-MQTT3 adapter logging"), + DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT_REQUEST_RESPONSE, "mqtt-request-response-systems", "MQTT request-response systems logging"), }; /* clang-format on */ diff --git a/contrib/restricted/aws/aws-c-mqtt/source/mqtt311_listener.c b/contrib/restricted/aws/aws-c-mqtt/source/mqtt311_listener.c new file mode 100644 index 00000000000..69bec5286b3 --- /dev/null +++ b/contrib/restricted/aws/aws-c-mqtt/source/mqtt311_listener.c @@ -0,0 +1,329 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/mqtt/private/mqtt311_listener.h> + +#include <aws/common/ref_count.h> +#include <aws/common/task_scheduler.h> +#include <aws/io/event_loop.h> +#include <aws/mqtt/private/client_impl.h> +#include <aws/mqtt/private/client_impl_shared.h> + +#include <inttypes.h> + +static struct aws_event_loop *s_mqtt_client_connection_get_event_loop( + const struct aws_mqtt_client_connection *connection) { + AWS_FATAL_ASSERT(aws_mqtt_client_connection_get_impl_type(connection) == AWS_MQTT311_IT_311_CONNECTION); + + struct aws_mqtt_client_connection_311_impl *connection_impl = connection->impl; + + return connection_impl->loop; +} + +struct aws_mqtt311_listener { + struct aws_allocator *allocator; + + struct aws_ref_count ref_count; + + struct aws_mqtt311_listener_config config; + + uint64_t callback_set_id; + + struct aws_task initialize_task; + struct aws_task terminate_task; +}; + +static void s_mqtt311_listener_destroy(struct aws_mqtt311_listener *listener) { + + aws_mqtt_client_connection_release(listener->config.connection); + + aws_mqtt311_listener_termination_completion_fn *termination_callback = listener->config.termination_callback; + void *termination_callback_user_data = listener->config.termination_callback_user_data; + + aws_mem_release(listener->allocator, listener); + + if (termination_callback != NULL) { + (*termination_callback)(termination_callback_user_data); + } +} + +static void s_mqtt311_listener_initialize_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) { + (void)task; + + struct aws_mqtt311_listener *listener = arg; + + if (task_status == AWS_TASK_STATUS_RUN_READY) { + struct aws_mqtt_client_connection_311_impl *connection_impl = listener->config.connection->impl; + listener->callback_set_id = aws_mqtt311_callback_set_manager_push_front( + &connection_impl->callback_manager, &listener->config.listener_callbacks); + AWS_LOGF_INFO( + AWS_LS_MQTT_GENERAL, + "id=%p: Mqtt311 Listener initialized, listener id=%p", + (void *)listener->config.connection, + (void *)listener); + aws_mqtt311_listener_release(listener); + } else { + s_mqtt311_listener_destroy(listener); + } +} + +static void s_mqtt311_listener_terminate_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) { + (void)task; + + struct aws_mqtt311_listener *listener = arg; + + if (task_status == AWS_TASK_STATUS_RUN_READY) { + struct aws_mqtt_client_connection_311_impl *connection_impl = listener->config.connection->impl; + aws_mqtt311_callback_set_manager_remove(&connection_impl->callback_manager, listener->callback_set_id); + } + + AWS_LOGF_INFO( + AWS_LS_MQTT_GENERAL, + "id=%p: Mqtt311 Listener terminated, listener id=%p", + (void *)listener->config.connection, + (void *)listener); + + s_mqtt311_listener_destroy(listener); +} + +static void s_aws_mqtt311_listener_on_zero_ref_count(void *context) { + struct aws_mqtt311_listener *listener = context; + + aws_event_loop_schedule_task_now( + s_mqtt_client_connection_get_event_loop(listener->config.connection), &listener->terminate_task); +} + +struct aws_mqtt311_listener *aws_mqtt311_listener_new( + struct aws_allocator *allocator, + struct aws_mqtt311_listener_config *config) { + if (config->connection == NULL) { + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + if (aws_mqtt_client_connection_get_impl_type(config->connection) != AWS_MQTT311_IT_311_CONNECTION) { + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + struct aws_mqtt311_listener *listener = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt311_listener)); + + listener->allocator = allocator; + listener->config = *config; + + aws_mqtt_client_connection_acquire(config->connection); + aws_ref_count_init(&listener->ref_count, listener, s_aws_mqtt311_listener_on_zero_ref_count); + + aws_task_init( + &listener->initialize_task, s_mqtt311_listener_initialize_task_fn, listener, "Mqtt311ListenerInitialize"); + aws_task_init( + &listener->terminate_task, s_mqtt311_listener_terminate_task_fn, listener, "Mqtt311ListenerTerminate"); + + aws_mqtt311_listener_acquire(listener); + aws_event_loop_schedule_task_now( + s_mqtt_client_connection_get_event_loop(config->connection), &listener->initialize_task); + + return listener; +} + +struct aws_mqtt311_listener *aws_mqtt311_listener_acquire(struct aws_mqtt311_listener *listener) { + if (listener != NULL) { + aws_ref_count_acquire(&listener->ref_count); + } + + return listener; +} + +struct aws_mqtt311_listener *aws_mqtt311_listener_release(struct aws_mqtt311_listener *listener) { + if (listener != NULL) { + aws_ref_count_release(&listener->ref_count); + } + + return NULL; +} + +struct aws_mqtt311_callback_set_entry { + struct aws_allocator *allocator; + + struct aws_linked_list_node node; + + uint64_t id; + + struct aws_mqtt311_callback_set callbacks; +}; + +void aws_mqtt311_callback_set_manager_init( + struct aws_mqtt311_callback_set_manager *manager, + struct aws_allocator *allocator, + struct aws_mqtt_client_connection *connection) { + + manager->allocator = allocator; + manager->connection = connection; /* no need to ref count, this is assumed to be owned by the client connection */ + manager->next_callback_set_entry_id = 1; + + aws_linked_list_init(&manager->callback_set_entries); +} + +void aws_mqtt311_callback_set_manager_clean_up(struct aws_mqtt311_callback_set_manager *manager) { + struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries); + while (node != aws_linked_list_end(&manager->callback_set_entries)) { + struct aws_mqtt311_callback_set_entry *entry = + AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node); + node = aws_linked_list_next(node); + + aws_linked_list_remove(&entry->node); + aws_mem_release(entry->allocator, entry); + } +} + +static struct aws_mqtt311_callback_set_entry *s_new_311_callback_set_entry( + struct aws_mqtt311_callback_set_manager *manager, + struct aws_mqtt311_callback_set *callback_set) { + struct aws_mqtt311_callback_set_entry *entry = + aws_mem_calloc(manager->allocator, 1, sizeof(struct aws_mqtt311_callback_set_entry)); + + entry->allocator = manager->allocator; + entry->id = manager->next_callback_set_entry_id++; + entry->callbacks = *callback_set; + + AWS_LOGF_INFO( + AWS_LS_MQTT_GENERAL, + "id=%p: MQTT311 callback manager created new entry id=%" PRIu64, + (void *)manager->connection, + entry->id); + + return entry; +} + +uint64_t aws_mqtt311_callback_set_manager_push_front( + struct aws_mqtt311_callback_set_manager *manager, + struct aws_mqtt311_callback_set *callback_set) { + + AWS_FATAL_ASSERT( + aws_event_loop_thread_is_callers_thread(s_mqtt_client_connection_get_event_loop(manager->connection))); + + struct aws_mqtt311_callback_set_entry *entry = s_new_311_callback_set_entry(manager, callback_set); + + aws_linked_list_push_front(&manager->callback_set_entries, &entry->node); + + return entry->id; +} + +void aws_mqtt311_callback_set_manager_remove( + struct aws_mqtt311_callback_set_manager *manager, + uint64_t callback_set_id) { + + AWS_FATAL_ASSERT( + aws_event_loop_thread_is_callers_thread(s_mqtt_client_connection_get_event_loop(manager->connection))); + + struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries); + while (node != aws_linked_list_end(&manager->callback_set_entries)) { + struct aws_mqtt311_callback_set_entry *entry = + AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node); + node = aws_linked_list_next(node); + + if (entry->id == callback_set_id) { + aws_linked_list_remove(&entry->node); + + AWS_LOGF_INFO( + AWS_LS_MQTT_GENERAL, + "id=%p: MQTT311 callback manager removed entry id=%" PRIu64, + (void *)manager->connection, + entry->id); + aws_mem_release(entry->allocator, entry); + return; + } + } + AWS_LOGF_INFO( + AWS_LS_MQTT_GENERAL, + "id=%p: MQTT311 callback manager failed to remove entry id=%" PRIu64 ", callback set id not found.", + (void *)manager->connection, + callback_set_id); +} + +void aws_mqtt311_callback_set_manager_on_publish_received( + struct aws_mqtt311_callback_set_manager *manager, + const struct aws_byte_cursor *topic, + const struct aws_byte_cursor *payload, + bool dup, + enum aws_mqtt_qos qos, + bool retain) { + + struct aws_mqtt_client_connection_311_impl *connection_impl = manager->connection->impl; + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop)); + + struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries); + while (node != aws_linked_list_end(&manager->callback_set_entries)) { + struct aws_mqtt311_callback_set_entry *entry = + AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node); + node = aws_linked_list_next(node); + + struct aws_mqtt311_callback_set *callback_set = &entry->callbacks; + if (callback_set->publish_received_handler != NULL) { + (*callback_set->publish_received_handler)( + manager->connection, topic, payload, dup, qos, retain, callback_set->user_data); + } + } +} + +void aws_mqtt311_callback_set_manager_on_connection_success( + struct aws_mqtt311_callback_set_manager *manager, + enum aws_mqtt_connect_return_code return_code, + bool rejoined_session) { + + struct aws_mqtt_client_connection_311_impl *connection_impl = manager->connection->impl; + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop)); + + struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries); + while (node != aws_linked_list_end(&manager->callback_set_entries)) { + struct aws_mqtt311_callback_set_entry *entry = + AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node); + node = aws_linked_list_next(node); + + struct aws_mqtt311_callback_set *callback_set = &entry->callbacks; + if (callback_set->connection_success_handler != NULL) { + (*callback_set->connection_success_handler)( + manager->connection, return_code, rejoined_session, callback_set->user_data); + } + } +} + +void aws_mqtt311_callback_set_manager_on_connection_interrupted( + struct aws_mqtt311_callback_set_manager *manager, + int error_code) { + + struct aws_mqtt_client_connection_311_impl *connection_impl = manager->connection->impl; + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop)); + + struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries); + while (node != aws_linked_list_end(&manager->callback_set_entries)) { + struct aws_mqtt311_callback_set_entry *entry = + AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node); + node = aws_linked_list_next(node); + + struct aws_mqtt311_callback_set *callback_set = &entry->callbacks; + if (callback_set->connection_interrupted_handler != NULL) { + (*callback_set->connection_interrupted_handler)(manager->connection, error_code, callback_set->user_data); + } + } +} + +void aws_mqtt311_callback_set_manager_on_disconnect(struct aws_mqtt311_callback_set_manager *manager) { + + struct aws_mqtt_client_connection_311_impl *connection_impl = manager->connection->impl; + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop)); + + struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries); + while (node != aws_linked_list_end(&manager->callback_set_entries)) { + struct aws_mqtt311_callback_set_entry *entry = + AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node); + node = aws_linked_list_next(node); + + struct aws_mqtt311_callback_set *callback_set = &entry->callbacks; + if (callback_set->disconnect_handler != NULL) { + (*callback_set->disconnect_handler)(manager->connection, callback_set->user_data); + } + } +} diff --git a/contrib/restricted/aws/aws-c-mqtt/source/packets.c b/contrib/restricted/aws/aws-c-mqtt/source/packets.c index 1170bcca9c6..d208a9de677 100644 --- a/contrib/restricted/aws/aws-c-mqtt/source/packets.c +++ b/contrib/restricted/aws/aws-c-mqtt/source/packets.c @@ -235,9 +235,8 @@ int aws_mqtt_packet_connect_encode(struct aws_byte_buf *buf, const struct aws_mq } /* Write connect flags [MQTT-3.1.2.3] */ - uint8_t connect_flags = (uint8_t)( - packet->clean_session << 1 | packet->has_will << 2 | packet->will_qos << 3 | packet->will_retain << 5 | - packet->has_password << 6 | packet->has_username << 7); + uint8_t connect_flags = (uint8_t)(packet->clean_session << 1 | packet->has_will << 2 | packet->will_qos << 3 | + packet->will_retain << 5 | packet->has_password << 6 | packet->has_username << 7); if (!aws_byte_buf_write_u8(buf, connect_flags)) { return aws_raise_error(AWS_ERROR_SHORT_BUFFER); diff --git a/contrib/restricted/aws/aws-c-mqtt/source/request-response/protocol_adapter.c b/contrib/restricted/aws/aws-c-mqtt/source/request-response/protocol_adapter.c new file mode 100644 index 00000000000..b01a7f8a88b --- /dev/null +++ b/contrib/restricted/aws/aws-c-mqtt/source/request-response/protocol_adapter.c @@ -0,0 +1,973 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/mqtt/private/request-response/protocol_adapter.h> + +#include <aws/common/clock.h> +#include <aws/io/event_loop.h> +#include <aws/mqtt/private/client_impl.h> +#include <aws/mqtt/private/client_impl_shared.h> +#include <aws/mqtt/private/mqtt311_listener.h> +#include <aws/mqtt/private/v5/mqtt5_client_impl.h> +#include <aws/mqtt/private/v5/mqtt5_to_mqtt3_adapter_impl.h> +#include <aws/mqtt/request-response/request_response_client.h> +#include <aws/mqtt/v5/mqtt5_client.h> +#include <aws/mqtt/v5/mqtt5_listener.h> + +/* + * Basic API contract + * + * Invariant 1: Subscribe is only called from the RR subscription manager when going from 0 to 1 pending operations + * Invariant 2: Unsubscribe is only called from the RR subscription manager when there are 0 pending operations, not + * necessarily on the exact transition to zero though. + * + * Additional Notes + * + * Entries are not tracked with the exception of the eventstream impl which needs the stream handles to close. + * + * A subscribe failure does not trigger an unsubscribe, a status event. + * + * The sub manager is responsible for calling Unsubscribe on all its entries when shutting down + * (before releasing hold of the adapter). + * + * Retries, when appropriate, are the responsibility of the caller. + */ + +enum aws_mqtt_protocol_adapter_operation_type { + AMPAOT_SUBSCRIBE_UNSUBSCRIBE, + AMPAOT_PUBLISH, +}; + +struct aws_mqtt_protocol_adapter_sub_unsub_data { + struct aws_byte_buf topic_filter; +}; + +struct aws_mqtt_protocol_adapter_publish_data { + void (*completion_callback_fn)(int, void *); + void *user_data; +}; + +struct aws_mqtt_protocol_adapter_operation_userdata { + struct aws_allocator *allocator; + + struct aws_linked_list_node node; + void *adapter; + + enum aws_mqtt_protocol_adapter_operation_type operation_type; + + union { + struct aws_mqtt_protocol_adapter_sub_unsub_data sub_unsub_data; + struct aws_mqtt_protocol_adapter_publish_data publish_data; + } operation_data; +}; + +static struct aws_mqtt_protocol_adapter_operation_userdata *s_aws_mqtt_protocol_adapter_sub_unsub_data_new( + struct aws_allocator *allocator, + struct aws_byte_cursor topic_filter, + void *adapter) { + + struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_operation_userdata)); + + subscribe_data->allocator = allocator; + subscribe_data->operation_type = AMPAOT_SUBSCRIBE_UNSUBSCRIBE; + subscribe_data->adapter = adapter; + aws_byte_buf_init_copy_from_cursor( + &subscribe_data->operation_data.sub_unsub_data.topic_filter, allocator, topic_filter); + + return subscribe_data; +} + +static struct aws_mqtt_protocol_adapter_operation_userdata *s_aws_mqtt_protocol_adapter_publish_data_new( + struct aws_allocator *allocator, + const struct aws_protocol_adapter_publish_options *publish_options, + void *adapter) { + + struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_operation_userdata)); + + publish_data->allocator = allocator; + publish_data->operation_type = AMPAOT_PUBLISH; + publish_data->adapter = adapter; + + publish_data->operation_data.publish_data.completion_callback_fn = publish_options->completion_callback_fn; + publish_data->operation_data.publish_data.user_data = publish_options->user_data; + + return publish_data; +} + +static void s_aws_mqtt_protocol_adapter_operation_user_data_destroy( + struct aws_mqtt_protocol_adapter_operation_userdata *userdata) { + if (userdata == NULL) { + return; + } + + if (aws_linked_list_node_next_is_valid(&userdata->node) && aws_linked_list_node_prev_is_valid(&userdata->node)) { + aws_linked_list_remove(&userdata->node); + } + + if (userdata->operation_type == AMPAOT_SUBSCRIBE_UNSUBSCRIBE) { + aws_byte_buf_clean_up(&userdata->operation_data.sub_unsub_data.topic_filter); + } + + aws_mem_release(userdata->allocator, userdata); +} + +/*****************************************************************************************************************/ + +struct aws_mqtt_protocol_adapter_311_impl { + struct aws_allocator *allocator; + struct aws_mqtt_protocol_adapter base; + + struct aws_linked_list incomplete_operations; + struct aws_mqtt_protocol_adapter_options config; + + struct aws_event_loop *loop; + struct aws_mqtt_client_connection *connection; + struct aws_mqtt311_listener *listener; +}; + +static void s_aws_mqtt_protocol_adapter_311_destroy(void *impl) { + struct aws_mqtt_protocol_adapter_311_impl *adapter = impl; + + // all the real cleanup is done in the listener termination callback + aws_mqtt311_listener_release(adapter->listener); +} + +/* Subscribe */ + +static void s_protocol_adapter_311_subscribe_completion( + struct aws_mqtt_client_connection *connection, + uint16_t packet_id, + const struct aws_byte_cursor *topic, + enum aws_mqtt_qos qos, + int error_code, + void *userdata) { + (void)connection; + (void)topic; + (void)packet_id; + + struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = userdata; + struct aws_mqtt_protocol_adapter_311_impl *adapter = subscribe_data->adapter; + + if (adapter == NULL) { + goto done; + } + + if (error_code == AWS_ERROR_SUCCESS) { + if (qos >= 128) { + error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE; + } + } + + struct aws_protocol_adapter_subscription_event subscribe_event = { + .topic_filter = aws_byte_cursor_from_buf(&subscribe_data->operation_data.sub_unsub_data.topic_filter), + .event_type = AWS_PASET_SUBSCRIBE, + .error_code = error_code, + .retryable = true, + }; + + (*adapter->config.subscription_event_callback)(&subscribe_event, adapter->config.user_data); + +done: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data); +} + +int s_aws_mqtt_protocol_adapter_311_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) { + struct aws_mqtt_protocol_adapter_311_impl *adapter = impl; + struct aws_mqtt_client_connection_311_impl *connection_impl = adapter->connection->impl; + + struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = + s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &subscribe_data->node); + + uint64_t timeout_nanos = + aws_timestamp_convert(options->ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + if (aws_mqtt_client_connection_311_subscribe( + connection_impl, + &options->topic_filter, + AWS_MQTT_QOS_AT_LEAST_ONCE, + NULL, + NULL, + NULL, + s_protocol_adapter_311_subscribe_completion, + subscribe_data, + timeout_nanos) == 0) { + goto error; + } + + return AWS_OP_SUCCESS; + +error: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data); + + return AWS_OP_ERR; +} + +/* Unsubscribe */ + +static bool s_is_retryable_unsubscribe311(int error_code) { + return error_code == AWS_ERROR_MQTT_TIMEOUT; +} + +static void s_protocol_adapter_311_unsubscribe_completion( + struct aws_mqtt_client_connection *connection, + uint16_t packet_id, + int error_code, + void *userdata) { + (void)connection; + (void)packet_id; + + struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data = userdata; + struct aws_mqtt_protocol_adapter_311_impl *adapter = unsubscribe_data->adapter; + + if (adapter == NULL) { + goto done; + } + + struct aws_protocol_adapter_subscription_event unsubscribe_event = { + .topic_filter = aws_byte_cursor_from_buf(&unsubscribe_data->operation_data.sub_unsub_data.topic_filter), + .event_type = AWS_PASET_UNSUBSCRIBE, + .error_code = error_code, + .retryable = s_is_retryable_unsubscribe311(error_code), + }; + + (*adapter->config.subscription_event_callback)(&unsubscribe_event, adapter->config.user_data); + +done: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data); +} + +int s_aws_mqtt_protocol_adapter_311_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) { + struct aws_mqtt_protocol_adapter_311_impl *adapter = impl; + struct aws_mqtt_client_connection_311_impl *connection_impl = adapter->connection->impl; + + struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data = + s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &unsubscribe_data->node); + + uint64_t timeout_nanos = + aws_timestamp_convert(options->ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + if (aws_mqtt_client_connection_311_unsubscribe( + connection_impl, + &options->topic_filter, + s_protocol_adapter_311_unsubscribe_completion, + unsubscribe_data, + timeout_nanos) == 0) { + goto error; + } + + return AWS_OP_SUCCESS; + +error: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data); + + return AWS_OP_ERR; +} + +/* Publish */ + +static void s_protocol_adapter_311_publish_completion( + struct aws_mqtt_client_connection *connection, + uint16_t packet_id, + int error_code, + void *userdata) { + + (void)connection; + (void)packet_id; + + struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = userdata; + struct aws_mqtt_protocol_adapter_311_impl *adapter = publish_data->adapter; + + if (adapter == NULL) { + goto done; + } + + (*publish_data->operation_data.publish_data.completion_callback_fn)( + error_code, publish_data->operation_data.publish_data.user_data); + +done: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data); +} + +int s_aws_mqtt_protocol_adapter_311_publish(void *impl, struct aws_protocol_adapter_publish_options *options) { + struct aws_mqtt_protocol_adapter_311_impl *adapter = impl; + struct aws_mqtt_client_connection_311_impl *connection_impl = adapter->connection->impl; + + struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = + s_aws_mqtt_protocol_adapter_publish_data_new(adapter->allocator, options, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &publish_data->node); + + uint64_t timeout_nanos = + aws_timestamp_convert(options->ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + if (aws_mqtt_client_connection_311_publish( + connection_impl, + &options->topic, + AWS_MQTT_QOS_AT_LEAST_ONCE, + false, + &options->payload, + s_protocol_adapter_311_publish_completion, + publish_data, + timeout_nanos) == 0) { + goto error; + } + + return AWS_OP_SUCCESS; + +error: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data); + + return AWS_OP_ERR; +} + +static void s_protocol_adapter_mqtt311_listener_publish_received( + struct aws_mqtt_client_connection *connection, + const struct aws_byte_cursor *topic, + const struct aws_byte_cursor *payload, + bool dup, + enum aws_mqtt_qos qos, + bool retain, + void *userdata) { + + (void)connection; + (void)dup; + (void)qos; + (void)retain; + + struct aws_mqtt_protocol_adapter_311_impl *adapter = userdata; + + struct aws_mqtt_rr_incoming_publish_event publish_event = { + .topic = *topic, + .payload = *payload, + }; + + /* This will potentially include messages that are completely unrelated to MQTT request-response. + * The topic is the first thing that should be checked for relevance. */ + (*adapter->config.incoming_publish_callback)(&publish_event, adapter->config.user_data); +} + +static void s_protocol_adapter_mqtt311_listener_connection_success( + struct aws_mqtt_client_connection *connection, + enum aws_mqtt_connect_return_code return_code, + bool session_present, + void *userdata) { + (void)connection; + (void)return_code; + + struct aws_mqtt_protocol_adapter_311_impl *adapter = userdata; + + if (adapter->config.connection_event_callback != NULL) { + struct aws_protocol_adapter_connection_event connection_event = { + .event_type = AWS_PACET_CONNECTED, + .joined_session = session_present, + }; + + (*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data); + } +} + +static void s_protocol_adapter_mqtt311_emit_disconnect_event(struct aws_mqtt_protocol_adapter_311_impl *adapter) { + if (adapter->config.connection_event_callback != NULL) { + struct aws_protocol_adapter_connection_event connection_event = { + .event_type = AWS_PACET_DISCONNECTED, + }; + + (*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data); + } +} + +static void s_protocol_adapter_mqtt311_listener_connection_interrupted( + struct aws_mqtt_client_connection *connection, + int error_code, + void *userdata) { + (void)connection; + (void)error_code; + + s_protocol_adapter_mqtt311_emit_disconnect_event(userdata); +} + +static void s_aws_mqtt_protocol_adapter_311_disconnect_fn( + struct aws_mqtt_client_connection *connection, + void *userdata) { + (void)connection; + + s_protocol_adapter_mqtt311_emit_disconnect_event(userdata); +} + +static bool s_aws_mqtt_protocol_adapter_311_is_connected(void *impl) { + struct aws_mqtt_protocol_adapter_311_impl *adapter = impl; + struct aws_mqtt_client_connection_311_impl *connection_impl = adapter->connection->impl; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop)); + + mqtt_connection_lock_synced_data(connection_impl); + enum aws_mqtt_client_connection_state current_state = connection_impl->synced_data.state; + mqtt_connection_unlock_synced_data(connection_impl); + + return current_state == AWS_MQTT_CLIENT_STATE_CONNECTED; +} + +static void s_release_incomplete_operations(struct aws_linked_list *incomplete_operations) { + struct aws_linked_list dummy_list; + aws_linked_list_init(&dummy_list); + aws_linked_list_swap_contents(incomplete_operations, &dummy_list); + + while (!aws_linked_list_empty(&dummy_list)) { + struct aws_linked_list_node *head = aws_linked_list_pop_front(&dummy_list); + struct aws_mqtt_protocol_adapter_operation_userdata *userdata = + AWS_CONTAINER_OF(head, struct aws_mqtt_protocol_adapter_operation_userdata, node); + + userdata->adapter = NULL; + + if (userdata->operation_type == AMPAOT_PUBLISH) { + struct aws_mqtt_protocol_adapter_publish_data *publish_data = &userdata->operation_data.publish_data; + if (publish_data->completion_callback_fn != NULL) { + (*userdata->operation_data.publish_data.completion_callback_fn)( + AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, publish_data->user_data); + } + } + } +} + +static void s_protocol_adapter_mqtt311_listener_termination_callback(void *user_data) { + struct aws_mqtt_protocol_adapter_311_impl *adapter = user_data; + struct aws_mqtt_client_connection_311_impl *impl = adapter->connection->impl; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(impl->loop)); + + s_release_incomplete_operations(&adapter->incomplete_operations); + + aws_mqtt_client_connection_release(adapter->connection); + + aws_protocol_adapter_terminate_callback_fn *terminate_callback = adapter->config.terminate_callback; + void *terminate_user_data = adapter->config.user_data; + + aws_mem_release(adapter->allocator, adapter); + + if (terminate_callback) { + (*terminate_callback)(terminate_user_data); + } +} + +static struct aws_event_loop *s_aws_mqtt_protocol_adapter_311_get_event_loop(void *impl) { + struct aws_mqtt_protocol_adapter_311_impl *adapter = impl; + + return adapter->loop; +} + +static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mqtt311_vtable = { + .aws_mqtt_protocol_adapter_destroy_fn = s_aws_mqtt_protocol_adapter_311_destroy, + .aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_311_subscribe, + .aws_mqtt_protocol_adapter_unsubscribe_fn = s_aws_mqtt_protocol_adapter_311_unsubscribe, + .aws_mqtt_protocol_adapter_publish_fn = s_aws_mqtt_protocol_adapter_311_publish, + .aws_mqtt_protocol_adapter_is_connected_fn = s_aws_mqtt_protocol_adapter_311_is_connected, + .aws_mqtt_protocol_adapter_get_event_loop_fn = s_aws_mqtt_protocol_adapter_311_get_event_loop, +}; + +struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311( + struct aws_allocator *allocator, + struct aws_mqtt_protocol_adapter_options *options, + struct aws_mqtt_client_connection *connection) { + + if (options == NULL || connection == NULL) { + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + if (aws_mqtt_client_connection_get_impl_type(connection) != AWS_MQTT311_IT_311_CONNECTION) { + struct aws_mqtt_client_connection_5_impl *adapter_impl = connection->impl; + return aws_mqtt_protocol_adapter_new_from_5(allocator, options, adapter_impl->client); + } + + struct aws_mqtt_client_connection_311_impl *impl = connection->impl; + + struct aws_mqtt_protocol_adapter_311_impl *adapter = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_311_impl)); + + adapter->allocator = allocator; + adapter->base.impl = adapter; + adapter->base.vtable = &s_protocol_adapter_mqtt311_vtable; + aws_linked_list_init(&adapter->incomplete_operations); + adapter->config = *options; + adapter->loop = impl->loop; + adapter->connection = aws_mqtt_client_connection_acquire(connection); + + struct aws_mqtt311_listener_config listener_options = { + .connection = connection, + .listener_callbacks = + { + .publish_received_handler = s_protocol_adapter_mqtt311_listener_publish_received, + .connection_success_handler = s_protocol_adapter_mqtt311_listener_connection_success, + .connection_interrupted_handler = s_protocol_adapter_mqtt311_listener_connection_interrupted, + .disconnect_handler = s_aws_mqtt_protocol_adapter_311_disconnect_fn, + .user_data = adapter, + }, + .termination_callback = s_protocol_adapter_mqtt311_listener_termination_callback, + .termination_callback_user_data = adapter, + }; + + adapter->listener = aws_mqtt311_listener_new(allocator, &listener_options); + + return &adapter->base; +} + +/******************************************************************************************************************/ + +struct aws_mqtt_protocol_adapter_5_impl { + struct aws_allocator *allocator; + struct aws_mqtt_protocol_adapter base; + struct aws_linked_list incomplete_operations; + struct aws_mqtt_protocol_adapter_options config; + + struct aws_event_loop *loop; + struct aws_mqtt5_client *client; + struct aws_mqtt5_listener *listener; +}; + +static void s_aws_mqtt_protocol_adapter_5_destroy(void *impl) { + struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; + + // all the real cleanup is done in the listener termination callback + aws_mqtt5_listener_release(adapter->listener); +} + +/* Subscribe */ + +static bool s_is_retryable_subscribe(enum aws_mqtt5_suback_reason_code reason_code, int error_code) { + if (error_code == AWS_ERROR_MQTT5_PACKET_VALIDATION || error_code == AWS_ERROR_MQTT5_SUBSCRIBE_OPTIONS_VALIDATION) { + return false; + } else if (error_code != AWS_ERROR_SUCCESS) { + return true; + } + + switch (reason_code) { + case AWS_MQTT5_SARC_GRANTED_QOS_0: + case AWS_MQTT5_SARC_GRANTED_QOS_1: + case AWS_MQTT5_SARC_GRANTED_QOS_2: + case AWS_MQTT5_SARC_UNSPECIFIED_ERROR: + case AWS_MQTT5_SARC_PACKET_IDENTIFIER_IN_USE: + case AWS_MQTT5_SARC_IMPLEMENTATION_SPECIFIC_ERROR: + case AWS_MQTT5_SARC_QUOTA_EXCEEDED: + return true; + + default: + return false; + } +} + +static void s_protocol_adapter_5_subscribe_completion( + const struct aws_mqtt5_packet_suback_view *suback, + int error_code, + void *complete_ctx) { + struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = complete_ctx; + struct aws_mqtt_protocol_adapter_5_impl *adapter = subscribe_data->adapter; + + if (adapter == NULL) { + goto done; + } + + enum aws_mqtt5_suback_reason_code reason_code = AWS_MQTT5_SARC_GRANTED_QOS_0; + if (suback != NULL && suback->reason_code_count > 0) { + reason_code = suback->reason_codes[0]; + } + bool is_retryable = s_is_retryable_subscribe(reason_code, error_code); + + if (error_code == AWS_ERROR_SUCCESS) { + if (suback == NULL || suback->reason_code_count != 1 || suback->reason_codes[0] >= 128) { + error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE; + } + } + + struct aws_protocol_adapter_subscription_event subscribe_event = { + .topic_filter = aws_byte_cursor_from_buf(&subscribe_data->operation_data.sub_unsub_data.topic_filter), + .event_type = AWS_PASET_SUBSCRIBE, + .error_code = error_code, + .retryable = is_retryable, + }; + + (*adapter->config.subscription_event_callback)(&subscribe_event, adapter->config.user_data); + +done: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data); +} + +int s_aws_mqtt_protocol_adapter_5_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) { + struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; + + struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = + s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &subscribe_data->node); + + struct aws_mqtt5_subscription_view subscription_view = { + .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, + .topic_filter = options->topic_filter, + }; + + struct aws_mqtt5_packet_subscribe_view subscribe_view = { + .subscriptions = &subscription_view, + .subscription_count = 1, + }; + + struct aws_mqtt5_subscribe_completion_options completion_options = { + .ack_timeout_seconds_override = options->ack_timeout_seconds, + .completion_callback = s_protocol_adapter_5_subscribe_completion, + .completion_user_data = subscribe_data, + }; + + if (aws_mqtt5_client_subscribe(adapter->client, &subscribe_view, &completion_options)) { + goto error; + } + + return AWS_OP_SUCCESS; + +error: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data); + + return AWS_OP_ERR; +} + +/* Unsubscribe */ + +static bool s_is_retryable_unsubscribe5(enum aws_mqtt5_unsuback_reason_code reason_code, int error_code) { + if (error_code == AWS_ERROR_MQTT5_PACKET_VALIDATION || + error_code == AWS_ERROR_MQTT5_UNSUBSCRIBE_OPTIONS_VALIDATION) { + return false; + } else if (error_code == AWS_ERROR_MQTT_TIMEOUT) { + return true; + } + + switch (reason_code) { + case AWS_MQTT5_UARC_UNSPECIFIED_ERROR: + case AWS_MQTT5_UARC_IMPLEMENTATION_SPECIFIC_ERROR: + return true; + + default: + return false; + } +} + +static void s_protocol_adapter_5_unsubscribe_completion( + const struct aws_mqtt5_packet_unsuback_view *unsuback, + int error_code, + void *complete_ctx) { + struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data = complete_ctx; + struct aws_mqtt_protocol_adapter_5_impl *adapter = unsubscribe_data->adapter; + + if (adapter == NULL) { + goto done; + } + + enum aws_mqtt5_unsuback_reason_code reason_code = AWS_MQTT5_UARC_SUCCESS; + if (unsuback != NULL && unsuback->reason_code_count > 0) { + reason_code = unsuback->reason_codes[0]; + } + + bool is_retryable = s_is_retryable_unsubscribe5(reason_code, error_code); + + if (error_code == AWS_ERROR_SUCCESS) { + if (unsuback == NULL || unsuback->reason_code_count != 1 || unsuback->reason_codes[0] >= 128) { + error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE; + } + } + + struct aws_protocol_adapter_subscription_event unsubscribe_event = { + .topic_filter = aws_byte_cursor_from_buf(&unsubscribe_data->operation_data.sub_unsub_data.topic_filter), + .event_type = AWS_PASET_UNSUBSCRIBE, + .error_code = error_code, + .retryable = is_retryable, + }; + + (*adapter->config.subscription_event_callback)(&unsubscribe_event, adapter->config.user_data); + +done: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data); +} + +int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) { + struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; + + struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data = + s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &unsubscribe_data->node); + + struct aws_mqtt5_packet_unsubscribe_view unsubscribe_view = { + .topic_filters = &options->topic_filter, + .topic_filter_count = 1, + }; + + struct aws_mqtt5_unsubscribe_completion_options completion_options = { + .ack_timeout_seconds_override = options->ack_timeout_seconds, + .completion_callback = s_protocol_adapter_5_unsubscribe_completion, + .completion_user_data = unsubscribe_data, + }; + + if (aws_mqtt5_client_unsubscribe(adapter->client, &unsubscribe_view, &completion_options)) { + goto error; + } + + return AWS_OP_SUCCESS; + +error: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data); + + return AWS_OP_ERR; +} + +/* Publish */ + +static void s_protocol_adapter_5_publish_completion( + enum aws_mqtt5_packet_type packet_type, + const void *packet, + int error_code, + void *complete_ctx) { + struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = complete_ctx; + struct aws_mqtt_protocol_adapter_5_impl *adapter = publish_data->adapter; + + if (adapter == NULL) { + goto done; + } + + if (error_code == AWS_ERROR_SUCCESS && packet_type == AWS_MQTT5_PT_PUBACK) { + const struct aws_mqtt5_packet_puback_view *puback = packet; + if (puback->reason_code >= 128) { + error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE; + } + } + + (*publish_data->operation_data.publish_data.completion_callback_fn)( + error_code, publish_data->operation_data.publish_data.user_data); + +done: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data); +} + +int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapter_publish_options *options) { + struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; + struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = + s_aws_mqtt_protocol_adapter_publish_data_new(adapter->allocator, options, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &publish_data->node); + + struct aws_mqtt5_packet_publish_view publish_view = { + .topic = options->topic, .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, .payload = options->payload}; + + struct aws_mqtt5_publish_completion_options completion_options = { + .ack_timeout_seconds_override = options->ack_timeout_seconds, + .completion_callback = s_protocol_adapter_5_publish_completion, + .completion_user_data = publish_data, + }; + + if (aws_mqtt5_client_publish(adapter->client, &publish_view, &completion_options)) { + goto error; + } + + return AWS_OP_SUCCESS; + +error: + + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data); + + return AWS_OP_ERR; +} + +static bool s_aws_mqtt_protocol_adapter_5_is_connected(void *impl) { + struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(adapter->client->loop)); + + enum aws_mqtt5_client_state current_state = adapter->client->current_state; + + return current_state == AWS_MCS_CONNECTED; +} + +static bool s_protocol_adapter_mqtt5_listener_publish_received( + const struct aws_mqtt5_packet_publish_view *publish, + void *user_data) { + struct aws_mqtt_protocol_adapter_5_impl *adapter = user_data; + + struct aws_mqtt_rr_incoming_publish_event publish_event = { + .topic = publish->topic, + .payload = publish->payload, + .content_type = publish->content_type, + .user_property_count = publish->user_property_count, + .user_properties = publish->user_properties, + .message_expiry_interval_seconds = publish->message_expiry_interval_seconds, + }; + + /* This will potentially include messages that are completely unrelated to MQTT request-response. + * The topic is the first thing that should be checked for relevance. */ + (*adapter->config.incoming_publish_callback)(&publish_event, adapter->config.user_data); + + return false; +} + +static void s_protocol_adapter_mqtt5_lifecycle_event_callback(const struct aws_mqtt5_client_lifecycle_event *event) { + struct aws_mqtt_protocol_adapter_5_impl *adapter = event->user_data; + + switch (event->event_type) { + case AWS_MQTT5_CLET_CONNECTION_SUCCESS: { + struct aws_protocol_adapter_connection_event connection_event = { + .event_type = AWS_PACET_CONNECTED, + .joined_session = event->settings->rejoined_session, + }; + + (*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data); + break; + } + case AWS_MQTT5_CLET_DISCONNECTION: { + struct aws_protocol_adapter_connection_event connection_event = { + .event_type = AWS_PACET_DISCONNECTED, + }; + + (*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data); + break; + } + default: + break; + } +} + +static void s_protocol_adapter_mqtt5_listener_termination_callback(void *user_data) { + struct aws_mqtt_protocol_adapter_5_impl *adapter = user_data; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(adapter->client->loop)); + + s_release_incomplete_operations(&adapter->incomplete_operations); + + aws_mqtt5_client_release(adapter->client); + + aws_protocol_adapter_terminate_callback_fn *terminate_callback = adapter->config.terminate_callback; + void *terminate_user_data = adapter->config.user_data; + + aws_mem_release(adapter->allocator, adapter); + + if (terminate_callback) { + (*terminate_callback)(terminate_user_data); + } +} + +static struct aws_event_loop *s_aws_mqtt_protocol_adapter_5_get_event_loop(void *impl) { + struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; + + return adapter->loop; +} + +static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mqtt5_vtable = { + .aws_mqtt_protocol_adapter_destroy_fn = s_aws_mqtt_protocol_adapter_5_destroy, + .aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_5_subscribe, + .aws_mqtt_protocol_adapter_unsubscribe_fn = s_aws_mqtt_protocol_adapter_5_unsubscribe, + .aws_mqtt_protocol_adapter_publish_fn = s_aws_mqtt_protocol_adapter_5_publish, + .aws_mqtt_protocol_adapter_is_connected_fn = s_aws_mqtt_protocol_adapter_5_is_connected, + .aws_mqtt_protocol_adapter_get_event_loop_fn = s_aws_mqtt_protocol_adapter_5_get_event_loop}; + +struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5( + struct aws_allocator *allocator, + struct aws_mqtt_protocol_adapter_options *options, + struct aws_mqtt5_client *client) { + + if (options == NULL || client == NULL) { + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + struct aws_mqtt_protocol_adapter_5_impl *adapter = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_impl)); + + adapter->allocator = allocator; + adapter->base.impl = adapter; + adapter->base.vtable = &s_protocol_adapter_mqtt5_vtable; + aws_linked_list_init(&adapter->incomplete_operations); + adapter->config = *options; + adapter->loop = client->loop; + adapter->client = aws_mqtt5_client_acquire(client); + + struct aws_mqtt5_listener_config listener_options = { + .client = client, + .listener_callbacks = + { + .listener_publish_received_handler = s_protocol_adapter_mqtt5_listener_publish_received, + .listener_publish_received_handler_user_data = adapter, + .lifecycle_event_handler = s_protocol_adapter_mqtt5_lifecycle_event_callback, + .lifecycle_event_handler_user_data = adapter, + }, + .termination_callback = s_protocol_adapter_mqtt5_listener_termination_callback, + .termination_callback_user_data = adapter, + }; + + adapter->listener = aws_mqtt5_listener_new(allocator, &listener_options); + + return &adapter->base; +} + +void aws_mqtt_protocol_adapter_destroy(struct aws_mqtt_protocol_adapter *adapter) { + (*adapter->vtable->aws_mqtt_protocol_adapter_destroy_fn)(adapter->impl); +} + +int aws_mqtt_protocol_adapter_subscribe( + struct aws_mqtt_protocol_adapter *adapter, + struct aws_protocol_adapter_subscribe_options *options) { + return (*adapter->vtable->aws_mqtt_protocol_adapter_subscribe_fn)(adapter->impl, options); +} + +int aws_mqtt_protocol_adapter_unsubscribe( + struct aws_mqtt_protocol_adapter *adapter, + struct aws_protocol_adapter_unsubscribe_options *options) { + return (*adapter->vtable->aws_mqtt_protocol_adapter_unsubscribe_fn)(adapter->impl, options); +} + +int aws_mqtt_protocol_adapter_publish( + struct aws_mqtt_protocol_adapter *adapter, + struct aws_protocol_adapter_publish_options *options) { + return (*adapter->vtable->aws_mqtt_protocol_adapter_publish_fn)(adapter->impl, options); +} + +bool aws_mqtt_protocol_adapter_is_connected(struct aws_mqtt_protocol_adapter *adapter) { + return (*adapter->vtable->aws_mqtt_protocol_adapter_is_connected_fn)(adapter->impl); +} + +struct aws_event_loop *aws_mqtt_protocol_adapter_get_event_loop(struct aws_mqtt_protocol_adapter *adapter) { + return (*adapter->vtable->aws_mqtt_protocol_adapter_get_event_loop_fn)(adapter->impl); +} + +const char *aws_protocol_adapter_subscription_event_type_to_c_str( + enum aws_protocol_adapter_subscription_event_type type) { + switch (type) { + case AWS_PASET_SUBSCRIBE: + return "Subscribe"; + + case AWS_PASET_UNSUBSCRIBE: + return "Unsubscribe"; + } + + return "Unknown"; +} + +const char *aws_protocol_adapter_connection_event_type_to_c_str(enum aws_protocol_adapter_connection_event_type type) { + switch (type) { + case AWS_PACET_CONNECTED: + return "Connected"; + + case AWS_PACET_DISCONNECTED: + return "Disconnected"; + } + + return "Unknown"; +} diff --git a/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_client.c b/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_client.c new file mode 100644 index 00000000000..7e5bedfe1df --- /dev/null +++ b/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_client.c @@ -0,0 +1,2121 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/mqtt/request-response/request_response_client.h> + +#include <aws/common/clock.h> +#include <aws/common/json.h> +#include <aws/common/ref_count.h> +#include <aws/common/task_scheduler.h> +#include <aws/io/event_loop.h> +#include <aws/mqtt/private/client_impl_shared.h> +#include <aws/mqtt/private/request-response/protocol_adapter.h> +#include <aws/mqtt/private/request-response/request_response_subscription_set.h> +#include <aws/mqtt/private/request-response/subscription_manager.h> +#include <aws/mqtt/private/v5/mqtt5_client_impl.h> + +#include <inttypes.h> + +#define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50 + +struct aws_mqtt_request_operation_storage { + struct aws_mqtt_request_operation_options options; + + struct aws_array_list operation_response_paths; + struct aws_array_list subscription_topic_filters; + + struct aws_byte_buf operation_data; +}; + +struct aws_mqtt_streaming_operation_storage { + struct aws_mqtt_streaming_operation_options options; + + struct aws_byte_buf operation_data; + + struct aws_atomic_var activated; +}; + +enum aws_mqtt_request_response_operation_type { + AWS_MRROT_REQUEST, + AWS_MRROT_STREAMING, +}; + +enum aws_mqtt_request_response_operation_state { + /* creation -> in event loop enqueue */ + AWS_MRROS_NONE, + + /* in event loop queue -> non blocked response from subscription manager */ + AWS_MRROS_QUEUED, + + /* subscribing response from sub manager -> subscription success/failure event */ + AWS_MRROS_PENDING_SUBSCRIPTION, + + /* (request only) subscription success -> (publish failure OR correlated response received) */ + AWS_MRROS_PENDING_RESPONSE, + + /* (request only) the operation's destroy task has been scheduled but not yet executed */ + AWS_MRROS_PENDING_DESTROY, + + /* (streaming only) subscription success -> (operation finished OR subscription ended event) */ + AWS_MRROS_SUBSCRIBED, + + /* (streaming only) (subscription failure OR subscription ended) -> operation close/terminate */ + AWS_MRROS_TERMINAL, +}; + +const char *s_aws_mqtt_request_response_operation_state_to_c_str(enum aws_mqtt_request_response_operation_state state) { + switch (state) { + case AWS_MRROS_NONE: + return "NONE"; + + case AWS_MRROS_QUEUED: + return "QUEUED"; + + case AWS_MRROS_PENDING_SUBSCRIPTION: + return "PENDING_SUBSCRIPTION"; + + case AWS_MRROS_PENDING_RESPONSE: + return "PENDING_RESPONSE"; + + case AWS_MRROS_SUBSCRIBED: + return "SUBSCRIBED"; + + case AWS_MRROS_TERMINAL: + return "TERMINAL"; + + case AWS_MRROS_PENDING_DESTROY: + return "PENDING_DESTROY"; + + default: + return "Unknown"; + } +} + +const char *s_aws_acquire_subscription_result_type(enum aws_acquire_subscription_result_type result) { + switch (result) { + case AASRT_SUBSCRIBED: + return "SUBSCRIBED"; + + case AASRT_SUBSCRIBING: + return "SUBSCRIBING"; + + case AASRT_BLOCKED: + return "BLOCKED"; + + case AASRT_NO_CAPACITY: + return "NO_CAPACITY"; + + case AASRT_FAILURE: + return "FAILURE"; + + default: + return "Unknown"; + } +} + +/* + +Client Tables/Lookups + + (operations: authoritative operation container) + 1. &operation.id -> &operation // added on in-thread enqueue, removed on operation completion/destruction + + (request_response_paths: response path topic -> Correlation token extraction info) + 2. &topic -> &{topic, topic_buffer, correlation token json path buffer} // ref-counted, per-message-path add on + request dequeue into subscribing/subscribed state, decref/removed on operation completion/destruction + + (operations_by_correlation_tokens: correlationToken -> request operation) + 3. &operation.correlation token -> (request) &operation // added on request dequeue into subscribing/subscribed +state, removed on operation completion/destruction + + (streaming_operation_subscription_lists: streaming subscription filter -> list of all operations using that filter) + 4. &topic_filter -> &{topic_filter, linked_list} // added on request dequeue into subscribing/subscribed state, + removed from list on operation completion/destruction also checked for empty and removed from table + +*/ + +struct aws_mqtt_rr_client_operation { + struct aws_allocator *allocator; + + /* + * Operation ref-counting is a bit tricky and un-intuitive because it differs based on the type of operation. + * + * Streaming operations are managed by the user, and so the ref count is their responsibility to drop to zero. + * Dropping a streaming operation's ref count to zero schedules a task on the client event loop to destroy the + * operation. It is expected that the binding client will track (with proper synchronization) all unclosed + * streaming operations and safely close them for the user when close is called on the binding client. + * + * Request operations are managed by the client, and so the ref count is dropped to zero when either the + * operation completes normally (success or failure) or when the client is shutdown due to its external ref + * count dropping to zero. In all cases, this event happens naturally on the client event loop. + * + * So the summary is: + * + * (1) Streaming operation clean up is initiated by the user calling dec ref on the streaming operation + * (2) Request operation clean up is initiated by normal completion or client shutdown invoking dec ref. + * + * The upshot is that client shutdown dec-refs request operations but not streaming operations. + */ + struct aws_ref_count ref_count; + + struct aws_mqtt_request_response_client *client_internal_ref; + + uint64_t id; + + enum aws_mqtt_request_response_operation_type type; + + union { + struct aws_mqtt_streaming_operation_storage streaming_storage; + struct aws_mqtt_request_operation_storage request_storage; + } storage; + + uint64_t timeout_timepoint_ns; + struct aws_priority_queue_node priority_queue_node; + + /* Sometimes this is client->operation_queue, other times it is an entry in the client's topic_filter table */ + struct aws_linked_list_node node; + + enum aws_mqtt_request_response_operation_state state; + + size_t pending_subscriptions; + + bool in_client_tables; + + struct aws_task submit_task; + struct aws_task destroy_task; +}; + +/*******************************************************************************************/ + +/* Tracks the current state of the request-response client */ +enum aws_request_response_client_state { + + /* cross-thread initialization has not completed and all protocol adapter callbacks are ignored */ + AWS_RRCS_UNINITIALIZED, + + /* Normal operating state for the client. */ + AWS_RRCS_ACTIVE, + + /* asynchronously shutting down, no more servicing will be done and all protocol adapter callbacks are ignored */ + AWS_RRCS_SHUTTING_DOWN, +}; + +/* + * Request-Response Client Notes + * + * Ref-counting/Shutdown + * + * The request-response client uses a double ref-count pattern. + * + * External references represent user references. When the external reference reaches zero, the client's asynchronous + * shutdown process is started. + * + * Internal references block final destruction. Asynchronous shutdown will not complete until all internal references + * are dropped. In addition to one long-lived internal reference (the protocol client adapter's back reference to + * the request-response client), all event loop tasks that target the request-response client hold an internal + * reference between task submission and task completion. This ensures that the task always has a valid reference + * to the client, even if we're trying to shut down at the same time. + * + * + * Initialization + * + * Initialization is complicated by the fact that the subscription manager needs to be initialized from the + * event loop thread that the client/protocol adapter/protocol client are all seated on. To do this safely, + * we add an uninitialized state that ignores all callbacks and we schedule a task on initial construction to do + * the event-loop-only initialization. Once that initialization completes on the event loop thread, we move + * the client into an active state where it will process operations and protocol adapter callbacks. + */ +struct aws_mqtt_request_response_client { + struct aws_allocator *allocator; + + struct aws_ref_count external_ref_count; + struct aws_ref_count internal_ref_count; + + struct aws_mqtt_request_response_client_options config; + + struct aws_mqtt_protocol_adapter *client_adapter; + + struct aws_rr_subscription_manager subscription_manager; + + struct aws_event_loop *loop; + + struct aws_task initialize_task; + struct aws_task external_shutdown_task; + struct aws_task internal_shutdown_task; + + uint64_t scheduled_service_timepoint_ns; + struct aws_task service_task; + + enum aws_request_response_client_state state; + + struct aws_atomic_var next_id; + + struct aws_linked_list operation_queue; + + /* &operation->id -> &operation */ + struct aws_hash_table operations; + + /* + * heap of operation pointers where the timeout is the sort value. Elements are added to this on operation + * submission and removed on operation timeout/completion/termination. Request-response operations have actual + * timeouts, while streaming operations have UINT64_MAX timeouts. + */ + struct aws_priority_queue operations_by_timeout; + + /* + * Structure to handle stream and request subscriptions. + */ + struct aws_request_response_subscriptions subscriptions; + + /* + * Map from cursor (correlation token) -> request operation + */ + struct aws_hash_table operations_by_correlation_tokens; +}; + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire_internal( + struct aws_mqtt_request_response_client *client) { + if (client != NULL) { + aws_ref_count_acquire(&client->internal_ref_count); + } + + return client; +} + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release_internal( + struct aws_mqtt_request_response_client *client) { + if (client != NULL) { + aws_ref_count_release(&client->internal_ref_count); + } + + return NULL; +} + +static void s_aws_rr_client_on_zero_internal_ref_count(void *context) { + struct aws_mqtt_request_response_client *client = context; + + /* Both ref counts are zero, but it's still safest to schedule final destruction, not invoke it directly */ + aws_event_loop_schedule_task_now(client->loop, &client->internal_shutdown_task); +} + +static void s_aws_rr_client_on_zero_external_ref_count(void *context) { + struct aws_mqtt_request_response_client *client = context; + + /* Start the asynchronous shutdown process */ + aws_event_loop_schedule_task_now(client->loop, &client->external_shutdown_task); +} + +static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request_response_client *client) { + aws_mqtt_request_response_client_terminated_callback_fn *terminate_callback = client->config.terminated_callback; + void *user_data = client->config.user_data; + + AWS_FATAL_ASSERT(aws_hash_table_get_entry_count(&client->operations) == 0); + aws_hash_table_clean_up(&client->operations); + + aws_priority_queue_clean_up(&client->operations_by_timeout); + + aws_mqtt_request_response_client_subscriptions_clean_up(&client->subscriptions); + aws_hash_table_clean_up(&client->operations_by_correlation_tokens); + + aws_mem_release(client->allocator, client); + + if (terminate_callback != NULL) { + (*terminate_callback)(user_data); + } +} + +static void s_mqtt_request_response_client_internal_shutdown_task_fn( + struct aws_task *task, + void *arg, + enum aws_task_status task_status) { + (void)task; + (void)task_status; + + struct aws_mqtt_request_response_client *client = arg; + + /* All internal and external refs are gone; it is safe to clean up synchronously. */ + s_mqtt_request_response_client_final_destroy(client); +} + +static void s_remove_operation_from_timeout_queue(struct aws_mqtt_rr_client_operation *operation) { + struct aws_mqtt_request_response_client *client = operation->client_internal_ref; + + if (aws_priority_queue_node_is_in_queue(&operation->priority_queue_node)) { + struct aws_mqtt_rr_client_operation *queued_operation = NULL; + aws_priority_queue_remove(&client->operations_by_timeout, &queued_operation, &operation->priority_queue_node); + } +} + +static void s_change_operation_state( + struct aws_mqtt_rr_client_operation *operation, + enum aws_mqtt_request_response_operation_state new_state) { + enum aws_mqtt_request_response_operation_state old_state = operation->state; + if (old_state == new_state) { + return; + } + + operation->state = new_state; + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response operation %" PRIu64 " changing state from %s to %s", + (void *)operation->client_internal_ref, + operation->id, + s_aws_mqtt_request_response_operation_state_to_c_str(old_state), + s_aws_mqtt_request_response_operation_state_to_c_str(new_state)); +} + +static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_operation *operation, int error_code) { + AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST); + AWS_FATAL_ASSERT(error_code != AWS_ERROR_SUCCESS); + + if (operation->state == AWS_MRROS_PENDING_DESTROY) { + return; + } + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response operation %" PRIu64 " failed with error code %d(%s)", + (void *)operation->client_internal_ref, + operation->id, + error_code, + aws_error_debug_str(error_code)); + + aws_mqtt_request_operation_completion_fn *completion_callback = + operation->storage.request_storage.options.completion_callback; + void *user_data = operation->storage.request_storage.options.user_data; + + if (completion_callback != NULL) { + (*completion_callback)(NULL, error_code, user_data); + } + + s_change_operation_state(operation, AWS_MRROS_PENDING_DESTROY); + + aws_mqtt_rr_client_operation_release(operation); +} + +static void s_streaming_operation_emit_streaming_subscription_event( + struct aws_mqtt_rr_client_operation *operation, + enum aws_rr_streaming_subscription_event_type event_type, + int error_code) { + aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback = + operation->storage.streaming_storage.options.subscription_status_callback; + + if (subscription_status_callback != NULL) { + void *user_data = operation->storage.streaming_storage.options.user_data; + (*subscription_status_callback)(event_type, error_code, user_data); + } +} + +static void s_halt_streaming_operation_with_failure(struct aws_mqtt_rr_client_operation *operation, int error_code) { + AWS_FATAL_ASSERT(operation->type == AWS_MRROT_STREAMING); + AWS_FATAL_ASSERT(error_code != AWS_ERROR_SUCCESS); + + if (operation->state == AWS_MRROS_PENDING_DESTROY || operation->state == AWS_MRROS_TERMINAL) { + return; + } + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: streaming operation %" PRIu64 " halted with error code %d(%s)", + (void *)operation->client_internal_ref, + operation->id, + error_code, + aws_error_debug_str(error_code)); + + s_streaming_operation_emit_streaming_subscription_event(operation, ARRSSET_SUBSCRIPTION_HALTED, error_code); + + s_change_operation_state(operation, AWS_MRROS_TERMINAL); +} + +static void s_request_response_fail_operation(struct aws_mqtt_rr_client_operation *operation, int error_code) { + if (operation->type == AWS_MRROT_STREAMING) { + s_halt_streaming_operation_with_failure(operation, error_code); + } else { + s_complete_request_operation_with_failure(operation, error_code); + } +} + +static int s_rr_client_clean_up_operation(void *context, struct aws_hash_element *elem) { + (void)context; + struct aws_mqtt_rr_client_operation *operation = elem->value; + + s_request_response_fail_operation(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN); + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; +} + +static void s_mqtt_request_response_client_external_shutdown_task_fn( + struct aws_task *task, + void *arg, + enum aws_task_status task_status) { + (void)task; + + AWS_FATAL_ASSERT(task_status != AWS_TASK_STATUS_CANCELED); + + struct aws_mqtt_request_response_client *client = arg; + + /* stop handling adapter event callbacks */ + client->state = AWS_RRCS_SHUTTING_DOWN; + + if (client->scheduled_service_timepoint_ns != 0) { + aws_event_loop_cancel_task(client->loop, &client->service_task); + client->scheduled_service_timepoint_ns = 0; + } + + aws_rr_subscription_manager_clean_up(&client->subscription_manager); + + if (client->client_adapter != NULL) { + aws_mqtt_protocol_adapter_destroy(client->client_adapter); + } + + /* + * It is a client invariant that when external shutdown starts, it must be the case that there are no in-flight + * operations with un-executed submit tasks. This means it safe to assume that all tracked request operations are + * either in the process of cleaning up already (state == AWS_MRROS_PENDING_DESTROY) or can be + * completed now (state != AWS_MRROS_PENDING_DESTROY). Non-terminal streaming operations are moved into + * a terminal state and emit an appropriate failure/ended event. + * + * Actual operation destruction and client ref-count release is done by a scheduled task + * on the operation that is triggered by dec-refing it (assuming streaming operations get closed by the binding + * client). + */ + aws_hash_table_foreach(&client->operations, s_rr_client_clean_up_operation, NULL); + + aws_ref_count_release(&client->internal_ref_count); +} + +static void s_mqtt_request_response_client_wake_service(struct aws_mqtt_request_response_client *client) { + uint64_t now = 0; + aws_high_res_clock_get_ticks(&now); + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(client->loop)); + + if (client->state != AWS_RRCS_ACTIVE) { + return; + } + + if (client->scheduled_service_timepoint_ns == 0 || now < client->scheduled_service_timepoint_ns) { + if (now < client->scheduled_service_timepoint_ns) { + aws_event_loop_cancel_task(client->loop, &client->service_task); + } + + client->scheduled_service_timepoint_ns = now; + aws_event_loop_schedule_task_now(client->loop, &client->service_task); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, "id=%p: request-response client service task woke", (void *)client); + } +} + +struct aws_rrc_incomplete_publish { + struct aws_allocator *allocator; + + struct aws_mqtt_request_response_client *rr_client; + + uint64_t operation_id; +}; + +static void s_aws_rrc_incomplete_publish_destroy(struct aws_rrc_incomplete_publish *user_data) { + if (user_data == NULL) { + return; + } + + aws_mqtt_request_response_client_release_internal(user_data->rr_client); + + aws_mem_release(user_data->allocator, user_data); +} + +static void s_on_request_publish_completion(int error_code, void *userdata) { + struct aws_rrc_incomplete_publish *publish_user_data = userdata; + + if (error_code != AWS_ERROR_SUCCESS) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response operation %" PRIu64 " failed publish step due to error %d(%s)", + (void *)publish_user_data->rr_client, + publish_user_data->operation_id, + error_code, + aws_error_debug_str(error_code)); + + struct aws_hash_element *element = NULL; + if (aws_hash_table_find( + &publish_user_data->rr_client->operations, &publish_user_data->operation_id, &element) == + AWS_OP_SUCCESS && + element != NULL) { + struct aws_mqtt_rr_client_operation *operation = element->value; + s_complete_request_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_PUBLISH_FAILURE); + } + } + + s_aws_rrc_incomplete_publish_destroy(publish_user_data); +} + +static void s_make_mqtt_request( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_rr_client_operation *operation) { + (void)client; + + AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST); + + struct aws_mqtt_request_operation_options *request_options = &operation->storage.request_storage.options; + + struct aws_rrc_incomplete_publish *publish_user_data = + aws_mem_calloc(client->allocator, 1, sizeof(struct aws_rrc_incomplete_publish)); + publish_user_data->allocator = client->allocator; + publish_user_data->rr_client = aws_mqtt_request_response_client_acquire_internal(client); + publish_user_data->operation_id = operation->id; + + struct aws_protocol_adapter_publish_options publish_options = { + .topic = request_options->publish_topic, + .payload = request_options->serialized_request, + .ack_timeout_seconds = client->config.operation_timeout_seconds, + .completion_callback_fn = s_on_request_publish_completion, + .user_data = publish_user_data, + }; + + if (aws_mqtt_protocol_adapter_publish(client->client_adapter, &publish_options)) { + int error_code = aws_last_error(); + + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response operation %" PRIu64 " synchronously failed publish step due to error %d(%s)", + (void *)publish_user_data->rr_client, + publish_user_data->operation_id, + error_code, + aws_error_debug_str(error_code)); + s_complete_request_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_PUBLISH_FAILURE); + goto error; + } + + return; + +error: + + s_aws_rrc_incomplete_publish_destroy(publish_user_data); +} + +struct aws_rr_subscription_status_event_task { + struct aws_allocator *allocator; + + struct aws_task task; + + struct aws_mqtt_request_response_client *rr_client; + + enum aws_rr_subscription_event_type type; + struct aws_byte_buf topic_filter; + uint64_t operation_id; +}; + +static void s_aws_rr_subscription_status_event_task_delete(struct aws_rr_subscription_status_event_task *task) { + if (task == NULL) { + return; + } + + aws_byte_buf_clean_up(&task->topic_filter); + aws_mqtt_request_response_client_release_internal(task->rr_client); + + aws_mem_release(task->allocator, task); +} + +static void s_on_request_operation_subscription_status_event( + struct aws_mqtt_rr_client_operation *operation, + struct aws_byte_cursor topic_filter, + enum aws_rr_subscription_event_type event_type) { + (void)topic_filter; + + switch (event_type) { + case ARRSET_REQUEST_SUBSCRIBE_FAILURE: + case ARRSET_REQUEST_SUBSCRIPTION_ENDED: + s_complete_request_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE); + break; + + case ARRSET_REQUEST_SUBSCRIBE_SUCCESS: + if (operation->state == AWS_MRROS_PENDING_SUBSCRIPTION) { + --operation->pending_subscriptions; + if (operation->pending_subscriptions == 0) { + s_change_operation_state(operation, AWS_MRROS_PENDING_RESPONSE); + s_make_mqtt_request(operation->client_internal_ref, operation); + } + } + break; + + default: + AWS_FATAL_ASSERT(false); + } +} + +static void s_on_streaming_operation_subscription_status_event( + struct aws_mqtt_rr_client_operation *operation, + struct aws_byte_cursor topic_filter, + enum aws_rr_subscription_event_type event_type) { + (void)topic_filter; + + switch (event_type) { + case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED: + if (operation->state == AWS_MRROS_PENDING_SUBSCRIPTION) { + s_change_operation_state(operation, AWS_MRROS_SUBSCRIBED); + } + + s_streaming_operation_emit_streaming_subscription_event( + operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS); + break; + + case ARRSET_STREAMING_SUBSCRIPTION_LOST: + s_streaming_operation_emit_streaming_subscription_event( + operation, ARRSSET_SUBSCRIPTION_LOST, AWS_ERROR_SUCCESS); + break; + + case ARRSET_STREAMING_SUBSCRIPTION_HALTED: + s_halt_streaming_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE); + break; + + default: + AWS_FATAL_ASSERT(false); + } +} + +static void s_handle_subscription_status_event_task(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + + struct aws_rr_subscription_status_event_task *event_task = arg; + + if (status == AWS_TASK_STATUS_CANCELED) { + goto done; + } + + if (event_task->type == ARRSET_UNSUBSCRIBE_COMPLETE || event_task->type == ARRSET_SUBSCRIPTION_EMPTY) { + s_mqtt_request_response_client_wake_service(event_task->rr_client); + goto done; + } + + struct aws_hash_element *element = NULL; + if (aws_hash_table_find(&event_task->rr_client->operations, &event_task->operation_id, &element) || + element == NULL) { + goto done; + } + + struct aws_mqtt_rr_client_operation *operation = element->value; + + switch (event_task->type) { + case ARRSET_REQUEST_SUBSCRIBE_SUCCESS: + case ARRSET_REQUEST_SUBSCRIBE_FAILURE: + case ARRSET_REQUEST_SUBSCRIPTION_ENDED: + s_on_request_operation_subscription_status_event( + operation, aws_byte_cursor_from_buf(&event_task->topic_filter), event_task->type); + break; + + case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED: + case ARRSET_STREAMING_SUBSCRIPTION_LOST: + case ARRSET_STREAMING_SUBSCRIPTION_HALTED: + s_on_streaming_operation_subscription_status_event( + operation, aws_byte_cursor_from_buf(&event_task->topic_filter), event_task->type); + break; + + default: + break; + } + +done: + + s_aws_rr_subscription_status_event_task_delete(event_task); +} + +static struct aws_rr_subscription_status_event_task *s_aws_rr_subscription_status_event_task_new( + struct aws_allocator *allocator, + struct aws_mqtt_request_response_client *rr_client, + const struct aws_rr_subscription_status_event *event) { + struct aws_rr_subscription_status_event_task *task = + aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_subscription_status_event_task)); + + task->allocator = allocator; + task->type = event->type; + task->operation_id = event->operation_id; + task->rr_client = aws_mqtt_request_response_client_acquire_internal(rr_client); + + aws_byte_buf_init_copy_from_cursor(&task->topic_filter, allocator, event->topic_filter); + + aws_task_init(&task->task, s_handle_subscription_status_event_task, task, "SubscriptionStatusEventTask"); + + return task; +} + +static void s_aws_rr_client_subscription_status_event_callback( + const struct aws_rr_subscription_status_event *event, + void *userdata) { + (void)event; + (void)userdata; + + /* + * We must be on the event loop, but it's safer overall to process this event as a top-level event loop task. The + * subscription manager assumes that we won't call APIs on it while iterating subscription records and listeners. + * + * These tasks hold an internal reference while they exist. + */ + + struct aws_mqtt_request_response_client *rr_client = userdata; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); + AWS_FATAL_ASSERT(rr_client->state != AWS_RRCS_SHUTTING_DOWN); + + struct aws_rr_subscription_status_event_task *task = + s_aws_rr_subscription_status_event_task_new(rr_client->allocator, rr_client, event); + + aws_event_loop_schedule_task_now(rr_client->loop, &task->task); +} + +static void s_aws_rr_client_protocol_adapter_subscription_event_callback( + const struct aws_protocol_adapter_subscription_event *event, + void *user_data) { + struct aws_mqtt_request_response_client *rr_client = user_data; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); + + if (rr_client->state != AWS_RRCS_ACTIVE) { + return; + } + + aws_rr_subscription_manager_on_protocol_adapter_subscription_event(&rr_client->subscription_manager, event); +} + +static void s_apply_publish_to_streaming_operation_list( + const struct aws_linked_list *operations, + const struct aws_byte_cursor *topic_filter, + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + void *user_data) { + + AWS_FATAL_ASSERT(operations != NULL); + + struct aws_mqtt_request_response_client *rr_client = user_data; + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on topic '" PRInSTR + "' matches streaming subscription on topic filter '" PRInSTR "'", + (void *)rr_client, + AWS_BYTE_CURSOR_PRI(publish_event->topic), + AWS_BYTE_CURSOR_PRI(*topic_filter)); + + struct aws_linked_list_node *node = aws_linked_list_begin(operations); + while (node != aws_linked_list_end(operations)) { + struct aws_mqtt_rr_client_operation *operation = + AWS_CONTAINER_OF(node, struct aws_mqtt_rr_client_operation, node); + node = aws_linked_list_next(node); + + if (operation->type != AWS_MRROT_STREAMING) { + continue; + } + + if (operation->state == AWS_MRROS_PENDING_DESTROY || operation->state == AWS_MRROS_TERMINAL) { + continue; + } + + aws_mqtt_streaming_operation_incoming_publish_fn *incoming_publish_callback = + operation->storage.streaming_storage.options.incoming_publish_callback; + if (!incoming_publish_callback) { + continue; + } + + void *operation_user_data = operation->storage.streaming_storage.options.user_data; + (*incoming_publish_callback)(publish_event, operation_user_data); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on topic '" PRInSTR + "' routed to streaming operation %" PRIu64, + (void *)operation->client_internal_ref, + AWS_BYTE_CURSOR_PRI(publish_event->topic), + operation->id); + } +} + +static void s_complete_operation_with_correlation_token( + struct aws_mqtt_request_response_client *rr_client, + struct aws_byte_cursor correlation_token, + const struct aws_mqtt_rr_incoming_publish_event *publish_event) { + struct aws_hash_element *hash_element = NULL; + + if (aws_hash_table_find(&rr_client->operations_by_correlation_tokens, &correlation_token, &hash_element)) { + return; + } + + if (hash_element == NULL) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on response path topic '" PRInSTR + "' and correlation token '" PRInSTR "' does not have an originating request entry", + (void *)rr_client, + AWS_BYTE_CURSOR_PRI(publish_event->topic), + AWS_BYTE_CURSOR_PRI(correlation_token)); + return; + } + + struct aws_mqtt_rr_client_operation *operation = hash_element->value; + AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST); + + if (operation->state == AWS_MRROS_PENDING_DESTROY) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response operation %" PRIu64 " cannot be completed, already in pending destruction state", + (void *)operation->client_internal_ref, + operation->id); + return; + } + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response operation %" PRIu64 " completed successfully", + (void *)operation->client_internal_ref, + operation->id); + + aws_mqtt_request_operation_completion_fn *completion_callback = + operation->storage.request_storage.options.completion_callback; + void *user_data = operation->storage.request_storage.options.user_data; + + if (completion_callback != NULL) { + (*completion_callback)(publish_event, AWS_ERROR_SUCCESS, user_data); + } + + s_change_operation_state(operation, AWS_MRROS_PENDING_DESTROY); + + aws_mqtt_rr_client_operation_release(operation); +} + +static void s_apply_publish_to_response_path_entry( + struct aws_rr_response_path_entry *entry, + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + void *user_data) { + + struct aws_mqtt_request_response_client *rr_client = user_data; + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on topic '" PRInSTR "' matches response path", + (void *)rr_client, + AWS_BYTE_CURSOR_PRI(publish_event->topic)); + + struct aws_json_value *json_payload = NULL; + + struct aws_byte_cursor correlation_token; + AWS_ZERO_STRUCT(correlation_token); + struct aws_byte_cursor correlation_token_json_path = aws_byte_cursor_from_buf(&entry->correlation_token_json_path); + if (correlation_token_json_path.len > 0) { + json_payload = aws_json_value_new_from_string(rr_client->allocator, publish_event->payload); + if (json_payload == NULL) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on response path topic '" PRInSTR + "' could not be deserialized into JSON", + (void *)rr_client, + AWS_BYTE_CURSOR_PRI(publish_event->topic)); + return; + } + + struct aws_byte_cursor segment; + AWS_ZERO_STRUCT(segment); + + struct aws_json_value *correlation_token_entry = json_payload; + while (aws_byte_cursor_next_split(&correlation_token_json_path, '.', &segment)) { + if (!aws_json_value_is_object(correlation_token_entry)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on response path topic '" PRInSTR + "' unable to walk correlation token path '" PRInSTR "'", + (void *)rr_client, + AWS_BYTE_CURSOR_PRI(publish_event->topic), + AWS_BYTE_CURSOR_PRI(correlation_token_json_path)); + goto done; + } + + correlation_token_entry = aws_json_value_get_from_object(correlation_token_entry, segment); + if (correlation_token_entry == NULL) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on response path topic '" PRInSTR + "' could not find path segment '" PRInSTR "'", + (void *)rr_client, + AWS_BYTE_CURSOR_PRI(publish_event->topic), + AWS_BYTE_CURSOR_PRI(segment)); + goto done; + } + } + + if (!aws_json_value_is_string(correlation_token_entry)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on response path topic '" PRInSTR + "' token entry is not a string", + (void *)rr_client, + AWS_BYTE_CURSOR_PRI(publish_event->topic)); + goto done; + } + + if (aws_json_value_get_string(correlation_token_entry, &correlation_token)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on response path topic '" PRInSTR + "' failed to extract string from token entry", + (void *)rr_client, + AWS_BYTE_CURSOR_PRI(publish_event->topic)); + goto done; + } + } + + s_complete_operation_with_correlation_token(rr_client, correlation_token, publish_event); + +done: + + if (json_payload != NULL) { + aws_json_value_destroy(json_payload); + } +} + +static void s_aws_rr_client_protocol_adapter_incoming_publish_callback( + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + void *user_data) { + + struct aws_mqtt_request_response_client *rr_client = user_data; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); + + if (rr_client->state != AWS_RRCS_ACTIVE) { + return; + } + + aws_mqtt_request_response_client_subscriptions_handle_incoming_publish( + &rr_client->subscriptions, + publish_event, + s_apply_publish_to_streaming_operation_list, + s_apply_publish_to_response_path_entry, + rr_client); +} + +static void s_aws_rr_client_protocol_adapter_terminate_callback(void *user_data) { + struct aws_mqtt_request_response_client *rr_client = user_data; + + /* release the internal ref count "held" by the protocol adapter's existence */ + aws_ref_count_release(&rr_client->internal_ref_count); +} + +static void s_aws_rr_client_protocol_adapter_connection_event_callback( + const struct aws_protocol_adapter_connection_event *event, + void *user_data) { + struct aws_mqtt_request_response_client *rr_client = user_data; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); + + if (rr_client->state != AWS_RRCS_ACTIVE) { + return; + } + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client applying connection event to subscription manager", + (void *)rr_client); + + aws_rr_subscription_manager_on_protocol_adapter_connection_event(&rr_client->subscription_manager, event); +} + +static int s_compare_rr_operation_timeouts(const void *a, const void *b) { + const struct aws_mqtt_rr_client_operation **operation_a_ptr = (void *)a; + const struct aws_mqtt_rr_client_operation *operation_a = *operation_a_ptr; + + const struct aws_mqtt_rr_client_operation **operation_b_ptr = (void *)b; + const struct aws_mqtt_rr_client_operation *operation_b = *operation_b_ptr; + + if (operation_a->timeout_timepoint_ns < operation_b->timeout_timepoint_ns) { + return -1; + } else if (operation_a->timeout_timepoint_ns > operation_b->timeout_timepoint_ns) { + return 1; + } else { + return 0; + } +} + +static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new( + struct aws_allocator *allocator, + const struct aws_mqtt_request_response_client_options *options, + struct aws_event_loop *loop) { + struct aws_rr_subscription_manager_options sm_options = { + .max_request_response_subscriptions = options->max_request_response_subscriptions, + .max_streaming_subscriptions = options->max_streaming_subscriptions, + .operation_timeout_seconds = options->operation_timeout_seconds, + }; + + /* + * We can't initialize the subscription manager until we're running on the event loop, so make sure that + * initialize can't fail by checking its options for validity now. + */ + if (!aws_rr_subscription_manager_are_options_valid(&sm_options)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, "(static) request response client creation failed - invalid client options"); + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + struct aws_mqtt_request_response_client *rr_client = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_request_response_client)); + + rr_client->allocator = allocator; + rr_client->config = *options; + rr_client->loop = loop; + rr_client->state = AWS_RRCS_UNINITIALIZED; + + aws_hash_table_init( + &rr_client->operations, + allocator, + MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE, + aws_hash_uint64_t_by_identity, + aws_hash_compare_uint64_t_eq, + NULL, + NULL); + + aws_priority_queue_init_dynamic( + &rr_client->operations_by_timeout, + allocator, + 100, + sizeof(struct aws_mqtt_rr_client_operation *), + s_compare_rr_operation_timeouts); + + aws_mqtt_request_response_client_subscriptions_init(&rr_client->subscriptions, allocator); + + aws_hash_table_init( + &rr_client->operations_by_correlation_tokens, + allocator, + MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + NULL); + + aws_linked_list_init(&rr_client->operation_queue); + + aws_task_init( + &rr_client->external_shutdown_task, + s_mqtt_request_response_client_external_shutdown_task_fn, + rr_client, + "mqtt_rr_client_external_shutdown"); + + aws_task_init( + &rr_client->internal_shutdown_task, + s_mqtt_request_response_client_internal_shutdown_task_fn, + rr_client, + "mqtt_rr_client_internal_shutdown"); + + /* The initial external ref belongs to the caller */ + aws_ref_count_init(&rr_client->external_ref_count, rr_client, s_aws_rr_client_on_zero_external_ref_count); + + /* The initial internal ref belongs to ourselves (the external ref count shutdown task) */ + aws_ref_count_init(&rr_client->internal_ref_count, rr_client, s_aws_rr_client_on_zero_internal_ref_count); + + aws_atomic_store_int(&rr_client->next_id, 1); + + return rr_client; +} + +static void s_aws_rr_client_init_subscription_manager( + struct aws_mqtt_request_response_client *rr_client, + struct aws_allocator *allocator) { + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); + + struct aws_rr_subscription_manager_options subscription_manager_options = { + .operation_timeout_seconds = rr_client->config.operation_timeout_seconds, + .max_request_response_subscriptions = rr_client->config.max_request_response_subscriptions, + .max_streaming_subscriptions = rr_client->config.max_streaming_subscriptions, + .subscription_status_callback = s_aws_rr_client_subscription_status_event_callback, + .userdata = rr_client, + }; + + aws_rr_subscription_manager_init( + &rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options); +} + +static void s_check_for_operation_timeouts(struct aws_mqtt_request_response_client *client) { + uint64_t now = 0; + aws_high_res_clock_get_ticks(&now); + + struct aws_priority_queue *timeout_queue = &client->operations_by_timeout; + + bool done = aws_priority_queue_size(timeout_queue) == 0; + while (!done) { + struct aws_mqtt_rr_client_operation **next_operation_by_timeout_ptr = NULL; + aws_priority_queue_top(timeout_queue, (void **)&next_operation_by_timeout_ptr); + AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL); + struct aws_mqtt_rr_client_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr; + AWS_FATAL_ASSERT(next_operation_by_timeout != NULL); + + // If the current top of the heap hasn't timed out than nothing has + if (next_operation_by_timeout->timeout_timepoint_ns > now) { + break; + } + + /* Ack timeout for this operation has been reached */ + aws_priority_queue_pop(timeout_queue, &next_operation_by_timeout); + + s_request_response_fail_operation(next_operation_by_timeout, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT); + + done = aws_priority_queue_size(timeout_queue) == 0; + } +} + +static uint64_t s_mqtt_request_response_client_get_next_service_time(struct aws_mqtt_request_response_client *client) { + if (aws_priority_queue_size(&client->operations_by_timeout) > 0) { + struct aws_mqtt_rr_client_operation **next_operation_by_timeout_ptr = NULL; + aws_priority_queue_top(&client->operations_by_timeout, (void **)&next_operation_by_timeout_ptr); + AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL); + struct aws_mqtt_rr_client_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr; + AWS_FATAL_ASSERT(next_operation_by_timeout != NULL); + + return next_operation_by_timeout->timeout_timepoint_ns; + } + + return UINT64_MAX; +} + +static int s_add_streaming_operation_to_subscription_topic_filter_table( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_rr_client_operation *operation) { + + struct aws_byte_cursor topic_filter_cursor = operation->storage.streaming_storage.options.topic_filter; + + struct aws_rr_operation_list_topic_filter_entry *entry = + aws_mqtt_request_response_client_subscriptions_add_stream_subscription( + &client->subscriptions, &topic_filter_cursor); + if (entry == NULL) { + return AWS_OP_ERR; + } + + if (aws_linked_list_node_is_in_list(&operation->node)) { + aws_linked_list_remove(&operation->node); + } + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client adding streaming operation %" PRIu64 + " to streaming subscription table with topic_filter '" PRInSTR "'", + (void *)client, + operation->id, + AWS_BYTE_CURSOR_PRI(topic_filter_cursor)); + + aws_linked_list_push_back(&entry->operations, &operation->node); + + return AWS_OP_SUCCESS; +} + +static int s_add_request_operation_to_response_path_table( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_rr_client_operation *operation) { + + struct aws_array_list *paths = &operation->storage.request_storage.operation_response_paths; + size_t path_count = aws_array_list_length(paths); + for (size_t i = 0; i < path_count; ++i) { + struct aws_mqtt_request_operation_response_path path; + aws_array_list_get_at(paths, &path, i); + if (aws_mqtt_request_response_client_subscriptions_add_request_subscription( + &client->subscriptions, &path.topic, &path.correlation_token_json_path)) { + return AWS_OP_ERR; + } + } + return AWS_OP_SUCCESS; +} + +static int s_add_request_operation_to_correlation_token_table( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_rr_client_operation *operation) { + + return aws_hash_table_put( + &client->operations_by_correlation_tokens, + &operation->storage.request_storage.options.correlation_token, + operation, + NULL); +} + +static int s_add_in_progress_operation_to_tracking_tables( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_rr_client_operation *operation) { + if (operation->type == AWS_MRROT_STREAMING) { + if (s_add_streaming_operation_to_subscription_topic_filter_table(client, operation)) { + return AWS_OP_ERR; + } + } else { + if (s_add_request_operation_to_response_path_table(client, operation)) { + return AWS_OP_ERR; + } + + if (s_add_request_operation_to_correlation_token_table(client, operation)) { + return AWS_OP_ERR; + } + } + + operation->in_client_tables = true; + + return AWS_OP_SUCCESS; +} + +static void s_handle_operation_subscribe_result( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_rr_client_operation *operation, + enum aws_acquire_subscription_result_type subscribe_result) { + if (subscribe_result == AASRT_FAILURE || subscribe_result == AASRT_NO_CAPACITY) { + int error_code = (subscribe_result == AASRT_NO_CAPACITY) + ? AWS_ERROR_MQTT_REQUEST_RESPONSE_NO_SUBSCRIPTION_CAPACITY + : AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE; + s_request_response_fail_operation(operation, error_code); + return; + } + + if (s_add_in_progress_operation_to_tracking_tables(client, operation)) { + s_request_response_fail_operation(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); + return; + } + + if (subscribe_result == AASRT_SUBSCRIBING) { + s_change_operation_state(operation, AWS_MRROS_PENDING_SUBSCRIPTION); + return; + } + + if (operation->type == AWS_MRROT_STREAMING) { + s_change_operation_state(operation, AWS_MRROS_SUBSCRIBED); + s_streaming_operation_emit_streaming_subscription_event( + operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS); + } else { + s_make_mqtt_request(client, operation); + } +} + +static enum aws_rr_subscription_type s_rr_operation_type_to_subscription_type( + enum aws_mqtt_request_response_operation_type type) { + if (type == AWS_MRROT_REQUEST) { + return ARRST_REQUEST_RESPONSE; + } + + return ARRST_EVENT_STREAM; +} + +static bool s_can_operation_dequeue( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_rr_client_operation *operation) { + if (operation->type != AWS_MRROT_REQUEST) { + return true; + } + + struct aws_hash_element *token_element = NULL; + if (aws_hash_table_find( + &client->operations_by_correlation_tokens, + &operation->storage.request_storage.options.correlation_token, + &token_element)) { + return false; + } + + return token_element == NULL; +} + +static struct aws_byte_cursor *s_aws_mqtt_rr_operation_get_subscription_topic_filters( + struct aws_mqtt_rr_client_operation *operation) { + if (operation->type == AWS_MRROT_STREAMING) { + return &operation->storage.streaming_storage.options.topic_filter; + } else { + return operation->storage.request_storage.options.subscription_topic_filters; + } +} + +static size_t s_aws_mqtt_rr_operation_get_subscription_topic_filter_count( + struct aws_mqtt_rr_client_operation *operation) { + if (operation->type == AWS_MRROT_STREAMING) { + return 1; + } else { + return operation->storage.request_storage.options.subscription_topic_filter_count; + } +} + +static void s_process_queued_operations(struct aws_mqtt_request_response_client *client) { + aws_rr_subscription_manager_purge_unused(&client->subscription_manager); + + while (!aws_linked_list_empty(&client->operation_queue)) { + struct aws_linked_list_node *head = aws_linked_list_front(&client->operation_queue); + struct aws_mqtt_rr_client_operation *head_operation = + AWS_CONTAINER_OF(head, struct aws_mqtt_rr_client_operation, node); + + if (!s_can_operation_dequeue(client, head_operation)) { + break; + } + + struct aws_rr_acquire_subscription_options subscribe_options = { + .topic_filters = s_aws_mqtt_rr_operation_get_subscription_topic_filters(head_operation), + .topic_filter_count = s_aws_mqtt_rr_operation_get_subscription_topic_filter_count(head_operation), + .operation_id = head_operation->id, + .type = s_rr_operation_type_to_subscription_type(head_operation->type), + }; + + enum aws_acquire_subscription_result_type subscribe_result = + aws_rr_subscription_manager_acquire_subscription(&client->subscription_manager, &subscribe_options); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client intake, queued operation %" PRIu64 + " yielded acquire subscription result: %s", + (void *)client, + head_operation->id, + s_aws_acquire_subscription_result_type(subscribe_result)); + + if (subscribe_result == AASRT_BLOCKED) { + break; + } + + aws_linked_list_pop_front(&client->operation_queue); + s_handle_operation_subscribe_result(client, head_operation, subscribe_result); + } +} + +static void s_mqtt_request_response_service_task_fn( + struct aws_task *task, + void *arg, + enum aws_task_status task_status) { + (void)task; + + if (task_status == AWS_TASK_STATUS_CANCELED) { + return; + } + + struct aws_mqtt_request_response_client *client = arg; + client->scheduled_service_timepoint_ns = 0; + + if (client->state == AWS_RRCS_ACTIVE) { + + // timeouts + s_check_for_operation_timeouts(client); + + // operation queue + s_process_queued_operations(client); + + // schedule next service + client->scheduled_service_timepoint_ns = s_mqtt_request_response_client_get_next_service_time(client); + aws_event_loop_schedule_task_future( + client->loop, &client->service_task, client->scheduled_service_timepoint_ns); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client service, next timepoint: %" PRIu64, + (void *)client, + client->scheduled_service_timepoint_ns); + } +} + +static void s_mqtt_request_response_client_initialize_task_fn( + struct aws_task *task, + void *arg, + enum aws_task_status task_status) { + (void)task; + + AWS_FATAL_ASSERT(task_status != AWS_TASK_STATUS_CANCELED); + + struct aws_mqtt_request_response_client *client = arg; + + if (client->state == AWS_RRCS_UNINITIALIZED) { + s_aws_rr_client_init_subscription_manager(client, client->allocator); + + client->state = AWS_RRCS_ACTIVE; + + aws_task_init(&client->service_task, s_mqtt_request_response_service_task_fn, client, "mqtt_rr_client_service"); + + aws_event_loop_schedule_task_future(client->loop, &client->service_task, UINT64_MAX); + client->scheduled_service_timepoint_ns = UINT64_MAX; + } + + if (client->config.initialized_callback != NULL) { + (*client->config.initialized_callback)(client->config.user_data); + } + + /* give up the internal ref we held while the task was pending */ + aws_ref_count_release(&client->internal_ref_count); +} + +static void s_setup_cross_thread_initialization(struct aws_mqtt_request_response_client *rr_client) { + /* now that it exists, 1 internal ref belongs to protocol adapter termination */ + aws_ref_count_acquire(&rr_client->internal_ref_count); + + /* 1 internal ref belongs to the initialize task until it runs */ + aws_ref_count_acquire(&rr_client->internal_ref_count); + + aws_task_init( + &rr_client->initialize_task, + s_mqtt_request_response_client_initialize_task_fn, + rr_client, + "mqtt_rr_client_initialize"); + aws_event_loop_schedule_task_now(rr_client->loop, &rr_client->initialize_task); +} + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client( + struct aws_allocator *allocator, + struct aws_mqtt_client_connection *client, + const struct aws_mqtt_request_response_client_options *options) { + + struct aws_mqtt_request_response_client *rr_client = + s_aws_mqtt_request_response_client_new(allocator, options, aws_mqtt_client_connection_get_event_loop(client)); + + if (rr_client == NULL) { + return NULL; + } + + struct aws_mqtt_protocol_adapter_options adapter_options = { + .subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback, + .incoming_publish_callback = s_aws_rr_client_protocol_adapter_incoming_publish_callback, + .terminate_callback = s_aws_rr_client_protocol_adapter_terminate_callback, + .connection_event_callback = s_aws_rr_client_protocol_adapter_connection_event_callback, + .user_data = rr_client, + }; + + rr_client->client_adapter = aws_mqtt_protocol_adapter_new_from_311(rr_client->allocator, &adapter_options, client); + if (rr_client->client_adapter == NULL) { + goto error; + } + + s_setup_cross_thread_initialization(rr_client); + + return rr_client; + +error: + + /* even on construction failures we still need to walk through the async shutdown process */ + aws_mqtt_request_response_client_release(rr_client); + + return NULL; +} + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client( + struct aws_allocator *allocator, + struct aws_mqtt5_client *client, + const struct aws_mqtt_request_response_client_options *options) { + + struct aws_mqtt_request_response_client *rr_client = + s_aws_mqtt_request_response_client_new(allocator, options, client->loop); + + if (rr_client == NULL) { + return NULL; + } + + struct aws_mqtt_protocol_adapter_options adapter_options = { + .subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback, + .incoming_publish_callback = s_aws_rr_client_protocol_adapter_incoming_publish_callback, + .terminate_callback = s_aws_rr_client_protocol_adapter_terminate_callback, + .connection_event_callback = s_aws_rr_client_protocol_adapter_connection_event_callback, + .user_data = rr_client, + }; + + rr_client->client_adapter = aws_mqtt_protocol_adapter_new_from_5(rr_client->allocator, &adapter_options, client); + if (rr_client->client_adapter == NULL) { + goto error; + } + + s_setup_cross_thread_initialization(rr_client); + + return rr_client; + +error: + + /* even on construction failures we still need to walk through the async shutdown process */ + aws_mqtt_request_response_client_release(rr_client); + + return NULL; +} + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire( + struct aws_mqtt_request_response_client *client) { + if (client != NULL) { + aws_ref_count_acquire(&client->external_ref_count); + } + + return client; +} + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release( + struct aws_mqtt_request_response_client *client) { + if (client != NULL) { + aws_ref_count_release(&client->external_ref_count); + } + + return NULL; +} + +///////////////////////////////////////////////// + +static bool s_are_request_operation_options_valid( + const struct aws_mqtt_request_response_client *client, + const struct aws_mqtt_request_operation_options *request_options) { + if (request_options == NULL) { + AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client - NULL request options", (void *)client); + return false; + } + + if (request_options->response_path_count == 0) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client request options - no response paths supplied", + (void *)client); + return false; + } + + for (size_t i = 0; i < request_options->response_path_count; ++i) { + const struct aws_mqtt_request_operation_response_path *path = &request_options->response_paths[i]; + if (!aws_mqtt_is_valid_topic(&path->topic)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client request options - " PRInSTR " is not a valid topic", + (void *)client, + AWS_BYTE_CURSOR_PRI(path->topic)); + return false; + } + } + + if (!aws_mqtt_is_valid_topic(&request_options->publish_topic)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client request options - " PRInSTR " is not a valid topic", + (void *)client, + AWS_BYTE_CURSOR_PRI(request_options->publish_topic)); + return false; + } + + if (request_options->subscription_topic_filter_count == 0) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client request options - no subscription topic filters supplied", + (void *)client); + return false; + } + + for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) { + const struct aws_byte_cursor subscription_topic_filter = request_options->subscription_topic_filters[i]; + if (!aws_mqtt_is_valid_topic_filter(&subscription_topic_filter)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client request options - " PRInSTR " is not a valid subscription topic filter", + (void *)client, + AWS_BYTE_CURSOR_PRI(subscription_topic_filter)); + return false; + } + } + + if (request_options->serialized_request.len == 0) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - empty request payload", (void *)client); + return false; + } + + return true; +} + +static bool s_are_streaming_operation_options_valid( + struct aws_mqtt_request_response_client *client, + const struct aws_mqtt_streaming_operation_options *streaming_options) { + if (streaming_options == NULL) { + AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client - NULL streaming options", (void *)client); + return false; + } + + if (!aws_mqtt_is_valid_topic_filter(&streaming_options->topic_filter)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client streaming options - " PRInSTR " is not a valid topic filter", + (void *)client, + AWS_BYTE_CURSOR_PRI(streaming_options->topic_filter)); + return false; + } + + return true; +} + +static uint64_t s_aws_mqtt_request_response_client_allocate_operation_id( + struct aws_mqtt_request_response_client *client) { + return aws_atomic_fetch_add(&client->next_id, 1); +} + +static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + + struct aws_mqtt_rr_client_operation *operation = arg; + struct aws_mqtt_request_response_client *client = operation->client_internal_ref; + + if (status == AWS_TASK_STATUS_CANCELED) { + goto done; + } + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client, queuing operation %" PRIu64, + (void *)client, + operation->id); + + // add appropriate client table entries + aws_hash_table_put(&client->operations, &operation->id, operation, NULL); + + // add to timeout priority queue + if (operation->type == AWS_MRROT_REQUEST) { + aws_priority_queue_push_ref( + &client->operations_by_timeout, (void *)&operation, &operation->priority_queue_node); + } + + // enqueue + aws_linked_list_push_back(&operation->client_internal_ref->operation_queue, &operation->node); + + s_change_operation_state(operation, AWS_MRROS_QUEUED); + + s_mqtt_request_response_client_wake_service(operation->client_internal_ref); + +done: + + /* + * We hold a second reference to the operation during submission. This ensures that even if a streaming operation + * is immediately dec-refed by the creator (before submission completes), the operation will not get destroyed. + * + * It is now safe and correct to release that reference. + * + * After this, streaming operation lifetime is completely user-driven, while request operation lifetime is + * completely client-internal. + */ + aws_mqtt_rr_client_operation_release(operation); +} + +static void s_aws_mqtt_streaming_operation_storage_clean_up(struct aws_mqtt_streaming_operation_storage *storage) { + aws_byte_buf_clean_up(&storage->operation_data); +} + +static void s_aws_mqtt_request_operation_storage_clean_up(struct aws_mqtt_request_operation_storage *storage) { + aws_array_list_clean_up(&storage->operation_response_paths); + aws_array_list_clean_up(&storage->subscription_topic_filters); + aws_byte_buf_clean_up(&storage->operation_data); +} + +static void s_remove_operation_from_client_tables(struct aws_mqtt_rr_client_operation *operation) { + if (operation->type != AWS_MRROT_REQUEST) { + return; + } + + if (!operation->in_client_tables) { + return; + } + + struct aws_mqtt_request_response_client *client = operation->client_internal_ref; + + aws_hash_table_remove( + &client->operations_by_correlation_tokens, + &operation->storage.request_storage.options.correlation_token, + NULL, + NULL); + + struct aws_array_list *paths = &operation->storage.request_storage.operation_response_paths; + + size_t path_count = aws_array_list_length(paths); + for (size_t i = 0; i < path_count; ++i) { + struct aws_mqtt_request_operation_response_path path; + aws_array_list_get_at(paths, &path, i); + if (aws_mqtt_request_response_client_subscriptions_remove_request_subscription( + &client->subscriptions, &path.topic) == AWS_OP_ERR) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: internal state error removing reference to response path for topic " PRInSTR, + (void *)client, + AWS_BYTE_CURSOR_PRI(path.topic)); + } + } +} + +static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + (void)status; + + struct aws_mqtt_rr_client_operation *operation = arg; + struct aws_mqtt_request_response_client *client = operation->client_internal_ref; + + aws_hash_table_remove(&client->operations, &operation->id, NULL, NULL); + s_remove_operation_from_timeout_queue(operation); + + if (aws_linked_list_node_is_in_list(&operation->node)) { + aws_linked_list_remove(&operation->node); + } + + if (client->state != AWS_RRCS_SHUTTING_DOWN) { + struct aws_rr_release_subscription_options release_options = { + .topic_filters = s_aws_mqtt_rr_operation_get_subscription_topic_filters(operation), + .topic_filter_count = s_aws_mqtt_rr_operation_get_subscription_topic_filter_count(operation), + .operation_id = operation->id, + }; + aws_rr_subscription_manager_release_subscription(&client->subscription_manager, &release_options); + } + + s_remove_operation_from_client_tables(operation); + + aws_mqtt_request_response_client_release_internal(operation->client_internal_ref); + + if (operation->type == AWS_MRROT_STREAMING) { + s_aws_mqtt_streaming_operation_storage_clean_up(&operation->storage.streaming_storage); + } else { + s_aws_mqtt_request_operation_storage_clean_up(&operation->storage.request_storage); + } + + aws_mqtt_streaming_operation_terminated_fn *terminated_callback = NULL; + void *terminated_user_data = NULL; + if (operation->type == AWS_MRROT_STREAMING) { + terminated_callback = operation->storage.streaming_storage.options.terminated_callback; + terminated_user_data = operation->storage.streaming_storage.options.user_data; + } + + aws_mem_release(operation->allocator, operation); + + if (terminated_callback != NULL) { + (*terminated_callback)(terminated_user_data); + } +} + +static void s_on_mqtt_rr_client_operation_zero_ref_count(void *context) { + struct aws_mqtt_rr_client_operation *operation = context; + + aws_event_loop_schedule_task_now(operation->client_internal_ref->loop, &operation->destroy_task); +} + +static void s_aws_mqtt_rr_client_operation_init_shared( + struct aws_mqtt_rr_client_operation *operation, + struct aws_mqtt_request_response_client *client) { + operation->allocator = client->allocator; + aws_ref_count_init(&operation->ref_count, operation, s_on_mqtt_rr_client_operation_zero_ref_count); + + operation->client_internal_ref = aws_mqtt_request_response_client_acquire_internal(client); + operation->id = s_aws_mqtt_request_response_client_allocate_operation_id(client); + s_change_operation_state(operation, AWS_MRROS_NONE); + + aws_task_init( + &operation->submit_task, + s_mqtt_rr_client_submit_operation, + operation, + "MQTTRequestResponseClientOperationSubmit"); + aws_task_init( + &operation->destroy_task, + s_mqtt_rr_client_destroy_operation, + operation, + "MQTTRequestResponseClientOperationDestroy"); +} + +void s_aws_mqtt_request_operation_storage_init_from_options( + struct aws_mqtt_request_operation_storage *storage, + struct aws_allocator *allocator, + const struct aws_mqtt_request_operation_options *request_options) { + + size_t bytes_needed = 0; + bytes_needed += request_options->publish_topic.len; + bytes_needed += request_options->serialized_request.len; + bytes_needed += request_options->correlation_token.len; + + for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) { + const struct aws_byte_cursor *subscription_topic_filter = &request_options->subscription_topic_filters[i]; + + bytes_needed += subscription_topic_filter->len; + } + + for (size_t i = 0; i < request_options->response_path_count; ++i) { + const struct aws_mqtt_request_operation_response_path *response_path = &request_options->response_paths[i]; + + bytes_needed += response_path->topic.len; + bytes_needed += response_path->correlation_token_json_path.len; + } + + storage->options = *request_options; + + aws_byte_buf_init(&storage->operation_data, allocator, bytes_needed); + aws_array_list_init_dynamic( + &storage->operation_response_paths, + allocator, + request_options->response_path_count, + sizeof(struct aws_mqtt_request_operation_response_path)); + aws_array_list_init_dynamic( + &storage->subscription_topic_filters, + allocator, + request_options->subscription_topic_filter_count, + sizeof(struct aws_byte_cursor)); + + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.publish_topic) == AWS_OP_SUCCESS); + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.serialized_request) == + AWS_OP_SUCCESS); + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.correlation_token) == + AWS_OP_SUCCESS); + + for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) { + struct aws_byte_cursor subscription_topic_filter = request_options->subscription_topic_filters[i]; + + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &subscription_topic_filter) == AWS_OP_SUCCESS); + + aws_array_list_push_back(&storage->subscription_topic_filters, &subscription_topic_filter); + } + + storage->options.subscription_topic_filters = storage->subscription_topic_filters.data; + + for (size_t i = 0; i < request_options->response_path_count; ++i) { + struct aws_mqtt_request_operation_response_path response_path = request_options->response_paths[i]; + + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &response_path.topic) == AWS_OP_SUCCESS); + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &response_path.correlation_token_json_path) == + AWS_OP_SUCCESS); + + aws_array_list_push_back(&storage->operation_response_paths, &response_path); + } + + storage->options.response_paths = storage->operation_response_paths.data; +} + +static void s_log_request_response_operation( + struct aws_mqtt_rr_client_operation *operation, + struct aws_mqtt_request_response_client *client) { + struct aws_logger *log_handle = aws_logger_get_conditional(AWS_LS_MQTT_REQUEST_RESPONSE, AWS_LL_DEBUG); + if (log_handle == NULL) { + return; + } + + struct aws_mqtt_request_operation_options *options = &operation->storage.request_storage.options; + + for (size_t i = 0; i < options->subscription_topic_filter_count; ++i) { + struct aws_byte_cursor subscription_topic_filter = options->subscription_topic_filters[i]; + + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64 " - subscription topic filter %zu topic '" PRInSTR "'", + (void *)client, + operation->id, + i, + AWS_BYTE_CURSOR_PRI(subscription_topic_filter)); + } + + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64 " - correlation token: '" PRInSTR "'", + (void *)client, + operation->id, + AWS_BYTE_CURSOR_PRI(options->correlation_token)); + + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64 " - publish topic: '" PRInSTR "'", + (void *)client, + operation->id, + AWS_BYTE_CURSOR_PRI(options->publish_topic)); + + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64 " - %zu response paths:", + (void *)client, + operation->id, + options->response_path_count); + + for (size_t i = 0; i < options->response_path_count; ++i) { + struct aws_mqtt_request_operation_response_path *response_path = &options->response_paths[i]; + + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64 " - response path %zu topic '" PRInSTR "'", + (void *)client, + operation->id, + i, + AWS_BYTE_CURSOR_PRI(response_path->topic)); + + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64 " - response path %zu correlation token path '" PRInSTR + "'", + (void *)client, + operation->id, + i, + AWS_BYTE_CURSOR_PRI(response_path->correlation_token_json_path)); + } +} + +int aws_mqtt_request_response_client_submit_request( + struct aws_mqtt_request_response_client *client, + const struct aws_mqtt_request_operation_options *request_options) { + + if (client == NULL) { + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + if (!s_are_request_operation_options_valid(client, request_options)) { + /* all failure cases have logged the problem already */ + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + uint64_t now = 0; + if (aws_high_res_clock_get_ticks(&now)) { + return aws_raise_error(AWS_ERROR_CLOCK_FAILURE); + } + + struct aws_allocator *allocator = client->allocator; + struct aws_mqtt_rr_client_operation *operation = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation)); + operation->allocator = allocator; + operation->type = AWS_MRROT_REQUEST; + operation->timeout_timepoint_ns = + now + + aws_timestamp_convert(client->config.operation_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + operation->pending_subscriptions = request_options->subscription_topic_filter_count; + + s_aws_mqtt_request_operation_storage_init_from_options( + &operation->storage.request_storage, allocator, request_options); + s_aws_mqtt_rr_client_operation_init_shared(operation, client); + + AWS_LOGF_INFO( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client - submitting request-response operation with id %" PRIu64, + (void *)client, + operation->id); + + s_log_request_response_operation(operation, client); + + /* + * We hold a second reference to the operation during submission. This ensures that even if a streaming operation + * is immediately dec-refed by the creator (before submission runs), the operation will not get destroyed. + */ + aws_mqtt_rr_client_operation_acquire(operation); + + aws_event_loop_schedule_task_now(client->loop, &operation->submit_task); + + return AWS_OP_SUCCESS; +} + +void s_aws_mqtt_streaming_operation_storage_init_from_options( + struct aws_mqtt_streaming_operation_storage *storage, + struct aws_allocator *allocator, + const struct aws_mqtt_streaming_operation_options *streaming_options) { + size_t bytes_needed = streaming_options->topic_filter.len; + + storage->options = *streaming_options; + aws_byte_buf_init(&storage->operation_data, allocator, bytes_needed); + + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.topic_filter) == AWS_OP_SUCCESS); + + aws_atomic_init_int(&storage->activated, 0); +} + +static void s_log_streaming_operation( + struct aws_mqtt_rr_client_operation *operation, + struct aws_mqtt_request_response_client *client) { + struct aws_logger *log_handle = aws_logger_get_conditional(AWS_LS_MQTT_REQUEST_RESPONSE, AWS_LL_DEBUG); + if (log_handle == NULL) { + return; + } + + struct aws_mqtt_streaming_operation_options *options = &operation->storage.streaming_storage.options; + + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client streaming operation %" PRIu64 ": topic filter: '" PRInSTR "'", + (void *)client, + operation->id, + AWS_BYTE_CURSOR_PRI(options->topic_filter)); +} + +struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_streaming_operation( + struct aws_mqtt_request_response_client *client, + const struct aws_mqtt_streaming_operation_options *streaming_options) { + + if (client == NULL) { + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + if (!s_are_streaming_operation_options_valid(client, streaming_options)) { + /* all failure cases have logged the problem already */ + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + struct aws_allocator *allocator = client->allocator; + struct aws_mqtt_rr_client_operation *operation = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation)); + operation->allocator = allocator; + operation->type = AWS_MRROT_STREAMING; + operation->timeout_timepoint_ns = UINT64_MAX; + operation->pending_subscriptions = 1; + + s_aws_mqtt_streaming_operation_storage_init_from_options( + &operation->storage.streaming_storage, allocator, streaming_options); + s_aws_mqtt_rr_client_operation_init_shared(operation, client); + + AWS_LOGF_INFO( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client - submitting streaming operation with id %" PRIu64, + (void *)client, + operation->id); + + s_log_streaming_operation(operation, client); + + return operation; +} + +int aws_mqtt_rr_client_operation_activate(struct aws_mqtt_rr_client_operation *operation) { + struct aws_atomic_var *activated = &operation->storage.streaming_storage.activated; + size_t unactivated = 0; + if (!aws_atomic_compare_exchange_int(activated, &unactivated, 1)) { + return aws_raise_error(AWS_ERROR_MQTT_REUQEST_RESPONSE_STREAM_ALREADY_ACTIVATED); + } + + struct aws_mqtt_request_response_client *rr_client = operation->client_internal_ref; + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client - activating streaming operation with id %" PRIu64, + (void *)rr_client, + operation->id); + + /* + * We hold a second reference to the operation during submission. This ensures that even if a streaming operation + * is immediately dec-refed by the creator (before submission runs), the operation will not get destroyed. + */ + aws_mqtt_rr_client_operation_acquire(operation); + + aws_event_loop_schedule_task_now(rr_client->loop, &operation->submit_task); + + return AWS_OP_SUCCESS; +} + +struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_acquire( + struct aws_mqtt_rr_client_operation *operation) { + if (operation != NULL) { + aws_ref_count_acquire(&operation->ref_count); + } + + return operation; +} + +struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_release( + struct aws_mqtt_rr_client_operation *operation) { + if (operation != NULL) { + aws_ref_count_release(&operation->ref_count); + } + + return NULL; +} + +struct aws_event_loop *aws_mqtt_request_response_client_get_event_loop( + struct aws_mqtt_request_response_client *client) { + + return aws_mqtt_protocol_adapter_get_event_loop(client->client_adapter); +} diff --git a/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_subscription_set.c b/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_subscription_set.c new file mode 100644 index 00000000000..319410c3d4b --- /dev/null +++ b/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_subscription_set.c @@ -0,0 +1,336 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/mqtt/private/request-response/request_response_subscription_set.h> + +#include <aws/mqtt/mqtt.h> +#include <aws/mqtt/private/client_impl_shared.h> +#include <aws/mqtt/request-response/request_response_client.h> + +#define MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE 50 +#define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50 + +static void s_aws_rr_operation_list_topic_filter_entry_destroy(struct aws_rr_operation_list_topic_filter_entry *entry) { + if (entry == NULL) { + return; + } + + aws_byte_buf_clean_up(&entry->topic_filter); + + aws_mem_release(entry->allocator, entry); +} + +static void s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy(void *value) { + s_aws_rr_operation_list_topic_filter_entry_destroy(value); +} + +static struct aws_rr_response_path_entry *s_aws_rr_response_path_entry_new( + struct aws_allocator *allocator, + struct aws_byte_cursor topic, + struct aws_byte_cursor correlation_token_json_path) { + struct aws_rr_response_path_entry *entry = aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_response_path_entry)); + + entry->allocator = allocator; + entry->ref_count = 1; + aws_byte_buf_init_copy_from_cursor(&entry->topic, allocator, topic); + entry->topic_cursor = aws_byte_cursor_from_buf(&entry->topic); + + aws_byte_buf_init_copy_from_cursor(&entry->correlation_token_json_path, allocator, correlation_token_json_path); + + return entry; +} + +static void s_aws_rr_response_path_entry_destroy(struct aws_rr_response_path_entry *entry) { + if (entry == NULL) { + return; + } + + aws_byte_buf_clean_up(&entry->topic); + aws_byte_buf_clean_up(&entry->correlation_token_json_path); + + aws_mem_release(entry->allocator, entry); +} + +static void s_aws_rr_response_path_table_hash_element_destroy(void *value) { + s_aws_rr_response_path_entry_destroy(value); +} + +int aws_mqtt_request_response_client_subscriptions_init( + struct aws_request_response_subscriptions *subscriptions, + struct aws_allocator *allocator) { + AWS_FATAL_ASSERT(subscriptions); + + subscriptions->allocator = allocator; + + if (aws_hash_table_init( + &subscriptions->streaming_operation_subscription_lists, + allocator, + MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy)) { + goto clean_up; + } + + if (aws_hash_table_init( + &subscriptions->streaming_operation_wildcards_subscription_lists, + allocator, + MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy)) { + goto clean_up; + } + + if (aws_hash_table_init( + &subscriptions->request_response_paths, + allocator, + MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_response_path_table_hash_element_destroy)) { + goto clean_up; + } + + return AWS_OP_SUCCESS; + +clean_up: + aws_mqtt_request_response_client_subscriptions_clean_up(subscriptions); + return AWS_OP_ERR; +} + +void aws_mqtt_request_response_client_subscriptions_clean_up(struct aws_request_response_subscriptions *subscriptions) { + if (subscriptions == NULL) { + return; + } + + if (aws_hash_table_is_valid(&subscriptions->streaming_operation_subscription_lists)) { + aws_hash_table_clean_up(&subscriptions->streaming_operation_subscription_lists); + } + if (aws_hash_table_is_valid(&subscriptions->streaming_operation_wildcards_subscription_lists)) { + aws_hash_table_clean_up(&subscriptions->streaming_operation_wildcards_subscription_lists); + } + if (aws_hash_table_is_valid(&subscriptions->request_response_paths)) { + aws_hash_table_clean_up(&subscriptions->request_response_paths); + } +} + +static struct aws_rr_operation_list_topic_filter_entry *s_aws_rr_operation_list_topic_filter_entry_new( + struct aws_allocator *allocator, + struct aws_byte_cursor topic_filter) { + struct aws_rr_operation_list_topic_filter_entry *entry = + aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_operation_list_topic_filter_entry)); + + entry->allocator = allocator; + aws_byte_buf_init_copy_from_cursor(&entry->topic_filter, allocator, topic_filter); + entry->topic_filter_cursor = aws_byte_cursor_from_buf(&entry->topic_filter); + + aws_linked_list_init(&entry->operations); + + return entry; +} + +static bool s_is_topic_filter_with_wildcard(const struct aws_byte_cursor *topic_filter) { + bool res = (memchr(topic_filter->ptr, '+', topic_filter->len) || memchr(topic_filter->ptr, '#', topic_filter->len)); + return res; +} + +struct aws_rr_operation_list_topic_filter_entry *aws_mqtt_request_response_client_subscriptions_add_stream_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter) { + AWS_FATAL_ASSERT(subscriptions); + + struct aws_hash_table *subscription_lists = s_is_topic_filter_with_wildcard(topic_filter) + ? &subscriptions->streaming_operation_wildcards_subscription_lists + : &subscriptions->streaming_operation_subscription_lists; + + struct aws_hash_element *element = NULL; + if (aws_hash_table_find(subscription_lists, topic_filter, &element)) { + aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); + return NULL; + } + + struct aws_rr_operation_list_topic_filter_entry *entry = NULL; + if (element == NULL) { + entry = s_aws_rr_operation_list_topic_filter_entry_new(subscriptions->allocator, *topic_filter); + aws_hash_table_put(subscription_lists, &entry->topic_filter_cursor, entry, NULL); + } else { + entry = element->value; + } + + AWS_FATAL_ASSERT(entry != NULL); + + return entry; +} + +int aws_mqtt_request_response_client_subscriptions_add_request_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter, + const struct aws_byte_cursor *correlation_token_json_path) { + struct aws_hash_element *element = NULL; + if (aws_hash_table_find(&subscriptions->request_response_paths, topic_filter, &element)) { + return aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); + } + + if (element != NULL) { + struct aws_rr_response_path_entry *entry = element->value; + ++entry->ref_count; + return AWS_OP_SUCCESS; + } + + struct aws_rr_response_path_entry *entry = + s_aws_rr_response_path_entry_new(subscriptions->allocator, *topic_filter, *correlation_token_json_path); + if (aws_hash_table_put(&subscriptions->request_response_paths, &entry->topic_cursor, entry, NULL)) { + s_aws_rr_response_path_entry_destroy(entry); + return aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); + } + + return AWS_OP_SUCCESS; +} + +int aws_mqtt_request_response_client_subscriptions_remove_request_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter) { + + AWS_FATAL_ASSERT(subscriptions); + AWS_FATAL_ASSERT(topic_filter); + + struct aws_hash_element *element = NULL; + if (aws_hash_table_find(&subscriptions->request_response_paths, topic_filter, &element) || element == NULL) { + return AWS_OP_ERR; + } + + struct aws_rr_response_path_entry *entry = element->value; + --entry->ref_count; + + if (entry->ref_count == 0) { + aws_hash_table_remove(&subscriptions->request_response_paths, topic_filter, NULL, NULL); + } + + return AWS_OP_SUCCESS; +} + +static void s_match_stream_subscriptions( + const struct aws_hash_table *subscriptions, + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match, + void *user_data) { + struct aws_hash_element *subscription_filter_element = NULL; + if (aws_hash_table_find(subscriptions, &publish_event->topic, &subscription_filter_element) == AWS_OP_SUCCESS && + subscription_filter_element != NULL) { + + struct aws_rr_operation_list_topic_filter_entry *entry = subscription_filter_element->value; + on_stream_operation_subscription_match( + &entry->operations, &entry->topic_filter_cursor, publish_event, user_data); + } +} + +static bool s_is_topic_matched_to_subscription( + struct aws_byte_cursor topic, + struct aws_byte_cursor topic_filter_cursor) { + bool is_matched = true; + bool multi_level_wildcard = false; + + struct aws_byte_cursor subscription_topic_filter_segment; + AWS_ZERO_STRUCT(subscription_topic_filter_segment); + + struct aws_byte_cursor topic_segment; + AWS_ZERO_STRUCT(topic_segment); + + while (aws_byte_cursor_next_split(&topic_filter_cursor, '/', &subscription_topic_filter_segment)) { + if (!aws_byte_cursor_next_split(&topic, '/', &topic_segment)) { + is_matched = false; + break; + } + + if (aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "#")) { + multi_level_wildcard = true; + is_matched = true; + break; + } + + if (!aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "+") && + !aws_byte_cursor_eq(&topic_segment, &subscription_topic_filter_segment)) { + is_matched = false; + break; + } + } + if (!multi_level_wildcard && aws_byte_cursor_next_split(&topic, '/', &topic_segment)) { + is_matched = false; + } + + return is_matched; +} + +static void s_match_wildcard_stream_subscriptions( + const struct aws_hash_table *subscriptions, + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match, + void *user_data) { + + /* + * Incoming event's topic is checked against each registered stream with wildcard. While this approach is far from + * optimal, it should be sufficient for request-response client where not many subscriptions with wildcards are + * used. + */ + for (struct aws_hash_iter iter = aws_hash_iter_begin(subscriptions); !aws_hash_iter_done(&iter); + aws_hash_iter_next(&iter)) { + struct aws_rr_operation_list_topic_filter_entry *entry = iter.element.value; + struct aws_byte_cursor topic_filter_cursor = entry->topic_filter_cursor; + + bool is_matched = s_is_topic_matched_to_subscription(publish_event->topic, topic_filter_cursor); + if (is_matched) { + on_stream_operation_subscription_match( + &entry->operations, &entry->topic_filter_cursor, publish_event, user_data); + } + } +} + +void s_match_request_response_subscriptions( + const struct aws_hash_table *request_response_paths, + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match, + void *user_data) { + + struct aws_hash_element *response_path_element = NULL; + if (aws_hash_table_find(request_response_paths, &publish_event->topic, &response_path_element) == AWS_OP_SUCCESS && + response_path_element != NULL) { + + on_request_operation_subscription_match(response_path_element->value, publish_event, user_data); + } +} + +void aws_mqtt_request_response_client_subscriptions_handle_incoming_publish( + const struct aws_request_response_subscriptions *subscriptions, + const struct aws_mqtt_rr_incoming_publish_event *publish_event, + aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match, + aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match, + void *user_data) { + + AWS_FATAL_PRECONDITION(subscriptions); + AWS_FATAL_PRECONDITION(publish_event); + AWS_FATAL_PRECONDITION(on_stream_operation_subscription_match); + AWS_FATAL_PRECONDITION(on_request_operation_subscription_match); + + /* Streaming operation handling */ + s_match_stream_subscriptions( + &subscriptions->streaming_operation_subscription_lists, + publish_event, + on_stream_operation_subscription_match, + user_data); + + s_match_wildcard_stream_subscriptions( + &subscriptions->streaming_operation_wildcards_subscription_lists, + publish_event, + on_stream_operation_subscription_match, + user_data); + + /* Request-Response handling */ + s_match_request_response_subscriptions( + &subscriptions->request_response_paths, publish_event, on_request_operation_subscription_match, user_data); +} diff --git a/contrib/restricted/aws/aws-c-mqtt/source/request-response/subscription_manager.c b/contrib/restricted/aws/aws-c-mqtt/source/request-response/subscription_manager.c new file mode 100644 index 00000000000..88cc542873e --- /dev/null +++ b/contrib/restricted/aws/aws-c-mqtt/source/request-response/subscription_manager.c @@ -0,0 +1,822 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/mqtt/private/request-response/subscription_manager.h> + +#include <aws/common/logging.h> +#include <aws/mqtt/private/client_impl_shared.h> +#include <aws/mqtt/private/request-response/protocol_adapter.h> + +#include <inttypes.h> + +enum aws_rr_subscription_status_type { + ARRSST_SUBSCRIBED, + ARRSST_NOT_SUBSCRIBED, +}; + +/* + * Invariant: subscriptions can only transition from nothing -> {subscribing, unsubscribing} + * + * In particular, the logic blocks subscribing while unsubscribing and unsubscribing while subscribing (unless + * shutting down). + */ +enum aws_rr_subscription_pending_action_type { + ARRSPAT_NOTHING, + ARRSPAT_SUBSCRIBING, + ARRSPAT_UNSUBSCRIBING, +}; + +struct aws_rr_subscription_listener { + struct aws_allocator *allocator; + uint64_t operation_id; +}; + +static uint64_t s_aws_hash_subscription_listener(const void *item) { + const struct aws_rr_subscription_listener *listener = item; + + return listener->operation_id; +} + +static bool s_aws_subscription_listener_hash_equality(const void *a, const void *b) { + const struct aws_rr_subscription_listener *a_listener = a; + const struct aws_rr_subscription_listener *b_listener = b; + + return a_listener->operation_id == b_listener->operation_id; +} + +static void s_aws_subscription_listener_destroy(void *element) { + struct aws_rr_subscription_listener *listener = element; + + aws_mem_release(listener->allocator, listener); +} + +struct aws_rr_subscription_record { + struct aws_allocator *allocator; + + struct aws_byte_buf topic_filter; + struct aws_byte_cursor topic_filter_cursor; + + struct aws_hash_table listeners; + + enum aws_rr_subscription_status_type status; + enum aws_rr_subscription_pending_action_type pending_action; + + enum aws_rr_subscription_type type; + + /* + * A poisoned record represents a subscription that we will never try to subscribe to because a previous + * attempt resulted in a failure that we judge to be "terminal." Terminal failures include permission failures + * and validation failures. To remove a poisoned record, all listeners must be removed. For request-response + * operations this will happen naturally. For streaming operations, the operation must be closed by the user (in + * response to the user-facing event we emit on the streaming operation when the failure that poisons the + * record occurs). + */ + bool poisoned; +}; + +static void s_aws_rr_subscription_record_destroy(void *element) { + struct aws_rr_subscription_record *record = element; + + aws_byte_buf_clean_up(&record->topic_filter); + aws_hash_table_clean_up(&record->listeners); + + aws_mem_release(record->allocator, record); +} + +static struct aws_rr_subscription_record *s_aws_rr_subscription_new( + struct aws_allocator *allocator, + struct aws_byte_cursor topic_filter, + enum aws_rr_subscription_type type) { + struct aws_rr_subscription_record *record = aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_subscription_record)); + record->allocator = allocator; + + aws_byte_buf_init_copy_from_cursor(&record->topic_filter, allocator, topic_filter); + record->topic_filter_cursor = aws_byte_cursor_from_buf(&record->topic_filter); + + aws_hash_table_init( + &record->listeners, + allocator, + 4, + s_aws_hash_subscription_listener, + s_aws_subscription_listener_hash_equality, + NULL, + s_aws_subscription_listener_destroy); + + record->status = ARRSST_NOT_SUBSCRIBED; + record->pending_action = ARRSPAT_NOTHING; + + record->type = type; + + return record; +} + +static void s_subscription_record_unsubscribe( + struct aws_rr_subscription_manager *manager, + struct aws_rr_subscription_record *record, + bool shutdown) { + + bool currently_subscribed = record->status == ARRSST_SUBSCRIBED; + bool currently_subscribing = record->pending_action == ARRSPAT_SUBSCRIBING; + bool currently_unsubscribing = record->pending_action == ARRSPAT_UNSUBSCRIBING; + + /* + * The difference between a shutdown unsubscribe and a normal unsubscribe is that on a shutdown we will "chase" + * a pending subscribe with an unsubscribe (breaking the invariant of never having multiple MQTT operations + * pending on a subscription). + */ + bool should_unsubscribe = currently_subscribed && !currently_unsubscribing; + if (shutdown) { + should_unsubscribe = should_unsubscribe || currently_subscribing; + } + + if (!should_unsubscribe) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - subscription ('" PRInSTR + "') has no listeners but is not in a state that allows unsubscribe yet", + AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor)); + return; + } + + struct aws_protocol_adapter_unsubscribe_options unsubscribe_options = { + .topic_filter = record->topic_filter_cursor, + .ack_timeout_seconds = manager->config.operation_timeout_seconds, + }; + + if (aws_mqtt_protocol_adapter_unsubscribe(manager->protocol_adapter, &unsubscribe_options)) { + int error_code = aws_last_error(); + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - sync unsubscribe failure for ('" PRInSTR "'), ec %d(%s)", + AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor), + error_code, + aws_error_debug_str(error_code)); + return; + } + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - unsubscribe submitted for ('" PRInSTR "')", + AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor)); + + record->pending_action = ARRSPAT_UNSUBSCRIBING; +} + +/* Only called by the request-response client when shutting down */ +static int s_rr_subscription_clean_up_foreach_wrap(void *context, struct aws_hash_element *elem) { + struct aws_rr_subscription_manager *manager = context; + struct aws_rr_subscription_record *subscription = elem->value; + + s_subscription_record_unsubscribe(manager, subscription, true); + s_aws_rr_subscription_record_destroy(subscription); + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE; +} + +static struct aws_rr_subscription_record *s_get_subscription_record( + struct aws_rr_subscription_manager *manager, + struct aws_byte_cursor topic_filter) { + struct aws_rr_subscription_record *subscription = NULL; + struct aws_hash_element *element = NULL; + if (aws_hash_table_find(&manager->subscriptions, &topic_filter, &element)) { + return NULL; + } + + if (element != NULL) { + subscription = element->value; + } + + return subscription; +} + +struct aws_subscription_stats { + size_t request_response_subscriptions; + size_t event_stream_subscriptions; + size_t unsubscribing_event_stream_subscriptions; +}; + +static int s_rr_subscription_count_foreach_wrap(void *context, struct aws_hash_element *elem) { + const struct aws_rr_subscription_record *subscription = elem->value; + struct aws_subscription_stats *stats = context; + + if (subscription->type == ARRST_EVENT_STREAM) { + ++stats->event_stream_subscriptions; + if (subscription->pending_action == ARRSPAT_UNSUBSCRIBING) { + ++stats->unsubscribing_event_stream_subscriptions; + } + } else { + ++stats->request_response_subscriptions; + } + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; +} + +static void s_get_subscription_stats( + struct aws_rr_subscription_manager *manager, + struct aws_subscription_stats *stats) { + AWS_ZERO_STRUCT(*stats); + + aws_hash_table_foreach(&manager->subscriptions, s_rr_subscription_count_foreach_wrap, stats); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager current stats: %d event stream sub records, %d request-response sub " + "records, %d unsubscribing event stream subscriptions", + (int)stats->event_stream_subscriptions, + (int)stats->request_response_subscriptions, + (int)stats->unsubscribing_event_stream_subscriptions); +} + +static void s_remove_listener_from_subscription_record( + struct aws_rr_subscription_manager *manager, + struct aws_byte_cursor topic_filter, + uint64_t operation_id) { + struct aws_rr_subscription_record *record = s_get_subscription_record(manager, topic_filter); + if (record == NULL) { + return; + } + + struct aws_rr_subscription_listener listener = { + .operation_id = operation_id, + }; + + aws_hash_table_remove(&record->listeners, &listener, NULL, NULL); + + size_t listener_count = aws_hash_table_get_entry_count(&record->listeners); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - removed listener %" PRIu64 " from subscription ('" PRInSTR + "'), %zu listeners left", + operation_id, + AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor), + listener_count); + + if (listener_count == 0) { + struct aws_rr_subscription_status_event event = { + .type = ARRSET_SUBSCRIPTION_EMPTY, + .topic_filter = record->topic_filter_cursor, + .operation_id = 0, + }; + + (*manager->config.subscription_status_callback)(&event, manager->config.userdata); + } +} + +static void s_add_listener_to_subscription_record(struct aws_rr_subscription_record *record, uint64_t operation_id) { + struct aws_rr_subscription_listener *listener = + aws_mem_calloc(record->allocator, 1, sizeof(struct aws_rr_subscription_listener)); + listener->allocator = record->allocator; + listener->operation_id = operation_id; + + aws_hash_table_put(&record->listeners, listener, listener, NULL); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - added listener %" PRIu64 " to subscription ('" PRInSTR + "'), %zu listeners total", + operation_id, + AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor), + aws_hash_table_get_entry_count(&record->listeners)); +} + +static int s_rr_subscription_purge_unused_subscriptions_wrapper(void *context, struct aws_hash_element *elem) { + struct aws_rr_subscription_record *record = elem->value; + struct aws_rr_subscription_manager *manager = context; + + if (aws_hash_table_get_entry_count(&record->listeners) == 0) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - checking subscription ('" PRInSTR "') for removal", + AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor)); + + if (manager->is_protocol_client_connected) { + s_subscription_record_unsubscribe(manager, record, false); + } + + if (record->status == ARRSST_NOT_SUBSCRIBED && record->pending_action == ARRSPAT_NOTHING) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - deleting subscription ('" PRInSTR "')", + AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor)); + + s_aws_rr_subscription_record_destroy(record); + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE; + } + } + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; +} + +void aws_rr_subscription_manager_purge_unused(struct aws_rr_subscription_manager *manager) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, "request-response subscription manager - purging unused subscriptions"); + aws_hash_table_foreach(&manager->subscriptions, s_rr_subscription_purge_unused_subscriptions_wrapper, manager); +} + +static const char *s_rr_subscription_event_type_to_c_str(enum aws_rr_subscription_event_type type) { + switch (type) { + case ARRSET_REQUEST_SUBSCRIBE_SUCCESS: + return "RequestSubscribeSuccess"; + + case ARRSET_REQUEST_SUBSCRIBE_FAILURE: + return "RequestSubscribeFailure"; + + case ARRSET_REQUEST_SUBSCRIPTION_ENDED: + return "RequestSubscriptionEnded"; + + case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED: + return "StreamingSubscriptionEstablished"; + + case ARRSET_STREAMING_SUBSCRIPTION_LOST: + return "StreamingSubscriptionLost"; + + case ARRSET_STREAMING_SUBSCRIPTION_HALTED: + return "StreamingSubscriptionHalted"; + + case ARRSET_UNSUBSCRIBE_COMPLETE: + return "UnsubscribeComplete"; + + case ARRSET_SUBSCRIPTION_EMPTY: + return "SubscriptionEmpty"; + } + + return "Unknown"; +} + +static bool s_subscription_type_matches_event_type( + enum aws_rr_subscription_type subscription_type, + enum aws_rr_subscription_event_type event_type) { + switch (event_type) { + case ARRSET_REQUEST_SUBSCRIBE_SUCCESS: + case ARRSET_REQUEST_SUBSCRIBE_FAILURE: + case ARRSET_REQUEST_SUBSCRIPTION_ENDED: + return subscription_type == ARRST_REQUEST_RESPONSE; + + case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED: + case ARRSET_STREAMING_SUBSCRIPTION_LOST: + case ARRSET_STREAMING_SUBSCRIPTION_HALTED: + return subscription_type == ARRST_EVENT_STREAM; + + default: + return true; + } +} + +static void s_emit_subscription_event( + const struct aws_rr_subscription_manager *manager, + const struct aws_rr_subscription_record *record, + enum aws_rr_subscription_event_type type) { + + AWS_FATAL_ASSERT(s_subscription_type_matches_event_type(record->type, type)); + + for (struct aws_hash_iter iter = aws_hash_iter_begin(&record->listeners); !aws_hash_iter_done(&iter); + aws_hash_iter_next(&iter)) { + + struct aws_rr_subscription_listener *listener = iter.element.value; + struct aws_rr_subscription_status_event event = { + .type = type, + .topic_filter = record->topic_filter_cursor, + .operation_id = listener->operation_id, + }; + + (*manager->config.subscription_status_callback)(&event, manager->config.userdata); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - subscription event for ('" PRInSTR + "'), type: %s, operation: %" PRIu64 "", + AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor), + s_rr_subscription_event_type_to_c_str(type), + listener->operation_id); + } +} + +static int s_rr_activate_idle_subscription( + struct aws_rr_subscription_manager *manager, + struct aws_rr_subscription_record *record) { + int result = AWS_OP_SUCCESS; + + if (record->poisoned) { + /* + * Don't try and establish poisoned subscriptions. This is not an error or a loggable event, it just means + * we hit a "try and make subscriptions" event when a poisoned subscription still hadn't been fully released. + */ + return AWS_OP_SUCCESS; + } + + if (manager->is_protocol_client_connected && aws_hash_table_get_entry_count(&record->listeners) > 0) { + if (record->status == ARRSST_NOT_SUBSCRIBED && record->pending_action == ARRSPAT_NOTHING) { + struct aws_protocol_adapter_subscribe_options subscribe_options = { + .topic_filter = record->topic_filter_cursor, + .ack_timeout_seconds = manager->config.operation_timeout_seconds, + }; + + result = aws_mqtt_protocol_adapter_subscribe(manager->protocol_adapter, &subscribe_options); + if (result == AWS_OP_SUCCESS) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - initiating subscribe operation for ('" PRInSTR "')", + AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor)); + record->pending_action = ARRSPAT_SUBSCRIBING; + } else { + int error_code = aws_last_error(); + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - synchronous failure subscribing to ('" PRInSTR + "'), ec %d(%s)", + AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor), + error_code, + aws_error_debug_str(error_code)); + + if (record->type == ARRST_REQUEST_RESPONSE) { + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_FAILURE); + } else { + record->poisoned = true; + s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_HALTED); + } + } + } + } + + return result; +} + +enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_subscription( + struct aws_rr_subscription_manager *manager, + const struct aws_rr_acquire_subscription_options *options) { + + if (options->topic_filter_count == 0) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire_subscription for operation %" PRIu64 + " with no topic filters", + options->operation_id); + return AASRT_FAILURE; + } + + /* + * Check for poisoned or mismatched records. This has precedence over the following unsubscribing check, + * and so we put them in separate loops + */ + for (size_t i = 0; i < options->topic_filter_count; ++i) { + struct aws_byte_cursor topic_filter = options->topic_filters[i]; + struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter); + if (existing_record == NULL) { + continue; + } + + if (existing_record->poisoned) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64 + " failed - existing subscription is poisoned and has not been released", + AWS_BYTE_CURSOR_PRI(topic_filter), + options->operation_id); + return AASRT_FAILURE; + } + + if (existing_record->type != options->type) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64 + " failed - conflicts with subscription type of existing subscription", + AWS_BYTE_CURSOR_PRI(topic_filter), + options->operation_id); + return AASRT_FAILURE; + } + } + + /* blocked if an existing record is unsubscribing; also compute how many subscriptions are needed */ + size_t subscriptions_needed = 0; + for (size_t i = 0; i < options->topic_filter_count; ++i) { + struct aws_byte_cursor topic_filter = options->topic_filters[i]; + struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter); + if (existing_record != NULL) { + if (existing_record->pending_action == ARRSPAT_UNSUBSCRIBING) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for ('" PRInSTR + "'), operation %" PRIu64 " blocked - existing subscription is unsubscribing", + AWS_BYTE_CURSOR_PRI(topic_filter), + options->operation_id); + return AASRT_BLOCKED; + } + } else { + ++subscriptions_needed; + } + } + + /* Check for space and fail or block as appropriate */ + if (subscriptions_needed > 0) { + /* how much of the budget are we using? */ + struct aws_subscription_stats stats; + s_get_subscription_stats(manager, &stats); + + if (options->type == ARRST_REQUEST_RESPONSE) { + if (subscriptions_needed > + manager->config.max_request_response_subscriptions - stats.request_response_subscriptions) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for request operation %" PRIu64 + " blocked - no room currently", + options->operation_id); + return AASRT_BLOCKED; + } + } else { + /* + * Streaming subscriptions have more complicated space-checking logic. Under certain conditions, we may + * block rather than failing + */ + if (subscriptions_needed + stats.event_stream_subscriptions > manager->config.max_streaming_subscriptions) { + if (subscriptions_needed + stats.event_stream_subscriptions <= + manager->config.max_streaming_subscriptions + stats.unsubscribing_event_stream_subscriptions) { + /* If enough subscriptions are in the process of going away then wait in the blocked state */ + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for streaming operation %" PRIu64 + " blocked - no room currently", + options->operation_id); + return AASRT_BLOCKED; + } else { + /* Otherwise, there's no hope, fail */ + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for operation %" PRIu64 + " failed - no room", + options->operation_id); + return AASRT_NO_CAPACITY; + } + } + } + } + + bool is_fully_subscribed = true; + for (size_t i = 0; i < options->topic_filter_count; ++i) { + struct aws_byte_cursor topic_filter = options->topic_filters[i]; + struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter); + + if (existing_record == NULL) { + existing_record = s_aws_rr_subscription_new(manager->allocator, topic_filter, options->type); + aws_hash_table_put(&manager->subscriptions, &existing_record->topic_filter_cursor, existing_record, NULL); + } + + s_add_listener_to_subscription_record(existing_record, options->operation_id); + if (existing_record->status != ARRSST_SUBSCRIBED) { + is_fully_subscribed = false; + } + } + + if (is_fully_subscribed) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for operation %" PRIu64 + " fully subscribed - all required subscriptions are active", + options->operation_id); + return AASRT_SUBSCRIBED; + } + + for (size_t i = 0; i < options->topic_filter_count; ++i) { + struct aws_byte_cursor topic_filter = options->topic_filters[i]; + struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter); + + if (s_rr_activate_idle_subscription(manager, existing_record)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for operation %" PRIu64 + " failed - synchronous subscribe failure", + options->operation_id); + return AASRT_FAILURE; + } + } + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for operation %" PRIu64 + " subscribing - waiting on one or more subscribes to complete", + options->operation_id); + + return AASRT_SUBSCRIBING; +} + +void aws_rr_subscription_manager_release_subscription( + struct aws_rr_subscription_manager *manager, + const struct aws_rr_release_subscription_options *options) { + for (size_t i = 0; i < options->topic_filter_count; ++i) { + struct aws_byte_cursor topic_filter = options->topic_filters[i]; + s_remove_listener_from_subscription_record(manager, topic_filter, options->operation_id); + } +} + +static void s_handle_protocol_adapter_request_subscription_event( + struct aws_rr_subscription_manager *manager, + struct aws_rr_subscription_record *record, + const struct aws_protocol_adapter_subscription_event *event) { + if (event->event_type == AWS_PASET_SUBSCRIBE) { + AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_SUBSCRIBING); + record->pending_action = ARRSPAT_NOTHING; + + if (event->error_code == AWS_ERROR_SUCCESS) { + record->status = ARRSST_SUBSCRIBED; + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_SUCCESS); + } else { + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_FAILURE); + } + } else { + AWS_FATAL_ASSERT(event->event_type == AWS_PASET_UNSUBSCRIBE); + AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_UNSUBSCRIBING); + record->pending_action = ARRSPAT_NOTHING; + + if (event->error_code == AWS_ERROR_SUCCESS) { + record->status = ARRSST_NOT_SUBSCRIBED; + + struct aws_rr_subscription_status_event unsubscribe_event = { + .type = ARRSET_UNSUBSCRIBE_COMPLETE, + .topic_filter = record->topic_filter_cursor, + .operation_id = 0, + }; + + (*manager->config.subscription_status_callback)(&unsubscribe_event, manager->config.userdata); + } + } +} + +static void s_handle_protocol_adapter_streaming_subscription_event( + struct aws_rr_subscription_manager *manager, + struct aws_rr_subscription_record *record, + const struct aws_protocol_adapter_subscription_event *event) { + if (event->event_type == AWS_PASET_SUBSCRIBE) { + AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_SUBSCRIBING); + record->pending_action = ARRSPAT_NOTHING; + + if (event->error_code == AWS_ERROR_SUCCESS) { + record->status = ARRSST_SUBSCRIBED; + s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED); + } else { + if (event->retryable) { + s_rr_activate_idle_subscription(manager, record); + } else { + record->poisoned = true; + s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_HALTED); + } + } + } else { + AWS_FATAL_ASSERT(event->event_type == AWS_PASET_UNSUBSCRIBE); + AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_UNSUBSCRIBING); + record->pending_action = ARRSPAT_NOTHING; + + if (event->error_code == AWS_ERROR_SUCCESS) { + record->status = ARRSST_NOT_SUBSCRIBED; + + struct aws_rr_subscription_status_event unsubscribe_event = { + .type = ARRSET_UNSUBSCRIBE_COMPLETE, + .topic_filter = record->topic_filter_cursor, + .operation_id = 0, + }; + + (*manager->config.subscription_status_callback)(&unsubscribe_event, manager->config.userdata); + } + } +} + +void aws_rr_subscription_manager_on_protocol_adapter_subscription_event( + struct aws_rr_subscription_manager *manager, + const struct aws_protocol_adapter_subscription_event *event) { + struct aws_rr_subscription_record *record = s_get_subscription_record(manager, event->topic_filter); + if (record == NULL) { + return; + } + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - received a protocol adapter subscription event for ('" PRInSTR + "'), type %s, error_code %d(%s)", + AWS_BYTE_CURSOR_PRI(event->topic_filter), + aws_protocol_adapter_subscription_event_type_to_c_str(event->event_type), + event->error_code, + aws_error_debug_str(event->error_code)); + + if (record->type == ARRST_REQUEST_RESPONSE) { + s_handle_protocol_adapter_request_subscription_event(manager, record, event); + } else { + s_handle_protocol_adapter_streaming_subscription_event(manager, record, event); + } +} + +static int s_rr_activate_idle_subscriptions_wrapper(void *context, struct aws_hash_element *elem) { + struct aws_rr_subscription_record *record = elem->value; + struct aws_rr_subscription_manager *manager = context; + + s_rr_activate_idle_subscription(manager, record); + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; +} + +static void s_activate_idle_subscriptions(struct aws_rr_subscription_manager *manager) { + aws_hash_table_foreach(&manager->subscriptions, s_rr_activate_idle_subscriptions_wrapper, manager); +} + +static int s_apply_session_lost_wrapper(void *context, struct aws_hash_element *elem) { + struct aws_rr_subscription_record *record = elem->value; + struct aws_rr_subscription_manager *manager = context; + + if (record->status == ARRSST_SUBSCRIBED) { + record->status = ARRSST_NOT_SUBSCRIBED; + + if (record->type == ARRST_REQUEST_RESPONSE) { + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_ENDED); + + if (record->pending_action != ARRSPAT_UNSUBSCRIBING) { + s_aws_rr_subscription_record_destroy(record); + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE; + } + } else { + s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_LOST); + } + } + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; +} + +static int s_apply_streaming_resubscribe_wrapper(void *context, struct aws_hash_element *elem) { + struct aws_rr_subscription_record *record = elem->value; + struct aws_rr_subscription_manager *manager = context; + + if (record->type == ARRST_EVENT_STREAM) { + s_rr_activate_idle_subscription(manager, record); + } + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; +} + +static void s_apply_session_lost(struct aws_rr_subscription_manager *manager) { + aws_hash_table_foreach(&manager->subscriptions, s_apply_session_lost_wrapper, manager); + aws_hash_table_foreach(&manager->subscriptions, s_apply_streaming_resubscribe_wrapper, manager); +} + +void aws_rr_subscription_manager_on_protocol_adapter_connection_event( + struct aws_rr_subscription_manager *manager, + const struct aws_protocol_adapter_connection_event *event) { + + if (event->event_type == AWS_PACET_CONNECTED) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - received a protocol adapter connection event, joined_session: " + "%d", + (int)(event->joined_session ? 1 : 0)); + + manager->is_protocol_client_connected = true; + if (!event->joined_session) { + s_apply_session_lost(manager); + } + + aws_rr_subscription_manager_purge_unused(manager); + s_activate_idle_subscriptions(manager); + } else { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - received a protocol adapter disconnection event"); + + manager->is_protocol_client_connected = false; + } +} + +bool aws_rr_subscription_manager_are_options_valid(const struct aws_rr_subscription_manager_options *options) { + if (options == NULL || options->max_request_response_subscriptions < 2 || options->operation_timeout_seconds == 0) { + return false; + } + + return true; +} + +void aws_rr_subscription_manager_init( + struct aws_rr_subscription_manager *manager, + struct aws_allocator *allocator, + struct aws_mqtt_protocol_adapter *protocol_adapter, + const struct aws_rr_subscription_manager_options *options) { + AWS_ZERO_STRUCT(*manager); + + AWS_FATAL_ASSERT(aws_rr_subscription_manager_are_options_valid(options)); + + manager->allocator = allocator; + manager->config = *options; + manager->protocol_adapter = protocol_adapter; + + aws_hash_table_init( + &manager->subscriptions, + allocator, + options->max_request_response_subscriptions + options->max_streaming_subscriptions, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_subscription_record_destroy); + + manager->is_protocol_client_connected = aws_mqtt_protocol_adapter_is_connected(protocol_adapter); +} + +void aws_rr_subscription_manager_clean_up(struct aws_rr_subscription_manager *manager) { + aws_hash_table_foreach(&manager->subscriptions, s_rr_subscription_clean_up_foreach_wrap, manager); + aws_hash_table_clean_up(&manager->subscriptions); +} diff --git a/contrib/restricted/aws/aws-c-mqtt/source/shared_constants.c b/contrib/restricted/aws/aws-c-mqtt/source/shared.c index 8d260799598..e84c4afe5f3 100644 --- a/contrib/restricted/aws/aws-c-mqtt/source/shared_constants.c +++ b/contrib/restricted/aws/aws-c-mqtt/source/shared.c @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0. */ -#include <aws/mqtt/private/shared_constants.h> +#include <aws/mqtt/private/shared.h> #include <aws/http/request_response.h> diff --git a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_client.c b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_client.c index e6da6d3d397..33bad2e6ecd 100644 --- a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_client.c +++ b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_client.c @@ -13,7 +13,7 @@ #include <aws/io/channel_bootstrap.h> #include <aws/io/event_loop.h> #include <aws/mqtt/private/client_impl_shared.h> -#include <aws/mqtt/private/shared_constants.h> +#include <aws/mqtt/private/shared.h> #include <aws/mqtt/private/v5/mqtt5_client_impl.h> #include <aws/mqtt/private/v5/mqtt5_options_storage.h> #include <aws/mqtt/private/v5/mqtt5_utils.h> diff --git a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_listener.c b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_listener.c index 04170394e30..ebb45b81b22 100644 --- a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_listener.c +++ b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_listener.c @@ -84,6 +84,7 @@ struct aws_mqtt5_listener *aws_mqtt5_listener_new( struct aws_allocator *allocator, struct aws_mqtt5_listener_config *config) { if (config->client == NULL) { + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); return NULL; } diff --git a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_options_storage.c b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_options_storage.c index 6510c6ab21f..3cddac179e8 100644 --- a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_options_storage.c +++ b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_options_storage.c @@ -3623,7 +3623,7 @@ void aws_mqtt5_client_options_storage_log( log_handle, level, AWS_LS_MQTT5_GENERAL, - "id=%p: mqtt5_client_options_storage disabling websockets", + "id=%p: aws_mqtt5_client_options_storage disabling websockets", (void *)options_storage); } @@ -3669,7 +3669,7 @@ void aws_mqtt5_client_options_storage_log( log_handle, level, AWS_LS_MQTT5_GENERAL, - "id=%p: mqtt5_client_options_storage reconnect delay min set to %" PRIu64 " ms, max set to %" PRIu64 " ms", + "id=%p: aws_mqtt5_client_options_storage reconnect delay min set to %" PRIu64 " ms, max set to %" PRIu64 " ms", (void *)options_storage, options_storage->min_reconnect_delay_ms, options_storage->max_reconnect_delay_ms); diff --git a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_to_mqtt3_adapter.c b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_to_mqtt3_adapter.c index bd90a6581fa..691d2c9a491 100644 --- a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_to_mqtt3_adapter.c +++ b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_to_mqtt3_adapter.c @@ -2850,6 +2850,18 @@ error: return 0; } +static enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_5_get_impl(const void *impl) { + (void)impl; + + return AWS_MQTT311_IT_5_ADAPTER; +} + +static struct aws_event_loop *s_aws_mqtt_client_connection_5_get_event_loop(const void *impl) { + const struct aws_mqtt_client_connection_5_impl *adapter = impl; + + return adapter->client->loop; +} + static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_5_vtable = { .acquire_fn = s_aws_mqtt_client_connection_5_acquire, .release_fn = s_aws_mqtt_client_connection_5_release, @@ -2873,6 +2885,8 @@ static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_5_v .unsubscribe_fn = s_aws_mqtt_client_connection_5_unsubscribe, .publish_fn = s_aws_mqtt_client_connection_5_publish, .get_stats_fn = s_aws_mqtt_client_connection_5_get_stats, + .get_impl_type = s_aws_mqtt_client_connection_5_get_impl, + .get_event_loop = s_aws_mqtt_client_connection_5_get_event_loop, }; static struct aws_mqtt_client_connection_vtable *s_aws_mqtt_client_connection_5_vtable_ptr = diff --git a/contrib/restricted/aws/aws-c-mqtt/ya.make b/contrib/restricted/aws/aws-c-mqtt/ya.make index 8f460436aa3..d13474e9718 100644 --- a/contrib/restricted/aws/aws-c-mqtt/ya.make +++ b/contrib/restricted/aws/aws-c-mqtt/ya.make @@ -6,9 +6,9 @@ LICENSE(Apache-2.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(0.10.4) +VERSION(0.13.0) -ORIGINAL_SOURCE(https://github.com/awslabs/aws-c-mqtt/archive/v0.10.4.tar.gz) +ORIGINAL_SOURCE(https://github.com/awslabs/aws-c-mqtt/archive/v0.13.0.tar.gz) PEERDIR( contrib/restricted/aws/aws-c-common @@ -49,9 +49,14 @@ SRCS( source/fixed_header.c source/mqtt.c source/mqtt311_decoder.c + source/mqtt311_listener.c source/mqtt_subscription_set.c source/packets.c - source/shared_constants.c + source/request-response/protocol_adapter.c + source/request-response/request_response_client.c + source/request-response/request_response_subscription_set.c + source/request-response/subscription_manager.c + source/shared.c source/topic_tree.c source/v5/mqtt5_callbacks.c source/v5/mqtt5_client.c |
