aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Client/LocalConnection.h
blob: fb8f9003364b284b7ff248396d42e9e02ae1afaf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#pragma once

#include "Connection.h"
#include <Interpreters/Context.h>
#include <QueryPipeline/BlockIO.h>
#include <IO/TimeoutSetter.h>
#include <Interpreters/Session.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Storages/ColumnsDescription.h>
#include <Common/CurrentThread.h>


namespace DB
{
class PullingAsyncPipelineExecutor;
class PushingAsyncPipelineExecutor;
class PushingPipelineExecutor;

/// State of query processing.
struct LocalQueryState
{
    /// Identifier of the query.
    String query_id;
    QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;

    /// Query text.
    String query;
    /// Streams of blocks, that are processing the query.
    BlockIO io;
    /// Current stream to pull blocks from.
    std::unique_ptr<PullingAsyncPipelineExecutor> executor;
    std::unique_ptr<PushingPipelineExecutor> pushing_executor;
    std::unique_ptr<PushingAsyncPipelineExecutor> pushing_async_executor;
    InternalProfileEventsQueuePtr profile_queue;

    std::unique_ptr<Exception> exception;

    /// Current block to be sent next.
    std::optional<Block> block;
    std::optional<ColumnsDescription> columns_description;
    std::optional<ProfileInfo> profile_info;

    /// Is request cancelled
    bool is_cancelled = false;
    bool is_finished = false;

    bool sent_totals = false;
    bool sent_extremes = false;
    bool sent_progress = false;
    bool sent_profile_info = false;
    bool sent_profile_events = false;

    /// To output progress, the difference after the previous sending of progress.
    Progress progress;
    /// Time after the last check to stop the request and send the progress.
    Stopwatch after_send_progress;
    Stopwatch after_send_profile_events;

    std::unique_ptr<CurrentThread::QueryScope> query_scope_holder;
};


class LocalConnection : public IServerConnection, WithContext
{
public:
    explicit LocalConnection(
        ContextPtr context_, bool send_progress_ = false, bool send_profile_events_ = false, const String & server_display_name_ = "");

    ~LocalConnection() override;

    IServerConnection::Type getConnectionType() const override { return IServerConnection::Type::LOCAL; }

    static ServerConnectionPtr createConnection(
        const ConnectionParameters & connection_parameters,
        ContextPtr current_context,
        bool send_progress = false,
        bool send_profile_events = false,
        const String & server_display_name = "");

    void setDefaultDatabase(const String & database) override;

    void getServerVersion(const ConnectionTimeouts & timeouts,
                          String & name,
                          UInt64 & version_major,
                          UInt64 & version_minor,
                          UInt64 & version_patch,
                          UInt64 & revision) override;

    UInt64 getServerRevision(const ConnectionTimeouts & timeouts) override;
    const String & getServerTimezone(const ConnectionTimeouts & timeouts) override;
    const String & getServerDisplayName(const ConnectionTimeouts & timeouts) override;

    const String & getDescription() const override { return description; }

    std::vector<std::pair<String, String>> getPasswordComplexityRules() const override { return {}; }

    void sendQuery(
        const ConnectionTimeouts & timeouts,
        const String & query,
        const NameToNameMap & query_parameters,
        const String & query_id/* = "" */,
        UInt64 stage/* = QueryProcessingStage::Complete */,
        const Settings * settings/* = nullptr */,
        const ClientInfo * client_info/* = nullptr */,
        bool with_pending_data/* = false */,
        std::function<void(const Progress &)> process_progress_callback) override;

    void sendCancel() override;

    void sendData(const Block & block, const String & name/* = "" */, bool scalar/* = false */) override;

    void sendExternalTablesData(ExternalTablesData &) override;

    void sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) override;

    bool poll(size_t timeout_microseconds/* = 0 */) override;

    bool hasReadPendingData() const override;

    std::optional<UInt64> checkPacket(size_t timeout_microseconds/* = 0*/) override;

    Packet receivePacket() override;

    void forceConnected(const ConnectionTimeouts &) override {}

    bool isConnected() const override { return true; }

    bool checkConnected(const ConnectionTimeouts & /*timeouts*/) override { return true; }

    void disconnect() override {}

    void setThrottler(const ThrottlerPtr &) override {}

private:
    void initBlockInput();

    void processOrdinaryQuery();

    void processOrdinaryQueryWithProcessors();

    void updateState();

    bool pullBlock(Block & block);

    void finishQuery();

    void updateProgress(const Progress & value);

    void sendProfileEvents();

    bool pollImpl();

    ContextMutablePtr query_context;
    Session session;

    bool send_progress;
    bool send_profile_events;
    String server_display_name;
    String description = "clickhouse-local";

    std::optional<LocalQueryState> state;

    /// Last "server" packet.
    std::optional<UInt64> next_packet_type;

    String current_database;

    ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots;
};
}