aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Compression/CompressedReadBufferFromFile.cpp
blob: 9dc40b8217ce0ac3e92ab7230945ebc6ae11be38 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#include <cassert>

#include "CompressedReadBufferFromFile.h"

#include <Compression/LZ4_decompress_faster.h>
#include <IO/WriteHelpers.h>
#include <Disks/IO/createReadBufferFromFileBase.h>


namespace DB
{
/// Error codes referenced in this file. They are only declared here (`extern`);
/// the definitions live elsewhere.
namespace ErrorCodes
{
    /// Used by nextImpl() when the offset remembered by a lazy seek() is larger
    /// than the decompressed block.
    extern const int SEEK_POSITION_OUT_OF_BOUND;
}


bool CompressedReadBufferFromFile::nextImpl()
{
    size_t size_decompressed = 0;
    size_t size_compressed_without_checksum;
    size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum, false);
    if (!size_compressed)
        return false;

    auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();

    /// This is for clang static analyzer.
    assert(size_decompressed + additional_size_at_the_end_of_buffer > 0);

    memory.resize(size_decompressed + additional_size_at_the_end_of_buffer);
    working_buffer = Buffer(memory.data(), &memory[size_decompressed]);

    decompress(working_buffer, size_decompressed, size_compressed_without_checksum);

    /// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to
    /// check that we are not seeking beyond working buffer.
    if (nextimpl_working_buffer_offset > working_buffer.size())
        throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Required to move position beyond the decompressed block (pos: "
        "{}, block size: {})", nextimpl_working_buffer_offset, toString(working_buffer.size()));

    return true;
}


/// Builds a compressed reader on top of `buf` and takes ownership of it.
///
/// `file_in` is bound to the object now owned by `p_file_in`, and `compressed_in` is pointed
/// at that same object. `allow_different_codecs_` is stored as-is in `allow_different_codecs`.
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_)
    : BufferWithOwnMemory<ReadBuffer>(0)
    , p_file_in(std::move(buf))
    , file_in(*p_file_in)
{
    allow_different_codecs = allow_different_codecs_;
    compressed_in = &file_in;
}


/// Forwards the prefetch request, with the given priority, to the underlying file buffer.
void CompressedReadBufferFromFile::prefetch(Priority priority)
{
    file_in.prefetch(priority);
}


/// Positions the reader at byte `offset_in_decompressed_block` of the compressed block that
/// starts at `offset_in_compressed_file` in the underlying file.
///
/// One of three things happens:
///  - the reader is already at the requested position (or a pending lazy seek already targets
///    it): nothing is changed;
///  - the target lies inside the block currently held in working_buffer: only `pos` is moved;
///  - otherwise a "lazy seek" is performed: the file is repositioned, the working buffer is
///    discarded and the in-block offset is remembered for the next ReadBuffer::next() call.
void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
    const auto compressed_position = file_in.getPosition();

    if (size_compressed == 0)
    {
        /// Nothing to do if the file is at the requested block and either the buffer position
        /// is already correct or the pending lazy seek will move it to the correct one.
        const bool file_at_requested_block = static_cast<size_t>(compressed_position) == offset_in_compressed_file;
        if (file_at_requested_block
            && ((!buffer().empty() && offset() == offset_in_decompressed_block)
                || nextimpl_working_buffer_offset == offset_in_decompressed_block))
            return;
    }
    else if (offset_in_compressed_file == compressed_position - size_compressed
             && offset_in_decompressed_block <= working_buffer.size())
    {
        /// The requested block is the one in working_buffer, so just move the position.
        pos = working_buffer.begin() + offset_in_decompressed_block;
        return;
    }

    /// The target is outside the working buffer: perform a "lazy seek".

    /// Actually seek the compressed file.
    file_in.seek(offset_in_compressed_file, SEEK_SET);

    /// The working buffer is about to be discarded, but the bytes consumed from it
    /// still have to be accounted for.
    bytes += offset();

    /// No data is left, everything is discarded.
    resetWorkingBuffer();
    size_compressed = 0;

    /// Remember the required offset inside the decompressed block; it will be applied
    /// in the next ReadBuffer::next() call.
    nextimpl_working_buffer_offset = offset_in_decompressed_block;
}

