aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Client/PacketReceiver.cpp
blob: 9934a69bce1bb4fa6a6b5723f40053b052c5a95b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include <Client/PacketReceiver.h>

#if defined(OS_LINUX)

namespace DB
{

/// Builds a receiver bound to `connection_` and registers the two descriptors
/// this class polls on: the one exposed by `timeout_descriptor` and the
/// connection's underlying socket descriptor.
PacketReceiver::PacketReceiver(Connection * connection_)
    : AsyncTaskExecutor(std::make_unique<Task>(*this))
    , connection(connection_)
{
    /// Register the timeout descriptor first, exactly as before.
    const auto timer_fd = timeout_descriptor.getDescriptor();
    epoll.add(timer_fd);

    /// Remember the raw socket descriptor; checkTimeout() and processAsyncEvent() compare against it.
    socket_fd = connection->getSocket()->impl()->sockfd();
    epoll.add(socket_fd);
}

/// Returns true when the connection already has pending data to read;
/// otherwise returns whatever checkTimeout() reports.
bool PacketReceiver::checkBeforeTaskResume()
{
    /// Pending data short-circuits the check: checkTimeout() is not called at all in that case.
    if (connection->hasReadPendingData())
        return true;

    /// Nothing pending, so the answer depends on the epoll/timeout state.
    return checkTimeout();
}

/// Records that a read on the socket is now in progress and arms the timeout.
///
/// `fd` is only checked by the assertion below (hence [[maybe_unused]]: the
/// assert disappears when NDEBUG is defined). `socket_timeout` is passed to the
/// timeout descriptor and also stored in `timeout`. The remaining three
/// parameters are unnamed and ignored by this implementation.
void PacketReceiver::processAsyncEvent(int fd [[maybe_unused]], Poco::Timespan socket_timeout, AsyncEventTimeoutType, const std::string &, uint32_t)
{
    /// This receiver only expects events for the socket registered in the constructor.
    assert(fd == socket_fd);
    /// Arm the timeout descriptor relative to now (semantics of setRelative are defined elsewhere).
    timeout_descriptor.setRelative(socket_timeout);
    timeout = socket_timeout;
    /// Set last, so the flag is not raised if arming the timeout throws.
    is_read_in_process = true;
}

/// Counterpart of processAsyncEvent(): clears the "read in progress" flag and
/// resets the timeout descriptor.
void PacketReceiver::clearAsyncEvent()
{
    is_read_in_process = false;
    /// Same reset() call that checkTimeout() performs after detecting an expired timeout.
    timeout_descriptor.reset();
}

bool PacketReceiver::checkTimeout()
{
    bool is_socket_ready = false;

    epoll_event events[2];
    events[0].data.fd = events[1].data.fd = -1;
    size_t ready_count = epoll.getManyReady(2, events, true);

    for (size_t i = 0; i != ready_count; ++i)
    {
        if (events[i].data.fd == socket_fd)
            is_socket_ready = true;
        if (events[i].data.fd == timeout_descriptor.getDescriptor())
            is_timeout_expired = true;
    }

    if (is_timeout_expired && !is_socket_ready)
    {
        timeout_descriptor.reset();
        return false;
    }

    return true;
}

/// Task body: receives packets one after another, never returning on its own.
/// Each iteration stores the received packet in `receiver.packet` and then
/// invokes `suspend_callback` before starting on the next packet.
void PacketReceiver::Task::run(AsyncCallback async_callback, SuspendCallback suspend_callback)
{
    for (;;)
    {
        {
            /// The setter lives only for this inner scope, so it is destroyed
            /// before suspend_callback() runs.
            AsyncCallbackSetter callback_guard(receiver.connection, async_callback);
            receiver.packet = receiver.connection->receivePacket();
        }

        /// A packet is now available in receiver.packet; call the suspend callback.
        suspend_callback();
    }
}

}

#endif