blob: 92f25b40f6b2f4e27cc99a446842b7913fe32083 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
#pragma once
#include <base/types.h>
#include <IO/ResourceRequest.h>
#include <IO/ResourceLink.h>
#include <IO/ISchedulerConstraint.h>
#include <condition_variable>
#include <mutex>
namespace DB
{
/*
* Scoped resource guard.
* Waits for resource to be available in constructor and releases resource in destructor
* IMPORTANT: multiple resources should not be locked concurrently by a single thread
*/
class ResourceGuard
{
public:
enum ResourceGuardCtor
{
LockStraightAway, /// Locks inside constructor (default)
// WARNING: Only for tests. It is not exception-safe because `lock()` must be called after construction.
PostponeLocking /// Don't lock in constructor, but send request
};
enum RequestState
{
Finished, // Last request has already finished; no concurrent access is possible
Enqueued, // Enqueued into the scheduler; thread-safe access is required
Dequeued // Dequeued from the scheduler and is in consumption state; no concurrent access is possible
};
class Request : public ResourceRequest
{
public:
void enqueue(ResourceCost cost_, ResourceLink link_)
{
// lock(mutex) is not required because `Finished` request cannot be used by the scheduler thread
chassert(state == Finished);
state = Enqueued;
ResourceRequest::reset(cost_);
link_.queue->enqueueRequestUsingBudget(this);
}
// This function is executed inside scheduler thread and wakes thread issued this `request`.
// That thread will continue execution and do real consumption of requested resource synchronously.
void execute() override
{
{
std::unique_lock lock(mutex);
chassert(state == Enqueued);
state = Dequeued;
}
dequeued_cv.notify_one();
}
void wait()
{
std::unique_lock lock(mutex);
dequeued_cv.wait(lock, [this] { return state == Dequeued; });
}
void finish()
{
// lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread
chassert(state == Dequeued);
state = Finished;
if (constraint)
constraint->finishRequest(this);
}
static Request & local()
{
// Since single thread cannot use more than one resource request simultaneously,
// we can reuse thread-local request to avoid allocations
static thread_local Request instance;
return instance;
}
private:
std::mutex mutex;
std::condition_variable dequeued_cv;
RequestState state = Finished;
};
/// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`)
explicit ResourceGuard(ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway)
: link(link_)
, request(Request::local())
{
if (cost == 0)
link.queue = nullptr; // Ignore zero-cost requests
else if (link.queue)
{
request.enqueue(cost, link);
if (ctor == LockStraightAway)
request.wait();
}
}
~ResourceGuard()
{
unlock();
}
/// Blocks until resource is available
void lock()
{
if (link.queue)
request.wait();
}
/// Report resource consumption has finished
void unlock()
{
if (link.queue)
{
request.finish();
link.queue = nullptr;
}
}
/// Mark request as unsuccessful; by default request is considered to be successful
void setFailure()
{
request.successful = false;
}
ResourceLink link;
Request & request;
};
}
|