size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
{
    size_t bytes_read = 0;
    /// The codec mode is only relevant for codecs which support hardware offloading.
    ICompressionCodec::CodecMode decompress_mode = ICompressionCodec::CodecMode::Synchronous;
    bool read_tail = false;

    /// If there are unread bytes in the buffer, then we copy needed to `to`.
    if (pos < working_buffer.end())
        bytes_read += read(to, std::min(static_cast<size_t>(working_buffer.end() - pos), n));

    /// If you need to read more - we will, if possible, decompress at once to `to`.
    while (bytes_read < n)
    {
        size_t size_decompressed = 0;
        size_t size_compressed_without_checksum = 0;

        ///Try to read block which is entirely located in a single 'compressed_in->' buffer.
        size_t new_size_compressed = readCompressedDataBlockForAsynchronous(size_decompressed, size_compressed_without_checksum);

        if (new_size_compressed)
        {
            /// Current block is entirely located in a single 'compressed_in->' buffer.
            /// We can set asynchronous decompression mode if supported to boost performance.
            decompress_mode = ICompressionCodec::CodecMode::Asynchronous;
        }
        else
        {
            /// Current block cannot be decompressed asynchronously, means it probably span across two compressed_in buffers.
            /// Meanwhile, asynchronous requests for previous blocks should be flushed if any.
            flushAsynchronousDecompressRequests();
            /// Fallback to generic API
            new_size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum, false);
            decompress_mode = ICompressionCodec::CodecMode::Synchronous;
        }
        size_compressed = 0; /// file_in no longer points to the end of the block in working_buffer.

        if (!new_size_compressed)
            break;

        auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();

        /// If the decompressed block fits entirely where it needs to be copied and we don't
        /// need to skip some bytes in decompressed data (seek happened before readBig call).
        if (nextimpl_working_buffer_offset == 0 && size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
        {
            setDecompressMode(decompress_mode);
            decompressTo(to + bytes_read, size_decompressed, size_compressed_without_checksum);
            bytes_read += size_decompressed;
            bytes += size_decompressed;
        }
        else if (nextimpl_working_buffer_offset > 0)
        {
            /// Need to skip some bytes in decompressed data (seek happened before readBig call).
            size_compressed = new_size_compressed;
            bytes += offset();

            /// This is for clang static analyzer.
            assert(size_decompressed + additional_size_at_the_end_of_buffer > 0);
            memory.resize(size_decompressed + additional_size_at_the_end_of_buffer);
            working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
            /// Synchronous mode must be set since we need read partial data immediately from working buffer to target buffer.
            setDecompressMode(ICompressionCodec::CodecMode::Synchronous);
            decompress(working_buffer, size_decompressed, size_compressed_without_checksum);

            /// Read partial data from first block. Won't run here at second block.
            /// Avoid to call nextImpl and unnecessary memcpy in read when the second block fits entirely to output buffer.
            size_t size_partial = std::min((size_decompressed - nextimpl_working_buffer_offset),(n - bytes_read));
            pos = working_buffer.begin() + nextimpl_working_buffer_offset;
            nextimpl_working_buffer_offset = 0;
            bytes_read += read(to + bytes_read, size_partial);
        }
        else
        {
            size_compressed = new_size_compressed;
            bytes += offset();

            /// This is for clang static analyzer.
            assert(size_decompressed + additional_size_at_the_end_of_buffer > 0);
            memory.resize(size_decompressed + additional_size_at_the_end_of_buffer);
            working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
            // Asynchronous mode can be set here because working_buffer wouldn't be overwritten any more since this is the last block.
            setDecompressMode(ICompressionCodec::CodecMode::Asynchronous);
            decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
            read_tail = true;
            break;
        }
    }

    /// Here we must make sure all asynchronous requests above are completely done.
    flushAsynchronousDecompressRequests();

    if (read_tail)
    {
        /// Manually take nextimpl_working_buffer_offset into account, because we don't use
        /// nextImpl in this method.
        pos = working_buffer.begin();
        bytes_read += read(to + bytes_read, n - bytes_read);
    }

    return bytes_read;
}

}