summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-contrib <[email protected]>2025-05-15 11:55:04 +0300
committerrobot-contrib <[email protected]>2025-05-15 12:10:01 +0300
commit5569fa802a72982a60a1a53697dc2e535c2c13e8 (patch)
treee66068d6db87bab720a248f9e5ca2f3da0fde1f9
parentf9bf93a1d46559e7c9f99d23ae31befd03476139 (diff)
Update contrib/restricted/aws/aws-c-mqtt to 0.13.0
commit_hash:900d7ae27d5925a212ae7faea8bcdad8f42ef947
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.copyrights.report14
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.licenses.report16
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/.yandex_meta/licenses.list.txt2
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/.yandex_meta/override.nix4
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/README.md6
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/exports.h10
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/mqtt.h12
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl.h54
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl_shared.h24
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/mqtt311_listener.h204
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/protocol_adapter.h211
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/request_response_subscription_set.h140
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/subscription_manager.h264
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/shared.h (renamed from contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/shared_constants.h)2
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/request-response/request_response_client.h290
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/v5/mqtt5_client.h3
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/client.c273
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/client_channel_handler.c6
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/client_impl_shared.c9
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/mqtt.c34
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/mqtt311_listener.c329
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/packets.c5
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/request-response/protocol_adapter.c973
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_client.c2121
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_subscription_set.c336
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/request-response/subscription_manager.c822
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/shared.c (renamed from contrib/restricted/aws/aws-c-mqtt/source/shared_constants.c)2
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_client.c2
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_listener.c1
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_options_storage.c4
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_to_mqtt3_adapter.c14
-rw-r--r--contrib/restricted/aws/aws-c-mqtt/ya.make11
32 files changed, 6111 insertions, 87 deletions
diff --git a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.copyrights.report b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.copyrights.report
index fb8c16b3fff..7edd55c4b9a 100644
--- a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.copyrights.report
+++ b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.copyrights.report
@@ -50,10 +50,14 @@ BELONGS ya.make
include/aws/mqtt/private/client_impl_shared.h [5:5]
include/aws/mqtt/private/fixed_header.h [5:5]
include/aws/mqtt/private/mqtt311_decoder.h [5:5]
+ include/aws/mqtt/private/mqtt311_listener.h [2:2]
include/aws/mqtt/private/mqtt_client_test_helper.h [4:4]
include/aws/mqtt/private/mqtt_subscription_set.h [2:2]
include/aws/mqtt/private/packets.h [5:5]
- include/aws/mqtt/private/shared_constants.h [2:2]
+ include/aws/mqtt/private/request-response/protocol_adapter.h [5:5]
+ include/aws/mqtt/private/request-response/request_response_subscription_set.h [5:5]
+ include/aws/mqtt/private/request-response/subscription_manager.h [5:5]
+ include/aws/mqtt/private/shared.h [2:2]
include/aws/mqtt/private/topic_tree.h [5:5]
include/aws/mqtt/private/v5/mqtt5_callbacks.h [2:2]
include/aws/mqtt/private/v5/mqtt5_client_impl.h [5:5]
@@ -64,6 +68,7 @@ BELONGS ya.make
include/aws/mqtt/private/v5/mqtt5_topic_alias.h [2:2]
include/aws/mqtt/private/v5/mqtt5_utils.h [5:5]
include/aws/mqtt/private/v5/rate_limiters.h [2:2]
+ include/aws/mqtt/request-response/request_response_client.h [5:5]
include/aws/mqtt/v5/mqtt5_client.h [5:5]
include/aws/mqtt/v5/mqtt5_listener.h [2:2]
include/aws/mqtt/v5/mqtt5_packet_storage.h [5:5]
@@ -74,9 +79,14 @@ BELONGS ya.make
source/fixed_header.c [2:2]
source/mqtt.c [2:2]
source/mqtt311_decoder.c [2:2]
+ source/mqtt311_listener.c [2:2]
source/mqtt_subscription_set.c [2:2]
source/packets.c [2:2]
- source/shared_constants.c [2:2]
+ source/request-response/protocol_adapter.c [2:2]
+ source/request-response/request_response_client.c [2:2]
+ source/request-response/request_response_subscription_set.c [2:2]
+ source/request-response/subscription_manager.c [2:2]
+ source/shared.c [2:2]
source/topic_tree.c [2:2]
source/v5/mqtt5_callbacks.c [2:2]
source/v5/mqtt5_client.c [2:2]
diff --git a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.licenses.report b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.licenses.report
index 6fc0570ab0d..051c89ca1a9 100644
--- a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.licenses.report
+++ b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/devtools.licenses.report
@@ -73,7 +73,7 @@ KEEP Apache-2.0 6c901454b872854c0dea3ec06b67701a
BELONGS ya.make
License text:
\## License
- This library is licensed under the Apache 2.0 License.
+ This library is licensed under the Apache 2.0 License.
Scancode info:
Original SPDX id: Apache-2.0
Score : 100.00
@@ -99,10 +99,14 @@ BELONGS ya.make
include/aws/mqtt/private/client_impl_shared.h [6:6]
include/aws/mqtt/private/fixed_header.h [6:6]
include/aws/mqtt/private/mqtt311_decoder.h [6:6]
+ include/aws/mqtt/private/mqtt311_listener.h [3:3]
include/aws/mqtt/private/mqtt_client_test_helper.h [5:5]
include/aws/mqtt/private/mqtt_subscription_set.h [3:3]
include/aws/mqtt/private/packets.h [6:6]
- include/aws/mqtt/private/shared_constants.h [3:3]
+ include/aws/mqtt/private/request-response/protocol_adapter.h [6:6]
+ include/aws/mqtt/private/request-response/request_response_subscription_set.h [6:6]
+ include/aws/mqtt/private/request-response/subscription_manager.h [6:6]
+ include/aws/mqtt/private/shared.h [3:3]
include/aws/mqtt/private/topic_tree.h [6:6]
include/aws/mqtt/private/v5/mqtt5_callbacks.h [3:3]
include/aws/mqtt/private/v5/mqtt5_client_impl.h [6:6]
@@ -113,6 +117,7 @@ BELONGS ya.make
include/aws/mqtt/private/v5/mqtt5_topic_alias.h [3:3]
include/aws/mqtt/private/v5/mqtt5_utils.h [6:6]
include/aws/mqtt/private/v5/rate_limiters.h [3:3]
+ include/aws/mqtt/request-response/request_response_client.h [6:6]
include/aws/mqtt/v5/mqtt5_client.h [6:6]
include/aws/mqtt/v5/mqtt5_listener.h [3:3]
include/aws/mqtt/v5/mqtt5_packet_storage.h [6:6]
@@ -123,9 +128,14 @@ BELONGS ya.make
source/fixed_header.c [3:3]
source/mqtt.c [3:3]
source/mqtt311_decoder.c [3:3]
+ source/mqtt311_listener.c [3:3]
source/mqtt_subscription_set.c [3:3]
source/packets.c [3:3]
- source/shared_constants.c [3:3]
+ source/request-response/protocol_adapter.c [3:3]
+ source/request-response/request_response_client.c [3:3]
+ source/request-response/request_response_subscription_set.c [3:3]
+ source/request-response/subscription_manager.c [3:3]
+ source/shared.c [3:3]
source/topic_tree.c [3:3]
source/v5/mqtt5_callbacks.c [3:3]
source/v5/mqtt5_client.c [3:3]
diff --git a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/licenses.list.txt b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/licenses.list.txt
index fff5fc6ca1f..05e74d27d6c 100644
--- a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/licenses.list.txt
+++ b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/licenses.list.txt
@@ -208,7 +208,7 @@
====================Apache-2.0====================
## License
-This library is licensed under the Apache 2.0 License.
+This library is licensed under the Apache 2.0 License.
====================Apache-2.0====================
diff --git a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/override.nix b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/override.nix
index f02bd667fdb..9b4a3b26076 100644
--- a/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/override.nix
+++ b/contrib/restricted/aws/aws-c-mqtt/.yandex_meta/override.nix
@@ -1,10 +1,10 @@
pkgs: attrs: with pkgs; with attrs; rec {
- version = "0.10.4";
+ version = "0.13.0";
src = fetchFromGitHub {
owner = "awslabs";
repo = "aws-c-mqtt";
rev = "v${version}";
- hash = "sha256-i+ssZzHC8MPfyOaRqvjq0z7w772BJqIA6BwntW1fRek=";
+ hash = "sha256-mDtQ5H8PUExil2y70LaUPKx/PNqTscUnY6CQoYNfKY4=";
};
}
diff --git a/contrib/restricted/aws/aws-c-mqtt/README.md b/contrib/restricted/aws/aws-c-mqtt/README.md
index 2f99277e399..6a6a7bf582f 100644
--- a/contrib/restricted/aws/aws-c-mqtt/README.md
+++ b/contrib/restricted/aws/aws-c-mqtt/README.md
@@ -4,13 +4,13 @@ C99 implementation of the MQTT 3.1.1 and MQTT 5 specifications.
## License
-This library is licensed under the Apache 2.0 License.
+This library is licensed under the Apache 2.0 License.
## Usage
### Building
-CMake 3.1+ is required to build.
+CMake 3.9+ is required to build.
`<install-path>` must be an absolute path in the following instructions.
@@ -203,6 +203,6 @@ the wire. For QoS 1, as soon as PUBACK comes back. For QoS 2, PUBCOMP. `topic` a
```c
int aws_mqtt_client_connection_ping(struct aws_mqtt_client_connection *connection);
```
-Sends a PINGREQ packet to the server.
+Sends a PINGREQ packet to the server.
[aws-c-io]: https://github.com/awslabs/aws-c-io
diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/exports.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/exports.h
index d87f4760f0a..b5d83e928fa 100644
--- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/exports.h
+++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/exports.h
@@ -4,7 +4,7 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
-#if defined(USE_WINDOWS_DLL_SEMANTICS) || defined(WIN32)
+#if defined(AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined(_WIN32)
# ifdef AWS_MQTT_USE_IMPORT_EXPORT
# ifdef AWS_MQTT_EXPORTS
# define AWS_MQTT_API __declspec(dllexport)
@@ -15,13 +15,13 @@
# define AWS_MQTT_API
# endif /* USE_IMPORT_EXPORT */
-#else /* defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32) */
-# if ((__GNUC__ >= 4) || defined(__clang__)) && defined(AWS_MQTT_USE_IMPORT_EXPORT) && defined(AWS_MQTT_EXPORTS)
+#else /* defined (AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined (_WIN32) */
+# if defined(AWS_MQTT_USE_IMPORT_EXPORT) && defined(AWS_MQTT_EXPORTS)
# define AWS_MQTT_API __attribute__((visibility("default")))
# else
# define AWS_MQTT_API
-# endif /* __GNUC__ >= 4 || defined(__clang__) */
+# endif
-#endif /* defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32) */
+#endif /* defined (AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined (_WIN32) */
#endif /* AWS_MQTT_EXPORTS_H */
diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/mqtt.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/mqtt.h
index 38510c8c80e..d71d96a5d55 100644
--- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/mqtt.h
+++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/mqtt.h
@@ -82,6 +82,17 @@ enum aws_mqtt_error {
AWS_ERROR_MQTT_CONNECTION_RESUBSCRIBE_NO_TOPICS,
AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE,
AWS_ERROR_MQTT_ACK_REASON_CODE_FAILURE,
+ AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE,
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN,
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT,
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_NO_SUBSCRIPTION_CAPACITY,
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE,
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR,
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_PUBLISH_FAILURE,
+ AWS_ERROR_MQTT_REUQEST_RESPONSE_STREAM_ALREADY_ACTIVATED,
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_MODELED_SERVICE_ERROR,
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_PAYLOAD_PARSE_ERROR,
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_INVALID_RESPONSE_PATH,
AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID),
};
@@ -94,6 +105,7 @@ enum aws_mqtt_log_subject {
AWS_LS_MQTT5_CLIENT,
AWS_LS_MQTT5_CANARY,
AWS_LS_MQTT5_TO_MQTT3_ADAPTER,
+ AWS_LS_MQTT_REQUEST_RESPONSE,
};
/** Function called on cleanup of a userdata. */
diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl.h
index 1d0dd67a0c2..ab9822c3b0b 100644
--- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl.h
+++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl.h
@@ -11,6 +11,7 @@
#include <aws/mqtt/private/client_impl_shared.h>
#include <aws/mqtt/private/fixed_header.h>
#include <aws/mqtt/private/mqtt311_decoder.h>
+#include <aws/mqtt/private/mqtt311_listener.h>
#include <aws/mqtt/private/topic_tree.h>
#include <aws/common/hash_table.h>
@@ -153,6 +154,25 @@ struct aws_mqtt_reconnect_task {
struct aws_allocator *allocator;
};
+struct request_timeout_wrapper;
+
+/* used for timeout task */
+struct request_timeout_task_arg {
+ uint16_t packet_id;
+ struct aws_mqtt_client_connection_311_impl *connection;
+ struct request_timeout_wrapper *task_arg_wrapper;
+};
+
+/*
+ * We want the timeout task to be able to destroy the forward reference from the operation's task arg structure
+ * to the timeout task. But the operation task arg structures don't have any data structure in common. So to allow
+ * the timeout to refer back to a zero-able forward pointer, we wrap a pointer to the timeout task and embed it
+ * in every operation's task arg that needs to create a timeout.
+ */
+struct request_timeout_wrapper {
+ struct request_timeout_task_arg *timeout_task_arg;
+};
+
/* The lifetime of this struct is from subscribe -> suback */
struct subscribe_task_arg {
@@ -172,6 +192,9 @@ struct subscribe_task_arg {
aws_mqtt_suback_fn *single;
} on_suback;
void *on_suback_ud;
+
+ struct request_timeout_wrapper timeout_wrapper;
+ uint64_t timeout_duration_in_ns;
};
/* The lifetime of this struct is the same as the lifetime of the subscription */
@@ -255,6 +278,9 @@ struct aws_mqtt_client_connection_311_impl {
aws_mqtt_on_operation_statistics_fn *on_any_operation_statistics;
void *on_any_operation_statistics_ud;
+ /* listener callbacks */
+ struct aws_mqtt311_callback_set_manager callback_manager;
+
/* Connection tasks. */
struct aws_mqtt_reconnect_task *reconnect_task;
struct aws_channel_task ping_task;
@@ -425,4 +451,32 @@ void aws_mqtt_connection_statistics_change_operation_statistic_state(
AWS_MQTT_API const struct aws_mqtt_client_connection_packet_handlers *aws_mqtt311_get_default_packet_handlers(void);
+AWS_MQTT_API uint16_t aws_mqtt_client_connection_311_unsubscribe(
+ struct aws_mqtt_client_connection_311_impl *connection,
+ const struct aws_byte_cursor *topic_filter,
+ aws_mqtt_op_complete_fn *on_unsuback,
+ void *on_unsuback_ud,
+ uint64_t timeout_ns);
+
+AWS_MQTT_API uint16_t aws_mqtt_client_connection_311_subscribe(
+ struct aws_mqtt_client_connection_311_impl *connection,
+ const struct aws_byte_cursor *topic_filter,
+ enum aws_mqtt_qos qos,
+ aws_mqtt_client_publish_received_fn *on_publish,
+ void *on_publish_ud,
+ aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
+ aws_mqtt_suback_fn *on_suback,
+ void *on_suback_ud,
+ uint64_t timeout_ns);
+
+AWS_MQTT_API uint16_t aws_mqtt_client_connection_311_publish(
+ struct aws_mqtt_client_connection_311_impl *connection,
+ const struct aws_byte_cursor *topic,
+ enum aws_mqtt_qos qos,
+ bool retain,
+ const struct aws_byte_cursor *payload,
+ aws_mqtt_op_complete_fn *on_complete,
+ void *userdata,
+ uint64_t timeout_ns);
+
#endif /* AWS_MQTT_PRIVATE_CLIENT_IMPL_H */
diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl_shared.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl_shared.h
index d244cfe7c66..248b4658e60 100644
--- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl_shared.h
+++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/client_impl_shared.h
@@ -10,6 +10,20 @@
struct aws_mqtt_client_connection;
+/*
+ * Internal enum that indicates what type of struct the underlying impl pointer actually is. We use this
+ * to safely interact with private APIs on the implementation or extract the adapted 5 client directly, as
+ * necessary.
+ */
+enum aws_mqtt311_impl_type {
+
+ /* 311 connection impl can be cast to `struct aws_mqtt_client_connection_311_impl` */
+ AWS_MQTT311_IT_311_CONNECTION,
+
+ /* 311 connection impl can be cast to `struct aws_mqtt_client_connection_5_impl`*/
+ AWS_MQTT311_IT_5_ADAPTER,
+};
+
struct aws_mqtt_client_connection_vtable {
struct aws_mqtt_client_connection *(*acquire_fn)(void *impl);
@@ -107,6 +121,10 @@ struct aws_mqtt_client_connection_vtable {
void *userdata);
int (*get_stats_fn)(void *impl, struct aws_mqtt_connection_operation_statistics *stats);
+
+ enum aws_mqtt311_impl_type (*get_impl_type)(const void *impl);
+
+ struct aws_event_loop *(*get_event_loop)(const void *impl);
};
struct aws_mqtt_client_connection {
@@ -114,10 +132,16 @@ struct aws_mqtt_client_connection {
void *impl;
};
+AWS_MQTT_API enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type(
+ const struct aws_mqtt_client_connection *connection);
+
AWS_MQTT_API uint64_t aws_mqtt_hash_uint16_t(const void *item);
AWS_MQTT_API bool aws_mqtt_compare_uint16_t_eq(const void *a, const void *b);
AWS_MQTT_API bool aws_mqtt_byte_cursor_hash_equality(const void *a, const void *b);
+AWS_MQTT_API struct aws_event_loop *aws_mqtt_client_connection_get_event_loop(
+ const struct aws_mqtt_client_connection *connection);
+
#endif /* AWS_MQTT_PRIVATE_CLIENT_IMPL_SHARED_H */
diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/mqtt311_listener.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/mqtt311_listener.h
new file mode 100644
index 00000000000..c66e4f246ec
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/mqtt311_listener.h
@@ -0,0 +1,204 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#ifndef AWS_MQTT_MQTT311_LISTENER_H
+#define AWS_MQTT_MQTT311_LISTENER_H
+
+#include <aws/mqtt/mqtt.h>
+
+#include <aws/common/rw_lock.h>
+#include <aws/mqtt/client.h>
+
+AWS_PUSH_SANE_WARNING_LEVEL
+
+/**
+ * Callback signature for when an mqtt311 listener has completely destroyed itself.
+ */
+typedef void(aws_mqtt311_listener_termination_completion_fn)(void *complete_ctx);
+
+/**
+ * A record that tracks MQTT311 client connection callbacks which can be dynamically injected via a listener.
+ *
+ * All the callbacks that are supported here are invoked only on the 311 connection's event loop. With the
+ * add/remove callback set also on the event loop, everything is correctly serialized without data races.
+ *
+ * If binding additional callbacks, they must only be invoked from the connection's event loop.
+ *
+ * We only listen to connection-success because the only connection-level event we care about is a failure
+ * to rejoin a session (which invalidates all subscriptions that were considered valid)
+ */
+struct aws_mqtt311_callback_set {
+
+ /* Called from s_packet_handler_publish which is event-loop invoked */
+ aws_mqtt_client_publish_received_fn *publish_received_handler;
+
+ /* Called from s_packet_handler_connack which is event-loop invoked */
+ aws_mqtt_client_on_connection_success_fn *connection_success_handler;
+
+ /* Called from s_mqtt_client_shutdown which is event-loop invoked */
+ aws_mqtt_client_on_connection_interrupted_fn *connection_interrupted_handler;
+
+ /* Called from s_mqtt_client_shutdown which is event-loop invoked */
+ aws_mqtt_client_on_disconnect_fn *disconnect_handler;
+
+ void *user_data;
+};
+
+/**
+ * An internal type for managing chains of callbacks attached to an mqtt311 client connection. Supports chains for
+ * lifecycle events and incoming publish packet handling.
+ *
+ * Assumed to be owned and used only by an MQTT311 client connection.
+ */
+struct aws_mqtt311_callback_set_manager {
+ struct aws_allocator *allocator;
+
+ struct aws_mqtt_client_connection *connection;
+
+ struct aws_linked_list callback_set_entries;
+
+ uint64_t next_callback_set_entry_id;
+};
+
+/**
+ * Configuration options for MQTT311 listener objects.
+ */
+struct aws_mqtt311_listener_config {
+
+ /**
+ * MQTT311 client connection to listen to events on
+ */
+ struct aws_mqtt_client_connection *connection;
+
+ /**
+ * Callbacks to invoke when events occur on the MQTT311 client connection
+ */
+ struct aws_mqtt311_callback_set listener_callbacks;
+
+ /**
+ * Listener destruction is asynchronous and thus requires a termination callback and associated user data
+ * to notify the user that the listener has been fully destroyed and no further events will be received.
+ */
+ aws_mqtt311_listener_termination_completion_fn *termination_callback;
+ void *termination_callback_user_data;
+};
+
+AWS_EXTERN_C_BEGIN
+
+/**
+ * Creates a new MQTT311 listener object. For as long as the listener lives, incoming publishes and lifecycle events
+ * will be forwarded to the callbacks configured on the listener.
+ *
+ * @param allocator allocator to use
+ * @param config listener configuration
+ * @return a new aws_mqtt311_listener object
+ */
+AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_new(
+ struct aws_allocator *allocator,
+ struct aws_mqtt311_listener_config *config);
+
+/**
+ * Adds a reference to an mqtt311 listener.
+ *
+ * @param listener listener to add a reference to
+ * @return the listener object
+ */
+AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_acquire(struct aws_mqtt311_listener *listener);
+
+/**
+ * Removes a reference to an mqtt311 listener. When the reference count drops to zero, the listener's asynchronous
+ * destruction will be started.
+ *
+ * @param listener listener to remove a reference from
+ * @return NULL
+ */
+AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_release(struct aws_mqtt311_listener *listener);
+
+/**
+ * Initializes a callback set manager
+ */
+AWS_MQTT_API
+void aws_mqtt311_callback_set_manager_init(
+ struct aws_mqtt311_callback_set_manager *manager,
+ struct aws_allocator *allocator,
+ struct aws_mqtt_client_connection *connection);
+
+/**
+ * Cleans up a callback set manager.
+ *
+ * aws_mqtt311_callback_set_manager_init must have been previously called or this will crash.
+ */
+AWS_MQTT_API
+void aws_mqtt311_callback_set_manager_clean_up(struct aws_mqtt311_callback_set_manager *manager);
+
+/**
+ * Adds a callback set to the front of the handler chain. Returns an integer id that can be used to selectively
+ * remove the callback set from the manager.
+ *
+ * May only be called on the client's event loop thread.
+ */
+AWS_MQTT_API
+uint64_t aws_mqtt311_callback_set_manager_push_front(
+ struct aws_mqtt311_callback_set_manager *manager,
+ struct aws_mqtt311_callback_set *callback_set);
+
+/**
+ * Removes a callback set from the handler chain.
+ *
+ * May only be called on the client's event loop thread.
+ */
+AWS_MQTT_API
+void aws_mqtt311_callback_set_manager_remove(
+ struct aws_mqtt311_callback_set_manager *manager,
+ uint64_t callback_set_id);
+
+/**
+ * Walks the incoming publish handler chain for an MQTT311 connection, invoking each in sequence.
+ *
+ * May only be called on the client's event loop thread.
+ */
+AWS_MQTT_API
+void aws_mqtt311_callback_set_manager_on_publish_received(
+ struct aws_mqtt311_callback_set_manager *manager,
+ const struct aws_byte_cursor *topic,
+ const struct aws_byte_cursor *payload,
+ bool dup,
+ enum aws_mqtt_qos qos,
+ bool retain);
+
+/**
+ * Invokes a connection success event on each listener in the manager's collection of callback sets.
+ *
+ * May only be called on the client's event loop thread.
+ */
+AWS_MQTT_API
+void aws_mqtt311_callback_set_manager_on_connection_success(
+ struct aws_mqtt311_callback_set_manager *manager,
+ enum aws_mqtt_connect_return_code return_code,
+ bool rejoined_session);
+
+/**
+ * Invokes a connection interrupted event on each listener in the manager's collection of callback sets.
+ *
+ * May only be called on the client's event loop thread.
+ */
+AWS_MQTT_API
+void aws_mqtt311_callback_set_manager_on_connection_interrupted(
+ struct aws_mqtt311_callback_set_manager *manager,
+ int error_code);
+
+/**
+ * Invokes a disconnection event on each listener in the manager's collection of callback sets.
+ *
+ * May only be called on the client's event loop thread.
+ */
+AWS_MQTT_API
+void aws_mqtt311_callback_set_manager_on_disconnect(struct aws_mqtt311_callback_set_manager *manager);
+
+AWS_EXTERN_C_END
+
+AWS_POP_SANE_WARNING_LEVEL
+
+#endif /* AWS_MQTT_MQTT311_LISTENER_H */
diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/protocol_adapter.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/protocol_adapter.h
new file mode 100644
index 00000000000..adcc5e1abad
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/protocol_adapter.h
@@ -0,0 +1,211 @@
+#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_PROTOCOL_ADAPTER_H
+#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_PROTOCOL_ADAPTER_H
+
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/mqtt/exports.h>
+#include <aws/mqtt/mqtt.h>
+
+#include <aws/common/byte_buf.h>
+
+struct aws_allocator;
+struct aws_event_loop;
+struct aws_mqtt_client_connection;
+struct aws_mqtt5_client;
+struct aws_mqtt_rr_incoming_publish_event;
+
+/*
+ * The request-response protocol adapter is a translation layer that sits between the request-response native client
+ * implementation and a protocol client capable of subscribing, unsubscribing, and publishing MQTT messages.
+ * Valid protocol clients include the CRT MQTT5 client, the CRT MQTT311 client, and an eventstream RPC connection
+ * that belongs to a Greengrass IPC client. Each of these protocol clients has a different (or even implicit)
+ * contract for carrying out pub-sub operations. The protocol adapter abstracts these details with a simple,
+ * minimal interface based on the requirements identified in the request-response design documents.
+ */
+
+/*
+ * Minimal MQTT subscribe options
+ */
+struct aws_protocol_adapter_subscribe_options {
+ struct aws_byte_cursor topic_filter;
+ uint32_t ack_timeout_seconds;
+};
+
+/*
+ * Minimal MQTT unsubscribe options
+ */
+struct aws_protocol_adapter_unsubscribe_options {
+ struct aws_byte_cursor topic_filter;
+ uint32_t ack_timeout_seconds;
+};
+
+/*
+ * Minimal MQTT publish options
+ */
+struct aws_protocol_adapter_publish_options {
+ struct aws_byte_cursor topic;
+ struct aws_byte_cursor payload;
+ uint32_t ack_timeout_seconds;
+
+ /*
+ * Invoked on success/failure of the publish itself. Our implementations use QoS1 which means that success
+ * will be on puback receipt.
+ */
+ void (*completion_callback_fn)(int, void *);
+
+ /*
+ * User data to pass in when invoking the completion callback
+ */
+ void *user_data;
+};
+
+/*
+ * Describes the type of subscription event (relative to a topic filter)
+ */
+enum aws_protocol_adapter_subscription_event_type {
+ AWS_PASET_SUBSCRIBE,
+ AWS_PASET_UNSUBSCRIBE,
+};
+
+/*
+ * An event emitted by the protocol adapter when a subscribe or unsubscribe is completed by the adapted protocol
+ * client.
+ */
+struct aws_protocol_adapter_subscription_event {
+ struct aws_byte_cursor topic_filter;
+ enum aws_protocol_adapter_subscription_event_type event_type;
+ int error_code;
+ bool retryable;
+};
+
+enum aws_protocol_adapter_connection_event_type {
+ AWS_PACET_CONNECTED,
+ AWS_PACET_DISCONNECTED,
+};
+
+/*
+ * An event emitted by the protocol adapter whenever the protocol client's connection status changes
+ */
+struct aws_protocol_adapter_connection_event {
+ enum aws_protocol_adapter_connection_event_type event_type;
+ bool joined_session;
+};
+
+typedef void(aws_protocol_adapter_subscription_event_fn)(
+ const struct aws_protocol_adapter_subscription_event *event,
+ void *user_data);
+
+typedef void(aws_protocol_adapter_incoming_publish_fn)(
+ const struct aws_mqtt_rr_incoming_publish_event *publish,
+ void *user_data);
+
+typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data);
+
+typedef void(aws_protocol_adapter_connection_event_fn)(
+ const struct aws_protocol_adapter_connection_event *event,
+ void *user_data);
+
+/*
+ * Set of callbacks invoked by the protocol adapter. These must all be set.
+ */
+struct aws_mqtt_protocol_adapter_options {
+ aws_protocol_adapter_subscription_event_fn *subscription_event_callback;
+ aws_protocol_adapter_incoming_publish_fn *incoming_publish_callback;
+ aws_protocol_adapter_terminate_callback_fn *terminate_callback;
+ aws_protocol_adapter_connection_event_fn *connection_event_callback;
+
+ /*
+ * User data to pass into all singleton protocol adapter callbacks. Likely either the request-response client
+ * or the subscription manager component of the request-response client.
+ */
+ void *user_data;
+};
+
+struct aws_mqtt_protocol_adapter_vtable {
+
+ void (*aws_mqtt_protocol_adapter_destroy_fn)(void *);
+
+ int (*aws_mqtt_protocol_adapter_subscribe_fn)(void *, struct aws_protocol_adapter_subscribe_options *);
+
+ int (*aws_mqtt_protocol_adapter_unsubscribe_fn)(void *, struct aws_protocol_adapter_unsubscribe_options *);
+
+ int (*aws_mqtt_protocol_adapter_publish_fn)(void *, struct aws_protocol_adapter_publish_options *);
+
+ bool (*aws_mqtt_protocol_adapter_is_connected_fn)(void *);
+
+ struct aws_event_loop *(*aws_mqtt_protocol_adapter_get_event_loop_fn)(void *);
+};
+
+struct aws_mqtt_protocol_adapter {
+ const struct aws_mqtt_protocol_adapter_vtable *vtable;
+ void *impl;
+};
+
+AWS_EXTERN_C_BEGIN
+
+/*
+ * Creates a new request-response protocol adapter from an MQTT311 client
+ */
+AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(
+ struct aws_allocator *allocator,
+ struct aws_mqtt_protocol_adapter_options *options,
+ struct aws_mqtt_client_connection *connection);
+
+/*
+ * Creates a new request-response protocol adapter from an MQTT5 client
+ */
+AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(
+ struct aws_allocator *allocator,
+ struct aws_mqtt_protocol_adapter_options *options,
+ struct aws_mqtt5_client *client);
+
+/*
+ * Destroys a request-response protocol adapter. Destruction is an asynchronous process and the caller must
+ * wait for the termination callback to be invoked before assuming that no further callbacks will be invoked.
+ */
+AWS_MQTT_API void aws_mqtt_protocol_adapter_destroy(struct aws_mqtt_protocol_adapter *adapter);
+
+/*
+ * Asks the adapted protocol client to perform an MQTT subscribe operation
+ */
+AWS_MQTT_API int aws_mqtt_protocol_adapter_subscribe(
+ struct aws_mqtt_protocol_adapter *adapter,
+ struct aws_protocol_adapter_subscribe_options *options);
+
+/*
+ * Asks the adapted protocol client to perform an MQTT unsubscribe operation
+ */
+AWS_MQTT_API int aws_mqtt_protocol_adapter_unsubscribe(
+ struct aws_mqtt_protocol_adapter *adapter,
+ struct aws_protocol_adapter_unsubscribe_options *options);
+
+/*
+ * Asks the adapted protocol client to perform an MQTT publish operation
+ */
+AWS_MQTT_API int aws_mqtt_protocol_adapter_publish(
+ struct aws_mqtt_protocol_adapter *adapter,
+ struct aws_protocol_adapter_publish_options *options);
+
+/*
+ * Synchronously checks the connection state of the adapted protocol client. May only be called from the
+ * protocol client's event loop.
+ */
+AWS_MQTT_API bool aws_mqtt_protocol_adapter_is_connected(struct aws_mqtt_protocol_adapter *adapter);
+
+/*
+ * Returns the event loop that the protocol client is bound to.
+ */
+AWS_MQTT_API struct aws_event_loop *aws_mqtt_protocol_adapter_get_event_loop(struct aws_mqtt_protocol_adapter *adapter);
+
+AWS_MQTT_API const char *aws_protocol_adapter_subscription_event_type_to_c_str(
+ enum aws_protocol_adapter_subscription_event_type type);
+
+AWS_MQTT_API const char *aws_protocol_adapter_connection_event_type_to_c_str(
+ enum aws_protocol_adapter_connection_event_type type);
+
+AWS_EXTERN_C_END
+
+#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_PROTOCOL_ADAPTER_H */
diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/request_response_subscription_set.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/request_response_subscription_set.h
new file mode 100644
index 00000000000..e24c220b6c8
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/request_response_subscription_set.h
@@ -0,0 +1,140 @@
+#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H
+#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H
+
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/common/byte_buf.h>
+#include <aws/common/hash_table.h>
+#include <aws/common/linked_list.h>
+#include <aws/mqtt/exports.h>
+
+struct aws_mqtt_rr_incoming_publish_event;
+
+/*
+ * Handles subscriptions for request-response client.
+ * Lifetime of this struct is bound to request-response client.
+ */
+struct aws_request_response_subscriptions {
+ struct aws_allocator *allocator;
+
+ /*
+ * Map from cursor (topic filter) -> list of streaming operations using that filter
+ *
+ * We don't garbage collect this table over the course of normal client operation. We only clean it up
+ * when the client is shutting down.
+ */
+ struct aws_hash_table streaming_operation_subscription_lists;
+
+ /*
+ * Map from cursor (topic filter with wildcards) -> list of streaming operations using that filter
+ *
+ * We don't garbage collect this table over the course of normal client operation. We only clean it up
+ * when the client is shutting down.
+ */
+ struct aws_hash_table streaming_operation_wildcards_subscription_lists;
+
+ /*
+ * Map from cursor (topic) -> request response path (topic, correlation token json path)
+ */
+ struct aws_hash_table request_response_paths;
+};
+
+/*
+ * This is the (key and) value in stream subscriptions tables.
+ */
+struct aws_rr_operation_list_topic_filter_entry {
+ struct aws_allocator *allocator;
+
+ struct aws_byte_cursor topic_filter_cursor;
+ struct aws_byte_buf topic_filter;
+
+ struct aws_linked_list operations;
+};
+
+/*
+ * Value in request subscriptions table.
+ */
+struct aws_rr_response_path_entry {
+ struct aws_allocator *allocator;
+
+ size_t ref_count;
+
+ struct aws_byte_cursor topic_cursor;
+ struct aws_byte_buf topic;
+
+ struct aws_byte_buf correlation_token_json_path;
+};
+
+/*
+ * Callback type for matched stream subscriptions.
+ */
+typedef void(aws_mqtt_stream_operation_subscription_match_fn)(
+ const struct aws_linked_list *operations,
+ const struct aws_byte_cursor *topic_filter,
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ void *user_data);
+
+/*
+ * Callback type for matched request subscriptions.
+ */
+typedef void(aws_mqtt_request_operation_subscription_match_fn)(
+ struct aws_rr_response_path_entry *entry,
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ void *user_data);
+
+AWS_EXTERN_C_BEGIN
+
+/*
+ * Initialize internal state of a provided request-response subscriptions structure.
+ */
+AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_init(
+ struct aws_request_response_subscriptions *subscriptions,
+ struct aws_allocator *allocator);
+
+/*
+ * Clean up internals of a provided request-response subscriptions structure.
+ */
+AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_clean_up(
+ struct aws_request_response_subscriptions *subscriptions);
+
+/*
+ * Add a subscription for stream operations.
+ * If subscription with the same topic filter is already added, previously created
+ * aws_rr_operation_list_topic_filter_entry instance is returned.
+ */
+AWS_MQTT_API struct aws_rr_operation_list_topic_filter_entry *
+ aws_mqtt_request_response_client_subscriptions_add_stream_subscription(
+ struct aws_request_response_subscriptions *subscriptions,
+ const struct aws_byte_cursor *topic_filter);
+
+/*
+ * Add a subscription for request operation.
+ */
+AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_add_request_subscription(
+ struct aws_request_response_subscriptions *subscriptions,
+ const struct aws_byte_cursor *topic_filter,
+ const struct aws_byte_cursor *correlation_token_json_path);
+
+/*
+ * Remove a subscription for a given request operation.
+ */
+AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
+ struct aws_request_response_subscriptions *subscriptions,
+ const struct aws_byte_cursor *topic_filter);
+
+/*
+ * Call specified callbacks for all stream and request operations with subscriptions matching a provided publish event.
+ */
+AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_handle_incoming_publish(
+ const struct aws_request_response_subscriptions *subscriptions,
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match,
+ aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match,
+ void *user_data);
+
+AWS_EXTERN_C_END
+
+#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_SET_H */
diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/subscription_manager.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/subscription_manager.h
new file mode 100644
index 00000000000..da70b4c9aad
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/request-response/subscription_manager.h
@@ -0,0 +1,264 @@
+#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_MANAGER_H
+#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_MANAGER_H
+
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/mqtt/mqtt.h>
+
+#include <aws/common/hash_table.h>
+
+struct aws_mqtt_protocol_adapter;
+struct aws_protocol_adapter_connection_event;
+struct aws_protocol_adapter_subscription_event;
+
+/*
+ * Describes a change to the state of a request operation subscription
+ */
+enum aws_rr_subscription_event_type {
+
+ /*
+ * A request subscription subscribe succeeded
+ */
+ ARRSET_REQUEST_SUBSCRIBE_SUCCESS,
+
+ /*
+ * A request subscription subscribe failed
+ */
+ ARRSET_REQUEST_SUBSCRIBE_FAILURE,
+
+ /*
+ * A previously successful request subscription has ended.
+ *
+ * Under normal circumstances this can happen when
+ *
+ * (1) failure to rejoin a session
+ */
+ ARRSET_REQUEST_SUBSCRIPTION_ENDED,
+
+ /*
+ * A streaming subscription subscribe succeeded
+ */
+ ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED,
+
+ /*
+ * The protocol client failed to rejoin a session containing a previously-established streaming subscription
+ */
+ ARRSET_STREAMING_SUBSCRIPTION_LOST,
+
+ /*
+ * A streaming subscription subscribe attempt resulted in an error or reason code that the client has determined
+ * will result in indefinite failures to subscribe. In this case, we stop attempting to resubscribe.
+ *
+ * Situations that can lead to this:
+ * (1) Permission failures
+ * (2) Invalid topic filter
+ */
+ ARRSET_STREAMING_SUBSCRIPTION_HALTED,
+
+ /*
+ * A subscription has lost its last listener and can be purged
+ *
+ * This event is global; operation_id will always be zero.
+ */
+ ARRSET_SUBSCRIPTION_EMPTY,
+
+ /*
+ * A subscription has been unsubscribed from
+ *
+ * This event is global; operation_id will always be zero.
+ */
+ ARRSET_UNSUBSCRIBE_COMPLETE,
+};
+
+struct aws_rr_subscription_status_event {
+ enum aws_rr_subscription_event_type type;
+ struct aws_byte_cursor topic_filter;
+ uint64_t operation_id;
+};
+
+/*
+ * Invariant: despite being on the same thread, these callbacks must be queued as cross-thread tasks on the native
+ * request-response client. This allows us to iterate internal collections without worrying about external
+ * callers disrupting things by invoking APIs back on us.
+ */
+typedef void(
+ aws_rr_subscription_status_event_callback_fn)(const struct aws_rr_subscription_status_event *event, void *userdata);
+
+struct aws_rr_subscription_manager_options {
+
+ /*
+ * Maximum number of request-response subscriptions allowed. Must be at least two.
+ */
+ size_t max_request_response_subscriptions;
+
+ /*
+ * Maximum number of streaming subscriptions allowed.
+ */
+ size_t max_streaming_subscriptions;
+
+ /*
+ * Ack timeout to use for all subscribe and unsubscribe operations
+ */
+ uint32_t operation_timeout_seconds;
+
+ aws_rr_subscription_status_event_callback_fn *subscription_status_callback;
+ void *userdata;
+};
+
+/*
+ * The subscription manager works with the request-response client to handle subscriptions in an eager manner.
+ * Subscription purges are checked with every client service call. Unsubscribe failures don't trigger anything special,
+ * we'll just try again next time we look for subscription space. Subscribes are attempted on idle subscriptions
+ * that still need them, either in response to a new operation listener or a connection resumption event.
+ *
+ * We only allow one subscribe or unsubscribe to be outstanding at once for a given topic. If an operation requires a
+ * subscription while an unsubscribe is in progress the operation is blocked until the unsubscribe resolves.
+ *
+ * These invariants are dropped during shutdown. In that case, we immediately send unsubscribes for everything
+ * that is not already unsubscribing.
+ */
+struct aws_rr_subscription_manager {
+ struct aws_allocator *allocator;
+
+ struct aws_rr_subscription_manager_options config;
+
+ /* non-owning reference; the client is responsible for destroying this asynchronously (listener detachment) */
+ struct aws_mqtt_protocol_adapter *protocol_adapter;
+
+ /* &aws_rr_subscription_record.topic_filter_cursor -> aws_rr_subscription_record * */
+ struct aws_hash_table subscriptions;
+
+ bool is_protocol_client_connected;
+};
+
+enum aws_rr_subscription_type {
+ ARRST_EVENT_STREAM,
+ ARRST_REQUEST_RESPONSE,
+};
+
+struct aws_rr_acquire_subscription_options {
+ struct aws_byte_cursor *topic_filters;
+ size_t topic_filter_count;
+
+ uint64_t operation_id;
+ enum aws_rr_subscription_type type;
+};
+
+struct aws_rr_release_subscription_options {
+ struct aws_byte_cursor *topic_filters;
+ size_t topic_filter_count;
+
+ uint64_t operation_id;
+};
+
+enum aws_acquire_subscription_result_type {
+
+ /*
+ * All requested subscriptions already exist and are active. The operation can proceed to the next stage.
+ */
+ AASRT_SUBSCRIBED,
+
+ /*
+ * The requested subscriptions now exist but at least one is not yet active. The operation must wait for subscribes
+ * to complete as success or failure.
+ */
+ AASRT_SUBSCRIBING,
+
+ /*
+ * At least one subscription does not exist and there is no room for it currently. Room may open up in the future,
+ * so the operation should wait.
+ */
+ AASRT_BLOCKED,
+
+ /*
+ * At least one subscription does not exist and there is no room for it. Unless an event stream subscription gets
+ * closed, no room will be available in the future. The operation should be failed.
+ */
+ AASRT_NO_CAPACITY,
+
+ /*
+ * An internal failure occurred while trying to establish subscriptions. The operation should be failed.
+ */
+ AASRT_FAILURE
+};
+
+AWS_EXTERN_C_BEGIN
+
+/*
+ * Initializes a subscription manager. Every native request-response client owns a single subscription manager.
+ */
+AWS_MQTT_API void aws_rr_subscription_manager_init(
+ struct aws_rr_subscription_manager *manager,
+ struct aws_allocator *allocator,
+ struct aws_mqtt_protocol_adapter *protocol_adapter,
+ const struct aws_rr_subscription_manager_options *options);
+
+/*
+ * Cleans up a subscription manager. This is done early in the native request-response client shutdown process.
+ * After this API is called, no other subscription manager APIs will be called by the request-response client (during
+ * the rest of the asynchronous shutdown process).
+ */
+AWS_MQTT_API void aws_rr_subscription_manager_clean_up(struct aws_rr_subscription_manager *manager);
+
+/*
+ * Requests the the subscription manager unsubscribe from all currently-unused subscriptions
+ */
+AWS_MQTT_API void aws_rr_subscription_manager_purge_unused(struct aws_rr_subscription_manager *manager);
+
+/*
+ * Signals to the subscription manager that the native request-response client is processing an operation that
+ * needs a subscription to a particular topic. Return value indicates to the request-response client how it should
+ * proceed with processing the operation.
+ */
+AWS_MQTT_API enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_subscription(
+ struct aws_rr_subscription_manager *manager,
+ const struct aws_rr_acquire_subscription_options *options);
+
+/*
+ * Signals to the subscription manager that the native request-response client operation no longer
+ * needs a subscription to a particular topic.
+ */
+AWS_MQTT_API void aws_rr_subscription_manager_release_subscription(
+ struct aws_rr_subscription_manager *manager,
+ const struct aws_rr_release_subscription_options *options);
+
+/*
+ * Notifies the subscription manager of a subscription status event. Invoked by the native request-response client
+ * that owns the subscription manager. The native request-response client also owns the protocol adapter that
+ * the subscription event originates from, so the control flow looks like:
+ *
+ * [Subscribe]
+ * subscription manager -> protocol adapter Subscribe -> protocol client Subscribe -> network...
+ *
+ * [Result]
+ * protocol client Suback/Timeout/Error -> protocol adapter -> native request-response client ->
+ * subscription manager (this API)
+ */
+AWS_MQTT_API void aws_rr_subscription_manager_on_protocol_adapter_subscription_event(
+ struct aws_rr_subscription_manager *manager,
+ const struct aws_protocol_adapter_subscription_event *event);
+
+/*
+ * Notifies the subscription manager of a connection status event. Invoked by the native request-response client
+ * that owns the subscription manager. The native request-response client also owns the protocol adapter that
+ * the connection event originates from. The control flow looks like:
+ *
+ * protocol client connect/disconnect -> protocol adapter -> native request-response client ->
+ * Subscription manager (this API)
+ */
+AWS_MQTT_API void aws_rr_subscription_manager_on_protocol_adapter_connection_event(
+ struct aws_rr_subscription_manager *manager,
+ const struct aws_protocol_adapter_connection_event *event);
+
+/*
+ * Checks subscription manager options for validity.
+ */
+AWS_MQTT_API bool aws_rr_subscription_manager_are_options_valid(
+ const struct aws_rr_subscription_manager_options *options);
+
+AWS_EXTERN_C_END
+
+#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_MANAGER_H */
diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/shared_constants.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/shared.h
index 0a835942a5e..a1165d8effd 100644
--- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/shared_constants.h
+++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/private/shared.h
@@ -8,6 +8,8 @@
#include <aws/mqtt/mqtt.h>
+#include <aws/mqtt/private/request-response/subscription_manager.h>
+
AWS_EXTERN_C_BEGIN
AWS_MQTT_API extern const struct aws_byte_cursor *g_websocket_handshake_default_path;
diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/request-response/request_response_client.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/request-response/request_response_client.h
new file mode 100644
index 00000000000..e9a75e11966
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/request-response/request_response_client.h
@@ -0,0 +1,290 @@
+#ifndef AWS_MQTT_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H
+#define AWS_MQTT_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H
+
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/mqtt/mqtt.h>
+
+struct aws_event_loop;
+struct aws_mqtt_request_response_client;
+struct aws_mqtt_client_connection;
+struct aws_mqtt5_client;
+
+/*
+ * A response path is a pair of values - MQTT topic and a JSON path - that describe where a response to
+ * an MQTT-based request may arrive. For a given request type, there may be multiple response paths and each
+ * one is associated with a separate JSON schema for the response body.
+ */
+struct aws_mqtt_request_operation_response_path {
+
+ /*
+ * MQTT topic that a response may arrive on.
+ */
+ struct aws_byte_cursor topic;
+
+ /*
+ * JSON path for finding correlation tokens within payloads that arrive on this path's topic.
+ */
+ struct aws_byte_cursor correlation_token_json_path;
+};
+
+/*
+ * An event emitted by a streaming operation's subscription.
+ */
+struct aws_mqtt_rr_incoming_publish_event {
+ struct aws_byte_cursor payload;
+ struct aws_byte_cursor topic;
+ /* Below are MQTT optional fields. For MQTT3, they will always be empty, as MQTT3 does not support them. For MQTT5,
+ * they will be set if they are present in a packet. */
+ const struct aws_byte_cursor *content_type;
+ size_t user_property_count;
+ const struct aws_mqtt5_user_property *user_properties;
+ /* Even though this field is supposed to be used by MQTT broker to determine if a message-to-be-sent is expired,
+ * certain services use this field to specify client-side timeouts. */
+ const uint32_t *message_expiry_interval_seconds;
+};
+
+/*
+ * Callback signature for request-response completion.
+ *
+ * Invariants:
+ * If error_code is non-zero then publish_event will be NULL.
+ * If publish_event is not NULL, then error_code will be 0.
+ */
+typedef void(aws_mqtt_request_operation_completion_fn)(
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ int error_code,
+ void *user_data);
+
+/*
+ * Configuration options for a request-response operation.
+ */
+struct aws_mqtt_request_operation_options {
+
+ /*
+ * Set of topic filters that should be subscribed to in order to cover all possible response paths. Sometimes
+ * we can use wildcards to cut down on the subscriptions needed; sometimes we can't.
+ */
+ struct aws_byte_cursor *subscription_topic_filters;
+ size_t subscription_topic_filter_count;
+
+ /*
+ * Set of all possible response paths associated with this request type
+ */
+ struct aws_mqtt_request_operation_response_path *response_paths;
+ size_t response_path_count;
+
+ /*
+ * Topic to publish the request to once response subscriptions have been established.
+ */
+ struct aws_byte_cursor publish_topic;
+
+ /*
+ * Payload to publish in order to initiate the request
+ */
+ struct aws_byte_cursor serialized_request;
+
+ /*
+ * Correlation token embedded in the request that must be found in a response message. This can be the
+ * empty cursor to support certain services which don't use correlation tokens. In this case, the client
+ * only allows one request at a time to use the associated subscriptions; no concurrency is possible. There
+ * are some optimizations we could make here but for now, they're not worth the complexity.
+ */
+ struct aws_byte_cursor correlation_token;
+
+ /*
+ * Callback (and associated user data) to invoke when the request is completed.
+ */
+ aws_mqtt_request_operation_completion_fn *completion_callback;
+ void *user_data;
+};
+
+/*
+ * Describes a change to the state of a streaming operation subscription
+ */
+enum aws_rr_streaming_subscription_event_type {
+
+ /*
+ * The streaming operation is successfully subscribed to its topic (filter)
+ */
+ ARRSSET_SUBSCRIPTION_ESTABLISHED,
+
+ /*
+ * The streaming operation has temporarily lost its subscription to its topic (filter)
+ */
+ ARRSSET_SUBSCRIPTION_LOST,
+
+ /*
+ * The streaming operation has entered a terminal state where it has given up trying to subscribe
+ * to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy).
+ */
+ ARRSSET_SUBSCRIPTION_HALTED,
+};
+
+/*
+ * Callback signature for when the subscription status of a streaming operation changes.
+ */
+typedef void(aws_mqtt_streaming_operation_subscription_status_fn)(
+ enum aws_rr_streaming_subscription_event_type status,
+ int error_code,
+ void *user_data);
+
+/*
+ * Callback signature for when a publish arrives that matches a streaming operation's subscription
+ */
+typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)(
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ void *user_data);
+
+/*
+ * Callback signature for when a streaming operation is fully destroyed and no more events will be emitted.
+ */
+typedef void(aws_mqtt_streaming_operation_terminated_fn)(void *user_data);
+
+/*
+ * Configuration options for a streaming operation.
+ */
+struct aws_mqtt_streaming_operation_options {
+
+ /*
+ * Topic filter that the streaming operation should listen on
+ */
+ struct aws_byte_cursor topic_filter;
+
+ /*
+ * Callback for subscription status events
+ */
+ aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback;
+
+ /*
+ * Callback for publish messages that match the operation's topic filter
+ */
+ aws_mqtt_streaming_operation_incoming_publish_fn *incoming_publish_callback;
+
+ /*
+ * Callback for streaming operation final shutdown
+ */
+ aws_mqtt_streaming_operation_terminated_fn *terminated_callback;
+
+ /*
+ * Data passed to all streaming operation callbacks
+ */
+ void *user_data;
+};
+
+typedef void(aws_mqtt_request_response_client_initialized_callback_fn)(void *user_data);
+typedef void(aws_mqtt_request_response_client_terminated_callback_fn)(void *user_data);
+
+/*
+ * Request-response client configuration options
+ */
+struct aws_mqtt_request_response_client_options {
+
+ /*
+ * Maximum number of subscriptions that the client will concurrently use for request-response operations
+ */
+ size_t max_request_response_subscriptions;
+
+ /*
+ * Maximum number of subscriptions that the client will concurrently use for streaming operations
+ */
+ size_t max_streaming_subscriptions;
+
+ /*
+ * Duration, in seconds, that a request-response operation will wait for completion before giving up
+ */
+ uint32_t operation_timeout_seconds;
+
+ /*
+ * Request-response client initialization is asynchronous. This callback is invoked when the client is fully
+ * initialized.
+ *
+ * Do not bind the initialized callback; it exists mostly for tests and should not be exposed
+ */
+ aws_mqtt_request_response_client_initialized_callback_fn *initialized_callback;
+
+ /*
+ * Callback invoked when the client's asynchronous destruction process has fully completed.
+ */
+ aws_mqtt_request_response_client_terminated_callback_fn *terminated_callback;
+
+ /*
+ * Arbitrary data to pass to the client callbacks
+ */
+ void *user_data;
+};
+
+AWS_EXTERN_C_BEGIN
+
+/*
+ * Create a new request-response client that uses an MQTT311 client.
+ */
+AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(
+ struct aws_allocator *allocator,
+ struct aws_mqtt_client_connection *client,
+ const struct aws_mqtt_request_response_client_options *options);
+
+/*
+ * Create a new request-response client that uses an MQTT5 client.
+ */
+AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(
+ struct aws_allocator *allocator,
+ struct aws_mqtt5_client *client,
+ const struct aws_mqtt_request_response_client_options *options);
+
+/*
+ * Add a reference to a request-response client
+ */
+AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(
+ struct aws_mqtt_request_response_client *client);
+
+/*
+ * Remove a reference from a request-response client
+ */
+AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(
+ struct aws_mqtt_request_response_client *client);
+
+/*
+ * Submits a request operation to the client
+ */
+AWS_MQTT_API int aws_mqtt_request_response_client_submit_request(
+ struct aws_mqtt_request_response_client *client,
+ const struct aws_mqtt_request_operation_options *request_options);
+
+/*
+ * Creates a new streaming operation. Streaming operations start in an inactive state and must be
+ * activated before its subscription can be established.
+ */
+AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_streaming_operation(
+ struct aws_mqtt_request_response_client *client,
+ const struct aws_mqtt_streaming_operation_options *streaming_options);
+
+/*
+ * Returns the event loop used by the request-response client's protocol client
+ */
+AWS_MQTT_API struct aws_event_loop *aws_mqtt_request_response_client_get_event_loop(
+ struct aws_mqtt_request_response_client *client);
+
+/*
+ * Initiates a streaming operation's subscription process.
+ */
+AWS_MQTT_API int aws_mqtt_rr_client_operation_activate(struct aws_mqtt_rr_client_operation *operation);
+
+/*
+ * Add a reference to a streaming operation.
+ */
+AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_acquire(
+ struct aws_mqtt_rr_client_operation *operation);
+
+/*
+ * Remove a reference from a streaming operation
+ */
+AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_release(
+ struct aws_mqtt_rr_client_operation *operation);
+
+AWS_EXTERN_C_END
+
+#endif /* AWS_MQTT_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H */
diff --git a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/v5/mqtt5_client.h b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/v5/mqtt5_client.h
index d04d29814b0..5cac371ce88 100644
--- a/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/v5/mqtt5_client.h
+++ b/contrib/restricted/aws/aws-c-mqtt/include/aws/mqtt/v5/mqtt5_client.h
@@ -341,6 +341,7 @@ struct aws_mqtt5_publish_completion_options {
aws_mqtt5_publish_completion_fn *completion_callback;
void *completion_user_data;
+ /** Overrides the client's ack timeout with this value, for this operation only */
uint32_t ack_timeout_seconds_override;
};
@@ -351,6 +352,7 @@ struct aws_mqtt5_subscribe_completion_options {
aws_mqtt5_subscribe_completion_fn *completion_callback;
void *completion_user_data;
+ /** Overrides the client's ack timeout with this value, for this operation only */
uint32_t ack_timeout_seconds_override;
};
@@ -361,6 +363,7 @@ struct aws_mqtt5_unsubscribe_completion_options {
aws_mqtt5_unsubscribe_completion_fn *completion_callback;
void *completion_user_data;
+ /** Overrides the client's ack timeout with this value, for this operation only */
uint32_t ack_timeout_seconds_override;
};
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/client.c b/contrib/restricted/aws/aws-c-mqtt/source/client.c
index c332c4da864..5a3b309cb66 100644
--- a/contrib/restricted/aws/aws-c-mqtt/source/client.c
+++ b/contrib/restricted/aws/aws-c-mqtt/source/client.c
@@ -7,7 +7,7 @@
#include <aws/mqtt/private/client_impl.h>
#include <aws/mqtt/private/mqtt_client_test_helper.h>
#include <aws/mqtt/private/packets.h>
-#include <aws/mqtt/private/shared_constants.h>
+#include <aws/mqtt/private/shared.h>
#include <aws/mqtt/private/topic_tree.h>
#include <aws/http/proxy.h>
@@ -93,25 +93,6 @@ void mqtt_connection_set_state(
connection->synced_data.state = state;
}
-struct request_timeout_wrapper;
-
-/* used for timeout task */
-struct request_timeout_task_arg {
- uint16_t packet_id;
- struct aws_mqtt_client_connection_311_impl *connection;
- struct request_timeout_wrapper *task_arg_wrapper;
-};
-
-/*
- * We want the timeout task to be able to destroy the forward reference from the operation's task arg structure
- * to the timeout task. But the operation task arg structures don't have any data structure in common. So to allow
- * the timeout to refer back to a zero-able forward pointer, we wrap a pointer to the timeout task and embed it
- * in every operation's task arg that needs to create a timeout.
- */
-struct request_timeout_wrapper {
- struct request_timeout_task_arg *timeout_task_arg;
-};
-
static void s_request_timeout(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
(void)channel_task;
struct request_timeout_task_arg *timeout_task_arg = arg;
@@ -139,8 +120,14 @@ static void s_request_timeout(struct aws_channel_task *channel_task, void *arg,
static struct request_timeout_task_arg *s_schedule_timeout_task(
struct aws_mqtt_client_connection_311_impl *connection,
- uint16_t packet_id) {
- /* schedule a timeout task to run, in case server consider the publish is not received */
+ uint16_t packet_id,
+ uint64_t timeout_duration_in_ns) {
+
+ if (timeout_duration_in_ns == UINT64_MAX || timeout_duration_in_ns == 0 || packet_id == 0) {
+ return NULL;
+ }
+
+ /* schedule a timeout task to run, in case server never sends us an ack */
struct aws_channel_task *request_timeout_task = NULL;
struct request_timeout_task_arg *timeout_task_arg = NULL;
if (!aws_mem_acquire_many(
@@ -161,7 +148,7 @@ static struct request_timeout_task_arg *s_schedule_timeout_task(
aws_mem_release(connection->allocator, timeout_task_arg);
return NULL;
}
- timestamp = aws_add_u64_saturating(timestamp, connection->operation_timeout_ns);
+ timestamp = aws_add_u64_saturating(timestamp, timeout_duration_in_ns);
aws_channel_schedule_task_future(connection->slot->channel, request_timeout_task, timestamp);
return timeout_task_arg;
}
@@ -259,6 +246,7 @@ static void s_mqtt_client_shutdown(
(void)channel;
struct aws_mqtt_client_connection_311_impl *connection = user_data;
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection->loop));
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT, "id=%p: Channel has been shutdown with error code %d", (void *)connection, error_code);
@@ -404,6 +392,7 @@ static void s_mqtt_client_shutdown(
"id=%p: Connection interrupted, calling callback and attempting reconnect",
(void *)connection);
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_interrupted, error_code);
+ aws_mqtt311_callback_set_manager_on_connection_interrupted(&connection->callback_manager, error_code);
/* In case user called disconnect from the on_interrupted callback */
bool stop_reconnect;
@@ -442,6 +431,7 @@ static void s_mqtt_client_shutdown(
(void *)connection);
MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect);
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_closed, NULL);
+ aws_mqtt311_callback_set_manager_on_disconnect(&connection->callback_manager);
break;
case AWS_MQTT_CLIENT_STATE_DISCONNECTING:
AWS_LOGF_DEBUG(
@@ -450,6 +440,7 @@ static void s_mqtt_client_shutdown(
(void *)connection);
MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect);
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_closed, NULL);
+ aws_mqtt311_callback_set_manager_on_disconnect(&connection->callback_manager);
break;
case AWS_MQTT_CLIENT_STATE_CONNECTING:
AWS_LOGF_TRACE(
@@ -801,6 +792,8 @@ static void s_mqtt_client_connection_destroy_final(struct aws_mqtt_client_connec
termination_handler_user_data = connection->on_termination_ud;
}
+ aws_mqtt311_callback_set_manager_clean_up(&connection->callback_manager);
+
/* If the reconnect_task isn't freed, free it */
if (connection->reconnect_task) {
aws_mem_release(connection->reconnect_task->allocator, connection->reconnect_task);
@@ -1370,6 +1363,27 @@ error:
return AWS_OP_ERR;
}
+struct mqtt_on_websocket_setup_task_arg {
+ struct aws_allocator *allocator;
+ struct aws_task task;
+ struct aws_mqtt_client_connection_311_impl *connection;
+ int error_code;
+};
+
+static void s_on_websocket_setup_task_fn(struct aws_task *task, void *userdata, enum aws_task_status status) {
+ (void)task;
+ (void)status;
+
+ struct mqtt_on_websocket_setup_task_arg *on_websocket_setup_task_arg = userdata;
+ struct aws_mqtt_client_connection_311_impl *connection = on_websocket_setup_task_arg->connection;
+ int error_code = on_websocket_setup_task_arg->error_code;
+
+ aws_mem_release(on_websocket_setup_task_arg->allocator, on_websocket_setup_task_arg);
+
+ struct aws_websocket_on_connection_setup_data websocket_setup = {.error_code = error_code};
+ s_on_websocket_setup(&websocket_setup, connection);
+}
+
static void s_websocket_handshake_transform_complete(
struct aws_http_message *handshake_request,
int error_code,
@@ -1424,9 +1438,21 @@ static void s_websocket_handshake_transform_complete(
return;
error:;
- /* Proceed to next step, telling it that we failed. */
- struct aws_websocket_on_connection_setup_data websocket_setup = {.error_code = error_code};
- s_on_websocket_setup(&websocket_setup, connection);
+ /* Proceed to next step, telling it that we failed.
+ * s_on_websocket_setup will shutdown MQTT connection, and this MUST happen on the MQTT connection's event loop. */
+ struct mqtt_on_websocket_setup_task_arg *on_websocket_setup_task_arg =
+ aws_mem_calloc(connection->allocator, 1, sizeof(struct mqtt_on_websocket_setup_task_arg));
+ on_websocket_setup_task_arg->allocator = connection->allocator;
+ /* NOTE: No need in acquiring MQTT connection ref counter, as it is already acquired at the start of connection
+ * process. s_on_websocket_setup will release it. */
+ on_websocket_setup_task_arg->connection = connection;
+ on_websocket_setup_task_arg->error_code = error_code;
+ aws_task_init(
+ &on_websocket_setup_task_arg->task,
+ s_on_websocket_setup_task_fn,
+ (void *)on_websocket_setup_task_arg,
+ "on_websocket_setup_task");
+ aws_event_loop_schedule_task_now(connection->loop, &on_websocket_setup_task_arg->task);
}
/*******************************************************************************
@@ -1894,6 +1920,20 @@ static enum aws_mqtt_client_request_state s_subscribe_send(uint16_t packet_id, b
aws_mem_release(message->allocator, message);
}
+ /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
+ * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
+ * fire the on_completion callbacks. */
+ struct request_timeout_task_arg *timeout_task_arg =
+ s_schedule_timeout_task(task_arg->connection, packet_id, task_arg->timeout_duration_in_ns);
+ if (timeout_task_arg) {
+ /*
+ * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
+ * "wins", does its logic, and then breaks the connection between the two.
+ */
+ task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
+ timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
+ }
+
if (!task_arg->tree_updated) {
aws_mqtt_topic_tree_transaction_commit(&task_arg->connection->thread_data.subscriptions, &transaction);
task_arg->tree_updated = true;
@@ -1959,6 +1999,17 @@ static void s_subscribe_complete(
error_code,
task_arg->on_suback_ud);
}
+
+ /*
+ * If we have a forward pointer to a timeout task, then that means the timeout task has not run yet. So we should
+ * follow it and zero out the back pointer to us, because we're going away now. The timeout task will run later
+ * and be harmless (even vs. future operations with the same packet id) because it only cancels if it has a back
+ * pointer.
+ */
+ if (task_arg->timeout_wrapper.timeout_task_arg) {
+ task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
+ }
+
for (size_t i = 0; i < list_len; i++) {
aws_array_list_get_at(&task_arg->topics, &topic, i);
s_task_topic_release(topic);
@@ -1991,6 +2042,7 @@ static uint16_t s_aws_mqtt_client_connection_311_subscribe_multiple(
task_arg->connection = connection;
task_arg->on_suback.multi = on_suback;
task_arg->on_suback_ud = on_suback_ud;
+ task_arg->timeout_duration_in_ns = connection->operation_timeout_ns;
const size_t num_topics = aws_array_list_length(topic_filters);
@@ -2129,23 +2181,33 @@ static void s_subscribe_single_complete(
error_code,
task_arg->on_suback_ud);
}
+
+ /*
+ * If we have a forward pointer to a timeout task, then that means the timeout task has not run yet. So we should
+ * follow it and zero out the back pointer to us, because we're going away now. The timeout task will run later
+ * and be harmless (even vs. future operations with the same packet id) because it only cancels if it has a back
+ * pointer.
+ */
+ if (task_arg->timeout_wrapper.timeout_task_arg) {
+ task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
+ }
+
s_task_topic_release(topic);
aws_array_list_clean_up(&task_arg->topics);
aws_mqtt_packet_subscribe_clean_up(&task_arg->subscribe);
aws_mem_release(task_arg->connection->allocator, task_arg);
}
-static uint16_t s_aws_mqtt_client_connection_311_subscribe(
- void *impl,
+uint16_t aws_mqtt_client_connection_311_subscribe(
+ struct aws_mqtt_client_connection_311_impl *connection,
const struct aws_byte_cursor *topic_filter,
enum aws_mqtt_qos qos,
aws_mqtt_client_publish_received_fn *on_publish,
void *on_publish_ud,
aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
aws_mqtt_suback_fn *on_suback,
- void *on_suback_ud) {
-
- struct aws_mqtt_client_connection_311_impl *connection = impl;
+ void *on_suback_ud,
+ uint64_t timeout_ns) {
AWS_PRECONDITION(connection);
@@ -2174,6 +2236,7 @@ static uint16_t s_aws_mqtt_client_connection_311_subscribe(
task_arg->connection = connection;
task_arg->on_suback.single = on_suback;
task_arg->on_suback_ud = on_suback_ud;
+ task_arg->timeout_duration_in_ns = timeout_ns;
/* It stores the pointer */
aws_array_list_init_static(&task_arg->topics, task_topic_storage, 1, sizeof(void *));
@@ -2249,6 +2312,29 @@ handle_error:
return 0;
}
+static uint16_t s_aws_mqtt_client_connection_311_subscribe(
+ void *impl,
+ const struct aws_byte_cursor *topic_filter,
+ enum aws_mqtt_qos qos,
+ aws_mqtt_client_publish_received_fn *on_publish,
+ void *on_publish_ud,
+ aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
+ aws_mqtt_suback_fn *on_suback,
+ void *on_suback_ud) {
+
+ struct aws_mqtt_client_connection_311_impl *connection = impl;
+ return aws_mqtt_client_connection_311_subscribe(
+ connection,
+ topic_filter,
+ qos,
+ on_publish,
+ on_publish_ud,
+ on_ud_cleanup,
+ on_suback,
+ on_suback_ud,
+ connection->operation_timeout_ns);
+}
+
/*******************************************************************************
* Resubscribe
******************************************************************************/
@@ -2353,6 +2439,20 @@ static enum aws_mqtt_client_request_state s_resubscribe_send(
aws_mem_release(message->allocator, message);
}
+ /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
+ * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
+ * fire the on_completion callbacks. */
+ struct request_timeout_task_arg *timeout_task_arg =
+ s_schedule_timeout_task(task_arg->connection, packet_id, task_arg->timeout_duration_in_ns);
+ if (timeout_task_arg) {
+ /*
+ * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
+ * "wins", does its logic, and then breaks the connection between the two.
+ */
+ task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
+ timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
+ }
+
return AWS_MQTT_CLIENT_REQUEST_ONGOING;
handle_error:
@@ -2416,6 +2516,16 @@ static void s_resubscribe_complete(
clean_up:
+ /*
+ * If we have a forward pointer to a timeout task, then that means the timeout task has not run yet. So we should
+ * follow it and zero out the back pointer to us, because we're going away now. The timeout task will run later
+ * and be harmless (even vs. future operations with the same packet id) because it only cancels if it has a back
+ * pointer.
+ */
+ if (task_arg->timeout_wrapper.timeout_task_arg) {
+ task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
+ }
+
/* We need to cleanup the subscribe_task_topics, since they are not inserted into the topic tree by resubscribe. We
* take the ownership to clean it up */
for (size_t i = 0; i < list_len; i++) {
@@ -2445,6 +2555,7 @@ static uint16_t s_aws_mqtt_311_resubscribe_existing_topics(
task_arg->connection = connection;
task_arg->on_suback.multi = on_suback;
task_arg->on_suback_ud = on_suback_ud;
+ task_arg->timeout_duration_in_ns = connection->operation_timeout_ns;
/* Calculate the size of the packet.
* The fixed header is 2 bytes and the packet ID is 2 bytes
@@ -2505,6 +2616,7 @@ struct unsubscribe_task_arg {
void *on_unsuback_ud;
struct request_timeout_wrapper timeout_wrapper;
+ uint64_t timeout_duration_in_ns;
};
static enum aws_mqtt_client_request_state s_unsubscribe_send(
@@ -2596,18 +2708,17 @@ static enum aws_mqtt_client_request_state s_unsubscribe_send(
/* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
* invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
* fire the on_completion callbacks. */
- struct request_timeout_task_arg *timeout_task_arg = s_schedule_timeout_task(task_arg->connection, packet_id);
- if (!timeout_task_arg) {
- return AWS_MQTT_CLIENT_REQUEST_ERROR;
+ struct request_timeout_task_arg *timeout_task_arg =
+ s_schedule_timeout_task(task_arg->connection, packet_id, task_arg->timeout_duration_in_ns);
+ if (timeout_task_arg) {
+ /*
+ * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
+ * "wins", does its logic, and then breaks the connection between the two.
+ */
+ task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
+ timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
}
- /*
- * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
- * "wins", does its logic, and then breaks the connection between the two.
- */
- task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
- timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
-
if (!task_arg->tree_updated) {
aws_mqtt_topic_tree_transaction_commit(&task_arg->connection->thread_data.subscriptions, &transaction);
task_arg->tree_updated = true;
@@ -2650,7 +2761,6 @@ static void s_unsubscribe_complete(
*/
if (task_arg->timeout_wrapper.timeout_task_arg) {
task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
- task_arg->timeout_wrapper.timeout_task_arg = NULL;
}
if (task_arg->on_unsuback) {
@@ -2662,13 +2772,12 @@ static void s_unsubscribe_complete(
aws_mem_release(task_arg->connection->allocator, task_arg);
}
-static uint16_t s_aws_mqtt_client_connection_311_unsubscribe(
- void *impl,
+uint16_t aws_mqtt_client_connection_311_unsubscribe(
+ struct aws_mqtt_client_connection_311_impl *connection,
const struct aws_byte_cursor *topic_filter,
aws_mqtt_op_complete_fn *on_unsuback,
- void *on_unsuback_ud) {
-
- struct aws_mqtt_client_connection_311_impl *connection = impl;
+ void *on_unsuback_ud,
+ uint64_t timeout_ns) {
AWS_PRECONDITION(connection);
@@ -2688,6 +2797,7 @@ static uint16_t s_aws_mqtt_client_connection_311_unsubscribe(
task_arg->filter = aws_byte_cursor_from_string(task_arg->filter_string);
task_arg->on_unsuback = on_unsuback;
task_arg->on_unsuback_ud = on_unsuback_ud;
+ task_arg->timeout_duration_in_ns = timeout_ns;
/* Calculate the size of the unsubscribe packet.
* The fixed header is always 2 bytes, the packet ID is always 2 bytes
@@ -2723,6 +2833,18 @@ handle_error:
return 0;
}
+static uint16_t s_aws_mqtt_client_connection_311_unsubscribe(
+ void *impl,
+ const struct aws_byte_cursor *topic_filter,
+ aws_mqtt_op_complete_fn *on_unsuback,
+ void *on_unsuback_ud) {
+
+ struct aws_mqtt_client_connection_311_impl *connection = impl;
+
+ return aws_mqtt_client_connection_311_unsubscribe(
+ connection, topic_filter, on_unsuback, on_unsuback_ud, connection->operation_timeout_ns);
+}
+
/*******************************************************************************
* Publish
******************************************************************************/
@@ -2742,6 +2864,7 @@ struct publish_task_arg {
aws_mqtt_op_complete_fn *on_complete;
void *userdata;
+ uint64_t timeout_duration_in_ns;
struct request_timeout_wrapper timeout_wrapper;
};
@@ -2876,15 +2999,13 @@ static enum aws_mqtt_client_request_state s_publish_send(uint16_t packet_id, boo
goto write_payload_chunk;
}
}
- if (!is_qos_0 && connection->operation_timeout_ns != UINT64_MAX) {
- /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
- * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
- * fire fire the on_completion callbacks. */
- struct request_timeout_task_arg *timeout_task_arg = s_schedule_timeout_task(connection, packet_id);
- if (!timeout_task_arg) {
- return AWS_MQTT_CLIENT_REQUEST_ERROR;
- }
+ /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
+ * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
+ * fire fire the on_completion callbacks. */
+ struct request_timeout_task_arg *timeout_task_arg =
+ s_schedule_timeout_task(connection, packet_id, task_arg->timeout_duration_in_ns);
+ if (timeout_task_arg != NULL) {
/*
* Set up mutual references between the operation task args and the timeout task args. Whoever runs first
* "wins", does its logic, and then breaks the connection between the two.
@@ -2921,7 +3042,6 @@ static void s_publish_complete(
*/
if (task_arg->timeout_wrapper.timeout_task_arg != NULL) {
task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
- task_arg->timeout_wrapper.timeout_task_arg = NULL;
}
aws_byte_buf_clean_up(&task_arg->payload_buf);
@@ -2929,16 +3049,15 @@ static void s_publish_complete(
aws_mem_release(connection->allocator, task_arg);
}
-static uint16_t s_aws_mqtt_client_connection_311_publish(
- void *impl,
+uint16_t aws_mqtt_client_connection_311_publish(
+ struct aws_mqtt_client_connection_311_impl *connection,
const struct aws_byte_cursor *topic,
enum aws_mqtt_qos qos,
bool retain,
const struct aws_byte_cursor *payload,
aws_mqtt_op_complete_fn *on_complete,
- void *userdata) {
-
- struct aws_mqtt_client_connection_311_impl *connection = impl;
+ void *userdata,
+ uint64_t timeout_ns) {
AWS_PRECONDITION(connection);
@@ -2962,6 +3081,7 @@ static uint16_t s_aws_mqtt_client_connection_311_publish(
arg->topic = aws_byte_cursor_from_string(arg->topic_string);
arg->qos = qos;
arg->retain = retain;
+ arg->timeout_duration_in_ns = timeout_ns;
struct aws_byte_cursor payload_cursor;
AWS_ZERO_STRUCT(payload_cursor);
@@ -3019,6 +3139,21 @@ handle_error:
return 0;
}
+static uint16_t s_aws_mqtt_client_connection_311_publish(
+ void *impl,
+ const struct aws_byte_cursor *topic,
+ enum aws_mqtt_qos qos,
+ bool retain,
+ const struct aws_byte_cursor *payload,
+ aws_mqtt_op_complete_fn *on_complete,
+ void *userdata) {
+
+ struct aws_mqtt_client_connection_311_impl *connection = impl;
+
+ return aws_mqtt_client_connection_311_publish(
+ connection, topic, qos, retain, payload, on_complete, userdata, connection->operation_timeout_ns);
+}
+
/*******************************************************************************
* Ping
******************************************************************************/
@@ -3220,6 +3355,18 @@ static void s_aws_mqtt_client_connection_311_release(void *impl) {
aws_ref_count_release(&connection->ref_count);
}
+static enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_311_get_impl(const void *impl) {
+ (void)impl;
+
+ return AWS_MQTT311_IT_311_CONNECTION;
+}
+
+static struct aws_event_loop *s_aws_mqtt_client_connection_311_get_event_loop(const void *impl) {
+ const struct aws_mqtt_client_connection_311_impl *connection = impl;
+
+ return connection->loop;
+}
+
static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_311_vtable = {
.acquire_fn = s_aws_mqtt_client_connection_311_acquire,
.release_fn = s_aws_mqtt_client_connection_311_release,
@@ -3243,6 +3390,8 @@ static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_311
.unsubscribe_fn = s_aws_mqtt_client_connection_311_unsubscribe,
.publish_fn = s_aws_mqtt_client_connection_311_publish,
.get_stats_fn = s_aws_mqtt_client_connection_311_get_stats,
+ .get_impl_type = s_aws_mqtt_client_connection_311_get_impl,
+ .get_event_loop = s_aws_mqtt_client_connection_311_get_event_loop,
};
static struct aws_mqtt_client_connection_vtable *s_aws_mqtt_client_connection_311_vtable_ptr =
@@ -3344,6 +3493,8 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt
connection->handler.vtable = aws_mqtt_get_client_channel_vtable();
connection->handler.impl = connection;
+ aws_mqtt311_callback_set_manager_init(&connection->callback_manager, connection->allocator, &connection->base);
+
return &connection->base;
failed_init_outstanding_requests_table:
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/client_channel_handler.c b/contrib/restricted/aws/aws-c-mqtt/source/client_channel_handler.c
index 2719f41c6c3..0fe344ec53b 100644
--- a/contrib/restricted/aws/aws-c-mqtt/source/client_channel_handler.c
+++ b/contrib/restricted/aws/aws-c-mqtt/source/client_channel_handler.c
@@ -263,6 +263,9 @@ static int s_packet_handler_connack(struct aws_byte_cursor message_cursor, void
MQTT_CLIENT_CALL_CALLBACK_ARGS(
connection, on_connection_success, connack.connect_return_code, connack.session_present);
+ aws_mqtt311_callback_set_manager_on_connection_success(
+ &connection->callback_manager, connack.connect_return_code, connack.session_present);
+
AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: connection callback completed", (void *)connection);
s_update_next_ping_time(connection);
@@ -291,6 +294,9 @@ static int s_packet_handler_publish(struct aws_byte_cursor message_cursor, void
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_any_publish, &publish.topic_name, &publish.payload, dup, qos, retain);
+ aws_mqtt311_callback_set_manager_on_publish_received(
+ &connection->callback_manager, &publish.topic_name, &publish.payload, dup, qos, retain);
+
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: publish received with msg id=%" PRIu16 " dup=%d qos=%d retain=%d payload-size=%zu topic=" PRInSTR,
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/client_impl_shared.c b/contrib/restricted/aws/aws-c-mqtt/source/client_impl_shared.c
index 6f65eb88e34..019adc5c7b9 100644
--- a/contrib/restricted/aws/aws-c-mqtt/source/client_impl_shared.c
+++ b/contrib/restricted/aws/aws-c-mqtt/source/client_impl_shared.c
@@ -204,6 +204,11 @@ int aws_mqtt_client_connection_get_stats(
return (*connection->vtable->get_stats_fn)(connection->impl, stats);
}
+enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type(
+ const struct aws_mqtt_client_connection *connection) {
+ return (*connection->vtable->get_impl_type)(connection->impl);
+}
+
uint64_t aws_mqtt_hash_uint16_t(const void *item) {
return *(uint16_t *)item;
}
@@ -218,3 +223,7 @@ bool aws_mqtt_byte_cursor_hash_equality(const void *a, const void *b) {
return aws_byte_cursor_eq(a_cursor, b_cursor);
}
+
+struct aws_event_loop *aws_mqtt_client_connection_get_event_loop(const struct aws_mqtt_client_connection *connection) {
+ return (*connection->vtable->get_event_loop)(connection->impl);
+}
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/mqtt.c b/contrib/restricted/aws/aws-c-mqtt/source/mqtt.c
index 9caeaa9bbaa..392dda20a5c 100644
--- a/contrib/restricted/aws/aws-c-mqtt/source/mqtt.c
+++ b/contrib/restricted/aws/aws-c-mqtt/source/mqtt.c
@@ -236,6 +236,39 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter)
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_ACK_REASON_CODE_FAILURE,
"MQTT ack packet received with a failing reason code"),
+ AWS_DEFINE_ERROR_INFO_MQTT(
+ AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE,
+ "MQTT operation returned a failing reason code"),
+ AWS_DEFINE_ERROR_INFO_MQTT(
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN,
+ "Request operation failed due to client shut down"),
+ AWS_DEFINE_ERROR_INFO_MQTT(
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT,
+ "Request operation failed due to timeout"),
+ AWS_DEFINE_ERROR_INFO_MQTT(
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_NO_SUBSCRIPTION_CAPACITY,
+ "Streaming request operation failed because there was no space for the subscription"),
+ AWS_DEFINE_ERROR_INFO_MQTT(
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE,
+ "Request operation failed because the associated subscribe failed synchronously"),
+ AWS_DEFINE_ERROR_INFO_MQTT(
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR,
+ "Request operation failed due to a non-specific internal error within the client."),
+ AWS_DEFINE_ERROR_INFO_MQTT(
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_PUBLISH_FAILURE,
+ "Request-response operation failed because the associated publish failed synchronously."),
+ AWS_DEFINE_ERROR_INFO_MQTT(
+ AWS_ERROR_MQTT_REUQEST_RESPONSE_STREAM_ALREADY_ACTIVATED,
+ "Streaming operation activation failed because the operation had already been activated."),
+ AWS_DEFINE_ERROR_INFO_MQTT(
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_MODELED_SERVICE_ERROR,
+ "Request-response operation failed with a modeled service error."),
+ AWS_DEFINE_ERROR_INFO_MQTT(
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_PAYLOAD_PARSE_ERROR,
+ "Request-response operation failed due to an inability to parse the payload."),
+ AWS_DEFINE_ERROR_INFO_MQTT(
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_INVALID_RESPONSE_PATH,
+ "Request-response operation failed due to arrival on an unknown response path"),
};
/* clang-format on */
#undef AWS_DEFINE_ERROR_INFO_MQTT
@@ -254,6 +287,7 @@ static struct aws_error_info_list s_error_list = {
DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_CLIENT, "mqtt5-client", "MQTT5 client and connections"),
DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_CANARY, "mqtt5-canary", "MQTT5 canary logging"),
DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_TO_MQTT3_ADAPTER, "mqtt5-to-mqtt3-adapter", "MQTT5-To-MQTT3 adapter logging"),
+ DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT_REQUEST_RESPONSE, "mqtt-request-response-systems", "MQTT request-response systems logging"),
};
/* clang-format on */
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/mqtt311_listener.c b/contrib/restricted/aws/aws-c-mqtt/source/mqtt311_listener.c
new file mode 100644
index 00000000000..69bec5286b3
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-mqtt/source/mqtt311_listener.c
@@ -0,0 +1,329 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/mqtt/private/mqtt311_listener.h>
+
+#include <aws/common/ref_count.h>
+#include <aws/common/task_scheduler.h>
+#include <aws/io/event_loop.h>
+#include <aws/mqtt/private/client_impl.h>
+#include <aws/mqtt/private/client_impl_shared.h>
+
+#include <inttypes.h>
+
+static struct aws_event_loop *s_mqtt_client_connection_get_event_loop(
+ const struct aws_mqtt_client_connection *connection) {
+ AWS_FATAL_ASSERT(aws_mqtt_client_connection_get_impl_type(connection) == AWS_MQTT311_IT_311_CONNECTION);
+
+ struct aws_mqtt_client_connection_311_impl *connection_impl = connection->impl;
+
+ return connection_impl->loop;
+}
+
+struct aws_mqtt311_listener {
+ struct aws_allocator *allocator;
+
+ struct aws_ref_count ref_count;
+
+ struct aws_mqtt311_listener_config config;
+
+ uint64_t callback_set_id;
+
+ struct aws_task initialize_task;
+ struct aws_task terminate_task;
+};
+
+static void s_mqtt311_listener_destroy(struct aws_mqtt311_listener *listener) {
+
+ aws_mqtt_client_connection_release(listener->config.connection);
+
+ aws_mqtt311_listener_termination_completion_fn *termination_callback = listener->config.termination_callback;
+ void *termination_callback_user_data = listener->config.termination_callback_user_data;
+
+ aws_mem_release(listener->allocator, listener);
+
+ if (termination_callback != NULL) {
+ (*termination_callback)(termination_callback_user_data);
+ }
+}
+
+static void s_mqtt311_listener_initialize_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) {
+ (void)task;
+
+ struct aws_mqtt311_listener *listener = arg;
+
+ if (task_status == AWS_TASK_STATUS_RUN_READY) {
+ struct aws_mqtt_client_connection_311_impl *connection_impl = listener->config.connection->impl;
+ listener->callback_set_id = aws_mqtt311_callback_set_manager_push_front(
+ &connection_impl->callback_manager, &listener->config.listener_callbacks);
+ AWS_LOGF_INFO(
+ AWS_LS_MQTT_GENERAL,
+ "id=%p: Mqtt311 Listener initialized, listener id=%p",
+ (void *)listener->config.connection,
+ (void *)listener);
+ aws_mqtt311_listener_release(listener);
+ } else {
+ s_mqtt311_listener_destroy(listener);
+ }
+}
+
+static void s_mqtt311_listener_terminate_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) {
+ (void)task;
+
+ struct aws_mqtt311_listener *listener = arg;
+
+ if (task_status == AWS_TASK_STATUS_RUN_READY) {
+ struct aws_mqtt_client_connection_311_impl *connection_impl = listener->config.connection->impl;
+ aws_mqtt311_callback_set_manager_remove(&connection_impl->callback_manager, listener->callback_set_id);
+ }
+
+ AWS_LOGF_INFO(
+ AWS_LS_MQTT_GENERAL,
+ "id=%p: Mqtt311 Listener terminated, listener id=%p",
+ (void *)listener->config.connection,
+ (void *)listener);
+
+ s_mqtt311_listener_destroy(listener);
+}
+
+static void s_aws_mqtt311_listener_on_zero_ref_count(void *context) {
+ struct aws_mqtt311_listener *listener = context;
+
+ aws_event_loop_schedule_task_now(
+ s_mqtt_client_connection_get_event_loop(listener->config.connection), &listener->terminate_task);
+}
+
+struct aws_mqtt311_listener *aws_mqtt311_listener_new(
+ struct aws_allocator *allocator,
+ struct aws_mqtt311_listener_config *config) {
+ if (config->connection == NULL) {
+ aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
+ return NULL;
+ }
+
+ if (aws_mqtt_client_connection_get_impl_type(config->connection) != AWS_MQTT311_IT_311_CONNECTION) {
+ aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
+ return NULL;
+ }
+
+ struct aws_mqtt311_listener *listener = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt311_listener));
+
+ listener->allocator = allocator;
+ listener->config = *config;
+
+ aws_mqtt_client_connection_acquire(config->connection);
+ aws_ref_count_init(&listener->ref_count, listener, s_aws_mqtt311_listener_on_zero_ref_count);
+
+ aws_task_init(
+ &listener->initialize_task, s_mqtt311_listener_initialize_task_fn, listener, "Mqtt311ListenerInitialize");
+ aws_task_init(
+ &listener->terminate_task, s_mqtt311_listener_terminate_task_fn, listener, "Mqtt311ListenerTerminate");
+
+ aws_mqtt311_listener_acquire(listener);
+ aws_event_loop_schedule_task_now(
+ s_mqtt_client_connection_get_event_loop(config->connection), &listener->initialize_task);
+
+ return listener;
+}
+
+struct aws_mqtt311_listener *aws_mqtt311_listener_acquire(struct aws_mqtt311_listener *listener) {
+ if (listener != NULL) {
+ aws_ref_count_acquire(&listener->ref_count);
+ }
+
+ return listener;
+}
+
+struct aws_mqtt311_listener *aws_mqtt311_listener_release(struct aws_mqtt311_listener *listener) {
+ if (listener != NULL) {
+ aws_ref_count_release(&listener->ref_count);
+ }
+
+ return NULL;
+}
+
+struct aws_mqtt311_callback_set_entry {
+ struct aws_allocator *allocator;
+
+ struct aws_linked_list_node node;
+
+ uint64_t id;
+
+ struct aws_mqtt311_callback_set callbacks;
+};
+
+void aws_mqtt311_callback_set_manager_init(
+ struct aws_mqtt311_callback_set_manager *manager,
+ struct aws_allocator *allocator,
+ struct aws_mqtt_client_connection *connection) {
+
+ manager->allocator = allocator;
+ manager->connection = connection; /* no need to ref count, this is assumed to be owned by the client connection */
+ manager->next_callback_set_entry_id = 1;
+
+ aws_linked_list_init(&manager->callback_set_entries);
+}
+
+void aws_mqtt311_callback_set_manager_clean_up(struct aws_mqtt311_callback_set_manager *manager) {
+ struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries);
+ while (node != aws_linked_list_end(&manager->callback_set_entries)) {
+ struct aws_mqtt311_callback_set_entry *entry =
+ AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node);
+ node = aws_linked_list_next(node);
+
+ aws_linked_list_remove(&entry->node);
+ aws_mem_release(entry->allocator, entry);
+ }
+}
+
+static struct aws_mqtt311_callback_set_entry *s_new_311_callback_set_entry(
+ struct aws_mqtt311_callback_set_manager *manager,
+ struct aws_mqtt311_callback_set *callback_set) {
+ struct aws_mqtt311_callback_set_entry *entry =
+ aws_mem_calloc(manager->allocator, 1, sizeof(struct aws_mqtt311_callback_set_entry));
+
+ entry->allocator = manager->allocator;
+ entry->id = manager->next_callback_set_entry_id++;
+ entry->callbacks = *callback_set;
+
+ AWS_LOGF_INFO(
+ AWS_LS_MQTT_GENERAL,
+ "id=%p: MQTT311 callback manager created new entry id=%" PRIu64,
+ (void *)manager->connection,
+ entry->id);
+
+ return entry;
+}
+
+uint64_t aws_mqtt311_callback_set_manager_push_front(
+ struct aws_mqtt311_callback_set_manager *manager,
+ struct aws_mqtt311_callback_set *callback_set) {
+
+ AWS_FATAL_ASSERT(
+ aws_event_loop_thread_is_callers_thread(s_mqtt_client_connection_get_event_loop(manager->connection)));
+
+ struct aws_mqtt311_callback_set_entry *entry = s_new_311_callback_set_entry(manager, callback_set);
+
+ aws_linked_list_push_front(&manager->callback_set_entries, &entry->node);
+
+ return entry->id;
+}
+
+void aws_mqtt311_callback_set_manager_remove(
+ struct aws_mqtt311_callback_set_manager *manager,
+ uint64_t callback_set_id) {
+
+ AWS_FATAL_ASSERT(
+ aws_event_loop_thread_is_callers_thread(s_mqtt_client_connection_get_event_loop(manager->connection)));
+
+ struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries);
+ while (node != aws_linked_list_end(&manager->callback_set_entries)) {
+ struct aws_mqtt311_callback_set_entry *entry =
+ AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node);
+ node = aws_linked_list_next(node);
+
+ if (entry->id == callback_set_id) {
+ aws_linked_list_remove(&entry->node);
+
+ AWS_LOGF_INFO(
+ AWS_LS_MQTT_GENERAL,
+ "id=%p: MQTT311 callback manager removed entry id=%" PRIu64,
+ (void *)manager->connection,
+ entry->id);
+ aws_mem_release(entry->allocator, entry);
+ return;
+ }
+ }
+ AWS_LOGF_INFO(
+ AWS_LS_MQTT_GENERAL,
+ "id=%p: MQTT311 callback manager failed to remove entry id=%" PRIu64 ", callback set id not found.",
+ (void *)manager->connection,
+ callback_set_id);
+}
+
+void aws_mqtt311_callback_set_manager_on_publish_received(
+ struct aws_mqtt311_callback_set_manager *manager,
+ const struct aws_byte_cursor *topic,
+ const struct aws_byte_cursor *payload,
+ bool dup,
+ enum aws_mqtt_qos qos,
+ bool retain) {
+
+ struct aws_mqtt_client_connection_311_impl *connection_impl = manager->connection->impl;
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop));
+
+ struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries);
+ while (node != aws_linked_list_end(&manager->callback_set_entries)) {
+ struct aws_mqtt311_callback_set_entry *entry =
+ AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node);
+ node = aws_linked_list_next(node);
+
+ struct aws_mqtt311_callback_set *callback_set = &entry->callbacks;
+ if (callback_set->publish_received_handler != NULL) {
+ (*callback_set->publish_received_handler)(
+ manager->connection, topic, payload, dup, qos, retain, callback_set->user_data);
+ }
+ }
+}
+
+void aws_mqtt311_callback_set_manager_on_connection_success(
+ struct aws_mqtt311_callback_set_manager *manager,
+ enum aws_mqtt_connect_return_code return_code,
+ bool rejoined_session) {
+
+ struct aws_mqtt_client_connection_311_impl *connection_impl = manager->connection->impl;
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop));
+
+ struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries);
+ while (node != aws_linked_list_end(&manager->callback_set_entries)) {
+ struct aws_mqtt311_callback_set_entry *entry =
+ AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node);
+ node = aws_linked_list_next(node);
+
+ struct aws_mqtt311_callback_set *callback_set = &entry->callbacks;
+ if (callback_set->connection_success_handler != NULL) {
+ (*callback_set->connection_success_handler)(
+ manager->connection, return_code, rejoined_session, callback_set->user_data);
+ }
+ }
+}
+
+void aws_mqtt311_callback_set_manager_on_connection_interrupted(
+ struct aws_mqtt311_callback_set_manager *manager,
+ int error_code) {
+
+ struct aws_mqtt_client_connection_311_impl *connection_impl = manager->connection->impl;
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop));
+
+ struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries);
+ while (node != aws_linked_list_end(&manager->callback_set_entries)) {
+ struct aws_mqtt311_callback_set_entry *entry =
+ AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node);
+ node = aws_linked_list_next(node);
+
+ struct aws_mqtt311_callback_set *callback_set = &entry->callbacks;
+ if (callback_set->connection_interrupted_handler != NULL) {
+ (*callback_set->connection_interrupted_handler)(manager->connection, error_code, callback_set->user_data);
+ }
+ }
+}
+
+void aws_mqtt311_callback_set_manager_on_disconnect(struct aws_mqtt311_callback_set_manager *manager) {
+
+ struct aws_mqtt_client_connection_311_impl *connection_impl = manager->connection->impl;
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop));
+
+ struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries);
+ while (node != aws_linked_list_end(&manager->callback_set_entries)) {
+ struct aws_mqtt311_callback_set_entry *entry =
+ AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node);
+ node = aws_linked_list_next(node);
+
+ struct aws_mqtt311_callback_set *callback_set = &entry->callbacks;
+ if (callback_set->disconnect_handler != NULL) {
+ (*callback_set->disconnect_handler)(manager->connection, callback_set->user_data);
+ }
+ }
+}
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/packets.c b/contrib/restricted/aws/aws-c-mqtt/source/packets.c
index 1170bcca9c6..d208a9de677 100644
--- a/contrib/restricted/aws/aws-c-mqtt/source/packets.c
+++ b/contrib/restricted/aws/aws-c-mqtt/source/packets.c
@@ -235,9 +235,8 @@ int aws_mqtt_packet_connect_encode(struct aws_byte_buf *buf, const struct aws_mq
}
/* Write connect flags [MQTT-3.1.2.3] */
- uint8_t connect_flags = (uint8_t)(
- packet->clean_session << 1 | packet->has_will << 2 | packet->will_qos << 3 | packet->will_retain << 5 |
- packet->has_password << 6 | packet->has_username << 7);
+ uint8_t connect_flags = (uint8_t)(packet->clean_session << 1 | packet->has_will << 2 | packet->will_qos << 3 |
+ packet->will_retain << 5 | packet->has_password << 6 | packet->has_username << 7);
if (!aws_byte_buf_write_u8(buf, connect_flags)) {
return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/request-response/protocol_adapter.c b/contrib/restricted/aws/aws-c-mqtt/source/request-response/protocol_adapter.c
new file mode 100644
index 00000000000..b01a7f8a88b
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-mqtt/source/request-response/protocol_adapter.c
@@ -0,0 +1,973 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/mqtt/private/request-response/protocol_adapter.h>
+
+#include <aws/common/clock.h>
+#include <aws/io/event_loop.h>
+#include <aws/mqtt/private/client_impl.h>
+#include <aws/mqtt/private/client_impl_shared.h>
+#include <aws/mqtt/private/mqtt311_listener.h>
+#include <aws/mqtt/private/v5/mqtt5_client_impl.h>
+#include <aws/mqtt/private/v5/mqtt5_to_mqtt3_adapter_impl.h>
+#include <aws/mqtt/request-response/request_response_client.h>
+#include <aws/mqtt/v5/mqtt5_client.h>
+#include <aws/mqtt/v5/mqtt5_listener.h>
+
+/*
+ * Basic API contract
+ *
+ * Invariant 1: Subscribe is only called from the RR subscription manager when going from 0 to 1 pending operations
+ * Invariant 2: Unsubscribe is only called from the RR subscription manager when there are 0 pending operations, not
+ * necessarily on the exact transition to zero though.
+ *
+ * Additional Notes
+ *
+ * Entries are not tracked with the exception of the eventstream impl which needs the stream handles to close.
+ *
+ * A subscribe failure does not trigger an unsubscribe, a status event.
+ *
+ * The sub manager is responsible for calling Unsubscribe on all its entries when shutting down
+ * (before releasing hold of the adapter).
+ *
+ * Retries, when appropriate, are the responsibility of the caller.
+ */
+
+enum aws_mqtt_protocol_adapter_operation_type {
+ AMPAOT_SUBSCRIBE_UNSUBSCRIBE,
+ AMPAOT_PUBLISH,
+};
+
+struct aws_mqtt_protocol_adapter_sub_unsub_data {
+ struct aws_byte_buf topic_filter;
+};
+
+struct aws_mqtt_protocol_adapter_publish_data {
+ void (*completion_callback_fn)(int, void *);
+ void *user_data;
+};
+
+struct aws_mqtt_protocol_adapter_operation_userdata {
+ struct aws_allocator *allocator;
+
+ struct aws_linked_list_node node;
+ void *adapter;
+
+ enum aws_mqtt_protocol_adapter_operation_type operation_type;
+
+ union {
+ struct aws_mqtt_protocol_adapter_sub_unsub_data sub_unsub_data;
+ struct aws_mqtt_protocol_adapter_publish_data publish_data;
+ } operation_data;
+};
+
+static struct aws_mqtt_protocol_adapter_operation_userdata *s_aws_mqtt_protocol_adapter_sub_unsub_data_new(
+ struct aws_allocator *allocator,
+ struct aws_byte_cursor topic_filter,
+ void *adapter) {
+
+ struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data =
+ aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_operation_userdata));
+
+ subscribe_data->allocator = allocator;
+ subscribe_data->operation_type = AMPAOT_SUBSCRIBE_UNSUBSCRIBE;
+ subscribe_data->adapter = adapter;
+ aws_byte_buf_init_copy_from_cursor(
+ &subscribe_data->operation_data.sub_unsub_data.topic_filter, allocator, topic_filter);
+
+ return subscribe_data;
+}
+
+static struct aws_mqtt_protocol_adapter_operation_userdata *s_aws_mqtt_protocol_adapter_publish_data_new(
+ struct aws_allocator *allocator,
+ const struct aws_protocol_adapter_publish_options *publish_options,
+ void *adapter) {
+
+ struct aws_mqtt_protocol_adapter_operation_userdata *publish_data =
+ aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_operation_userdata));
+
+ publish_data->allocator = allocator;
+ publish_data->operation_type = AMPAOT_PUBLISH;
+ publish_data->adapter = adapter;
+
+ publish_data->operation_data.publish_data.completion_callback_fn = publish_options->completion_callback_fn;
+ publish_data->operation_data.publish_data.user_data = publish_options->user_data;
+
+ return publish_data;
+}
+
+static void s_aws_mqtt_protocol_adapter_operation_user_data_destroy(
+ struct aws_mqtt_protocol_adapter_operation_userdata *userdata) {
+ if (userdata == NULL) {
+ return;
+ }
+
+ if (aws_linked_list_node_next_is_valid(&userdata->node) && aws_linked_list_node_prev_is_valid(&userdata->node)) {
+ aws_linked_list_remove(&userdata->node);
+ }
+
+ if (userdata->operation_type == AMPAOT_SUBSCRIBE_UNSUBSCRIBE) {
+ aws_byte_buf_clean_up(&userdata->operation_data.sub_unsub_data.topic_filter);
+ }
+
+ aws_mem_release(userdata->allocator, userdata);
+}
+
+/*****************************************************************************************************************/
+
+struct aws_mqtt_protocol_adapter_311_impl {
+ struct aws_allocator *allocator;
+ struct aws_mqtt_protocol_adapter base;
+
+ struct aws_linked_list incomplete_operations;
+ struct aws_mqtt_protocol_adapter_options config;
+
+ struct aws_event_loop *loop;
+ struct aws_mqtt_client_connection *connection;
+ struct aws_mqtt311_listener *listener;
+};
+
+static void s_aws_mqtt_protocol_adapter_311_destroy(void *impl) {
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = impl;
+
+ // all the real cleanup is done in the listener termination callback
+ aws_mqtt311_listener_release(adapter->listener);
+}
+
+/* Subscribe */
+
+static void s_protocol_adapter_311_subscribe_completion(
+ struct aws_mqtt_client_connection *connection,
+ uint16_t packet_id,
+ const struct aws_byte_cursor *topic,
+ enum aws_mqtt_qos qos,
+ int error_code,
+ void *userdata) {
+ (void)connection;
+ (void)topic;
+ (void)packet_id;
+
+ struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = userdata;
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = subscribe_data->adapter;
+
+ if (adapter == NULL) {
+ goto done;
+ }
+
+ if (error_code == AWS_ERROR_SUCCESS) {
+ if (qos >= 128) {
+ error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE;
+ }
+ }
+
+ struct aws_protocol_adapter_subscription_event subscribe_event = {
+ .topic_filter = aws_byte_cursor_from_buf(&subscribe_data->operation_data.sub_unsub_data.topic_filter),
+ .event_type = AWS_PASET_SUBSCRIBE,
+ .error_code = error_code,
+ .retryable = true,
+ };
+
+ (*adapter->config.subscription_event_callback)(&subscribe_event, adapter->config.user_data);
+
+done:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data);
+}
+
+int s_aws_mqtt_protocol_adapter_311_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) {
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = impl;
+ struct aws_mqtt_client_connection_311_impl *connection_impl = adapter->connection->impl;
+
+ struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data =
+ s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter);
+
+ aws_linked_list_push_back(&adapter->incomplete_operations, &subscribe_data->node);
+
+ uint64_t timeout_nanos =
+ aws_timestamp_convert(options->ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
+ if (aws_mqtt_client_connection_311_subscribe(
+ connection_impl,
+ &options->topic_filter,
+ AWS_MQTT_QOS_AT_LEAST_ONCE,
+ NULL,
+ NULL,
+ NULL,
+ s_protocol_adapter_311_subscribe_completion,
+ subscribe_data,
+ timeout_nanos) == 0) {
+ goto error;
+ }
+
+ return AWS_OP_SUCCESS;
+
+error:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data);
+
+ return AWS_OP_ERR;
+}
+
+/* Unsubscribe */
+
+static bool s_is_retryable_unsubscribe311(int error_code) {
+ return error_code == AWS_ERROR_MQTT_TIMEOUT;
+}
+
+static void s_protocol_adapter_311_unsubscribe_completion(
+ struct aws_mqtt_client_connection *connection,
+ uint16_t packet_id,
+ int error_code,
+ void *userdata) {
+ (void)connection;
+ (void)packet_id;
+
+ struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data = userdata;
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = unsubscribe_data->adapter;
+
+ if (adapter == NULL) {
+ goto done;
+ }
+
+ struct aws_protocol_adapter_subscription_event unsubscribe_event = {
+ .topic_filter = aws_byte_cursor_from_buf(&unsubscribe_data->operation_data.sub_unsub_data.topic_filter),
+ .event_type = AWS_PASET_UNSUBSCRIBE,
+ .error_code = error_code,
+ .retryable = s_is_retryable_unsubscribe311(error_code),
+ };
+
+ (*adapter->config.subscription_event_callback)(&unsubscribe_event, adapter->config.user_data);
+
+done:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data);
+}
+
+int s_aws_mqtt_protocol_adapter_311_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) {
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = impl;
+ struct aws_mqtt_client_connection_311_impl *connection_impl = adapter->connection->impl;
+
+ struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data =
+ s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter);
+
+ aws_linked_list_push_back(&adapter->incomplete_operations, &unsubscribe_data->node);
+
+ uint64_t timeout_nanos =
+ aws_timestamp_convert(options->ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
+ if (aws_mqtt_client_connection_311_unsubscribe(
+ connection_impl,
+ &options->topic_filter,
+ s_protocol_adapter_311_unsubscribe_completion,
+ unsubscribe_data,
+ timeout_nanos) == 0) {
+ goto error;
+ }
+
+ return AWS_OP_SUCCESS;
+
+error:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data);
+
+ return AWS_OP_ERR;
+}
+
+/* Publish */
+
+static void s_protocol_adapter_311_publish_completion(
+ struct aws_mqtt_client_connection *connection,
+ uint16_t packet_id,
+ int error_code,
+ void *userdata) {
+
+ (void)connection;
+ (void)packet_id;
+
+ struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = userdata;
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = publish_data->adapter;
+
+ if (adapter == NULL) {
+ goto done;
+ }
+
+ (*publish_data->operation_data.publish_data.completion_callback_fn)(
+ error_code, publish_data->operation_data.publish_data.user_data);
+
+done:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data);
+}
+
+int s_aws_mqtt_protocol_adapter_311_publish(void *impl, struct aws_protocol_adapter_publish_options *options) {
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = impl;
+ struct aws_mqtt_client_connection_311_impl *connection_impl = adapter->connection->impl;
+
+ struct aws_mqtt_protocol_adapter_operation_userdata *publish_data =
+ s_aws_mqtt_protocol_adapter_publish_data_new(adapter->allocator, options, adapter);
+
+ aws_linked_list_push_back(&adapter->incomplete_operations, &publish_data->node);
+
+ uint64_t timeout_nanos =
+ aws_timestamp_convert(options->ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
+ if (aws_mqtt_client_connection_311_publish(
+ connection_impl,
+ &options->topic,
+ AWS_MQTT_QOS_AT_LEAST_ONCE,
+ false,
+ &options->payload,
+ s_protocol_adapter_311_publish_completion,
+ publish_data,
+ timeout_nanos) == 0) {
+ goto error;
+ }
+
+ return AWS_OP_SUCCESS;
+
+error:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data);
+
+ return AWS_OP_ERR;
+}
+
+static void s_protocol_adapter_mqtt311_listener_publish_received(
+ struct aws_mqtt_client_connection *connection,
+ const struct aws_byte_cursor *topic,
+ const struct aws_byte_cursor *payload,
+ bool dup,
+ enum aws_mqtt_qos qos,
+ bool retain,
+ void *userdata) {
+
+ (void)connection;
+ (void)dup;
+ (void)qos;
+ (void)retain;
+
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = userdata;
+
+ struct aws_mqtt_rr_incoming_publish_event publish_event = {
+ .topic = *topic,
+ .payload = *payload,
+ };
+
+ /* This will potentially include messages that are completely unrelated to MQTT request-response.
+ * The topic is the first thing that should be checked for relevance. */
+ (*adapter->config.incoming_publish_callback)(&publish_event, adapter->config.user_data);
+}
+
+static void s_protocol_adapter_mqtt311_listener_connection_success(
+ struct aws_mqtt_client_connection *connection,
+ enum aws_mqtt_connect_return_code return_code,
+ bool session_present,
+ void *userdata) {
+ (void)connection;
+ (void)return_code;
+
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = userdata;
+
+ if (adapter->config.connection_event_callback != NULL) {
+ struct aws_protocol_adapter_connection_event connection_event = {
+ .event_type = AWS_PACET_CONNECTED,
+ .joined_session = session_present,
+ };
+
+ (*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data);
+ }
+}
+
+static void s_protocol_adapter_mqtt311_emit_disconnect_event(struct aws_mqtt_protocol_adapter_311_impl *adapter) {
+ if (adapter->config.connection_event_callback != NULL) {
+ struct aws_protocol_adapter_connection_event connection_event = {
+ .event_type = AWS_PACET_DISCONNECTED,
+ };
+
+ (*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data);
+ }
+}
+
+static void s_protocol_adapter_mqtt311_listener_connection_interrupted(
+ struct aws_mqtt_client_connection *connection,
+ int error_code,
+ void *userdata) {
+ (void)connection;
+ (void)error_code;
+
+ s_protocol_adapter_mqtt311_emit_disconnect_event(userdata);
+}
+
+static void s_aws_mqtt_protocol_adapter_311_disconnect_fn(
+ struct aws_mqtt_client_connection *connection,
+ void *userdata) {
+ (void)connection;
+
+ s_protocol_adapter_mqtt311_emit_disconnect_event(userdata);
+}
+
+static bool s_aws_mqtt_protocol_adapter_311_is_connected(void *impl) {
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = impl;
+ struct aws_mqtt_client_connection_311_impl *connection_impl = adapter->connection->impl;
+
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop));
+
+ mqtt_connection_lock_synced_data(connection_impl);
+ enum aws_mqtt_client_connection_state current_state = connection_impl->synced_data.state;
+ mqtt_connection_unlock_synced_data(connection_impl);
+
+ return current_state == AWS_MQTT_CLIENT_STATE_CONNECTED;
+}
+
+static void s_release_incomplete_operations(struct aws_linked_list *incomplete_operations) {
+ struct aws_linked_list dummy_list;
+ aws_linked_list_init(&dummy_list);
+ aws_linked_list_swap_contents(incomplete_operations, &dummy_list);
+
+ while (!aws_linked_list_empty(&dummy_list)) {
+ struct aws_linked_list_node *head = aws_linked_list_pop_front(&dummy_list);
+ struct aws_mqtt_protocol_adapter_operation_userdata *userdata =
+ AWS_CONTAINER_OF(head, struct aws_mqtt_protocol_adapter_operation_userdata, node);
+
+ userdata->adapter = NULL;
+
+ if (userdata->operation_type == AMPAOT_PUBLISH) {
+ struct aws_mqtt_protocol_adapter_publish_data *publish_data = &userdata->operation_data.publish_data;
+ if (publish_data->completion_callback_fn != NULL) {
+ (*userdata->operation_data.publish_data.completion_callback_fn)(
+ AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, publish_data->user_data);
+ }
+ }
+ }
+}
+
+static void s_protocol_adapter_mqtt311_listener_termination_callback(void *user_data) {
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = user_data;
+ struct aws_mqtt_client_connection_311_impl *impl = adapter->connection->impl;
+
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(impl->loop));
+
+ s_release_incomplete_operations(&adapter->incomplete_operations);
+
+ aws_mqtt_client_connection_release(adapter->connection);
+
+ aws_protocol_adapter_terminate_callback_fn *terminate_callback = adapter->config.terminate_callback;
+ void *terminate_user_data = adapter->config.user_data;
+
+ aws_mem_release(adapter->allocator, adapter);
+
+ if (terminate_callback) {
+ (*terminate_callback)(terminate_user_data);
+ }
+}
+
+static struct aws_event_loop *s_aws_mqtt_protocol_adapter_311_get_event_loop(void *impl) {
+ struct aws_mqtt_protocol_adapter_311_impl *adapter = impl;
+
+ return adapter->loop;
+}
+
+static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mqtt311_vtable = {
+ .aws_mqtt_protocol_adapter_destroy_fn = s_aws_mqtt_protocol_adapter_311_destroy,
+ .aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_311_subscribe,
+ .aws_mqtt_protocol_adapter_unsubscribe_fn = s_aws_mqtt_protocol_adapter_311_unsubscribe,
+ .aws_mqtt_protocol_adapter_publish_fn = s_aws_mqtt_protocol_adapter_311_publish,
+ .aws_mqtt_protocol_adapter_is_connected_fn = s_aws_mqtt_protocol_adapter_311_is_connected,
+ .aws_mqtt_protocol_adapter_get_event_loop_fn = s_aws_mqtt_protocol_adapter_311_get_event_loop,
+};
+
+struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(
+ struct aws_allocator *allocator,
+ struct aws_mqtt_protocol_adapter_options *options,
+ struct aws_mqtt_client_connection *connection) {
+
+ if (options == NULL || connection == NULL) {
+ aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
+ return NULL;
+ }
+
+ if (aws_mqtt_client_connection_get_impl_type(connection) != AWS_MQTT311_IT_311_CONNECTION) {
+ struct aws_mqtt_client_connection_5_impl *adapter_impl = connection->impl;
+ return aws_mqtt_protocol_adapter_new_from_5(allocator, options, adapter_impl->client);
+ }
+
+ struct aws_mqtt_client_connection_311_impl *impl = connection->impl;
+
+ struct aws_mqtt_protocol_adapter_311_impl *adapter =
+ aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_311_impl));
+
+ adapter->allocator = allocator;
+ adapter->base.impl = adapter;
+ adapter->base.vtable = &s_protocol_adapter_mqtt311_vtable;
+ aws_linked_list_init(&adapter->incomplete_operations);
+ adapter->config = *options;
+ adapter->loop = impl->loop;
+ adapter->connection = aws_mqtt_client_connection_acquire(connection);
+
+ struct aws_mqtt311_listener_config listener_options = {
+ .connection = connection,
+ .listener_callbacks =
+ {
+ .publish_received_handler = s_protocol_adapter_mqtt311_listener_publish_received,
+ .connection_success_handler = s_protocol_adapter_mqtt311_listener_connection_success,
+ .connection_interrupted_handler = s_protocol_adapter_mqtt311_listener_connection_interrupted,
+ .disconnect_handler = s_aws_mqtt_protocol_adapter_311_disconnect_fn,
+ .user_data = adapter,
+ },
+ .termination_callback = s_protocol_adapter_mqtt311_listener_termination_callback,
+ .termination_callback_user_data = adapter,
+ };
+
+ adapter->listener = aws_mqtt311_listener_new(allocator, &listener_options);
+
+ return &adapter->base;
+}
+
+/******************************************************************************************************************/
+
+struct aws_mqtt_protocol_adapter_5_impl {
+ struct aws_allocator *allocator;
+ struct aws_mqtt_protocol_adapter base;
+ struct aws_linked_list incomplete_operations;
+ struct aws_mqtt_protocol_adapter_options config;
+
+ struct aws_event_loop *loop;
+ struct aws_mqtt5_client *client;
+ struct aws_mqtt5_listener *listener;
+};
+
+static void s_aws_mqtt_protocol_adapter_5_destroy(void *impl) {
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;
+
+ // all the real cleanup is done in the listener termination callback
+ aws_mqtt5_listener_release(adapter->listener);
+}
+
+/* Subscribe */
+
+static bool s_is_retryable_subscribe(enum aws_mqtt5_suback_reason_code reason_code, int error_code) {
+ if (error_code == AWS_ERROR_MQTT5_PACKET_VALIDATION || error_code == AWS_ERROR_MQTT5_SUBSCRIBE_OPTIONS_VALIDATION) {
+ return false;
+ } else if (error_code != AWS_ERROR_SUCCESS) {
+ return true;
+ }
+
+ switch (reason_code) {
+ case AWS_MQTT5_SARC_GRANTED_QOS_0:
+ case AWS_MQTT5_SARC_GRANTED_QOS_1:
+ case AWS_MQTT5_SARC_GRANTED_QOS_2:
+ case AWS_MQTT5_SARC_UNSPECIFIED_ERROR:
+ case AWS_MQTT5_SARC_PACKET_IDENTIFIER_IN_USE:
+ case AWS_MQTT5_SARC_IMPLEMENTATION_SPECIFIC_ERROR:
+ case AWS_MQTT5_SARC_QUOTA_EXCEEDED:
+ return true;
+
+ default:
+ return false;
+ }
+}
+
+static void s_protocol_adapter_5_subscribe_completion(
+ const struct aws_mqtt5_packet_suback_view *suback,
+ int error_code,
+ void *complete_ctx) {
+ struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = complete_ctx;
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = subscribe_data->adapter;
+
+ if (adapter == NULL) {
+ goto done;
+ }
+
+ enum aws_mqtt5_suback_reason_code reason_code = AWS_MQTT5_SARC_GRANTED_QOS_0;
+ if (suback != NULL && suback->reason_code_count > 0) {
+ reason_code = suback->reason_codes[0];
+ }
+ bool is_retryable = s_is_retryable_subscribe(reason_code, error_code);
+
+ if (error_code == AWS_ERROR_SUCCESS) {
+ if (suback == NULL || suback->reason_code_count != 1 || suback->reason_codes[0] >= 128) {
+ error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE;
+ }
+ }
+
+ struct aws_protocol_adapter_subscription_event subscribe_event = {
+ .topic_filter = aws_byte_cursor_from_buf(&subscribe_data->operation_data.sub_unsub_data.topic_filter),
+ .event_type = AWS_PASET_SUBSCRIBE,
+ .error_code = error_code,
+ .retryable = is_retryable,
+ };
+
+ (*adapter->config.subscription_event_callback)(&subscribe_event, adapter->config.user_data);
+
+done:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data);
+}
+
+int s_aws_mqtt_protocol_adapter_5_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) {
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;
+
+ struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data =
+ s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter);
+
+ aws_linked_list_push_back(&adapter->incomplete_operations, &subscribe_data->node);
+
+ struct aws_mqtt5_subscription_view subscription_view = {
+ .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE,
+ .topic_filter = options->topic_filter,
+ };
+
+ struct aws_mqtt5_packet_subscribe_view subscribe_view = {
+ .subscriptions = &subscription_view,
+ .subscription_count = 1,
+ };
+
+ struct aws_mqtt5_subscribe_completion_options completion_options = {
+ .ack_timeout_seconds_override = options->ack_timeout_seconds,
+ .completion_callback = s_protocol_adapter_5_subscribe_completion,
+ .completion_user_data = subscribe_data,
+ };
+
+ if (aws_mqtt5_client_subscribe(adapter->client, &subscribe_view, &completion_options)) {
+ goto error;
+ }
+
+ return AWS_OP_SUCCESS;
+
+error:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data);
+
+ return AWS_OP_ERR;
+}
+
+/* Unsubscribe */
+
+static bool s_is_retryable_unsubscribe5(enum aws_mqtt5_unsuback_reason_code reason_code, int error_code) {
+ if (error_code == AWS_ERROR_MQTT5_PACKET_VALIDATION ||
+ error_code == AWS_ERROR_MQTT5_UNSUBSCRIBE_OPTIONS_VALIDATION) {
+ return false;
+ } else if (error_code == AWS_ERROR_MQTT_TIMEOUT) {
+ return true;
+ }
+
+ switch (reason_code) {
+ case AWS_MQTT5_UARC_UNSPECIFIED_ERROR:
+ case AWS_MQTT5_UARC_IMPLEMENTATION_SPECIFIC_ERROR:
+ return true;
+
+ default:
+ return false;
+ }
+}
+
+static void s_protocol_adapter_5_unsubscribe_completion(
+ const struct aws_mqtt5_packet_unsuback_view *unsuback,
+ int error_code,
+ void *complete_ctx) {
+ struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data = complete_ctx;
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = unsubscribe_data->adapter;
+
+ if (adapter == NULL) {
+ goto done;
+ }
+
+ enum aws_mqtt5_unsuback_reason_code reason_code = AWS_MQTT5_UARC_SUCCESS;
+ if (unsuback != NULL && unsuback->reason_code_count > 0) {
+ reason_code = unsuback->reason_codes[0];
+ }
+
+ bool is_retryable = s_is_retryable_unsubscribe5(reason_code, error_code);
+
+ if (error_code == AWS_ERROR_SUCCESS) {
+ if (unsuback == NULL || unsuback->reason_code_count != 1 || unsuback->reason_codes[0] >= 128) {
+ error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE;
+ }
+ }
+
+ struct aws_protocol_adapter_subscription_event unsubscribe_event = {
+ .topic_filter = aws_byte_cursor_from_buf(&unsubscribe_data->operation_data.sub_unsub_data.topic_filter),
+ .event_type = AWS_PASET_UNSUBSCRIBE,
+ .error_code = error_code,
+ .retryable = is_retryable,
+ };
+
+ (*adapter->config.subscription_event_callback)(&unsubscribe_event, adapter->config.user_data);
+
+done:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data);
+}
+
+int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) {
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;
+
+ struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data =
+ s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter);
+
+ aws_linked_list_push_back(&adapter->incomplete_operations, &unsubscribe_data->node);
+
+ struct aws_mqtt5_packet_unsubscribe_view unsubscribe_view = {
+ .topic_filters = &options->topic_filter,
+ .topic_filter_count = 1,
+ };
+
+ struct aws_mqtt5_unsubscribe_completion_options completion_options = {
+ .ack_timeout_seconds_override = options->ack_timeout_seconds,
+ .completion_callback = s_protocol_adapter_5_unsubscribe_completion,
+ .completion_user_data = unsubscribe_data,
+ };
+
+ if (aws_mqtt5_client_unsubscribe(adapter->client, &unsubscribe_view, &completion_options)) {
+ goto error;
+ }
+
+ return AWS_OP_SUCCESS;
+
+error:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data);
+
+ return AWS_OP_ERR;
+}
+
+/* Publish */
+
+static void s_protocol_adapter_5_publish_completion(
+ enum aws_mqtt5_packet_type packet_type,
+ const void *packet,
+ int error_code,
+ void *complete_ctx) {
+ struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = complete_ctx;
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = publish_data->adapter;
+
+ if (adapter == NULL) {
+ goto done;
+ }
+
+ if (error_code == AWS_ERROR_SUCCESS && packet_type == AWS_MQTT5_PT_PUBACK) {
+ const struct aws_mqtt5_packet_puback_view *puback = packet;
+ if (puback->reason_code >= 128) {
+ error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE;
+ }
+ }
+
+ (*publish_data->operation_data.publish_data.completion_callback_fn)(
+ error_code, publish_data->operation_data.publish_data.user_data);
+
+done:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data);
+}
+
+int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapter_publish_options *options) {
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;
+ struct aws_mqtt_protocol_adapter_operation_userdata *publish_data =
+ s_aws_mqtt_protocol_adapter_publish_data_new(adapter->allocator, options, adapter);
+
+ aws_linked_list_push_back(&adapter->incomplete_operations, &publish_data->node);
+
+ struct aws_mqtt5_packet_publish_view publish_view = {
+ .topic = options->topic, .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, .payload = options->payload};
+
+ struct aws_mqtt5_publish_completion_options completion_options = {
+ .ack_timeout_seconds_override = options->ack_timeout_seconds,
+ .completion_callback = s_protocol_adapter_5_publish_completion,
+ .completion_user_data = publish_data,
+ };
+
+ if (aws_mqtt5_client_publish(adapter->client, &publish_view, &completion_options)) {
+ goto error;
+ }
+
+ return AWS_OP_SUCCESS;
+
+error:
+
+ s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data);
+
+ return AWS_OP_ERR;
+}
+
+static bool s_aws_mqtt_protocol_adapter_5_is_connected(void *impl) {
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;
+
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(adapter->client->loop));
+
+ enum aws_mqtt5_client_state current_state = adapter->client->current_state;
+
+ return current_state == AWS_MCS_CONNECTED;
+}
+
+static bool s_protocol_adapter_mqtt5_listener_publish_received(
+ const struct aws_mqtt5_packet_publish_view *publish,
+ void *user_data) {
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = user_data;
+
+ struct aws_mqtt_rr_incoming_publish_event publish_event = {
+ .topic = publish->topic,
+ .payload = publish->payload,
+ .content_type = publish->content_type,
+ .user_property_count = publish->user_property_count,
+ .user_properties = publish->user_properties,
+ .message_expiry_interval_seconds = publish->message_expiry_interval_seconds,
+ };
+
+ /* This will potentially include messages that are completely unrelated to MQTT request-response.
+ * The topic is the first thing that should be checked for relevance. */
+ (*adapter->config.incoming_publish_callback)(&publish_event, adapter->config.user_data);
+
+ return false;
+}
+
+static void s_protocol_adapter_mqtt5_lifecycle_event_callback(const struct aws_mqtt5_client_lifecycle_event *event) {
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = event->user_data;
+
+ switch (event->event_type) {
+ case AWS_MQTT5_CLET_CONNECTION_SUCCESS: {
+ struct aws_protocol_adapter_connection_event connection_event = {
+ .event_type = AWS_PACET_CONNECTED,
+ .joined_session = event->settings->rejoined_session,
+ };
+
+ (*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data);
+ break;
+ }
+ case AWS_MQTT5_CLET_DISCONNECTION: {
+ struct aws_protocol_adapter_connection_event connection_event = {
+ .event_type = AWS_PACET_DISCONNECTED,
+ };
+
+ (*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data);
+ break;
+ }
+ default:
+ break;
+ }
+}
+
+static void s_protocol_adapter_mqtt5_listener_termination_callback(void *user_data) {
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = user_data;
+
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(adapter->client->loop));
+
+ s_release_incomplete_operations(&adapter->incomplete_operations);
+
+ aws_mqtt5_client_release(adapter->client);
+
+ aws_protocol_adapter_terminate_callback_fn *terminate_callback = adapter->config.terminate_callback;
+ void *terminate_user_data = adapter->config.user_data;
+
+ aws_mem_release(adapter->allocator, adapter);
+
+ if (terminate_callback) {
+ (*terminate_callback)(terminate_user_data);
+ }
+}
+
+static struct aws_event_loop *s_aws_mqtt_protocol_adapter_5_get_event_loop(void *impl) {
+ struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;
+
+ return adapter->loop;
+}
+
+static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mqtt5_vtable = {
+ .aws_mqtt_protocol_adapter_destroy_fn = s_aws_mqtt_protocol_adapter_5_destroy,
+ .aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_5_subscribe,
+ .aws_mqtt_protocol_adapter_unsubscribe_fn = s_aws_mqtt_protocol_adapter_5_unsubscribe,
+ .aws_mqtt_protocol_adapter_publish_fn = s_aws_mqtt_protocol_adapter_5_publish,
+ .aws_mqtt_protocol_adapter_is_connected_fn = s_aws_mqtt_protocol_adapter_5_is_connected,
+ .aws_mqtt_protocol_adapter_get_event_loop_fn = s_aws_mqtt_protocol_adapter_5_get_event_loop};
+
+struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(
+ struct aws_allocator *allocator,
+ struct aws_mqtt_protocol_adapter_options *options,
+ struct aws_mqtt5_client *client) {
+
+ if (options == NULL || client == NULL) {
+ aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
+ return NULL;
+ }
+
+ struct aws_mqtt_protocol_adapter_5_impl *adapter =
+ aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_impl));
+
+ adapter->allocator = allocator;
+ adapter->base.impl = adapter;
+ adapter->base.vtable = &s_protocol_adapter_mqtt5_vtable;
+ aws_linked_list_init(&adapter->incomplete_operations);
+ adapter->config = *options;
+ adapter->loop = client->loop;
+ adapter->client = aws_mqtt5_client_acquire(client);
+
+ struct aws_mqtt5_listener_config listener_options = {
+ .client = client,
+ .listener_callbacks =
+ {
+ .listener_publish_received_handler = s_protocol_adapter_mqtt5_listener_publish_received,
+ .listener_publish_received_handler_user_data = adapter,
+ .lifecycle_event_handler = s_protocol_adapter_mqtt5_lifecycle_event_callback,
+ .lifecycle_event_handler_user_data = adapter,
+ },
+ .termination_callback = s_protocol_adapter_mqtt5_listener_termination_callback,
+ .termination_callback_user_data = adapter,
+ };
+
+ adapter->listener = aws_mqtt5_listener_new(allocator, &listener_options);
+
+ return &adapter->base;
+}
+
+void aws_mqtt_protocol_adapter_destroy(struct aws_mqtt_protocol_adapter *adapter) {
+ (*adapter->vtable->aws_mqtt_protocol_adapter_destroy_fn)(adapter->impl);
+}
+
+int aws_mqtt_protocol_adapter_subscribe(
+ struct aws_mqtt_protocol_adapter *adapter,
+ struct aws_protocol_adapter_subscribe_options *options) {
+ return (*adapter->vtable->aws_mqtt_protocol_adapter_subscribe_fn)(adapter->impl, options);
+}
+
+int aws_mqtt_protocol_adapter_unsubscribe(
+ struct aws_mqtt_protocol_adapter *adapter,
+ struct aws_protocol_adapter_unsubscribe_options *options) {
+ return (*adapter->vtable->aws_mqtt_protocol_adapter_unsubscribe_fn)(adapter->impl, options);
+}
+
+int aws_mqtt_protocol_adapter_publish(
+ struct aws_mqtt_protocol_adapter *adapter,
+ struct aws_protocol_adapter_publish_options *options) {
+ return (*adapter->vtable->aws_mqtt_protocol_adapter_publish_fn)(adapter->impl, options);
+}
+
+bool aws_mqtt_protocol_adapter_is_connected(struct aws_mqtt_protocol_adapter *adapter) {
+ return (*adapter->vtable->aws_mqtt_protocol_adapter_is_connected_fn)(adapter->impl);
+}
+
+struct aws_event_loop *aws_mqtt_protocol_adapter_get_event_loop(struct aws_mqtt_protocol_adapter *adapter) {
+ return (*adapter->vtable->aws_mqtt_protocol_adapter_get_event_loop_fn)(adapter->impl);
+}
+
+const char *aws_protocol_adapter_subscription_event_type_to_c_str(
+ enum aws_protocol_adapter_subscription_event_type type) {
+ switch (type) {
+ case AWS_PASET_SUBSCRIBE:
+ return "Subscribe";
+
+ case AWS_PASET_UNSUBSCRIBE:
+ return "Unsubscribe";
+ }
+
+ return "Unknown";
+}
+
+const char *aws_protocol_adapter_connection_event_type_to_c_str(enum aws_protocol_adapter_connection_event_type type) {
+ switch (type) {
+ case AWS_PACET_CONNECTED:
+ return "Connected";
+
+ case AWS_PACET_DISCONNECTED:
+ return "Disconnected";
+ }
+
+ return "Unknown";
+}
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_client.c b/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_client.c
new file mode 100644
index 00000000000..7e5bedfe1df
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_client.c
@@ -0,0 +1,2121 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/mqtt/request-response/request_response_client.h>
+
+#include <aws/common/clock.h>
+#include <aws/common/json.h>
+#include <aws/common/ref_count.h>
+#include <aws/common/task_scheduler.h>
+#include <aws/io/event_loop.h>
+#include <aws/mqtt/private/client_impl_shared.h>
+#include <aws/mqtt/private/request-response/protocol_adapter.h>
+#include <aws/mqtt/private/request-response/request_response_subscription_set.h>
+#include <aws/mqtt/private/request-response/subscription_manager.h>
+#include <aws/mqtt/private/v5/mqtt5_client_impl.h>
+
+#include <inttypes.h>
+
+#define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50
+
+struct aws_mqtt_request_operation_storage {
+ struct aws_mqtt_request_operation_options options;
+
+ struct aws_array_list operation_response_paths;
+ struct aws_array_list subscription_topic_filters;
+
+ struct aws_byte_buf operation_data;
+};
+
+struct aws_mqtt_streaming_operation_storage {
+ struct aws_mqtt_streaming_operation_options options;
+
+ struct aws_byte_buf operation_data;
+
+ struct aws_atomic_var activated;
+};
+
+enum aws_mqtt_request_response_operation_type {
+ AWS_MRROT_REQUEST,
+ AWS_MRROT_STREAMING,
+};
+
+enum aws_mqtt_request_response_operation_state {
+ /* creation -> in event loop enqueue */
+ AWS_MRROS_NONE,
+
+ /* in event loop queue -> non blocked response from subscription manager */
+ AWS_MRROS_QUEUED,
+
+ /* subscribing response from sub manager -> subscription success/failure event */
+ AWS_MRROS_PENDING_SUBSCRIPTION,
+
+ /* (request only) subscription success -> (publish failure OR correlated response received) */
+ AWS_MRROS_PENDING_RESPONSE,
+
+ /* (request only) the operation's destroy task has been scheduled but not yet executed */
+ AWS_MRROS_PENDING_DESTROY,
+
+ /* (streaming only) subscription success -> (operation finished OR subscription ended event) */
+ AWS_MRROS_SUBSCRIBED,
+
+ /* (streaming only) (subscription failure OR subscription ended) -> operation close/terminate */
+ AWS_MRROS_TERMINAL,
+};
+
+const char *s_aws_mqtt_request_response_operation_state_to_c_str(enum aws_mqtt_request_response_operation_state state) {
+ switch (state) {
+ case AWS_MRROS_NONE:
+ return "NONE";
+
+ case AWS_MRROS_QUEUED:
+ return "QUEUED";
+
+ case AWS_MRROS_PENDING_SUBSCRIPTION:
+ return "PENDING_SUBSCRIPTION";
+
+ case AWS_MRROS_PENDING_RESPONSE:
+ return "PENDING_RESPONSE";
+
+ case AWS_MRROS_SUBSCRIBED:
+ return "SUBSCRIBED";
+
+ case AWS_MRROS_TERMINAL:
+ return "TERMINAL";
+
+ case AWS_MRROS_PENDING_DESTROY:
+ return "PENDING_DESTROY";
+
+ default:
+ return "Unknown";
+ }
+}
+
+const char *s_aws_acquire_subscription_result_type(enum aws_acquire_subscription_result_type result) {
+ switch (result) {
+ case AASRT_SUBSCRIBED:
+ return "SUBSCRIBED";
+
+ case AASRT_SUBSCRIBING:
+ return "SUBSCRIBING";
+
+ case AASRT_BLOCKED:
+ return "BLOCKED";
+
+ case AASRT_NO_CAPACITY:
+ return "NO_CAPACITY";
+
+ case AASRT_FAILURE:
+ return "FAILURE";
+
+ default:
+ return "Unknown";
+ }
+}
+
+/*
+
+Client Tables/Lookups
+
+ (operations: authoritative operation container)
+ 1. &operation.id -> &operation // added on in-thread enqueue, removed on operation completion/destruction
+
+ (request_response_paths: response path topic -> Correlation token extraction info)
+ 2. &topic -> &{topic, topic_buffer, correlation token json path buffer} // ref-counted, per-message-path add on
+ request dequeue into subscribing/subscribed state, decref/removed on operation completion/destruction
+
+ (operations_by_correlation_tokens: correlationToken -> request operation)
+ 3. &operation.correlation token -> (request) &operation // added on request dequeue into subscribing/subscribed
+state, removed on operation completion/destruction
+
+ (streaming_operation_subscription_lists: streaming subscription filter -> list of all operations using that filter)
+ 4. &topic_filter -> &{topic_filter, linked_list} // added on request dequeue into subscribing/subscribed state,
+ removed from list on operation completion/destruction also checked for empty and removed from table
+
+*/
+
+struct aws_mqtt_rr_client_operation {
+ struct aws_allocator *allocator;
+
+ /*
+ * Operation ref-counting is a bit tricky and un-intuitive because it differs based on the type of operation.
+ *
+ * Streaming operations are managed by the user, and so the ref count is their responsibility to drop to zero.
+ * Dropping a streaming operation's ref count to zero schedules a task on the client event loop to destroy the
+ * operation. It is expected that the binding client will track (with proper synchronization) all unclosed
+ * streaming operations and safely close them for the user when close is called on the binding client.
+ *
+ * Request operations are managed by the client, and so the ref count is dropped to zero when either the
+ * operation completes normally (success or failure) or when the client is shutdown due to its external ref
+ * count dropping to zero. In all cases, this event happens naturally on the client event loop.
+ *
+ * So the summary is:
+ *
+ * (1) Streaming operation clean up is initiated by the user calling dec ref on the streaming operation
+ * (2) Request operation clean up is initiated by normal completion or client shutdown invoking dec ref.
+ *
+ * The upshot is that client shutdown dec-refs request operations but not streaming operations.
+ */
+ struct aws_ref_count ref_count;
+
+ struct aws_mqtt_request_response_client *client_internal_ref;
+
+ uint64_t id;
+
+ enum aws_mqtt_request_response_operation_type type;
+
+ union {
+ struct aws_mqtt_streaming_operation_storage streaming_storage;
+ struct aws_mqtt_request_operation_storage request_storage;
+ } storage;
+
+ uint64_t timeout_timepoint_ns;
+ struct aws_priority_queue_node priority_queue_node;
+
+ /* Sometimes this is client->operation_queue, other times it is an entry in the client's topic_filter table */
+ struct aws_linked_list_node node;
+
+ enum aws_mqtt_request_response_operation_state state;
+
+ size_t pending_subscriptions;
+
+ bool in_client_tables;
+
+ struct aws_task submit_task;
+ struct aws_task destroy_task;
+};
+
+/*******************************************************************************************/
+
+/* Tracks the current state of the request-response client */
+enum aws_request_response_client_state {
+
+ /* cross-thread initialization has not completed and all protocol adapter callbacks are ignored */
+ AWS_RRCS_UNINITIALIZED,
+
+ /* Normal operating state for the client. */
+ AWS_RRCS_ACTIVE,
+
+ /* asynchronously shutting down, no more servicing will be done and all protocol adapter callbacks are ignored */
+ AWS_RRCS_SHUTTING_DOWN,
+};
+
+/*
+ * Request-Response Client Notes
+ *
+ * Ref-counting/Shutdown
+ *
+ * The request-response client uses a double ref-count pattern.
+ *
+ * External references represent user references. When the external reference reaches zero, the client's asynchronous
+ * shutdown process is started.
+ *
+ * Internal references block final destruction. Asynchronous shutdown will not complete until all internal references
+ * are dropped. In addition to one long-lived internal reference (the protocol client adapter's back reference to
+ * the request-response client), all event loop tasks that target the request-response client hold an internal
+ * reference between task submission and task completion. This ensures that the task always has a valid reference
+ * to the client, even if we're trying to shut down at the same time.
+ *
+ *
+ * Initialization
+ *
+ * Initialization is complicated by the fact that the subscription manager needs to be initialized from the
+ * event loop thread that the client/protocol adapter/protocol client are all seated on. To do this safely,
+ * we add an uninitialized state that ignores all callbacks and we schedule a task on initial construction to do
+ * the event-loop-only initialization. Once that initialization completes on the event loop thread, we move
+ * the client into an active state where it will process operations and protocol adapter callbacks.
+ */
+struct aws_mqtt_request_response_client {
+ struct aws_allocator *allocator;
+
+ struct aws_ref_count external_ref_count;
+ struct aws_ref_count internal_ref_count;
+
+ struct aws_mqtt_request_response_client_options config;
+
+ struct aws_mqtt_protocol_adapter *client_adapter;
+
+ struct aws_rr_subscription_manager subscription_manager;
+
+ struct aws_event_loop *loop;
+
+ struct aws_task initialize_task;
+ struct aws_task external_shutdown_task;
+ struct aws_task internal_shutdown_task;
+
+ uint64_t scheduled_service_timepoint_ns;
+ struct aws_task service_task;
+
+ enum aws_request_response_client_state state;
+
+ struct aws_atomic_var next_id;
+
+ struct aws_linked_list operation_queue;
+
+ /* &operation->id -> &operation */
+ struct aws_hash_table operations;
+
+ /*
+ * heap of operation pointers where the timeout is the sort value. Elements are added to this on operation
+ * submission and removed on operation timeout/completion/termination. Request-response operations have actual
+ * timeouts, while streaming operations have UINT64_MAX timeouts.
+ */
+ struct aws_priority_queue operations_by_timeout;
+
+ /*
+ * Structure to handle stream and request subscriptions.
+ */
+ struct aws_request_response_subscriptions subscriptions;
+
+ /*
+ * Map from cursor (correlation token) -> request operation
+ */
+ struct aws_hash_table operations_by_correlation_tokens;
+};
+
+struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire_internal(
+ struct aws_mqtt_request_response_client *client) {
+ if (client != NULL) {
+ aws_ref_count_acquire(&client->internal_ref_count);
+ }
+
+ return client;
+}
+
+struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release_internal(
+ struct aws_mqtt_request_response_client *client) {
+ if (client != NULL) {
+ aws_ref_count_release(&client->internal_ref_count);
+ }
+
+ return NULL;
+}
+
+static void s_aws_rr_client_on_zero_internal_ref_count(void *context) {
+ struct aws_mqtt_request_response_client *client = context;
+
+ /* Both ref counts are zero, but it's still safest to schedule final destruction, not invoke it directly */
+ aws_event_loop_schedule_task_now(client->loop, &client->internal_shutdown_task);
+}
+
+static void s_aws_rr_client_on_zero_external_ref_count(void *context) {
+ struct aws_mqtt_request_response_client *client = context;
+
+ /* Start the asynchronous shutdown process */
+ aws_event_loop_schedule_task_now(client->loop, &client->external_shutdown_task);
+}
+
+static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request_response_client *client) {
+ aws_mqtt_request_response_client_terminated_callback_fn *terminate_callback = client->config.terminated_callback;
+ void *user_data = client->config.user_data;
+
+ AWS_FATAL_ASSERT(aws_hash_table_get_entry_count(&client->operations) == 0);
+ aws_hash_table_clean_up(&client->operations);
+
+ aws_priority_queue_clean_up(&client->operations_by_timeout);
+
+ aws_mqtt_request_response_client_subscriptions_clean_up(&client->subscriptions);
+ aws_hash_table_clean_up(&client->operations_by_correlation_tokens);
+
+ aws_mem_release(client->allocator, client);
+
+ if (terminate_callback != NULL) {
+ (*terminate_callback)(user_data);
+ }
+}
+
+static void s_mqtt_request_response_client_internal_shutdown_task_fn(
+ struct aws_task *task,
+ void *arg,
+ enum aws_task_status task_status) {
+ (void)task;
+ (void)task_status;
+
+ struct aws_mqtt_request_response_client *client = arg;
+
+ /* All internal and external refs are gone; it is safe to clean up synchronously. */
+ s_mqtt_request_response_client_final_destroy(client);
+}
+
+static void s_remove_operation_from_timeout_queue(struct aws_mqtt_rr_client_operation *operation) {
+ struct aws_mqtt_request_response_client *client = operation->client_internal_ref;
+
+ if (aws_priority_queue_node_is_in_queue(&operation->priority_queue_node)) {
+ struct aws_mqtt_rr_client_operation *queued_operation = NULL;
+ aws_priority_queue_remove(&client->operations_by_timeout, &queued_operation, &operation->priority_queue_node);
+ }
+}
+
+static void s_change_operation_state(
+ struct aws_mqtt_rr_client_operation *operation,
+ enum aws_mqtt_request_response_operation_state new_state) {
+ enum aws_mqtt_request_response_operation_state old_state = operation->state;
+ if (old_state == new_state) {
+ return;
+ }
+
+ operation->state = new_state;
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response operation %" PRIu64 " changing state from %s to %s",
+ (void *)operation->client_internal_ref,
+ operation->id,
+ s_aws_mqtt_request_response_operation_state_to_c_str(old_state),
+ s_aws_mqtt_request_response_operation_state_to_c_str(new_state));
+}
+
+static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_operation *operation, int error_code) {
+ AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST);
+ AWS_FATAL_ASSERT(error_code != AWS_ERROR_SUCCESS);
+
+ if (operation->state == AWS_MRROS_PENDING_DESTROY) {
+ return;
+ }
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response operation %" PRIu64 " failed with error code %d(%s)",
+ (void *)operation->client_internal_ref,
+ operation->id,
+ error_code,
+ aws_error_debug_str(error_code));
+
+ aws_mqtt_request_operation_completion_fn *completion_callback =
+ operation->storage.request_storage.options.completion_callback;
+ void *user_data = operation->storage.request_storage.options.user_data;
+
+ if (completion_callback != NULL) {
+ (*completion_callback)(NULL, error_code, user_data);
+ }
+
+ s_change_operation_state(operation, AWS_MRROS_PENDING_DESTROY);
+
+ aws_mqtt_rr_client_operation_release(operation);
+}
+
+static void s_streaming_operation_emit_streaming_subscription_event(
+ struct aws_mqtt_rr_client_operation *operation,
+ enum aws_rr_streaming_subscription_event_type event_type,
+ int error_code) {
+ aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback =
+ operation->storage.streaming_storage.options.subscription_status_callback;
+
+ if (subscription_status_callback != NULL) {
+ void *user_data = operation->storage.streaming_storage.options.user_data;
+ (*subscription_status_callback)(event_type, error_code, user_data);
+ }
+}
+
+static void s_halt_streaming_operation_with_failure(struct aws_mqtt_rr_client_operation *operation, int error_code) {
+ AWS_FATAL_ASSERT(operation->type == AWS_MRROT_STREAMING);
+ AWS_FATAL_ASSERT(error_code != AWS_ERROR_SUCCESS);
+
+ if (operation->state == AWS_MRROS_PENDING_DESTROY || operation->state == AWS_MRROS_TERMINAL) {
+ return;
+ }
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: streaming operation %" PRIu64 " halted with error code %d(%s)",
+ (void *)operation->client_internal_ref,
+ operation->id,
+ error_code,
+ aws_error_debug_str(error_code));
+
+ s_streaming_operation_emit_streaming_subscription_event(operation, ARRSSET_SUBSCRIPTION_HALTED, error_code);
+
+ s_change_operation_state(operation, AWS_MRROS_TERMINAL);
+}
+
+static void s_request_response_fail_operation(struct aws_mqtt_rr_client_operation *operation, int error_code) {
+ if (operation->type == AWS_MRROT_STREAMING) {
+ s_halt_streaming_operation_with_failure(operation, error_code);
+ } else {
+ s_complete_request_operation_with_failure(operation, error_code);
+ }
+}
+
+static int s_rr_client_clean_up_operation(void *context, struct aws_hash_element *elem) {
+ (void)context;
+ struct aws_mqtt_rr_client_operation *operation = elem->value;
+
+ s_request_response_fail_operation(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN);
+
+ return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
+}
+
+static void s_mqtt_request_response_client_external_shutdown_task_fn(
+ struct aws_task *task,
+ void *arg,
+ enum aws_task_status task_status) {
+ (void)task;
+
+ AWS_FATAL_ASSERT(task_status != AWS_TASK_STATUS_CANCELED);
+
+ struct aws_mqtt_request_response_client *client = arg;
+
+ /* stop handling adapter event callbacks */
+ client->state = AWS_RRCS_SHUTTING_DOWN;
+
+ if (client->scheduled_service_timepoint_ns != 0) {
+ aws_event_loop_cancel_task(client->loop, &client->service_task);
+ client->scheduled_service_timepoint_ns = 0;
+ }
+
+ aws_rr_subscription_manager_clean_up(&client->subscription_manager);
+
+ if (client->client_adapter != NULL) {
+ aws_mqtt_protocol_adapter_destroy(client->client_adapter);
+ }
+
+ /*
+ * It is a client invariant that when external shutdown starts, it must be the case that there are no in-flight
+ * operations with un-executed submit tasks. This means it safe to assume that all tracked request operations are
+ * either in the process of cleaning up already (state == AWS_MRROS_PENDING_DESTROY) or can be
+ * completed now (state != AWS_MRROS_PENDING_DESTROY). Non-terminal streaming operations are moved into
+ * a terminal state and emit an appropriate failure/ended event.
+ *
+ * Actual operation destruction and client ref-count release is done by a scheduled task
+ * on the operation that is triggered by dec-refing it (assuming streaming operations get closed by the binding
+ * client).
+ */
+ aws_hash_table_foreach(&client->operations, s_rr_client_clean_up_operation, NULL);
+
+ aws_ref_count_release(&client->internal_ref_count);
+}
+
+static void s_mqtt_request_response_client_wake_service(struct aws_mqtt_request_response_client *client) {
+ uint64_t now = 0;
+ aws_high_res_clock_get_ticks(&now);
+
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(client->loop));
+
+ if (client->state != AWS_RRCS_ACTIVE) {
+ return;
+ }
+
+ if (client->scheduled_service_timepoint_ns == 0 || now < client->scheduled_service_timepoint_ns) {
+ if (now < client->scheduled_service_timepoint_ns) {
+ aws_event_loop_cancel_task(client->loop, &client->service_task);
+ }
+
+ client->scheduled_service_timepoint_ns = now;
+ aws_event_loop_schedule_task_now(client->loop, &client->service_task);
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE, "id=%p: request-response client service task woke", (void *)client);
+ }
+}
+
+struct aws_rrc_incomplete_publish {
+ struct aws_allocator *allocator;
+
+ struct aws_mqtt_request_response_client *rr_client;
+
+ uint64_t operation_id;
+};
+
+static void s_aws_rrc_incomplete_publish_destroy(struct aws_rrc_incomplete_publish *user_data) {
+ if (user_data == NULL) {
+ return;
+ }
+
+ aws_mqtt_request_response_client_release_internal(user_data->rr_client);
+
+ aws_mem_release(user_data->allocator, user_data);
+}
+
+static void s_on_request_publish_completion(int error_code, void *userdata) {
+ struct aws_rrc_incomplete_publish *publish_user_data = userdata;
+
+ if (error_code != AWS_ERROR_SUCCESS) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response operation %" PRIu64 " failed publish step due to error %d(%s)",
+ (void *)publish_user_data->rr_client,
+ publish_user_data->operation_id,
+ error_code,
+ aws_error_debug_str(error_code));
+
+ struct aws_hash_element *element = NULL;
+ if (aws_hash_table_find(
+ &publish_user_data->rr_client->operations, &publish_user_data->operation_id, &element) ==
+ AWS_OP_SUCCESS &&
+ element != NULL) {
+ struct aws_mqtt_rr_client_operation *operation = element->value;
+ s_complete_request_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_PUBLISH_FAILURE);
+ }
+ }
+
+ s_aws_rrc_incomplete_publish_destroy(publish_user_data);
+}
+
+static void s_make_mqtt_request(
+ struct aws_mqtt_request_response_client *client,
+ struct aws_mqtt_rr_client_operation *operation) {
+ (void)client;
+
+ AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST);
+
+ struct aws_mqtt_request_operation_options *request_options = &operation->storage.request_storage.options;
+
+ struct aws_rrc_incomplete_publish *publish_user_data =
+ aws_mem_calloc(client->allocator, 1, sizeof(struct aws_rrc_incomplete_publish));
+ publish_user_data->allocator = client->allocator;
+ publish_user_data->rr_client = aws_mqtt_request_response_client_acquire_internal(client);
+ publish_user_data->operation_id = operation->id;
+
+ struct aws_protocol_adapter_publish_options publish_options = {
+ .topic = request_options->publish_topic,
+ .payload = request_options->serialized_request,
+ .ack_timeout_seconds = client->config.operation_timeout_seconds,
+ .completion_callback_fn = s_on_request_publish_completion,
+ .user_data = publish_user_data,
+ };
+
+ if (aws_mqtt_protocol_adapter_publish(client->client_adapter, &publish_options)) {
+ int error_code = aws_last_error();
+
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response operation %" PRIu64 " synchronously failed publish step due to error %d(%s)",
+ (void *)publish_user_data->rr_client,
+ publish_user_data->operation_id,
+ error_code,
+ aws_error_debug_str(error_code));
+ s_complete_request_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_PUBLISH_FAILURE);
+ goto error;
+ }
+
+ return;
+
+error:
+
+ s_aws_rrc_incomplete_publish_destroy(publish_user_data);
+}
+
+struct aws_rr_subscription_status_event_task {
+ struct aws_allocator *allocator;
+
+ struct aws_task task;
+
+ struct aws_mqtt_request_response_client *rr_client;
+
+ enum aws_rr_subscription_event_type type;
+ struct aws_byte_buf topic_filter;
+ uint64_t operation_id;
+};
+
+static void s_aws_rr_subscription_status_event_task_delete(struct aws_rr_subscription_status_event_task *task) {
+ if (task == NULL) {
+ return;
+ }
+
+ aws_byte_buf_clean_up(&task->topic_filter);
+ aws_mqtt_request_response_client_release_internal(task->rr_client);
+
+ aws_mem_release(task->allocator, task);
+}
+
+static void s_on_request_operation_subscription_status_event(
+ struct aws_mqtt_rr_client_operation *operation,
+ struct aws_byte_cursor topic_filter,
+ enum aws_rr_subscription_event_type event_type) {
+ (void)topic_filter;
+
+ switch (event_type) {
+ case ARRSET_REQUEST_SUBSCRIBE_FAILURE:
+ case ARRSET_REQUEST_SUBSCRIPTION_ENDED:
+ s_complete_request_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE);
+ break;
+
+ case ARRSET_REQUEST_SUBSCRIBE_SUCCESS:
+ if (operation->state == AWS_MRROS_PENDING_SUBSCRIPTION) {
+ --operation->pending_subscriptions;
+ if (operation->pending_subscriptions == 0) {
+ s_change_operation_state(operation, AWS_MRROS_PENDING_RESPONSE);
+ s_make_mqtt_request(operation->client_internal_ref, operation);
+ }
+ }
+ break;
+
+ default:
+ AWS_FATAL_ASSERT(false);
+ }
+}
+
+static void s_on_streaming_operation_subscription_status_event(
+ struct aws_mqtt_rr_client_operation *operation,
+ struct aws_byte_cursor topic_filter,
+ enum aws_rr_subscription_event_type event_type) {
+ (void)topic_filter;
+
+ switch (event_type) {
+ case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED:
+ if (operation->state == AWS_MRROS_PENDING_SUBSCRIPTION) {
+ s_change_operation_state(operation, AWS_MRROS_SUBSCRIBED);
+ }
+
+ s_streaming_operation_emit_streaming_subscription_event(
+ operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS);
+ break;
+
+ case ARRSET_STREAMING_SUBSCRIPTION_LOST:
+ s_streaming_operation_emit_streaming_subscription_event(
+ operation, ARRSSET_SUBSCRIPTION_LOST, AWS_ERROR_SUCCESS);
+ break;
+
+ case ARRSET_STREAMING_SUBSCRIPTION_HALTED:
+ s_halt_streaming_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE);
+ break;
+
+ default:
+ AWS_FATAL_ASSERT(false);
+ }
+}
+
+static void s_handle_subscription_status_event_task(struct aws_task *task, void *arg, enum aws_task_status status) {
+ (void)task;
+
+ struct aws_rr_subscription_status_event_task *event_task = arg;
+
+ if (status == AWS_TASK_STATUS_CANCELED) {
+ goto done;
+ }
+
+ if (event_task->type == ARRSET_UNSUBSCRIBE_COMPLETE || event_task->type == ARRSET_SUBSCRIPTION_EMPTY) {
+ s_mqtt_request_response_client_wake_service(event_task->rr_client);
+ goto done;
+ }
+
+ struct aws_hash_element *element = NULL;
+ if (aws_hash_table_find(&event_task->rr_client->operations, &event_task->operation_id, &element) ||
+ element == NULL) {
+ goto done;
+ }
+
+ struct aws_mqtt_rr_client_operation *operation = element->value;
+
+ switch (event_task->type) {
+ case ARRSET_REQUEST_SUBSCRIBE_SUCCESS:
+ case ARRSET_REQUEST_SUBSCRIBE_FAILURE:
+ case ARRSET_REQUEST_SUBSCRIPTION_ENDED:
+ s_on_request_operation_subscription_status_event(
+ operation, aws_byte_cursor_from_buf(&event_task->topic_filter), event_task->type);
+ break;
+
+ case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED:
+ case ARRSET_STREAMING_SUBSCRIPTION_LOST:
+ case ARRSET_STREAMING_SUBSCRIPTION_HALTED:
+ s_on_streaming_operation_subscription_status_event(
+ operation, aws_byte_cursor_from_buf(&event_task->topic_filter), event_task->type);
+ break;
+
+ default:
+ break;
+ }
+
+done:
+
+ s_aws_rr_subscription_status_event_task_delete(event_task);
+}
+
+static struct aws_rr_subscription_status_event_task *s_aws_rr_subscription_status_event_task_new(
+ struct aws_allocator *allocator,
+ struct aws_mqtt_request_response_client *rr_client,
+ const struct aws_rr_subscription_status_event *event) {
+ struct aws_rr_subscription_status_event_task *task =
+ aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_subscription_status_event_task));
+
+ task->allocator = allocator;
+ task->type = event->type;
+ task->operation_id = event->operation_id;
+ task->rr_client = aws_mqtt_request_response_client_acquire_internal(rr_client);
+
+ aws_byte_buf_init_copy_from_cursor(&task->topic_filter, allocator, event->topic_filter);
+
+ aws_task_init(&task->task, s_handle_subscription_status_event_task, task, "SubscriptionStatusEventTask");
+
+ return task;
+}
+
+static void s_aws_rr_client_subscription_status_event_callback(
+ const struct aws_rr_subscription_status_event *event,
+ void *userdata) {
+ (void)event;
+ (void)userdata;
+
+ /*
+ * We must be on the event loop, but it's safer overall to process this event as a top-level event loop task. The
+ * subscription manager assumes that we won't call APIs on it while iterating subscription records and listeners.
+ *
+ * These tasks hold an internal reference while they exist.
+ */
+
+ struct aws_mqtt_request_response_client *rr_client = userdata;
+
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop));
+ AWS_FATAL_ASSERT(rr_client->state != AWS_RRCS_SHUTTING_DOWN);
+
+ struct aws_rr_subscription_status_event_task *task =
+ s_aws_rr_subscription_status_event_task_new(rr_client->allocator, rr_client, event);
+
+ aws_event_loop_schedule_task_now(rr_client->loop, &task->task);
+}
+
+static void s_aws_rr_client_protocol_adapter_subscription_event_callback(
+ const struct aws_protocol_adapter_subscription_event *event,
+ void *user_data) {
+ struct aws_mqtt_request_response_client *rr_client = user_data;
+
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop));
+
+ if (rr_client->state != AWS_RRCS_ACTIVE) {
+ return;
+ }
+
+ aws_rr_subscription_manager_on_protocol_adapter_subscription_event(&rr_client->subscription_manager, event);
+}
+
+static void s_apply_publish_to_streaming_operation_list(
+ const struct aws_linked_list *operations,
+ const struct aws_byte_cursor *topic_filter,
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ void *user_data) {
+
+ AWS_FATAL_ASSERT(operations != NULL);
+
+ struct aws_mqtt_request_response_client *rr_client = user_data;
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client incoming publish on topic '" PRInSTR
+ "' matches streaming subscription on topic filter '" PRInSTR "'",
+ (void *)rr_client,
+ AWS_BYTE_CURSOR_PRI(publish_event->topic),
+ AWS_BYTE_CURSOR_PRI(*topic_filter));
+
+ struct aws_linked_list_node *node = aws_linked_list_begin(operations);
+ while (node != aws_linked_list_end(operations)) {
+ struct aws_mqtt_rr_client_operation *operation =
+ AWS_CONTAINER_OF(node, struct aws_mqtt_rr_client_operation, node);
+ node = aws_linked_list_next(node);
+
+ if (operation->type != AWS_MRROT_STREAMING) {
+ continue;
+ }
+
+ if (operation->state == AWS_MRROS_PENDING_DESTROY || operation->state == AWS_MRROS_TERMINAL) {
+ continue;
+ }
+
+ aws_mqtt_streaming_operation_incoming_publish_fn *incoming_publish_callback =
+ operation->storage.streaming_storage.options.incoming_publish_callback;
+ if (!incoming_publish_callback) {
+ continue;
+ }
+
+ void *operation_user_data = operation->storage.streaming_storage.options.user_data;
+ (*incoming_publish_callback)(publish_event, operation_user_data);
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client incoming publish on topic '" PRInSTR
+ "' routed to streaming operation %" PRIu64,
+ (void *)operation->client_internal_ref,
+ AWS_BYTE_CURSOR_PRI(publish_event->topic),
+ operation->id);
+ }
+}
+
+static void s_complete_operation_with_correlation_token(
+ struct aws_mqtt_request_response_client *rr_client,
+ struct aws_byte_cursor correlation_token,
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event) {
+ struct aws_hash_element *hash_element = NULL;
+
+ if (aws_hash_table_find(&rr_client->operations_by_correlation_tokens, &correlation_token, &hash_element)) {
+ return;
+ }
+
+ if (hash_element == NULL) {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client incoming publish on response path topic '" PRInSTR
+ "' and correlation token '" PRInSTR "' does not have an originating request entry",
+ (void *)rr_client,
+ AWS_BYTE_CURSOR_PRI(publish_event->topic),
+ AWS_BYTE_CURSOR_PRI(correlation_token));
+ return;
+ }
+
+ struct aws_mqtt_rr_client_operation *operation = hash_element->value;
+ AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST);
+
+ if (operation->state == AWS_MRROS_PENDING_DESTROY) {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response operation %" PRIu64 " cannot be completed, already in pending destruction state",
+ (void *)operation->client_internal_ref,
+ operation->id);
+ return;
+ }
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response operation %" PRIu64 " completed successfully",
+ (void *)operation->client_internal_ref,
+ operation->id);
+
+ aws_mqtt_request_operation_completion_fn *completion_callback =
+ operation->storage.request_storage.options.completion_callback;
+ void *user_data = operation->storage.request_storage.options.user_data;
+
+ if (completion_callback != NULL) {
+ (*completion_callback)(publish_event, AWS_ERROR_SUCCESS, user_data);
+ }
+
+ s_change_operation_state(operation, AWS_MRROS_PENDING_DESTROY);
+
+ aws_mqtt_rr_client_operation_release(operation);
+}
+
+static void s_apply_publish_to_response_path_entry(
+ struct aws_rr_response_path_entry *entry,
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ void *user_data) {
+
+ struct aws_mqtt_request_response_client *rr_client = user_data;
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client incoming publish on topic '" PRInSTR "' matches response path",
+ (void *)rr_client,
+ AWS_BYTE_CURSOR_PRI(publish_event->topic));
+
+ struct aws_json_value *json_payload = NULL;
+
+ struct aws_byte_cursor correlation_token;
+ AWS_ZERO_STRUCT(correlation_token);
+ struct aws_byte_cursor correlation_token_json_path = aws_byte_cursor_from_buf(&entry->correlation_token_json_path);
+ if (correlation_token_json_path.len > 0) {
+ json_payload = aws_json_value_new_from_string(rr_client->allocator, publish_event->payload);
+ if (json_payload == NULL) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client incoming publish on response path topic '" PRInSTR
+ "' could not be deserialized into JSON",
+ (void *)rr_client,
+ AWS_BYTE_CURSOR_PRI(publish_event->topic));
+ return;
+ }
+
+ struct aws_byte_cursor segment;
+ AWS_ZERO_STRUCT(segment);
+
+ struct aws_json_value *correlation_token_entry = json_payload;
+ while (aws_byte_cursor_next_split(&correlation_token_json_path, '.', &segment)) {
+ if (!aws_json_value_is_object(correlation_token_entry)) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client incoming publish on response path topic '" PRInSTR
+ "' unable to walk correlation token path '" PRInSTR "'",
+ (void *)rr_client,
+ AWS_BYTE_CURSOR_PRI(publish_event->topic),
+ AWS_BYTE_CURSOR_PRI(correlation_token_json_path));
+ goto done;
+ }
+
+ correlation_token_entry = aws_json_value_get_from_object(correlation_token_entry, segment);
+ if (correlation_token_entry == NULL) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client incoming publish on response path topic '" PRInSTR
+ "' could not find path segment '" PRInSTR "'",
+ (void *)rr_client,
+ AWS_BYTE_CURSOR_PRI(publish_event->topic),
+ AWS_BYTE_CURSOR_PRI(segment));
+ goto done;
+ }
+ }
+
+ if (!aws_json_value_is_string(correlation_token_entry)) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client incoming publish on response path topic '" PRInSTR
+ "' token entry is not a string",
+ (void *)rr_client,
+ AWS_BYTE_CURSOR_PRI(publish_event->topic));
+ goto done;
+ }
+
+ if (aws_json_value_get_string(correlation_token_entry, &correlation_token)) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client incoming publish on response path topic '" PRInSTR
+ "' failed to extract string from token entry",
+ (void *)rr_client,
+ AWS_BYTE_CURSOR_PRI(publish_event->topic));
+ goto done;
+ }
+ }
+
+ s_complete_operation_with_correlation_token(rr_client, correlation_token, publish_event);
+
+done:
+
+ if (json_payload != NULL) {
+ aws_json_value_destroy(json_payload);
+ }
+}
+
+static void s_aws_rr_client_protocol_adapter_incoming_publish_callback(
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ void *user_data) {
+
+ struct aws_mqtt_request_response_client *rr_client = user_data;
+
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop));
+
+ if (rr_client->state != AWS_RRCS_ACTIVE) {
+ return;
+ }
+
+ aws_mqtt_request_response_client_subscriptions_handle_incoming_publish(
+ &rr_client->subscriptions,
+ publish_event,
+ s_apply_publish_to_streaming_operation_list,
+ s_apply_publish_to_response_path_entry,
+ rr_client);
+}
+
+static void s_aws_rr_client_protocol_adapter_terminate_callback(void *user_data) {
+ struct aws_mqtt_request_response_client *rr_client = user_data;
+
+ /* release the internal ref count "held" by the protocol adapter's existence */
+ aws_ref_count_release(&rr_client->internal_ref_count);
+}
+
+static void s_aws_rr_client_protocol_adapter_connection_event_callback(
+ const struct aws_protocol_adapter_connection_event *event,
+ void *user_data) {
+ struct aws_mqtt_request_response_client *rr_client = user_data;
+
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop));
+
+ if (rr_client->state != AWS_RRCS_ACTIVE) {
+ return;
+ }
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client applying connection event to subscription manager",
+ (void *)rr_client);
+
+ aws_rr_subscription_manager_on_protocol_adapter_connection_event(&rr_client->subscription_manager, event);
+}
+
+static int s_compare_rr_operation_timeouts(const void *a, const void *b) {
+ const struct aws_mqtt_rr_client_operation **operation_a_ptr = (void *)a;
+ const struct aws_mqtt_rr_client_operation *operation_a = *operation_a_ptr;
+
+ const struct aws_mqtt_rr_client_operation **operation_b_ptr = (void *)b;
+ const struct aws_mqtt_rr_client_operation *operation_b = *operation_b_ptr;
+
+ if (operation_a->timeout_timepoint_ns < operation_b->timeout_timepoint_ns) {
+ return -1;
+ } else if (operation_a->timeout_timepoint_ns > operation_b->timeout_timepoint_ns) {
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new(
+ struct aws_allocator *allocator,
+ const struct aws_mqtt_request_response_client_options *options,
+ struct aws_event_loop *loop) {
+ struct aws_rr_subscription_manager_options sm_options = {
+ .max_request_response_subscriptions = options->max_request_response_subscriptions,
+ .max_streaming_subscriptions = options->max_streaming_subscriptions,
+ .operation_timeout_seconds = options->operation_timeout_seconds,
+ };
+
+ /*
+ * We can't initialize the subscription manager until we're running on the event loop, so make sure that
+ * initialize can't fail by checking its options for validity now.
+ */
+ if (!aws_rr_subscription_manager_are_options_valid(&sm_options)) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE, "(static) request response client creation failed - invalid client options");
+ aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
+ return NULL;
+ }
+
+ struct aws_mqtt_request_response_client *rr_client =
+ aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_request_response_client));
+
+ rr_client->allocator = allocator;
+ rr_client->config = *options;
+ rr_client->loop = loop;
+ rr_client->state = AWS_RRCS_UNINITIALIZED;
+
+ aws_hash_table_init(
+ &rr_client->operations,
+ allocator,
+ MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE,
+ aws_hash_uint64_t_by_identity,
+ aws_hash_compare_uint64_t_eq,
+ NULL,
+ NULL);
+
+ aws_priority_queue_init_dynamic(
+ &rr_client->operations_by_timeout,
+ allocator,
+ 100,
+ sizeof(struct aws_mqtt_rr_client_operation *),
+ s_compare_rr_operation_timeouts);
+
+ aws_mqtt_request_response_client_subscriptions_init(&rr_client->subscriptions, allocator);
+
+ aws_hash_table_init(
+ &rr_client->operations_by_correlation_tokens,
+ allocator,
+ MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE,
+ aws_hash_byte_cursor_ptr,
+ aws_mqtt_byte_cursor_hash_equality,
+ NULL,
+ NULL);
+
+ aws_linked_list_init(&rr_client->operation_queue);
+
+ aws_task_init(
+ &rr_client->external_shutdown_task,
+ s_mqtt_request_response_client_external_shutdown_task_fn,
+ rr_client,
+ "mqtt_rr_client_external_shutdown");
+
+ aws_task_init(
+ &rr_client->internal_shutdown_task,
+ s_mqtt_request_response_client_internal_shutdown_task_fn,
+ rr_client,
+ "mqtt_rr_client_internal_shutdown");
+
+ /* The initial external ref belongs to the caller */
+ aws_ref_count_init(&rr_client->external_ref_count, rr_client, s_aws_rr_client_on_zero_external_ref_count);
+
+ /* The initial internal ref belongs to ourselves (the external ref count shutdown task) */
+ aws_ref_count_init(&rr_client->internal_ref_count, rr_client, s_aws_rr_client_on_zero_internal_ref_count);
+
+ aws_atomic_store_int(&rr_client->next_id, 1);
+
+ return rr_client;
+}
+
+static void s_aws_rr_client_init_subscription_manager(
+ struct aws_mqtt_request_response_client *rr_client,
+ struct aws_allocator *allocator) {
+ AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop));
+
+ struct aws_rr_subscription_manager_options subscription_manager_options = {
+ .operation_timeout_seconds = rr_client->config.operation_timeout_seconds,
+ .max_request_response_subscriptions = rr_client->config.max_request_response_subscriptions,
+ .max_streaming_subscriptions = rr_client->config.max_streaming_subscriptions,
+ .subscription_status_callback = s_aws_rr_client_subscription_status_event_callback,
+ .userdata = rr_client,
+ };
+
+ aws_rr_subscription_manager_init(
+ &rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options);
+}
+
+static void s_check_for_operation_timeouts(struct aws_mqtt_request_response_client *client) {
+ uint64_t now = 0;
+ aws_high_res_clock_get_ticks(&now);
+
+ struct aws_priority_queue *timeout_queue = &client->operations_by_timeout;
+
+ bool done = aws_priority_queue_size(timeout_queue) == 0;
+ while (!done) {
+ struct aws_mqtt_rr_client_operation **next_operation_by_timeout_ptr = NULL;
+ aws_priority_queue_top(timeout_queue, (void **)&next_operation_by_timeout_ptr);
+ AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL);
+ struct aws_mqtt_rr_client_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr;
+ AWS_FATAL_ASSERT(next_operation_by_timeout != NULL);
+
+ // If the current top of the heap hasn't timed out than nothing has
+ if (next_operation_by_timeout->timeout_timepoint_ns > now) {
+ break;
+ }
+
+ /* Ack timeout for this operation has been reached */
+ aws_priority_queue_pop(timeout_queue, &next_operation_by_timeout);
+
+ s_request_response_fail_operation(next_operation_by_timeout, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT);
+
+ done = aws_priority_queue_size(timeout_queue) == 0;
+ }
+}
+
+static uint64_t s_mqtt_request_response_client_get_next_service_time(struct aws_mqtt_request_response_client *client) {
+ if (aws_priority_queue_size(&client->operations_by_timeout) > 0) {
+ struct aws_mqtt_rr_client_operation **next_operation_by_timeout_ptr = NULL;
+ aws_priority_queue_top(&client->operations_by_timeout, (void **)&next_operation_by_timeout_ptr);
+ AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL);
+ struct aws_mqtt_rr_client_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr;
+ AWS_FATAL_ASSERT(next_operation_by_timeout != NULL);
+
+ return next_operation_by_timeout->timeout_timepoint_ns;
+ }
+
+ return UINT64_MAX;
+}
+
+static int s_add_streaming_operation_to_subscription_topic_filter_table(
+ struct aws_mqtt_request_response_client *client,
+ struct aws_mqtt_rr_client_operation *operation) {
+
+ struct aws_byte_cursor topic_filter_cursor = operation->storage.streaming_storage.options.topic_filter;
+
+ struct aws_rr_operation_list_topic_filter_entry *entry =
+ aws_mqtt_request_response_client_subscriptions_add_stream_subscription(
+ &client->subscriptions, &topic_filter_cursor);
+ if (entry == NULL) {
+ return AWS_OP_ERR;
+ }
+
+ if (aws_linked_list_node_is_in_list(&operation->node)) {
+ aws_linked_list_remove(&operation->node);
+ }
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client adding streaming operation %" PRIu64
+ " to streaming subscription table with topic_filter '" PRInSTR "'",
+ (void *)client,
+ operation->id,
+ AWS_BYTE_CURSOR_PRI(topic_filter_cursor));
+
+ aws_linked_list_push_back(&entry->operations, &operation->node);
+
+ return AWS_OP_SUCCESS;
+}
+
+static int s_add_request_operation_to_response_path_table(
+ struct aws_mqtt_request_response_client *client,
+ struct aws_mqtt_rr_client_operation *operation) {
+
+ struct aws_array_list *paths = &operation->storage.request_storage.operation_response_paths;
+ size_t path_count = aws_array_list_length(paths);
+ for (size_t i = 0; i < path_count; ++i) {
+ struct aws_mqtt_request_operation_response_path path;
+ aws_array_list_get_at(paths, &path, i);
+ if (aws_mqtt_request_response_client_subscriptions_add_request_subscription(
+ &client->subscriptions, &path.topic, &path.correlation_token_json_path)) {
+ return AWS_OP_ERR;
+ }
+ }
+ return AWS_OP_SUCCESS;
+}
+
+static int s_add_request_operation_to_correlation_token_table(
+ struct aws_mqtt_request_response_client *client,
+ struct aws_mqtt_rr_client_operation *operation) {
+
+ return aws_hash_table_put(
+ &client->operations_by_correlation_tokens,
+ &operation->storage.request_storage.options.correlation_token,
+ operation,
+ NULL);
+}
+
+static int s_add_in_progress_operation_to_tracking_tables(
+ struct aws_mqtt_request_response_client *client,
+ struct aws_mqtt_rr_client_operation *operation) {
+ if (operation->type == AWS_MRROT_STREAMING) {
+ if (s_add_streaming_operation_to_subscription_topic_filter_table(client, operation)) {
+ return AWS_OP_ERR;
+ }
+ } else {
+ if (s_add_request_operation_to_response_path_table(client, operation)) {
+ return AWS_OP_ERR;
+ }
+
+ if (s_add_request_operation_to_correlation_token_table(client, operation)) {
+ return AWS_OP_ERR;
+ }
+ }
+
+ operation->in_client_tables = true;
+
+ return AWS_OP_SUCCESS;
+}
+
+static void s_handle_operation_subscribe_result(
+ struct aws_mqtt_request_response_client *client,
+ struct aws_mqtt_rr_client_operation *operation,
+ enum aws_acquire_subscription_result_type subscribe_result) {
+ if (subscribe_result == AASRT_FAILURE || subscribe_result == AASRT_NO_CAPACITY) {
+ int error_code = (subscribe_result == AASRT_NO_CAPACITY)
+ ? AWS_ERROR_MQTT_REQUEST_RESPONSE_NO_SUBSCRIPTION_CAPACITY
+ : AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE;
+ s_request_response_fail_operation(operation, error_code);
+ return;
+ }
+
+ if (s_add_in_progress_operation_to_tracking_tables(client, operation)) {
+ s_request_response_fail_operation(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR);
+ return;
+ }
+
+ if (subscribe_result == AASRT_SUBSCRIBING) {
+ s_change_operation_state(operation, AWS_MRROS_PENDING_SUBSCRIPTION);
+ return;
+ }
+
+ if (operation->type == AWS_MRROT_STREAMING) {
+ s_change_operation_state(operation, AWS_MRROS_SUBSCRIBED);
+ s_streaming_operation_emit_streaming_subscription_event(
+ operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS);
+ } else {
+ s_make_mqtt_request(client, operation);
+ }
+}
+
+static enum aws_rr_subscription_type s_rr_operation_type_to_subscription_type(
+ enum aws_mqtt_request_response_operation_type type) {
+ if (type == AWS_MRROT_REQUEST) {
+ return ARRST_REQUEST_RESPONSE;
+ }
+
+ return ARRST_EVENT_STREAM;
+}
+
+static bool s_can_operation_dequeue(
+ struct aws_mqtt_request_response_client *client,
+ struct aws_mqtt_rr_client_operation *operation) {
+ if (operation->type != AWS_MRROT_REQUEST) {
+ return true;
+ }
+
+ struct aws_hash_element *token_element = NULL;
+ if (aws_hash_table_find(
+ &client->operations_by_correlation_tokens,
+ &operation->storage.request_storage.options.correlation_token,
+ &token_element)) {
+ return false;
+ }
+
+ return token_element == NULL;
+}
+
+static struct aws_byte_cursor *s_aws_mqtt_rr_operation_get_subscription_topic_filters(
+ struct aws_mqtt_rr_client_operation *operation) {
+ if (operation->type == AWS_MRROT_STREAMING) {
+ return &operation->storage.streaming_storage.options.topic_filter;
+ } else {
+ return operation->storage.request_storage.options.subscription_topic_filters;
+ }
+}
+
+static size_t s_aws_mqtt_rr_operation_get_subscription_topic_filter_count(
+ struct aws_mqtt_rr_client_operation *operation) {
+ if (operation->type == AWS_MRROT_STREAMING) {
+ return 1;
+ } else {
+ return operation->storage.request_storage.options.subscription_topic_filter_count;
+ }
+}
+
+static void s_process_queued_operations(struct aws_mqtt_request_response_client *client) {
+ aws_rr_subscription_manager_purge_unused(&client->subscription_manager);
+
+ while (!aws_linked_list_empty(&client->operation_queue)) {
+ struct aws_linked_list_node *head = aws_linked_list_front(&client->operation_queue);
+ struct aws_mqtt_rr_client_operation *head_operation =
+ AWS_CONTAINER_OF(head, struct aws_mqtt_rr_client_operation, node);
+
+ if (!s_can_operation_dequeue(client, head_operation)) {
+ break;
+ }
+
+ struct aws_rr_acquire_subscription_options subscribe_options = {
+ .topic_filters = s_aws_mqtt_rr_operation_get_subscription_topic_filters(head_operation),
+ .topic_filter_count = s_aws_mqtt_rr_operation_get_subscription_topic_filter_count(head_operation),
+ .operation_id = head_operation->id,
+ .type = s_rr_operation_type_to_subscription_type(head_operation->type),
+ };
+
+ enum aws_acquire_subscription_result_type subscribe_result =
+ aws_rr_subscription_manager_acquire_subscription(&client->subscription_manager, &subscribe_options);
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client intake, queued operation %" PRIu64
+ " yielded acquire subscription result: %s",
+ (void *)client,
+ head_operation->id,
+ s_aws_acquire_subscription_result_type(subscribe_result));
+
+ if (subscribe_result == AASRT_BLOCKED) {
+ break;
+ }
+
+ aws_linked_list_pop_front(&client->operation_queue);
+ s_handle_operation_subscribe_result(client, head_operation, subscribe_result);
+ }
+}
+
+static void s_mqtt_request_response_service_task_fn(
+ struct aws_task *task,
+ void *arg,
+ enum aws_task_status task_status) {
+ (void)task;
+
+ if (task_status == AWS_TASK_STATUS_CANCELED) {
+ return;
+ }
+
+ struct aws_mqtt_request_response_client *client = arg;
+ client->scheduled_service_timepoint_ns = 0;
+
+ if (client->state == AWS_RRCS_ACTIVE) {
+
+ // timeouts
+ s_check_for_operation_timeouts(client);
+
+ // operation queue
+ s_process_queued_operations(client);
+
+ // schedule next service
+ client->scheduled_service_timepoint_ns = s_mqtt_request_response_client_get_next_service_time(client);
+ aws_event_loop_schedule_task_future(
+ client->loop, &client->service_task, client->scheduled_service_timepoint_ns);
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client service, next timepoint: %" PRIu64,
+ (void *)client,
+ client->scheduled_service_timepoint_ns);
+ }
+}
+
+static void s_mqtt_request_response_client_initialize_task_fn(
+ struct aws_task *task,
+ void *arg,
+ enum aws_task_status task_status) {
+ (void)task;
+
+ AWS_FATAL_ASSERT(task_status != AWS_TASK_STATUS_CANCELED);
+
+ struct aws_mqtt_request_response_client *client = arg;
+
+ if (client->state == AWS_RRCS_UNINITIALIZED) {
+ s_aws_rr_client_init_subscription_manager(client, client->allocator);
+
+ client->state = AWS_RRCS_ACTIVE;
+
+ aws_task_init(&client->service_task, s_mqtt_request_response_service_task_fn, client, "mqtt_rr_client_service");
+
+ aws_event_loop_schedule_task_future(client->loop, &client->service_task, UINT64_MAX);
+ client->scheduled_service_timepoint_ns = UINT64_MAX;
+ }
+
+ if (client->config.initialized_callback != NULL) {
+ (*client->config.initialized_callback)(client->config.user_data);
+ }
+
+ /* give up the internal ref we held while the task was pending */
+ aws_ref_count_release(&client->internal_ref_count);
+}
+
+static void s_setup_cross_thread_initialization(struct aws_mqtt_request_response_client *rr_client) {
+ /* now that it exists, 1 internal ref belongs to protocol adapter termination */
+ aws_ref_count_acquire(&rr_client->internal_ref_count);
+
+ /* 1 internal ref belongs to the initialize task until it runs */
+ aws_ref_count_acquire(&rr_client->internal_ref_count);
+
+ aws_task_init(
+ &rr_client->initialize_task,
+ s_mqtt_request_response_client_initialize_task_fn,
+ rr_client,
+ "mqtt_rr_client_initialize");
+ aws_event_loop_schedule_task_now(rr_client->loop, &rr_client->initialize_task);
+}
+
+struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(
+ struct aws_allocator *allocator,
+ struct aws_mqtt_client_connection *client,
+ const struct aws_mqtt_request_response_client_options *options) {
+
+ struct aws_mqtt_request_response_client *rr_client =
+ s_aws_mqtt_request_response_client_new(allocator, options, aws_mqtt_client_connection_get_event_loop(client));
+
+ if (rr_client == NULL) {
+ return NULL;
+ }
+
+ struct aws_mqtt_protocol_adapter_options adapter_options = {
+ .subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback,
+ .incoming_publish_callback = s_aws_rr_client_protocol_adapter_incoming_publish_callback,
+ .terminate_callback = s_aws_rr_client_protocol_adapter_terminate_callback,
+ .connection_event_callback = s_aws_rr_client_protocol_adapter_connection_event_callback,
+ .user_data = rr_client,
+ };
+
+ rr_client->client_adapter = aws_mqtt_protocol_adapter_new_from_311(rr_client->allocator, &adapter_options, client);
+ if (rr_client->client_adapter == NULL) {
+ goto error;
+ }
+
+ s_setup_cross_thread_initialization(rr_client);
+
+ return rr_client;
+
+error:
+
+ /* even on construction failures we still need to walk through the async shutdown process */
+ aws_mqtt_request_response_client_release(rr_client);
+
+ return NULL;
+}
+
+struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(
+ struct aws_allocator *allocator,
+ struct aws_mqtt5_client *client,
+ const struct aws_mqtt_request_response_client_options *options) {
+
+ struct aws_mqtt_request_response_client *rr_client =
+ s_aws_mqtt_request_response_client_new(allocator, options, client->loop);
+
+ if (rr_client == NULL) {
+ return NULL;
+ }
+
+ struct aws_mqtt_protocol_adapter_options adapter_options = {
+ .subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback,
+ .incoming_publish_callback = s_aws_rr_client_protocol_adapter_incoming_publish_callback,
+ .terminate_callback = s_aws_rr_client_protocol_adapter_terminate_callback,
+ .connection_event_callback = s_aws_rr_client_protocol_adapter_connection_event_callback,
+ .user_data = rr_client,
+ };
+
+ rr_client->client_adapter = aws_mqtt_protocol_adapter_new_from_5(rr_client->allocator, &adapter_options, client);
+ if (rr_client->client_adapter == NULL) {
+ goto error;
+ }
+
+ s_setup_cross_thread_initialization(rr_client);
+
+ return rr_client;
+
+error:
+
+ /* even on construction failures we still need to walk through the async shutdown process */
+ aws_mqtt_request_response_client_release(rr_client);
+
+ return NULL;
+}
+
+struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(
+ struct aws_mqtt_request_response_client *client) {
+ if (client != NULL) {
+ aws_ref_count_acquire(&client->external_ref_count);
+ }
+
+ return client;
+}
+
+struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(
+ struct aws_mqtt_request_response_client *client) {
+ if (client != NULL) {
+ aws_ref_count_release(&client->external_ref_count);
+ }
+
+ return NULL;
+}
+
+/////////////////////////////////////////////////
+
+static bool s_are_request_operation_options_valid(
+ const struct aws_mqtt_request_response_client *client,
+ const struct aws_mqtt_request_operation_options *request_options) {
+ if (request_options == NULL) {
+ AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client - NULL request options", (void *)client);
+ return false;
+ }
+
+ if (request_options->response_path_count == 0) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "(%p) rr client request options - no response paths supplied",
+ (void *)client);
+ return false;
+ }
+
+ for (size_t i = 0; i < request_options->response_path_count; ++i) {
+ const struct aws_mqtt_request_operation_response_path *path = &request_options->response_paths[i];
+ if (!aws_mqtt_is_valid_topic(&path->topic)) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "(%p) rr client request options - " PRInSTR " is not a valid topic",
+ (void *)client,
+ AWS_BYTE_CURSOR_PRI(path->topic));
+ return false;
+ }
+ }
+
+ if (!aws_mqtt_is_valid_topic(&request_options->publish_topic)) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "(%p) rr client request options - " PRInSTR " is not a valid topic",
+ (void *)client,
+ AWS_BYTE_CURSOR_PRI(request_options->publish_topic));
+ return false;
+ }
+
+ if (request_options->subscription_topic_filter_count == 0) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "(%p) rr client request options - no subscription topic filters supplied",
+ (void *)client);
+ return false;
+ }
+
+ for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) {
+ const struct aws_byte_cursor subscription_topic_filter = request_options->subscription_topic_filters[i];
+ if (!aws_mqtt_is_valid_topic_filter(&subscription_topic_filter)) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "(%p) rr client request options - " PRInSTR " is not a valid subscription topic filter",
+ (void *)client,
+ AWS_BYTE_CURSOR_PRI(subscription_topic_filter));
+ return false;
+ }
+ }
+
+ if (request_options->serialized_request.len == 0) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - empty request payload", (void *)client);
+ return false;
+ }
+
+ return true;
+}
+
+static bool s_are_streaming_operation_options_valid(
+ struct aws_mqtt_request_response_client *client,
+ const struct aws_mqtt_streaming_operation_options *streaming_options) {
+ if (streaming_options == NULL) {
+ AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client - NULL streaming options", (void *)client);
+ return false;
+ }
+
+ if (!aws_mqtt_is_valid_topic_filter(&streaming_options->topic_filter)) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "(%p) rr client streaming options - " PRInSTR " is not a valid topic filter",
+ (void *)client,
+ AWS_BYTE_CURSOR_PRI(streaming_options->topic_filter));
+ return false;
+ }
+
+ return true;
+}
+
+static uint64_t s_aws_mqtt_request_response_client_allocate_operation_id(
+ struct aws_mqtt_request_response_client *client) {
+ return aws_atomic_fetch_add(&client->next_id, 1);
+}
+
+static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg, enum aws_task_status status) {
+ (void)task;
+
+ struct aws_mqtt_rr_client_operation *operation = arg;
+ struct aws_mqtt_request_response_client *client = operation->client_internal_ref;
+
+ if (status == AWS_TASK_STATUS_CANCELED) {
+ goto done;
+ }
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client, queuing operation %" PRIu64,
+ (void *)client,
+ operation->id);
+
+ // add appropriate client table entries
+ aws_hash_table_put(&client->operations, &operation->id, operation, NULL);
+
+ // add to timeout priority queue
+ if (operation->type == AWS_MRROT_REQUEST) {
+ aws_priority_queue_push_ref(
+ &client->operations_by_timeout, (void *)&operation, &operation->priority_queue_node);
+ }
+
+ // enqueue
+ aws_linked_list_push_back(&operation->client_internal_ref->operation_queue, &operation->node);
+
+ s_change_operation_state(operation, AWS_MRROS_QUEUED);
+
+ s_mqtt_request_response_client_wake_service(operation->client_internal_ref);
+
+done:
+
+ /*
+ * We hold a second reference to the operation during submission. This ensures that even if a streaming operation
+ * is immediately dec-refed by the creator (before submission completes), the operation will not get destroyed.
+ *
+ * It is now safe and correct to release that reference.
+ *
+ * After this, streaming operation lifetime is completely user-driven, while request operation lifetime is
+ * completely client-internal.
+ */
+ aws_mqtt_rr_client_operation_release(operation);
+}
+
+static void s_aws_mqtt_streaming_operation_storage_clean_up(struct aws_mqtt_streaming_operation_storage *storage) {
+ aws_byte_buf_clean_up(&storage->operation_data);
+}
+
+static void s_aws_mqtt_request_operation_storage_clean_up(struct aws_mqtt_request_operation_storage *storage) {
+ aws_array_list_clean_up(&storage->operation_response_paths);
+ aws_array_list_clean_up(&storage->subscription_topic_filters);
+ aws_byte_buf_clean_up(&storage->operation_data);
+}
+
+static void s_remove_operation_from_client_tables(struct aws_mqtt_rr_client_operation *operation) {
+ if (operation->type != AWS_MRROT_REQUEST) {
+ return;
+ }
+
+ if (!operation->in_client_tables) {
+ return;
+ }
+
+ struct aws_mqtt_request_response_client *client = operation->client_internal_ref;
+
+ aws_hash_table_remove(
+ &client->operations_by_correlation_tokens,
+ &operation->storage.request_storage.options.correlation_token,
+ NULL,
+ NULL);
+
+ struct aws_array_list *paths = &operation->storage.request_storage.operation_response_paths;
+
+ size_t path_count = aws_array_list_length(paths);
+ for (size_t i = 0; i < path_count; ++i) {
+ struct aws_mqtt_request_operation_response_path path;
+ aws_array_list_get_at(paths, &path, i);
+ if (aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
+ &client->subscriptions, &path.topic) == AWS_OP_ERR) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: internal state error removing reference to response path for topic " PRInSTR,
+ (void *)client,
+ AWS_BYTE_CURSOR_PRI(path.topic));
+ }
+ }
+}
+
+static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg, enum aws_task_status status) {
+ (void)task;
+ (void)status;
+
+ struct aws_mqtt_rr_client_operation *operation = arg;
+ struct aws_mqtt_request_response_client *client = operation->client_internal_ref;
+
+ aws_hash_table_remove(&client->operations, &operation->id, NULL, NULL);
+ s_remove_operation_from_timeout_queue(operation);
+
+ if (aws_linked_list_node_is_in_list(&operation->node)) {
+ aws_linked_list_remove(&operation->node);
+ }
+
+ if (client->state != AWS_RRCS_SHUTTING_DOWN) {
+ struct aws_rr_release_subscription_options release_options = {
+ .topic_filters = s_aws_mqtt_rr_operation_get_subscription_topic_filters(operation),
+ .topic_filter_count = s_aws_mqtt_rr_operation_get_subscription_topic_filter_count(operation),
+ .operation_id = operation->id,
+ };
+ aws_rr_subscription_manager_release_subscription(&client->subscription_manager, &release_options);
+ }
+
+ s_remove_operation_from_client_tables(operation);
+
+ aws_mqtt_request_response_client_release_internal(operation->client_internal_ref);
+
+ if (operation->type == AWS_MRROT_STREAMING) {
+ s_aws_mqtt_streaming_operation_storage_clean_up(&operation->storage.streaming_storage);
+ } else {
+ s_aws_mqtt_request_operation_storage_clean_up(&operation->storage.request_storage);
+ }
+
+ aws_mqtt_streaming_operation_terminated_fn *terminated_callback = NULL;
+ void *terminated_user_data = NULL;
+ if (operation->type == AWS_MRROT_STREAMING) {
+ terminated_callback = operation->storage.streaming_storage.options.terminated_callback;
+ terminated_user_data = operation->storage.streaming_storage.options.user_data;
+ }
+
+ aws_mem_release(operation->allocator, operation);
+
+ if (terminated_callback != NULL) {
+ (*terminated_callback)(terminated_user_data);
+ }
+}
+
+static void s_on_mqtt_rr_client_operation_zero_ref_count(void *context) {
+ struct aws_mqtt_rr_client_operation *operation = context;
+
+ aws_event_loop_schedule_task_now(operation->client_internal_ref->loop, &operation->destroy_task);
+}
+
+static void s_aws_mqtt_rr_client_operation_init_shared(
+ struct aws_mqtt_rr_client_operation *operation,
+ struct aws_mqtt_request_response_client *client) {
+ operation->allocator = client->allocator;
+ aws_ref_count_init(&operation->ref_count, operation, s_on_mqtt_rr_client_operation_zero_ref_count);
+
+ operation->client_internal_ref = aws_mqtt_request_response_client_acquire_internal(client);
+ operation->id = s_aws_mqtt_request_response_client_allocate_operation_id(client);
+ s_change_operation_state(operation, AWS_MRROS_NONE);
+
+ aws_task_init(
+ &operation->submit_task,
+ s_mqtt_rr_client_submit_operation,
+ operation,
+ "MQTTRequestResponseClientOperationSubmit");
+ aws_task_init(
+ &operation->destroy_task,
+ s_mqtt_rr_client_destroy_operation,
+ operation,
+ "MQTTRequestResponseClientOperationDestroy");
+}
+
+void s_aws_mqtt_request_operation_storage_init_from_options(
+ struct aws_mqtt_request_operation_storage *storage,
+ struct aws_allocator *allocator,
+ const struct aws_mqtt_request_operation_options *request_options) {
+
+ size_t bytes_needed = 0;
+ bytes_needed += request_options->publish_topic.len;
+ bytes_needed += request_options->serialized_request.len;
+ bytes_needed += request_options->correlation_token.len;
+
+ for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) {
+ const struct aws_byte_cursor *subscription_topic_filter = &request_options->subscription_topic_filters[i];
+
+ bytes_needed += subscription_topic_filter->len;
+ }
+
+ for (size_t i = 0; i < request_options->response_path_count; ++i) {
+ const struct aws_mqtt_request_operation_response_path *response_path = &request_options->response_paths[i];
+
+ bytes_needed += response_path->topic.len;
+ bytes_needed += response_path->correlation_token_json_path.len;
+ }
+
+ storage->options = *request_options;
+
+ aws_byte_buf_init(&storage->operation_data, allocator, bytes_needed);
+ aws_array_list_init_dynamic(
+ &storage->operation_response_paths,
+ allocator,
+ request_options->response_path_count,
+ sizeof(struct aws_mqtt_request_operation_response_path));
+ aws_array_list_init_dynamic(
+ &storage->subscription_topic_filters,
+ allocator,
+ request_options->subscription_topic_filter_count,
+ sizeof(struct aws_byte_cursor));
+
+ AWS_FATAL_ASSERT(
+ aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.publish_topic) == AWS_OP_SUCCESS);
+ AWS_FATAL_ASSERT(
+ aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.serialized_request) ==
+ AWS_OP_SUCCESS);
+ AWS_FATAL_ASSERT(
+ aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.correlation_token) ==
+ AWS_OP_SUCCESS);
+
+ for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) {
+ struct aws_byte_cursor subscription_topic_filter = request_options->subscription_topic_filters[i];
+
+ AWS_FATAL_ASSERT(
+ aws_byte_buf_append_and_update(&storage->operation_data, &subscription_topic_filter) == AWS_OP_SUCCESS);
+
+ aws_array_list_push_back(&storage->subscription_topic_filters, &subscription_topic_filter);
+ }
+
+ storage->options.subscription_topic_filters = storage->subscription_topic_filters.data;
+
+ for (size_t i = 0; i < request_options->response_path_count; ++i) {
+ struct aws_mqtt_request_operation_response_path response_path = request_options->response_paths[i];
+
+ AWS_FATAL_ASSERT(
+ aws_byte_buf_append_and_update(&storage->operation_data, &response_path.topic) == AWS_OP_SUCCESS);
+ AWS_FATAL_ASSERT(
+ aws_byte_buf_append_and_update(&storage->operation_data, &response_path.correlation_token_json_path) ==
+ AWS_OP_SUCCESS);
+
+ aws_array_list_push_back(&storage->operation_response_paths, &response_path);
+ }
+
+ storage->options.response_paths = storage->operation_response_paths.data;
+}
+
+static void s_log_request_response_operation(
+ struct aws_mqtt_rr_client_operation *operation,
+ struct aws_mqtt_request_response_client *client) {
+ struct aws_logger *log_handle = aws_logger_get_conditional(AWS_LS_MQTT_REQUEST_RESPONSE, AWS_LL_DEBUG);
+ if (log_handle == NULL) {
+ return;
+ }
+
+ struct aws_mqtt_request_operation_options *options = &operation->storage.request_storage.options;
+
+ for (size_t i = 0; i < options->subscription_topic_filter_count; ++i) {
+ struct aws_byte_cursor subscription_topic_filter = options->subscription_topic_filters[i];
+
+ AWS_LOGUF(
+ log_handle,
+ AWS_LL_DEBUG,
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client operation %" PRIu64 " - subscription topic filter %zu topic '" PRInSTR "'",
+ (void *)client,
+ operation->id,
+ i,
+ AWS_BYTE_CURSOR_PRI(subscription_topic_filter));
+ }
+
+ AWS_LOGUF(
+ log_handle,
+ AWS_LL_DEBUG,
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client operation %" PRIu64 " - correlation token: '" PRInSTR "'",
+ (void *)client,
+ operation->id,
+ AWS_BYTE_CURSOR_PRI(options->correlation_token));
+
+ AWS_LOGUF(
+ log_handle,
+ AWS_LL_DEBUG,
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client operation %" PRIu64 " - publish topic: '" PRInSTR "'",
+ (void *)client,
+ operation->id,
+ AWS_BYTE_CURSOR_PRI(options->publish_topic));
+
+ AWS_LOGUF(
+ log_handle,
+ AWS_LL_DEBUG,
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client operation %" PRIu64 " - %zu response paths:",
+ (void *)client,
+ operation->id,
+ options->response_path_count);
+
+ for (size_t i = 0; i < options->response_path_count; ++i) {
+ struct aws_mqtt_request_operation_response_path *response_path = &options->response_paths[i];
+
+ AWS_LOGUF(
+ log_handle,
+ AWS_LL_DEBUG,
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client operation %" PRIu64 " - response path %zu topic '" PRInSTR "'",
+ (void *)client,
+ operation->id,
+ i,
+ AWS_BYTE_CURSOR_PRI(response_path->topic));
+
+ AWS_LOGUF(
+ log_handle,
+ AWS_LL_DEBUG,
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client operation %" PRIu64 " - response path %zu correlation token path '" PRInSTR
+ "'",
+ (void *)client,
+ operation->id,
+ i,
+ AWS_BYTE_CURSOR_PRI(response_path->correlation_token_json_path));
+ }
+}
+
+int aws_mqtt_request_response_client_submit_request(
+ struct aws_mqtt_request_response_client *client,
+ const struct aws_mqtt_request_operation_options *request_options) {
+
+ if (client == NULL) {
+ return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
+ }
+
+ if (!s_are_request_operation_options_valid(client, request_options)) {
+ /* all failure cases have logged the problem already */
+ return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
+ }
+
+ uint64_t now = 0;
+ if (aws_high_res_clock_get_ticks(&now)) {
+ return aws_raise_error(AWS_ERROR_CLOCK_FAILURE);
+ }
+
+ struct aws_allocator *allocator = client->allocator;
+ struct aws_mqtt_rr_client_operation *operation =
+ aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation));
+ operation->allocator = allocator;
+ operation->type = AWS_MRROT_REQUEST;
+ operation->timeout_timepoint_ns =
+ now +
+ aws_timestamp_convert(client->config.operation_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
+ operation->pending_subscriptions = request_options->subscription_topic_filter_count;
+
+ s_aws_mqtt_request_operation_storage_init_from_options(
+ &operation->storage.request_storage, allocator, request_options);
+ s_aws_mqtt_rr_client_operation_init_shared(operation, client);
+
+ AWS_LOGF_INFO(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client - submitting request-response operation with id %" PRIu64,
+ (void *)client,
+ operation->id);
+
+ s_log_request_response_operation(operation, client);
+
+ /*
+ * We hold a second reference to the operation during submission. This ensures that even if a streaming operation
+ * is immediately dec-refed by the creator (before submission runs), the operation will not get destroyed.
+ */
+ aws_mqtt_rr_client_operation_acquire(operation);
+
+ aws_event_loop_schedule_task_now(client->loop, &operation->submit_task);
+
+ return AWS_OP_SUCCESS;
+}
+
+void s_aws_mqtt_streaming_operation_storage_init_from_options(
+ struct aws_mqtt_streaming_operation_storage *storage,
+ struct aws_allocator *allocator,
+ const struct aws_mqtt_streaming_operation_options *streaming_options) {
+ size_t bytes_needed = streaming_options->topic_filter.len;
+
+ storage->options = *streaming_options;
+ aws_byte_buf_init(&storage->operation_data, allocator, bytes_needed);
+
+ AWS_FATAL_ASSERT(
+ aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.topic_filter) == AWS_OP_SUCCESS);
+
+ aws_atomic_init_int(&storage->activated, 0);
+}
+
+static void s_log_streaming_operation(
+ struct aws_mqtt_rr_client_operation *operation,
+ struct aws_mqtt_request_response_client *client) {
+ struct aws_logger *log_handle = aws_logger_get_conditional(AWS_LS_MQTT_REQUEST_RESPONSE, AWS_LL_DEBUG);
+ if (log_handle == NULL) {
+ return;
+ }
+
+ struct aws_mqtt_streaming_operation_options *options = &operation->storage.streaming_storage.options;
+
+ AWS_LOGUF(
+ log_handle,
+ AWS_LL_DEBUG,
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client streaming operation %" PRIu64 ": topic filter: '" PRInSTR "'",
+ (void *)client,
+ operation->id,
+ AWS_BYTE_CURSOR_PRI(options->topic_filter));
+}
+
+struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_streaming_operation(
+ struct aws_mqtt_request_response_client *client,
+ const struct aws_mqtt_streaming_operation_options *streaming_options) {
+
+ if (client == NULL) {
+ aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
+ return NULL;
+ }
+
+ if (!s_are_streaming_operation_options_valid(client, streaming_options)) {
+ /* all failure cases have logged the problem already */
+ aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
+ return NULL;
+ }
+
+ struct aws_allocator *allocator = client->allocator;
+ struct aws_mqtt_rr_client_operation *operation =
+ aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation));
+ operation->allocator = allocator;
+ operation->type = AWS_MRROT_STREAMING;
+ operation->timeout_timepoint_ns = UINT64_MAX;
+ operation->pending_subscriptions = 1;
+
+ s_aws_mqtt_streaming_operation_storage_init_from_options(
+ &operation->storage.streaming_storage, allocator, streaming_options);
+ s_aws_mqtt_rr_client_operation_init_shared(operation, client);
+
+ AWS_LOGF_INFO(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client - submitting streaming operation with id %" PRIu64,
+ (void *)client,
+ operation->id);
+
+ s_log_streaming_operation(operation, client);
+
+ return operation;
+}
+
+int aws_mqtt_rr_client_operation_activate(struct aws_mqtt_rr_client_operation *operation) {
+ struct aws_atomic_var *activated = &operation->storage.streaming_storage.activated;
+ size_t unactivated = 0;
+ if (!aws_atomic_compare_exchange_int(activated, &unactivated, 1)) {
+ return aws_raise_error(AWS_ERROR_MQTT_REUQEST_RESPONSE_STREAM_ALREADY_ACTIVATED);
+ }
+
+ struct aws_mqtt_request_response_client *rr_client = operation->client_internal_ref;
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "id=%p: request-response client - activating streaming operation with id %" PRIu64,
+ (void *)rr_client,
+ operation->id);
+
+ /*
+ * We hold a second reference to the operation during submission. This ensures that even if a streaming operation
+ * is immediately dec-refed by the creator (before submission runs), the operation will not get destroyed.
+ */
+ aws_mqtt_rr_client_operation_acquire(operation);
+
+ aws_event_loop_schedule_task_now(rr_client->loop, &operation->submit_task);
+
+ return AWS_OP_SUCCESS;
+}
+
+struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_acquire(
+ struct aws_mqtt_rr_client_operation *operation) {
+ if (operation != NULL) {
+ aws_ref_count_acquire(&operation->ref_count);
+ }
+
+ return operation;
+}
+
+struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_release(
+ struct aws_mqtt_rr_client_operation *operation) {
+ if (operation != NULL) {
+ aws_ref_count_release(&operation->ref_count);
+ }
+
+ return NULL;
+}
+
+struct aws_event_loop *aws_mqtt_request_response_client_get_event_loop(
+ struct aws_mqtt_request_response_client *client) {
+
+ return aws_mqtt_protocol_adapter_get_event_loop(client->client_adapter);
+}
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_subscription_set.c b/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_subscription_set.c
new file mode 100644
index 00000000000..319410c3d4b
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-mqtt/source/request-response/request_response_subscription_set.c
@@ -0,0 +1,336 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/mqtt/private/request-response/request_response_subscription_set.h>
+
+#include <aws/mqtt/mqtt.h>
+#include <aws/mqtt/private/client_impl_shared.h>
+#include <aws/mqtt/request-response/request_response_client.h>
+
+#define MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE 50
+#define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50
+
+static void s_aws_rr_operation_list_topic_filter_entry_destroy(struct aws_rr_operation_list_topic_filter_entry *entry) {
+ if (entry == NULL) {
+ return;
+ }
+
+ aws_byte_buf_clean_up(&entry->topic_filter);
+
+ aws_mem_release(entry->allocator, entry);
+}
+
+static void s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy(void *value) {
+ s_aws_rr_operation_list_topic_filter_entry_destroy(value);
+}
+
+static struct aws_rr_response_path_entry *s_aws_rr_response_path_entry_new(
+ struct aws_allocator *allocator,
+ struct aws_byte_cursor topic,
+ struct aws_byte_cursor correlation_token_json_path) {
+ struct aws_rr_response_path_entry *entry = aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_response_path_entry));
+
+ entry->allocator = allocator;
+ entry->ref_count = 1;
+ aws_byte_buf_init_copy_from_cursor(&entry->topic, allocator, topic);
+ entry->topic_cursor = aws_byte_cursor_from_buf(&entry->topic);
+
+ aws_byte_buf_init_copy_from_cursor(&entry->correlation_token_json_path, allocator, correlation_token_json_path);
+
+ return entry;
+}
+
+static void s_aws_rr_response_path_entry_destroy(struct aws_rr_response_path_entry *entry) {
+ if (entry == NULL) {
+ return;
+ }
+
+ aws_byte_buf_clean_up(&entry->topic);
+ aws_byte_buf_clean_up(&entry->correlation_token_json_path);
+
+ aws_mem_release(entry->allocator, entry);
+}
+
+static void s_aws_rr_response_path_table_hash_element_destroy(void *value) {
+ s_aws_rr_response_path_entry_destroy(value);
+}
+
+int aws_mqtt_request_response_client_subscriptions_init(
+ struct aws_request_response_subscriptions *subscriptions,
+ struct aws_allocator *allocator) {
+ AWS_FATAL_ASSERT(subscriptions);
+
+ subscriptions->allocator = allocator;
+
+ if (aws_hash_table_init(
+ &subscriptions->streaming_operation_subscription_lists,
+ allocator,
+ MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE,
+ aws_hash_byte_cursor_ptr,
+ aws_mqtt_byte_cursor_hash_equality,
+ NULL,
+ s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy)) {
+ goto clean_up;
+ }
+
+ if (aws_hash_table_init(
+ &subscriptions->streaming_operation_wildcards_subscription_lists,
+ allocator,
+ MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE,
+ aws_hash_byte_cursor_ptr,
+ aws_mqtt_byte_cursor_hash_equality,
+ NULL,
+ s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy)) {
+ goto clean_up;
+ }
+
+ if (aws_hash_table_init(
+ &subscriptions->request_response_paths,
+ allocator,
+ MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE,
+ aws_hash_byte_cursor_ptr,
+ aws_mqtt_byte_cursor_hash_equality,
+ NULL,
+ s_aws_rr_response_path_table_hash_element_destroy)) {
+ goto clean_up;
+ }
+
+ return AWS_OP_SUCCESS;
+
+clean_up:
+ aws_mqtt_request_response_client_subscriptions_clean_up(subscriptions);
+ return AWS_OP_ERR;
+}
+
+void aws_mqtt_request_response_client_subscriptions_clean_up(struct aws_request_response_subscriptions *subscriptions) {
+ if (subscriptions == NULL) {
+ return;
+ }
+
+ if (aws_hash_table_is_valid(&subscriptions->streaming_operation_subscription_lists)) {
+ aws_hash_table_clean_up(&subscriptions->streaming_operation_subscription_lists);
+ }
+ if (aws_hash_table_is_valid(&subscriptions->streaming_operation_wildcards_subscription_lists)) {
+ aws_hash_table_clean_up(&subscriptions->streaming_operation_wildcards_subscription_lists);
+ }
+ if (aws_hash_table_is_valid(&subscriptions->request_response_paths)) {
+ aws_hash_table_clean_up(&subscriptions->request_response_paths);
+ }
+}
+
+static struct aws_rr_operation_list_topic_filter_entry *s_aws_rr_operation_list_topic_filter_entry_new(
+ struct aws_allocator *allocator,
+ struct aws_byte_cursor topic_filter) {
+ struct aws_rr_operation_list_topic_filter_entry *entry =
+ aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_operation_list_topic_filter_entry));
+
+ entry->allocator = allocator;
+ aws_byte_buf_init_copy_from_cursor(&entry->topic_filter, allocator, topic_filter);
+ entry->topic_filter_cursor = aws_byte_cursor_from_buf(&entry->topic_filter);
+
+ aws_linked_list_init(&entry->operations);
+
+ return entry;
+}
+
+static bool s_is_topic_filter_with_wildcard(const struct aws_byte_cursor *topic_filter) {
+ bool res = (memchr(topic_filter->ptr, '+', topic_filter->len) || memchr(topic_filter->ptr, '#', topic_filter->len));
+ return res;
+}
+
+struct aws_rr_operation_list_topic_filter_entry *aws_mqtt_request_response_client_subscriptions_add_stream_subscription(
+ struct aws_request_response_subscriptions *subscriptions,
+ const struct aws_byte_cursor *topic_filter) {
+ AWS_FATAL_ASSERT(subscriptions);
+
+ struct aws_hash_table *subscription_lists = s_is_topic_filter_with_wildcard(topic_filter)
+ ? &subscriptions->streaming_operation_wildcards_subscription_lists
+ : &subscriptions->streaming_operation_subscription_lists;
+
+ struct aws_hash_element *element = NULL;
+ if (aws_hash_table_find(subscription_lists, topic_filter, &element)) {
+ aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR);
+ return NULL;
+ }
+
+ struct aws_rr_operation_list_topic_filter_entry *entry = NULL;
+ if (element == NULL) {
+ entry = s_aws_rr_operation_list_topic_filter_entry_new(subscriptions->allocator, *topic_filter);
+ aws_hash_table_put(subscription_lists, &entry->topic_filter_cursor, entry, NULL);
+ } else {
+ entry = element->value;
+ }
+
+ AWS_FATAL_ASSERT(entry != NULL);
+
+ return entry;
+}
+
+int aws_mqtt_request_response_client_subscriptions_add_request_subscription(
+ struct aws_request_response_subscriptions *subscriptions,
+ const struct aws_byte_cursor *topic_filter,
+ const struct aws_byte_cursor *correlation_token_json_path) {
+ struct aws_hash_element *element = NULL;
+ if (aws_hash_table_find(&subscriptions->request_response_paths, topic_filter, &element)) {
+ return aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR);
+ }
+
+ if (element != NULL) {
+ struct aws_rr_response_path_entry *entry = element->value;
+ ++entry->ref_count;
+ return AWS_OP_SUCCESS;
+ }
+
+ struct aws_rr_response_path_entry *entry =
+ s_aws_rr_response_path_entry_new(subscriptions->allocator, *topic_filter, *correlation_token_json_path);
+ if (aws_hash_table_put(&subscriptions->request_response_paths, &entry->topic_cursor, entry, NULL)) {
+ s_aws_rr_response_path_entry_destroy(entry);
+ return aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR);
+ }
+
+ return AWS_OP_SUCCESS;
+}
+
+int aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
+ struct aws_request_response_subscriptions *subscriptions,
+ const struct aws_byte_cursor *topic_filter) {
+
+ AWS_FATAL_ASSERT(subscriptions);
+ AWS_FATAL_ASSERT(topic_filter);
+
+ struct aws_hash_element *element = NULL;
+ if (aws_hash_table_find(&subscriptions->request_response_paths, topic_filter, &element) || element == NULL) {
+ return AWS_OP_ERR;
+ }
+
+ struct aws_rr_response_path_entry *entry = element->value;
+ --entry->ref_count;
+
+ if (entry->ref_count == 0) {
+ aws_hash_table_remove(&subscriptions->request_response_paths, topic_filter, NULL, NULL);
+ }
+
+ return AWS_OP_SUCCESS;
+}
+
+static void s_match_stream_subscriptions(
+ const struct aws_hash_table *subscriptions,
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match,
+ void *user_data) {
+ struct aws_hash_element *subscription_filter_element = NULL;
+ if (aws_hash_table_find(subscriptions, &publish_event->topic, &subscription_filter_element) == AWS_OP_SUCCESS &&
+ subscription_filter_element != NULL) {
+
+ struct aws_rr_operation_list_topic_filter_entry *entry = subscription_filter_element->value;
+ on_stream_operation_subscription_match(
+ &entry->operations, &entry->topic_filter_cursor, publish_event, user_data);
+ }
+}
+
+static bool s_is_topic_matched_to_subscription(
+ struct aws_byte_cursor topic,
+ struct aws_byte_cursor topic_filter_cursor) {
+ bool is_matched = true;
+ bool multi_level_wildcard = false;
+
+ struct aws_byte_cursor subscription_topic_filter_segment;
+ AWS_ZERO_STRUCT(subscription_topic_filter_segment);
+
+ struct aws_byte_cursor topic_segment;
+ AWS_ZERO_STRUCT(topic_segment);
+
+ while (aws_byte_cursor_next_split(&topic_filter_cursor, '/', &subscription_topic_filter_segment)) {
+ if (!aws_byte_cursor_next_split(&topic, '/', &topic_segment)) {
+ is_matched = false;
+ break;
+ }
+
+ if (aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "#")) {
+ multi_level_wildcard = true;
+ is_matched = true;
+ break;
+ }
+
+ if (!aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "+") &&
+ !aws_byte_cursor_eq(&topic_segment, &subscription_topic_filter_segment)) {
+ is_matched = false;
+ break;
+ }
+ }
+ if (!multi_level_wildcard && aws_byte_cursor_next_split(&topic, '/', &topic_segment)) {
+ is_matched = false;
+ }
+
+ return is_matched;
+}
+
+static void s_match_wildcard_stream_subscriptions(
+ const struct aws_hash_table *subscriptions,
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match,
+ void *user_data) {
+
+ /*
+ * Incoming event's topic is checked against each registered stream with wildcard. While this approach is far from
+ * optimal, it should be sufficient for request-response client where not many subscriptions with wildcards are
+ * used.
+ */
+ for (struct aws_hash_iter iter = aws_hash_iter_begin(subscriptions); !aws_hash_iter_done(&iter);
+ aws_hash_iter_next(&iter)) {
+ struct aws_rr_operation_list_topic_filter_entry *entry = iter.element.value;
+ struct aws_byte_cursor topic_filter_cursor = entry->topic_filter_cursor;
+
+ bool is_matched = s_is_topic_matched_to_subscription(publish_event->topic, topic_filter_cursor);
+ if (is_matched) {
+ on_stream_operation_subscription_match(
+ &entry->operations, &entry->topic_filter_cursor, publish_event, user_data);
+ }
+ }
+}
+
+void s_match_request_response_subscriptions(
+ const struct aws_hash_table *request_response_paths,
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match,
+ void *user_data) {
+
+ struct aws_hash_element *response_path_element = NULL;
+ if (aws_hash_table_find(request_response_paths, &publish_event->topic, &response_path_element) == AWS_OP_SUCCESS &&
+ response_path_element != NULL) {
+
+ on_request_operation_subscription_match(response_path_element->value, publish_event, user_data);
+ }
+}
+
+void aws_mqtt_request_response_client_subscriptions_handle_incoming_publish(
+ const struct aws_request_response_subscriptions *subscriptions,
+ const struct aws_mqtt_rr_incoming_publish_event *publish_event,
+ aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match,
+ aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match,
+ void *user_data) {
+
+ AWS_FATAL_PRECONDITION(subscriptions);
+ AWS_FATAL_PRECONDITION(publish_event);
+ AWS_FATAL_PRECONDITION(on_stream_operation_subscription_match);
+ AWS_FATAL_PRECONDITION(on_request_operation_subscription_match);
+
+ /* Streaming operation handling */
+ s_match_stream_subscriptions(
+ &subscriptions->streaming_operation_subscription_lists,
+ publish_event,
+ on_stream_operation_subscription_match,
+ user_data);
+
+ s_match_wildcard_stream_subscriptions(
+ &subscriptions->streaming_operation_wildcards_subscription_lists,
+ publish_event,
+ on_stream_operation_subscription_match,
+ user_data);
+
+ /* Request-Response handling */
+ s_match_request_response_subscriptions(
+ &subscriptions->request_response_paths, publish_event, on_request_operation_subscription_match, user_data);
+}
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/request-response/subscription_manager.c b/contrib/restricted/aws/aws-c-mqtt/source/request-response/subscription_manager.c
new file mode 100644
index 00000000000..88cc542873e
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-mqtt/source/request-response/subscription_manager.c
@@ -0,0 +1,822 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/mqtt/private/request-response/subscription_manager.h>
+
+#include <aws/common/logging.h>
+#include <aws/mqtt/private/client_impl_shared.h>
+#include <aws/mqtt/private/request-response/protocol_adapter.h>
+
+#include <inttypes.h>
+
+enum aws_rr_subscription_status_type {
+ ARRSST_SUBSCRIBED,
+ ARRSST_NOT_SUBSCRIBED,
+};
+
+/*
+ * Invariant: subscriptions can only transition from nothing -> {subscribing, unsubscribing}
+ *
+ * In particular, the logic blocks subscribing while unsubscribing and unsubscribing while subscribing (unless
+ * shutting down).
+ */
+enum aws_rr_subscription_pending_action_type {
+ ARRSPAT_NOTHING,
+ ARRSPAT_SUBSCRIBING,
+ ARRSPAT_UNSUBSCRIBING,
+};
+
+struct aws_rr_subscription_listener {
+ struct aws_allocator *allocator;
+ uint64_t operation_id;
+};
+
+static uint64_t s_aws_hash_subscription_listener(const void *item) {
+ const struct aws_rr_subscription_listener *listener = item;
+
+ return listener->operation_id;
+}
+
+static bool s_aws_subscription_listener_hash_equality(const void *a, const void *b) {
+ const struct aws_rr_subscription_listener *a_listener = a;
+ const struct aws_rr_subscription_listener *b_listener = b;
+
+ return a_listener->operation_id == b_listener->operation_id;
+}
+
+static void s_aws_subscription_listener_destroy(void *element) {
+ struct aws_rr_subscription_listener *listener = element;
+
+ aws_mem_release(listener->allocator, listener);
+}
+
+struct aws_rr_subscription_record {
+ struct aws_allocator *allocator;
+
+ struct aws_byte_buf topic_filter;
+ struct aws_byte_cursor topic_filter_cursor;
+
+ struct aws_hash_table listeners;
+
+ enum aws_rr_subscription_status_type status;
+ enum aws_rr_subscription_pending_action_type pending_action;
+
+ enum aws_rr_subscription_type type;
+
+ /*
+ * A poisoned record represents a subscription that we will never try to subscribe to because a previous
+ * attempt resulted in a failure that we judge to be "terminal." Terminal failures include permission failures
+ * and validation failures. To remove a poisoned record, all listeners must be removed. For request-response
+ * operations this will happen naturally. For streaming operations, the operation must be closed by the user (in
+ * response to the user-facing event we emit on the streaming operation when the failure that poisons the
+ * record occurs).
+ */
+ bool poisoned;
+};
+
+static void s_aws_rr_subscription_record_destroy(void *element) {
+ struct aws_rr_subscription_record *record = element;
+
+ aws_byte_buf_clean_up(&record->topic_filter);
+ aws_hash_table_clean_up(&record->listeners);
+
+ aws_mem_release(record->allocator, record);
+}
+
+static struct aws_rr_subscription_record *s_aws_rr_subscription_new(
+ struct aws_allocator *allocator,
+ struct aws_byte_cursor topic_filter,
+ enum aws_rr_subscription_type type) {
+ struct aws_rr_subscription_record *record = aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_subscription_record));
+ record->allocator = allocator;
+
+ aws_byte_buf_init_copy_from_cursor(&record->topic_filter, allocator, topic_filter);
+ record->topic_filter_cursor = aws_byte_cursor_from_buf(&record->topic_filter);
+
+ aws_hash_table_init(
+ &record->listeners,
+ allocator,
+ 4,
+ s_aws_hash_subscription_listener,
+ s_aws_subscription_listener_hash_equality,
+ NULL,
+ s_aws_subscription_listener_destroy);
+
+ record->status = ARRSST_NOT_SUBSCRIBED;
+ record->pending_action = ARRSPAT_NOTHING;
+
+ record->type = type;
+
+ return record;
+}
+
+static void s_subscription_record_unsubscribe(
+ struct aws_rr_subscription_manager *manager,
+ struct aws_rr_subscription_record *record,
+ bool shutdown) {
+
+ bool currently_subscribed = record->status == ARRSST_SUBSCRIBED;
+ bool currently_subscribing = record->pending_action == ARRSPAT_SUBSCRIBING;
+ bool currently_unsubscribing = record->pending_action == ARRSPAT_UNSUBSCRIBING;
+
+ /*
+ * The difference between a shutdown unsubscribe and a normal unsubscribe is that on a shutdown we will "chase"
+ * a pending subscribe with an unsubscribe (breaking the invariant of never having multiple MQTT operations
+ * pending on a subscription).
+ */
+ bool should_unsubscribe = currently_subscribed && !currently_unsubscribing;
+ if (shutdown) {
+ should_unsubscribe = should_unsubscribe || currently_subscribing;
+ }
+
+ if (!should_unsubscribe) {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - subscription ('" PRInSTR
+ "') has no listeners but is not in a state that allows unsubscribe yet",
+ AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor));
+ return;
+ }
+
+ struct aws_protocol_adapter_unsubscribe_options unsubscribe_options = {
+ .topic_filter = record->topic_filter_cursor,
+ .ack_timeout_seconds = manager->config.operation_timeout_seconds,
+ };
+
+ if (aws_mqtt_protocol_adapter_unsubscribe(manager->protocol_adapter, &unsubscribe_options)) {
+ int error_code = aws_last_error();
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - sync unsubscribe failure for ('" PRInSTR "'), ec %d(%s)",
+ AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor),
+ error_code,
+ aws_error_debug_str(error_code));
+ return;
+ }
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - unsubscribe submitted for ('" PRInSTR "')",
+ AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor));
+
+ record->pending_action = ARRSPAT_UNSUBSCRIBING;
+}
+
+/* Only called by the request-response client when shutting down */
+static int s_rr_subscription_clean_up_foreach_wrap(void *context, struct aws_hash_element *elem) {
+ struct aws_rr_subscription_manager *manager = context;
+ struct aws_rr_subscription_record *subscription = elem->value;
+
+ s_subscription_record_unsubscribe(manager, subscription, true);
+ s_aws_rr_subscription_record_destroy(subscription);
+
+ return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE;
+}
+
+static struct aws_rr_subscription_record *s_get_subscription_record(
+ struct aws_rr_subscription_manager *manager,
+ struct aws_byte_cursor topic_filter) {
+ struct aws_rr_subscription_record *subscription = NULL;
+ struct aws_hash_element *element = NULL;
+ if (aws_hash_table_find(&manager->subscriptions, &topic_filter, &element)) {
+ return NULL;
+ }
+
+ if (element != NULL) {
+ subscription = element->value;
+ }
+
+ return subscription;
+}
+
+struct aws_subscription_stats {
+ size_t request_response_subscriptions;
+ size_t event_stream_subscriptions;
+ size_t unsubscribing_event_stream_subscriptions;
+};
+
+static int s_rr_subscription_count_foreach_wrap(void *context, struct aws_hash_element *elem) {
+ const struct aws_rr_subscription_record *subscription = elem->value;
+ struct aws_subscription_stats *stats = context;
+
+ if (subscription->type == ARRST_EVENT_STREAM) {
+ ++stats->event_stream_subscriptions;
+ if (subscription->pending_action == ARRSPAT_UNSUBSCRIBING) {
+ ++stats->unsubscribing_event_stream_subscriptions;
+ }
+ } else {
+ ++stats->request_response_subscriptions;
+ }
+
+ return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
+}
+
+static void s_get_subscription_stats(
+ struct aws_rr_subscription_manager *manager,
+ struct aws_subscription_stats *stats) {
+ AWS_ZERO_STRUCT(*stats);
+
+ aws_hash_table_foreach(&manager->subscriptions, s_rr_subscription_count_foreach_wrap, stats);
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager current stats: %d event stream sub records, %d request-response sub "
+ "records, %d unsubscribing event stream subscriptions",
+ (int)stats->event_stream_subscriptions,
+ (int)stats->request_response_subscriptions,
+ (int)stats->unsubscribing_event_stream_subscriptions);
+}
+
+static void s_remove_listener_from_subscription_record(
+ struct aws_rr_subscription_manager *manager,
+ struct aws_byte_cursor topic_filter,
+ uint64_t operation_id) {
+ struct aws_rr_subscription_record *record = s_get_subscription_record(manager, topic_filter);
+ if (record == NULL) {
+ return;
+ }
+
+ struct aws_rr_subscription_listener listener = {
+ .operation_id = operation_id,
+ };
+
+ aws_hash_table_remove(&record->listeners, &listener, NULL, NULL);
+
+ size_t listener_count = aws_hash_table_get_entry_count(&record->listeners);
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - removed listener %" PRIu64 " from subscription ('" PRInSTR
+ "'), %zu listeners left",
+ operation_id,
+ AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor),
+ listener_count);
+
+ if (listener_count == 0) {
+ struct aws_rr_subscription_status_event event = {
+ .type = ARRSET_SUBSCRIPTION_EMPTY,
+ .topic_filter = record->topic_filter_cursor,
+ .operation_id = 0,
+ };
+
+ (*manager->config.subscription_status_callback)(&event, manager->config.userdata);
+ }
+}
+
+static void s_add_listener_to_subscription_record(struct aws_rr_subscription_record *record, uint64_t operation_id) {
+ struct aws_rr_subscription_listener *listener =
+ aws_mem_calloc(record->allocator, 1, sizeof(struct aws_rr_subscription_listener));
+ listener->allocator = record->allocator;
+ listener->operation_id = operation_id;
+
+ aws_hash_table_put(&record->listeners, listener, listener, NULL);
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - added listener %" PRIu64 " to subscription ('" PRInSTR
+ "'), %zu listeners total",
+ operation_id,
+ AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor),
+ aws_hash_table_get_entry_count(&record->listeners));
+}
+
+static int s_rr_subscription_purge_unused_subscriptions_wrapper(void *context, struct aws_hash_element *elem) {
+ struct aws_rr_subscription_record *record = elem->value;
+ struct aws_rr_subscription_manager *manager = context;
+
+ if (aws_hash_table_get_entry_count(&record->listeners) == 0) {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - checking subscription ('" PRInSTR "') for removal",
+ AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor));
+
+ if (manager->is_protocol_client_connected) {
+ s_subscription_record_unsubscribe(manager, record, false);
+ }
+
+ if (record->status == ARRSST_NOT_SUBSCRIBED && record->pending_action == ARRSPAT_NOTHING) {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - deleting subscription ('" PRInSTR "')",
+ AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor));
+
+ s_aws_rr_subscription_record_destroy(record);
+ return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE;
+ }
+ }
+
+ return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
+}
+
+void aws_rr_subscription_manager_purge_unused(struct aws_rr_subscription_manager *manager) {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE, "request-response subscription manager - purging unused subscriptions");
+ aws_hash_table_foreach(&manager->subscriptions, s_rr_subscription_purge_unused_subscriptions_wrapper, manager);
+}
+
+static const char *s_rr_subscription_event_type_to_c_str(enum aws_rr_subscription_event_type type) {
+ switch (type) {
+ case ARRSET_REQUEST_SUBSCRIBE_SUCCESS:
+ return "RequestSubscribeSuccess";
+
+ case ARRSET_REQUEST_SUBSCRIBE_FAILURE:
+ return "RequestSubscribeFailure";
+
+ case ARRSET_REQUEST_SUBSCRIPTION_ENDED:
+ return "RequestSubscriptionEnded";
+
+ case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED:
+ return "StreamingSubscriptionEstablished";
+
+ case ARRSET_STREAMING_SUBSCRIPTION_LOST:
+ return "StreamingSubscriptionLost";
+
+ case ARRSET_STREAMING_SUBSCRIPTION_HALTED:
+ return "StreamingSubscriptionHalted";
+
+ case ARRSET_UNSUBSCRIBE_COMPLETE:
+ return "UnsubscribeComplete";
+
+ case ARRSET_SUBSCRIPTION_EMPTY:
+ return "SubscriptionEmpty";
+ }
+
+ return "Unknown";
+}
+
+static bool s_subscription_type_matches_event_type(
+ enum aws_rr_subscription_type subscription_type,
+ enum aws_rr_subscription_event_type event_type) {
+ switch (event_type) {
+ case ARRSET_REQUEST_SUBSCRIBE_SUCCESS:
+ case ARRSET_REQUEST_SUBSCRIBE_FAILURE:
+ case ARRSET_REQUEST_SUBSCRIPTION_ENDED:
+ return subscription_type == ARRST_REQUEST_RESPONSE;
+
+ case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED:
+ case ARRSET_STREAMING_SUBSCRIPTION_LOST:
+ case ARRSET_STREAMING_SUBSCRIPTION_HALTED:
+ return subscription_type == ARRST_EVENT_STREAM;
+
+ default:
+ return true;
+ }
+}
+
+static void s_emit_subscription_event(
+ const struct aws_rr_subscription_manager *manager,
+ const struct aws_rr_subscription_record *record,
+ enum aws_rr_subscription_event_type type) {
+
+ AWS_FATAL_ASSERT(s_subscription_type_matches_event_type(record->type, type));
+
+ for (struct aws_hash_iter iter = aws_hash_iter_begin(&record->listeners); !aws_hash_iter_done(&iter);
+ aws_hash_iter_next(&iter)) {
+
+ struct aws_rr_subscription_listener *listener = iter.element.value;
+ struct aws_rr_subscription_status_event event = {
+ .type = type,
+ .topic_filter = record->topic_filter_cursor,
+ .operation_id = listener->operation_id,
+ };
+
+ (*manager->config.subscription_status_callback)(&event, manager->config.userdata);
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - subscription event for ('" PRInSTR
+ "'), type: %s, operation: %" PRIu64 "",
+ AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor),
+ s_rr_subscription_event_type_to_c_str(type),
+ listener->operation_id);
+ }
+}
+
+static int s_rr_activate_idle_subscription(
+ struct aws_rr_subscription_manager *manager,
+ struct aws_rr_subscription_record *record) {
+ int result = AWS_OP_SUCCESS;
+
+ if (record->poisoned) {
+ /*
+ * Don't try and establish poisoned subscriptions. This is not an error or a loggable event, it just means
+ * we hit a "try and make subscriptions" event when a poisoned subscription still hadn't been fully released.
+ */
+ return AWS_OP_SUCCESS;
+ }
+
+ if (manager->is_protocol_client_connected && aws_hash_table_get_entry_count(&record->listeners) > 0) {
+ if (record->status == ARRSST_NOT_SUBSCRIBED && record->pending_action == ARRSPAT_NOTHING) {
+ struct aws_protocol_adapter_subscribe_options subscribe_options = {
+ .topic_filter = record->topic_filter_cursor,
+ .ack_timeout_seconds = manager->config.operation_timeout_seconds,
+ };
+
+ result = aws_mqtt_protocol_adapter_subscribe(manager->protocol_adapter, &subscribe_options);
+ if (result == AWS_OP_SUCCESS) {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - initiating subscribe operation for ('" PRInSTR "')",
+ AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor));
+ record->pending_action = ARRSPAT_SUBSCRIBING;
+ } else {
+ int error_code = aws_last_error();
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - synchronous failure subscribing to ('" PRInSTR
+ "'), ec %d(%s)",
+ AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor),
+ error_code,
+ aws_error_debug_str(error_code));
+
+ if (record->type == ARRST_REQUEST_RESPONSE) {
+ s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_FAILURE);
+ } else {
+ record->poisoned = true;
+ s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_HALTED);
+ }
+ }
+ }
+ }
+
+ return result;
+}
+
+enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_subscription(
+ struct aws_rr_subscription_manager *manager,
+ const struct aws_rr_acquire_subscription_options *options) {
+
+ if (options->topic_filter_count == 0) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - acquire_subscription for operation %" PRIu64
+ " with no topic filters",
+ options->operation_id);
+ return AASRT_FAILURE;
+ }
+
+ /*
+ * Check for poisoned or mismatched records. This has precedence over the following unsubscribing check,
+ * and so we put them in separate loops
+ */
+ for (size_t i = 0; i < options->topic_filter_count; ++i) {
+ struct aws_byte_cursor topic_filter = options->topic_filters[i];
+ struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter);
+ if (existing_record == NULL) {
+ continue;
+ }
+
+ if (existing_record->poisoned) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64
+ " failed - existing subscription is poisoned and has not been released",
+ AWS_BYTE_CURSOR_PRI(topic_filter),
+ options->operation_id);
+ return AASRT_FAILURE;
+ }
+
+ if (existing_record->type != options->type) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64
+ " failed - conflicts with subscription type of existing subscription",
+ AWS_BYTE_CURSOR_PRI(topic_filter),
+ options->operation_id);
+ return AASRT_FAILURE;
+ }
+ }
+
+ /* blocked if an existing record is unsubscribing; also compute how many subscriptions are needed */
+ size_t subscriptions_needed = 0;
+ for (size_t i = 0; i < options->topic_filter_count; ++i) {
+ struct aws_byte_cursor topic_filter = options->topic_filters[i];
+ struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter);
+ if (existing_record != NULL) {
+ if (existing_record->pending_action == ARRSPAT_UNSUBSCRIBING) {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - acquire subscription for ('" PRInSTR
+ "'), operation %" PRIu64 " blocked - existing subscription is unsubscribing",
+ AWS_BYTE_CURSOR_PRI(topic_filter),
+ options->operation_id);
+ return AASRT_BLOCKED;
+ }
+ } else {
+ ++subscriptions_needed;
+ }
+ }
+
+ /* Check for space and fail or block as appropriate */
+ if (subscriptions_needed > 0) {
+ /* how much of the budget are we using? */
+ struct aws_subscription_stats stats;
+ s_get_subscription_stats(manager, &stats);
+
+ if (options->type == ARRST_REQUEST_RESPONSE) {
+ if (subscriptions_needed >
+ manager->config.max_request_response_subscriptions - stats.request_response_subscriptions) {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - acquire subscription for request operation %" PRIu64
+ " blocked - no room currently",
+ options->operation_id);
+ return AASRT_BLOCKED;
+ }
+ } else {
+ /*
+ * Streaming subscriptions have more complicated space-checking logic. Under certain conditions, we may
+ * block rather than failing
+ */
+ if (subscriptions_needed + stats.event_stream_subscriptions > manager->config.max_streaming_subscriptions) {
+ if (subscriptions_needed + stats.event_stream_subscriptions <=
+ manager->config.max_streaming_subscriptions + stats.unsubscribing_event_stream_subscriptions) {
+ /* If enough subscriptions are in the process of going away then wait in the blocked state */
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - acquire subscription for streaming operation %" PRIu64
+ " blocked - no room currently",
+ options->operation_id);
+ return AASRT_BLOCKED;
+ } else {
+ /* Otherwise, there's no hope, fail */
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - acquire subscription for operation %" PRIu64
+ " failed - no room",
+ options->operation_id);
+ return AASRT_NO_CAPACITY;
+ }
+ }
+ }
+ }
+
+ bool is_fully_subscribed = true;
+ for (size_t i = 0; i < options->topic_filter_count; ++i) {
+ struct aws_byte_cursor topic_filter = options->topic_filters[i];
+ struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter);
+
+ if (existing_record == NULL) {
+ existing_record = s_aws_rr_subscription_new(manager->allocator, topic_filter, options->type);
+ aws_hash_table_put(&manager->subscriptions, &existing_record->topic_filter_cursor, existing_record, NULL);
+ }
+
+ s_add_listener_to_subscription_record(existing_record, options->operation_id);
+ if (existing_record->status != ARRSST_SUBSCRIBED) {
+ is_fully_subscribed = false;
+ }
+ }
+
+ if (is_fully_subscribed) {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - acquire subscription for operation %" PRIu64
+ " fully subscribed - all required subscriptions are active",
+ options->operation_id);
+ return AASRT_SUBSCRIBED;
+ }
+
+ for (size_t i = 0; i < options->topic_filter_count; ++i) {
+ struct aws_byte_cursor topic_filter = options->topic_filters[i];
+ struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter);
+
+ if (s_rr_activate_idle_subscription(manager, existing_record)) {
+ AWS_LOGF_ERROR(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - acquire subscription for operation %" PRIu64
+ " failed - synchronous subscribe failure",
+ options->operation_id);
+ return AASRT_FAILURE;
+ }
+ }
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - acquire subscription for operation %" PRIu64
+ " subscribing - waiting on one or more subscribes to complete",
+ options->operation_id);
+
+ return AASRT_SUBSCRIBING;
+}
+
+void aws_rr_subscription_manager_release_subscription(
+ struct aws_rr_subscription_manager *manager,
+ const struct aws_rr_release_subscription_options *options) {
+ for (size_t i = 0; i < options->topic_filter_count; ++i) {
+ struct aws_byte_cursor topic_filter = options->topic_filters[i];
+ s_remove_listener_from_subscription_record(manager, topic_filter, options->operation_id);
+ }
+}
+
+static void s_handle_protocol_adapter_request_subscription_event(
+ struct aws_rr_subscription_manager *manager,
+ struct aws_rr_subscription_record *record,
+ const struct aws_protocol_adapter_subscription_event *event) {
+ if (event->event_type == AWS_PASET_SUBSCRIBE) {
+ AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_SUBSCRIBING);
+ record->pending_action = ARRSPAT_NOTHING;
+
+ if (event->error_code == AWS_ERROR_SUCCESS) {
+ record->status = ARRSST_SUBSCRIBED;
+ s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_SUCCESS);
+ } else {
+ s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_FAILURE);
+ }
+ } else {
+ AWS_FATAL_ASSERT(event->event_type == AWS_PASET_UNSUBSCRIBE);
+ AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_UNSUBSCRIBING);
+ record->pending_action = ARRSPAT_NOTHING;
+
+ if (event->error_code == AWS_ERROR_SUCCESS) {
+ record->status = ARRSST_NOT_SUBSCRIBED;
+
+ struct aws_rr_subscription_status_event unsubscribe_event = {
+ .type = ARRSET_UNSUBSCRIBE_COMPLETE,
+ .topic_filter = record->topic_filter_cursor,
+ .operation_id = 0,
+ };
+
+ (*manager->config.subscription_status_callback)(&unsubscribe_event, manager->config.userdata);
+ }
+ }
+}
+
+static void s_handle_protocol_adapter_streaming_subscription_event(
+ struct aws_rr_subscription_manager *manager,
+ struct aws_rr_subscription_record *record,
+ const struct aws_protocol_adapter_subscription_event *event) {
+ if (event->event_type == AWS_PASET_SUBSCRIBE) {
+ AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_SUBSCRIBING);
+ record->pending_action = ARRSPAT_NOTHING;
+
+ if (event->error_code == AWS_ERROR_SUCCESS) {
+ record->status = ARRSST_SUBSCRIBED;
+ s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED);
+ } else {
+ if (event->retryable) {
+ s_rr_activate_idle_subscription(manager, record);
+ } else {
+ record->poisoned = true;
+ s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_HALTED);
+ }
+ }
+ } else {
+ AWS_FATAL_ASSERT(event->event_type == AWS_PASET_UNSUBSCRIBE);
+ AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_UNSUBSCRIBING);
+ record->pending_action = ARRSPAT_NOTHING;
+
+ if (event->error_code == AWS_ERROR_SUCCESS) {
+ record->status = ARRSST_NOT_SUBSCRIBED;
+
+ struct aws_rr_subscription_status_event unsubscribe_event = {
+ .type = ARRSET_UNSUBSCRIBE_COMPLETE,
+ .topic_filter = record->topic_filter_cursor,
+ .operation_id = 0,
+ };
+
+ (*manager->config.subscription_status_callback)(&unsubscribe_event, manager->config.userdata);
+ }
+ }
+}
+
+void aws_rr_subscription_manager_on_protocol_adapter_subscription_event(
+ struct aws_rr_subscription_manager *manager,
+ const struct aws_protocol_adapter_subscription_event *event) {
+ struct aws_rr_subscription_record *record = s_get_subscription_record(manager, event->topic_filter);
+ if (record == NULL) {
+ return;
+ }
+
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - received a protocol adapter subscription event for ('" PRInSTR
+ "'), type %s, error_code %d(%s)",
+ AWS_BYTE_CURSOR_PRI(event->topic_filter),
+ aws_protocol_adapter_subscription_event_type_to_c_str(event->event_type),
+ event->error_code,
+ aws_error_debug_str(event->error_code));
+
+ if (record->type == ARRST_REQUEST_RESPONSE) {
+ s_handle_protocol_adapter_request_subscription_event(manager, record, event);
+ } else {
+ s_handle_protocol_adapter_streaming_subscription_event(manager, record, event);
+ }
+}
+
+static int s_rr_activate_idle_subscriptions_wrapper(void *context, struct aws_hash_element *elem) {
+ struct aws_rr_subscription_record *record = elem->value;
+ struct aws_rr_subscription_manager *manager = context;
+
+ s_rr_activate_idle_subscription(manager, record);
+
+ return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
+}
+
+static void s_activate_idle_subscriptions(struct aws_rr_subscription_manager *manager) {
+ aws_hash_table_foreach(&manager->subscriptions, s_rr_activate_idle_subscriptions_wrapper, manager);
+}
+
+static int s_apply_session_lost_wrapper(void *context, struct aws_hash_element *elem) {
+ struct aws_rr_subscription_record *record = elem->value;
+ struct aws_rr_subscription_manager *manager = context;
+
+ if (record->status == ARRSST_SUBSCRIBED) {
+ record->status = ARRSST_NOT_SUBSCRIBED;
+
+ if (record->type == ARRST_REQUEST_RESPONSE) {
+ s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_ENDED);
+
+ if (record->pending_action != ARRSPAT_UNSUBSCRIBING) {
+ s_aws_rr_subscription_record_destroy(record);
+ return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE;
+ }
+ } else {
+ s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_LOST);
+ }
+ }
+
+ return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
+}
+
+static int s_apply_streaming_resubscribe_wrapper(void *context, struct aws_hash_element *elem) {
+ struct aws_rr_subscription_record *record = elem->value;
+ struct aws_rr_subscription_manager *manager = context;
+
+ if (record->type == ARRST_EVENT_STREAM) {
+ s_rr_activate_idle_subscription(manager, record);
+ }
+
+ return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
+}
+
+static void s_apply_session_lost(struct aws_rr_subscription_manager *manager) {
+ aws_hash_table_foreach(&manager->subscriptions, s_apply_session_lost_wrapper, manager);
+ aws_hash_table_foreach(&manager->subscriptions, s_apply_streaming_resubscribe_wrapper, manager);
+}
+
+void aws_rr_subscription_manager_on_protocol_adapter_connection_event(
+ struct aws_rr_subscription_manager *manager,
+ const struct aws_protocol_adapter_connection_event *event) {
+
+ if (event->event_type == AWS_PACET_CONNECTED) {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - received a protocol adapter connection event, joined_session: "
+ "%d",
+ (int)(event->joined_session ? 1 : 0));
+
+ manager->is_protocol_client_connected = true;
+ if (!event->joined_session) {
+ s_apply_session_lost(manager);
+ }
+
+ aws_rr_subscription_manager_purge_unused(manager);
+ s_activate_idle_subscriptions(manager);
+ } else {
+ AWS_LOGF_DEBUG(
+ AWS_LS_MQTT_REQUEST_RESPONSE,
+ "request-response subscription manager - received a protocol adapter disconnection event");
+
+ manager->is_protocol_client_connected = false;
+ }
+}
+
+bool aws_rr_subscription_manager_are_options_valid(const struct aws_rr_subscription_manager_options *options) {
+ if (options == NULL || options->max_request_response_subscriptions < 2 || options->operation_timeout_seconds == 0) {
+ return false;
+ }
+
+ return true;
+}
+
+void aws_rr_subscription_manager_init(
+ struct aws_rr_subscription_manager *manager,
+ struct aws_allocator *allocator,
+ struct aws_mqtt_protocol_adapter *protocol_adapter,
+ const struct aws_rr_subscription_manager_options *options) {
+ AWS_ZERO_STRUCT(*manager);
+
+ AWS_FATAL_ASSERT(aws_rr_subscription_manager_are_options_valid(options));
+
+ manager->allocator = allocator;
+ manager->config = *options;
+ manager->protocol_adapter = protocol_adapter;
+
+ aws_hash_table_init(
+ &manager->subscriptions,
+ allocator,
+ options->max_request_response_subscriptions + options->max_streaming_subscriptions,
+ aws_hash_byte_cursor_ptr,
+ aws_mqtt_byte_cursor_hash_equality,
+ NULL,
+ s_aws_rr_subscription_record_destroy);
+
+ manager->is_protocol_client_connected = aws_mqtt_protocol_adapter_is_connected(protocol_adapter);
+}
+
+void aws_rr_subscription_manager_clean_up(struct aws_rr_subscription_manager *manager) {
+ aws_hash_table_foreach(&manager->subscriptions, s_rr_subscription_clean_up_foreach_wrap, manager);
+ aws_hash_table_clean_up(&manager->subscriptions);
+}
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/shared_constants.c b/contrib/restricted/aws/aws-c-mqtt/source/shared.c
index 8d260799598..e84c4afe5f3 100644
--- a/contrib/restricted/aws/aws-c-mqtt/source/shared_constants.c
+++ b/contrib/restricted/aws/aws-c-mqtt/source/shared.c
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0.
*/
-#include <aws/mqtt/private/shared_constants.h>
+#include <aws/mqtt/private/shared.h>
#include <aws/http/request_response.h>
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_client.c b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_client.c
index e6da6d3d397..33bad2e6ecd 100644
--- a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_client.c
+++ b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_client.c
@@ -13,7 +13,7 @@
#include <aws/io/channel_bootstrap.h>
#include <aws/io/event_loop.h>
#include <aws/mqtt/private/client_impl_shared.h>
-#include <aws/mqtt/private/shared_constants.h>
+#include <aws/mqtt/private/shared.h>
#include <aws/mqtt/private/v5/mqtt5_client_impl.h>
#include <aws/mqtt/private/v5/mqtt5_options_storage.h>
#include <aws/mqtt/private/v5/mqtt5_utils.h>
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_listener.c b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_listener.c
index 04170394e30..ebb45b81b22 100644
--- a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_listener.c
+++ b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_listener.c
@@ -84,6 +84,7 @@ struct aws_mqtt5_listener *aws_mqtt5_listener_new(
struct aws_allocator *allocator,
struct aws_mqtt5_listener_config *config) {
if (config->client == NULL) {
+ aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
return NULL;
}
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_options_storage.c b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_options_storage.c
index 6510c6ab21f..3cddac179e8 100644
--- a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_options_storage.c
+++ b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_options_storage.c
@@ -3623,7 +3623,7 @@ void aws_mqtt5_client_options_storage_log(
log_handle,
level,
AWS_LS_MQTT5_GENERAL,
- "id=%p: mqtt5_client_options_storage disabling websockets",
+ "id=%p: aws_mqtt5_client_options_storage disabling websockets",
(void *)options_storage);
}
@@ -3669,7 +3669,7 @@ void aws_mqtt5_client_options_storage_log(
log_handle,
level,
AWS_LS_MQTT5_GENERAL,
- "id=%p: mqtt5_client_options_storage reconnect delay min set to %" PRIu64 " ms, max set to %" PRIu64 " ms",
+ "id=%p: aws_mqtt5_client_options_storage reconnect delay min set to %" PRIu64 " ms, max set to %" PRIu64 " ms",
(void *)options_storage,
options_storage->min_reconnect_delay_ms,
options_storage->max_reconnect_delay_ms);
diff --git a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_to_mqtt3_adapter.c b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_to_mqtt3_adapter.c
index bd90a6581fa..691d2c9a491 100644
--- a/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_to_mqtt3_adapter.c
+++ b/contrib/restricted/aws/aws-c-mqtt/source/v5/mqtt5_to_mqtt3_adapter.c
@@ -2850,6 +2850,18 @@ error:
return 0;
}
+static enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_5_get_impl(const void *impl) {
+ (void)impl;
+
+ return AWS_MQTT311_IT_5_ADAPTER;
+}
+
+static struct aws_event_loop *s_aws_mqtt_client_connection_5_get_event_loop(const void *impl) {
+ const struct aws_mqtt_client_connection_5_impl *adapter = impl;
+
+ return adapter->client->loop;
+}
+
static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_5_vtable = {
.acquire_fn = s_aws_mqtt_client_connection_5_acquire,
.release_fn = s_aws_mqtt_client_connection_5_release,
@@ -2873,6 +2885,8 @@ static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_5_v
.unsubscribe_fn = s_aws_mqtt_client_connection_5_unsubscribe,
.publish_fn = s_aws_mqtt_client_connection_5_publish,
.get_stats_fn = s_aws_mqtt_client_connection_5_get_stats,
+ .get_impl_type = s_aws_mqtt_client_connection_5_get_impl,
+ .get_event_loop = s_aws_mqtt_client_connection_5_get_event_loop,
};
static struct aws_mqtt_client_connection_vtable *s_aws_mqtt_client_connection_5_vtable_ptr =
diff --git a/contrib/restricted/aws/aws-c-mqtt/ya.make b/contrib/restricted/aws/aws-c-mqtt/ya.make
index 8f460436aa3..d13474e9718 100644
--- a/contrib/restricted/aws/aws-c-mqtt/ya.make
+++ b/contrib/restricted/aws/aws-c-mqtt/ya.make
@@ -6,9 +6,9 @@ LICENSE(Apache-2.0)
LICENSE_TEXTS(.yandex_meta/licenses.list.txt)
-VERSION(0.10.4)
+VERSION(0.13.0)
-ORIGINAL_SOURCE(https://github.com/awslabs/aws-c-mqtt/archive/v0.10.4.tar.gz)
+ORIGINAL_SOURCE(https://github.com/awslabs/aws-c-mqtt/archive/v0.13.0.tar.gz)
PEERDIR(
contrib/restricted/aws/aws-c-common
@@ -49,9 +49,14 @@ SRCS(
source/fixed_header.c
source/mqtt.c
source/mqtt311_decoder.c
+ source/mqtt311_listener.c
source/mqtt_subscription_set.c
source/packets.c
- source/shared_constants.c
+ source/request-response/protocol_adapter.c
+ source/request-response/request_response_client.c
+ source/request-response/request_response_subscription_set.c
+ source/request-response/subscription_manager.c
+ source/shared.c
source/topic_tree.c
source/v5/mqtt5_callbacks.c
source/v5/mqtt5_client.c