aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
blob: 5e211bf036dd7fe9a78c28793d11899c109c84de (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#if defined(OS_LINUX)

#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <base/defines.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include <Client/IConnections.h>
#include <Common/AsyncTaskExecutor.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_READ_FROM_SOCKET;
    extern const int CANNOT_OPEN_FILE;
    extern const int SOCKET_TIMEOUT;
}

/// Creates the read context driving `executor_` asynchronously.
///
/// Sets up a non-blocking self-pipe, whose write end cancelBefore() uses to wake up waiters. It then
/// registers the pipe's read end and the timeout timer in `epoll`. The connection socket itself is
/// registered later, per pending operation, by processAsyncEvent().
///
/// Throws CANNOT_OPEN_FILE if the pipe cannot be created. It also propagates any exception thrown
/// while registering a descriptor in `epoll`; in that case the already-created pipe is closed first.
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(RemoteQueryExecutor & executor_, bool suspend_when_query_sent_)
    : AsyncTaskExecutor(std::make_unique<Task>(*this)), executor(executor_), suspend_when_query_sent(suspend_when_query_sent_)
{
    if (-1 == pipe2(pipe_fd, O_NONBLOCK))
        throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_OPEN_FILE);

    try
    {
        epoll.add(pipe_fd[0]);
        epoll.add(timer.getDescriptor());
    }
    catch (...)
    {
        /// The destructor is not invoked when a constructor throws, so the raw pipe
        /// descriptors created above would leak. Close them (best effort) before rethrowing.
        close(pipe_fd[0]);
        close(pipe_fd[1]);
        throw;
    }
}

/// Decides whether the task may be resumed now.
/// If no async operation is pending, resuming is always allowed. Otherwise the decision is
/// delegated to a non-blocking checkTimeout(). That call returns false after cancellation and
/// throws on socket timeout.
bool RemoteQueryExecutorReadContext::checkBeforeTaskResume()
{
    if (!is_in_progress.load(std::memory_order_relaxed))
        return true;

    return checkTimeout();
}


/// Body of the async task: sends the query and then receives packets forever.
///
/// Every blocking operation is given `async_callback`. After the query is sent, the task
/// optionally suspends (when `suspend_when_query_sent` is set). It stops early if the executor
/// decides the shard should be skipped. Otherwise it stores each received packet into
/// `read_context.packet` and suspends, handing control back after every packet.
void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, SuspendCallback suspend_callback)
{
    auto & ctx = read_context;

    ctx.executor.sendQueryUnlocked(ClientInfo::QueryKind::SECONDARY_QUERY, async_callback);
    ctx.is_query_sent = true;

    if (ctx.suspend_when_query_sent)
        suspend_callback();

    if (ctx.executor.needToSkipUnavailableShard())
        return;

    for (;;)
    {
        /// Connections are fetched anew for every packet, exactly as before.
        ctx.packet = ctx.executor.getConnections().receivePacketUnlocked(async_callback);
        suspend_callback();
    }
}

/// Records the descriptor a pending async operation is waiting on, and arms the timeout for it.
///
/// @param fd              descriptor to wait on; it is added to `epoll` with the requested `events` mask.
/// @param socket_timeout  stored in `timeout` and used to arm `timer` relative to now.
/// @param type            stored in `timeout_type`; used by checkTimeout() to build the timeout message.
/// @param description     stored in `connection_fd_description`; also used in the timeout message.
/// @param events          epoll event mask passed through to `epoll.add`.
///
/// Finally marks an operation as in progress. checkBeforeTaskResume() and cancelBefore() read that
/// flag, and clearAsyncEvent() undoes this registration.
void RemoteQueryExecutorReadContext::processAsyncEvent(int fd, Poco::Timespan socket_timeout, AsyncEventTimeoutType type, const std::string & description, uint32_t events)
{
    connection_fd = fd;
    epoll.add(connection_fd, events);
    timeout = socket_timeout;
    timer.setRelative(socket_timeout);
    timeout_type = type;
    connection_fd_description = description;
    /// Set last, after everything checkTimeout() relies on has been configured.
    is_in_progress.store(true);
}

/// Undoes processAsyncEvent() once the pending operation has completed.
/// Removes `connection_fd` from `epoll`, resets the timeout timer, and clears the in-progress flag.
void RemoteQueryExecutorReadContext::clearAsyncEvent()
{
    epoll.remove(connection_fd);
    timer.reset();
    is_in_progress.store(false);
}

