diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-07-08 15:54:05 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-07-08 15:54:05 +0000 |
commit | fc7be18c76af2e700641f3598c4856baeef1428e (patch) | |
tree | 11dbca45eb321c3a4dd08b12152acc6ef5dd3fa9 /yt/cpp/mapreduce/http/ut/simple_server.cpp | |
parent | ec0e7ed6da6fb317741fd8468602949a1362eca5 (diff) | |
parent | c92cb9d3a19331916f0c274d80e67f02a62caa9b (diff) | |
download | ydb-fc7be18c76af2e700641f3598c4856baeef1428e.tar.gz |
Merge branch 'rightlib' into mergelibs-240708-1553
Diffstat (limited to 'yt/cpp/mapreduce/http/ut/simple_server.cpp')
-rw-r--r-- | yt/cpp/mapreduce/http/ut/simple_server.cpp | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/http/ut/simple_server.cpp b/yt/cpp/mapreduce/http/ut/simple_server.cpp new file mode 100644 index 0000000000..fbc369ec20 --- /dev/null +++ b/yt/cpp/mapreduce/http/ut/simple_server.cpp @@ -0,0 +1,90 @@ +#include "simple_server.h" + +#include <util/network/pair.h> +#include <util/network/poller.h> +#include <util/network/sock.h> +#include <util/string/builder.h> +#include <util/system/thread.h> +#include <util/thread/pool.h> + +TSimpleServer::TSimpleServer(int port, TRequestHandler requestHandler) + : Port_(port) +{ + auto listenSocket = MakeAtomicShared<TInetStreamSocket>(); + TSockAddrInet addr((TIpHost)INADDR_ANY, Port_); + SetSockOpt(*listenSocket, SOL_SOCKET, SO_REUSEADDR, 1); + int ret = listenSocket->Bind(&addr); + Y_ENSURE_EX(ret == 0, TSystemError() << "Can not bind"); + + SOCKET socketPair[2]; + ret = SocketPair(socketPair); + Y_ENSURE_EX(ret == 0, TSystemError() << "Can not create socket pair"); + + ret = listenSocket->Listen(10); + Y_ENSURE_EX(ret == 0, TSystemError() << "Can not listen socket"); + + SendFinishSocket_ = MakeHolder<TInetStreamSocket>(socketPair[1]); + + ThreadPool_ = MakeHolder<TAdaptiveThreadPool>(); + ThreadPool_->Start(1); + + auto receiveFinish = MakeAtomicShared<TInetStreamSocket>(socketPair[0]); + ListenerThread_ = ThreadPool_->Run([listenSocket, receiveFinish, requestHandler] { + TSocketPoller socketPoller; + socketPoller.WaitRead(*receiveFinish, nullptr); + socketPoller.WaitRead(*listenSocket, (void*)1); + + bool running = true; + while (running) { + void* cookies[2]; + size_t cookieCount = socketPoller.WaitI(cookies, 2); + for (size_t i = 0; i != cookieCount; ++i) { + if (!cookies[i]) { + running = false; + } else { + TSockAddrInet addr; + TAtomicSharedPtr<TStreamSocket> socket = MakeAtomicShared<TInetStreamSocket>(); + int ret = listenSocket->Accept(socket.Get(), &addr); + Y_ENSURE_EX(ret == 0, TSystemError() << "Can not accept connection"); + + SystemThreadFactory()->Run( + [socket, requestHandler] { + TStreamSocketInput input(socket.Get()); + TStreamSocketOutput output(socket.Get()); + requestHandler(&input, &output); + socket->Close(); + }); + } + } + } + }); +} + +TSimpleServer::~TSimpleServer() +{ + try { + if (ThreadPool_) { + Stop(); + } + } catch (...) { + } +} + +void TSimpleServer::Stop() +{ + // Just send something to indicate shutdown. + SendFinishSocket_->Send("X", 1); + ListenerThread_->Join(); + ThreadPool_->Stop(); + ThreadPool_.Destroy(); +} + +int TSimpleServer::GetPort() const +{ + return Port_; +} + +TString TSimpleServer::GetAddress() const +{ + return TStringBuilder() << "localhost:" << Port_; +} |