diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-09 10:11:16 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-09 10:33:35 +0300 |
commit | fa9347ea5cf4447897b525032be9a711cc3dc583 (patch) | |
tree | 4f3d4f493e4cfb43a3c8b5f7e279621c41e0e978 /contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp | |
parent | f82bfd2a08a51c4815a4cde64974f819ed4f7128 (diff) | |
download | ydb-fa9347ea5cf4447897b525032be9a711cc3dc583.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp')
-rw-r--r-- | contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp | 86 |
1 files changed, 86 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp b/contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp new file mode 100644 index 0000000000..e72347146f --- /dev/null +++ b/contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp @@ -0,0 +1,86 @@ +#include <IO/PeekableWriteBuffer.h> + +namespace DB +{ + +PeekableWriteBuffer::PeekableWriteBuffer(DB::WriteBuffer & sub_buf_) : BufferWithOwnMemory(0), sub_buf(sub_buf_) +{ + Buffer & sub_working = sub_buf.buffer(); + BufferBase::set(sub_working.begin() + sub_buf.offset(), sub_working.size() - sub_buf.offset(), 0); +} + +void PeekableWriteBuffer::nextImpl() +{ + if (checkpoint) + { + if (write_to_own_memory) + { + size_t prev_size = position() - memory.data(); + size_t new_size = memory.size() * 2; + memory.resize(new_size); + BufferBase::set(memory.data() + prev_size, memory.size() - prev_size, 0); + return; + } + + if (memory.size() == 0) + memory.resize(DBMS_DEFAULT_BUFFER_SIZE); + + sub_buf.position() = position(); + BufferBase::set(memory.data(), memory.size(), 0); + write_to_own_memory = true; + return; + } + + sub_buf.position() = position(); + sub_buf.next(); + BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset()); +} + + +void PeekableWriteBuffer::dropCheckpoint() +{ + assert(checkpoint); + checkpoint = std::nullopt; + /// If we have saved data in own memory, write it to sub-buf. + if (write_to_own_memory) + { + try + { + sub_buf.next(); + sub_buf.write(memory.data(), position() - memory.data()); + Buffer & sub_working = sub_buf.buffer(); + BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset()); + write_to_own_memory = false; + } + catch (...) + { + /// If exception happened during writing to sub buffer, we should + /// update buffer to not leave it in invalid state. + Buffer & sub_working = sub_buf.buffer(); + BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset()); + write_to_own_memory = false; + throw; + } + } + +} + +void PeekableWriteBuffer::rollbackToCheckpoint(bool drop) +{ + assert(checkpoint); + + /// Just ignore all data written after checkpoint. + if (write_to_own_memory) + { + Buffer & sub_working = sub_buf.buffer(); + BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset()); + write_to_own_memory = false; + } + + position() = *checkpoint; + + if (drop) + checkpoint = std::nullopt; +} + +} |