aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/copyData.cpp
blob: 07222a930b521cbb6641191c2e20db8ac1b4992b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include <Common/Exception.h>
#include <Common/Throttler.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/copyData.h>


namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    /// Thrown by copyDataImpl when an exact byte count was requested but the source ended first.
    extern const int ATTEMPT_TO_READ_AFTER_EOF;
    /// Thrown by copyDataMaxBytes when the source still has data after the byte limit was reached.
    extern const int CANNOT_READ_ALL_DATA;
}

namespace
{

/// Copies up to `bytes` bytes from `from` into `to`, one read-buffer window at a time.
///
/// - `check_bytes`: when true, running out of input before `bytes` bytes were copied is an error.
/// - `is_cancelled`: optional flag polled before every chunk; once it reads non-zero the function returns
///   silently (no exception, even with `check_bytes` set), leaving whatever was already copied in `to`.
/// - `throttler`: optional; when set, it is told the size of each chunk after the chunk has been written.
///   Taken by const reference because it is only dereferenced here, so there is no reason to copy the handle
///   on every call (NOTE(review): presumably a shared pointer declared in Common/Throttler.h - confirm).
///
/// Throws ATTEMPT_TO_READ_AFTER_EOF if `check_bytes` is set and the source ended before `bytes` bytes were copied.
void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, const std::atomic<int> * is_cancelled, const ThrottlerPtr & throttler)
{
    /// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false.
    while (bytes > 0 && !from.eof())
    {
        if (is_cancelled && *is_cancelled)
            return;

        /// buffer() - a piece of data available for reading; position() - the cursor of the place to which you have already read.
        /// Never take more than the remaining byte budget, even if the buffer holds more.
        size_t count = std::min(bytes, static_cast<size_t>(from.buffer().end() - from.position()));
        to.write(from.position(), count);
        from.position() += count;
        bytes -= count;

        if (throttler)
            throttler->add(count);
    }

    /// A non-zero remainder here means the loop stopped on EOF, not because the budget was used up.
    if (check_bytes && bytes > 0)
        throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after EOF.");
}

void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::function<void()> cancellation_hook, ThrottlerPtr throttler)
{
    /// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false.
    while (bytes > 0 && !from.eof())
    {
        if (cancellation_hook)
            cancellation_hook();

        /// buffer() - a piece of data available for reading; position() - the cursor of the place to which you have already read.
        size_t count = std::min(bytes, static_cast<size_t>(from.buffer().end() - from.position()));
        to.write(from.position(), count);
        from.position() += count;
        bytes -= count;

        if (throttler)
            throttler->add(count);
    }

    if (check_bytes && bytes > 0)
        throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after EOF.");
}

}

/// Copy everything that can be read from `from` into `to`.
/// No cancellation and no throttling; a short source is not an error.
void copyData(ReadBuffer & from, WriteBuffer & to)
{
    constexpr size_t no_limit = std::numeric_limits<size_t>::max();
    copyDataImpl(from, to, /* check_bytes = */ false, no_limit, /* is_cancelled = */ nullptr, /* throttler = */ nullptr);
}

/// Copy everything that can be read from `from` into `to`, passing the address of `is_cancelled`
/// to the implementation as its cancellation flag. No throttling; a short source is not an error.
void copyData(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled)
{
    constexpr size_t no_limit = std::numeric_limits<size_t>::max();
    const std::atomic<int> * cancel_flag = &is_cancelled;
    copyDataImpl(from, to, /* check_bytes = */ false, no_limit, cancel_flag, /* throttler = */ nullptr);
}

/// Copy everything that can be read from `from` into `to`, handing `cancellation_hook`
/// to the implementation. No throttling; a short source is not an error.
void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook)
{
    constexpr size_t no_limit = std::numeric_limits<size_t>::max();
    copyDataImpl(from, to, /* check_bytes = */ false, no_limit, cancellation_hook, /* throttler = */ nullptr);
}

/// Copy `bytes` bytes from `from` into `to` with the exact-size check enabled.
/// No cancellation and no throttling.
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes)
{
    copyDataImpl(
        from,
        to,
        /* check_bytes = */ true,
        bytes,
        /* is_cancelled = */ nullptr,
        /* throttler = */ nullptr);
}

/// Copy `bytes` bytes from `from` into `to` with the exact-size check enabled, passing the
/// address of `is_cancelled` to the implementation as its cancellation flag. No throttling.
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled)
{
    const std::atomic<int> * cancel_flag = &is_cancelled;
    copyDataImpl(from, to, /* check_bytes = */ true, bytes, cancel_flag, /* throttler = */ nullptr);
}

/// Copy `bytes` bytes from `from` into `to` with the exact-size check enabled, handing
/// `cancellation_hook` to the implementation. No throttling.
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook)
{
    copyDataImpl(
        from,
        to,
        /* check_bytes = */ true,
        bytes,
        cancellation_hook,
        /* throttler = */ nullptr);
}

/// Copy at most `max_bytes` bytes from `from` into `to`, then require that the source is exhausted.
/// A source shorter than the limit is fine; one that still has data after the copy raises
/// CANNOT_READ_ALL_DATA.
void copyDataMaxBytes(ReadBuffer & from, WriteBuffer & to, size_t max_bytes)
{
    copyDataImpl(from, to, /* check_bytes = */ false, max_bytes, /* is_cancelled = */ nullptr, /* throttler = */ nullptr);

    /// Reaching EOF means the whole input fit within the limit.
    const bool source_exhausted = from.eof();
    if (source_exhausted)
        return;

    throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data, max readable size reached.");
}

/// Copy everything that can be read from `from` into `to`, passing the address of `is_cancelled`
/// and the given `throttler` to the implementation. A short source is not an error.
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
{
    constexpr size_t no_limit = std::numeric_limits<size_t>::max();
    const std::atomic<int> * cancel_flag = &is_cancelled;
    copyDataImpl(from, to, /* check_bytes = */ false, no_limit, cancel_flag, throttler);
}

/// Copy `bytes` bytes from `from` into `to` with the exact-size check enabled, passing the
/// address of `is_cancelled` and the given `throttler` to the implementation.
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
{
    const std::atomic<int> * cancel_flag = &is_cancelled;
    copyDataImpl(from, to, /* check_bytes = */ true, bytes, cancel_flag, throttler);
}

}