blob: cf314696041780ac7a56feb0ad98a34c0b0e7e60 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
#include <Storages/IMessageProducer.h>
#include <Common/logger_useful.h>
namespace DB
{
/// Constructs the base producer, remembering the logger pointer passed by the caller.
/// The pointer is stored as-is in the `log` member.
IMessageProducer::IMessageProducer(Poco::Logger * log_)
    : log(log_)
{
}
/// Starts the asynchronous producer.
///
/// Calls initialize(), then creates a task on the schedule pool obtained from
/// `context` and calls activateAndSchedule() on it. When the task body runs it
/// sets the `scheduled` flag, notifies one waiter on that flag and then enters
/// startProducingTaskLoop().
void AsynchronousMessageProducer::start(const ContextPtr & context)
{
    LOG_TEST(log, "Executing startup");

    initialize();

    /// Body of the background task. The `scheduled` flag is raised (and one waiter
    /// woken) before the producing loop is entered; finish() waits on this flag.
    auto run_producing_loop = [this]
    {
        LOG_TEST(log, "Starting producing task loop");
        scheduled.store(true);
        scheduled.notify_one();
        startProducingTaskLoop();
    };

    producing_task = context->getSchedulePool().createTask(getProducingTaskName(), run_producing_loop);
    producing_task->activateAndSchedule();
}
/// Shuts the asynchronous producer down.
///
/// Only the first call does any work; later calls return immediately. The sequence is:
/// wait until the producing task has flagged `scheduled`, call stopProducingTask(),
/// deactivate the task, and finally call finishImpl().
void AsynchronousMessageProducer::finish()
{
    /// The shutdown logic must run only once: exchange() returns the previous value,
    /// so only the first caller observes `false` and proceeds.
    const bool already_finished = finished.exchange(true);
    if (already_finished)
        return;

    LOG_TEST(log, "Executing shutdown");

    /// The task running the producer loop may not have started yet even though
    /// the payloads queue is non-empty. Deactivating it at this point would mean
    /// the loop never starts and the queued messages are never sent, so block
    /// until the task has set `scheduled`.
    /// NOTE(review): if start() was never called, nothing visible here sets `scheduled`,
    /// so this wait would presumably block indefinitely - confirm callers always call start() first.
    scheduled.wait(false);

    /// Ask the task to shut down - not immediately: it will still finish
    /// executing its current tasks.
    stopProducingTask();

    /// Wait for the producer task to finish.
    producing_task->deactivate();

    finishImpl();
}
}
|