aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/ZstdDeflatingWriteBuffer.cpp
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-11-14 09:58:56 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/IO/ZstdDeflatingWriteBuffer.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
downloadydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/IO/ZstdDeflatingWriteBuffer.cpp')
-rw-r--r--contrib/clickhouse/src/IO/ZstdDeflatingWriteBuffer.cpp104
1 files changed, 104 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/IO/ZstdDeflatingWriteBuffer.cpp b/contrib/clickhouse/src/IO/ZstdDeflatingWriteBuffer.cpp
new file mode 100644
index 0000000000..83d8487e3e
--- /dev/null
+++ b/contrib/clickhouse/src/IO/ZstdDeflatingWriteBuffer.cpp
@@ -0,0 +1,104 @@
+#include <IO/ZstdDeflatingWriteBuffer.h>
+#include <Common/Exception.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+ extern const int ZSTD_ENCODER_FAILED;
+}
+
+ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
+ std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
+ : WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
+{
+ cctx = ZSTD_createCCtx();
+ if (cctx == nullptr)
+ throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
+ size_t ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, compression_level);
+ if (ZSTD_isError(ret))
+ throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED,
+ "zstd stream encoder option setting failed: error code: {}; zstd version: {}",
+ ret, ZSTD_VERSION_STRING);
+ ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
+ if (ZSTD_isError(ret))
+ throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED,
+ "zstd stream encoder option setting failed: error code: {}; zstd version: {}",
+ ret, ZSTD_VERSION_STRING);
+
+ input = {nullptr, 0, 0};
+ output = {nullptr, 0, 0};
+}
+
+ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer() = default;
+
+void ZstdDeflatingWriteBuffer::flush(ZSTD_EndDirective mode)
+{
+ input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
+ input.size = offset();
+ input.pos = 0;
+
+ try
+ {
+ bool ended = false;
+ do
+ {
+ out->nextIfAtEnd();
+
+ output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
+ output.size = out->buffer().size();
+ output.pos = out->offset();
+
+ size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, mode);
+ if (ZSTD_isError(compression_result))
+ throw Exception(
+ ErrorCodes::ZSTD_ENCODER_FAILED,
+ "ZSTD stream encoding failed: error: '{}'; zstd version: {}",
+ ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
+
+ out->position() = out->buffer().begin() + output.pos;
+
+ bool everything_was_compressed = (input.pos == input.size);
+ bool everything_was_flushed = compression_result == 0;
+
+ ended = everything_was_compressed && everything_was_flushed;
+ } while (!ended);
+ }
+ catch (...)
+ {
+ /// Do not try to write next time after exception.
+ out->position() = out->buffer().begin();
+ throw;
+ }
+}
+
+void ZstdDeflatingWriteBuffer::nextImpl()
+{
+ if (offset())
+ flush(ZSTD_e_flush);
+}
+
+void ZstdDeflatingWriteBuffer::finalizeBefore()
+{
+ flush(ZSTD_e_end);
+}
+
+void ZstdDeflatingWriteBuffer::finalizeAfter()
+{
+ try
+ {
+ size_t err = ZSTD_freeCCtx(cctx);
+ /// This is just in case, since it is impossible to get an error by using this wrapper.
+ if (unlikely(err))
+ throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "ZSTD_freeCCtx failed: error: '{}'; zstd version: {}",
+ ZSTD_getErrorName(err), ZSTD_VERSION_STRING);
+ }
+ catch (...)
+ {
+ /// It is OK not to terminate under an error from ZSTD_freeCCtx()
+ /// since all data already written to the stream.
+ tryLogCurrentException(__PRETTY_FUNCTION__);
+ }
+}
+
+}