diff options
| author | AlexSm <[email protected]> | 2024-01-04 15:09:05 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-04 15:09:05 +0100 |
| commit | dab291146f6cd7d35684e3a1150e5bb1c412982c (patch) | |
| tree | 36ef35f6cacb6432845a4a33f940c95871036b32 /contrib/clickhouse/src/QueryPipeline/RemoteInserter.cpp | |
| parent | 63660ad5e7512029fd0218e7a636580695a24e1f (diff) | |
Library import 5, delete go dependencies (#832)
* Library import 5, delete go dependencies
* Fix yt client
Diffstat (limited to 'contrib/clickhouse/src/QueryPipeline/RemoteInserter.cpp')
| -rw-r--r-- | contrib/clickhouse/src/QueryPipeline/RemoteInserter.cpp | 164 |
1 files changed, 0 insertions, 164 deletions
diff --git a/contrib/clickhouse/src/QueryPipeline/RemoteInserter.cpp b/contrib/clickhouse/src/QueryPipeline/RemoteInserter.cpp deleted file mode 100644 index 134c169e35f..00000000000 --- a/contrib/clickhouse/src/QueryPipeline/RemoteInserter.cpp +++ /dev/null @@ -1,164 +0,0 @@ -#include <QueryPipeline/RemoteInserter.h> - -#include <Client/Connection.h> -#include <Common/logger_useful.h> - -#include <Common/NetException.h> -#include <Common/CurrentThread.h> -#include <Interpreters/InternalTextLogsQueue.h> -#include <IO/ConnectionTimeouts.h> -#include <Core/Settings.h> - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int UNEXPECTED_PACKET_FROM_SERVER; -} - - -RemoteInserter::RemoteInserter( - Connection & connection_, - const ConnectionTimeouts & timeouts, - const String & query_, - const Settings & settings_, - const ClientInfo & client_info_) - : connection(connection_) - , query(query_) - , server_revision(connection.getServerRevision(timeouts)) -{ - ClientInfo modified_client_info = client_info_; - modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - - Settings settings = settings_; - /// With current protocol it is impossible to avoid deadlock in case of send_logs_level!=none. - /// - /// RemoteInserter send Data blocks/packets to the remote shard, - /// while remote side can send Log packets to the initiator (this RemoteInserter instance). - /// - /// But it is not enough to pull Log packets just before writing the next block - /// since there is no way to ensure that all Log packets had been consumed. - /// - /// And if enough Log packets will be queued by the remote side, - /// it will wait send_timeout until initiator will consume those packets, - /// while initiator already starts writing Data blocks, - /// and will not consume Log packets. - /// - /// So that is why send_logs_level had been disabled here. - settings.send_logs_level = "none"; - /** Send query and receive "header", that describes table structure. - * Header is needed to know, what structure is required for blocks to be passed to 'write' method. - */ - connection.sendQuery( - timeouts, query, /* query_parameters */ {}, "", QueryProcessingStage::Complete, &settings, &modified_client_info, false, {}); - - while (true) - { - Packet packet = connection.receivePacket(); - - if (Protocol::Server::Data == packet.type) - { - header = packet.block; - break; - } - else if (Protocol::Server::Exception == packet.type) - { - packet.exception->rethrow(); - break; - } - else if (Protocol::Server::Log == packet.type) - { - /// Pass logs from remote server to client - if (auto log_queue = CurrentThread::getInternalTextLogsQueue()) - log_queue->pushBlock(std::move(packet.block)); - } - else if (Protocol::Server::TableColumns == packet.type) - { - /// Server could attach ColumnsDescription in front of stream for column defaults. There's no need to pass it through cause - /// client's already got this information for remote table. Ignore. - } - else - throw NetException( - ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER, - "Unexpected packet from server (expected Data or Exception, got {})", - Protocol::Server::toString(packet.type)); - } -} - - -void RemoteInserter::write(Block block) -{ - try - { - connection.sendData(block, /* name */"", /* scalar */false); - } - catch (const NetException &) - { - /// Try to get more detailed exception from server - auto packet_type = connection.checkPacket(/* timeout_microseconds */0); - if (packet_type && *packet_type == Protocol::Server::Exception) - { - Packet packet = connection.receivePacket(); - packet.exception->rethrow(); - } - - throw; - } -} - - -void RemoteInserter::writePrepared(ReadBuffer & buf, size_t size) -{ - /// We cannot use 'header'. Input must contain block with proper structure. - connection.sendPreparedData(buf, size); -} - - -void RemoteInserter::onFinish() -{ - /// Empty block means end of data. - connection.sendData(Block(), /* name */"", /* scalar */false); - - /// Wait for EndOfStream or Exception packet, skip Log packets. - while (true) - { - Packet packet = connection.receivePacket(); - - if (Protocol::Server::EndOfStream == packet.type) - break; - else if (Protocol::Server::Exception == packet.type) - packet.exception->rethrow(); - else if (Protocol::Server::Log == packet.type || Protocol::Server::TimezoneUpdate == packet.type) - { - // Do nothing - } - else - throw NetException( - ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER, - "Unexpected packet from server (expected EndOfStream or Exception, got {})", - Protocol::Server::toString(packet.type)); - } - - finished = true; -} - -RemoteInserter::~RemoteInserter() -{ - /// If interrupted in the middle of the loop of communication with the server, then interrupt the connection, - /// to not leave the connection in unsynchronized state. - if (!finished) - { - try - { - connection.disconnect(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } -} - -} |
