aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/ResourceGuard.h
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-11-14 09:58:56 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/IO/ResourceGuard.h
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
downloadydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/IO/ResourceGuard.h')
-rw-r--r--contrib/clickhouse/src/IO/ResourceGuard.h139
1 files changed, 139 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/IO/ResourceGuard.h b/contrib/clickhouse/src/IO/ResourceGuard.h
new file mode 100644
index 0000000000..92f25b40f6
--- /dev/null
+++ b/contrib/clickhouse/src/IO/ResourceGuard.h
@@ -0,0 +1,139 @@
+#pragma once
+
+#include <base/types.h>
+
+#include <IO/ResourceRequest.h>
+#include <IO/ResourceLink.h>
+#include <IO/ISchedulerConstraint.h>
+
+#include <condition_variable>
+#include <mutex>
+
+
+namespace DB
+{
+
+/*
+ * Scoped resource guard.
+ * Waits for resource to be available in constructor and releases resource in destructor
+ * IMPORTANT: multiple resources should not be locked concurrently by a single thread
+ */
+class ResourceGuard
+{
+public:
+ enum ResourceGuardCtor
+ {
+ LockStraightAway, /// Locks inside constructor (default)
+
+ // WARNING: Only for tests. It is not exception-safe because `lock()` must be called after construction.
+ PostponeLocking /// Don't lock in constructor, but send request
+ };
+
+ enum RequestState
+ {
+ Finished, // Last request has already finished; no concurrent access is possible
+ Enqueued, // Enqueued into the scheduler; thread-safe access is required
+ Dequeued // Dequeued from the scheduler and is in consumption state; no concurrent access is possible
+ };
+
+ class Request : public ResourceRequest
+ {
+ public:
+ void enqueue(ResourceCost cost_, ResourceLink link_)
+ {
+ // lock(mutex) is not required because `Finished` request cannot be used by the scheduler thread
+ chassert(state == Finished);
+ state = Enqueued;
+ ResourceRequest::reset(cost_);
+ link_.queue->enqueueRequestUsingBudget(this);
+ }
+
+ // This function is executed inside scheduler thread and wakes thread issued this `request`.
+ // That thread will continue execution and do real consumption of requested resource synchronously.
+ void execute() override
+ {
+ {
+ std::unique_lock lock(mutex);
+ chassert(state == Enqueued);
+ state = Dequeued;
+ }
+ dequeued_cv.notify_one();
+ }
+
+ void wait()
+ {
+ std::unique_lock lock(mutex);
+ dequeued_cv.wait(lock, [this] { return state == Dequeued; });
+ }
+
+ void finish()
+ {
+ // lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread
+ chassert(state == Dequeued);
+ state = Finished;
+ if (constraint)
+ constraint->finishRequest(this);
+ }
+
+ static Request & local()
+ {
+ // Since single thread cannot use more than one resource request simultaneously,
+ // we can reuse thread-local request to avoid allocations
+ static thread_local Request instance;
+ return instance;
+ }
+
+ private:
+ std::mutex mutex;
+ std::condition_variable dequeued_cv;
+ RequestState state = Finished;
+ };
+
+ /// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`)
+ explicit ResourceGuard(ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway)
+ : link(link_)
+ , request(Request::local())
+ {
+ if (cost == 0)
+ link.queue = nullptr; // Ignore zero-cost requests
+ else if (link.queue)
+ {
+ request.enqueue(cost, link);
+ if (ctor == LockStraightAway)
+ request.wait();
+ }
+ }
+
+ ~ResourceGuard()
+ {
+ unlock();
+ }
+
+ /// Blocks until resource is available
+ void lock()
+ {
+ if (link.queue)
+ request.wait();
+ }
+
+ /// Report resource consumption has finished
+ void unlock()
+ {
+ if (link.queue)
+ {
+ request.finish();
+ link.queue = nullptr;
+ }
+ }
+
+ /// Mark request as unsuccessful; by default request is considered to be successful
+ void setFailure()
+ {
+ request.successful = false;
+ }
+
+ ResourceLink link;
+ Request & request;
+};
+
+}