aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/libevent/bufferevent_async.c
diff options
context:
space:
mode:
authorkikht <kikht@yandex-team.ru>2022-02-10 16:45:14 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:14 +0300
commit778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (patch)
treebe835aa92c6248212e705f25388ebafcf84bc7a1 /contrib/libs/libevent/bufferevent_async.c
parent194cae0e8855b11be2005e1eff12c660c3ee9774 (diff)
downloadydb-778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5.tar.gz
Restoring authorship annotation for <kikht@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/libevent/bufferevent_async.c')
-rw-r--r--contrib/libs/libevent/bufferevent_async.c218
1 files changed, 109 insertions, 109 deletions
diff --git a/contrib/libs/libevent/bufferevent_async.c b/contrib/libs/libevent/bufferevent_async.c
index 5548721234..40c7c5e8d0 100644
--- a/contrib/libs/libevent/bufferevent_async.c
+++ b/contrib/libs/libevent/bufferevent_async.c
@@ -27,9 +27,9 @@
*/
#include "event2/event-config.h"
-#include "evconfig-private.h"
+#include "evconfig-private.h"
-#ifdef EVENT__HAVE_SYS_TIME_H
+#ifdef EVENT__HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
@@ -37,16 +37,16 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#ifdef EVENT__HAVE_STDARG_H
+#ifdef EVENT__HAVE_STDARG_H
#include <stdarg.h>
#endif
-#ifdef EVENT__HAVE_UNISTD_H
+#ifdef EVENT__HAVE_UNISTD_H
#include <unistd.h>
#endif
-#ifdef _WIN32
+#ifdef _WIN32
#include <winsock2.h>
-#include <winerror.h>
+#include <winerror.h>
#include <ws2tcpip.h>
#endif
@@ -94,39 +94,39 @@ const struct bufferevent_ops bufferevent_ops_async = {
evutil_offsetof(struct bufferevent_async, bev.bev),
be_async_enable,
be_async_disable,
- NULL, /* Unlink */
+ NULL, /* Unlink */
be_async_destruct,
- bufferevent_generic_adj_timeouts_,
+ bufferevent_generic_adj_timeouts_,
be_async_flush,
be_async_ctrl,
};
-static inline void
-be_async_run_eventcb(struct bufferevent *bev, short what, int options)
-{ bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
-
-static inline void
-be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
-{ bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
-
-static inline int
-fatal_error(int err)
-{
- switch (err) {
- /* We may have already associated this fd with a port.
- * Let's hope it's this port, and that the error code
- * for doing this neer changes. */
- case ERROR_INVALID_PARAMETER:
- return 0;
- }
- return 1;
-}
-
+static inline void
+be_async_run_eventcb(struct bufferevent *bev, short what, int options)
+{ bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
+
+static inline void
+be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
+{ bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
+
+static inline int
+fatal_error(int err)
+{
+ switch (err) {
+ /* We may have already associated this fd with a port.
+ * Let's hope it's this port, and that the error code
+ * for doing this neer changes. */
+ case ERROR_INVALID_PARAMETER:
+ return 0;
+ }
+ return 1;
+}
+
static inline struct bufferevent_async *
upcast(struct bufferevent *bev)
{
struct bufferevent_async *bev_a;
- if (!BEV_IS_ASYNC(bev))
+ if (!BEV_IS_ASYNC(bev))
return NULL;
bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
return bev_a;
@@ -166,7 +166,7 @@ bev_async_del_write(struct bufferevent_async *beva)
if (beva->write_added) {
beva->write_added = 0;
- event_base_del_virtual_(bev->ev_base);
+ event_base_del_virtual_(bev->ev_base);
}
}
@@ -177,7 +177,7 @@ bev_async_del_read(struct bufferevent_async *beva)
if (beva->read_added) {
beva->read_added = 0;
- event_base_del_virtual_(bev->ev_base);
+ event_base_del_virtual_(bev->ev_base);
}
}
@@ -188,7 +188,7 @@ bev_async_add_write(struct bufferevent_async *beva)
if (!beva->write_added) {
beva->write_added = 1;
- event_base_add_virtual_(bev->ev_base);
+ event_base_add_virtual_(bev->ev_base);
}
}
@@ -199,7 +199,7 @@ bev_async_add_read(struct bufferevent_async *beva)
if (!beva->read_added) {
beva->read_added = 1;
- event_base_add_virtual_(bev->ev_base);
+ event_base_add_virtual_(bev->ev_base);
}
}
@@ -224,7 +224,7 @@ bev_async_consider_writing(struct bufferevent_async *beva)
/* This is safe so long as bufferevent_get_write_max never returns
* more than INT_MAX. That's true for now. XXXX */
- limit = (int)bufferevent_get_write_max_(&beva->bev);
+ limit = (int)bufferevent_get_write_max_(&beva->bev);
if (at_most >= (size_t)limit && limit >= 0)
at_most = limit;
@@ -234,15 +234,15 @@ bev_async_consider_writing(struct bufferevent_async *beva)
}
/* XXXX doesn't respect low-water mark very well. */
- bufferevent_incref_(bev);
- if (evbuffer_launch_write_(bev->output, at_most,
+ bufferevent_incref_(bev);
+ if (evbuffer_launch_write_(bev->output, at_most,
&beva->write_overlapped)) {
- bufferevent_decref_(bev);
+ bufferevent_decref_(bev);
beva->ok = 0;
- be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
+ be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
} else {
beva->write_in_progress = at_most;
- bufferevent_decrement_write_buckets_(&beva->bev, at_most);
+ bufferevent_decrement_write_buckets_(&beva->bev, at_most);
bev_async_add_write(beva);
}
}
@@ -279,8 +279,8 @@ bev_async_consider_reading(struct bufferevent_async *beva)
}
/* XXXX This over-commits. */
- /* XXXX see also not above on cast on bufferevent_get_write_max_() */
- limit = (int)bufferevent_get_read_max_(&beva->bev);
+ /* XXXX see also not above on cast on bufferevent_get_write_max_() */
+ limit = (int)bufferevent_get_read_max_(&beva->bev);
if (at_most >= (size_t)limit && limit >= 0)
at_most = limit;
@@ -289,14 +289,14 @@ bev_async_consider_reading(struct bufferevent_async *beva)
return;
}
- bufferevent_incref_(bev);
- if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
+ bufferevent_incref_(bev);
+ if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
beva->ok = 0;
- be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
- bufferevent_decref_(bev);
+ be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
+ bufferevent_decref_(bev);
} else {
beva->read_in_progress = at_most;
- bufferevent_decrement_read_buckets_(&beva->bev, at_most);
+ bufferevent_decrement_read_buckets_(&beva->bev, at_most);
bev_async_add_read(beva);
}
@@ -314,12 +314,12 @@ be_async_outbuf_callback(struct evbuffer *buf,
/* If we added data to the outbuf and were not writing before,
* we may want to write now. */
- bufferevent_incref_and_lock_(bev);
+ bufferevent_incref_and_lock_(bev);
if (cbinfo->n_added)
bev_async_consider_writing(bev_async);
- bufferevent_decref_and_unlock_(bev);
+ bufferevent_decref_and_unlock_(bev);
}
static void
@@ -333,12 +333,12 @@ be_async_inbuf_callback(struct evbuffer *buf,
/* If we drained data from the inbuf and were not reading before,
* we may want to read now */
- bufferevent_incref_and_lock_(bev);
+ bufferevent_incref_and_lock_(bev);
if (cbinfo->n_deleted)
bev_async_consider_reading(bev_async);
- bufferevent_decref_and_unlock_(bev);
+ bufferevent_decref_and_unlock_(bev);
}
static int
@@ -402,11 +402,11 @@ be_async_destruct(struct bufferevent *bev)
bev_async_del_read(bev_async);
bev_async_del_write(bev_async);
- fd = evbuffer_overlapped_get_fd_(bev->input);
- if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
- (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
+ fd = evbuffer_overlapped_get_fd_(bev->input);
+ if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
+ (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
evutil_closesocket(fd);
- evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
+ evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
}
}
@@ -418,7 +418,7 @@ bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
DWORD bytes, flags;
evutil_socket_t fd;
- fd = evbuffer_overlapped_get_fd_(bev->input);
+ fd = evbuffer_overlapped_get_fd_(bev->input);
WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}
@@ -441,20 +441,20 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
EVUTIL_ASSERT(bev_a->bev.connecting);
bev_a->bev.connecting = 0;
- sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
+ sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
/* XXXX Handle error? */
setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
if (ok)
- bufferevent_async_set_connected_(bev);
+ bufferevent_async_set_connected_(bev);
else
bev_async_set_wsa_error(bev, eo);
- be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
+ be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
- event_base_del_virtual_(bev->ev_base);
+ event_base_del_virtual_(bev->ev_base);
- bufferevent_decref_and_unlock_(bev);
+ bufferevent_decref_and_unlock_(bev);
}
static void
@@ -469,10 +469,10 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
EVUTIL_ASSERT(bev_a->read_in_progress);
amount_unread = bev_a->read_in_progress - nbytes;
- evbuffer_commit_read_(bev->input, nbytes);
+ evbuffer_commit_read_(bev->input, nbytes);
bev_a->read_in_progress = 0;
if (amount_unread)
- bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
+ bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
if (!ok)
bev_async_set_wsa_error(bev, eo);
@@ -480,20 +480,20 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
- be_async_trigger_nolock(bev, EV_READ, 0);
+ be_async_trigger_nolock(bev, EV_READ, 0);
bev_async_consider_reading(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
- be_async_run_eventcb(bev, what, 0);
+ be_async_run_eventcb(bev, what, 0);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
- be_async_run_eventcb(bev, what, 0);
+ be_async_run_eventcb(bev, what, 0);
}
}
- bufferevent_decref_and_unlock_(bev);
+ bufferevent_decref_and_unlock_(bev);
}
static void
@@ -509,11 +509,11 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
EVUTIL_ASSERT(bev_a->write_in_progress);
amount_unwritten = bev_a->write_in_progress - nbytes;
- evbuffer_commit_write_(bev->output, nbytes);
+ evbuffer_commit_write_(bev->output, nbytes);
bev_a->write_in_progress = 0;
if (amount_unwritten)
- bufferevent_decrement_write_buckets_(&bev_a->bev,
+ bufferevent_decrement_write_buckets_(&bev_a->bev,
-amount_unwritten);
@@ -523,24 +523,24 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
- be_async_trigger_nolock(bev, EV_WRITE, 0);
+ be_async_trigger_nolock(bev, EV_WRITE, 0);
bev_async_consider_writing(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
- be_async_run_eventcb(bev, what, 0);
+ be_async_run_eventcb(bev, what, 0);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
- be_async_run_eventcb(bev, what, 0);
+ be_async_run_eventcb(bev, what, 0);
}
}
- bufferevent_decref_and_unlock_(bev);
+ bufferevent_decref_and_unlock_(bev);
}
struct bufferevent *
-bufferevent_async_new_(struct event_base *base,
+bufferevent_async_new_(struct event_base *base,
evutil_socket_t fd, int options)
{
struct bufferevent_async *bev_a;
@@ -549,11 +549,11 @@ bufferevent_async_new_(struct event_base *base,
options |= BEV_OPT_THREADSAFE;
- if (!(iocp = event_base_get_iocp_(base)))
+ if (!(iocp = event_base_get_iocp_(base)))
return NULL;
- if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
- if (fatal_error(GetLastError()))
+ if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
+ if (fatal_error(GetLastError()))
return NULL;
}
@@ -561,29 +561,29 @@ bufferevent_async_new_(struct event_base *base,
return NULL;
bev = &bev_a->bev.bev;
- if (!(bev->input = evbuffer_overlapped_new_(fd))) {
+ if (!(bev->input = evbuffer_overlapped_new_(fd))) {
mm_free(bev_a);
return NULL;
}
- if (!(bev->output = evbuffer_overlapped_new_(fd))) {
+ if (!(bev->output = evbuffer_overlapped_new_(fd))) {
evbuffer_free(bev->input);
mm_free(bev_a);
return NULL;
}
- if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
+ if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
options)<0)
goto err;
evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
- event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
- event_overlapped_init_(&bev_a->read_overlapped, read_complete);
- event_overlapped_init_(&bev_a->write_overlapped, write_complete);
+ event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
+ event_overlapped_init_(&bev_a->read_overlapped, read_complete);
+ event_overlapped_init_(&bev_a->write_overlapped, write_complete);
+
+ bufferevent_init_generic_timeout_cbs_(bev);
- bufferevent_init_generic_timeout_cbs_(bev);
-
bev_a->ok = fd >= 0;
return bev;
@@ -593,7 +593,7 @@ err:
}
void
-bufferevent_async_set_connected_(struct bufferevent *bev)
+bufferevent_async_set_connected_(struct bufferevent *bev)
{
struct bufferevent_async *bev_async = upcast(bev);
bev_async->ok = 1;
@@ -602,13 +602,13 @@ bufferevent_async_set_connected_(struct bufferevent *bev)
}
int
-bufferevent_async_can_connect_(struct bufferevent *bev)
+bufferevent_async_can_connect_(struct bufferevent *bev)
{
const struct win32_extension_fns *ext =
- event_get_win32_extension_fns_();
+ event_get_win32_extension_fns_();
if (BEV_IS_ASYNC(bev) &&
- event_base_get_iocp_(bev->ev_base) &&
+ event_base_get_iocp_(bev->ev_base) &&
ext && ext->ConnectEx)
return 1;
@@ -616,14 +616,14 @@ bufferevent_async_can_connect_(struct bufferevent *bev)
}
int
-bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
+bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
const struct sockaddr *sa, int socklen)
{
BOOL rc;
struct bufferevent_async *bev_async = upcast(bev);
struct sockaddr_storage ss;
const struct win32_extension_fns *ext =
- event_get_win32_extension_fns_();
+ event_get_win32_extension_fns_();
EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
@@ -648,15 +648,15 @@ bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
WSAGetLastError() != WSAEINVAL)
return -1;
- event_base_add_virtual_(bev->ev_base);
- bufferevent_incref_(bev);
+ event_base_add_virtual_(bev->ev_base);
+ bufferevent_incref_(bev);
rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
&bev_async->connect_overlapped.overlapped);
if (rc || WSAGetLastError() == ERROR_IO_PENDING)
return 0;
- event_base_del_virtual_(bev->ev_base);
- bufferevent_decref_(bev);
+ event_base_del_virtual_(bev->ev_base);
+ bufferevent_decref_(bev);
return -1;
}
@@ -667,32 +667,32 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
{
switch (op) {
case BEV_CTRL_GET_FD:
- data->fd = evbuffer_overlapped_get_fd_(bev->input);
+ data->fd = evbuffer_overlapped_get_fd_(bev->input);
return 0;
case BEV_CTRL_SET_FD: {
- struct bufferevent_async *bev_a = upcast(bev);
+ struct bufferevent_async *bev_a = upcast(bev);
struct event_iocp_port *iocp;
- if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
+ if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
return 0;
- if (!(iocp = event_base_get_iocp_(bev->ev_base)))
+ if (!(iocp = event_base_get_iocp_(bev->ev_base)))
return -1;
- if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
- if (fatal_error(GetLastError()))
- return -1;
- }
- evbuffer_overlapped_set_fd_(bev->input, data->fd);
- evbuffer_overlapped_set_fd_(bev->output, data->fd);
- bev_a->ok = data->fd >= 0;
+ if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
+ if (fatal_error(GetLastError()))
+ return -1;
+ }
+ evbuffer_overlapped_set_fd_(bev->input, data->fd);
+ evbuffer_overlapped_set_fd_(bev->output, data->fd);
+ bev_a->ok = data->fd >= 0;
return 0;
}
case BEV_CTRL_CANCEL_ALL: {
struct bufferevent_async *bev_a = upcast(bev);
- evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
- if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
+ evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
+ if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
(bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
closesocket(fd);
- evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
+ evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
}
bev_a->ok = 0;
return 0;