aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/clickhouse/client/base/compressed.cpp
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-11-23 11:26:33 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-11-23 12:01:57 +0300
commit44354d0fc55926c1d4510d1d2c9c9f6a1a5e9300 (patch)
treecb4d75cd1c6dbc3da0ed927337fd8d1b6ed9da84 /library/cpp/clickhouse/client/base/compressed.cpp
parent0e69bf615395fdd48ecee032faaec81bc468b0b8 (diff)
downloadydb-44354d0fc55926c1d4510d1d2c9c9f6a1a5e9300.tar.gz
YQ Connector:test INNER JOIN
Diffstat (limited to 'library/cpp/clickhouse/client/base/compressed.cpp')
-rw-r--r--library/cpp/clickhouse/client/base/compressed.cpp88
1 files changed, 88 insertions, 0 deletions
diff --git a/library/cpp/clickhouse/client/base/compressed.cpp b/library/cpp/clickhouse/client/base/compressed.cpp
new file mode 100644
index 0000000000..b883d534ee
--- /dev/null
+++ b/library/cpp/clickhouse/client/base/compressed.cpp
@@ -0,0 +1,88 @@
+#include "compressed.h"
+#include "wire_format.h"
+
+#include <util/generic/yexception.h>
+
+#include <contrib/libs/lz4/lz4.h>
+#include <contrib/restricted/cityhash-1.0.2/city.h>
+
+#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL // 1GB
+
+namespace NClickHouse {
+ TCompressedInput::TCompressedInput(TCodedInputStream* input)
+ : Input_(input)
+ {
+ }
+
+ TCompressedInput::~TCompressedInput() {
+ if (!Mem_.Exhausted()) {
+ Y_ABORT("some data was not read");
+ }
+ }
+
+ size_t TCompressedInput::DoNext(const void** ptr, size_t len) {
+ if (Mem_.Exhausted()) {
+ if (!Decompress()) {
+ return 0;
+ }
+ }
+
+ return Mem_.Next(ptr, len);
+ }
+
+ bool TCompressedInput::Decompress() {
+ CityHash_v1_0_2::uint128 hash;
+ ui32 compressed = 0;
+ ui32 original = 0;
+ ui8 method = 0;
+
+ if (!TWireFormat::ReadFixed(Input_, &hash)) {
+ return false;
+ }
+ if (!TWireFormat::ReadFixed(Input_, &method)) {
+ return false;
+ }
+
+ if (method != 0x82) {
+ ythrow yexception() << "unsupported compression method "
+ << int(method);
+ } else {
+ if (!TWireFormat::ReadFixed(Input_, &compressed)) {
+ return false;
+ }
+ if (!TWireFormat::ReadFixed(Input_, &original)) {
+ return false;
+ }
+
+ if (compressed > DBMS_MAX_COMPRESSED_SIZE) {
+ ythrow yexception() << "compressed data too big";
+ }
+
+ TTempBuf tmp(compressed);
+
+ // Заполнить заголовок сжатых данных.
+ tmp.Append(&method, sizeof(method));
+ tmp.Append(&compressed, sizeof(compressed));
+ tmp.Append(&original, sizeof(original));
+
+ if (!TWireFormat::ReadBytes(Input_, tmp.Data() + 9, compressed - 9)) {
+ return false;
+ } else {
+ if (hash != CityHash_v1_0_2::CityHash128(tmp.Data(), compressed)) {
+ ythrow yexception() << "data was corrupted";
+ }
+ }
+
+ Data_ = TTempBuf(original);
+
+ if (LZ4_decompress_fast(tmp.Data() + 9, Data_.Data(), original) < 0) {
+ ythrow yexception() << "can't decompress data";
+ } else {
+ Mem_.Reset(Data_.Data(), original);
+ }
+ }
+
+ return true;
+ }
+
+}