author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Move the folder with the Connector tests from the yql folder to the ydb folder (synchronized with github).
Diffstat (limited to 'contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp')
-rw-r--r-- | contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp | 176
1 file changed, 176 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp b/contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp
new file mode 100644
index 0000000000..ed63d0c530
--- /dev/null
+++ b/contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp
@@ -0,0 +1,176 @@
+#include "clickhouse_config.h"
+
+#if USE_AWS_S3
+
+#include <IO/WriteBufferFromS3TaskTracker.h>
+
+namespace ProfileEvents
+{
+    extern const Event WriteBufferFromS3WaitInflightLimitMicroseconds;
+}
+
+namespace DB
+{
+
+WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_, LogSeriesLimiterPtr limitedLog_)
+    : is_async(bool(scheduler_))
+    , scheduler(scheduler_ ? std::move(scheduler_) : syncRunner())
+    , max_tasks_inflight(max_tasks_inflight_)
+    , limitedLog(limitedLog_)
+{}
+
+WriteBufferFromS3::TaskTracker::~TaskTracker()
+{
+    safeWaitAll();
+}
+
+ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
+{
+    return [](Callback && callback, int64_t) mutable -> std::future<void>
+    {
+        auto package = std::packaged_task<void()>(std::move(callback));
+        /// No exceptions are propagated, exceptions are packed into the future
+        package();
+        return package.get_future();
+    };
+}
+
+void WriteBufferFromS3::TaskTracker::waitAll()
+{
+    /// Exceptions are propagated
+    for (auto & future : futures)
+    {
+        future.get();
+    }
+    futures.clear();
+
+    std::lock_guard lock(mutex);
+    finished_futures.clear();
+}
+
+void WriteBufferFromS3::TaskTracker::safeWaitAll()
+{
+    for (auto & future : futures)
+    {
+        if (future.valid())
+        {
+            try
+            {
+                /// Exceptions are not propagated
+                future.get();
+            } catch (...)
+            {
+                /// But at least they are printed
+                tryLogCurrentException(__PRETTY_FUNCTION__);
+            }
+        }
+    }
+    futures.clear();
+
+    std::lock_guard lock(mutex);
+    finished_futures.clear();
+}
+
+void WriteBufferFromS3::TaskTracker::waitIfAny()
+{
+    if (futures.empty())
+        return;
+
+    Stopwatch watch;
+
+    {
+        std::lock_guard lock(mutex);
+        for (auto & it : finished_futures)
+        {
+            /// Actually that call might block this thread until the future is finally set.
+            /// However it won't block us for long: the task is about to finish when its pointer appears in `finished_futures`.
+            it->get();
+
+            /// In case of an exception in `it->get()`
+            /// it is not necessary to remove `it` from the list `futures`:
+            /// `TaskTracker` has to be destroyed after any exception occurs, for this `safeWaitAll` is called.
+            /// `safeWaitAll` handles invalid futures in the list `futures`.
+            futures.erase(it);
+        }
+        finished_futures.clear();
+    }
+
+    watch.stop();
+    ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
+}
+
+void WriteBufferFromS3::TaskTracker::add(Callback && func)
+{
+    /// All this fuss is about 2 things. This is the most critical place of TaskTracker.
+    /// The first is not to fail insertion into the list `futures`.
+    /// To ensure that, the element is allocated at the end of the list `futures` in advance.
+    /// The second is not to fail the notification of the task.
+    /// To ensure that, the list element which will be inserted into the list `finished_futures`
+    /// is allocated in advance as another list `pre_allocated_finished` with one element inside.
+
+    /// preallocation for the first issue
+    futures.emplace_back();
+    auto future_placeholder = std::prev(futures.end());
+
+    /// preallocation for the second issue
+    FinishedList pre_allocated_finished {future_placeholder};
+
+    Callback func_with_notification = [&, my_func = std::move(func), my_pre_allocated_finished = std::move(pre_allocated_finished)]() mutable
+    {
+        SCOPE_EXIT({
+            DENY_ALLOCATIONS_IN_SCOPE;
+
+            std::lock_guard lock(mutex);
+            finished_futures.splice(finished_futures.end(), my_pre_allocated_finished);
+            has_finished.notify_one();
+        });
+
+        my_func();
+    };
+
+    /// this move is nothrow
+    *future_placeholder = scheduler(std::move(func_with_notification), Priority{});
+
+    waitTilInflightShrink();
+}
+
+void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()
+{
+    if (!max_tasks_inflight)
+        return;
+
+    if (futures.size() >= max_tasks_inflight)
+        LOG_TEST(limitedLog, "have to wait some tasks finish, in queue {}, limit {}", futures.size(), max_tasks_inflight);
+
+    Stopwatch watch;
+
+    /// An alternative approach would be to wait until at least futures.size() - max_tasks_inflight elements are finished.
+    /// However, the faster a finished task is collected, the faster CH checks whether there is an exception.
+    /// The faster an exception is propagated, the less time is spent on cancellation.
+    while (futures.size() >= max_tasks_inflight)
+    {
+        std::unique_lock lock(mutex);
+
+        has_finished.wait(lock, [this] () TSA_REQUIRES(mutex) { return !finished_futures.empty(); });
+
+        for (auto & it : finished_futures)
+        {
+            it->get();
+            futures.erase(it);
+        }
+
+        finished_futures.clear();
+    }
+
+    watch.stop();
+    ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
+}
+
+bool WriteBufferFromS3::TaskTracker::isAsync() const
+{
+    return is_async;
+}
+
+}
+
+#endif
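The most delicate part of the added file is `TaskTracker::add()`: the slot that will hold the future is appended to `futures` up front, and a one-element list `pre_allocated_finished` is moved into the completion callback, so that on completion the node can be handed over to `finished_futures` with `std::list::splice()` inside `SCOPE_EXIT` under `DENY_ALLOCATIONS_IN_SCOPE`; nothing on that path allocates or throws, so a completion notification is never lost. Below is a minimal standalone sketch of the same pattern, an illustration only: `ToyTracker`, its raw `std::thread` workers, and the `std::promise`-based exception packing are assumptions of the sketch, standing in for ClickHouse's `ThreadPoolCallbackRunner`, `SCOPE_EXIT`, and `DENY_ALLOCATIONS_IN_SCOPE` machinery.

```cpp
// Sketch (not ClickHouse code): single-producer task tracker that preallocates the
// list node used for completion hand-off, so the worker's notification cannot fail.
#include <condition_variable>
#include <future>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

class ToyTracker
{
public:
    using FutureList = std::list<std::future<void>>;
    using FinishedList = std::list<FutureList::iterator>;

    template <typename Func>
    void add(Func func)
    {
        /// Preallocate the slot for the future...
        futures.emplace_back();
        auto placeholder = std::prev(futures.end());
        /// ...and the list node that will later signal completion.
        FinishedList pre_allocated_finished{placeholder};

        auto promise = std::make_shared<std::promise<void>>();
        *placeholder = promise->get_future();

        workers.emplace_back(
            [this, func = std::move(func), promise,
             finished = std::move(pre_allocated_finished)]() mutable
            {
                try
                {
                    func();
                    promise->set_value();
                }
                catch (...)
                {
                    /// The exception is packed into the future and rethrown by get().
                    promise->set_exception(std::current_exception());
                }
                /// The hand-off cannot fail: splice() only relinks the preallocated
                /// node, so no allocation (and no exception) happens here.
                std::lock_guard lock(mutex);
                finished_futures.splice(finished_futures.end(), finished);
                has_finished.notify_one();
            });
    }

    void waitAll()
    {
        std::unique_lock lock(mutex);
        while (!futures.empty())
        {
            has_finished.wait(lock, [this] { return !finished_futures.empty(); });
            for (auto it : finished_futures)
            {
                it->get();          /// rethrows the task's exception, if any
                futures.erase(it);
            }
            finished_futures.clear();
        }
        lock.unlock();

        for (auto & worker : workers)
            worker.join();
        workers.clear();
    }

private:
    std::mutex mutex;
    std::condition_variable has_finished;
    FutureList futures;            /// touched only by the producer thread
    FinishedList finished_futures; /// shared with workers, guarded by mutex
    std::vector<std::thread> workers;
};

int main()
{
    ToyTracker tracker;
    for (int i = 0; i < 4; ++i)
        tracker.add([i] { std::cout << "task " << i << " done\n"; });
    tracker.waitAll();
}
```

The sketch keeps the same split of ownership as the committed file: `futures` belongs to the producer, only `finished_futures` is shared, and the `waitTilInflightShrink()` backpressure in the real code (blocking whenever `futures.size() >= max_tasks_inflight`) would sit on top of the same `finished_futures`/`has_finished` pair.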