blob: d1ec7dcbca74abffdcfe9fa5d03f9051a2b2d60e (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
#pragma once
#include <QueryPipeline/SizeLimits.h>
#include <Interpreters/Context_fwd.h>
#include <Processors/IAccumulatingTransform.h>
#include <QueryPipeline/Chain.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Interpreters/PreparedSets.h>
#include <Common/Stopwatch.h>
#include <Poco/Logger.h>
namespace DB
{
class QueryStatus;
struct Progress;
using ProgressCallback = std::function<void(const Progress & progress)>;
class PushingPipelineExecutor;
/// This processor creates set during execution.
/// Don't return any data. Sets are created when Finish status is returned.
/// In general, several work() methods need to be called to finish.
/// Independent processors is created for each subquery.
class CreatingSetsTransform : public IAccumulatingTransform
{
public:
CreatingSetsTransform(
Block in_header_,
Block out_header_,
SetAndKeyPtr set_and_key_,
StoragePtr external_table_,
SizeLimits network_transfer_limits_,
PreparedSetsCachePtr prepared_sets_cache_);
~CreatingSetsTransform() override;
String getName() const override { return "CreatingSetsTransform"; }
void work() override;
void consume(Chunk chunk) override;
Chunk generate() override;
private:
SetAndKeyPtr set_and_key;
StoragePtr external_table;
std::optional<std::promise<SetPtr>> promise_to_build;
QueryPipeline table_out;
std::unique_ptr<PushingPipelineExecutor> executor;
UInt64 read_rows = 0;
bool set_from_cache = false;
Stopwatch watch;
bool done_with_set = true;
bool done_with_table = true;
SizeLimits network_transfer_limits;
PreparedSetsCachePtr prepared_sets_cache;
size_t rows_to_transfer = 0;
size_t bytes_to_transfer = 0;
using Logger = Poco::Logger;
Poco::Logger * log = &Poco::Logger::get("CreatingSetsTransform");
bool is_initialized = false;
void init();
void startSubquery();
void finishSubquery();
};
}
|