aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/asio/tcp_socket_impl.h
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/asio/tcp_socket_impl.h
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/neh/asio/tcp_socket_impl.h')
-rw-r--r--library/cpp/neh/asio/tcp_socket_impl.h332
1 files changed, 332 insertions, 0 deletions
diff --git a/library/cpp/neh/asio/tcp_socket_impl.h b/library/cpp/neh/asio/tcp_socket_impl.h
new file mode 100644
index 0000000000..44f8f42d87
--- /dev/null
+++ b/library/cpp/neh/asio/tcp_socket_impl.h
@@ -0,0 +1,332 @@
+#pragma once
+
+#include "asio.h"
+#include "io_service_impl.h"
+
+#include <sys/uio.h>
+
+#if defined(_bionic_)
+# define IOV_MAX 1024
+#endif
+
+namespace NAsio {
+ // ownership/keep-alive references:
+ // Handlers <- TOperation...(TFdOperation) <- TPollFdEventHandler <- TIOService
+
+ class TSocketOperation: public TFdOperation {
+ public:
+ TSocketOperation(TTcpSocket::TImpl& s, TPollType pt, TInstant deadline);
+
+ protected:
+ TTcpSocket::TImpl& S_;
+ };
+
+ class TOperationConnect: public TSocketOperation {
+ public:
+ TOperationConnect(TTcpSocket::TImpl& s, TTcpSocket::TConnectHandler h, TInstant deadline)
+ : TSocketOperation(s, PollWrite, deadline)
+ , H_(h)
+ {
+ }
+
+ bool Execute(int errorCode) override {
+ H_(errorCode, *this);
+
+ return true;
+ }
+
+ TTcpSocket::TConnectHandler H_;
+ };
+
+ class TOperationConnectFailed: public TSocketOperation {
+ public:
+ TOperationConnectFailed(TTcpSocket::TImpl& s, TTcpSocket::TConnectHandler h, int errorCode, TInstant deadline)
+ : TSocketOperation(s, PollWrite, deadline)
+ , H_(h)
+ , ErrorCode_(errorCode)
+ {
+ Speculative_ = true;
+ }
+
+ bool Execute(int errorCode) override {
+ Y_UNUSED(errorCode);
+ H_(ErrorCode_, *this);
+
+ return true;
+ }
+
+ TTcpSocket::TConnectHandler H_;
+ int ErrorCode_;
+ };
+
+ class TOperationWrite: public TSocketOperation {
+ public:
+ TOperationWrite(TTcpSocket::TImpl& s, NAsio::TTcpSocket::TSendedData& buffs, TTcpSocket::TWriteHandler h, TInstant deadline)
+ : TSocketOperation(s, PollWrite, deadline)
+ , H_(h)
+ , Buffs_(buffs)
+ , Written_(0)
+ {
+ Speculative_ = true;
+ }
+
+ //return true, if not need write more data
+ bool Execute(int errorCode) override;
+
+ private:
+ TTcpSocket::TWriteHandler H_;
+ NAsio::TTcpSocket::TSendedData Buffs_;
+ size_t Written_;
+ };
+
+ class TOperationWriteVector: public TSocketOperation {
+ public:
+ TOperationWriteVector(TTcpSocket::TImpl& s, TContIOVector* v, TTcpSocket::TWriteHandler h, TInstant deadline)
+ : TSocketOperation(s, PollWrite, deadline)
+ , H_(h)
+ , V_(*v)
+ , Written_(0)
+ {
+ Speculative_ = true;
+ }
+
+ //return true, if not need write more data
+ bool Execute(int errorCode) override;
+
+ private:
+ TTcpSocket::TWriteHandler H_;
+ TContIOVector& V_;
+ size_t Written_;
+ };
+
+ class TOperationReadSome: public TSocketOperation {
+ public:
+ TOperationReadSome(TTcpSocket::TImpl& s, void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline)
+ : TSocketOperation(s, PollRead, deadline)
+ , H_(h)
+ , Buff_(static_cast<char*>(buff))
+ , Size_(size)
+ {
+ }
+
+ //return true, if not need read more data
+ bool Execute(int errorCode) override;
+
+ protected:
+ TTcpSocket::TReadHandler H_;
+ char* Buff_;
+ size_t Size_;
+ };
+
+ class TOperationRead: public TOperationReadSome {
+ public:
+ TOperationRead(TTcpSocket::TImpl& s, void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline)
+ : TOperationReadSome(s, buff, size, h, deadline)
+ , Read_(0)
+ {
+ }
+
+ bool Execute(int errorCode) override;
+
+ private:
+ size_t Read_;
+ };
+
+ class TOperationPoll: public TSocketOperation {
+ public:
+ TOperationPoll(TTcpSocket::TImpl& s, TPollType pt, TTcpSocket::TPollHandler h, TInstant deadline)
+ : TSocketOperation(s, pt, deadline)
+ , H_(h)
+ {
+ }
+
+ bool Execute(int errorCode) override {
+ H_(errorCode, *this);
+
+ return true;
+ }
+
+ private:
+ TTcpSocket::TPollHandler H_;
+ };
+
+ template <class T>
+ class TOperationCancel: public TNoneOperation {
+ public:
+ TOperationCancel(T* s)
+ : TNoneOperation()
+ , S_(s)
+ {
+ Speculative_ = true;
+ }
+
+ ~TOperationCancel() override {
+ }
+
+ private:
+ bool Execute(int errorCode) override {
+ Y_UNUSED(errorCode);
+ if (!errorCode && S_->Fd() != INVALID_SOCKET) {
+ S_->GetIOServiceImpl().CancelFdOp(S_->Fd());
+ }
+ return true;
+ }
+
+ TIntrusivePtr<T> S_;
+ };
+
+ class TTcpSocket::TImpl: public TNonCopyable, public TThrRefBase {
+ public:
+ typedef TTcpSocket::TSendedData TSendedData;
+
+ TImpl(TIOService::TImpl& srv) noexcept
+ : Srv_(srv)
+ {
+ }
+
+ ~TImpl() override {
+ DBGOUT("TSocket::~TImpl()");
+ }
+
+ void Assign(SOCKET fd, TEndpoint ep) {
+ TSocketHolder(fd).Swap(S_);
+ RemoteEndpoint_ = ep;
+ }
+
+ void AsyncConnect(const TEndpoint& ep, TTcpSocket::TConnectHandler h, TInstant deadline) {
+ TSocketHolder s(socket(ep.SockAddr()->sa_family, SOCK_STREAM, 0));
+
+ if (Y_UNLIKELY(s == INVALID_SOCKET || Srv_.HasAbort())) {
+ throw TSystemError() << TStringBuf("can't create socket");
+ }
+
+ SetNonBlock(s);
+
+ int err;
+ do {
+ err = connect(s, ep.SockAddr(), (int)ep.SockAddrLen());
+ if (Y_LIKELY(err)) {
+ err = LastSystemError();
+ }
+#if defined(_freebsd_)
+ if (Y_UNLIKELY(err == EINTR)) {
+ err = EINPROGRESS;
+ }
+ } while (0);
+#elif defined(_linux_)
+ } while (Y_UNLIKELY(err == EINTR));
+#else
+ } while (0);
+#endif
+
+ RemoteEndpoint_ = ep;
+ S_.Swap(s);
+
+ DBGOUT("AsyncConnect(): " << err);
+ if (Y_LIKELY(err == EINPROGRESS || err == EWOULDBLOCK || err == 0)) {
+ Srv_.ScheduleOp(new TOperationConnect(*this, h, deadline)); //set callback
+ } else {
+ Srv_.ScheduleOp(new TOperationConnectFailed(*this, h, err, deadline)); //set callback
+ }
+ }
+
+ inline void AsyncWrite(TSendedData& d, TTcpSocket::TWriteHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationWrite(*this, d, h, deadline));
+ }
+
+ inline void AsyncWrite(TContIOVector* v, TTcpSocket::TWriteHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationWriteVector(*this, v, h, deadline));
+ }
+
+ inline void AsyncRead(void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationRead(*this, buff, size, h, deadline));
+ }
+
+ inline void AsyncReadSome(void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationReadSome(*this, buff, size, h, deadline));
+ }
+
+ inline void AsyncPollWrite(TTcpSocket::TPollHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationPoll(*this, TOperationPoll::PollWrite, h, deadline));
+ }
+
+ inline void AsyncPollRead(TTcpSocket::TPollHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationPoll(*this, TOperationPoll::PollRead, h, deadline));
+ }
+
+ inline void AsyncCancel() {
+ if (Y_UNLIKELY(Srv_.HasAbort())) {
+ return;
+ }
+ Srv_.ScheduleOp(new TOperationCancel<TTcpSocket::TImpl>(this));
+ }
+
+ inline bool SysCallHasResult(ssize_t& n, TErrorCode& ec) noexcept {
+ if (n >= 0) {
+ return true;
+ }
+
+ int errn = LastSystemError();
+ if (errn == EINTR) {
+ return false;
+ }
+
+ ec.Assign(errn);
+ n = 0;
+ return true;
+ }
+
+ size_t WriteSome(TContIOVector& iov, TErrorCode& ec) noexcept {
+ for (;;) {
+ ssize_t n = writev(S_, (const iovec*)iov.Parts(), Min(IOV_MAX, (int)iov.Count()));
+ DBGOUT("WriteSome(): n=" << n);
+ if (SysCallHasResult(n, ec)) {
+ return n;
+ }
+ }
+ }
+
+ size_t WriteSome(const void* buff, size_t size, TErrorCode& ec) noexcept {
+ for (;;) {
+ ssize_t n = send(S_, (char*)buff, size, 0);
+ DBGOUT("WriteSome(): n=" << n);
+ if (SysCallHasResult(n, ec)) {
+ return n;
+ }
+ }
+ }
+
+ size_t ReadSome(void* buff, size_t size, TErrorCode& ec) noexcept {
+ for (;;) {
+ ssize_t n = recv(S_, (char*)buff, size, 0);
+ DBGOUT("ReadSome(): n=" << n);
+ if (SysCallHasResult(n, ec)) {
+ return n;
+ }
+ }
+ }
+
+ inline void Shutdown(TTcpSocket::TShutdownMode mode, TErrorCode& ec) {
+ if (shutdown(S_, mode)) {
+ ec.Assign(LastSystemError());
+ }
+ }
+
+ TIOService::TImpl& GetIOServiceImpl() const noexcept {
+ return Srv_;
+ }
+
+ inline SOCKET Fd() const noexcept {
+ return S_;
+ }
+
+ TEndpoint RemoteEndpoint() const {
+ return RemoteEndpoint_;
+ }
+
+ private:
+ TIOService::TImpl& Srv_;
+ TSocketHolder S_;
+ TEndpoint RemoteEndpoint_;
+ };
+}