diff options
author | kikht <kikht@yandex-team.ru> | 2022-02-10 16:45:14 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:14 +0300 |
commit | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (patch) | |
tree | be835aa92c6248212e705f25388ebafcf84bc7a1 /contrib/libs/libevent/bufferevent_async.c | |
parent | 194cae0e8855b11be2005e1eff12c660c3ee9774 (diff) | |
download | ydb-778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5.tar.gz |
Restoring authorship annotation for <kikht@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/libevent/bufferevent_async.c')
-rw-r--r-- | contrib/libs/libevent/bufferevent_async.c | 218 |
1 files changed, 109 insertions, 109 deletions
diff --git a/contrib/libs/libevent/bufferevent_async.c b/contrib/libs/libevent/bufferevent_async.c index 5548721234..40c7c5e8d0 100644 --- a/contrib/libs/libevent/bufferevent_async.c +++ b/contrib/libs/libevent/bufferevent_async.c @@ -27,9 +27,9 @@ */ #include "event2/event-config.h" -#include "evconfig-private.h" +#include "evconfig-private.h" -#ifdef EVENT__HAVE_SYS_TIME_H +#ifdef EVENT__HAVE_SYS_TIME_H #include <sys/time.h> #endif @@ -37,16 +37,16 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#ifdef EVENT__HAVE_STDARG_H +#ifdef EVENT__HAVE_STDARG_H #include <stdarg.h> #endif -#ifdef EVENT__HAVE_UNISTD_H +#ifdef EVENT__HAVE_UNISTD_H #include <unistd.h> #endif -#ifdef _WIN32 +#ifdef _WIN32 #include <winsock2.h> -#include <winerror.h> +#include <winerror.h> #include <ws2tcpip.h> #endif @@ -94,39 +94,39 @@ const struct bufferevent_ops bufferevent_ops_async = { evutil_offsetof(struct bufferevent_async, bev.bev), be_async_enable, be_async_disable, - NULL, /* Unlink */ + NULL, /* Unlink */ be_async_destruct, - bufferevent_generic_adj_timeouts_, + bufferevent_generic_adj_timeouts_, be_async_flush, be_async_ctrl, }; -static inline void -be_async_run_eventcb(struct bufferevent *bev, short what, int options) -{ bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } - -static inline void -be_async_trigger_nolock(struct bufferevent *bev, short what, int options) -{ bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } - -static inline int -fatal_error(int err) -{ - switch (err) { - /* We may have already associated this fd with a port. - * Let's hope it's this port, and that the error code - * for doing this neer changes. */ - case ERROR_INVALID_PARAMETER: - return 0; - } - return 1; -} - +static inline void +be_async_run_eventcb(struct bufferevent *bev, short what, int options) +{ bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } + +static inline void +be_async_trigger_nolock(struct bufferevent *bev, short what, int options) +{ bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } + +static inline int +fatal_error(int err) +{ + switch (err) { + /* We may have already associated this fd with a port. + * Let's hope it's this port, and that the error code + * for doing this neer changes. */ + case ERROR_INVALID_PARAMETER: + return 0; + } + return 1; +} + static inline struct bufferevent_async * upcast(struct bufferevent *bev) { struct bufferevent_async *bev_a; - if (!BEV_IS_ASYNC(bev)) + if (!BEV_IS_ASYNC(bev)) return NULL; bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); return bev_a; @@ -166,7 +166,7 @@ bev_async_del_write(struct bufferevent_async *beva) if (beva->write_added) { beva->write_added = 0; - event_base_del_virtual_(bev->ev_base); + event_base_del_virtual_(bev->ev_base); } } @@ -177,7 +177,7 @@ bev_async_del_read(struct bufferevent_async *beva) if (beva->read_added) { beva->read_added = 0; - event_base_del_virtual_(bev->ev_base); + event_base_del_virtual_(bev->ev_base); } } @@ -188,7 +188,7 @@ bev_async_add_write(struct bufferevent_async *beva) if (!beva->write_added) { beva->write_added = 1; - event_base_add_virtual_(bev->ev_base); + event_base_add_virtual_(bev->ev_base); } } @@ -199,7 +199,7 @@ bev_async_add_read(struct bufferevent_async *beva) if (!beva->read_added) { beva->read_added = 1; - event_base_add_virtual_(bev->ev_base); + event_base_add_virtual_(bev->ev_base); } } @@ -224,7 +224,7 @@ bev_async_consider_writing(struct bufferevent_async *beva) /* This is safe so long as bufferevent_get_write_max never returns * more than INT_MAX. That's true for now. XXXX */ - limit = (int)bufferevent_get_write_max_(&beva->bev); + limit = (int)bufferevent_get_write_max_(&beva->bev); if (at_most >= (size_t)limit && limit >= 0) at_most = limit; @@ -234,15 +234,15 @@ bev_async_consider_writing(struct bufferevent_async *beva) } /* XXXX doesn't respect low-water mark very well. */ - bufferevent_incref_(bev); - if (evbuffer_launch_write_(bev->output, at_most, + bufferevent_incref_(bev); + if (evbuffer_launch_write_(bev->output, at_most, &beva->write_overlapped)) { - bufferevent_decref_(bev); + bufferevent_decref_(bev); beva->ok = 0; - be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); + be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); } else { beva->write_in_progress = at_most; - bufferevent_decrement_write_buckets_(&beva->bev, at_most); + bufferevent_decrement_write_buckets_(&beva->bev, at_most); bev_async_add_write(beva); } } @@ -279,8 +279,8 @@ bev_async_consider_reading(struct bufferevent_async *beva) } /* XXXX This over-commits. */ - /* XXXX see also not above on cast on bufferevent_get_write_max_() */ - limit = (int)bufferevent_get_read_max_(&beva->bev); + /* XXXX see also not above on cast on bufferevent_get_write_max_() */ + limit = (int)bufferevent_get_read_max_(&beva->bev); if (at_most >= (size_t)limit && limit >= 0) at_most = limit; @@ -289,14 +289,14 @@ bev_async_consider_reading(struct bufferevent_async *beva) return; } - bufferevent_incref_(bev); - if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { + bufferevent_incref_(bev); + if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { beva->ok = 0; - be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); - bufferevent_decref_(bev); + be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); + bufferevent_decref_(bev); } else { beva->read_in_progress = at_most; - bufferevent_decrement_read_buckets_(&beva->bev, at_most); + bufferevent_decrement_read_buckets_(&beva->bev, at_most); bev_async_add_read(beva); } @@ -314,12 +314,12 @@ be_async_outbuf_callback(struct evbuffer *buf, /* If we added data to the outbuf and were not writing before, * we may want to write now. */ - bufferevent_incref_and_lock_(bev); + bufferevent_incref_and_lock_(bev); if (cbinfo->n_added) bev_async_consider_writing(bev_async); - bufferevent_decref_and_unlock_(bev); + bufferevent_decref_and_unlock_(bev); } static void @@ -333,12 +333,12 @@ be_async_inbuf_callback(struct evbuffer *buf, /* If we drained data from the inbuf and were not reading before, * we may want to read now */ - bufferevent_incref_and_lock_(bev); + bufferevent_incref_and_lock_(bev); if (cbinfo->n_deleted) bev_async_consider_reading(bev_async); - bufferevent_decref_and_unlock_(bev); + bufferevent_decref_and_unlock_(bev); } static int @@ -402,11 +402,11 @@ be_async_destruct(struct bufferevent *bev) bev_async_del_read(bev_async); bev_async_del_write(bev_async); - fd = evbuffer_overlapped_get_fd_(bev->input); - if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && - (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) { + fd = evbuffer_overlapped_get_fd_(bev->input); + if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && + (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) { evutil_closesocket(fd); - evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); + evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); } } @@ -418,7 +418,7 @@ bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) DWORD bytes, flags; evutil_socket_t fd; - fd = evbuffer_overlapped_get_fd_(bev->input); + fd = evbuffer_overlapped_get_fd_(bev->input); WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); } @@ -441,20 +441,20 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key, EVUTIL_ASSERT(bev_a->bev.connecting); bev_a->bev.connecting = 0; - sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input); + sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input); /* XXXX Handle error? */ setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); if (ok) - bufferevent_async_set_connected_(bev); + bufferevent_async_set_connected_(bev); else bev_async_set_wsa_error(bev, eo); - be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); + be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); - event_base_del_virtual_(bev->ev_base); + event_base_del_virtual_(bev->ev_base); - bufferevent_decref_and_unlock_(bev); + bufferevent_decref_and_unlock_(bev); } static void @@ -469,10 +469,10 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, EVUTIL_ASSERT(bev_a->read_in_progress); amount_unread = bev_a->read_in_progress - nbytes; - evbuffer_commit_read_(bev->input, nbytes); + evbuffer_commit_read_(bev->input, nbytes); bev_a->read_in_progress = 0; if (amount_unread) - bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread); + bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread); if (!ok) bev_async_set_wsa_error(bev, eo); @@ -480,20 +480,20 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); - be_async_trigger_nolock(bev, EV_READ, 0); + be_async_trigger_nolock(bev, EV_READ, 0); bev_async_consider_reading(bev_a); } else if (!ok) { what |= BEV_EVENT_ERROR; bev_a->ok = 0; - be_async_run_eventcb(bev, what, 0); + be_async_run_eventcb(bev, what, 0); } else if (!nbytes) { what |= BEV_EVENT_EOF; bev_a->ok = 0; - be_async_run_eventcb(bev, what, 0); + be_async_run_eventcb(bev, what, 0); } } - bufferevent_decref_and_unlock_(bev); + bufferevent_decref_and_unlock_(bev); } static void @@ -509,11 +509,11 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key, EVUTIL_ASSERT(bev_a->write_in_progress); amount_unwritten = bev_a->write_in_progress - nbytes; - evbuffer_commit_write_(bev->output, nbytes); + evbuffer_commit_write_(bev->output, nbytes); bev_a->write_in_progress = 0; if (amount_unwritten) - bufferevent_decrement_write_buckets_(&bev_a->bev, + bufferevent_decrement_write_buckets_(&bev_a->bev, -amount_unwritten); @@ -523,24 +523,24 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - be_async_trigger_nolock(bev, EV_WRITE, 0); + be_async_trigger_nolock(bev, EV_WRITE, 0); bev_async_consider_writing(bev_a); } else if (!ok) { what |= BEV_EVENT_ERROR; bev_a->ok = 0; - be_async_run_eventcb(bev, what, 0); + be_async_run_eventcb(bev, what, 0); } else if (!nbytes) { what |= BEV_EVENT_EOF; bev_a->ok = 0; - be_async_run_eventcb(bev, what, 0); + be_async_run_eventcb(bev, what, 0); } } - bufferevent_decref_and_unlock_(bev); + bufferevent_decref_and_unlock_(bev); } struct bufferevent * -bufferevent_async_new_(struct event_base *base, +bufferevent_async_new_(struct event_base *base, evutil_socket_t fd, int options) { struct bufferevent_async *bev_a; @@ -549,11 +549,11 @@ bufferevent_async_new_(struct event_base *base, options |= BEV_OPT_THREADSAFE; - if (!(iocp = event_base_get_iocp_(base))) + if (!(iocp = event_base_get_iocp_(base))) return NULL; - if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { - if (fatal_error(GetLastError())) + if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { + if (fatal_error(GetLastError())) return NULL; } @@ -561,29 +561,29 @@ bufferevent_async_new_(struct event_base *base, return NULL; bev = &bev_a->bev.bev; - if (!(bev->input = evbuffer_overlapped_new_(fd))) { + if (!(bev->input = evbuffer_overlapped_new_(fd))) { mm_free(bev_a); return NULL; } - if (!(bev->output = evbuffer_overlapped_new_(fd))) { + if (!(bev->output = evbuffer_overlapped_new_(fd))) { evbuffer_free(bev->input); mm_free(bev_a); return NULL; } - if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async, + if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async, options)<0) goto err; evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); - event_overlapped_init_(&bev_a->connect_overlapped, connect_complete); - event_overlapped_init_(&bev_a->read_overlapped, read_complete); - event_overlapped_init_(&bev_a->write_overlapped, write_complete); + event_overlapped_init_(&bev_a->connect_overlapped, connect_complete); + event_overlapped_init_(&bev_a->read_overlapped, read_complete); + event_overlapped_init_(&bev_a->write_overlapped, write_complete); + + bufferevent_init_generic_timeout_cbs_(bev); - bufferevent_init_generic_timeout_cbs_(bev); - bev_a->ok = fd >= 0; return bev; @@ -593,7 +593,7 @@ err: } void -bufferevent_async_set_connected_(struct bufferevent *bev) +bufferevent_async_set_connected_(struct bufferevent *bev) { struct bufferevent_async *bev_async = upcast(bev); bev_async->ok = 1; @@ -602,13 +602,13 @@ bufferevent_async_set_connected_(struct bufferevent *bev) } int -bufferevent_async_can_connect_(struct bufferevent *bev) +bufferevent_async_can_connect_(struct bufferevent *bev) { const struct win32_extension_fns *ext = - event_get_win32_extension_fns_(); + event_get_win32_extension_fns_(); if (BEV_IS_ASYNC(bev) && - event_base_get_iocp_(bev->ev_base) && + event_base_get_iocp_(bev->ev_base) && ext && ext->ConnectEx) return 1; @@ -616,14 +616,14 @@ bufferevent_async_can_connect_(struct bufferevent *bev) } int -bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, +bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, const struct sockaddr *sa, int socklen) { BOOL rc; struct bufferevent_async *bev_async = upcast(bev); struct sockaddr_storage ss; const struct win32_extension_fns *ext = - event_get_win32_extension_fns_(); + event_get_win32_extension_fns_(); EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); @@ -648,15 +648,15 @@ bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, WSAGetLastError() != WSAEINVAL) return -1; - event_base_add_virtual_(bev->ev_base); - bufferevent_incref_(bev); + event_base_add_virtual_(bev->ev_base); + bufferevent_incref_(bev); rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, &bev_async->connect_overlapped.overlapped); if (rc || WSAGetLastError() == ERROR_IO_PENDING) return 0; - event_base_del_virtual_(bev->ev_base); - bufferevent_decref_(bev); + event_base_del_virtual_(bev->ev_base); + bufferevent_decref_(bev); return -1; } @@ -667,32 +667,32 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, { switch (op) { case BEV_CTRL_GET_FD: - data->fd = evbuffer_overlapped_get_fd_(bev->input); + data->fd = evbuffer_overlapped_get_fd_(bev->input); return 0; case BEV_CTRL_SET_FD: { - struct bufferevent_async *bev_a = upcast(bev); + struct bufferevent_async *bev_a = upcast(bev); struct event_iocp_port *iocp; - if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) + if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) return 0; - if (!(iocp = event_base_get_iocp_(bev->ev_base))) + if (!(iocp = event_base_get_iocp_(bev->ev_base))) return -1; - if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) { - if (fatal_error(GetLastError())) - return -1; - } - evbuffer_overlapped_set_fd_(bev->input, data->fd); - evbuffer_overlapped_set_fd_(bev->output, data->fd); - bev_a->ok = data->fd >= 0; + if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) { + if (fatal_error(GetLastError())) + return -1; + } + evbuffer_overlapped_set_fd_(bev->input, data->fd); + evbuffer_overlapped_set_fd_(bev->output, data->fd); + bev_a->ok = data->fd >= 0; return 0; } case BEV_CTRL_CANCEL_ALL: { struct bufferevent_async *bev_a = upcast(bev); - evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input); - if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && + evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input); + if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { closesocket(fd); - evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); + evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); } bev_a->ok = 0; return 0; |