aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/WriteBufferFromPocoSocket.cpp
blob: 171e7f1ce69158e3a00afe993b3583e909614216 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#include <Poco/Net/NetException.h>

#include <base/scope_guard.h>

#include <IO/WriteBufferFromPocoSocket.h>

#include <Common/Exception.h>
#include <Common/NetException.h>
#include <Common/Stopwatch.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/AsyncTaskExecutor.h>
#include <Common/checkSSLReturnCode.h>

namespace ProfileEvents
{
    extern const Event NetworkSendElapsedMicroseconds;
    extern const Event NetworkSendBytes;
}

namespace CurrentMetrics
{
    extern const Metric NetworkSend;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int NETWORK_ERROR;
    extern const int SOCKET_TIMEOUT;
    extern const int CANNOT_WRITE_TO_SOCKET;
    extern const int LOGICAL_ERROR;
}

void WriteBufferFromPocoSocket::nextImpl()
{
    if (!offset())
        return;

    Stopwatch watch;
    size_t bytes_written = 0;

    SCOPE_EXIT({
        ProfileEvents::increment(ProfileEvents::NetworkSendElapsedMicroseconds, watch.elapsedMicroseconds());
        ProfileEvents::increment(ProfileEvents::NetworkSendBytes, bytes_written);
    });

    while (bytes_written < offset())
    {
        ssize_t res = 0;

        /// Add more details to exceptions.
        try
        {
            CurrentMetrics::Increment metric_increment(CurrentMetrics::NetworkSend);
            char * pos = working_buffer.begin() + bytes_written;
            size_t size = offset() - bytes_written;
            if (size > INT_MAX)
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow");

            /// If async_callback is specified, set socket to non-blocking mode
            /// and try to write data to it, if socket is not ready for writing,
            /// run async_callback and try again later.
            /// It is expected that file descriptor may be polled externally.
            /// Note that send timeout is not checked here. External code should check it while polling.
            if (async_callback)
            {
                socket.setBlocking(false);
                /// Set socket to blocking mode at the end.
                SCOPE_EXIT(socket.setBlocking(true));
                bool secure = socket.secure();
                res = socket.impl()->sendBytes(pos, static_cast<int>(size));

                /// Check EAGAIN and ERR_SSL_WANT_WRITE/ERR_SSL_WANT_READ for secure socket (writing to secure socket can read too).
                while (res < 0 && (errno == EAGAIN || (secure && (checkSSLWantRead(res) || checkSSLWantWrite(res)))))
                {
                    /// In case of ERR_SSL_WANT_READ we should wait for socket to be ready for reading, otherwise - for writing.
                    if (secure && checkSSLWantRead(res))
                        async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
                    else
                        async_callback(socket.impl()->sockfd(), socket.getSendTimeout(), AsyncEventTimeoutType::SEND, socket_description, AsyncTaskExecutor::Event::WRITE | AsyncTaskExecutor::Event::ERROR);

                    /// Try to write again.
                    res = socket.impl()->sendBytes(pos, static_cast<int>(size));
                }
            }
            else
            {
                res = socket.impl()->sendBytes(pos, static_cast<int>(size));
            }
        }
        catch (const Poco::Net::NetException & e)
        {
            throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({} -> {})", e.displayText(),
                               our_address.toString(), peer_address.toString());
        }
        catch (const Poco::TimeoutException &)
        {
            throw NetException(ErrorCodes::SOCKET_TIMEOUT, "Timeout exceeded while writing to socket ({}, {} ms)",
                peer_address.toString(),
                socket.impl()->getSendTimeout().totalMilliseconds());
        }
        catch (const Poco::IOException & e)
        {
            throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({} -> {})", e.displayText(),
                               our_address.toString(), peer_address.toString());
        }

        if (res < 0)
            throw NetException(ErrorCodes::CANNOT_WRITE_TO_SOCKET, "Cannot write to socket ({} -> {})",
                               our_address.toString(), peer_address.toString());

        bytes_written += res;
    }
}

WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size)
    : BufferWithOwnMemory<WriteBuffer>(buf_size)
    , socket(socket_)
    , peer_address(socket.peerAddress())
    , our_address(socket.address())
    , socket_description("socket (" + peer_address.toString() + ")")
{
}

WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
{
    try
    {
        finalize();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}

}