blob: fc2f3943d26c582f05af4928c9e6878525113a7c (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
#pragma once
#include <IO/ISchedulerNode.h>
#include <IO/ResourceBudget.h>
#include <IO/ResourceRequest.h>
#include <memory>
namespace DB
{
/*
* Queue for pending requests for specific resource, leaf of hierarchy.
* Note that every queue has budget associated with it.
*/
class ISchedulerQueue : public ISchedulerNode
{
public:
explicit ISchedulerQueue(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
: ISchedulerNode(event_queue_, config, config_prefix)
{}
// Wrapper for `enqueueRequest()` that should be used to account for available resource budget
void enqueueRequestUsingBudget(ResourceRequest * request)
{
request->cost = budget.ask(request->cost);
enqueueRequest(request);
}
// Should be called to account for difference between real and estimated costs
void adjustBudget(ResourceCost estimated_cost, ResourceCost real_cost)
{
budget.adjust(estimated_cost, real_cost);
}
// Adjust budget to account for extra consumption of `cost` resource units
void consumeBudget(ResourceCost cost)
{
adjustBudget(0, cost);
}
// Adjust budget to account for requested, but not consumed `cost` resource units
void accumulateBudget(ResourceCost cost)
{
adjustBudget(cost, 0);
}
/// Enqueue new request to be executed using underlying resource.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void enqueueRequest(ResourceRequest * request) = 0;
private:
// Allows multiple consumers to synchronize with common "debit/credit" balance.
// 1) (positive) to avoid wasting of allocated but not used resource (e.g in case of a failure);
// 2) (negative) to account for overconsumption (e.g. if cost is not know in advance and estimation from below is applied).
ResourceBudget budget;
};
}
|