diff options
| author | vitalyisaev <[email protected]> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Disks/IO/ThreadPoolReader.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Disks/IO/ThreadPoolReader.cpp')
| -rw-r--r-- | contrib/clickhouse/src/Disks/IO/ThreadPoolReader.cpp | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Disks/IO/ThreadPoolReader.cpp b/contrib/clickhouse/src/Disks/IO/ThreadPoolReader.cpp new file mode 100644 index 00000000000..cd3f2d8dea0 --- /dev/null +++ b/contrib/clickhouse/src/Disks/IO/ThreadPoolReader.cpp @@ -0,0 +1,253 @@ +#include "ThreadPoolReader.h" +#include <Common/VersionNumber.h> +#include <Common/assert_cast.h> +#include <Common/Exception.h> +#include <Common/ProfileEvents.h> +#include <Common/CurrentMetrics.h> +#include <Common/Stopwatch.h> +#include <Common/setThreadName.h> +#include <Common/MemorySanitizer.h> +#include <Common/CurrentThread.h> +#include <Common/ThreadPool.h> +#include <Poco/Environment.h> +#include <base/errnoToString.h> +#include <Poco/Event.h> +#include <future> +#include <unistd.h> +#include <fcntl.h> + +#if defined(OS_LINUX) + +#include <sys/syscall.h> +#include <sys/uio.h> + +/// We don't want to depend on specific glibc version. + +#if !defined(RWF_NOWAIT) + #define RWF_NOWAIT 8 +#endif + +#if !defined(SYS_preadv2) + #if defined(__x86_64__) + #define SYS_preadv2 327 + #elif defined(__aarch64__) + #define SYS_preadv2 286 + #elif defined(__powerpc64__) + #define SYS_preadv2 380 + #elif defined(__riscv) + #define SYS_preadv2 286 + #else + #error "Unsupported architecture" + #endif +#endif + +#endif + + +namespace ProfileEvents +{ + extern const Event ThreadPoolReaderPageCacheHit; + extern const Event ThreadPoolReaderPageCacheHitBytes; + extern const Event ThreadPoolReaderPageCacheHitElapsedMicroseconds; + extern const Event ThreadPoolReaderPageCacheMiss; + extern const Event ThreadPoolReaderPageCacheMissBytes; + extern const Event ThreadPoolReaderPageCacheMissElapsedMicroseconds; + + extern const Event ReadBufferFromFileDescriptorRead; + extern const Event ReadBufferFromFileDescriptorReadFailed; + extern const Event ReadBufferFromFileDescriptorReadBytes; + extern const Event DiskReadElapsedMicroseconds; +} + +namespace CurrentMetrics +{ + extern const Metric Read; + extern const Metric ThreadPoolFSReaderThreads; + extern const Metric ThreadPoolFSReaderThreadsActive; +} + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; + +} + +#if defined(OS_LINUX) +/// According to man, Linux 5.9 and 5.10 have a bug in preadv2() with the RWF_NOWAIT. +/// https://manpages.debian.org/testing/manpages-dev/preadv2.2.en.html#BUGS +/// We also disable it for older Linux kernels, because according to user's reports, RedHat-patched kernels might be also affected. +static bool hasBugInPreadV2() +{ + VersionNumber linux_version(Poco::Environment::osVersion()); + return linux_version < VersionNumber{5, 11, 0}; +} +#endif + +ThreadPoolReader::ThreadPoolReader(size_t pool_size, size_t queue_size_) + : pool(std::make_unique<ThreadPool>(CurrentMetrics::ThreadPoolFSReaderThreads, CurrentMetrics::ThreadPoolFSReaderThreadsActive, pool_size, pool_size, queue_size_)) +{ +} + +std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request request) +{ + /// If size is zero, then read() cannot be distinguished from EOF + assert(request.size); + + int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd; + +#if defined(OS_LINUX) + /// Check if data is already in page cache with preadv2 syscall. + + /// We don't want to depend on new Linux kernel. + /// But kernels 5.9 and 5.10 have a bug where preadv2() with the + /// RWF_NOWAIT flag may return 0 even when not at end of file. + /// It can't be distinguished from the real eof, so we have to + /// disable pread with nowait. + static std::atomic<bool> has_pread_nowait_support = !hasBugInPreadV2(); + + if (has_pread_nowait_support.load(std::memory_order_relaxed)) + { + /// It reports real time spent including the time spent while thread was preempted doing nothing. + /// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables). + /// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it + /// (NetlinkMetricsProvider has about 500K RPS). + Stopwatch watch(CLOCK_MONOTONIC); + + SCOPE_EXIT({ + watch.stop(); + + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds()); + ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); + }); + + std::promise<Result> promise; + std::future<Result> future = promise.get_future(); + + size_t bytes_read = 0; + while (!bytes_read) + { + ssize_t res = 0; + + { + CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; + + struct iovec io_vec{ .iov_base = request.buf, .iov_len = request.size }; + res = syscall( + SYS_preadv2, fd, + &io_vec, 1, + /// This is kind of weird calling convention for syscall. + static_cast<int64_t>(request.offset), static_cast<int64_t>(request.offset >> 32), + /// This flag forces read from page cache or returning EAGAIN. + RWF_NOWAIT); + } + + if (!res) + { + /// The file has ended. + promise.set_value({0, 0, nullptr}); + return future; + } + + if (-1 == res) + { + if (errno == ENOSYS || errno == EOPNOTSUPP) + { + /// No support for the syscall or the flag in the Linux kernel. + has_pread_nowait_support.store(false, std::memory_order_relaxed); + break; + } + else if (errno == EAGAIN) + { + /// Data is not available in page cache. Will hand off to thread pool. + break; + } + else if (errno == EINTR) + { + /// Interrupted by a signal. + continue; + } + else + { + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed); + promise.set_exception(std::make_exception_ptr(ErrnoException( + fmt::format("Cannot read from file {}, {}", fd, errnoToString()), + ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno))); + return future; + } + } + else + { + bytes_read += res; + __msan_unpoison(request.buf, res); + } + } + + if (bytes_read) + { + /// Read successfully from page cache. + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHit); + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitBytes, bytes_read); + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read); + + promise.set_value({bytes_read, request.ignore, nullptr}); + return future; + } + } +#endif + + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMiss); + + auto schedule = threadPoolCallbackRunner<Result>(*pool, "ThreadPoolRead"); + + return schedule([request, fd]() -> Result + { + Stopwatch watch(CLOCK_MONOTONIC); + SCOPE_EXIT({ + watch.stop(); + + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds()); + ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); + }); + + size_t bytes_read = 0; + while (!bytes_read) + { + ssize_t res = 0; + + { + CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; + res = ::pread(fd, request.buf, request.size, request.offset); + } + + /// File has ended. + if (!res) + break; + + if (-1 == res && errno != EINTR) + { + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed); + throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + + bytes_read += res; + } + + watch.stop(); + + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissBytes, bytes_read); + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read); + + return Result{ .size = bytes_read, .offset = request.ignore }; + }, request.priority); +} + +void ThreadPoolReader::wait() +{ + pool->wait(); +} + +} |
