aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.h
blob: 21daea22c05801e675508b2faccd570458b486a4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#pragma once

#include "clickhouse_config.h"

#if USE_AWS_S3

#include "WriteBufferFromS3.h"

#include <Common/logger_useful.h>

#include <list>

namespace DB
{

/// This class is currently used only in WriteBufferFromS3,
/// therefore it is declared as a nested class of WriteBufferFromS3.
/// TaskTracker takes a Callback which is run by the scheduler in some external shared ThreadPool.
/// TaskTracker provides the methods waitIfAny, waitAll/safeWaitAll
/// to help with coordination of the running tasks.

/// Basic exception safety is provided. If an exception has occurred, the object has to be destroyed.
/// No thread safety is provided. Use this object with no concurrency.

class WriteBufferFromS3::TaskTracker
{
public:
    /// A unit of work: a callable that takes no arguments and returns nothing.
    using Callback = std::function<void()>;

    /// Builds a tracker from a scheduler, a limit on in-flight tasks and a rate-limited logger.
    /// NOTE(review): the arguments presumably initialise the same-named members below;
    /// the definition is not in this header, so confirm there (including what a zero limit means).
    TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_, LogSeriesLimiterPtr limitedLog_);
    /// Defined out of line; see the implementation file for what happens to tasks that are still running.
    ~TaskTracker();

    /// Returns a callback runner. Judging by the name it runs callbacks synchronously
    /// instead of handing them to a thread pool — TODO confirm against the definition.
    static ThreadPoolCallbackRunner<void> syncRunner();

    /// Tells whether the tracker works asynchronously (presumably reports `is_async`).
    bool isAsync() const;

    /// waitIfAny collects statuses from already finished tasks.
    /// There could be no finished tasks yet, in which case waitIfAny does nothing useful.
    /// The first exception is thrown if any task has failed.
    void waitIfAny();

    /// waitAll waits until all the tasks have finished and collects their statuses.
    void waitAll();

    /// safeWaitAll does the same as waitAll but mutes the exceptions.
    void safeWaitAll();

    /// Registers a new task. The callback is taken by rvalue reference, so the caller gives it up.
    /// Per the class comment it is run by the scheduler; whether this call can block
    /// on the in-flight limit is decided in the definition — TODO confirm.
    void add(Callback && func);

private:
    /// waitTilInflightShrink waits until the number of in-flight tasks is no longer beyond the limit `max_tasks_inflight`.
    /// NOTE(review): the exact boundary condition is in the definition, not here.
    /// The method is excluded from thread safety analysis by its annotation.
    void waitTilInflightShrink() TSA_NO_THREAD_SAFETY_ANALYSIS;

    /// Processes the entries of `finished_futures`; the caller must hold `mutex` (see the annotation).
    /// `propagate_exceptions` presumably selects between rethrowing a task's exception
    /// and swallowing it (cf. waitAll vs safeWaitAll) — TODO confirm.
    void collectFinishedFutures(bool propagate_exceptions) TSA_REQUIRES(mutex);

    /// Fixed for the lifetime of the object; exposed through isAsync() (presumably).
    const bool is_async;
    /// Runner used to execute the added callbacks.
    ThreadPoolCallbackRunner<void> scheduler;
    /// Limit on the number of tasks that may be in flight at once.
    const size_t max_tasks_inflight;

    /// One future per tracked task.
    using FutureList = std::list<std::future<void>>;
    FutureList futures;
    /// Rate-limited logger handle.
    LogSeriesLimiterPtr limitedLog;

    /// Protects `finished_futures` (and, per its annotation, `has_finished`).
    std::mutex mutex;
    /// Presumably notified when a task finishes so that waiters can wake up — TODO confirm.
    std::condition_variable has_finished TSA_GUARDED_BY(mutex);
    /// Iterators pointing into `futures`. std::list keeps iterators valid while other
    /// elements are inserted or erased, which is what makes storing them here workable.
    using FinishedList = std::list<FutureList::iterator>;
    /// Tasks reported as finished but not yet collected (presumably filled from worker threads, hence the lock).
    FinishedList finished_futures TSA_GUARDED_BY(mutex);
};

}

#endif