aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-12-09 10:11:16 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-12-09 10:33:35 +0300
commitfa9347ea5cf4447897b525032be9a711cc3dc583 (patch)
tree4f3d4f493e4cfb43a3c8b5f7e279621c41e0e978 /contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp
parentf82bfd2a08a51c4815a4cde64974f819ed4f7128 (diff)
downloadydb-fa9347ea5cf4447897b525032be9a711cc3dc583.tar.gz
Intermediate changes
Diffstat (limited to 'contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp')
-rw-r--r--contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp86
1 files changed, 86 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp b/contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp
new file mode 100644
index 0000000000..e72347146f
--- /dev/null
+++ b/contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp
@@ -0,0 +1,86 @@
+#include <IO/PeekableWriteBuffer.h>
+
+namespace DB
+{
+
+PeekableWriteBuffer::PeekableWriteBuffer(DB::WriteBuffer & sub_buf_) : BufferWithOwnMemory(0), sub_buf(sub_buf_)
+{
+ Buffer & sub_working = sub_buf.buffer();
+ BufferBase::set(sub_working.begin() + sub_buf.offset(), sub_working.size() - sub_buf.offset(), 0);
+}
+
+void PeekableWriteBuffer::nextImpl()
+{
+ if (checkpoint)
+ {
+ if (write_to_own_memory)
+ {
+ size_t prev_size = position() - memory.data();
+ size_t new_size = memory.size() * 2;
+ memory.resize(new_size);
+ BufferBase::set(memory.data() + prev_size, memory.size() - prev_size, 0);
+ return;
+ }
+
+ if (memory.size() == 0)
+ memory.resize(DBMS_DEFAULT_BUFFER_SIZE);
+
+ sub_buf.position() = position();
+ BufferBase::set(memory.data(), memory.size(), 0);
+ write_to_own_memory = true;
+ return;
+ }
+
+ sub_buf.position() = position();
+ sub_buf.next();
+ BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
+}
+
+
+void PeekableWriteBuffer::dropCheckpoint()
+{
+ assert(checkpoint);
+ checkpoint = std::nullopt;
+ /// If we have saved data in own memory, write it to sub-buf.
+ if (write_to_own_memory)
+ {
+ try
+ {
+ sub_buf.next();
+ sub_buf.write(memory.data(), position() - memory.data());
+ Buffer & sub_working = sub_buf.buffer();
+ BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
+ write_to_own_memory = false;
+ }
+ catch (...)
+ {
+ /// If exception happened during writing to sub buffer, we should
+ /// update buffer to not leave it in invalid state.
+ Buffer & sub_working = sub_buf.buffer();
+ BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
+ write_to_own_memory = false;
+ throw;
+ }
+ }
+
+}
+
+void PeekableWriteBuffer::rollbackToCheckpoint(bool drop)
+{
+ assert(checkpoint);
+
+ /// Just ignore all data written after checkpoint.
+ if (write_to_own_memory)
+ {
+ Buffer & sub_working = sub_buf.buffer();
+ BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
+ write_to_own_memory = false;
+ }
+
+ position() = *checkpoint;
+
+ if (drop)
+ checkpoint = std::nullopt;
+}
+
+}