/// Polls the context's epoll, which watches the connection socket, the timeout timer and the
/// cancellation pipe, and interprets the result.
///
/// @param blocking  wait indefinitely (true) or only check what is already ready (false).
///                  The wait does not block if the epoll descriptor was already polled externally.
/// @return false if cancellation was signalled through the pipe, true otherwise.
/// @throws NetException(SOCKET_TIMEOUT) if the timer fired while the socket was not ready.
///
/// `is_timer_alarmed` and `is_pipe_alarmed` are members. They are only ever set here, never
/// cleared, so once observed they stay set for later calls.
bool RemoteQueryExecutorReadContext::checkTimeout(bool blocking)
{
    constexpr int max_ready_events = 3;
    epoll_event ready[max_ready_events];
    for (auto & slot : ready)
        slot.data.fd = -1;

    const size_t ready_count = epoll.getManyReady(max_ready_events, ready, blocking ? -1 : 0);

    bool socket_ready = false;
    for (size_t idx = 0; idx < ready_count; ++idx)
    {
        const int ready_fd = ready[idx].data.fd;
        if (ready_fd == connection_fd)
            socket_ready = true;
        if (ready_fd == timer.getDescriptor())
            is_timer_alarmed = true;
        if (ready_fd == pipe_fd[0])
            is_pipe_alarmed = true;
    }

    /// A cancellation request takes priority over both data and timeouts.
    if (is_pipe_alarmed)
        return false;

    const bool timed_out = is_timer_alarmed && !socket_ready;
    if (!timed_out)
        return true;

    /// Socket timeout: drain the timer descriptor before reporting the timeout.
    timer.drain();
    const String exception_message = getSocketTimeoutExceededMessageByTimeoutType(timeout_type, timeout, connection_fd_description);
    throw NetException(ErrorCodes::SOCKET_TIMEOUT, exception_message);
}

/// Prepares the context for cancellation and wakes up anyone waiting on its epoll.
///
/// First, unless the timeout timer has already fired, the currently pending async operation is
/// driven to completion: the task is resumed until `is_in_progress` becomes false, so the
/// connection is not abandoned in the middle of a packet.
/// Then a value is written to the internal pipe. The pipe's read end is registered in `epoll`,
/// and checkTimeout() returns false once it observes it readable.
///
/// May propagate exceptions from checkTimeout(), e.g. NetException(SOCKET_TIMEOUT), and from
/// resumeUnlocked(). Throws an errno-based exception if writing to the pipe fails with anything
/// other than EINTR or EAGAIN.
void RemoteQueryExecutorReadContext::cancelBefore()
{
    /// One should not try to wait for the current packet here in case of
    /// timeout because this will exceed the timeout.
    /// Anyway if the timeout is exceeded, then the connection will be shut down
    /// (disconnected), so it will not be left in an unsynchronised state.
    if (!is_timer_alarmed)
    {
        /// If the query wasn't sent yet, just complete sending it:
        /// this makes Task::run() suspend right after the query is sent.
        if (!is_query_sent)
            suspend_when_query_sent = true;

        /// Wait for the current pending packet, to avoid leaving the connection in an unsynchronised state.
        while (is_in_progress.load(std::memory_order_relaxed))
        {
            /// Blocks until one of the watched descriptors is ready; throws on socket timeout.
            checkTimeout(/* blocking= */ true);
            resumeUnlocked();
        }
    }

    /// Send something to the pipe to cancel the executor's waiting.
    /// The pipe was created with O_NONBLOCK, so EAGAIN means it is full, i.e. already readable
    /// and the wakeup is in place. EINTR is simply retried.
    uint64_t buf = 0;
    while (-1 == write(pipe_fd[1], &buf, sizeof(buf)))
    {
        if (errno == EAGAIN)
            break;

        /// NOTE(review): a *write* failure is reported with CANNOT_READ_FROM_SOCKET;
        /// a write-specific error code may have been intended — confirm before changing.
        if (errno != EINTR)
            throwFromErrno("Cannot write to pipe", ErrorCodes::CANNOT_READ_FROM_SOCKET);
    }
}

/// Releases the two pipe ends owned by this context; descriptors equal to -1 are skipped.
/// connection_fd is not owned here: it is closed by Poco::Net::Socket or Epoll.
/// A failing close() is tolerated only for EINTR, and that check is debug-only (chassert).
RemoteQueryExecutorReadContext::~RemoteQueryExecutorReadContext()
{
    for (const int end_fd : pipe_fd)
    {
        if (end_fd == -1)
            continue;

        const int err = close(end_fd);
        chassert(!err || errno == EINTR);
    }
}

}
#endif