1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
#include <Common/AsyncTaskExecutor.h>
#include <base/scope_guard.h>
namespace DB
{
AsyncTaskExecutor::AsyncTaskExecutor(std::unique_ptr<AsyncTask> task_) : task(std::move(task_))
{
}
void AsyncTaskExecutor::resume()
{
if (routine_is_finished)
return;
/// Create fiber lazily on first resume() call.
if (!fiber)
createFiber();
if (!checkBeforeTaskResume())
return;
{
std::lock_guard guard(fiber_lock);
if (is_cancelled)
return;
resumeUnlocked();
/// Destroy fiber when it's finished.
if (routine_is_finished)
destroyFiber();
if (exception)
processException(exception);
}
afterTaskResume();
}
void AsyncTaskExecutor::resumeUnlocked()
{
fiber.resume();
}
void AsyncTaskExecutor::cancel()
{
std::lock_guard guard(fiber_lock);
is_cancelled = true;
{
SCOPE_EXIT({ destroyFiber(); });
cancelBefore();
}
cancelAfter();
}
void AsyncTaskExecutor::restart()
{
std::lock_guard guard(fiber_lock);
if (!routine_is_finished)
destroyFiber();
routine_is_finished = false;
}
struct AsyncTaskExecutor::Routine
{
AsyncTaskExecutor & executor;
struct AsyncCallback
{
AsyncTaskExecutor & executor;
SuspendCallback suspend_callback;
void operator()(int fd, Poco::Timespan timeout, AsyncEventTimeoutType type, const std::string & desc, uint32_t events)
{
executor.processAsyncEvent(fd, timeout, type, desc, events);
suspend_callback();
executor.clearAsyncEvent();
}
};
void operator()(SuspendCallback suspend_callback)
{
auto async_callback = AsyncCallback{executor, suspend_callback};
try
{
executor.task->run(async_callback, suspend_callback);
}
catch (const boost::context::detail::forced_unwind &)
{
/// This exception is thrown by fiber implementation in case if fiber is being deleted but hasn't exited
/// It should not be caught or it will segfault.
/// Other exceptions must be caught
throw;
}
catch (...)
{
executor.exception = std::current_exception();
}
executor.routine_is_finished = true;
}
};
void AsyncTaskExecutor::createFiber()
{
fiber = Fiber(fiber_stack, Routine{*this});
}
void AsyncTaskExecutor::destroyFiber()
{
Fiber to_destroy = std::move(fiber);
}
String getSocketTimeoutExceededMessageByTimeoutType(AsyncEventTimeoutType type, Poco::Timespan timeout, const String & socket_description)
{
switch (type)
{
case AsyncEventTimeoutType::CONNECT:
return fmt::format("Timeout exceeded while connecting to socket ({}, connection timeout {} ms)", socket_description, timeout.totalMilliseconds());
case AsyncEventTimeoutType::RECEIVE:
return fmt::format("Timeout exceeded while reading from socket ({}, receive timeout {} ms)", socket_description, timeout.totalMilliseconds());
case AsyncEventTimeoutType::SEND:
return fmt::format("Timeout exceeded while writing to socket ({}, send timeout {} ms)", socket_description, timeout.totalMilliseconds());
default:
return fmt::format("Timeout exceeded while working with socket ({}, {} ms)", socket_description, timeout.totalMilliseconds());
}
}
}
|