diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-12-13 16:01:38 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-12-13 16:01:38 +0300 |
commit | f2bea70bea01921ec43846224d100f2c70dd5719 (patch) | |
tree | eead917572063b63adc1c9a76284c8fbd10f25a3 /contrib/libs/liburing/test/socket.c | |
parent | 1ab9ee3dfe0ab4023a3a57bf55de31dff3eac908 (diff) | |
download | ydb-f2bea70bea01921ec43846224d100f2c70dd5719.tar.gz |
Add cross-link
Diffstat (limited to 'contrib/libs/liburing/test/socket.c')
-rw-r--r-- | contrib/libs/liburing/test/socket.c | 410 |
1 files changed, 410 insertions, 0 deletions
diff --git a/contrib/libs/liburing/test/socket.c b/contrib/libs/liburing/test/socket.c new file mode 100644 index 0000000000..c3250c8dc0 --- /dev/null +++ b/contrib/libs/liburing/test/socket.c @@ -0,0 +1,410 @@ +#include "../config-host.h" +/* SPDX-License-Identifier: MIT */ +/* + * Simple test case using the socket op + */ +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <arpa/inet.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <pthread.h> +#include <assert.h> + +#include "liburing.h" +#include "helpers.h" + +static char str[] = "This is a test of send and recv over io_uring!"; + +#define MAX_MSG 128 + +#define HOST "127.0.0.1" + +static int no_socket; +static __be32 g_port; + +static int recv_prep(struct io_uring *ring, struct iovec *iov, int *sock, + int registerfiles) +{ + struct sockaddr_in saddr; + struct io_uring_sqe *sqe; + int sockfd, ret, val, use_fd; + + memset(&saddr, 0, sizeof(saddr)); + saddr.sin_family = AF_INET; + saddr.sin_addr.s_addr = htonl(INADDR_ANY); + + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (sockfd < 0) { + perror("socket"); + return 1; + } + + val = 1; + setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); + + if (t_bind_ephemeral_port(sockfd, &saddr)) { + perror("bind"); + goto err; + } + g_port = saddr.sin_port; + + if (registerfiles) { + ret = io_uring_register_files(ring, &sockfd, 1); + if (ret) { + fprintf(stderr, "file reg failed\n"); + goto err; + } + use_fd = 0; + } else { + use_fd = sockfd; + } + + sqe = io_uring_get_sqe(ring); + io_uring_prep_recv(sqe, use_fd, iov->iov_base, iov->iov_len, 0); + if (registerfiles) + sqe->flags |= IOSQE_FIXED_FILE; + sqe->user_data = 2; + + ret = io_uring_submit(ring); + if (ret <= 0) { + fprintf(stderr, "submit failed: %d\n", ret); + goto err; + } + + *sock = sockfd; + return 0; +err: + close(sockfd); + return 1; +} + +static int do_recv(struct io_uring *ring, struct iovec *iov) +{ + struct io_uring_cqe *cqe; + int ret; + + ret = io_uring_wait_cqe(ring, &cqe); + if (ret) { + fprintf(stdout, "wait_cqe: %d\n", ret); + goto err; + } + if (cqe->res == -EINVAL) { + fprintf(stdout, "recv not supported, skipping\n"); + return 0; + } + if (cqe->res < 0) { + fprintf(stderr, "failed cqe: %d\n", cqe->res); + goto err; + } + + if (cqe->res -1 != strlen(str)) { + fprintf(stderr, "got wrong length: %d/%d\n", cqe->res, + (int) strlen(str) + 1); + goto err; + } + + if (strcmp(str, iov->iov_base)) { + fprintf(stderr, "string mismatch\n"); + goto err; + } + + return 0; +err: + return 1; +} + +struct recv_data { + pthread_mutex_t mutex; + int use_sqthread; + int registerfiles; +}; + +static void *recv_fn(void *data) +{ + struct recv_data *rd = data; + char buf[MAX_MSG + 1]; + struct iovec iov = { + .iov_base = buf, + .iov_len = sizeof(buf) - 1, + }; + struct io_uring_params p = { }; + struct io_uring ring; + int ret, sock; + + if (rd->use_sqthread) + p.flags = IORING_SETUP_SQPOLL; + ret = t_create_ring_params(1, &ring, &p); + if (ret == T_SETUP_SKIP) { + pthread_mutex_unlock(&rd->mutex); + ret = 0; + goto err; + } else if (ret < 0) { + pthread_mutex_unlock(&rd->mutex); + goto err; + } + + if (rd->use_sqthread && !rd->registerfiles) { + if (!(p.features & IORING_FEAT_SQPOLL_NONFIXED)) { + fprintf(stdout, "Non-registered SQPOLL not available, skipping\n"); + pthread_mutex_unlock(&rd->mutex); + goto err; + } + } + + ret = recv_prep(&ring, &iov, &sock, rd->registerfiles); + if (ret) { + fprintf(stderr, "recv_prep failed: %d\n", ret); + goto err; + } + pthread_mutex_unlock(&rd->mutex); + ret = do_recv(&ring, &iov); + + close(sock); + io_uring_queue_exit(&ring); +err: + return (void *)(intptr_t)ret; +} + +static int fallback_send(struct io_uring *ring, struct sockaddr_in *saddr) +{ + struct iovec iov = { + .iov_base = str, + .iov_len = sizeof(str), + }; + struct io_uring_cqe *cqe; + struct io_uring_sqe *sqe; + int sockfd, ret; + + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (sockfd < 0) { + perror("socket"); + return 1; + } + + ret = connect(sockfd, (struct sockaddr *)saddr, sizeof(*saddr)); + if (ret < 0) { + perror("connect"); + return 1; + } + + sqe = io_uring_get_sqe(ring); + io_uring_prep_send(sqe, sockfd, iov.iov_base, iov.iov_len, 0); + sqe->user_data = 1; + + ret = io_uring_submit(ring); + if (ret <= 0) { + fprintf(stderr, "submit failed: %d\n", ret); + goto err; + } + + ret = io_uring_wait_cqe(ring, &cqe); + if (cqe->res == -EINVAL) { + fprintf(stdout, "send not supported, skipping\n"); + close(sockfd); + return 0; + } + if (cqe->res != iov.iov_len) { + fprintf(stderr, "failed cqe: %d\n", cqe->res); + goto err; + } + + close(sockfd); + return 0; +err: + close(sockfd); + return 1; +} + +static int do_send(int socket_direct, int alloc) +{ + struct sockaddr_in saddr; + struct iovec iov = { + .iov_base = str, + .iov_len = sizeof(str), + }; + struct io_uring ring; + struct io_uring_cqe *cqe; + struct io_uring_sqe *sqe; + int sockfd, ret, fd = -1; + + ret = io_uring_queue_init(1, &ring, 0); + if (ret) { + fprintf(stderr, "queue init failed: %d\n", ret); + return 1; + } + + if (socket_direct) { + ret = io_uring_register_files(&ring, &fd, 1); + if (ret) { + fprintf(stderr, "file register %d\n", ret); + return 1; + } + } + + assert(g_port != 0); + memset(&saddr, 0, sizeof(saddr)); + saddr.sin_family = AF_INET; + saddr.sin_port = g_port; + inet_pton(AF_INET, HOST, &saddr.sin_addr); + + sqe = io_uring_get_sqe(&ring); + if (socket_direct) { + unsigned file_index = 0; + if (alloc) + file_index = IORING_FILE_INDEX_ALLOC - 1; + io_uring_prep_socket_direct(sqe, AF_INET, SOCK_DGRAM, 0, + file_index, 0); + } else { + io_uring_prep_socket(sqe, AF_INET, SOCK_DGRAM, 0, 0); + } + ret = io_uring_submit(&ring); + if (ret != 1) { + fprintf(stderr, "socket submit: %d\n", ret); + return 1; + } + ret = io_uring_wait_cqe(&ring, &cqe); + if (ret) { + fprintf(stderr, "wait_cqe: %d\n", ret); + return 1; + } + if (cqe->res < 0) { + if (cqe->res == -EINVAL) { + fprintf(stdout, "No socket support, skipping\n"); + no_socket = 1; + io_uring_cqe_seen(&ring, cqe); + return fallback_send(&ring, &saddr); + } + + fprintf(stderr, "socket res: %d\n", ret); + return 1; + } + + sockfd = cqe->res; + if (socket_direct && !alloc) + sockfd = 0; + io_uring_cqe_seen(&ring, cqe); + + sqe = io_uring_get_sqe(&ring); + io_uring_prep_connect(sqe, sockfd, (struct sockaddr *) &saddr, + sizeof(saddr)); + if (socket_direct) + sqe->flags |= IOSQE_FIXED_FILE; + ret = io_uring_submit(&ring); + if (ret != 1) { + fprintf(stderr, "connect submit: %d\n", ret); + return 1; + } + ret = io_uring_wait_cqe(&ring, &cqe); + if (ret) { + fprintf(stderr, "wait_cqe: %d\n", ret); + return 1; + } + if (cqe->res < 0) { + fprintf(stderr, "connect res: %d\n", cqe->res); + return 1; + } + io_uring_cqe_seen(&ring, cqe); + + sqe = io_uring_get_sqe(&ring); + io_uring_prep_send(sqe, sockfd, iov.iov_base, iov.iov_len, 0); + sqe->user_data = 1; + if (socket_direct) + sqe->flags |= IOSQE_FIXED_FILE; + + ret = io_uring_submit(&ring); + if (ret <= 0) { + fprintf(stderr, "submit failed: %d\n", ret); + goto err; + } + + ret = io_uring_wait_cqe(&ring, &cqe); + if (cqe->res == -EINVAL) { + fprintf(stdout, "send not supported, skipping\n"); + close(sockfd); + return 0; + } + if (cqe->res != iov.iov_len) { + fprintf(stderr, "failed cqe: %d\n", cqe->res); + goto err; + } + + close(sockfd); + return 0; +err: + close(sockfd); + return 1; +} + +static int test(int use_sqthread, int regfiles, int socket_direct, int alloc) +{ + pthread_mutexattr_t attr; + pthread_t recv_thread; + struct recv_data rd; + int ret; + void *retval; + + pthread_mutexattr_init(&attr); + pthread_mutexattr_setpshared(&attr, 1); + pthread_mutex_init(&rd.mutex, &attr); + pthread_mutex_lock(&rd.mutex); + rd.use_sqthread = use_sqthread; + rd.registerfiles = regfiles; + + ret = pthread_create(&recv_thread, NULL, recv_fn, &rd); + if (ret) { + fprintf(stderr, "Thread create failed: %d\n", ret); + pthread_mutex_unlock(&rd.mutex); + return 1; + } + + pthread_mutex_lock(&rd.mutex); + do_send(socket_direct, alloc); + pthread_join(recv_thread, &retval); + return (intptr_t)retval; +} + +int main(int argc, char *argv[]) +{ + int ret; + + if (argc > 1) + return 0; + + ret = test(0, 0, 0, 0); + if (ret) { + fprintf(stderr, "test sqthread=0 failed\n"); + return ret; + } + if (no_socket) + return 0; + + ret = test(1, 1, 0, 0); + if (ret) { + fprintf(stderr, "test sqthread=1 reg=1 failed\n"); + return ret; + } + + ret = test(1, 0, 0, 0); + if (ret) { + fprintf(stderr, "test sqthread=1 reg=0 failed\n"); + return ret; + } + + ret = test(0, 0, 1, 0); + if (ret) { + fprintf(stderr, "test sqthread=0 direct=1 failed\n"); + return ret; + } + + ret = test(0, 0, 1, 1); + if (ret) { + fprintf(stderr, "test sqthread=0 direct=alloc failed\n"); + return ret; + } + + return 0; +} |