blob: 9934a69bce1bb4fa6a6b5723f40053b052c5a95b (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
#include <Client/PacketReceiver.h>
#if defined(OS_LINUX)
namespace DB
{
PacketReceiver::PacketReceiver(Connection * connection_) : AsyncTaskExecutor(std::make_unique<Task>(*this)), connection(connection_)
{
epoll.add(timeout_descriptor.getDescriptor());
socket_fd = connection->getSocket()->impl()->sockfd();
epoll.add(socket_fd);
}
bool PacketReceiver::checkBeforeTaskResume()
{
/// If there is no pending data, check timeout.
return connection->hasReadPendingData() || checkTimeout();
}
void PacketReceiver::processAsyncEvent(int fd [[maybe_unused]], Poco::Timespan socket_timeout, AsyncEventTimeoutType, const std::string &, uint32_t)
{
assert(fd == socket_fd);
timeout_descriptor.setRelative(socket_timeout);
timeout = socket_timeout;
is_read_in_process = true;
}
void PacketReceiver::clearAsyncEvent()
{
is_read_in_process = false;
timeout_descriptor.reset();
}
bool PacketReceiver::checkTimeout()
{
bool is_socket_ready = false;
epoll_event events[2];
events[0].data.fd = events[1].data.fd = -1;
size_t ready_count = epoll.getManyReady(2, events, true);
for (size_t i = 0; i != ready_count; ++i)
{
if (events[i].data.fd == socket_fd)
is_socket_ready = true;
if (events[i].data.fd == timeout_descriptor.getDescriptor())
is_timeout_expired = true;
}
if (is_timeout_expired && !is_socket_ready)
{
timeout_descriptor.reset();
return false;
}
return true;
}
void PacketReceiver::Task::run(AsyncCallback async_callback, SuspendCallback suspend_callback)
{
while (true)
{
{
AsyncCallbackSetter async_setter(receiver.connection, async_callback);
receiver.packet = receiver.connection->receivePacket();
}
suspend_callback();
}
}
}
#endif
|