aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/StdStreamBufFromReadBuffer.cpp
blob: a814dff040bd5db35b197b0e2c465c1cf32323c5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <IO/StdStreamBufFromReadBuffer.h>
#include <IO/SeekableReadBuffer.h>


namespace DB
{

/// Error codes thrown by the functions in this file.
/// They are only declared here (`extern`); the definitions are provided elsewhere in the project.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR; /// Used for API misuse: bad seek base/mode, or any attempt to write.
    extern const int SEEK_POSITION_OUT_OF_BOUND; /// Used when a requested seek position cannot be reached.
}


/// Owning constructor: takes over `read_buffer_` and stores the caller-supplied `size_`
/// (used by seekoff() as the base for seeks relative to the end).
/// When the dynamic type of the buffer derives from SeekableReadBuffer, a typed pointer to the
/// same object is cached in `seekable_read_buffer`; otherwise the dynamic_cast yields nullptr.
/// NOTE(review): the cast reads `read_buffer`, so it relies on `read_buffer` being declared
/// before `seekable_read_buffer` in the class — confirm against the header.
StdStreamBufFromReadBuffer::StdStreamBufFromReadBuffer(std::unique_ptr<ReadBuffer> read_buffer_, size_t size_)
    : read_buffer(std::move(read_buffer_))
    , seekable_read_buffer(dynamic_cast<SeekableReadBuffer *>(read_buffer.get()))
    , size(size_)
{
}

/// Non-owning constructor: `read_buffer_` is wrapped by reference rather than moved in, and
/// `size_` is stored as the total size used for end-relative seeks.
/// A seekable source is wrapped with wrapSeekableReadBufferReference() and the wrapper is also
/// remembered through `seekable_read_buffer`; any other source goes through wrapReadBufferReference().
/// NOTE(review): on the non-seekable path `seekable_read_buffer` is never assigned here —
/// presumably the header default-initializes it to nullptr; confirm.
StdStreamBufFromReadBuffer::StdStreamBufFromReadBuffer(ReadBuffer & read_buffer_, size_t size_) : size(size_)
{
    auto * seekable_source = dynamic_cast<SeekableReadBuffer *>(&read_buffer_);
    if (!seekable_source)
    {
        read_buffer = wrapReadBufferReference(read_buffer_);
        return;
    }

    read_buffer = wrapSeekableReadBufferReference(*seekable_source);
    /// The wrapper was created from a seekable source, so the downcast of the stored pointer is safe.
    seekable_read_buffer = static_cast<SeekableReadBuffer *>(read_buffer.get());
}

/// Compiler-generated destructor, defined out of line in this translation unit.
/// NOTE(review): presumably kept here so that the owned buffer type is complete where it is destroyed — confirm.
StdStreamBufFromReadBuffer::~StdStreamBufFromReadBuffer() = default;

/// Returns the next character of the input without consuming it.
///
/// @return The character converted with std::char_traits<char>::to_int_type(), or
///         std::char_traits<char>::eof() when `read_buffer->peek()` reports that nothing is left.
int StdStreamBufFromReadBuffer::underflow()
{
    char c;
    if (!read_buffer->peek(c))
        return std::char_traits<char>::eof();

    /// Do not return the plain `char`: where `char` is signed, bytes >= 0x80 would be
    /// sign-extended to negative values and the byte 0xFF would compare equal to eof(),
    /// truncating binary input. to_int_type() maps every byte to a non-negative value.
    return std::char_traits<char>::to_int_type(c);
}

/// Reports how many characters can be obtained right now.
/// The value is whatever `read_buffer->available()` returns
/// (presumably the unread remainder of the currently loaded buffer — confirm against ReadBuffer).
std::streamsize StdStreamBufFromReadBuffer::showmanyc()
{
    const auto ready_bytes = read_buffer->available();
    return ready_bytes;
}

/// Bulk read: asks the underlying buffer to copy up to `count` characters into `s`.
///
/// @param s      Destination array.
/// @param count  Requested number of characters.
/// @return       The value returned by `read_buffer->read(s, count)`.
std::streamsize StdStreamBufFromReadBuffer::xsgetn(char_type* s, std::streamsize count)
{
    const auto bytes_read = read_buffer->read(s, count);
    return bytes_read;
}

