summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-11-14 09:58:56 +0300
committervitalyisaev <[email protected]>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp')
-rw-r--r--contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp155
1 files changed, 155 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
new file mode 100644
index 00000000000..5e211bf036d
--- /dev/null
+++ b/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
@@ -0,0 +1,155 @@
+#if defined(OS_LINUX)
+
+#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
+#include <QueryPipeline/RemoteQueryExecutor.h>
+#include <base/defines.h>
+#include <Common/Exception.h>
+#include <Common/NetException.h>
+#include <Client/IConnections.h>
+#include <Common/AsyncTaskExecutor.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int CANNOT_READ_FROM_SOCKET;
+ extern const int CANNOT_OPEN_FILE;
+ extern const int SOCKET_TIMEOUT;
+}
+
+RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(RemoteQueryExecutor & executor_, bool suspend_when_query_sent_)
+ : AsyncTaskExecutor(std::make_unique<Task>(*this)), executor(executor_), suspend_when_query_sent(suspend_when_query_sent_)
+{
+ if (-1 == pipe2(pipe_fd, O_NONBLOCK))
+ throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_OPEN_FILE);
+
+ epoll.add(pipe_fd[0]);
+ epoll.add(timer.getDescriptor());
+}
+
+bool RemoteQueryExecutorReadContext::checkBeforeTaskResume()
+{
+ return !is_in_progress.load(std::memory_order_relaxed) || checkTimeout();
+}
+
+
+void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, SuspendCallback suspend_callback)
+{
+ read_context.executor.sendQueryUnlocked(ClientInfo::QueryKind::SECONDARY_QUERY, async_callback);
+ read_context.is_query_sent = true;
+
+ if (read_context.suspend_when_query_sent)
+ suspend_callback();
+
+ if (read_context.executor.needToSkipUnavailableShard())
+ return;
+
+ while (true)
+ {
+ read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
+ suspend_callback();
+ }
+}
+
+void RemoteQueryExecutorReadContext::processAsyncEvent(int fd, Poco::Timespan socket_timeout, AsyncEventTimeoutType type, const std::string & description, uint32_t events)
+{
+ connection_fd = fd;
+ epoll.add(connection_fd, events);
+ timeout = socket_timeout;
+ timer.setRelative(socket_timeout);
+ timeout_type = type;
+ connection_fd_description = description;
+ is_in_progress.store(true);
+}
+
+void RemoteQueryExecutorReadContext::clearAsyncEvent()
+{
+ epoll.remove(connection_fd);
+ timer.reset();
+ is_in_progress.store(false);
+}
+
+bool RemoteQueryExecutorReadContext::checkTimeout(bool blocking)
+{
+ /// Wait for epoll will not block if it was polled externally.
+ epoll_event events[3];
+ events[0].data.fd = events[1].data.fd = events[2].data.fd = -1;
+
+ size_t num_events = epoll.getManyReady(3, events, blocking ? -1 : 0);
+
+ bool is_socket_ready = false;
+
+ for (size_t i = 0; i < num_events; ++i)
+ {
+ if (events[i].data.fd == connection_fd)
+ is_socket_ready = true;
+ if (events[i].data.fd == timer.getDescriptor())
+ is_timer_alarmed = true;
+ if (events[i].data.fd == pipe_fd[0])
+ is_pipe_alarmed = true;
+ }
+
+ if (is_pipe_alarmed)
+ return false;
+
+ if (is_timer_alarmed && !is_socket_ready)
+ {
+ /// Socket timeout. Drain it in case of error, or it may be hide by timeout exception.
+ timer.drain();
+ const String exception_message = getSocketTimeoutExceededMessageByTimeoutType(timeout_type, timeout, connection_fd_description);
+ throw NetException(ErrorCodes::SOCKET_TIMEOUT, exception_message);
+ }
+
+ return true;
+}
+
+void RemoteQueryExecutorReadContext::cancelBefore()
+{
+ /// One should not try to wait for the current packet here in case of
+ /// timeout because this will exceed the timeout.
+ /// Anyway if the timeout is exceeded, then the connection will be shutdown
+ /// (disconnected), so it will not left in an unsynchronised state.
+ if (!is_timer_alarmed)
+ {
+ /// If query wasn't sent, just complete sending it.
+ if (!is_query_sent)
+ suspend_when_query_sent = true;
+
+ /// Wait for current pending packet, to avoid leaving connection in unsynchronised state.
+ while (is_in_progress.load(std::memory_order_relaxed))
+ {
+ checkTimeout(/* blocking= */ true);
+ resumeUnlocked();
+ }
+ }
+
+ /// Send something to pipe to cancel executor waiting.
+ uint64_t buf = 0;
+ while (-1 == write(pipe_fd[1], &buf, sizeof(buf)))
+ {
+ if (errno == EAGAIN)
+ break;
+
+ if (errno != EINTR)
+ throwFromErrno("Cannot write to pipe", ErrorCodes::CANNOT_READ_FROM_SOCKET);
+ }
+}
+
+RemoteQueryExecutorReadContext::~RemoteQueryExecutorReadContext()
+{
+ /// connection_fd is closed by Poco::Net::Socket or Epoll
+ if (pipe_fd[0] != -1)
+ {
+ int err = close(pipe_fd[0]);
+ chassert(!err || errno == EINTR);
+ }
+ if (pipe_fd[1] != -1)
+ {
+ int err = close(pipe_fd[1]);
+ chassert(!err || errno == EINTR);
+ }
+}
+
+}
+#endif