blob: b0dd3cef39c344cfacf41bb41b6b31084981643e (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
#pragma once
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
#include <Common/ThreadPool.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <mutex>
#include <Common/CurrentMetrics.h>
namespace DB
{

/// Base class for asynchronous object-storage listing iterators.
/// A background task (scheduled on a private single-thread pool) fetches the
/// next batch of object paths while the caller consumes the current one
/// (see scheduleBatch() / outcome_future below; method bodies live in the .cpp).
class IObjectStorageIteratorAsync : public IObjectStorageIterator
{
public:
    /// @param threads_metric          CurrentMetrics counter for pool threads.
    /// @param threads_active_metric   CurrentMetrics counter for active pool threads.
    /// @param thread_name             name assigned to the listing thread.
    /// Pool size is fixed at 1, so at most one listing request is in flight.
    IObjectStorageIteratorAsync(
        CurrentMetrics::Metric threads_metric,
        CurrentMetrics::Metric threads_active_metric,
        const std::string & thread_name)
        : list_objects_pool(threads_metric, threads_active_metric, 1)
        , list_objects_scheduler(threadPoolCallbackRunner<BatchAndHasNext>(list_objects_pool, thread_name))
    {
    }

    /// IObjectStorageIterator interface; definitions are in the .cpp file.
    void next() override;
    void nextBatch() override;
    bool isValid() override;
    RelativePathWithMetadata current() override;
    RelativePathsWithMetadata currentBatch() override;
    size_t getAccumulatedSize() const override;
    /// NOTE: the triple-'r' spelling is part of the inherited interface
    /// (this is an override) and cannot be corrected here alone.
    std::optional<RelativePathsWithMetadata> getCurrrentBatchAndScheduleNext() override;

    ~IObjectStorageIteratorAsync() override
    {
        /// Block until any in-flight listing task finishes before members are
        /// destroyed; otherwise the background task could touch a dead object.
        list_objects_pool.wait();
    }

protected:
    /// Fetch one batch from the concrete storage implementation.
    /// @return true if more batches remain after this one.
    virtual bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) = 0;

    /// Result of a single background listing task.
    struct BatchAndHasNext
    {
        RelativePathsWithMetadata batch;
        bool has_next;
    };

    /// Submit the next listing task to the pool (defined in the .cpp).
    std::future<BatchAndHasNext> scheduleBatch();

    bool is_initialized{false};
    bool is_finished{false};

    /// Recursive, presumably because the public methods call one another while
    /// holding it — locking happens in the .cpp definitions; TODO confirm.
    mutable std::recursive_mutex mutex;

    ThreadPool list_objects_pool;
    ThreadPoolCallbackRunner<BatchAndHasNext> list_objects_scheduler;
    /// Pending result of the batch scheduled ahead of consumption.
    std::future<BatchAndHasNext> outcome_future;

    RelativePathsWithMetadata current_batch;
    RelativePathsWithMetadata::iterator current_batch_iterator;

    /// Atomic, presumably so getAccumulatedSize() can read it without taking
    /// the mutex — definitions not visible here; verify against the .cpp.
    std::atomic<size_t> accumulated_size = 0;
};

}
|