diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Interpreters/InterserverIOHandler.h | |
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Interpreters/InterserverIOHandler.h')
-rw-r--r-- | contrib/clickhouse/src/Interpreters/InterserverIOHandler.h | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Interpreters/InterserverIOHandler.h b/contrib/clickhouse/src/Interpreters/InterserverIOHandler.h new file mode 100644 index 0000000000..375c6ee9ca --- /dev/null +++ b/contrib/clickhouse/src/Interpreters/InterserverIOHandler.h @@ -0,0 +1,90 @@ +#pragma once + +#include <IO/ReadBuffer.h> +#include <IO/WriteBuffer.h> +#include <IO/ReadBufferFromString.h> +#include <IO/ReadHelpers.h> +#include <IO/WriteBufferFromString.h> +#include <IO/WriteHelpers.h> +#include <Common/ActionBlocker.h> +#include <Common/SharedMutex.h> +#include <base/types.h> + +#include <atomic> +#include <map> +#include <utility> + +namespace zkutil +{ + class ZooKeeper; + using ZooKeeperPtr = std::shared_ptr<ZooKeeper>; +} + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int DUPLICATE_INTERSERVER_IO_ENDPOINT; + extern const int NO_SUCH_INTERSERVER_IO_ENDPOINT; +} + +class HTMLForm; +class HTTPServerResponse; + +/** Query processor from other servers. + */ +class InterserverIOEndpoint +{ +public: + virtual std::string getId(const std::string & path) const = 0; + virtual void processQuery(const HTMLForm & params, ReadBuffer & body, WriteBuffer & out, HTTPServerResponse & response) = 0; + virtual ~InterserverIOEndpoint() = default; + + /// You need to stop the data transfer if blocker is activated. + ActionBlocker blocker; + SharedMutex rwlock; +}; + +using InterserverIOEndpointPtr = std::shared_ptr<InterserverIOEndpoint>; + + +/** Here you can register a service that processes requests from other servers. + * Used to transfer chunks in ReplicatedMergeTree. + */ +class InterserverIOHandler +{ +public: + void addEndpoint(const String & name, InterserverIOEndpointPtr endpoint) + { + std::lock_guard lock(mutex); + bool inserted = endpoint_map.try_emplace(name, std::move(endpoint)).second; + if (!inserted) + throw Exception(ErrorCodes::DUPLICATE_INTERSERVER_IO_ENDPOINT, "Duplicate interserver IO endpoint: {}", name); + } + + bool removeEndpointIfExists(const String & name) + { + std::lock_guard lock(mutex); + return endpoint_map.erase(name); + } + + InterserverIOEndpointPtr getEndpoint(const String & name) const + try + { + std::lock_guard lock(mutex); + return endpoint_map.at(name); + } + catch (...) + { + throw Exception(ErrorCodes::NO_SUCH_INTERSERVER_IO_ENDPOINT, "No interserver IO endpoint named {}", name); + } + +private: + using EndpointMap = std::map<String, InterserverIOEndpointPtr>; + + EndpointMap endpoint_map; + mutable std::mutex mutex; +}; + +} |