aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/copyData.cpp
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-11-14 09:58:56 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/IO/copyData.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
downloadydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/IO/copyData.cpp')
-rw-r--r--contrib/clickhouse/src/IO/copyData.cpp112
1 files changed, 112 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/IO/copyData.cpp b/contrib/clickhouse/src/IO/copyData.cpp
new file mode 100644
index 0000000000..07222a930b
--- /dev/null
+++ b/contrib/clickhouse/src/IO/copyData.cpp
@@ -0,0 +1,112 @@
+#include <Common/Exception.h>
+#include <Common/Throttler.h>
+#include <IO/ReadBuffer.h>
+#include <IO/WriteBuffer.h>
+#include <IO/copyData.h>
+
+
+namespace DB
+{
+namespace ErrorCodes
+{
+ extern const int ATTEMPT_TO_READ_AFTER_EOF;
+ extern const int CANNOT_READ_ALL_DATA;
+}
+
+namespace
+{
+
+void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, const std::atomic<int> * is_cancelled, ThrottlerPtr throttler)
+{
+ /// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false.
+ while (bytes > 0 && !from.eof())
+ {
+ if (is_cancelled && *is_cancelled)
+ return;
+
+ /// buffer() - a piece of data available for reading; position() - the cursor of the place to which you have already read.
+ size_t count = std::min(bytes, static_cast<size_t>(from.buffer().end() - from.position()));
+ to.write(from.position(), count);
+ from.position() += count;
+ bytes -= count;
+
+ if (throttler)
+ throttler->add(count);
+ }
+
+ if (check_bytes && bytes > 0)
+ throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after EOF.");
+}
+
+void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::function<void()> cancellation_hook, ThrottlerPtr throttler)
+{
+ /// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false.
+ while (bytes > 0 && !from.eof())
+ {
+ if (cancellation_hook)
+ cancellation_hook();
+
+ /// buffer() - a piece of data available for reading; position() - the cursor of the place to which you have already read.
+ size_t count = std::min(bytes, static_cast<size_t>(from.buffer().end() - from.position()));
+ to.write(from.position(), count);
+ from.position() += count;
+ bytes -= count;
+
+ if (throttler)
+ throttler->add(count);
+ }
+
+ if (check_bytes && bytes > 0)
+ throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after EOF.");
+}
+
+}
+
+void copyData(ReadBuffer & from, WriteBuffer & to)
+{
+ copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), nullptr, nullptr);
+}
+
+void copyData(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled)
+{
+ copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled, nullptr);
+}
+
+void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook)
+{
+ copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), cancellation_hook, nullptr);
+}
+
+void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes)
+{
+ copyDataImpl(from, to, true, bytes, nullptr, nullptr);
+}
+
+void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled)
+{
+ copyDataImpl(from, to, true, bytes, &is_cancelled, nullptr);
+}
+
+void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook)
+{
+ copyDataImpl(from, to, true, bytes, cancellation_hook, nullptr);
+}
+
+void copyDataMaxBytes(ReadBuffer & from, WriteBuffer & to, size_t max_bytes)
+{
+ copyDataImpl(from, to, false, max_bytes, nullptr, nullptr);
+ if (!from.eof())
+ throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data, max readable size reached.");
+}
+
+void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
+{
+ copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled, throttler);
+}
+
+void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
+{
+ copyDataImpl(from, to, true, bytes, &is_cancelled, throttler);
+}
+
+}