diff options
author | dldmitry <dldmitry@yandex-team.ru> | 2022-02-10 16:47:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:17 +0300 |
commit | 10129030876638368b8965c627671fe44be079bc (patch) | |
tree | 681c849a324535feff6f07677873c9ffc4c51520 /contrib/libs/libevent/evrpc.c | |
parent | 5c1eb9f48fd0ac41fd72519a8284ded6f0358f6f (diff) | |
download | ydb-10129030876638368b8965c627671fe44be079bc.tar.gz |
Restoring authorship annotation for <dldmitry@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/libevent/evrpc.c')
-rw-r--r-- | contrib/libs/libevent/evrpc.c | 2278 |
1 files changed, 1139 insertions, 1139 deletions
diff --git a/contrib/libs/libevent/evrpc.c b/contrib/libs/libevent/evrpc.c index 4a60ca5f07..e47ced3b59 100644 --- a/contrib/libs/libevent/evrpc.c +++ b/contrib/libs/libevent/evrpc.c @@ -1,1171 +1,1171 @@ -/* - * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu> - * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. The name of the author may not be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR - * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES - * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. - * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT - * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF - * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -#include "event2/event-config.h" +/* + * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu> + * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#include "event2/event-config.h" #include "evconfig-private.h" - + #ifdef _WIN32 -#define WIN32_LEAN_AND_MEAN -#include <winsock2.h> -#include <windows.h> -#undef WIN32_LEAN_AND_MEAN -#endif - -#include <sys/types.h> +#define WIN32_LEAN_AND_MEAN +#include <winsock2.h> +#include <windows.h> +#undef WIN32_LEAN_AND_MEAN +#endif + +#include <sys/types.h> #ifndef _WIN32 -#include <sys/socket.h> -#endif +#include <sys/socket.h> +#endif #ifdef EVENT__HAVE_SYS_TIME_H -#include <sys/time.h> -#endif -#include <sys/queue.h> -#include <stdio.h> -#include <stdlib.h> +#include <sys/time.h> +#endif +#include <sys/queue.h> +#include <stdio.h> +#include <stdlib.h> #ifndef _WIN32 -#include <unistd.h> -#endif -#include <errno.h> -#include <signal.h> -#include <string.h> - -#include <sys/queue.h> - -#include "event2/event.h" -#include "event2/event_struct.h" -#include "event2/rpc.h" -#include "event2/rpc_struct.h" -#include "evrpc-internal.h" -#include "event2/http.h" -#include "event2/buffer.h" -#include "event2/tag.h" -#include "event2/http_struct.h" -#include "event2/http_compat.h" -#include "event2/util.h" -#include "util-internal.h" -#include "log-internal.h" -#include "mm-internal.h" - -struct evrpc_base * -evrpc_init(struct evhttp *http_server) -{ - struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); - if (base == NULL) - return (NULL); - - /* we rely on the tagging sub system */ - evtag_init(); - - TAILQ_INIT(&base->registered_rpcs); - TAILQ_INIT(&base->input_hooks); - TAILQ_INIT(&base->output_hooks); - - TAILQ_INIT(&base->paused_requests); - - base->http_server = http_server; - - return (base); -} - -void -evrpc_free(struct evrpc_base *base) -{ - struct evrpc *rpc; - struct evrpc_hook *hook; - struct evrpc_hook_ctx *pause; - int r; - - while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { - r = evrpc_unregister_rpc(base, rpc->uri); - EVUTIL_ASSERT(r == 0); - } - while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { - TAILQ_REMOVE(&base->paused_requests, pause, next); - mm_free(pause); - } - while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { - r = evrpc_remove_hook(base, EVRPC_INPUT, hook); - EVUTIL_ASSERT(r); - } - while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { - r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); - EVUTIL_ASSERT(r); - } - mm_free(base); -} - -void * -evrpc_add_hook(void *vbase, - enum EVRPC_HOOK_TYPE hook_type, - int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), - void *cb_arg) -{ +#include <unistd.h> +#endif +#include <errno.h> +#include <signal.h> +#include <string.h> + +#include <sys/queue.h> + +#include "event2/event.h" +#include "event2/event_struct.h" +#include "event2/rpc.h" +#include "event2/rpc_struct.h" +#include "evrpc-internal.h" +#include "event2/http.h" +#include "event2/buffer.h" +#include "event2/tag.h" +#include "event2/http_struct.h" +#include "event2/http_compat.h" +#include "event2/util.h" +#include "util-internal.h" +#include "log-internal.h" +#include "mm-internal.h" + +struct evrpc_base * +evrpc_init(struct evhttp *http_server) +{ + struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); + if (base == NULL) + return (NULL); + + /* we rely on the tagging sub system */ + evtag_init(); + + TAILQ_INIT(&base->registered_rpcs); + TAILQ_INIT(&base->input_hooks); + TAILQ_INIT(&base->output_hooks); + + TAILQ_INIT(&base->paused_requests); + + base->http_server = http_server; + + return (base); +} + +void +evrpc_free(struct evrpc_base *base) +{ + struct evrpc *rpc; + struct evrpc_hook *hook; + struct evrpc_hook_ctx *pause; + int r; + + while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { + r = evrpc_unregister_rpc(base, rpc->uri); + EVUTIL_ASSERT(r == 0); + } + while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { + TAILQ_REMOVE(&base->paused_requests, pause, next); + mm_free(pause); + } + while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { + r = evrpc_remove_hook(base, EVRPC_INPUT, hook); + EVUTIL_ASSERT(r); + } + while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { + r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); + EVUTIL_ASSERT(r); + } + mm_free(base); +} + +void * +evrpc_add_hook(void *vbase, + enum EVRPC_HOOK_TYPE hook_type, + int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), + void *cb_arg) +{ struct evrpc_hooks_ *base = vbase; - struct evrpc_hook_list *head = NULL; - struct evrpc_hook *hook = NULL; - switch (hook_type) { - case EVRPC_INPUT: - head = &base->in_hooks; - break; - case EVRPC_OUTPUT: - head = &base->out_hooks; - break; - default: - EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); - } - - hook = mm_calloc(1, sizeof(struct evrpc_hook)); - EVUTIL_ASSERT(hook != NULL); - - hook->process = cb; - hook->process_arg = cb_arg; - TAILQ_INSERT_TAIL(head, hook, next); - - return (hook); -} - -static int -evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) -{ - struct evrpc_hook *hook = NULL; - TAILQ_FOREACH(hook, head, next) { - if (hook == handle) { - TAILQ_REMOVE(head, hook, next); - mm_free(hook); - return (1); - } - } - - return (0); -} - -/* - * remove the hook specified by the handle - */ - -int -evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) -{ + struct evrpc_hook_list *head = NULL; + struct evrpc_hook *hook = NULL; + switch (hook_type) { + case EVRPC_INPUT: + head = &base->in_hooks; + break; + case EVRPC_OUTPUT: + head = &base->out_hooks; + break; + default: + EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); + } + + hook = mm_calloc(1, sizeof(struct evrpc_hook)); + EVUTIL_ASSERT(hook != NULL); + + hook->process = cb; + hook->process_arg = cb_arg; + TAILQ_INSERT_TAIL(head, hook, next); + + return (hook); +} + +static int +evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) +{ + struct evrpc_hook *hook = NULL; + TAILQ_FOREACH(hook, head, next) { + if (hook == handle) { + TAILQ_REMOVE(head, hook, next); + mm_free(hook); + return (1); + } + } + + return (0); +} + +/* + * remove the hook specified by the handle + */ + +int +evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) +{ struct evrpc_hooks_ *base = vbase; - struct evrpc_hook_list *head = NULL; - switch (hook_type) { - case EVRPC_INPUT: - head = &base->in_hooks; - break; - case EVRPC_OUTPUT: - head = &base->out_hooks; - break; - default: - EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); - } - - return (evrpc_remove_hook_internal(head, handle)); -} - -static int -evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, - struct evhttp_request *req, struct evbuffer *evbuf) -{ - struct evrpc_hook *hook; - TAILQ_FOREACH(hook, head, next) { - int res = hook->process(ctx, req, evbuf, hook->process_arg); - if (res != EVRPC_CONTINUE) - return (res); - } - - return (EVRPC_CONTINUE); -} - -static void evrpc_pool_schedule(struct evrpc_pool *pool); -static void evrpc_request_cb(struct evhttp_request *, void *); - -/* - * Registers a new RPC with the HTTP server. The evrpc object is expected - * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn - * calls this function. - */ - -static char * -evrpc_construct_uri(const char *uri) -{ - char *constructed_uri; - size_t constructed_uri_len; - - constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; - if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) - event_err(1, "%s: failed to register rpc at %s", - __func__, uri); - memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); - memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); - constructed_uri[constructed_uri_len - 1] = '\0'; - - return (constructed_uri); -} - -int -evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, - void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) -{ - char *constructed_uri = evrpc_construct_uri(rpc->uri); - - rpc->base = base; - rpc->cb = cb; - rpc->cb_arg = cb_arg; - - TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); - - evhttp_set_cb(base->http_server, - constructed_uri, - evrpc_request_cb, - rpc); - - mm_free(constructed_uri); - - return (0); -} - -int -evrpc_unregister_rpc(struct evrpc_base *base, const char *name) -{ - char *registered_uri = NULL; - struct evrpc *rpc; - int r; - - /* find the right rpc; linear search might be slow */ - TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { - if (strcmp(rpc->uri, name) == 0) - break; - } - if (rpc == NULL) { - /* We did not find an RPC with this name */ - return (-1); - } - TAILQ_REMOVE(&base->registered_rpcs, rpc, next); - - registered_uri = evrpc_construct_uri(name); - - /* remove the http server callback */ - r = evhttp_del_cb(base->http_server, registered_uri); - EVUTIL_ASSERT(r == 0); - - mm_free(registered_uri); - - mm_free((char *)rpc->uri); - mm_free(rpc); - return (0); -} - -static int evrpc_pause_request(void *vbase, void *ctx, - void (*cb)(void *, enum EVRPC_HOOK_RESULT)); -static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); - -static void -evrpc_request_cb(struct evhttp_request *req, void *arg) -{ - struct evrpc *rpc = arg; - struct evrpc_req_generic *rpc_state = NULL; - - /* let's verify the outside parameters */ - if (req->type != EVHTTP_REQ_POST || - evbuffer_get_length(req->input_buffer) <= 0) - goto error; - - rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); - if (rpc_state == NULL) - goto error; - rpc_state->rpc = rpc; - rpc_state->http_req = req; - rpc_state->rpc_data = NULL; - - if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { - int hook_res; - + struct evrpc_hook_list *head = NULL; + switch (hook_type) { + case EVRPC_INPUT: + head = &base->in_hooks; + break; + case EVRPC_OUTPUT: + head = &base->out_hooks; + break; + default: + EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); + } + + return (evrpc_remove_hook_internal(head, handle)); +} + +static int +evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, + struct evhttp_request *req, struct evbuffer *evbuf) +{ + struct evrpc_hook *hook; + TAILQ_FOREACH(hook, head, next) { + int res = hook->process(ctx, req, evbuf, hook->process_arg); + if (res != EVRPC_CONTINUE) + return (res); + } + + return (EVRPC_CONTINUE); +} + +static void evrpc_pool_schedule(struct evrpc_pool *pool); +static void evrpc_request_cb(struct evhttp_request *, void *); + +/* + * Registers a new RPC with the HTTP server. The evrpc object is expected + * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn + * calls this function. + */ + +static char * +evrpc_construct_uri(const char *uri) +{ + char *constructed_uri; + size_t constructed_uri_len; + + constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; + if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) + event_err(1, "%s: failed to register rpc at %s", + __func__, uri); + memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); + memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); + constructed_uri[constructed_uri_len - 1] = '\0'; + + return (constructed_uri); +} + +int +evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, + void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) +{ + char *constructed_uri = evrpc_construct_uri(rpc->uri); + + rpc->base = base; + rpc->cb = cb; + rpc->cb_arg = cb_arg; + + TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); + + evhttp_set_cb(base->http_server, + constructed_uri, + evrpc_request_cb, + rpc); + + mm_free(constructed_uri); + + return (0); +} + +int +evrpc_unregister_rpc(struct evrpc_base *base, const char *name) +{ + char *registered_uri = NULL; + struct evrpc *rpc; + int r; + + /* find the right rpc; linear search might be slow */ + TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { + if (strcmp(rpc->uri, name) == 0) + break; + } + if (rpc == NULL) { + /* We did not find an RPC with this name */ + return (-1); + } + TAILQ_REMOVE(&base->registered_rpcs, rpc, next); + + registered_uri = evrpc_construct_uri(name); + + /* remove the http server callback */ + r = evhttp_del_cb(base->http_server, registered_uri); + EVUTIL_ASSERT(r == 0); + + mm_free(registered_uri); + + mm_free((char *)rpc->uri); + mm_free(rpc); + return (0); +} + +static int evrpc_pause_request(void *vbase, void *ctx, + void (*cb)(void *, enum EVRPC_HOOK_RESULT)); +static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); + +static void +evrpc_request_cb(struct evhttp_request *req, void *arg) +{ + struct evrpc *rpc = arg; + struct evrpc_req_generic *rpc_state = NULL; + + /* let's verify the outside parameters */ + if (req->type != EVHTTP_REQ_POST || + evbuffer_get_length(req->input_buffer) <= 0) + goto error; + + rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); + if (rpc_state == NULL) + goto error; + rpc_state->rpc = rpc; + rpc_state->http_req = req; + rpc_state->rpc_data = NULL; + + if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { + int hook_res; + evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); - - /* - * allow hooks to modify the outgoing request - */ - hook_res = evrpc_process_hooks(&rpc->base->input_hooks, - rpc_state, req, req->input_buffer); - switch (hook_res) { - case EVRPC_TERMINATE: - goto error; - case EVRPC_PAUSE: - evrpc_pause_request(rpc->base, rpc_state, - evrpc_request_cb_closure); - return; - case EVRPC_CONTINUE: - break; - default: - EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || - hook_res == EVRPC_CONTINUE || - hook_res == EVRPC_PAUSE); - } - } - - evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); - return; - -error: + + /* + * allow hooks to modify the outgoing request + */ + hook_res = evrpc_process_hooks(&rpc->base->input_hooks, + rpc_state, req, req->input_buffer); + switch (hook_res) { + case EVRPC_TERMINATE: + goto error; + case EVRPC_PAUSE: + evrpc_pause_request(rpc->base, rpc_state, + evrpc_request_cb_closure); + return; + case EVRPC_CONTINUE: + break; + default: + EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || + hook_res == EVRPC_PAUSE); + } + } + + evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); + return; + +error: if (rpc_state) evrpc_reqstate_free_(rpc_state); - evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); - return; -} - -static void -evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) -{ - struct evrpc_req_generic *rpc_state = arg; - struct evrpc *rpc; - struct evhttp_request *req; - - EVUTIL_ASSERT(rpc_state); - rpc = rpc_state->rpc; - req = rpc_state->http_req; - - if (hook_res == EVRPC_TERMINATE) - goto error; - - /* let's check that we can parse the request */ - rpc_state->request = rpc->request_new(rpc->request_new_arg); - if (rpc_state->request == NULL) - goto error; - - if (rpc->request_unmarshal( - rpc_state->request, req->input_buffer) == -1) { - /* we failed to parse the request; that's a bummer */ - goto error; - } - - /* at this point, we have a well formed request, prepare the reply */ - - rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); - if (rpc_state->reply == NULL) - goto error; - - /* give the rpc to the user; they can deal with it */ - rpc->cb(rpc_state, rpc->cb_arg); - - return; - -error: + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); + return; +} + +static void +evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) +{ + struct evrpc_req_generic *rpc_state = arg; + struct evrpc *rpc; + struct evhttp_request *req; + + EVUTIL_ASSERT(rpc_state); + rpc = rpc_state->rpc; + req = rpc_state->http_req; + + if (hook_res == EVRPC_TERMINATE) + goto error; + + /* let's check that we can parse the request */ + rpc_state->request = rpc->request_new(rpc->request_new_arg); + if (rpc_state->request == NULL) + goto error; + + if (rpc->request_unmarshal( + rpc_state->request, req->input_buffer) == -1) { + /* we failed to parse the request; that's a bummer */ + goto error; + } + + /* at this point, we have a well formed request, prepare the reply */ + + rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); + if (rpc_state->reply == NULL) + goto error; + + /* give the rpc to the user; they can deal with it */ + rpc->cb(rpc_state, rpc->cb_arg); + + return; + +error: evrpc_reqstate_free_(rpc_state); - evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); - return; -} - - -void + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); + return; +} + + +void evrpc_reqstate_free_(struct evrpc_req_generic* rpc_state) -{ - struct evrpc *rpc; - EVUTIL_ASSERT(rpc_state != NULL); - rpc = rpc_state->rpc; - - /* clean up all memory */ - if (rpc_state->hook_meta != NULL) +{ + struct evrpc *rpc; + EVUTIL_ASSERT(rpc_state != NULL); + rpc = rpc_state->rpc; + + /* clean up all memory */ + if (rpc_state->hook_meta != NULL) evrpc_hook_context_free_(rpc_state->hook_meta); - if (rpc_state->request != NULL) - rpc->request_free(rpc_state->request); - if (rpc_state->reply != NULL) - rpc->reply_free(rpc_state->reply); - if (rpc_state->rpc_data != NULL) - evbuffer_free(rpc_state->rpc_data); - mm_free(rpc_state); -} - -static void -evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); - -void -evrpc_request_done(struct evrpc_req_generic *rpc_state) -{ - struct evhttp_request *req; - struct evrpc *rpc; - - EVUTIL_ASSERT(rpc_state); - - req = rpc_state->http_req; - rpc = rpc_state->rpc; - - if (rpc->reply_complete(rpc_state->reply) == -1) { - /* the reply was not completely filled in. error out */ - goto error; - } - - if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { - /* out of memory */ - goto error; - } - - /* serialize the reply */ - rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); - - if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { - int hook_res; - + if (rpc_state->request != NULL) + rpc->request_free(rpc_state->request); + if (rpc_state->reply != NULL) + rpc->reply_free(rpc_state->reply); + if (rpc_state->rpc_data != NULL) + evbuffer_free(rpc_state->rpc_data); + mm_free(rpc_state); +} + +static void +evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); + +void +evrpc_request_done(struct evrpc_req_generic *rpc_state) +{ + struct evhttp_request *req; + struct evrpc *rpc; + + EVUTIL_ASSERT(rpc_state); + + req = rpc_state->http_req; + rpc = rpc_state->rpc; + + if (rpc->reply_complete(rpc_state->reply) == -1) { + /* the reply was not completely filled in. error out */ + goto error; + } + + if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { + /* out of memory */ + goto error; + } + + /* serialize the reply */ + rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); + + if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { + int hook_res; + evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); - - /* do hook based tweaks to the request */ - hook_res = evrpc_process_hooks(&rpc->base->output_hooks, - rpc_state, req, rpc_state->rpc_data); - switch (hook_res) { - case EVRPC_TERMINATE: - goto error; - case EVRPC_PAUSE: - if (evrpc_pause_request(rpc->base, rpc_state, - evrpc_request_done_closure) == -1) - goto error; - return; - case EVRPC_CONTINUE: - break; - default: - EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || - hook_res == EVRPC_CONTINUE || - hook_res == EVRPC_PAUSE); - } - } - - evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); - return; - -error: + + /* do hook based tweaks to the request */ + hook_res = evrpc_process_hooks(&rpc->base->output_hooks, + rpc_state, req, rpc_state->rpc_data); + switch (hook_res) { + case EVRPC_TERMINATE: + goto error; + case EVRPC_PAUSE: + if (evrpc_pause_request(rpc->base, rpc_state, + evrpc_request_done_closure) == -1) + goto error; + return; + case EVRPC_CONTINUE: + break; + default: + EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || + hook_res == EVRPC_PAUSE); + } + } + + evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); + return; + +error: evrpc_reqstate_free_(rpc_state); - evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); - return; -} - -void * -evrpc_get_request(struct evrpc_req_generic *req) -{ - return req->request; -} - -void * -evrpc_get_reply(struct evrpc_req_generic *req) -{ - return req->reply; -} - -static void -evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) -{ - struct evrpc_req_generic *rpc_state = arg; - struct evhttp_request *req; - EVUTIL_ASSERT(rpc_state); - req = rpc_state->http_req; - - if (hook_res == EVRPC_TERMINATE) - goto error; - - /* on success, we are going to transmit marshaled binary data */ - if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { - evhttp_add_header(req->output_headers, - "Content-Type", "application/octet-stream"); - } - evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); - + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); + return; +} + +void * +evrpc_get_request(struct evrpc_req_generic *req) +{ + return req->request; +} + +void * +evrpc_get_reply(struct evrpc_req_generic *req) +{ + return req->reply; +} + +static void +evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) +{ + struct evrpc_req_generic *rpc_state = arg; + struct evhttp_request *req; + EVUTIL_ASSERT(rpc_state); + req = rpc_state->http_req; + + if (hook_res == EVRPC_TERMINATE) + goto error; + + /* on success, we are going to transmit marshaled binary data */ + if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { + evhttp_add_header(req->output_headers, + "Content-Type", "application/octet-stream"); + } + evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); + evrpc_reqstate_free_(rpc_state); - - return; - -error: + + return; + +error: evrpc_reqstate_free_(rpc_state); - evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); - return; -} - - -/* Client implementation of RPC site */ - -static int evrpc_schedule_request(struct evhttp_connection *connection, - struct evrpc_request_wrapper *ctx); - -struct evrpc_pool * -evrpc_pool_new(struct event_base *base) -{ - struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); - if (pool == NULL) - return (NULL); - - TAILQ_INIT(&pool->connections); - TAILQ_INIT(&pool->requests); - - TAILQ_INIT(&pool->paused_requests); - - TAILQ_INIT(&pool->input_hooks); - TAILQ_INIT(&pool->output_hooks); - - pool->base = base; - pool->timeout = -1; - - return (pool); -} - -static void -evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) -{ - if (request->hook_meta != NULL) + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); + return; +} + + +/* Client implementation of RPC site */ + +static int evrpc_schedule_request(struct evhttp_connection *connection, + struct evrpc_request_wrapper *ctx); + +struct evrpc_pool * +evrpc_pool_new(struct event_base *base) +{ + struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); + if (pool == NULL) + return (NULL); + + TAILQ_INIT(&pool->connections); + TAILQ_INIT(&pool->requests); + + TAILQ_INIT(&pool->paused_requests); + + TAILQ_INIT(&pool->input_hooks); + TAILQ_INIT(&pool->output_hooks); + + pool->base = base; + pool->timeout = -1; + + return (pool); +} + +static void +evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) +{ + if (request->hook_meta != NULL) evrpc_hook_context_free_(request->hook_meta); - mm_free(request->name); - mm_free(request); -} - -void -evrpc_pool_free(struct evrpc_pool *pool) -{ - struct evhttp_connection *connection; - struct evrpc_request_wrapper *request; - struct evrpc_hook_ctx *pause; - struct evrpc_hook *hook; - int r; - - while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { - TAILQ_REMOVE(&pool->requests, request, next); - evrpc_request_wrapper_free(request); - } - - while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { - TAILQ_REMOVE(&pool->paused_requests, pause, next); - mm_free(pause); - } - - while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { - TAILQ_REMOVE(&pool->connections, connection, next); - evhttp_connection_free(connection); - } - - while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { - r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); - EVUTIL_ASSERT(r); - } - - while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { - r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); - EVUTIL_ASSERT(r); - } - - mm_free(pool); -} - -/* - * Add a connection to the RPC pool. A request scheduled on the pool - * may use any available connection. - */ - -void -evrpc_pool_add_connection(struct evrpc_pool *pool, - struct evhttp_connection *connection) -{ - EVUTIL_ASSERT(connection->http_server == NULL); - TAILQ_INSERT_TAIL(&pool->connections, connection, next); - - /* - * associate an event base with this connection - */ - if (pool->base != NULL) - evhttp_connection_set_base(connection, pool->base); - - /* - * unless a timeout was specifically set for a connection, - * the connection inherits the timeout from the pool. - */ + mm_free(request->name); + mm_free(request); +} + +void +evrpc_pool_free(struct evrpc_pool *pool) +{ + struct evhttp_connection *connection; + struct evrpc_request_wrapper *request; + struct evrpc_hook_ctx *pause; + struct evrpc_hook *hook; + int r; + + while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { + TAILQ_REMOVE(&pool->requests, request, next); + evrpc_request_wrapper_free(request); + } + + while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { + TAILQ_REMOVE(&pool->paused_requests, pause, next); + mm_free(pause); + } + + while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { + TAILQ_REMOVE(&pool->connections, connection, next); + evhttp_connection_free(connection); + } + + while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { + r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); + EVUTIL_ASSERT(r); + } + + while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { + r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); + EVUTIL_ASSERT(r); + } + + mm_free(pool); +} + +/* + * Add a connection to the RPC pool. A request scheduled on the pool + * may use any available connection. + */ + +void +evrpc_pool_add_connection(struct evrpc_pool *pool, + struct evhttp_connection *connection) +{ + EVUTIL_ASSERT(connection->http_server == NULL); + TAILQ_INSERT_TAIL(&pool->connections, connection, next); + + /* + * associate an event base with this connection + */ + if (pool->base != NULL) + evhttp_connection_set_base(connection, pool->base); + + /* + * unless a timeout was specifically set for a connection, + * the connection inherits the timeout from the pool. + */ if (!evutil_timerisset(&connection->timeout)) evhttp_connection_set_timeout(connection, pool->timeout); - - /* - * if we have any requests pending, schedule them with the new - * connections. - */ - - if (TAILQ_FIRST(&pool->requests) != NULL) { - struct evrpc_request_wrapper *request = - TAILQ_FIRST(&pool->requests); - TAILQ_REMOVE(&pool->requests, request, next); - evrpc_schedule_request(connection, request); - } -} - -void -evrpc_pool_remove_connection(struct evrpc_pool *pool, - struct evhttp_connection *connection) -{ - TAILQ_REMOVE(&pool->connections, connection, next); -} - -void -evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) -{ - struct evhttp_connection *evcon; - TAILQ_FOREACH(evcon, &pool->connections, next) { + + /* + * if we have any requests pending, schedule them with the new + * connections. + */ + + if (TAILQ_FIRST(&pool->requests) != NULL) { + struct evrpc_request_wrapper *request = + TAILQ_FIRST(&pool->requests); + TAILQ_REMOVE(&pool->requests, request, next); + evrpc_schedule_request(connection, request); + } +} + +void +evrpc_pool_remove_connection(struct evrpc_pool *pool, + struct evhttp_connection *connection) +{ + TAILQ_REMOVE(&pool->connections, connection, next); +} + +void +evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) +{ + struct evhttp_connection *evcon; + TAILQ_FOREACH(evcon, &pool->connections, next) { evhttp_connection_set_timeout(evcon, timeout_in_secs); - } - pool->timeout = timeout_in_secs; -} - - -static void evrpc_reply_done(struct evhttp_request *, void *); -static void evrpc_request_timeout(evutil_socket_t, short, void *); - -/* - * Finds a connection object associated with the pool that is currently - * idle and can be used to make a request. - */ -static struct evhttp_connection * -evrpc_pool_find_connection(struct evrpc_pool *pool) -{ - struct evhttp_connection *connection; - TAILQ_FOREACH(connection, &pool->connections, next) { - if (TAILQ_FIRST(&connection->requests) == NULL) - return (connection); - } - - return (NULL); -} - -/* - * Prototypes responsible for evrpc scheduling and hooking - */ - -static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); - -/* - * We assume that the ctx is no longer queued on the pool. - */ -static int -evrpc_schedule_request(struct evhttp_connection *connection, - struct evrpc_request_wrapper *ctx) -{ - struct evhttp_request *req = NULL; - struct evrpc_pool *pool = ctx->pool; - struct evrpc_status status; - - if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) - goto error; - - /* serialize the request data into the output buffer */ - ctx->request_marshal(req->output_buffer, ctx->request); - - /* we need to know the connection that we might have to abort */ - ctx->evcon = connection; - - /* if we get paused we also need to know the request */ - ctx->req = req; - - if (TAILQ_FIRST(&pool->output_hooks) != NULL) { - int hook_res; - + } + pool->timeout = timeout_in_secs; +} + + +static void evrpc_reply_done(struct evhttp_request *, void *); +static void evrpc_request_timeout(evutil_socket_t, short, void *); + +/* + * Finds a connection object associated with the pool that is currently + * idle and can be used to make a request. + */ +static struct evhttp_connection * +evrpc_pool_find_connection(struct evrpc_pool *pool) +{ + struct evhttp_connection *connection; + TAILQ_FOREACH(connection, &pool->connections, next) { + if (TAILQ_FIRST(&connection->requests) == NULL) + return (connection); + } + + return (NULL); +} + +/* + * Prototypes responsible for evrpc scheduling and hooking + */ + +static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); + +/* + * We assume that the ctx is no longer queued on the pool. + */ +static int +evrpc_schedule_request(struct evhttp_connection *connection, + struct evrpc_request_wrapper *ctx) +{ + struct evhttp_request *req = NULL; + struct evrpc_pool *pool = ctx->pool; + struct evrpc_status status; + + if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) + goto error; + + /* serialize the request data into the output buffer */ + ctx->request_marshal(req->output_buffer, ctx->request); + + /* we need to know the connection that we might have to abort */ + ctx->evcon = connection; + + /* if we get paused we also need to know the request */ + ctx->req = req; + + if (TAILQ_FIRST(&pool->output_hooks) != NULL) { + int hook_res; + evrpc_hook_associate_meta_(&ctx->hook_meta, connection); - - /* apply hooks to the outgoing request */ - hook_res = evrpc_process_hooks(&pool->output_hooks, - ctx, req, req->output_buffer); - - switch (hook_res) { - case EVRPC_TERMINATE: - goto error; - case EVRPC_PAUSE: - /* we need to be explicitly resumed */ - if (evrpc_pause_request(pool, ctx, - evrpc_schedule_request_closure) == -1) - goto error; - return (0); - case EVRPC_CONTINUE: - /* we can just continue */ - break; - default: - EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || - hook_res == EVRPC_CONTINUE || - hook_res == EVRPC_PAUSE); - } - } - - evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); - return (0); - -error: - memset(&status, 0, sizeof(status)); - status.error = EVRPC_STATUS_ERR_UNSTARTED; - (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); - evrpc_request_wrapper_free(ctx); - return (-1); -} - -static void -evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) -{ - struct evrpc_request_wrapper *ctx = arg; - struct evhttp_connection *connection = ctx->evcon; - struct evhttp_request *req = ctx->req; - struct evrpc_pool *pool = ctx->pool; - struct evrpc_status status; - char *uri = NULL; - int res = 0; - - if (hook_res == EVRPC_TERMINATE) - goto error; - - uri = evrpc_construct_uri(ctx->name); - if (uri == NULL) - goto error; - - if (pool->timeout > 0) { - /* - * a timeout after which the whole rpc is going to be aborted. - */ - struct timeval tv; - evutil_timerclear(&tv); - tv.tv_sec = pool->timeout; - evtimer_add(&ctx->ev_timeout, &tv); - } - - /* start the request over the connection */ - res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); - mm_free(uri); - - if (res == -1) - goto error; - - return; - -error: - memset(&status, 0, sizeof(status)); - status.error = EVRPC_STATUS_ERR_UNSTARTED; - (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); - evrpc_request_wrapper_free(ctx); -} - -/* we just queue the paused request on the pool under the req object */ -static int -evrpc_pause_request(void *vbase, void *ctx, - void (*cb)(void *, enum EVRPC_HOOK_RESULT)) -{ + + /* apply hooks to the outgoing request */ + hook_res = evrpc_process_hooks(&pool->output_hooks, + ctx, req, req->output_buffer); + + switch (hook_res) { + case EVRPC_TERMINATE: + goto error; + case EVRPC_PAUSE: + /* we need to be explicitly resumed */ + if (evrpc_pause_request(pool, ctx, + evrpc_schedule_request_closure) == -1) + goto error; + return (0); + case EVRPC_CONTINUE: + /* we can just continue */ + break; + default: + EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || + hook_res == EVRPC_PAUSE); + } + } + + evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); + return (0); + +error: + memset(&status, 0, sizeof(status)); + status.error = EVRPC_STATUS_ERR_UNSTARTED; + (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); + evrpc_request_wrapper_free(ctx); + return (-1); +} + +static void +evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) +{ + struct evrpc_request_wrapper *ctx = arg; + struct evhttp_connection *connection = ctx->evcon; + struct evhttp_request *req = ctx->req; + struct evrpc_pool *pool = ctx->pool; + struct evrpc_status status; + char *uri = NULL; + int res = 0; + + if (hook_res == EVRPC_TERMINATE) + goto error; + + uri = evrpc_construct_uri(ctx->name); + if (uri == NULL) + goto error; + + if (pool->timeout > 0) { + /* + * a timeout after which the whole rpc is going to be aborted. + */ + struct timeval tv; + evutil_timerclear(&tv); + tv.tv_sec = pool->timeout; + evtimer_add(&ctx->ev_timeout, &tv); + } + + /* start the request over the connection */ + res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); + mm_free(uri); + + if (res == -1) + goto error; + + return; + +error: + memset(&status, 0, sizeof(status)); + status.error = EVRPC_STATUS_ERR_UNSTARTED; + (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); + evrpc_request_wrapper_free(ctx); +} + +/* we just queue the paused request on the pool under the req object */ +static int +evrpc_pause_request(void *vbase, void *ctx, + void (*cb)(void *, enum EVRPC_HOOK_RESULT)) +{ struct evrpc_hooks_ *base = vbase; - struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); - if (pause == NULL) - return (-1); - - pause->ctx = ctx; - pause->cb = cb; - - TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); - return (0); -} - -int -evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) -{ + struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); + if (pause == NULL) + return (-1); + + pause->ctx = ctx; + pause->cb = cb; + + TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); + return (0); +} + +int +evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) +{ struct evrpc_hooks_ *base = vbase; - struct evrpc_pause_list *head = &base->pause_requests; - struct evrpc_hook_ctx *pause; - - TAILQ_FOREACH(pause, head, next) { - if (pause->ctx == ctx) - break; - } - - if (pause == NULL) - return (-1); - - (*pause->cb)(pause->ctx, res); - TAILQ_REMOVE(head, pause, next); - mm_free(pause); - return (0); -} - -int -evrpc_make_request(struct evrpc_request_wrapper *ctx) -{ - struct evrpc_pool *pool = ctx->pool; - - /* initialize the event structure for this rpc */ - evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); - - /* we better have some available connections on the pool */ - EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); - - /* - * if no connection is available, we queue the request on the pool, - * the next time a connection is empty, the rpc will be send on that. - */ - TAILQ_INSERT_TAIL(&pool->requests, ctx, next); - - evrpc_pool_schedule(pool); - - return (0); -} - - -struct evrpc_request_wrapper * -evrpc_make_request_ctx( - struct evrpc_pool *pool, void *request, void *reply, - const char *rpcname, - void (*req_marshal)(struct evbuffer*, void *), - void (*rpl_clear)(void *), - int (*rpl_unmarshal)(void *, struct evbuffer *), - void (*cb)(struct evrpc_status *, void *, void *, void *), - void *cbarg) -{ - struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) - mm_malloc(sizeof(struct evrpc_request_wrapper)); - if (ctx == NULL) - return (NULL); - - ctx->pool = pool; - ctx->hook_meta = NULL; - ctx->evcon = NULL; - ctx->name = mm_strdup(rpcname); - if (ctx->name == NULL) { - mm_free(ctx); - return (NULL); - } - ctx->cb = cb; - ctx->cb_arg = cbarg; - ctx->request = request; - ctx->reply = reply; - ctx->request_marshal = req_marshal; - ctx->reply_clear = rpl_clear; - ctx->reply_unmarshal = rpl_unmarshal; - - return (ctx); -} - -static void -evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); - -static void -evrpc_reply_done(struct evhttp_request *req, void *arg) -{ - struct evrpc_request_wrapper *ctx = arg; - struct evrpc_pool *pool = ctx->pool; - int hook_res = EVRPC_CONTINUE; - - /* cancel any timeout we might have scheduled */ - event_del(&ctx->ev_timeout); - - ctx->req = req; - - /* we need to get the reply now */ - if (req == NULL) { - evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); - return; - } - - if (TAILQ_FIRST(&pool->input_hooks) != NULL) { + struct evrpc_pause_list *head = &base->pause_requests; + struct evrpc_hook_ctx *pause; + + TAILQ_FOREACH(pause, head, next) { + if (pause->ctx == ctx) + break; + } + + if (pause == NULL) + return (-1); + + (*pause->cb)(pause->ctx, res); + TAILQ_REMOVE(head, pause, next); + mm_free(pause); + return (0); +} + +int +evrpc_make_request(struct evrpc_request_wrapper *ctx) +{ + struct evrpc_pool *pool = ctx->pool; + + /* initialize the event structure for this rpc */ + evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); + + /* we better have some available connections on the pool */ + EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); + + /* + * if no connection is available, we queue the request on the pool, + * the next time a connection is empty, the rpc will be send on that. + */ + TAILQ_INSERT_TAIL(&pool->requests, ctx, next); + + evrpc_pool_schedule(pool); + + return (0); +} + + +struct evrpc_request_wrapper * +evrpc_make_request_ctx( + struct evrpc_pool *pool, void *request, void *reply, + const char *rpcname, + void (*req_marshal)(struct evbuffer*, void *), + void (*rpl_clear)(void *), + int (*rpl_unmarshal)(void *, struct evbuffer *), + void (*cb)(struct evrpc_status *, void *, void *, void *), + void *cbarg) +{ + struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) + mm_malloc(sizeof(struct evrpc_request_wrapper)); + if (ctx == NULL) + return (NULL); + + ctx->pool = pool; + ctx->hook_meta = NULL; + ctx->evcon = NULL; + ctx->name = mm_strdup(rpcname); + if (ctx->name == NULL) { + mm_free(ctx); + return (NULL); + } + ctx->cb = cb; + ctx->cb_arg = cbarg; + ctx->request = request; + ctx->reply = reply; + ctx->request_marshal = req_marshal; + ctx->reply_clear = rpl_clear; + ctx->reply_unmarshal = rpl_unmarshal; + + return (ctx); +} + +static void +evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); + +static void +evrpc_reply_done(struct evhttp_request *req, void *arg) +{ + struct evrpc_request_wrapper *ctx = arg; + struct evrpc_pool *pool = ctx->pool; + int hook_res = EVRPC_CONTINUE; + + /* cancel any timeout we might have scheduled */ + event_del(&ctx->ev_timeout); + + ctx->req = req; + + /* we need to get the reply now */ + if (req == NULL) { + evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); + return; + } + + if (TAILQ_FIRST(&pool->input_hooks) != NULL) { evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon); - - /* apply hooks to the incoming request */ - hook_res = evrpc_process_hooks(&pool->input_hooks, - ctx, req, req->input_buffer); - - switch (hook_res) { - case EVRPC_TERMINATE: - case EVRPC_CONTINUE: - break; - case EVRPC_PAUSE: - /* - * if we get paused we also need to know the - * request. unfortunately, the underlying - * layer is going to free it. we need to - * request ownership explicitly - */ + + /* apply hooks to the incoming request */ + hook_res = evrpc_process_hooks(&pool->input_hooks, + ctx, req, req->input_buffer); + + switch (hook_res) { + case EVRPC_TERMINATE: + case EVRPC_CONTINUE: + break; + case EVRPC_PAUSE: + /* + * if we get paused we also need to know the + * request. unfortunately, the underlying + * layer is going to free it. we need to + * request ownership explicitly + */ evhttp_request_own(req); - - evrpc_pause_request(pool, ctx, - evrpc_reply_done_closure); - return; - default: - EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || - hook_res == EVRPC_CONTINUE || - hook_res == EVRPC_PAUSE); - } - } - - evrpc_reply_done_closure(ctx, hook_res); - - /* http request is being freed by underlying layer */ -} - -static void -evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) -{ - struct evrpc_request_wrapper *ctx = arg; - struct evhttp_request *req = ctx->req; - struct evrpc_pool *pool = ctx->pool; - struct evrpc_status status; - int res = -1; - - memset(&status, 0, sizeof(status)); - status.http_req = req; - - /* we need to get the reply now */ - if (req == NULL) { - status.error = EVRPC_STATUS_ERR_TIMEOUT; - } else if (hook_res == EVRPC_TERMINATE) { - status.error = EVRPC_STATUS_ERR_HOOKABORTED; - } else { - res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); - if (res == -1) - status.error = EVRPC_STATUS_ERR_BADPAYLOAD; - } - - if (res == -1) { - /* clear everything that we might have written previously */ - ctx->reply_clear(ctx->reply); - } - - (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); - - evrpc_request_wrapper_free(ctx); - - /* the http layer owned the original request structure, but if we - * got paused, we asked for ownership and need to free it here. */ - if (req != NULL && evhttp_request_is_owned(req)) - evhttp_request_free(req); - - /* see if we can schedule another request */ - evrpc_pool_schedule(pool); -} - -static void -evrpc_pool_schedule(struct evrpc_pool *pool) -{ - struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); - struct evhttp_connection *evcon; - - /* if no requests are pending, we have no work */ - if (ctx == NULL) - return; - - if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { - TAILQ_REMOVE(&pool->requests, ctx, next); - evrpc_schedule_request(evcon, ctx); - } -} - -static void -evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) -{ - struct evrpc_request_wrapper *ctx = arg; - struct evhttp_connection *evcon = ctx->evcon; - EVUTIL_ASSERT(evcon != NULL); - + + evrpc_pause_request(pool, ctx, + evrpc_reply_done_closure); + return; + default: + EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || + hook_res == EVRPC_PAUSE); + } + } + + evrpc_reply_done_closure(ctx, hook_res); + + /* http request is being freed by underlying layer */ +} + +static void +evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) +{ + struct evrpc_request_wrapper *ctx = arg; + struct evhttp_request *req = ctx->req; + struct evrpc_pool *pool = ctx->pool; + struct evrpc_status status; + int res = -1; + + memset(&status, 0, sizeof(status)); + status.http_req = req; + + /* we need to get the reply now */ + if (req == NULL) { + status.error = EVRPC_STATUS_ERR_TIMEOUT; + } else if (hook_res == EVRPC_TERMINATE) { + status.error = EVRPC_STATUS_ERR_HOOKABORTED; + } else { + res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); + if (res == -1) + status.error = EVRPC_STATUS_ERR_BADPAYLOAD; + } + + if (res == -1) { + /* clear everything that we might have written previously */ + ctx->reply_clear(ctx->reply); + } + + (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); + + evrpc_request_wrapper_free(ctx); + + /* the http layer owned the original request structure, but if we + * got paused, we asked for ownership and need to free it here. */ + if (req != NULL && evhttp_request_is_owned(req)) + evhttp_request_free(req); + + /* see if we can schedule another request */ + evrpc_pool_schedule(pool); +} + +static void +evrpc_pool_schedule(struct evrpc_pool *pool) +{ + struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); + struct evhttp_connection *evcon; + + /* if no requests are pending, we have no work */ + if (ctx == NULL) + return; + + if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { + TAILQ_REMOVE(&pool->requests, ctx, next); + evrpc_schedule_request(evcon, ctx); + } +} + +static void +evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) +{ + struct evrpc_request_wrapper *ctx = arg; + struct evhttp_connection *evcon = ctx->evcon; + EVUTIL_ASSERT(evcon != NULL); + evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT); -} - -/* - * frees potential meta data associated with a request. - */ - -static void -evrpc_meta_data_free(struct evrpc_meta_list *meta_data) -{ - struct evrpc_meta *entry; - EVUTIL_ASSERT(meta_data != NULL); - - while ((entry = TAILQ_FIRST(meta_data)) != NULL) { - TAILQ_REMOVE(meta_data, entry, next); - mm_free(entry->key); - mm_free(entry->data); - mm_free(entry); - } -} - -static struct evrpc_hook_meta * +} + +/* + * frees potential meta data associated with a request. + */ + +static void +evrpc_meta_data_free(struct evrpc_meta_list *meta_data) +{ + struct evrpc_meta *entry; + EVUTIL_ASSERT(meta_data != NULL); + + while ((entry = TAILQ_FIRST(meta_data)) != NULL) { + TAILQ_REMOVE(meta_data, entry, next); + mm_free(entry->key); + mm_free(entry->data); + mm_free(entry); + } +} + +static struct evrpc_hook_meta * evrpc_hook_meta_new_(void) -{ - struct evrpc_hook_meta *ctx; - ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); - EVUTIL_ASSERT(ctx != NULL); - - TAILQ_INIT(&ctx->meta_data); - ctx->evcon = NULL; - - return (ctx); -} - -static void +{ + struct evrpc_hook_meta *ctx; + ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); + EVUTIL_ASSERT(ctx != NULL); + + TAILQ_INIT(&ctx->meta_data); + ctx->evcon = NULL; + + return (ctx); +} + +static void evrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx, - struct evhttp_connection *evcon) -{ - struct evrpc_hook_meta *ctx = *pctx; - if (ctx == NULL) + struct evhttp_connection *evcon) +{ + struct evrpc_hook_meta *ctx = *pctx; + if (ctx == NULL) *pctx = ctx = evrpc_hook_meta_new_(); - ctx->evcon = evcon; -} - -static void + ctx->evcon = evcon; +} + +static void evrpc_hook_context_free_(struct evrpc_hook_meta *ctx) -{ - evrpc_meta_data_free(&ctx->meta_data); - mm_free(ctx); -} - -/* Adds meta data */ -void -evrpc_hook_add_meta(void *ctx, const char *key, - const void *data, size_t data_size) -{ - struct evrpc_request_wrapper *req = ctx; - struct evrpc_hook_meta *store = NULL; - struct evrpc_meta *meta = NULL; - - if ((store = req->hook_meta) == NULL) +{ + evrpc_meta_data_free(&ctx->meta_data); + mm_free(ctx); +} + +/* Adds meta data */ +void +evrpc_hook_add_meta(void *ctx, const char *key, + const void *data, size_t data_size) +{ + struct evrpc_request_wrapper *req = ctx; + struct evrpc_hook_meta *store = NULL; + struct evrpc_meta *meta = NULL; + + if ((store = req->hook_meta) == NULL) store = req->hook_meta = evrpc_hook_meta_new_(); - - meta = mm_malloc(sizeof(struct evrpc_meta)); - EVUTIL_ASSERT(meta != NULL); - meta->key = mm_strdup(key); - EVUTIL_ASSERT(meta->key != NULL); - meta->data_size = data_size; - meta->data = mm_malloc(data_size); - EVUTIL_ASSERT(meta->data != NULL); - memcpy(meta->data, data, data_size); - - TAILQ_INSERT_TAIL(&store->meta_data, meta, next); -} - -int -evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) -{ - struct evrpc_request_wrapper *req = ctx; - struct evrpc_meta *meta = NULL; - - if (req->hook_meta == NULL) - return (-1); - - TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { - if (strcmp(meta->key, key) == 0) { - *data = meta->data; - *data_size = meta->data_size; - return (0); - } - } - - return (-1); -} - -struct evhttp_connection * -evrpc_hook_get_connection(void *ctx) -{ - struct evrpc_request_wrapper *req = ctx; - return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); -} - -int -evrpc_send_request_generic(struct evrpc_pool *pool, - void *request, void *reply, - void (*cb)(struct evrpc_status *, void *, void *, void *), - void *cb_arg, - const char *rpcname, - void (*req_marshal)(struct evbuffer *, void *), - void (*rpl_clear)(void *), - int (*rpl_unmarshal)(void *, struct evbuffer *)) -{ - struct evrpc_status status; - struct evrpc_request_wrapper *ctx; - ctx = evrpc_make_request_ctx(pool, request, reply, - rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); - if (ctx == NULL) - goto error; - return (evrpc_make_request(ctx)); -error: - memset(&status, 0, sizeof(status)); - status.error = EVRPC_STATUS_ERR_UNSTARTED; - (*(cb))(&status, request, reply, cb_arg); - return (-1); -} - -/** Takes a request object and fills it in with the right magic */ -static struct evrpc * -evrpc_register_object(const char *name, - void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), - int (*req_unmarshal)(void *, struct evbuffer *), - void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), - int (*rpl_complete)(void *), - void (*rpl_marshal)(struct evbuffer *, void *)) -{ - struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); - if (rpc == NULL) - return (NULL); - rpc->uri = mm_strdup(name); - if (rpc->uri == NULL) { - mm_free(rpc); - return (NULL); - } - rpc->request_new = req_new; - rpc->request_new_arg = req_new_arg; - rpc->request_free = req_free; - rpc->request_unmarshal = req_unmarshal; - rpc->reply_new = rpl_new; - rpc->reply_new_arg = rpl_new_arg; - rpc->reply_free = rpl_free; - rpc->reply_complete = rpl_complete; - rpc->reply_marshal = rpl_marshal; - return (rpc); -} - -int -evrpc_register_generic(struct evrpc_base *base, const char *name, - void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, - void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), - int (*req_unmarshal)(void *, struct evbuffer *), - void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), - int (*rpl_complete)(void *), - void (*rpl_marshal)(struct evbuffer *, void *)) -{ - struct evrpc* rpc = - evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, - rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); - if (rpc == NULL) - return (-1); - evrpc_register_rpc(base, rpc, - (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); - return (0); -} - -/** accessors for obscure and undocumented functionality */ -struct evrpc_pool * -evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) -{ - return (ctx->pool); -} - -void -evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, - struct evrpc_pool *pool) -{ - ctx->pool = pool; -} - -void -evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, - void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), - void *cb_arg) -{ - ctx->cb = cb; - ctx->cb_arg = cb_arg; -} + + meta = mm_malloc(sizeof(struct evrpc_meta)); + EVUTIL_ASSERT(meta != NULL); + meta->key = mm_strdup(key); + EVUTIL_ASSERT(meta->key != NULL); + meta->data_size = data_size; + meta->data = mm_malloc(data_size); + EVUTIL_ASSERT(meta->data != NULL); + memcpy(meta->data, data, data_size); + + TAILQ_INSERT_TAIL(&store->meta_data, meta, next); +} + +int +evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) +{ + struct evrpc_request_wrapper *req = ctx; + struct evrpc_meta *meta = NULL; + + if (req->hook_meta == NULL) + return (-1); + + TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { + if (strcmp(meta->key, key) == 0) { + *data = meta->data; + *data_size = meta->data_size; + return (0); + } + } + + return (-1); +} + +struct evhttp_connection * +evrpc_hook_get_connection(void *ctx) +{ + struct evrpc_request_wrapper *req = ctx; + return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); +} + +int +evrpc_send_request_generic(struct evrpc_pool *pool, + void *request, void *reply, + void (*cb)(struct evrpc_status *, void *, void *, void *), + void *cb_arg, + const char *rpcname, + void (*req_marshal)(struct evbuffer *, void *), + void (*rpl_clear)(void *), + int (*rpl_unmarshal)(void *, struct evbuffer *)) +{ + struct evrpc_status status; + struct evrpc_request_wrapper *ctx; + ctx = evrpc_make_request_ctx(pool, request, reply, + rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); + if (ctx == NULL) + goto error; + return (evrpc_make_request(ctx)); +error: + memset(&status, 0, sizeof(status)); + status.error = EVRPC_STATUS_ERR_UNSTARTED; + (*(cb))(&status, request, reply, cb_arg); + return (-1); +} + +/** Takes a request object and fills it in with the right magic */ +static struct evrpc * +evrpc_register_object(const char *name, + void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), + int (*req_unmarshal)(void *, struct evbuffer *), + void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), + int (*rpl_complete)(void *), + void (*rpl_marshal)(struct evbuffer *, void *)) +{ + struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); + if (rpc == NULL) + return (NULL); + rpc->uri = mm_strdup(name); + if (rpc->uri == NULL) { + mm_free(rpc); + return (NULL); + } + rpc->request_new = req_new; + rpc->request_new_arg = req_new_arg; + rpc->request_free = req_free; + rpc->request_unmarshal = req_unmarshal; + rpc->reply_new = rpl_new; + rpc->reply_new_arg = rpl_new_arg; + rpc->reply_free = rpl_free; + rpc->reply_complete = rpl_complete; + rpc->reply_marshal = rpl_marshal; + return (rpc); +} + +int +evrpc_register_generic(struct evrpc_base *base, const char *name, + void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, + void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), + int (*req_unmarshal)(void *, struct evbuffer *), + void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), + int (*rpl_complete)(void *), + void (*rpl_marshal)(struct evbuffer *, void *)) +{ + struct evrpc* rpc = + evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, + rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); + if (rpc == NULL) + return (-1); + evrpc_register_rpc(base, rpc, + (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); + return (0); +} + +/** accessors for obscure and undocumented functionality */ +struct evrpc_pool * +evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) +{ + return (ctx->pool); +} + +void +evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, + struct evrpc_pool *pool) +{ + ctx->pool = pool; +} + +void +evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, + void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), + void *cb_arg) +{ + ctx->cb = cb; + ctx->cb_arg = cb_arg; +} |