diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/IO/copyData.cpp | |
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/IO/copyData.cpp')
-rw-r--r-- | contrib/clickhouse/src/IO/copyData.cpp | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/IO/copyData.cpp b/contrib/clickhouse/src/IO/copyData.cpp new file mode 100644 index 0000000000..07222a930b --- /dev/null +++ b/contrib/clickhouse/src/IO/copyData.cpp @@ -0,0 +1,112 @@ +#include <Common/Exception.h> +#include <Common/Throttler.h> +#include <IO/ReadBuffer.h> +#include <IO/WriteBuffer.h> +#include <IO/copyData.h> + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int ATTEMPT_TO_READ_AFTER_EOF; + extern const int CANNOT_READ_ALL_DATA; +} + +namespace +{ + +void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, const std::atomic<int> * is_cancelled, ThrottlerPtr throttler) +{ + /// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false. + while (bytes > 0 && !from.eof()) + { + if (is_cancelled && *is_cancelled) + return; + + /// buffer() - a piece of data available for reading; position() - the cursor of the place to which you have already read. + size_t count = std::min(bytes, static_cast<size_t>(from.buffer().end() - from.position())); + to.write(from.position(), count); + from.position() += count; + bytes -= count; + + if (throttler) + throttler->add(count); + } + + if (check_bytes && bytes > 0) + throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after EOF."); +} + +void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::function<void()> cancellation_hook, ThrottlerPtr throttler) +{ + /// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false. + while (bytes > 0 && !from.eof()) + { + if (cancellation_hook) + cancellation_hook(); + + /// buffer() - a piece of data available for reading; position() - the cursor of the place to which you have already read. + size_t count = std::min(bytes, static_cast<size_t>(from.buffer().end() - from.position())); + to.write(from.position(), count); + from.position() += count; + bytes -= count; + + if (throttler) + throttler->add(count); + } + + if (check_bytes && bytes > 0) + throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after EOF."); +} + +} + +void copyData(ReadBuffer & from, WriteBuffer & to) +{ + copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), nullptr, nullptr); +} + +void copyData(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled) +{ + copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled, nullptr); +} + +void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook) +{ + copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), cancellation_hook, nullptr); +} + +void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes) +{ + copyDataImpl(from, to, true, bytes, nullptr, nullptr); +} + +void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled) +{ + copyDataImpl(from, to, true, bytes, &is_cancelled, nullptr); +} + +void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook) +{ + copyDataImpl(from, to, true, bytes, cancellation_hook, nullptr); +} + +void copyDataMaxBytes(ReadBuffer & from, WriteBuffer & to, size_t max_bytes) +{ + copyDataImpl(from, to, false, max_bytes, nullptr, nullptr); + if (!from.eof()) + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data, max readable size reached."); +} + +void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler) +{ + copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled, throttler); +} + +void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler) +{ + copyDataImpl(from, to, true, bytes, &is_cancelled, throttler); +} + +} |