/// Relative seek. Each supported origin is turned into an absolute position and handed to seekpos().
///
/// @param off    Offset relative to `dir`.
/// @param dir    Origin: beginning, current position, or end (end uses the `size` given at construction).
/// @param which  Open mode, forwarded unchanged to seekpos().
/// @return       The result of seekpos() for the computed absolute position.
/// @throws Exception with LOGICAL_ERROR if `dir` is none of beg / cur / end.
std::streampos StdStreamBufFromReadBuffer::seekoff(std::streamoff off, std::ios_base::seekdir dir, std::ios_base::openmode which)
{
    if (dir == std::ios_base::beg)
        return seekpos(off, which);

    if (dir == std::ios_base::cur)
    {
        const std::streampos current = getCurrentPosition();
        return seekpos(current + off, which);
    }

    if (dir == std::ios_base::end)
        return seekpos(size + off, which);

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong seek's base {}", static_cast<int>(dir));
}

/// Absolute seek to `pos`.
///
/// Strategy, in order:
///   1. Already at `pos`: nothing to do.
///   2. Target lies inside the currently loaded buffer: just move the cursor.
///   3. Underlying buffer is seekable: delegate to its seek().
///   4. Target is ahead of the cursor: skip forward with ignore().
///   5. Otherwise (backward seek on a non-seekable buffer outside the loaded data): fail.
///
/// @param pos    Absolute target position.
/// @param which  Must include std::ios_base::in.
/// @return       `pos` for cases 1, 2 and 4; the value returned by SeekableReadBuffer::seek() for case 3.
/// @throws Exception with LOGICAL_ERROR if `which` does not include input mode,
///         or with SEEK_POSITION_OUT_OF_BOUND in case 5.
std::streampos StdStreamBufFromReadBuffer::seekpos(std::streampos pos, std::ios_base::openmode which)
{
    if (!(which & std::ios_base::in))
        throw Exception(
            ErrorCodes::LOGICAL_ERROR, "Wrong seek mode {}", static_cast<int>(which));

    std::streamoff offset = pos - getCurrentPosition();
    if (!offset)
        return pos;

    /// Compare signed distances instead of forming `position() + offset` first: that pointer
    /// may lie far outside the buffer, which is undefined behaviour and could even wrap around
    /// and make the range check pass by accident.
    const std::streamoff distance_to_begin = read_buffer->buffer().begin() - read_buffer->position();
    const std::streamoff distance_to_end = read_buffer->buffer().end() - read_buffer->position();
    if (distance_to_begin <= offset && offset <= distance_to_end)
    {
        read_buffer->position() += offset;
        return pos;
    }

    if (seekable_read_buffer)
        return seekable_read_buffer->seek(pos, SEEK_SET);

    if (offset > 0)
    {
        /// Non-seekable source: the only way forward is to consume the bytes in between.
        read_buffer->ignore(offset);
        return pos;
    }

    throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek's offset {} is out of bound", pos);
}

/// Current read position of the stream.
/// A seekable buffer is asked directly via getPosition(); for any other buffer the value of
/// `read_buffer->count()` is used (presumably the number of bytes consumed so far — confirm against ReadBuffer).
std::streampos StdStreamBufFromReadBuffer::getCurrentPosition() const
{
    if (!seekable_read_buffer)
        return read_buffer->count();

    return seekable_read_buffer->getPosition();
}

/// Bulk write hook. This class is input-only, so the call is rejected unconditionally.
///
/// @throws Exception with LOGICAL_ERROR, always.
std::streamsize StdStreamBufFromReadBuffer::xsputn(const char*, std::streamsize)
{
    throw Exception(
        ErrorCodes::LOGICAL_ERROR,
        "StdStreamBufFromReadBuffer cannot be used for output");
}

/// Single-character write hook. This class is input-only, so the call is rejected unconditionally.
///
/// @throws Exception with LOGICAL_ERROR, always.
int StdStreamBufFromReadBuffer::overflow(int)
{
    throw Exception(
        ErrorCodes::LOGICAL_ERROR,
        "StdStreamBufFromReadBuffer cannot be used for output");
}

}