diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Server/TCPHandler.h | |
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Server/TCPHandler.h')
-rw-r--r-- | contrib/clickhouse/src/Server/TCPHandler.h | 294 |
1 files changed, 294 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Server/TCPHandler.h b/contrib/clickhouse/src/Server/TCPHandler.h new file mode 100644 index 0000000000..235f634afe --- /dev/null +++ b/contrib/clickhouse/src/Server/TCPHandler.h @@ -0,0 +1,294 @@ +#pragma once + +#include <optional> +#include <Poco/Net/TCPServerConnection.h> + +#include <base/getFQDNOrHostName.h> +#include <Common/ProfileEvents.h> +#include <Common/CurrentMetrics.h> +#include <Common/Stopwatch.h> +#include <Common/ThreadStatus.h> +#include <Core/Protocol.h> +#include <Core/QueryProcessingStage.h> +#include <IO/Progress.h> +#include <IO/TimeoutSetter.h> +#include <QueryPipeline/BlockIO.h> +#include <Interpreters/InternalTextLogsQueue.h> +#include <Interpreters/Context_fwd.h> +#include <Interpreters/ClientInfo.h> +#include <Interpreters/ProfileEventsExt.h> +#include <Formats/NativeReader.h> +#include <Formats/NativeWriter.h> + +#include "IServer.h" +#include "Server/TCPProtocolStackData.h" +#include "Storages/MergeTree/RequestResponse.h" +#include "base/types.h" + + +namespace CurrentMetrics +{ + extern const Metric TCPConnection; +} + +namespace Poco { class Logger; } + +namespace DB +{ + +class Session; +struct Settings; +class ColumnsDescription; +struct ProfileInfo; +class TCPServer; +class NativeWriter; +class NativeReader; + +/// State of query processing. +struct QueryState +{ + /// Identifier of the query. + String query_id; + + QueryProcessingStage::Enum stage = QueryProcessingStage::Complete; + Protocol::Compression compression = Protocol::Compression::Disable; + + /// A queue with internal logs that will be passed to client. It must be + /// destroyed after input/output blocks, because they may contain other + /// threads that use this queue. + InternalTextLogsQueuePtr logs_queue; + std::unique_ptr<NativeWriter> logs_block_out; + + InternalProfileEventsQueuePtr profile_queue; + std::unique_ptr<NativeWriter> profile_events_block_out; + + /// From where to read data for INSERT. + std::shared_ptr<ReadBuffer> maybe_compressed_in; + std::unique_ptr<NativeReader> block_in; + + /// Where to write result data. + std::shared_ptr<WriteBuffer> maybe_compressed_out; + std::unique_ptr<NativeWriter> block_out; + Block block_for_insert; + + /// Query text. + String query; + /// Streams of blocks, that are processing the query. + BlockIO io; + + enum class CancellationStatus: UInt8 + { + FULLY_CANCELLED, + READ_CANCELLED, + NOT_CANCELLED + }; + + /// Is request cancelled + CancellationStatus cancellation_status = CancellationStatus::NOT_CANCELLED; + bool is_connection_closed = false; + /// empty or not + bool is_empty = true; + /// Data was sent. + bool sent_all_data = false; + /// Request requires data from the client (INSERT, but not INSERT SELECT). + bool need_receive_data_for_insert = false; + /// Data was read. + bool read_all_data = false; + + /// A state got uuids to exclude from a query + std::optional<std::vector<UUID>> part_uuids_to_ignore; + + /// Request requires data from client for function input() + bool need_receive_data_for_input = false; + /// temporary place for incoming data block for input() + Block block_for_input; + /// sample block from StorageInput + Block input_header; + + /// If true, the data packets will be skipped instead of reading. Used to recover after errors. + bool skipping_data = false; + + /// To output progress, the difference after the previous sending of progress. + Progress progress; + Stopwatch watch; + UInt64 prev_elapsed_ns = 0; + + /// Timeouts setter for current query + std::unique_ptr<TimeoutSetter> timeout_setter; + + void reset() + { + *this = QueryState(); + } + + bool empty() const + { + return is_empty; + } +}; + + +struct LastBlockInputParameters +{ + Protocol::Compression compression = Protocol::Compression::Disable; +}; + +class TCPHandler : public Poco::Net::TCPServerConnection +{ +public: + /** parse_proxy_protocol_ - if true, expect and parse the header of PROXY protocol in every connection + * and set the information about forwarded address accordingly. + * See https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt + * + * Note: immediate IP address is always used for access control (accept-list of IP networks), + * because it allows to check the IP ranges of the trusted proxy. + * Proxy-forwarded (original client) IP address is used for quota accounting if quota is keyed by forwarded IP. + */ + TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_); + TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, TCPProtocolStackData & stack_data, std::string server_display_name_); + ~TCPHandler() override; + + void run() override; + + /// This method is called right before the query execution. + virtual void customizeContext(ContextMutablePtr /*context*/) {} + +private: + IServer & server; + TCPServer & tcp_server; + bool parse_proxy_protocol = false; + Poco::Logger * log; + + String forwarded_for; + String certificate; + + String client_name; + UInt64 client_version_major = 0; + UInt64 client_version_minor = 0; + UInt64 client_version_patch = 0; + UInt32 client_tcp_protocol_version = 0; + String quota_key; + + /// Connection settings, which are extracted from a context. + bool send_exception_with_stack_trace = true; + Poco::Timespan send_timeout = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0); + Poco::Timespan receive_timeout = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0); + UInt64 poll_interval = DBMS_DEFAULT_POLL_INTERVAL; + UInt64 idle_connection_timeout = 3600; + UInt64 interactive_delay = 100000; + Poco::Timespan sleep_in_send_tables_status; + UInt64 unknown_packet_in_send_data = 0; + Poco::Timespan sleep_after_receiving_query; + + std::unique_ptr<Session> session; + ContextMutablePtr query_context; + ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::NO_QUERY; + + /// Streams for reading/writing from/to client connection socket. + std::shared_ptr<ReadBuffer> in; + std::shared_ptr<WriteBuffer> out; + + /// Time after the last check to stop the request and send the progress. + Stopwatch after_check_cancelled; + Stopwatch after_send_progress; + + String default_database; + + /// For inter-server secret (remote_server.*.secret) + bool is_interserver_mode = false; + /// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET + String salt; + /// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 + std::optional<UInt64> nonce; + String cluster; + + std::mutex task_callback_mutex; + std::mutex fatal_error_mutex; + + /// At the moment, only one ongoing query in the connection is supported at a time. + QueryState state; + + /// Last block input parameters are saved to be able to receive unexpected data packet sent after exception. + LastBlockInputParameters last_block_in; + + CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection}; + + ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots; + + /// It is the name of the server that will be sent to the client. + String server_display_name; + + void runImpl(); + + void extractConnectionSettingsFromContext(const ContextPtr & context); + + std::unique_ptr<Session> makeSession(); + + bool receiveProxyHeader(); + void receiveHello(); + void receiveAddendum(); + bool receivePacket(); + void receiveQuery(); + void receiveIgnoredPartUUIDs(); + String receiveReadTaskResponseAssumeLocked(); + std::optional<ParallelReadResponse> receivePartitionMergeTreeReadTaskResponseAssumeLocked(); + bool receiveData(bool scalar); + bool readDataNext(); + void readData(); + void skipData(); + void receiveClusterNameAndSalt(); + + bool receiveUnexpectedData(bool throw_exception = true); + [[noreturn]] void receiveUnexpectedQuery(); + [[noreturn]] void receiveUnexpectedIgnoredPartUUIDs(); + [[noreturn]] void receiveUnexpectedHello(); + [[noreturn]] void receiveUnexpectedTablesStatusRequest(); + + /// Process INSERT query + void processInsertQuery(); + + /// Process a request that does not require the receiving of data blocks from the client + void processOrdinaryQuery(); + + void processOrdinaryQueryWithProcessors(); + + void processTablesStatusRequest(); + + void sendHello(); + void sendData(const Block & block); /// Write a block to the network. + void sendLogData(const Block & block); + void sendTableColumns(const ColumnsDescription & columns); + void sendException(const Exception & e, bool with_stack_trace); + void sendProgress(); + void sendLogs(); + void sendEndOfStream(); + void sendPartUUIDs(); + void sendReadTaskRequestAssumeLocked(); + void sendMergeTreeAllRangesAnnounecementAssumeLocked(InitialAllRangesAnnouncement announcement); + void sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request); + void sendProfileInfo(const ProfileInfo & info); + void sendTotals(const Block & totals); + void sendExtremes(const Block & extremes); + void sendProfileEvents(); + void sendSelectProfileEvents(); + void sendInsertProfileEvents(); + void sendTimezone(); + + /// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled. + void initBlockInput(); + void initBlockOutput(const Block & block); + void initLogsBlockOutput(const Block & block); + void initProfileEventsBlockOutput(const Block & block); + + using CancellationStatus = QueryState::CancellationStatus; + + void decreaseCancellationStatus(const std::string & log_message); + CancellationStatus getQueryCancellationStatus(); + + /// This function is called from different threads. + void updateProgress(const Progress & value); + + Poco::Net::SocketAddress getClientAddress(const ClientInfo & client_info); +}; + +} |