diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.h | |
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.h')
-rw-r--r-- | contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.h | 72 |
1 files changed, 72 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.h b/contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.h new file mode 100644 index 0000000000..21daea22c0 --- /dev/null +++ b/contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.h @@ -0,0 +1,72 @@ +#pragma once + +#include "clickhouse_config.h" + +#if USE_AWS_S3 + +#include "WriteBufferFromS3.h" + +#include <Common/logger_useful.h> + +#include <list> + +namespace DB +{ + +/// That class is used only in WriteBufferFromS3 for now. +/// Therefore it declared as a part of WriteBufferFromS3. +/// TaskTracker takes a Callback which is run by scheduler in some external shared ThreadPool. +/// TaskTracker brings the methods waitIfAny, waitAll/safeWaitAll +/// to help with coordination of the running tasks. + +/// Basic exception safety is provided. If exception occurred the object has to be destroyed. +/// No thread safety is provided. Use this object with no concurrency. + +class WriteBufferFromS3::TaskTracker +{ +public: + using Callback = std::function<void()>; + + TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_, LogSeriesLimiterPtr limitedLog_); + ~TaskTracker(); + + static ThreadPoolCallbackRunner<void> syncRunner(); + + bool isAsync() const; + + /// waitIfAny collects statuses from already finished tasks + /// There could be no finished tasks yet, so waitIfAny do nothing useful in that case + /// the first exception is thrown if any task has failed + void waitIfAny(); + + /// Well, waitAll waits all the tasks until they finish and collects their statuses + void waitAll(); + + /// safeWaitAll does the same as waitAll but mutes the exceptions + void safeWaitAll(); + + void add(Callback && func); + +private: + /// waitTilInflightShrink waits til the number of in-flight tasks beyond the limit `max_tasks_inflight`. + void waitTilInflightShrink() TSA_NO_THREAD_SAFETY_ANALYSIS; + + void collectFinishedFutures(bool propagate_exceptions) TSA_REQUIRES(mutex); + + const bool is_async; + ThreadPoolCallbackRunner<void> scheduler; + const size_t max_tasks_inflight; + + using FutureList = std::list<std::future<void>>; + FutureList futures; + LogSeriesLimiterPtr limitedLog; + + std::mutex mutex; + std::condition_variable has_finished TSA_GUARDED_BY(mutex); + using FinishedList = std::list<FutureList::iterator>; + FinishedList finished_futures TSA_GUARDED_BY(mutex); +}; + +} + +#endif |