aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-11-14 09:58:56 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
downloadydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp')
-rw-r--r--contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp176
1 files changed, 176 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp b/contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp
new file mode 100644
index 0000000000..ed63d0c530
--- /dev/null
+++ b/contrib/clickhouse/src/IO/WriteBufferFromS3TaskTracker.cpp
@@ -0,0 +1,176 @@
+#include "clickhouse_config.h"
+
+#if USE_AWS_S3
+
+#include <IO/WriteBufferFromS3TaskTracker.h>
+
+namespace ProfileEvents
+{
+ extern const Event WriteBufferFromS3WaitInflightLimitMicroseconds;
+}
+
+namespace DB
+{
+
+WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_, LogSeriesLimiterPtr limitedLog_)
+ : is_async(bool(scheduler_))
+ , scheduler(scheduler_ ? std::move(scheduler_) : syncRunner())
+ , max_tasks_inflight(max_tasks_inflight_)
+ , limitedLog(limitedLog_)
+{}
+
+WriteBufferFromS3::TaskTracker::~TaskTracker()
+{
+ safeWaitAll();
+}
+
+ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
+{
+ return [](Callback && callback, int64_t) mutable -> std::future<void>
+ {
+ auto package = std::packaged_task<void()>(std::move(callback));
+ /// No exceptions are propagated, exceptions are packed to future
+ package();
+ return package.get_future();
+ };
+}
+
+void WriteBufferFromS3::TaskTracker::waitAll()
+{
+ /// Exceptions are propagated
+ for (auto & future : futures)
+ {
+ future.get();
+ }
+ futures.clear();
+
+ std::lock_guard lock(mutex);
+ finished_futures.clear();
+}
+
+void WriteBufferFromS3::TaskTracker::safeWaitAll()
+{
+ for (auto & future : futures)
+ {
+ if (future.valid())
+ {
+ try
+ {
+ /// Exceptions are not propagated
+ future.get();
+ } catch (...)
+ {
+ /// But at least they are printed
+ tryLogCurrentException(__PRETTY_FUNCTION__);
+ }
+ }
+ }
+ futures.clear();
+
+ std::lock_guard lock(mutex);
+ finished_futures.clear();
+}
+
+void WriteBufferFromS3::TaskTracker::waitIfAny()
+{
+ if (futures.empty())
+ return;
+
+ Stopwatch watch;
+
+ {
+ std::lock_guard lock(mutex);
+ for (auto & it : finished_futures)
+ {
+ /// actually that call might lock this thread until the future is set finally
+ /// however that won't lock us for long, the task is about to finish when the pointer appears in the `finished_futures`
+ it->get();
+
+ /// in case of exception in `it->get()`
+ /// it it not necessary to remove `it` from list `futures`
+ /// `TaskTracker` has to be destroyed after any exception occurs, for this `safeWaitAll` is called.
+ /// `safeWaitAll` handles invalid futures in the list `futures`
+ futures.erase(it);
+ }
+ finished_futures.clear();
+ }
+
+ watch.stop();
+ ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
+}
+
+void WriteBufferFromS3::TaskTracker::add(Callback && func)
+{
+ /// All this fuzz is about 2 things. This is the most critical place of TaskTracker.
+ /// The first is not to fail insertion in the list `futures`.
+ /// In order to face it, the element is allocated at the end of the list `futures` in advance.
+ /// The second is not to fail the notification of the task.
+ /// In order to face it, the list element, which would be inserted to the list `finished_futures`,
+ /// is allocated in advance as an other list `pre_allocated_finished` with one element inside.
+
+ /// preallocation for the first issue
+ futures.emplace_back();
+ auto future_placeholder = std::prev(futures.end());
+
+ /// preallocation for the second issue
+ FinishedList pre_allocated_finished {future_placeholder};
+
+ Callback func_with_notification = [&, my_func = std::move(func), my_pre_allocated_finished = std::move(pre_allocated_finished)]() mutable
+ {
+ SCOPE_EXIT({
+ DENY_ALLOCATIONS_IN_SCOPE;
+
+ std::lock_guard lock(mutex);
+ finished_futures.splice(finished_futures.end(), my_pre_allocated_finished);
+ has_finished.notify_one();
+ });
+
+ my_func();
+ };
+
+ /// this move is nothrow
+ *future_placeholder = scheduler(std::move(func_with_notification), Priority{});
+
+ waitTilInflightShrink();
+}
+
+void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()
+{
+ if (!max_tasks_inflight)
+ return;
+
+ if (futures.size() >= max_tasks_inflight)
+ LOG_TEST(limitedLog, "have to wait some tasks finish, in queue {}, limit {}", futures.size(), max_tasks_inflight);
+
+ Stopwatch watch;
+
+ /// Alternative approach is to wait until at least futures.size() - max_tasks_inflight element are finished
+ /// However the faster finished task is collected the faster CH checks if there is an exception
+ /// The faster an exception is propagated the lesser time is spent for cancellation
+ while (futures.size() >= max_tasks_inflight)
+ {
+ std::unique_lock lock(mutex);
+
+ has_finished.wait(lock, [this] () TSA_REQUIRES(mutex) { return !finished_futures.empty(); });
+
+ for (auto & it : finished_futures)
+ {
+ it->get();
+ futures.erase(it);
+ }
+
+ finished_futures.clear();
+ }
+
+ watch.stop();
+ ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
+}
+
+bool WriteBufferFromS3::TaskTracker::isAsync() const
+{
+ return is_async;
+}
+
+}
+
+#endif