diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-11-23 11:26:33 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-11-23 12:01:57 +0300 |
commit | 44354d0fc55926c1d4510d1d2c9c9f6a1a5e9300 (patch) | |
tree | cb4d75cd1c6dbc3da0ed927337fd8d1b6ed9da84 /library/cpp | |
parent | 0e69bf615395fdd48ecee032faaec81bc468b0b8 (diff) | |
download | ydb-44354d0fc55926c1d4510d1d2c9c9f6a1a5e9300.tar.gz |
YQ Connector:test INNER JOIN
Diffstat (limited to 'library/cpp')
68 files changed, 4408 insertions, 0 deletions
diff --git a/library/cpp/CMakeLists.darwin-arm64.txt b/library/cpp/CMakeLists.darwin-arm64.txt index cf47314f07..b7794e3005 100644 --- a/library/cpp/CMakeLists.darwin-arm64.txt +++ b/library/cpp/CMakeLists.darwin-arm64.txt @@ -19,6 +19,7 @@ add_subdirectory(cache) add_subdirectory(case_insensitive_string) add_subdirectory(cgiparam) add_subdirectory(charset) +add_subdirectory(clickhouse) add_subdirectory(codecs) add_subdirectory(colorizer) add_subdirectory(compproto) diff --git a/library/cpp/CMakeLists.darwin-x86_64.txt b/library/cpp/CMakeLists.darwin-x86_64.txt index 0f393b2039..ca80e3eed3 100644 --- a/library/cpp/CMakeLists.darwin-x86_64.txt +++ b/library/cpp/CMakeLists.darwin-x86_64.txt @@ -19,6 +19,7 @@ add_subdirectory(cache) add_subdirectory(case_insensitive_string) add_subdirectory(cgiparam) add_subdirectory(charset) +add_subdirectory(clickhouse) add_subdirectory(codecs) add_subdirectory(colorizer) add_subdirectory(compproto) diff --git a/library/cpp/CMakeLists.linux-aarch64.txt b/library/cpp/CMakeLists.linux-aarch64.txt index cf47314f07..b7794e3005 100644 --- a/library/cpp/CMakeLists.linux-aarch64.txt +++ b/library/cpp/CMakeLists.linux-aarch64.txt @@ -19,6 +19,7 @@ add_subdirectory(cache) add_subdirectory(case_insensitive_string) add_subdirectory(cgiparam) add_subdirectory(charset) +add_subdirectory(clickhouse) add_subdirectory(codecs) add_subdirectory(colorizer) add_subdirectory(compproto) diff --git a/library/cpp/CMakeLists.linux-x86_64.txt b/library/cpp/CMakeLists.linux-x86_64.txt index 0f393b2039..ca80e3eed3 100644 --- a/library/cpp/CMakeLists.linux-x86_64.txt +++ b/library/cpp/CMakeLists.linux-x86_64.txt @@ -19,6 +19,7 @@ add_subdirectory(cache) add_subdirectory(case_insensitive_string) add_subdirectory(cgiparam) add_subdirectory(charset) +add_subdirectory(clickhouse) add_subdirectory(codecs) add_subdirectory(colorizer) add_subdirectory(compproto) diff --git a/library/cpp/clickhouse/CMakeLists.darwin-arm64.txt b/library/cpp/clickhouse/CMakeLists.darwin-arm64.txt new file mode 100644 index 0000000000..7f79107ebc --- /dev/null +++ b/library/cpp/clickhouse/CMakeLists.darwin-arm64.txt @@ -0,0 +1,9 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(client) diff --git a/library/cpp/clickhouse/CMakeLists.darwin-x86_64.txt b/library/cpp/clickhouse/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..7f79107ebc --- /dev/null +++ b/library/cpp/clickhouse/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,9 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(client) diff --git a/library/cpp/clickhouse/CMakeLists.linux-aarch64.txt b/library/cpp/clickhouse/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..7f79107ebc --- /dev/null +++ b/library/cpp/clickhouse/CMakeLists.linux-aarch64.txt @@ -0,0 +1,9 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(client) diff --git a/library/cpp/clickhouse/CMakeLists.linux-x86_64.txt b/library/cpp/clickhouse/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..7f79107ebc --- /dev/null +++ b/library/cpp/clickhouse/CMakeLists.linux-x86_64.txt @@ -0,0 +1,9 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(client) diff --git a/library/cpp/clickhouse/CMakeLists.txt b/library/cpp/clickhouse/CMakeLists.txt new file mode 100644 index 0000000000..1beba2829f --- /dev/null +++ b/library/cpp/clickhouse/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64") + include(CMakeLists.darwin-arm64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/library/cpp/clickhouse/client/CMakeLists.darwin-arm64.txt b/library/cpp/clickhouse/client/CMakeLists.darwin-arm64.txt new file mode 100644 index 0000000000..d31f838946 --- /dev/null +++ b/library/cpp/clickhouse/client/CMakeLists.darwin-arm64.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(base) +add_subdirectory(columns) +add_subdirectory(types) + +add_library(cpp-clickhouse-client) +target_link_libraries(cpp-clickhouse-client PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-lz4 + contrib-restricted-cityhash-1.0.2 + clickhouse-client-base + clickhouse-client-columns + clickhouse-client-types + cpp-openssl-io +) +target_sources(cpp-clickhouse-client PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/block.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/client.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/query.cpp +) diff --git a/library/cpp/clickhouse/client/CMakeLists.darwin-x86_64.txt b/library/cpp/clickhouse/client/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..d31f838946 --- /dev/null +++ b/library/cpp/clickhouse/client/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(base) +add_subdirectory(columns) +add_subdirectory(types) + +add_library(cpp-clickhouse-client) +target_link_libraries(cpp-clickhouse-client PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-lz4 + contrib-restricted-cityhash-1.0.2 + clickhouse-client-base + clickhouse-client-columns + clickhouse-client-types + cpp-openssl-io +) +target_sources(cpp-clickhouse-client PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/block.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/client.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/query.cpp +) diff --git a/library/cpp/clickhouse/client/CMakeLists.linux-aarch64.txt b/library/cpp/clickhouse/client/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..f86df04097 --- /dev/null +++ b/library/cpp/clickhouse/client/CMakeLists.linux-aarch64.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(base) +add_subdirectory(columns) +add_subdirectory(types) + +add_library(cpp-clickhouse-client) +target_link_libraries(cpp-clickhouse-client PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-lz4 + contrib-restricted-cityhash-1.0.2 + clickhouse-client-base + clickhouse-client-columns + clickhouse-client-types + cpp-openssl-io +) +target_sources(cpp-clickhouse-client PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/block.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/client.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/query.cpp +) diff --git a/library/cpp/clickhouse/client/CMakeLists.linux-x86_64.txt b/library/cpp/clickhouse/client/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..f86df04097 --- /dev/null +++ b/library/cpp/clickhouse/client/CMakeLists.linux-x86_64.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(base) +add_subdirectory(columns) +add_subdirectory(types) + +add_library(cpp-clickhouse-client) +target_link_libraries(cpp-clickhouse-client PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-lz4 + contrib-restricted-cityhash-1.0.2 + clickhouse-client-base + clickhouse-client-columns + clickhouse-client-types + cpp-openssl-io +) +target_sources(cpp-clickhouse-client PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/block.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/client.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/query.cpp +) diff --git a/library/cpp/clickhouse/client/CMakeLists.txt b/library/cpp/clickhouse/client/CMakeLists.txt new file mode 100644 index 0000000000..1beba2829f --- /dev/null +++ b/library/cpp/clickhouse/client/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64") + include(CMakeLists.darwin-arm64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/library/cpp/clickhouse/client/base/CMakeLists.darwin-arm64.txt b/library/cpp/clickhouse/client/base/CMakeLists.darwin-arm64.txt new file mode 100644 index 0000000000..a76303537c --- /dev/null +++ b/library/cpp/clickhouse/client/base/CMakeLists.darwin-arm64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-base) +target_link_libraries(clickhouse-client-base PUBLIC + contrib-libs-cxxsupp + yutil +) +target_sources(clickhouse-client-base PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/base/coded.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/base/compressed.cpp +) diff --git a/library/cpp/clickhouse/client/base/CMakeLists.darwin-x86_64.txt b/library/cpp/clickhouse/client/base/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..a76303537c --- /dev/null +++ b/library/cpp/clickhouse/client/base/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-base) +target_link_libraries(clickhouse-client-base PUBLIC + contrib-libs-cxxsupp + yutil +) +target_sources(clickhouse-client-base PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/base/coded.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/base/compressed.cpp +) diff --git a/library/cpp/clickhouse/client/base/CMakeLists.linux-aarch64.txt b/library/cpp/clickhouse/client/base/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..98015a9e92 --- /dev/null +++ b/library/cpp/clickhouse/client/base/CMakeLists.linux-aarch64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-base) +target_link_libraries(clickhouse-client-base PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil +) +target_sources(clickhouse-client-base PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/base/coded.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/base/compressed.cpp +) diff --git a/library/cpp/clickhouse/client/base/CMakeLists.linux-x86_64.txt b/library/cpp/clickhouse/client/base/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..98015a9e92 --- /dev/null +++ b/library/cpp/clickhouse/client/base/CMakeLists.linux-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-base) +target_link_libraries(clickhouse-client-base PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil +) +target_sources(clickhouse-client-base PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/base/coded.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/base/compressed.cpp +) diff --git a/library/cpp/clickhouse/client/base/CMakeLists.txt b/library/cpp/clickhouse/client/base/CMakeLists.txt new file mode 100644 index 0000000000..1beba2829f --- /dev/null +++ b/library/cpp/clickhouse/client/base/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64") + include(CMakeLists.darwin-arm64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/library/cpp/clickhouse/client/base/coded.cpp b/library/cpp/clickhouse/client/base/coded.cpp new file mode 100644 index 0000000000..5a5d56d158 --- /dev/null +++ b/library/cpp/clickhouse/client/base/coded.cpp @@ -0,0 +1,101 @@ +#include "coded.h" + +#include <memory.h> + +namespace NClickHouse { + static const int MAX_VARINT_BYTES = 10; + + TCodedInputStream::TCodedInputStream(IZeroCopyInput* input) + : Input_(input) + { + } + + bool TCodedInputStream::ReadRaw(void* buffer, size_t size) { + ui8* p = static_cast<ui8*>(buffer); + + while (size > 0) { + const void* ptr; + + if (size_t len = Input_->Next(&ptr, size)) { + memcpy(p, ptr, len); + + p += len; + size -= len; + } else { + break; + } + } + + return size == 0; + } + + bool TCodedInputStream::Skip(size_t count) { + while (count > 0) { + const void* ptr; + size_t len = Input_->Next(&ptr, count); + + if (len == 0) { + return false; + } + + count -= len; + } + + return true; + } + + bool TCodedInputStream::ReadVarint64(ui64* value) { + *value = 0; + + for (size_t i = 0; i < 9; ++i) { + ui8 byte; + + if (!Input_->Read(&byte, sizeof(byte))) { + return false; + } else { + *value |= (byte & 0x7F) << (7 * i); + + if (!(byte & 0x80)) { + return true; + } + } + } + + // TODO skip invalid + return false; + } + + TCodedOutputStream::TCodedOutputStream(IOutputStream* output) + : Output_(output) + { + } + + void TCodedOutputStream::Flush() { + Output_->Flush(); + } + + void TCodedOutputStream::WriteRaw(const void* buffer, int size) { + Output_->Write(buffer, size); + } + + void TCodedOutputStream::WriteVarint64(ui64 value) { + ui8 bytes[MAX_VARINT_BYTES]; + int size = 0; + + for (size_t i = 0; i < 9; ++i) { + ui8 byte = value & 0x7F; + if (value > 0x7F) + byte |= 0x80; + + bytes[size++] = byte; + + value >>= 7; + if (!value) { + break; + } + } + + WriteRaw(bytes, size); + } + +} diff --git a/library/cpp/clickhouse/client/base/coded.h b/library/cpp/clickhouse/client/base/coded.h new file mode 100644 index 0000000000..486cfc8165 --- /dev/null +++ b/library/cpp/clickhouse/client/base/coded.h @@ -0,0 +1,64 @@ +#pragma once + +#include <util/generic/string.h> +#include <util/stream/output.h> +#include <util/stream/zerocopy.h> + +namespace NClickHouse { + /** + * Class which reads and decodes binary data which is composed of varint- + * encoded integers and fixed-width pieces. + */ + class TCodedInputStream { + public: + TCodedInputStream() = default; + /// Create a CodedInputStream that reads from the given ZeroCopyInput. + explicit TCodedInputStream(IZeroCopyInput* input); + + // Read an unsigned integer with Varint encoding, truncating to 32 bits. + // Reading a 32-bit value is equivalent to reading a 64-bit one and casting + // it to uint32, but may be more efficient. + bool ReadVarint32(ui32* value); + + // Read an unsigned integer with Varint encoding. + bool ReadVarint64(ui64* value); + + // Read raw bytes, copying them into the given buffer. + bool ReadRaw(void* buffer, size_t size); + + // Like ReadRaw, but reads into a string. + // + // Implementation Note: ReadString() grows the string gradually as it + // reads in the data, rather than allocating the entire requested size + // upfront. This prevents denial-of-service attacks in which a client + // could claim that a string is going to be MAX_INT bytes long in order to + // crash the server because it can't allocate this much space at once. + bool ReadString(TString* buffer, int size); + + // Skips a number of bytes. Returns false if an underlying read error + // occurs. + bool Skip(size_t count); + + private: + IZeroCopyInput* Input_; + }; + + class TCodedOutputStream { + public: + TCodedOutputStream() = default; + /// Create a CodedInputStream that writes to the given ZeroCopyOutput. + explicit TCodedOutputStream(IOutputStream* output); + + void Flush(); + + // Write raw bytes, copying them from the given buffer. + void WriteRaw(const void* buffer, int size); + + /// Write an unsigned integer with Varint encoding. + void WriteVarint64(const ui64 value); + + private: + IOutputStream* Output_; + }; + +} diff --git a/library/cpp/clickhouse/client/base/compressed.cpp b/library/cpp/clickhouse/client/base/compressed.cpp new file mode 100644 index 0000000000..b883d534ee --- /dev/null +++ b/library/cpp/clickhouse/client/base/compressed.cpp @@ -0,0 +1,88 @@ +#include "compressed.h" +#include "wire_format.h" + +#include <util/generic/yexception.h> + +#include <contrib/libs/lz4/lz4.h> +#include <contrib/restricted/cityhash-1.0.2/city.h> + +#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL // 1GB + +namespace NClickHouse { + TCompressedInput::TCompressedInput(TCodedInputStream* input) + : Input_(input) + { + } + + TCompressedInput::~TCompressedInput() { + if (!Mem_.Exhausted()) { + Y_ABORT("some data was not read"); + } + } + + size_t TCompressedInput::DoNext(const void** ptr, size_t len) { + if (Mem_.Exhausted()) { + if (!Decompress()) { + return 0; + } + } + + return Mem_.Next(ptr, len); + } + + bool TCompressedInput::Decompress() { + CityHash_v1_0_2::uint128 hash; + ui32 compressed = 0; + ui32 original = 0; + ui8 method = 0; + + if (!TWireFormat::ReadFixed(Input_, &hash)) { + return false; + } + if (!TWireFormat::ReadFixed(Input_, &method)) { + return false; + } + + if (method != 0x82) { + ythrow yexception() << "unsupported compression method " + << int(method); + } else { + if (!TWireFormat::ReadFixed(Input_, &compressed)) { + return false; + } + if (!TWireFormat::ReadFixed(Input_, &original)) { + return false; + } + + if (compressed > DBMS_MAX_COMPRESSED_SIZE) { + ythrow yexception() << "compressed data too big"; + } + + TTempBuf tmp(compressed); + + // Заполнить заголовок сжатых данных. + tmp.Append(&method, sizeof(method)); + tmp.Append(&compressed, sizeof(compressed)); + tmp.Append(&original, sizeof(original)); + + if (!TWireFormat::ReadBytes(Input_, tmp.Data() + 9, compressed - 9)) { + return false; + } else { + if (hash != CityHash_v1_0_2::CityHash128(tmp.Data(), compressed)) { + ythrow yexception() << "data was corrupted"; + } + } + + Data_ = TTempBuf(original); + + if (LZ4_decompress_fast(tmp.Data() + 9, Data_.Data(), original) < 0) { + ythrow yexception() << "can't decompress data"; + } else { + Mem_.Reset(Data_.Data(), original); + } + } + + return true; + } + +} diff --git a/library/cpp/clickhouse/client/base/compressed.h b/library/cpp/clickhouse/client/base/compressed.h new file mode 100644 index 0000000000..d7c628ebb7 --- /dev/null +++ b/library/cpp/clickhouse/client/base/compressed.h @@ -0,0 +1,27 @@ +#pragma once + +#include "coded.h" + +#include <util/memory/tempbuf.h> +#include <util/stream/zerocopy.h> +#include <util/stream/mem.h> + +namespace NClickHouse { + class TCompressedInput: public IZeroCopyInput { + public: + TCompressedInput(TCodedInputStream* input); + ~TCompressedInput(); + + protected: + size_t DoNext(const void** ptr, size_t len) override; + + bool Decompress(); + + private: + TCodedInputStream* const Input_; + + TTempBuf Data_; + TMemoryInput Mem_; + }; + +} diff --git a/library/cpp/clickhouse/client/base/wire_format.h b/library/cpp/clickhouse/client/base/wire_format.h new file mode 100644 index 0000000000..805a2d3212 --- /dev/null +++ b/library/cpp/clickhouse/client/base/wire_format.h @@ -0,0 +1,103 @@ +#pragma once + +#include "coded.h" + +#include <util/generic/string.h> +#include <util/memory/tempbuf.h> + +namespace NClickHouse { + class TWireFormat { + public: + template <typename T> + static bool ReadFixed(TCodedInputStream* input, T* value); + + static bool ReadString(TCodedInputStream* input, TString* value); + + static bool ReadBytes(TCodedInputStream* input, void* buf, size_t len); + + static bool ReadUInt64(TCodedInputStream* input, ui64* value); + + template <typename T> + static void WriteFixed(TCodedOutputStream* output, const T& value); + + static void WriteBytes(TCodedOutputStream* output, const void* buf, size_t len); + + static void WriteString(TCodedOutputStream* output, const TString& value); + + static void WriteStringBuf(TCodedOutputStream* output, const TStringBuf value); + + static void WriteUInt64(TCodedOutputStream* output, const ui64 value); + }; + + template <typename T> + inline bool TWireFormat::ReadFixed( + TCodedInputStream* input, + T* value) { + return input->ReadRaw(value, sizeof(T)); + } + + inline bool TWireFormat::ReadString( + TCodedInputStream* input, + TString* value) { + ui64 len; + + if (input->ReadVarint64(&len)) { + if (len > 0x00FFFFFFULL) { + return false; + } + TTempBuf buf(len); + if (input->ReadRaw(buf.Data(), (size_t)len)) { + value->assign(buf.Data(), len); + return true; + } + } + + return false; + } + + inline bool TWireFormat::ReadBytes( + TCodedInputStream* input, void* buf, size_t len) { + return input->ReadRaw(buf, len); + } + + inline bool TWireFormat::ReadUInt64( + TCodedInputStream* input, + ui64* value) { + return input->ReadVarint64(value); + } + + template <typename T> + inline void TWireFormat::WriteFixed( + TCodedOutputStream* output, + const T& value) { + output->WriteRaw(&value, sizeof(T)); + } + + inline void TWireFormat::WriteBytes( + TCodedOutputStream* output, + const void* buf, + size_t len) { + output->WriteRaw(buf, len); + } + + inline void TWireFormat::WriteString( + TCodedOutputStream* output, + const TString& value) { + output->WriteVarint64(value.size()); + output->WriteRaw(value.data(), value.size()); + } + + inline void TWireFormat::WriteStringBuf( + TCodedOutputStream* output, + const TStringBuf value) { + output->WriteVarint64(value.size()); + output->WriteRaw(value.data(), value.size()); + } + + inline void TWireFormat::WriteUInt64( + TCodedOutputStream* output, + const ui64 value) { + output->WriteVarint64(value); + } + +} diff --git a/library/cpp/clickhouse/client/base/ya.make b/library/cpp/clickhouse/client/base/ya.make new file mode 100644 index 0000000000..8c3a1f6552 --- /dev/null +++ b/library/cpp/clickhouse/client/base/ya.make @@ -0,0 +1,9 @@ +LIBRARY() + +SRCS( + coded.cpp + compressed.cpp + wire_format.h +) + +END() diff --git a/library/cpp/clickhouse/client/block.cpp b/library/cpp/clickhouse/client/block.cpp new file mode 100644 index 0000000000..d39f1967c7 --- /dev/null +++ b/library/cpp/clickhouse/client/block.cpp @@ -0,0 +1,107 @@ +#include "block.h" + +#include <util/generic/yexception.h> + +namespace NClickHouse { + TBlock::TIterator::TIterator(const TBlock& block) + : Block_(block) + , Idx_(0) + { + } + + const TString& TBlock::TIterator::Name() const { + return Block_.Columns_[Idx_].Name; + } + + TTypeRef TBlock::TIterator::Type() const { + return Block_.Columns_[Idx_].Column->Type(); + } + + TColumnRef TBlock::TIterator::Column() const { + return Block_.Columns_[Idx_].Column; + } + + void TBlock::TIterator::Next() { + ++Idx_; + } + + bool TBlock::TIterator::IsValid() const { + return Idx_ < Block_.Columns_.size(); + } + + TBlock::TBlock() + : Rows_(0) + { + } + + TBlock::TBlock(size_t cols, size_t rows) + : Rows_(rows) + { + Columns_.reserve(cols); + } + + TBlock::~TBlock() = default; + + void TBlock::AppendColumn(const TString& name, const TColumnRef& col) { + if (Columns_.empty()) { + Rows_ = col->Size(); + } else if (col->Size() != Rows_) { + ythrow yexception() + << "all clumns in block must have same count of rows"; + } + + Columns_.push_back(TColumnItem{name, col}); + } + + /// Count of columns in the block. + size_t TBlock::GetColumnCount() const { + return Columns_.size(); + } + + const TBlockInfo& TBlock::Info() const { + return Info_; + } + + /// Count of rows in the block. + size_t TBlock::GetRowCount() const { + return Rows_; + } + + void TBlock::AppendBlock(const TBlock& block) { + if (block.GetRowCount() == 0) { + return; + } + size_t columnCount = GetColumnCount(); + if (columnCount == 0) { + Rows_ = block.GetRowCount(); + Columns_ = block.Columns_; + return; + } + + if (columnCount != block.GetColumnCount()) { + ythrow yexception() << "Can't concatenate two blocks. Different number of columns (current_block: " + << columnCount << ", added: " << block.GetColumnCount() << ")"; + } + + for (size_t i = 0; i < columnCount; ++i) { + if (Columns_[i].Name != block.Columns_[i].Name) { + ythrow yexception() << "Can't concatenate two blocks. Different names of columns (current_block: " + << Columns_[i].Name << ", added: " << block.Columns_[i].Name << ")"; + } + } + + for (size_t i = 0; i < columnCount; ++i) { + Columns_[i].Column->Append(block.Columns_[i].Column); + } + Rows_ += block.GetRowCount(); + } + + TColumnRef TBlock::operator[](size_t idx) const { + if (idx < Columns_.size()) { + return Columns_[idx].Column; + } + + ythrow yexception() << "column index is out of range"; + } + +} diff --git a/library/cpp/clickhouse/client/block.h b/library/cpp/clickhouse/client/block.h new file mode 100644 index 0000000000..d85c6ffbf6 --- /dev/null +++ b/library/cpp/clickhouse/client/block.h @@ -0,0 +1,74 @@ +#pragma once + +#include "columns/column.h" + +namespace NClickHouse { + struct TBlockInfo { + ui8 IsOverflows = 0; + i32 BucketNum = -1; + }; + + class TBlock { + public: + /// Allow to iterate over block's columns. + class TIterator { + public: + TIterator(const TBlock& block); + + /// Name of column. + const TString& Name() const; + + /// Type of column. + TTypeRef Type() const; + + /// Reference to column object. + TColumnRef Column() const; + + /// Move to next column. + void Next(); + + /// Is the iterator still valid. + bool IsValid() const; + + private: + TIterator() = delete; + + const TBlock& Block_; + size_t Idx_; + }; + + public: + TBlock(); + TBlock(size_t cols, size_t rows); + ~TBlock(); + + /// Append named column to the block. + void AppendColumn(const TString& name, const TColumnRef& col); + + /// Count of columns in the block. + size_t GetColumnCount() const; + + const TBlockInfo& Info() const; + + /// Count of rows in the block. + size_t GetRowCount() const; + + /// Append block to the current (vertical scale) + void AppendBlock(const TBlock& block); + + /// Reference to column by index in the block. + TColumnRef operator[](size_t idx) const; + + private: + struct TColumnItem { + TString Name; + TColumnRef Column; + }; + + TBlockInfo Info_; + TVector<TColumnItem> Columns_; + /// Count of rows in the block. + size_t Rows_; + }; + +} diff --git a/library/cpp/clickhouse/client/client.cpp b/library/cpp/clickhouse/client/client.cpp new file mode 100644 index 0000000000..b0b2613bb5 --- /dev/null +++ b/library/cpp/clickhouse/client/client.cpp @@ -0,0 +1,767 @@ +#include "client.h" +#include "protocol.h" + +#include <library/cpp/clickhouse/client/base/coded.h> +#include <library/cpp/clickhouse/client/base/compressed.h> +#include <library/cpp/clickhouse/client/base/wire_format.h> +#include <library/cpp/clickhouse/client/columns/factory.h> +#include <library/cpp/openssl/io/stream.h> + +#include <util/generic/buffer.h> +#include <util/generic/vector.h> +#include <util/network/socket.h> +#include <util/random/random.h> +#include <util/stream/buffered.h> +#include <util/stream/buffer.h> +#include <util/stream/mem.h> +#include <util/string/builder.h> +#include <util/string/cast.h> +#include <util/system/unaligned_mem.h> + +#include <contrib/libs/lz4/lz4.h> +#include <contrib/restricted/cityhash-1.0.2/city.h> + +#define DBMS_NAME "ClickHouse" +#define DBMS_VERSION_MAJOR 1 +#define DBMS_VERSION_MINOR 1 +#define REVISION 54126 + +#define DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES 50264 +#define DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS 51554 +#define DBMS_MIN_REVISION_WITH_BLOCK_INFO 51903 +#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032 +#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058 +#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060 + +namespace NClickHouse { + struct TClientInfo { + ui8 IfaceType = 1; // TCP + ui8 QueryKind; + TString InitialUser; + TString InitialQueryId; + TString QuotaKey; + TString OsUser; + TString ClientHostname; + TString ClientName; + TString InitialAddress = "[::ffff:127.0.0.1]:0"; + ui64 ClientVersionMajor = 0; + ui64 ClientVersionMinor = 0; + ui32 ClientRevision = 0; + }; + + struct TServerInfo { + TString Name; + TString Timezone; + ui64 VersionMajor; + ui64 VersionMinor; + ui64 Revision; + }; + + class TClient::TImpl { + public: + TImpl(const TClientOptions& opts); + ~TImpl(); + + void ExecuteQuery(TQuery query); + + void Insert(const TString& table_name, const TBlock& block); + + void Ping(); + + void ResetConnection(); + + private: + bool Handshake(); + + bool ReceivePacket(ui64* server_packet = nullptr); + + void SendQuery(const TString& query); + + void SendData(const TBlock& block); + + bool SendHello(); + + bool ReadBlock(TBlock* block, TCodedInputStream* input); + + bool ReceiveHello(); + + /// Reads data packet form input stream. + bool ReceiveData(); + + /// Reads exception packet form input stream. + bool ReceiveException(bool rethrow = false); + + void WriteBlock(const TBlock& block, TCodedOutputStream* output); + + private: + void Disconnect() { + Socket_ = TSocket(); + } + + /// In case of network errors tries to reconnect to server and + /// call fuc several times. + void RetryGuard(std::function<void()> fuc); + + private: + class EnsureNull { + public: + inline EnsureNull(TQueryEvents* ev, TQueryEvents** ptr) + : ptr_(ptr) + { + if (ptr_) { + *ptr_ = ev; + } + } + + inline ~EnsureNull() { + if (ptr_) { + *ptr_ = nullptr; + } + } + + private: + TQueryEvents** ptr_; + }; + + const TClientOptions Options_; + TQueryEvents* Events_; + int Compression_ = CompressionState::Disable; + + TSocket Socket_; + + TSocketInput SocketInput_; + TSocketOutput SocketOutput_; + THolder<TBufferedInput> BufferedInput_; + THolder<TBufferedOutput> BufferedOutput_; + THolder<TOpenSslClientIO> SslClient_; + + TCodedInputStream Input_; + TCodedOutputStream Output_; + + TServerInfo ServerInfo_; + }; + + TClient::TImpl::TImpl(const TClientOptions& opts) + : Options_(opts) + , Events_(nullptr) + , Socket_(TNetworkAddress(opts.Host, opts.Port), Options_.ConnectTimeout) + , SocketInput_(Socket_) + , SocketOutput_(Socket_) + { + if (opts.UseSsl) { + SslClient_ = MakeHolder<TOpenSslClientIO>(&SocketInput_, &SocketOutput_, opts.SslOptions); + BufferedInput_ = MakeHolder<TBufferedInput>(SslClient_.Get()); + BufferedOutput_ = MakeHolder<TBufferedOutput>(SslClient_.Get()); + } else { + BufferedInput_ = MakeHolder<TBufferedInput>(&SocketInput_); + BufferedOutput_ = MakeHolder<TBufferedOutput>(&SocketOutput_); + } + Input_ = TCodedInputStream(BufferedInput_.Get()); + Output_ = TCodedOutputStream(BufferedOutput_.Get()); + + if (Options_.RequestTimeout.Seconds()) { + Socket_.SetSocketTimeout(Options_.RequestTimeout.Seconds()); + } + + if (!Handshake()) { + ythrow yexception() << "fail to connect to " << Options_.Host; + } + + if (Options_.CompressionMethod != ECompressionMethod::None) { + Compression_ = CompressionState::Enable; + } + } + + TClient::TImpl::~TImpl() { + Disconnect(); + } + + void TClient::TImpl::ExecuteQuery(TQuery query) { + EnsureNull en(static_cast<TQueryEvents*>(&query), &Events_); + + if (Options_.PingBeforeQuery) { + RetryGuard([this]() { Ping(); }); + } + + SendQuery(query.GetText()); + + ui64 server_packet = 0; + while (ReceivePacket(&server_packet)) { + ; + } + if (server_packet != ServerCodes::EndOfStream && server_packet != ServerCodes::Exception) { + ythrow yexception() << "unexpected packet from server while receiving end of query (got: " + << (server_packet ? ToString(server_packet) : "nothing") << ")"; + } + } + + void TClient::TImpl::Insert(const TString& table_name, const TBlock& block) { + if (Options_.PingBeforeQuery) { + RetryGuard([this]() { Ping(); }); + } + TVector<TString> fields; + fields.reserve(block.GetColumnCount()); + + // Enumerate all fields + for (TBlock::TIterator bi(block); bi.IsValid(); bi.Next()) { + fields.push_back(bi.Name()); + } + + TStringBuilder fields_section; + for (auto elem = fields.begin(); elem != fields.end(); ++elem) { + if (std::distance(elem, fields.end()) == 1) { + fields_section << *elem; + } else { + fields_section << *elem << ","; + } + } + + SendQuery("INSERT INTO " + table_name + " ( " + fields_section + " ) VALUES"); + + ui64 server_packet(0); + // Receive data packet. + while (true) { + bool ret = ReceivePacket(&server_packet); + + if (!ret) { + ythrow yexception() << "unable to receive data packet"; + } + if (server_packet == ServerCodes::Data) { + break; + } + if (server_packet == ServerCodes::Progress) { + continue; + } + } + + // Send data. + SendData(block); + // Send empty block as marker of + // end of data. + SendData(TBlock()); + + // Wait for EOS. + ui64 eos_packet{0}; + while (ReceivePacket(&eos_packet)) { + ; + } + + if (eos_packet != ServerCodes::EndOfStream && eos_packet != ServerCodes::Exception + && eos_packet != ServerCodes::Log && Options_.RethrowExceptions) { + ythrow yexception() << "unexpected packet from server while receiving end of query, expected (expected Exception, EndOfStream or Log, got: " + << (eos_packet ? ToString(eos_packet) : "nothing") << ")"; + } + } + + void TClient::TImpl::Ping() { + TWireFormat::WriteUInt64(&Output_, ClientCodes::Ping); + Output_.Flush(); + + ui64 server_packet; + const bool ret = ReceivePacket(&server_packet); + + if (!ret || server_packet != ServerCodes::Pong) { + ythrow yexception() << "fail to ping server"; + } + } + + void TClient::TImpl::ResetConnection() { + Socket_ = TSocket(TNetworkAddress(Options_.Host, Options_.Port), Options_.ConnectTimeout); + + if (Options_.UseSsl) { + SslClient_.Reset(new TOpenSslClientIO(&SocketInput_, &SocketOutput_, Options_.SslOptions)); + BufferedInput_.Reset(new TBufferedInput(SslClient_.Get())); + BufferedOutput_.Reset(new TBufferedOutput(SslClient_.Get())); + } else { + BufferedInput_.Reset(new TBufferedInput(&SocketInput_)); + BufferedOutput_.Reset(new TBufferedOutput(&SocketOutput_)); + } + + SocketInput_ = TSocketInput(Socket_); + SocketOutput_ = TSocketOutput(Socket_); + + Input_ = TCodedInputStream(BufferedInput_.Get()); + Output_ = TCodedOutputStream(BufferedOutput_.Get()); + + if (Options_.RequestTimeout.Seconds()) { + Socket_.SetSocketTimeout(Options_.RequestTimeout.Seconds()); + } + + if (!Handshake()) { + ythrow yexception() << "fail to connect to " << Options_.Host; + } + } + + bool TClient::TImpl::Handshake() { + if (!SendHello()) { + return false; + } + if (!ReceiveHello()) { + return false; + } + return true; + } + + bool TClient::TImpl::ReceivePacket(ui64* server_packet) { + ui64 packet_type = 0; + + if (!Input_.ReadVarint64(&packet_type)) { + return false; + } + if (server_packet) { + *server_packet = packet_type; + } + + switch (packet_type) { + case ServerCodes::Totals: + case ServerCodes::Data: { + if (!ReceiveData()) { + ythrow yexception() << "can't read data packet from input stream"; + } + return true; + } + + case ServerCodes::Exception: { + ReceiveException(); + return false; + } + + case ServerCodes::ProfileInfo: { + TProfile profile; + + if (!TWireFormat::ReadUInt64(&Input_, &profile.rows)) { + return false; + } + if (!TWireFormat::ReadUInt64(&Input_, &profile.blocks)) { + return false; + } + if (!TWireFormat::ReadUInt64(&Input_, &profile.bytes)) { + return false; + } + if (!TWireFormat::ReadFixed(&Input_, &profile.applied_limit)) { + return false; + } + if (!TWireFormat::ReadUInt64(&Input_, &profile.rows_before_limit)) { + return false; + } + if (!TWireFormat::ReadFixed(&Input_, &profile.calculated_rows_before_limit)) { + return false; + } + + if (Events_) { + Events_->OnProfile(profile); + } + + return true; + } + + case ServerCodes::Progress: { + TProgress info; + + if (!TWireFormat::ReadUInt64(&Input_, &info.rows)) { + return false; + } + if (!TWireFormat::ReadUInt64(&Input_, &info.bytes)) { + return false; + } + if (REVISION >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS) { + if (!TWireFormat::ReadUInt64(&Input_, &info.total_rows)) { + return false; + } + } + + if (Events_) { + Events_->OnProgress(info); + } + + return true; + } + + case ServerCodes::Pong: { + return true; + } + + case ServerCodes::EndOfStream: { + if (Events_) { + Events_->OnFinish(); + } + return false; + } + + default: + ythrow yexception() << "unimplemented " << (int)packet_type; + break; + } + + return false; + } + + bool TClient::TImpl::ReadBlock(TBlock* block, TCodedInputStream* input) { + // Additional information about block. + if (REVISION >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) { + ui64 num; + TBlockInfo info; + + // BlockInfo + if (!TWireFormat::ReadUInt64(input, &num)) { + return false; + } + if (!TWireFormat::ReadFixed(input, &info.IsOverflows)) { + return false; + } + if (!TWireFormat::ReadUInt64(input, &num)) { + return false; + } + if (!TWireFormat::ReadFixed(input, &info.BucketNum)) { + return false; + } + if (!TWireFormat::ReadUInt64(input, &num)) { + return false; + } + + // TODO use data + } + + ui64 num_columns = 0; + ui64 num_rows = 0; + + if (!TWireFormat::ReadUInt64(input, &num_columns)) { + return false; + } + if (!TWireFormat::ReadUInt64(input, &num_rows)) { + return false; + } + + for (size_t i = 0; i < num_columns; ++i) { + TString name; + TString type; + + if (!TWireFormat::ReadString(input, &name)) { + return false; + } + if (!TWireFormat::ReadString(input, &type)) { + return false; + } + + if (TColumnRef col = CreateColumnByType(type)) { + if (num_rows && !col->Load(input, num_rows)) { + ythrow yexception() << "can't load"; + } + + block->AppendColumn(name, col); + } else { + ythrow yexception() << "unsupported column type: " << type; + } + } + + return true; + } + + bool TClient::TImpl::ReceiveData() { + TBlock block; + + if (REVISION >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) { + TString table_name; + + if (!TWireFormat::ReadString(&Input_, &table_name)) { + return false; + } + } + + if (Compression_ == CompressionState::Enable) { + TCompressedInput compressed(&Input_); + TCodedInputStream coded(&compressed); + + if (!ReadBlock(&block, &coded)) { + return false; + } + } else { + if (!ReadBlock(&block, &Input_)) { + return false; + } + } + + if (Events_) { + Events_->OnData(block); + } + + return true; + } + + bool TClient::TImpl::ReceiveException(bool rethrow) { + std::unique_ptr<TException> e(new TException); + TException* current = e.get(); + + bool exception_received = true; + do { + bool has_nested = false; + + if (!TWireFormat::ReadFixed(&Input_, ¤t->Code)) { + exception_received = false; + break; + } + if (!TWireFormat::ReadString(&Input_, ¤t->Name)) { + exception_received = false; + break; + } + if (!TWireFormat::ReadString(&Input_, ¤t->DisplayText)) { + exception_received = false; + break; + } + if (!TWireFormat::ReadString(&Input_, ¤t->StackTrace)) { + exception_received = false; + break; + } + if (!TWireFormat::ReadFixed(&Input_, &has_nested)) { + exception_received = false; + break; + } + + if (has_nested) { + current->Nested.reset(new TException); + current = current->Nested.get(); + } else { + break; + } + } while (true); + + if (Events_) { + Events_->OnServerException(*e); + } + + if (rethrow || Options_.RethrowExceptions) { + throw TServerException(std::move(e)); + } + + return exception_received; + } + + void TClient::TImpl::SendQuery(const TString& query) { + TWireFormat::WriteUInt64(&Output_, ClientCodes::Query); + TWireFormat::WriteString(&Output_, TString()); + + /// Client info. + if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) { + TClientInfo info; + + info.QueryKind = 1; + info.ClientName = "ClickHouse client"; + info.ClientVersionMajor = DBMS_VERSION_MAJOR; + info.ClientVersionMinor = DBMS_VERSION_MINOR; + info.ClientRevision = REVISION; + + TWireFormat::WriteFixed(&Output_, info.QueryKind); + TWireFormat::WriteString(&Output_, info.InitialUser); + TWireFormat::WriteString(&Output_, info.InitialQueryId); + TWireFormat::WriteString(&Output_, info.InitialAddress); + TWireFormat::WriteFixed(&Output_, info.IfaceType); + + TWireFormat::WriteString(&Output_, info.OsUser); + TWireFormat::WriteString(&Output_, info.ClientHostname); + TWireFormat::WriteString(&Output_, info.ClientName); + TWireFormat::WriteUInt64(&Output_, info.ClientVersionMajor); + TWireFormat::WriteUInt64(&Output_, info.ClientVersionMinor); + TWireFormat::WriteUInt64(&Output_, info.ClientRevision); + + if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO) + TWireFormat::WriteString(&Output_, info.QuotaKey); + } + + /// Per query settings. + //if (settings) + // settings->serialize(*out); + //else + TWireFormat::WriteString(&Output_, TString()); + + TWireFormat::WriteUInt64(&Output_, Stages::Complete); + TWireFormat::WriteUInt64(&Output_, Compression_); + TWireFormat::WriteString(&Output_, query); + // Send empty block as marker of + // end of data + SendData(TBlock()); + + Output_.Flush(); + } + + void TClient::TImpl::WriteBlock(const TBlock& block, TCodedOutputStream* output) { + /// Дополнительная информация о блоке. + if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) { + TWireFormat::WriteUInt64(output, 1); + TWireFormat::WriteFixed(output, block.Info().IsOverflows); + TWireFormat::WriteUInt64(output, 2); + TWireFormat::WriteFixed(output, block.Info().BucketNum); + TWireFormat::WriteUInt64(output, 0); + } + + TWireFormat::WriteUInt64(output, block.GetColumnCount()); + TWireFormat::WriteUInt64(output, block.GetRowCount()); + + for (TBlock::TIterator bi(block); bi.IsValid(); bi.Next()) { + TWireFormat::WriteString(output, bi.Name()); + TWireFormat::WriteString(output, bi.Type()->GetName()); + + bi.Column()->Save(output); + } + } + + void TClient::TImpl::SendData(const TBlock& block) { + TWireFormat::WriteUInt64(&Output_, ClientCodes::Data); + + if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) { + TWireFormat::WriteString(&Output_, TString()); + } + + if (Compression_ == CompressionState::Enable) { + switch (Options_.CompressionMethod) { + case ECompressionMethod::None: { + Y_ABORT_UNLESS(false, "invalid state"); + break; + } + + case ECompressionMethod::LZ4: { + TBufferOutput tmp; + + // Serialize block's data + { + TCodedOutputStream out(&tmp); + WriteBlock(block, &out); + } + // Reserver space for data + TBuffer buf; + buf.Resize(9 + LZ4_compressBound(tmp.Buffer().Size())); + + // Compress data + int size = LZ4_compress(tmp.Buffer().Data(), buf.Data() + 9, tmp.Buffer().Size()); + buf.Resize(9 + size); + + // Fill header + ui8* p = (ui8*)buf.Data(); + // Compression method + WriteUnaligned<ui8>(p, (ui8)0x82); + p += 1; + // Compressed data size with header + WriteUnaligned<ui32>(p, (ui32)buf.Size()); + p += 4; + // Original data size + WriteUnaligned<ui32>(p, (ui32)tmp.Buffer().Size()); + + TWireFormat::WriteFixed(&Output_, CityHash_v1_0_2::CityHash128( + buf.Data(), buf.Size())); + TWireFormat::WriteBytes(&Output_, buf.Data(), buf.Size()); + break; + } + } + } else { + WriteBlock(block, &Output_); + } + + Output_.Flush(); + } + + bool TClient::TImpl::SendHello() { + TWireFormat::WriteUInt64(&Output_, ClientCodes::Hello); + TWireFormat::WriteString(&Output_, TString(DBMS_NAME) + " client"); + TWireFormat::WriteUInt64(&Output_, DBMS_VERSION_MAJOR); + TWireFormat::WriteUInt64(&Output_, DBMS_VERSION_MINOR); + TWireFormat::WriteUInt64(&Output_, REVISION); + TWireFormat::WriteString(&Output_, Options_.DefaultDatabase); + TWireFormat::WriteString(&Output_, Options_.User); + TWireFormat::WriteString(&Output_, Options_.Password); + + Output_.Flush(); + + return true; + } + + bool TClient::TImpl::ReceiveHello() { + ui64 packet_type = 0; + + if (!Input_.ReadVarint64(&packet_type)) { + return false; + } + + if (packet_type == ServerCodes::Hello) { + if (!TWireFormat::ReadString(&Input_, &ServerInfo_.Name)) { + return false; + } + if (!TWireFormat::ReadUInt64(&Input_, &ServerInfo_.VersionMajor)) { + return false; + } + if (!TWireFormat::ReadUInt64(&Input_, &ServerInfo_.VersionMinor)) { + return false; + } + if (!TWireFormat::ReadUInt64(&Input_, &ServerInfo_.Revision)) { + return false; + } + + if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) { + if (!TWireFormat::ReadString(&Input_, &ServerInfo_.Timezone)) { + return false; + } + } + + return true; + } else if (packet_type == ServerCodes::Exception) { + ReceiveException(true); + return false; + } + + return false; + } + + void TClient::TImpl::RetryGuard(std::function<void()> func) { + for (int i = 0; i <= Options_.SendRetries; ++i) { + try { + func(); + return; + } catch (const yexception&) { + bool ok = true; + + try { + Sleep(Options_.RetryTimeout); + ResetConnection(); + } catch (...) { + ok = false; + } + + if (!ok) { + throw; + } + } + } + } + + TClient::TClient(const TClientOptions& opts) + : Options_(opts) + , Impl_(new TImpl(opts)) + { + } + + TClient::~TClient() { + } + + void TClient::Execute(const TQuery& query) { + Impl_->ExecuteQuery(query); + } + + void TClient::Select(const TString& query, TSelectCallback cb) { + Execute(TQuery(query).OnData(cb)); + } + + void TClient::Select(const TQuery& query) { + Execute(query); + } + + void TClient::Insert(const TString& table_name, const TBlock& block) { + Impl_->Insert(table_name, block); + } + + void TClient::Ping() { + Impl_->Ping(); + } + + void TClient::ResetConnection() { + Impl_->ResetConnection(); + } + +} diff --git a/library/cpp/clickhouse/client/client.h b/library/cpp/clickhouse/client/client.h new file mode 100644 index 0000000000..865a9df551 --- /dev/null +++ b/library/cpp/clickhouse/client/client.h @@ -0,0 +1,105 @@ +#pragma once + +#include "query.h" +#include "exceptions.h" + +#include "columns/array.h" +#include "columns/date.h" +#include "columns/nullable.h" +#include "columns/numeric.h" +#include "columns/string.h" +#include "columns/tuple.h" + +#include <library/cpp/openssl/io/stream.h> + +#include <util/generic/string.h> + +namespace NClickHouse { + /// Метод сжатия + enum class ECompressionMethod { + None = -1, + LZ4 = 1, + }; + + struct TClientOptions { +#define DECLARE_FIELD(name, type, default) \ + type name{default}; \ + inline TClientOptions& Set##name(const type& value) { \ + name = value; \ + return *this; \ + } + + /// Hostname of the server. + DECLARE_FIELD(Host, TString, TString()); + /// Service port. + DECLARE_FIELD(Port, int, 9000); + + /// Default database. + DECLARE_FIELD(DefaultDatabase, TString, "default"); + /// User name. + DECLARE_FIELD(User, TString, "default"); + /// Access password. + DECLARE_FIELD(Password, TString, TString()); + + /// By default all exceptions received during query execution will be + /// passed to OnException handler. Set rethrow_exceptions to true to + /// enable throwing exceptions with standard c++ exception mechanism. + DECLARE_FIELD(RethrowExceptions, bool, true); + + /// Ping server every time before execute any query. + DECLARE_FIELD(PingBeforeQuery, bool, false); + /// Count of retry to send request to server. + DECLARE_FIELD(SendRetries, int, 1); + /// Amount of time to wait before next retry. + DECLARE_FIELD(RetryTimeout, TDuration, TDuration::Seconds(5)); + /// Define timeout for establishing a connection to server. + DECLARE_FIELD(ConnectTimeout, TDuration, TDuration::Seconds(5)); + /// Define timeout for any operations. + DECLARE_FIELD(RequestTimeout, TDuration, TDuration::Zero()); + + /// Compression method. + DECLARE_FIELD(CompressionMethod, ECompressionMethod, ECompressionMethod::None); + + /// Use SSL encryption + DECLARE_FIELD(UseSsl, bool, false); + /// SSL Options + DECLARE_FIELD(SslOptions, TOpenSslClientIO::TOptions, TOpenSslClientIO::TOptions()); + +#undef DECLARE_FIELD + }; + + /** + * + */ + class TClient { + public: + TClient(const TClientOptions& opts); + ~TClient(); + + /// Intends for execute arbitrary queries. + void Execute(const TQuery& query); + + /// Intends for execute select queries. Data will be returned with + /// one or more call of \p cb. + void Select(const TString& query, TSelectCallback cb); + + /// Alias for Execute. + void Select(const TQuery& query); + + /// Intends for insert block of data into a table \p table_name. + void Insert(const TString& table_name, const TBlock& block); + + /// Ping server for aliveness. + void Ping(); + + /// Reset connection with initial params. + void ResetConnection(); + + private: + TClientOptions Options_; + + class TImpl; + THolder<TImpl> Impl_; + }; + +} diff --git a/library/cpp/clickhouse/client/columns/CMakeLists.darwin-arm64.txt b/library/cpp/clickhouse/client/columns/CMakeLists.darwin-arm64.txt new file mode 100644 index 0000000000..306332e5fa --- /dev/null +++ b/library/cpp/clickhouse/client/columns/CMakeLists.darwin-arm64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-columns) +target_link_libraries(clickhouse-client-columns PUBLIC + contrib-libs-cxxsupp + yutil + clickhouse-client-base + clickhouse-client-types +) +target_sources(clickhouse-client-columns PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/array.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/date.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/enum.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/factory.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/nullable.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/numeric.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/string.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/tuple.cpp +) diff --git a/library/cpp/clickhouse/client/columns/CMakeLists.darwin-x86_64.txt b/library/cpp/clickhouse/client/columns/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..306332e5fa --- /dev/null +++ b/library/cpp/clickhouse/client/columns/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-columns) +target_link_libraries(clickhouse-client-columns PUBLIC + contrib-libs-cxxsupp + yutil + clickhouse-client-base + clickhouse-client-types +) +target_sources(clickhouse-client-columns PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/array.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/date.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/enum.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/factory.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/nullable.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/numeric.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/string.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/tuple.cpp +) diff --git a/library/cpp/clickhouse/client/columns/CMakeLists.linux-aarch64.txt b/library/cpp/clickhouse/client/columns/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..90add55098 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/CMakeLists.linux-aarch64.txt @@ -0,0 +1,27 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-columns) +target_link_libraries(clickhouse-client-columns PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + clickhouse-client-base + clickhouse-client-types +) +target_sources(clickhouse-client-columns PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/array.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/date.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/enum.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/factory.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/nullable.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/numeric.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/string.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/tuple.cpp +) diff --git a/library/cpp/clickhouse/client/columns/CMakeLists.linux-x86_64.txt b/library/cpp/clickhouse/client/columns/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..90add55098 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/CMakeLists.linux-x86_64.txt @@ -0,0 +1,27 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-columns) +target_link_libraries(clickhouse-client-columns PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + clickhouse-client-base + clickhouse-client-types +) +target_sources(clickhouse-client-columns PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/array.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/date.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/enum.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/factory.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/nullable.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/numeric.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/string.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/columns/tuple.cpp +) diff --git a/library/cpp/clickhouse/client/columns/CMakeLists.txt b/library/cpp/clickhouse/client/columns/CMakeLists.txt new file mode 100644 index 0000000000..1beba2829f --- /dev/null +++ b/library/cpp/clickhouse/client/columns/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64") + include(CMakeLists.darwin-arm64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/library/cpp/clickhouse/client/columns/array.cpp b/library/cpp/clickhouse/client/columns/array.cpp new file mode 100644 index 0000000000..8a83c36f72 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/array.cpp @@ -0,0 +1,87 @@ +#include "array.h" + +#include <util/generic/yexception.h> + +namespace NClickHouse { + TColumnArray::TColumnArray(TColumnRef data) + : TColumn(TType::CreateArray(data->Type())) + , Data_(data) + , Offsets_(TColumnUInt64::Create()) + { + } + + TColumnArray::TColumnArray(TColumnRef data, TVector<ui64>&& offsets) + : TColumn(TType::CreateArray(data->Type())) + , Data_(data) + , Offsets_(TColumnUInt64::Create(std::move(offsets))) + { + } + + TIntrusivePtr<TColumnArray> TColumnArray::Create(TColumnRef data) { + return new TColumnArray(data); + } + + TIntrusivePtr<TColumnArray> TColumnArray::Create(TColumnRef data, TVector<ui64>&& offsets) { + return new TColumnArray(data, std::move(offsets)); + } + + void TColumnArray::AppendAsColumn(TColumnRef array) { + if (!Data_->Type()->IsEqual(array->Type())) { + ythrow yexception() + << "can't append column of type " << array->Type()->GetName() << " " + << "to column type " << Data_->Type()->GetName(); + } + + if (Offsets_->Size() == 0) { + Offsets_->Append(array->Size()); + } else { + Offsets_->Append((*Offsets_)[Offsets_->Size() - 1] + array->Size()); + } + + Data_->Append(array); + } + + void TColumnArray::Append(TColumnRef column) { + if (auto col = column->As<TColumnArray>()) { + if (!col->Data_->Type()->IsEqual(Data_->Type())) { + return; + } + + for (size_t i = 0; i < col->Size(); ++i) { + AppendAsColumn(col->GetAsColumn(i)); + } + } + } + + TColumnRef TColumnArray::GetAsColumn(size_t n) const { + return Data_->Slice(GetOffset(n), GetSize(n)); + } + + bool TColumnArray::Load(TCodedInputStream* input, size_t rows) { + if (!Offsets_->Load(input, rows)) { + return false; + } + if (!Data_->Load(input, (*Offsets_)[rows - 1])) { + return false; + } + return true; + } + + void TColumnArray::Save(TCodedOutputStream* output) { + Offsets_->Save(output); + Data_->Save(output); + } + + size_t TColumnArray::Size() const { + return Offsets_->Size(); + } + + size_t TColumnArray::GetOffset(size_t n) const { + return (n == 0) ? 0 : (*Offsets_)[n - 1]; + } + + size_t TColumnArray::GetSize(size_t n) const { + return (n == 0) ? (*Offsets_)[n] : ((*Offsets_)[n] - (*Offsets_)[n - 1]); + } + +} diff --git a/library/cpp/clickhouse/client/columns/array.h b/library/cpp/clickhouse/client/columns/array.h new file mode 100644 index 0000000000..1a5e7f429a --- /dev/null +++ b/library/cpp/clickhouse/client/columns/array.h @@ -0,0 +1,55 @@ +#pragma once + +#include "numeric.h" + +namespace NClickHouse { + /** + * Represents column of Array(T). + */ + class TColumnArray: public TColumn { + public: + static TIntrusivePtr<TColumnArray> Create(TColumnRef data); + + static TIntrusivePtr<TColumnArray> Create(TColumnRef data, TVector<ui64>&& offsets); + + /// Converts input column to array and appends + /// as one row to the current column. + void AppendAsColumn(TColumnRef array); + + /// Convets array at pos n to column. + /// Type of element of result column same as type of array element. + TColumnRef GetAsColumn(size_t n) const; + + public: + /// Appends content of given column to the end of current one. + void Append(TColumnRef) override; + + /// Loads column data from input stream. + bool Load(TCodedInputStream* input, size_t rows) override; + + /// Saves column data to output stream. + void Save(TCodedOutputStream* output) override; + + /// Returns count of rows in the column. + size_t Size() const override; + + /// Makes slice of the current column. + TColumnRef Slice(size_t, size_t) override { + return TColumnRef(); + } + + private: + TColumnArray(TColumnRef data); + + TColumnArray(TColumnRef data, TVector<ui64>&& offsets); + + size_t GetOffset(size_t n) const; + + size_t GetSize(size_t n) const; + + private: + TColumnRef Data_; + TIntrusivePtr<TColumnUInt64> Offsets_; + }; + +} diff --git a/library/cpp/clickhouse/client/columns/column.h b/library/cpp/clickhouse/client/columns/column.h new file mode 100644 index 0000000000..d858338443 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/column.h @@ -0,0 +1,60 @@ +#pragma once + +#include <library/cpp/clickhouse/client/base/coded.h> +#include <library/cpp/clickhouse/client/types/types.h> + +#include <util/generic/ptr.h> + +namespace NClickHouse { + using TColumnRef = TIntrusivePtr<class TColumn>; + + /** + * An abstract base of all columns classes. + */ + class TColumn: public TAtomicRefCount<TColumn> { + public: + virtual ~TColumn() { + } + + /// Downcast pointer to the specific culumn's subtype. + template <typename T> + inline TIntrusivePtr<T> As() { + return TIntrusivePtr<T>(dynamic_cast<T*>(this)); + } + + /// Downcast pointer to the specific culumn's subtype. + template <typename T> + inline TIntrusivePtr<const T> As() const { + return TIntrusivePtr<const T>(dynamic_cast<const T*>(this)); + } + + /// Get type object of the column. + inline TTypeRef Type() const { + return Type_; + } + + /// Appends content of given column to the end of current one. + virtual void Append(TColumnRef column) = 0; + + /// Loads column data from input stream. + virtual bool Load(TCodedInputStream* input, size_t rows) = 0; + + /// Saves column data to output stream. + virtual void Save(TCodedOutputStream* output) = 0; + + /// Returns count of rows in the column. + virtual size_t Size() const = 0; + + /// Makes slice of the current column. + virtual TColumnRef Slice(size_t begin, size_t len) = 0; + + protected: + explicit inline TColumn(TTypeRef type) + : Type_(type) + { + } + + TTypeRef Type_; + }; + +} diff --git a/library/cpp/clickhouse/client/columns/date.cpp b/library/cpp/clickhouse/client/columns/date.cpp new file mode 100644 index 0000000000..242511a7eb --- /dev/null +++ b/library/cpp/clickhouse/client/columns/date.cpp @@ -0,0 +1,126 @@ +#include "date.h" + +namespace NClickHouse { + TIntrusivePtr<TColumnDate> TColumnDate::Create() { + return new TColumnDate(); + } + + TIntrusivePtr<TColumnDate> TColumnDate::Create(const TVector<TInstant>& data) { + return new TColumnDate(data); + } + + TColumnDate::TColumnDate() + : TColumn(TType::CreateDate()) + , Data_(TColumnUInt16::Create()) + { + } + + TColumnDate::TColumnDate(const TVector<TInstant>& data) + : TColumnDate() + { + for (const auto& value : data) { + Append(value); + } + } + + void TColumnDate::Append(const TInstant& value) { + Data_->Append(static_cast<ui16>(value.Days())); + } + + std::time_t TColumnDate::At(size_t n) const { + return Data_->At(n) * 86400; + } + + void TColumnDate::SetAt(size_t n, const TInstant& value) { + Data_->SetAt(n, static_cast<ui16>(value.Days())); + } + + void TColumnDate::Append(TColumnRef column) { + if (auto col = column->As<TColumnDate>()) { + Data_->Append(col->Data_); + } + } + + bool TColumnDate::Load(TCodedInputStream* input, size_t rows) { + return Data_->Load(input, rows); + } + + void TColumnDate::Save(TCodedOutputStream* output) { + Data_->Save(output); + } + + size_t TColumnDate::Size() const { + return Data_->Size(); + } + + TColumnRef TColumnDate::Slice(size_t begin, size_t len) { + auto col = Data_->Slice(begin, len)->As<TColumnUInt16>(); + auto result = TColumnDate::Create(); + + result->Data_->Append(col); + + return result; + } + + TColumnDateTime::TColumnDateTime() + : TColumn(TType::CreateDateTime()) + , Data_(TColumnUInt32::Create()) + { + } + + TColumnDateTime::TColumnDateTime(const TVector<TInstant>& data) + : TColumnDateTime() + { + for (const auto& value : data) { + Append(value); + } + } + + TIntrusivePtr<TColumnDateTime> TColumnDateTime::Create() { + return new TColumnDateTime(); + } + + TIntrusivePtr<TColumnDateTime> TColumnDateTime::Create(const TVector<TInstant>& data) { + return new TColumnDateTime(data); + } + + void TColumnDateTime::Append(const TInstant& value) { + Data_->Append(static_cast<ui32>(value.Seconds())); + } + + std::time_t TColumnDateTime::At(size_t n) const { + return Data_->At(n); + } + + void TColumnDateTime::SetAt(size_t n, const TInstant& value) { + Data_->SetAt(n, static_cast<ui32>(value.Seconds())); + } + + void TColumnDateTime::Append(TColumnRef column) { + if (auto col = column->As<TColumnDateTime>()) { + Data_->Append(col->Data_); + } + } + + bool TColumnDateTime::Load(TCodedInputStream* input, size_t rows) { + return Data_->Load(input, rows); + } + + void TColumnDateTime::Save(TCodedOutputStream* output) { + Data_->Save(output); + } + + size_t TColumnDateTime::Size() const { + return Data_->Size(); + } + + TColumnRef TColumnDateTime::Slice(size_t begin, size_t len) { + auto col = Data_->Slice(begin, len)->As<TColumnUInt32>(); + auto result = TColumnDateTime::Create(); + + result->Data_->Append(col); + + return result; + } + +} diff --git a/library/cpp/clickhouse/client/columns/date.h b/library/cpp/clickhouse/client/columns/date.h new file mode 100644 index 0000000000..003d3a0707 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/date.h @@ -0,0 +1,84 @@ +#pragma once + +#include "numeric.h" + +#include <util/datetime/base.h> + +namespace NClickHouse { + /** */ + class TColumnDate: public TColumn { + public: + static TIntrusivePtr<TColumnDate> Create(); + static TIntrusivePtr<TColumnDate> Create(const TVector<TInstant>& data); + + /// Appends one element to the end of column. + void Append(const TInstant& value); + + /// Returns element at given row number. + std::time_t At(size_t n) const; + + /// Set element at given row number. + void SetAt(size_t n, const TInstant& value); + + public: + /// Appends content of given column to the end of current one. + void Append(TColumnRef column) override; + + /// Loads column data from input stream. + bool Load(TCodedInputStream* input, size_t rows) override; + + /// Saves column data to output stream. + void Save(TCodedOutputStream* output) override; + + /// Returns count of rows in the column. + size_t Size() const override; + + /// Makes slice of the current column. + TColumnRef Slice(size_t begin, size_t len) override; + + private: + TColumnDate(); + TColumnDate(const TVector<TInstant>& data); + + TIntrusivePtr<TColumnUInt16> Data_; + }; + + /** */ + class TColumnDateTime: public TColumn { + public: + static TIntrusivePtr<TColumnDateTime> Create(); + static TIntrusivePtr<TColumnDateTime> Create(const TVector<TInstant>& data); + + /// Appends one element to the end of column. + void Append(const TInstant& value); + + /// Returns element at given row number. + std::time_t At(size_t n) const; + + /// Set element at given row number. + void SetAt(size_t n, const TInstant& value); + + public: + /// Appends content of given column to the end of current one. + void Append(TColumnRef column) override; + + /// Loads column data from input stream. + bool Load(TCodedInputStream* input, size_t rows) override; + + /// Saves column data to output stream. + void Save(TCodedOutputStream* output) override; + + /// Returns count of rows in the column. + size_t Size() const override; + + /// Makes slice of the current column. + TColumnRef Slice(size_t begin, size_t len) override; + + private: + TColumnDateTime(); + TColumnDateTime(const TVector<TInstant>& data); + + TIntrusivePtr<TColumnUInt32> Data_; + }; + +} diff --git a/library/cpp/clickhouse/client/columns/enum.cpp b/library/cpp/clickhouse/client/columns/enum.cpp new file mode 100644 index 0000000000..cd96903a8e --- /dev/null +++ b/library/cpp/clickhouse/client/columns/enum.cpp @@ -0,0 +1,157 @@ +#include "enum.h" +#include "utils.h" +#include <util/string/printf.h> + +namespace NClickHouse { + template <typename T> + TColumnEnum<T>::TColumnEnum(TTypeRef type) + : TColumn(type) + { + } + + template <typename T> + TColumnEnum<T>::TColumnEnum(TTypeRef type, const TVector<T>& data) + : TColumn(type) + , Data_(data) + { + } + + template <> + TIntrusivePtr<TColumnEnum<i8>> TColumnEnum<i8>::Create(const TVector<TEnumItem>& enumItems) { + TTypeRef type = TType::CreateEnum8(enumItems); + return new TColumnEnum<i8>(type); + } + + template <> + TIntrusivePtr<TColumnEnum<i8>> TColumnEnum<i8>::Create( + const TVector<TEnumItem>& enumItems, + const TVector<i8>& values, + bool checkValues) { + TTypeRef type = TType::CreateEnum8(enumItems); + if (checkValues) { + for (i8 value : values) { + Y_ENSURE(type->HasEnumValue(value), Sprintf("Enum type doesn't have value %d", value)); + } + } + return new TColumnEnum<i8>(type, values); + } + + template <> + TIntrusivePtr<TColumnEnum<i8>> TColumnEnum<i8>::Create( + const TVector<TEnumItem>& enumItems, + const TVector<TString>& names) { + TTypeRef type = TType::CreateEnum8(enumItems); + TVector<i8> values; + values.reserve(names.size()); + for (const TString& name : names) { + values.push_back(type->GetEnumValue(name)); + } + return new TColumnEnum<i8>(type, values); + } + + template <> + TIntrusivePtr<TColumnEnum<i16>> TColumnEnum<i16>::Create(const TVector<TEnumItem>& enumItems) { + TTypeRef type = TType::CreateEnum16(enumItems); + return new TColumnEnum<i16>(type); + } + + template <> + TIntrusivePtr<TColumnEnum<i16>> TColumnEnum<i16>::Create( + const TVector<TEnumItem>& enumItems, + const TVector<i16>& values, + bool checkValues) { + TTypeRef type = TType::CreateEnum16(enumItems); + if (checkValues) { + for (i16 value : values) { + Y_ENSURE(type->HasEnumValue(value), Sprintf("Enum type doesn't have value %d", value)); + } + } + return new TColumnEnum<i16>(type, values); + } + + template <> + TIntrusivePtr<TColumnEnum<i16>> TColumnEnum<i16>::Create( + const TVector<TEnumItem>& enumItems, + const TVector<TString>& names) { + TTypeRef type = TType::CreateEnum16(enumItems); + TVector<i16> values; + values.reserve(names.size()); + for (const TString& name : names) { + values.push_back(type->GetEnumValue(name)); + } + return new TColumnEnum<i16>(type, values); + } + + template <typename T> + void TColumnEnum<T>::Append(const T& value, bool checkValue) { + if (checkValue) { + Y_ENSURE(Type_->HasEnumValue(value), Sprintf("Enum type doesn't have value %d", value)); + } + Data_.push_back(value); + } + + template <typename T> + void TColumnEnum<T>::Append(const TString& name) { + Data_.push_back(Type_->GetEnumValue(name)); + } + + template <typename T> + const T& TColumnEnum<T>::At(size_t n) const { + return Data_.at(n); + } + + template <typename T> + const TString& TColumnEnum<T>::NameAt(size_t n) const { + return Type_->GetEnumName(Data_.at(n)); + } + + template <typename T> + const T& TColumnEnum<T>::operator[](size_t n) const { + return Data_[n]; + } + + template <typename T> + void TColumnEnum<T>::SetAt(size_t n, const T& value, bool checkValue) { + if (checkValue) { + Y_ENSURE(Type_->HasEnumValue(value), Sprintf("Enum type doesn't have value %d", value)); + } + Data_.at(n) = value; + } + + template <typename T> + void TColumnEnum<T>::SetNameAt(size_t n, const TString& name) { + Data_.at(n) = Type_->GetEnumValue(name); + } + + template <typename T> + void TColumnEnum<T>::Append(TColumnRef column) { + if (auto col = column->As<TColumnEnum<T>>()) { + Data_.insert(Data_.end(), col->Data_.begin(), col->Data_.end()); + } + } + + template <typename T> + bool TColumnEnum<T>::Load(TCodedInputStream* input, size_t rows) { + Data_.resize(rows); + return input->ReadRaw(Data_.data(), Data_.size() * sizeof(T)); + } + + template <typename T> + void TColumnEnum<T>::Save(TCodedOutputStream* output) { + output->WriteRaw(Data_.data(), Data_.size() * sizeof(T)); + } + + template <typename T> + size_t TColumnEnum<T>::Size() const { + return Data_.size(); + } + + template <typename T> + TColumnRef TColumnEnum<T>::Slice(size_t begin, size_t len) { + return new TColumnEnum<T>(Type_, SliceVector(Data_, begin, len)); + } + + template class TColumnEnum<i8>; + template class TColumnEnum<i16>; + +} diff --git a/library/cpp/clickhouse/client/columns/enum.h b/library/cpp/clickhouse/client/columns/enum.h new file mode 100644 index 0000000000..90d773bd9f --- /dev/null +++ b/library/cpp/clickhouse/client/columns/enum.h @@ -0,0 +1,57 @@ +#pragma once + +#include "column.h" + +namespace NClickHouse { + template <typename T> + class TColumnEnum: public TColumn { + public: + static TIntrusivePtr<TColumnEnum<T>> Create(const TVector<TEnumItem>& enumItems); + static TIntrusivePtr<TColumnEnum<T>> Create( + const TVector<TEnumItem>& enumItems, + const TVector<T>& values, + bool checkValues = false); + static TIntrusivePtr<TColumnEnum<T>> Create(const TVector<TEnumItem>& enumItems, const TVector<TString>& names); + + /// Appends one element to the end of column. + void Append(const T& value, bool checkValue = false); + void Append(const TString& name); + + /// Returns element at given row number. + const T& At(size_t n) const; + const TString& NameAt(size_t n) const; + + /// Returns element at given row number. + const T& operator[](size_t n) const; + + /// Set element at given row number. + void SetAt(size_t n, const T& value, bool checkValue = false); + void SetNameAt(size_t n, const TString& name); + + public: + /// Appends content of given column to the end of current one. + void Append(TColumnRef column) override; + + /// Loads column data from input stream. + bool Load(TCodedInputStream* input, size_t rows) override; + + /// Saves column data to output stream. + void Save(TCodedOutputStream* output) override; + + /// Returns count of rows in the column. + size_t Size() const override; + + /// Makes slice of the current column. + TColumnRef Slice(size_t begin, size_t len) override; + + private: + TColumnEnum(TTypeRef type); + TColumnEnum(TTypeRef type, const TVector<T>& data); + + TVector<T> Data_; + }; + + using TColumnEnum8 = TColumnEnum<i8>; + using TColumnEnum16 = TColumnEnum<i16>; + +} diff --git a/library/cpp/clickhouse/client/columns/factory.cpp b/library/cpp/clickhouse/client/columns/factory.cpp new file mode 100644 index 0000000000..a29ee70b8d --- /dev/null +++ b/library/cpp/clickhouse/client/columns/factory.cpp @@ -0,0 +1,118 @@ +#include "factory.h" + +#include "array.h" +#include "date.h" +#include "enum.h" +#include "nullable.h" +#include "numeric.h" +#include "string.h" +#include "tuple.h" + +#include <library/cpp/clickhouse/client/types/type_parser.h> + +namespace NClickHouse { + namespace { + TColumnRef CreateTerminalColumn(const TTypeAst& ast) { + if (ast.Name == "UInt8") + return TColumnUInt8::Create(); + if (ast.Name == "UInt16") + return TColumnUInt16::Create(); + if (ast.Name == "UInt32") + return TColumnUInt32::Create(); + if (ast.Name == "UInt64") + return TColumnUInt64::Create(); + + if (ast.Name == "Int8") + return TColumnInt8::Create(); + if (ast.Name == "Int16") + return TColumnInt16::Create(); + if (ast.Name == "Int32") + return TColumnInt32::Create(); + if (ast.Name == "Int64") + return TColumnInt64::Create(); + + if (ast.Name == "Float32") + return TColumnFloat32::Create(); + if (ast.Name == "Float64") + return TColumnFloat64::Create(); + + if (ast.Name == "String") + return TColumnString::Create(); + if (ast.Name == "FixedString") + return TColumnFixedString::Create(ast.Elements.front().Value); + + if (ast.Name == "DateTime") + return TColumnDateTime::Create(); + if (ast.Name == "Date") + return TColumnDate::Create(); + + return nullptr; + } + + TColumnRef CreateColumnFromAst(const TTypeAst& ast) { + switch (ast.Meta) { + case TTypeAst::Array: { + return TColumnArray::Create( + CreateColumnFromAst(ast.Elements.front())); + } + + case TTypeAst::Nullable: { + return TColumnNullable::Create( + CreateColumnFromAst(ast.Elements.front())); + } + + case TTypeAst::Terminal: { + return CreateTerminalColumn(ast); + } + + case TTypeAst::Tuple: { + TVector<TColumnRef> columns; + + for (const auto& elem : ast.Elements) { + if (auto col = CreateColumnFromAst(elem)) { + columns.push_back(col); + } else { + return nullptr; + } + } + + return TColumnTuple::Create(columns); + } + + case TTypeAst::Enum: { + TVector<TEnumItem> enum_items; + + for (const auto& elem : ast.Elements) { + TString name(elem.Name); + i16 value = elem.Value; + enum_items.push_back({name, value}); + } + + if (ast.Name == "Enum8") { + return TColumnEnum8::Create(enum_items); + } else { + return TColumnEnum16::Create(enum_items); + } + } + + case TTypeAst::Null: + case TTypeAst::Number: + break; + } + + return nullptr; + } + + } + + TColumnRef CreateColumnByType(const TString& type_name) { + TTypeAst ast; + + if (TTypeParser(type_name).Parse(&ast)) { + return CreateColumnFromAst(ast); + } + + return nullptr; + } + +} diff --git a/library/cpp/clickhouse/client/columns/factory.h b/library/cpp/clickhouse/client/columns/factory.h new file mode 100644 index 0000000000..0b2b82ece3 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/factory.h @@ -0,0 +1,7 @@ +#pragma once + +#include "column.h" + +namespace NClickHouse { + TColumnRef CreateColumnByType(const TString& type_name); +} diff --git a/library/cpp/clickhouse/client/columns/nullable.cpp b/library/cpp/clickhouse/client/columns/nullable.cpp new file mode 100644 index 0000000000..1d9dffea27 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/nullable.cpp @@ -0,0 +1,70 @@ +#include "nullable.h" + +#include <util/generic/yexception.h> +#include <util/system/yassert.h> + +namespace NClickHouse { + TColumnNullable::TColumnNullable(TColumnRef nested, TColumnRef nulls) + : TColumn(TType::CreateNullable(nested->Type())) + , Nested_(nested) + , Nulls_(nulls->As<TColumnUInt8>()) + { + if (Nested_->Size() != nulls->Size()) { + ythrow yexception() << "count of elements in nested and nulls should be the same"; + } + } + + TIntrusivePtr<TColumnNullable> TColumnNullable::Create(TColumnRef nested) { + return new TColumnNullable(nested, TColumnUInt8::Create()); + } + + TIntrusivePtr<TColumnNullable> TColumnNullable::Create(TColumnRef nested, TColumnRef nulls) { + return new TColumnNullable(nested, nulls); + } + + bool TColumnNullable::IsNull(size_t n) const { + return Nulls_->At(n) != 0; + } + + TColumnRef TColumnNullable::Nested() const { + return Nested_; + } + + void TColumnNullable::Append(TColumnRef column) { + if (auto col = column->As<TColumnNullable>()) { + if (!col->Nested_->Type()->IsEqual(Nested_->Type())) { + return; + } + + Nested_->Append(col->Nested_); + Nulls_->Append(col->Nulls_); + } + } + + bool TColumnNullable::Load(TCodedInputStream* input, size_t rows) { + if (!Nulls_->Load(input, rows)) { + return false; + } + if (!Nested_->Load(input, rows)) { + return false; + } + return true; + } + + void TColumnNullable::Save(TCodedOutputStream* output) { + Nulls_->Save(output); + Nested_->Save(output); + } + + size_t TColumnNullable::Size() const { + Y_ASSERT(Nested_->Size() == Nulls_->Size()); + return Nulls_->Size(); + } + + TColumnRef TColumnNullable::Slice(size_t begin, size_t len) { + (void)begin; + (void)len; + return TColumnRef(); + } + +} diff --git a/library/cpp/clickhouse/client/columns/nullable.h b/library/cpp/clickhouse/client/columns/nullable.h new file mode 100644 index 0000000000..e0f88e6f75 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/nullable.h @@ -0,0 +1,44 @@ +#pragma once + +#include "column.h" +#include "numeric.h" + +namespace NClickHouse { + /** + * Represents column of Nullable(T). + */ + class TColumnNullable: public TColumn { + public: + static TIntrusivePtr<TColumnNullable> Create(TColumnRef nested); + static TIntrusivePtr<TColumnNullable> Create(TColumnRef nested, TColumnRef nulls); + + /// Returns null flag at given row number. + bool IsNull(size_t n) const; + + /// Returns nested column. + TColumnRef Nested() const; + + public: + /// Appends content of given column to the end of current one. + void Append(TColumnRef column) override; + + /// Loads column data from input stream. + bool Load(TCodedInputStream* input, size_t rows) override; + + /// Saves column data to output stream. + void Save(TCodedOutputStream* output) override; + + /// Returns count of rows in the column. + size_t Size() const override; + + /// Makes slice of the current column. + TColumnRef Slice(size_t begin, size_t len) override; + + private: + TColumnNullable(TColumnRef nested, TColumnRef nulls); + + TColumnRef Nested_; + TIntrusivePtr<TColumnUInt8> Nulls_; + }; + +} diff --git a/library/cpp/clickhouse/client/columns/numeric.cpp b/library/cpp/clickhouse/client/columns/numeric.cpp new file mode 100644 index 0000000000..68cbe3d4e4 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/numeric.cpp @@ -0,0 +1,103 @@ +#include "numeric.h" + +#include "utils.h" + +namespace NClickHouse { + template <typename T> + TColumnVector<T>::TColumnVector() + : TColumn(TType::CreateSimple<T>()) + { + } + + template <typename T> + TColumnVector<T>::TColumnVector(const TVector<T>& data) + : TColumn(TType::CreateSimple<T>()) + , Data_(data) + { + } + + template <typename T> + TColumnVector<T>::TColumnVector(TVector<T>&& data) + : TColumn(TType::CreateSimple<T>()) + , Data_(std::move(data)) + { + } + + template <typename T> + TIntrusivePtr<TColumnVector<T>> TColumnVector<T>::Create() { + return new TColumnVector<T>(); + } + + template <typename T> + TIntrusivePtr<TColumnVector<T>> TColumnVector<T>::Create(const TVector<T>& data) { + return new TColumnVector<T>(data); + } + + template <typename T> + TIntrusivePtr<TColumnVector<T>> TColumnVector<T>::Create(TVector<T>&& data) { + return new TColumnVector<T>(std::move(data)); + } + + template <typename T> + void TColumnVector<T>::Append(const T& value) { + Data_.push_back(value); + } + + template <typename T> + const T& TColumnVector<T>::At(size_t n) const { + return Data_.at(n); + } + + template <typename T> + const T& TColumnVector<T>::operator[](size_t n) const { + return Data_[n]; + } + + template <typename T> + void TColumnVector<T>::SetAt(size_t n, const T& value) { + Data_.at(n) = value; + } + + template <typename T> + void TColumnVector<T>::Append(TColumnRef column) { + if (auto col = column->As<TColumnVector<T>>()) { + Data_.insert(Data_.end(), col->Data_.begin(), col->Data_.end()); + } + } + + template <typename T> + bool TColumnVector<T>::Load(TCodedInputStream* input, size_t rows) { + Data_.resize(rows); + + return input->ReadRaw(Data_.data(), Data_.size() * sizeof(T)); + } + + template <typename T> + void TColumnVector<T>::Save(TCodedOutputStream* output) { + output->WriteRaw(Data_.data(), Data_.size() * sizeof(T)); + } + + template <typename T> + size_t TColumnVector<T>::Size() const { + return Data_.size(); + } + + template <typename T> + TColumnRef TColumnVector<T>::Slice(size_t begin, size_t len) { + return new TColumnVector<T>(SliceVector(Data_, begin, len)); + } + + template class TColumnVector<i8>; + template class TColumnVector<i16>; + template class TColumnVector<i32>; + template class TColumnVector<i64>; + + template class TColumnVector<ui8>; + template class TColumnVector<ui16>; + template class TColumnVector<ui32>; + template class TColumnVector<ui64>; + + template class TColumnVector<float>; + template class TColumnVector<double>; + +} diff --git a/library/cpp/clickhouse/client/columns/numeric.h b/library/cpp/clickhouse/client/columns/numeric.h new file mode 100644 index 0000000000..11a2ddac00 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/numeric.h @@ -0,0 +1,65 @@ +#pragma once + +#include "column.h" + +namespace NClickHouse { + /** + * Represents various numeric columns. + */ + template <typename T> + class TColumnVector: public TColumn { + public: + static TIntrusivePtr<TColumnVector<T>> Create(); + static TIntrusivePtr<TColumnVector<T>> Create(const TVector<T>& data); + static TIntrusivePtr<TColumnVector<T>> Create(TVector<T>&& data); + + /// Appends one element to the end of column. + void Append(const T& value); + + /// Returns element at given row number. + const T& At(size_t n) const; + + /// Returns element at given row number. + const T& operator[](size_t n) const; + + /// Set element at given row number. + void SetAt(size_t n, const T& value); + + public: + /// Appends content of given column to the end of current one. + void Append(TColumnRef column) override; + + /// Loads column data from input stream. + bool Load(TCodedInputStream* input, size_t rows) override; + + /// Saves column data to output stream. + void Save(TCodedOutputStream* output) override; + + /// Returns count of rows in the column. + size_t Size() const override; + + /// Makes slice of the current column. + TColumnRef Slice(size_t begin, size_t len) override; + + private: + TColumnVector(); + TColumnVector(const TVector<T>& data); + TColumnVector(TVector<T>&& data); + + TVector<T> Data_; + }; + + using TColumnUInt8 = TColumnVector<ui8>; + using TColumnUInt16 = TColumnVector<ui16>; + using TColumnUInt32 = TColumnVector<ui32>; + using TColumnUInt64 = TColumnVector<ui64>; + + using TColumnInt8 = TColumnVector<i8>; + using TColumnInt16 = TColumnVector<i16>; + using TColumnInt32 = TColumnVector<i32>; + using TColumnInt64 = TColumnVector<i64>; + + using TColumnFloat32 = TColumnVector<float>; + using TColumnFloat64 = TColumnVector<double>; + +} diff --git a/library/cpp/clickhouse/client/columns/string.cpp b/library/cpp/clickhouse/client/columns/string.cpp new file mode 100644 index 0000000000..92053aadc8 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/string.cpp @@ -0,0 +1,241 @@ +#include "string.h" +#include "utils.h" + +#include <library/cpp/clickhouse/client/base/wire_format.h> + +#include <util/memory/tempbuf.h> + +namespace NClickHouse { + TColumnFixedString::TColumnFixedString(size_t n) + : TColumn(TType::CreateString(n)) + , StringSize_(n) + { + } + + TColumnFixedString::TColumnFixedString(size_t n, const TVector<TString>& data) + : TColumnFixedString(n) + { + Data_.reserve(data.size()); + for (const auto& value : data) { + Append(value); + } + } + + TIntrusivePtr<TColumnFixedString> TColumnFixedString::Create(size_t n) { + return new TColumnFixedString(n); + } + + TIntrusivePtr<TColumnFixedString> TColumnFixedString::Create(size_t n, const TVector<TString>& data) { + return new TColumnFixedString(n, data); + } + + void TColumnFixedString::Append(const TString& str) { + Data_.push_back(str); + Data_.back().resize(StringSize_); + } + + const TString& TColumnFixedString::At(size_t n) const { + return Data_.at(n); + } + + const TString& TColumnFixedString::operator[](size_t n) const { + return Data_[n]; + } + + void TColumnFixedString::SetAt(size_t n, const TString& value) { + TString stringResized(value); + stringResized.resize(StringSize_); + Data_.at(n) = stringResized; + } + + void TColumnFixedString::Append(TColumnRef column) { + if (auto col = column->As<TColumnFixedString>()) { + if (StringSize_ == col->StringSize_) { + Data_.insert(Data_.end(), col->Data_.begin(), col->Data_.end()); + } + } + } + + bool TColumnFixedString::Load(TCodedInputStream* input, size_t rows) { + for (size_t i = 0; i < rows; ++i) { + TTempBuf s(StringSize_); + + if (!TWireFormat::ReadBytes(input, s.Data(), StringSize_)) { + return false; + } + + Data_.push_back(TString(s.Data(), StringSize_)); + } + + return true; + } + + void TColumnFixedString::Save(TCodedOutputStream* output) { + for (size_t i = 0; i < Data_.size(); ++i) { + TWireFormat::WriteBytes(output, Data_[i].data(), StringSize_); + } + } + + size_t TColumnFixedString::Size() const { + return Data_.size(); + } + + TColumnRef TColumnFixedString::Slice(size_t begin, size_t len) { + auto result = new TColumnFixedString(StringSize_); + + if (begin < Data_.size()) { + result->Data_ = SliceVector(Data_, begin, len); + } + + return result; + } + + TColumnString::TColumnString() + : TColumn(TType::CreateString()) + { + } + + TColumnString::TColumnString(const TVector<TString>& data) + : TColumn(TType::CreateString()) + , Data_(data) + { + } + + TColumnString::TColumnString(TVector<TString>&& data) + : TColumn(TType::CreateString()) + , Data_(std::move(data)) + { + } + + TIntrusivePtr<TColumnString> TColumnString::Create() { + return new TColumnString(); + } + + TIntrusivePtr<TColumnString> TColumnString::Create(const TVector<TString>& data) { + return new TColumnString(data); + } + + TIntrusivePtr<TColumnString> TColumnString::Create(TVector<TString>&& data) { + return new TColumnString(std::move(data)); + } + + void TColumnString::Append(const TString& str) { + Data_.push_back(str); + } + + const TString& TColumnString::At(size_t n) const { + return Data_.at(n); + } + + const TString& TColumnString::operator[](size_t n) const { + return Data_[n]; + } + + void TColumnString::SetAt(size_t n, const TString& value) { + Data_.at(n) = value; + } + + void TColumnString::Append(TColumnRef column) { + if (auto col = column->As<TColumnString>()) { + Data_.insert(Data_.end(), col->Data_.begin(), col->Data_.end()); + } + } + + bool TColumnString::Load(TCodedInputStream* input, size_t rows) { + for (size_t i = 0; i < rows; ++i) { + TString s; + + if (!TWireFormat::ReadString(input, &s)) { + return false; + } + + Data_.push_back(s); + } + + return true; + } + + void TColumnString::Save(TCodedOutputStream* output) { + for (auto si = Data_.begin(); si != Data_.end(); ++si) { + TWireFormat::WriteString(output, *si); + } + } + + size_t TColumnString::Size() const { + return Data_.size(); + } + + TColumnRef TColumnString::Slice(size_t begin, size_t len) { + return new TColumnString(SliceVector(Data_, begin, len)); + } + + TColumnStringBuf::TColumnStringBuf() + : TColumn(TType::CreateString()) + { + } + + TColumnStringBuf::TColumnStringBuf(const TVector<TStringBuf>& data) + : TColumn(TType::CreateString()) + , Data_(data) + { + } + + TColumnStringBuf::TColumnStringBuf(TVector<TStringBuf>&& data) + : TColumn(TType::CreateString()) + , Data_(std::move(data)) + { + } + + TIntrusivePtr<TColumnStringBuf> TColumnStringBuf::Create() { + return new TColumnStringBuf(); + } + + TIntrusivePtr<TColumnStringBuf> TColumnStringBuf::Create(const TVector<TStringBuf>& data) { + return new TColumnStringBuf(data); + } + + TIntrusivePtr<TColumnStringBuf> TColumnStringBuf::Create(TVector<TStringBuf>&& data) { + return new TColumnStringBuf(std::move(data)); + } + + void TColumnStringBuf::Append(TStringBuf str) { + Data_.push_back(str); + } + + const TStringBuf& TColumnStringBuf::At(size_t n) const { + return Data_.at(n); + } + + const TStringBuf& TColumnStringBuf::operator[](size_t n) const { + return Data_[n]; + } + + void TColumnStringBuf::SetAt(size_t n, TStringBuf value) { + Data_.at(n) = value; + } + + void TColumnStringBuf::Append(TColumnRef column) { + if (auto col = column->As<TColumnStringBuf>()) { + Data_.insert(Data_.end(), col->Data_.begin(), col->Data_.end()); + } + } + + bool TColumnStringBuf::Load(TCodedInputStream*, size_t) { + ythrow yexception() << "load not implemented"; + } + + void TColumnStringBuf::Save(TCodedOutputStream* output) { + for (auto si = Data_.begin(); si != Data_.end(); ++si) { + TWireFormat::WriteStringBuf(output, *si); + } + } + + size_t TColumnStringBuf::Size() const { + return Data_.size(); + } + + TColumnRef TColumnStringBuf::Slice(size_t begin, size_t len) { + return new TColumnStringBuf(SliceVector(Data_, begin, len)); + } + +} diff --git a/library/cpp/clickhouse/client/columns/string.h b/library/cpp/clickhouse/client/columns/string.h new file mode 100644 index 0000000000..19c41fcda3 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/string.h @@ -0,0 +1,142 @@ +#pragma once + +#include "column.h" + +#include <util/generic/string.h> + +namespace NClickHouse { + /** + * Represents column of fixed-length strings. + */ + class TColumnFixedString: public TColumn { + public: + static TIntrusivePtr<TColumnFixedString> Create(size_t n); + static TIntrusivePtr<TColumnFixedString> Create(size_t n, const TVector<TString>& data); + + /// Appends one element to the column. + void Append(const TString& str); + + /// Returns element at given row number. + const TString& At(size_t n) const; + + /// Returns element at given row number. + const TString& operator[](size_t n) const; + + /// Set element at given row number. + void SetAt(size_t n, const TString& value); + + public: + /// Appends content of given column to the end of current one. + void Append(TColumnRef column) override; + + /// Loads column data from input stream. + bool Load(TCodedInputStream* input, size_t rows) override; + + /// Saves column data to output stream. + void Save(TCodedOutputStream* output) override; + + /// Returns count of rows in the column. + size_t Size() const override; + + /// Makes slice of the current column. + TColumnRef Slice(size_t begin, size_t len) override; + + private: + TColumnFixedString(size_t n); + TColumnFixedString(size_t n, const TVector<TString>& data); + + const size_t StringSize_; + TVector<TString> Data_; + }; + + /** + * Represents column of variable-length strings. + */ + class TColumnString: public TColumn { + public: + static TIntrusivePtr<TColumnString> Create(); + static TIntrusivePtr<TColumnString> Create(const TVector<TString>& data); + static TIntrusivePtr<TColumnString> Create(TVector<TString>&& data); + + /// Appends one element to the column. + void Append(const TString& str); + + /// Returns element at given row number. + const TString& At(size_t n) const; + + /// Returns element at given row number. + const TString& operator[](size_t n) const; + + /// Set element at given row number. + void SetAt(size_t n, const TString& value); + + public: + /// Appends content of given column to the end of current one. + void Append(TColumnRef column) override; + + /// Loads column data from input stream. + bool Load(TCodedInputStream* input, size_t rows) override; + + /// Saves column data to output stream. + void Save(TCodedOutputStream* output) override; + + /// Returns count of rows in the column. + size_t Size() const override; + + /// Makes slice of the current column. + TColumnRef Slice(size_t begin, size_t len) override; + + private: + TColumnString(); + TColumnString(const TVector<TString>& data); + TColumnString(TVector<TString>&& data); + + TVector<TString> Data_; + }; + + /** +* Represents column of variable-length strings but use TStringBuf instead TString. +*/ + class TColumnStringBuf: public NClickHouse::TColumn { + public: + static TIntrusivePtr<TColumnStringBuf> Create(); + static TIntrusivePtr<TColumnStringBuf> Create(const TVector<TStringBuf>& data); + static TIntrusivePtr<TColumnStringBuf> Create(TVector<TStringBuf>&& data); + + /// Appends one element to the column. + void Append(TStringBuf str); + + /// Returns element at given row number. + const TStringBuf& At(size_t n) const; + + /// Returns element at given row number. + const TStringBuf& operator[](size_t n) const; + + /// Set element at given row number. + void SetAt(size_t n, TStringBuf value); + + public: + /// Appends content of given column to the end of current one. + void Append(NClickHouse::TColumnRef column) override; + + /// Loads column data from input stream. + bool Load(NClickHouse::TCodedInputStream* input, size_t rows) override; + + /// Saves column data to output stream. + void Save(NClickHouse::TCodedOutputStream* output) override; + + /// Returns count of rows in the column. + size_t Size() const override; + + /// Makes slice of the current column. + NClickHouse::TColumnRef Slice(size_t begin, size_t len) override; + + private: + TColumnStringBuf(); + TColumnStringBuf(const TVector<TStringBuf>& data); + TColumnStringBuf(TVector<TStringBuf>&& data); + + TVector<TStringBuf> Data_; + }; + +} diff --git a/library/cpp/clickhouse/client/columns/tuple.cpp b/library/cpp/clickhouse/client/columns/tuple.cpp new file mode 100644 index 0000000000..3d0d00e772 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/tuple.cpp @@ -0,0 +1,42 @@ +#include "tuple.h" + +namespace NClickHouse { + static TVector<TTypeRef> CollectTypes(const TVector<TColumnRef>& columns) { + TVector<TTypeRef> types; + for (const auto& col : columns) { + types.push_back(col->Type()); + } + return types; + } + + TColumnTuple::TColumnTuple(const TVector<TColumnRef>& columns) + : TColumn(TType::CreateTuple(CollectTypes(columns))) + , Columns_(columns) + { + } + + TIntrusivePtr<TColumnTuple> TColumnTuple::Create(const TVector<TColumnRef>& columns) { + return new TColumnTuple(columns); + } + + size_t TColumnTuple::Size() const { + return Columns_.empty() ? 0 : Columns_[0]->Size(); + } + + bool TColumnTuple::Load(TCodedInputStream* input, size_t rows) { + for (auto ci = Columns_.begin(); ci != Columns_.end(); ++ci) { + if (!(*ci)->Load(input, rows)) { + return false; + } + } + + return true; + } + + void TColumnTuple::Save(TCodedOutputStream* output) { + for (auto ci = Columns_.begin(); ci != Columns_.end(); ++ci) { + (*ci)->Save(output); + } + } + +} diff --git a/library/cpp/clickhouse/client/columns/tuple.h b/library/cpp/clickhouse/client/columns/tuple.h new file mode 100644 index 0000000000..d388a7b9a9 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/tuple.h @@ -0,0 +1,37 @@ +#pragma once + +#include "column.h" + +#include <util/generic/vector.h> + +namespace NClickHouse { + /** */ + class TColumnTuple: public TColumn { + public: + static TIntrusivePtr<TColumnTuple> Create(const TVector<TColumnRef>& columns); + + TColumnRef operator[](size_t n) const { + return Columns_[n]; + } + + /// Appends content of given column to the end of current one. + void Append(TColumnRef) override { + } + + size_t Size() const override; + + bool Load(TCodedInputStream* input, size_t rows) override; + + void Save(TCodedOutputStream* output) override; + + TColumnRef Slice(size_t, size_t) override { + return TColumnRef(); + } + + private: + TColumnTuple(const TVector<TColumnRef>& columns); + + TVector<TColumnRef> Columns_; + }; + +} diff --git a/library/cpp/clickhouse/client/columns/utils.h b/library/cpp/clickhouse/client/columns/utils.h new file mode 100644 index 0000000000..fc43828c63 --- /dev/null +++ b/library/cpp/clickhouse/client/columns/utils.h @@ -0,0 +1,19 @@ +#pragma once + +#include <algorithm> +#include <util/generic/vector.h> + +namespace NClickHouse { + template <typename T> + TVector<T> SliceVector(const TVector<T>& vec, size_t begin, size_t len) { + TVector<T> result; + + if (begin < vec.size()) { + len = std::min(len, vec.size() - begin); + result.assign(vec.begin() + begin, vec.begin() + (begin + len)); + } + + return result; + } + +} diff --git a/library/cpp/clickhouse/client/columns/ya.make b/library/cpp/clickhouse/client/columns/ya.make new file mode 100644 index 0000000000..29330f949e --- /dev/null +++ b/library/cpp/clickhouse/client/columns/ya.make @@ -0,0 +1,19 @@ +LIBRARY() + +SRCS( + array.cpp + date.cpp + enum.cpp + factory.cpp + nullable.cpp + numeric.cpp + string.cpp + tuple.cpp +) + +PEERDIR( + library/cpp/clickhouse/client/base + library/cpp/clickhouse/client/types +) + +END() diff --git a/library/cpp/clickhouse/client/exceptions.h b/library/cpp/clickhouse/client/exceptions.h new file mode 100644 index 0000000000..d27c5352f9 --- /dev/null +++ b/library/cpp/clickhouse/client/exceptions.h @@ -0,0 +1,27 @@ +#pragma once + +#include "query.h" + +#include <util/generic/yexception.h> + +namespace NClickHouse { + class TServerException: public yexception { + public: + TServerException(std::unique_ptr<TException> e) + : Exception_(std::move(e)) + { + } + + const TException& GetException() const { + return *Exception_; + } + + const char* what() const noexcept override { + return Exception_->DisplayText.c_str(); + } + + private: + std::unique_ptr<TException> Exception_; + }; + +} diff --git a/library/cpp/clickhouse/client/protocol.h b/library/cpp/clickhouse/client/protocol.h new file mode 100644 index 0000000000..3cb5b2646f --- /dev/null +++ b/library/cpp/clickhouse/client/protocol.h @@ -0,0 +1,52 @@ +#pragma once + +namespace NClickHouse { + /// То, что передаёт сервер. + namespace ServerCodes { + enum { + Hello = 0, /// Имя, версия, ревизия. + Data = 1, /// Блок данных со сжатием или без. + Exception = 2, /// Исключение во время обработки запроса. + Progress = 3, /// Прогресс выполнения запроса: строк считано, байт считано. + Pong = 4, /// Ответ на Ping. + EndOfStream = 5, /// Все пакеты были переданы. + ProfileInfo = 6, /// Пакет с профайлинговой информацией. + Totals = 7, /// Блок данных с тотальными значениями, со сжатием или без. + Extremes = 8, /// Блок данных с минимумами и максимумами, аналогично. + Log = 10, /// Системный лог исполнения запроса. + }; + } + + /// То, что передаёт клиент. + namespace ClientCodes { + enum { + Hello = 0, /// Имя, версия, ревизия, БД по-умолчанию. + Query = 1, /** Идентификатор запроса, настройки на отдельный запрос, + * информация, до какой стадии исполнять запрос, + * использовать ли сжатие, текст запроса (без данных для INSERT-а). + */ + Data = 2, /// Блок данных со сжатием или без. + Cancel = 3, /// Отменить выполнение запроса. + Ping = 4, /// Проверка живости соединения с сервером. + }; + } + + /// Использовать ли сжатие. + namespace CompressionState { + enum { + Disable = 0, + Enable = 1, + }; + } + + namespace Stages { + enum { + Complete = 2, + }; + } + + enum class ECompressionMethodByte : ui8 { + LZ4 = 0x82, + ZSTD = 0x90, + }; +} diff --git a/library/cpp/clickhouse/client/query.cpp b/library/cpp/clickhouse/client/query.cpp new file mode 100644 index 0000000000..875dc4a078 --- /dev/null +++ b/library/cpp/clickhouse/client/query.cpp @@ -0,0 +1,20 @@ +#include "query.h" + +namespace NClickHouse { + TQuery::TQuery() { + } + + TQuery::TQuery(const char* query) + : Query_(query) + { + } + + TQuery::TQuery(const TString& query) + : Query_(query) + { + } + + TQuery::~TQuery() { + } + +} diff --git a/library/cpp/clickhouse/client/query.h b/library/cpp/clickhouse/client/query.h new file mode 100644 index 0000000000..fc5879b4f5 --- /dev/null +++ b/library/cpp/clickhouse/client/query.h @@ -0,0 +1,153 @@ +#pragma once + +#include "block.h" + +#include <util/generic/string.h> + +#include <cstdint> +#include <functional> +#include <memory> + +namespace NClickHouse { + /** + * Settings of individual query. + */ + struct TQuerySettings { + /// Максимальное количество потоков выполнения запроса. По-умолчанию - определять автоматически. + int MaxThreads = 0; + /// Считать минимумы и максимумы столбцов результата. + bool Extremes = false; + /// Тихо пропускать недоступные шарды. + bool SkipUnavailableShards = false; + /// Write statistics about read rows, bytes, time elapsed, etc. + bool OutputFormatWriteStatistics = true; + /// Use client timezone for interpreting DateTime string values, instead of adopting server timezone. + bool UseClientTimeZone = false; + + // connect_timeout + // max_block_size + // distributed_group_by_no_merge = false + // strict_insert_defaults = 0 + // network_compression_method = LZ4 + // priority = 0 + }; + + struct TException { + int Code = 0; + TString Name; + TString DisplayText; + TString StackTrace; + /// Pointer to nested exception. + std::unique_ptr<TException> Nested; + }; + + struct TProfile { + ui64 rows = 0; + ui64 blocks = 0; + ui64 bytes = 0; + ui64 rows_before_limit = 0; + bool applied_limit = false; + bool calculated_rows_before_limit = false; + }; + + struct TProgress { + ui64 rows = 0; + ui64 bytes = 0; + ui64 total_rows = 0; + }; + + class TQueryEvents { + public: + virtual ~TQueryEvents() { + } + + /// Some data was received. + virtual void OnData(const TBlock& block) = 0; + + virtual void OnServerException(const TException& e) = 0; + + virtual void OnProfile(const TProfile& profile) = 0; + + virtual void OnProgress(const TProgress& progress) = 0; + + virtual void OnFinish() = 0; + }; + + using TExceptionCallback = std::function<void(const TException& e)>; + using TProfileCallback = std::function<void(const TProfile& profile)>; + using TProgressCallback = std::function<void(const TProgress& progress)>; + using TSelectCallback = std::function<void(const TBlock& block)>; + + class TQuery: public TQueryEvents { + public: + TQuery(); + TQuery(const char* query); + TQuery(const TString& query); + ~TQuery(); + + /// + inline TString GetText() const { + return Query_; + } + + /// Set handler for receiving result data. + inline TQuery& OnData(TSelectCallback cb) { + SelectCb_ = cb; + return *this; + } + + /// Set handler for receiving server's exception. + inline TQuery& OnException(TExceptionCallback cb) { + ExceptionCb_ = cb; + return *this; + } + + /// Set handler for receiving a profile of query execution. + inline TQuery& OnProfile(TProfileCallback pb) { + ProfileCb_ = pb; + return *this; + } + + /// Set handler for receiving a progress of query exceution. + inline TQuery& OnProgress(TProgressCallback cb) { + ProgressCb_ = cb; + return *this; + } + + private: + void OnData(const TBlock& block) override { + if (SelectCb_) { + SelectCb_(block); + } + } + + void OnServerException(const TException& e) override { + if (ExceptionCb_) { + ExceptionCb_(e); + } + } + + void OnProfile(const TProfile& profile) override { + if (ProfileCb_) { + ProfileCb_(profile); + } + } + + void OnProgress(const TProgress& progress) override { + if (ProgressCb_) { + ProgressCb_(progress); + } + } + + void OnFinish() override { + } + + private: + TString Query_; + TExceptionCallback ExceptionCb_; + TProfileCallback ProfileCb_; + TProgressCallback ProgressCb_; + TSelectCallback SelectCb_; + }; + +} diff --git a/library/cpp/clickhouse/client/types/CMakeLists.darwin-arm64.txt b/library/cpp/clickhouse/client/types/CMakeLists.darwin-arm64.txt new file mode 100644 index 0000000000..abd32a8751 --- /dev/null +++ b/library/cpp/clickhouse/client/types/CMakeLists.darwin-arm64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-types) +target_link_libraries(clickhouse-client-types PUBLIC + contrib-libs-cxxsupp + yutil +) +target_sources(clickhouse-client-types PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/types/type_parser.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/types/types.cpp +) diff --git a/library/cpp/clickhouse/client/types/CMakeLists.darwin-x86_64.txt b/library/cpp/clickhouse/client/types/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..abd32a8751 --- /dev/null +++ b/library/cpp/clickhouse/client/types/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-types) +target_link_libraries(clickhouse-client-types PUBLIC + contrib-libs-cxxsupp + yutil +) +target_sources(clickhouse-client-types PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/types/type_parser.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/types/types.cpp +) diff --git a/library/cpp/clickhouse/client/types/CMakeLists.linux-aarch64.txt b/library/cpp/clickhouse/client/types/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..1957c91ece --- /dev/null +++ b/library/cpp/clickhouse/client/types/CMakeLists.linux-aarch64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-types) +target_link_libraries(clickhouse-client-types PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil +) +target_sources(clickhouse-client-types PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/types/type_parser.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/types/types.cpp +) diff --git a/library/cpp/clickhouse/client/types/CMakeLists.linux-x86_64.txt b/library/cpp/clickhouse/client/types/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..1957c91ece --- /dev/null +++ b/library/cpp/clickhouse/client/types/CMakeLists.linux-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(clickhouse-client-types) +target_link_libraries(clickhouse-client-types PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil +) +target_sources(clickhouse-client-types PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/types/type_parser.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/clickhouse/client/types/types.cpp +) diff --git a/library/cpp/clickhouse/client/types/CMakeLists.txt b/library/cpp/clickhouse/client/types/CMakeLists.txt new file mode 100644 index 0000000000..1beba2829f --- /dev/null +++ b/library/cpp/clickhouse/client/types/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64") + include(CMakeLists.darwin-arm64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/library/cpp/clickhouse/client/types/type_parser.cpp b/library/cpp/clickhouse/client/types/type_parser.cpp new file mode 100644 index 0000000000..4fea43291b --- /dev/null +++ b/library/cpp/clickhouse/client/types/type_parser.cpp @@ -0,0 +1,231 @@ +#include "type_parser.h" + +#include <util/string/cast.h> + +namespace NClickHouse { + static TTypeAst::EMeta GetTypeMeta(const TStringBuf& name) { + if (name == "Array") { + return TTypeAst::Array; + } + + if (name == "Null") { + return TTypeAst::Null; + } + + if (name == "Nullable") { + return TTypeAst::Nullable; + } + + if (name == "Tuple") { + return TTypeAst::Tuple; + } + + if (name == "Enum8" || name == "Enum16") { + return TTypeAst::Enum; + } + + return TTypeAst::Terminal; + } + + TTypeParser::TTypeParser(const TStringBuf& name) + : Cur_(name.data()) + , End_(name.data() + name.size()) + , Type_(nullptr) + { + } + + TTypeParser::~TTypeParser() = default; + + bool TTypeParser::Parse(TTypeAst* type) { + Type_ = type; + OpenElements_.push(Type_); + + do { + const TToken& TToken = NextToken(); + + switch (TToken.Type) { + case TToken::QuotedString: + { + Type_->Meta = TTypeAst::Terminal; + if (TToken.Value.length() < 1) + Type_->Name = {}; + else + Type_->Name = TToken.Value.substr(1, TToken.Value.length() - 2); + //Type_->code = Type::String; + break; + } + case TToken::Name: + Type_->Meta = GetTypeMeta(TToken.Value); + Type_->Name = TToken.Value; + break; + case TToken::Number: + Type_->Meta = TTypeAst::Number; + Type_->Value = FromString<i64>(TToken.Value); + break; + case TToken::LPar: + Type_->Elements.emplace_back(TTypeAst()); + OpenElements_.push(Type_); + Type_ = &Type_->Elements.back(); + break; + case TToken::RPar: + Type_ = OpenElements_.top(); + OpenElements_.pop(); + break; + case TToken::Comma: + Type_ = OpenElements_.top(); + OpenElements_.pop(); + Type_->Elements.emplace_back(TTypeAst()); + OpenElements_.push(Type_); + Type_ = &Type_->Elements.back(); + break; + case TToken::EOS: + return true; + case TToken::Invalid: + return false; + } + } while (true); + } + + TTypeParser::TToken TTypeParser::NextToken() { + for (; Cur_ < End_; ++Cur_) { + switch (*Cur_) { + case ' ': + case '\n': + case '\t': + case '\0': + case '=': + continue; + + case '(': + return TToken{TToken::LPar, TStringBuf(Cur_++, 1)}; + case ')': + return TToken{TToken::RPar, TStringBuf(Cur_++, 1)}; + case ',': + return TToken{TToken::Comma, TStringBuf(Cur_++, 1)}; + case '\'': + { + const size_t end_quote_length = 1; + const TStringBuf end_quote{Cur_, end_quote_length}; + // Fast forward to the closing quote. + const auto start = Cur_++; + for (; Cur_ < End_ - end_quote_length; ++Cur_) { + // TODO (nemkov): handle escaping ? + if (end_quote == TStringBuf{Cur_, end_quote_length}) { + Cur_ += end_quote_length; + + return TToken{TToken::QuotedString, TStringBuf{start, Cur_}}; + } + } + return TToken{TToken::QuotedString, TStringBuf(Cur_++, 1)}; + } + + default: { + const char* st = Cur_; + + if (isalpha(*Cur_) || *Cur_ == '_') { + for (; Cur_ < End_; ++Cur_) { + if (!isalpha(*Cur_) && !isdigit(*Cur_) && *Cur_ != '_') { + break; + } + } + + return TToken{TToken::Name, TStringBuf(st, Cur_)}; + } + + if (isdigit(*Cur_) || *Cur_ == '-') { + ++Cur_; + for (; Cur_ < End_; ++Cur_) { + if (!isdigit(*Cur_)) { + break; + } + } + + return TToken{TToken::Number, TStringBuf(st, Cur_)}; + } + + return TToken{TToken::Invalid, TStringBuf()}; + } + } + } + + return TToken{TToken::EOS, TStringBuf()}; + } + + static TTypeRef CreateTypeFromAst(const TTypeAst& ast) { + if (ast.Meta == TTypeAst::Terminal) { + if (ast.Name == "UInt8") + return TType::CreateSimple<ui8>(); + if (ast.Name == "UInt16") + return TType::CreateSimple<ui16>(); + if (ast.Name == "UInt32") + return TType::CreateSimple<ui32>(); + if (ast.Name == "UInt64") + return TType::CreateSimple<ui64>(); + + if (ast.Name == "Int8") + return TType::CreateSimple<i8>(); + if (ast.Name == "Int16") + return TType::CreateSimple<i16>(); + if (ast.Name == "Int32") + return TType::CreateSimple<i32>(); + if (ast.Name == "Int64") + return TType::CreateSimple<i64>(); + + if (ast.Name == "Float32") + return TType::CreateSimple<float>(); + if (ast.Name == "Float64") + return TType::CreateSimple<double>(); + + if (ast.Name == "String") + return TType::CreateString(); + if (ast.Name == "FixedString") + return TType::CreateString(ast.Elements.front().Value); + + if (ast.Name == "DateTime") + return TType::CreateDateTime(); + if (ast.Name == "Date") + return TType::CreateDate(); + } else if (ast.Meta == TTypeAst::Tuple) { + TVector<TTypeRef> columns; + + for (const auto& elem : ast.Elements) { + if (auto col = CreateTypeFromAst(elem)) { + columns.push_back(col); + } else { + return nullptr; + } + } + + return TType::CreateTuple(columns); + } else if (ast.Meta == TTypeAst::Array) { + return TType::CreateArray(CreateTypeFromAst(ast.Elements.front())); + } else if (ast.Meta == TTypeAst::Enum) { + TVector<TEnumItem> enum_items; + + for (const auto& elem : ast.Elements) { + TString name(elem.Name); + i16 value = elem.Value; + enum_items.push_back({name, value}); + } + + if (ast.Name == "Enum8") { + return TType::CreateEnum8(enum_items); + } else { + return TType::CreateEnum16(enum_items); + } + } + + return nullptr; + } + + TTypeRef ParseTypeFromString(const TStringBuf& type_name) { + TTypeAst ast; + + if (TTypeParser(type_name).Parse(&ast)) { + return CreateTypeFromAst(ast); + } + + return TTypeRef(); + } + +} diff --git a/library/cpp/clickhouse/client/types/type_parser.h b/library/cpp/clickhouse/client/types/type_parser.h new file mode 100644 index 0000000000..c912c4cc40 --- /dev/null +++ b/library/cpp/clickhouse/client/types/type_parser.h @@ -0,0 +1,68 @@ +#pragma once + +#include "types.h" + +#include <util/generic/strbuf.h> +#include <util/generic/list.h> +#include <util/generic/stack.h> + +namespace NClickHouse { + struct TTypeAst { + enum EMeta { + Array, + Null, + Nullable, + Number, + Terminal, + Tuple, + Enum + }; + + /// Type's category. + EMeta Meta; + /// Type's name. + TStringBuf Name; + /// Value associated with the node, used for fixed-width types and enum values. + i64 Value = 0; + /// Subelements of the type. Used to store enum's names and values as well. + TList<TTypeAst> Elements; + }; + + class TTypeParser { + struct TToken { + enum EType { + Invalid = 0, + Name, + Number, + LPar, + RPar, + Comma, + QuotedString, // string with quotation marks included + EOS + }; + + EType Type; + TStringBuf Value; + }; + + public: + explicit TTypeParser(const TStringBuf& name); + ~TTypeParser(); + + bool Parse(TTypeAst* type); + + private: + TToken NextToken(); + + private: + const char* Cur_; + const char* End_; + + TTypeAst* Type_; + TStack<TTypeAst*> OpenElements_; + }; + + /// Create type instance from string representation. + TTypeRef ParseTypeFromString(const TStringBuf& type_name); + +} diff --git a/library/cpp/clickhouse/client/types/types.cpp b/library/cpp/clickhouse/client/types/types.cpp new file mode 100644 index 0000000000..98d88a8f4c --- /dev/null +++ b/library/cpp/clickhouse/client/types/types.cpp @@ -0,0 +1,197 @@ +#include "types.h" + +#include <util/string/builder.h> +#include <util/string/cast.h> +#include <util/string/join.h> +#include <util/string/printf.h> + +namespace NClickHouse { + TType::TType(const ECode code) + : Code_(code) + { + if (Code_ == Array) { + Array_ = new TArray; + } else if (Code_ == Tuple) { + Tuple_ = new TTuple; + } else if (Code_ == Nullable) { + Nullable_ = new TNullable; + } + } + + TType::~TType() { + if (Code_ == Array) { + delete Array_; + } else if (Code_ == Tuple) { + delete Tuple_; + } else if (Code_ == Nullable) { + delete Nullable_; + } + } + + TType::ECode TType::GetCode() const { + return Code_; + } + + TTypeRef TType::GetItemType() const { + if (Code_ == Array) { + return Array_->ItemType; + } + return TTypeRef(); + } + + const TVector<TEnumItem>& TType::GetEnumItems() const { + return EnumItems_; + } + + const TString& TType::GetEnumName(i16 enumValue) const { + return EnumValueToName_.at(enumValue); + } + + i16 TType::GetEnumValue(const TString& enumName) const { + return EnumNameToValue_.at(enumName); + } + + bool TType::HasEnumName(const TString& enumName) const { + return EnumNameToValue_.contains(enumName); + } + + bool TType::HasEnumValue(i16 enumValue) const { + return EnumValueToName_.contains(enumValue); + } + + TString TType::GetName() const { + switch (Code_) { + case Void: + return "Void"; + case Int8: + return "Int8"; + case Int16: + return "Int16"; + case Int32: + return "Int32"; + case Int64: + return "Int64"; + case UInt8: + return "UInt8"; + case UInt16: + return "UInt16"; + case UInt32: + return "UInt32"; + case UInt64: + return "UInt64"; + case Enum8: + case Enum16: { + TVector<TString> pairs; + for (const auto& item : EnumItems_) { + pairs.push_back(TStringBuilder() << "'" << item.Name << "' = " << item.Value); + } + TStringBuilder ret; + if (Code_ == Enum8) { + ret << "Enum8"; + } else { + ret << "Enum16"; + } + ret << "(" << JoinRange(", ", pairs.begin(), pairs.end()) << ")"; + return ret; + } + case Float32: + return "Float32"; + case Float64: + return "Float64"; + case String: + return "String"; + case FixedString: + return "FixedString(" + ToString(StringSize_) + ")"; + case DateTime: + return "DateTime"; + case Date: + return "Date"; + case Array: + return TString("Array(") + Array_->ItemType->GetName() + ")"; + case Nullable: + return TString("Nullable(") + Nullable_->NestedType->GetName() + ")"; + case Tuple: { + TString result("Tuple("); + for (size_t i = 0; i < Tuple_->ItemTypes.size(); ++i) { + result += Tuple_->ItemTypes[i]->GetName(); + + if (i + 1 != Tuple_->ItemTypes.size()) { + result += ", "; + } + } + result += ")"; + return result; + } + } + + return TString(); + } + + bool TType::IsEqual(const TTypeRef& other) const { + return this->GetName() == other->GetName(); + } + + TTypeRef TType::CreateArray(TTypeRef item_type) { + TTypeRef type(new TType(TType::Array)); + type->Array_->ItemType = item_type; + return type; + } + + TTypeRef TType::CreateDate() { + return TTypeRef(new TType(TType::Date)); + } + + TTypeRef TType::CreateDateTime() { + return TTypeRef(new TType(TType::DateTime)); + } + + TTypeRef TType::CreateNullable(TTypeRef nested_type) { + TTypeRef type(new TType(TType::Nullable)); + type->Nullable_->NestedType = nested_type; + return type; + } + + TTypeRef TType::CreateString() { + return TTypeRef(new TType(TType::String)); + } + + TTypeRef TType::CreateString(size_t n) { + TTypeRef type(new TType(TType::FixedString)); + type->StringSize_ = n; + return type; + } + + TTypeRef TType::CreateTuple(const TVector<TTypeRef>& item_types) { + TTypeRef type(new TType(TType::Tuple)); + type->Tuple_->ItemTypes.assign(item_types.begin(), item_types.end()); + return type; + } + + TTypeRef TType::CreateEnum8(const TVector<TEnumItem>& enum_items) { + for (const auto& item : enum_items) { + Y_ENSURE(item.Value >= Min<i8>() && item.Value <= Max<i8>(), + Sprintf("Enum value %d for %s doesn't fit into Int8", item.Value, item.Name.data())); + } + + TTypeRef type(new TType(TType::Enum8)); + type->EnumItems_.assign(enum_items.begin(), enum_items.end()); + for (const auto& item : enum_items) { + type->EnumNameToValue_.insert({item.Name, item.Value}); + type->EnumValueToName_.insert({item.Value, item.Name}); + } + + return type; + } + + TTypeRef TType::CreateEnum16(const TVector<TEnumItem>& enum_items) { + TTypeRef type(new TType(TType::Enum16)); + type->EnumItems_.assign(enum_items.begin(), enum_items.end()); + for (const auto& item : enum_items) { + type->EnumNameToValue_.insert({item.Name, item.Value}); + type->EnumValueToName_.insert({item.Value, item.Name}); + } + + return type; + } + +} diff --git a/library/cpp/clickhouse/client/types/types.h b/library/cpp/clickhouse/client/types/types.h new file mode 100644 index 0000000000..71bd3620aa --- /dev/null +++ b/library/cpp/clickhouse/client/types/types.h @@ -0,0 +1,163 @@ +#pragma once + +#include <util/generic/hash.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> + +namespace NClickHouse { + using TTypeRef = TIntrusivePtr<class TType>; + + struct TEnumItem { + TString Name; + i16 Value; + }; + + class TType: public TAtomicRefCount<TType> { + public: + enum ECode { + Void = 0, + Int8, + Int16, + Int32, + Int64, + UInt8, + UInt16, + UInt32, + UInt64, + Enum8, + Enum16, + Float32, + Float64, + String, + FixedString, + DateTime, + Date, + Array, + Nullable, + Tuple + }; + + /// Destructor + ~TType(); + + /// Type's code. + ECode GetCode() const; + + /// Type of array's elements. + TTypeRef GetItemType() const; + + /// Methods to work with enum types. + const TVector<TEnumItem>& GetEnumItems() const; + const TString& GetEnumName(i16 enumValue) const; + i16 GetEnumValue(const TString& enumName) const; + bool HasEnumName(const TString& enumName) const; + bool HasEnumValue(i16 enumValue) const; + + /// String representation of the type. + TString GetName() const; + + /// Is given type same as current one. + bool IsEqual(const TTypeRef& other) const; + + public: + static TTypeRef CreateArray(TTypeRef item_type); + + static TTypeRef CreateDate(); + + static TTypeRef CreateDateTime(); + + static TTypeRef CreateNullable(TTypeRef nested_type); + + template <typename T> + static TTypeRef CreateSimple(); + + static TTypeRef CreateString(); + + static TTypeRef CreateString(size_t n); + + static TTypeRef CreateTuple(const TVector<TTypeRef>& item_types); + + static TTypeRef CreateEnum8(const TVector<TEnumItem>& enum_items); + + static TTypeRef CreateEnum16(const TVector<TEnumItem>& enum_items); + + private: + TType(const ECode code); + + struct TArray { + TTypeRef ItemType; + }; + + struct TNullable { + TTypeRef NestedType; + }; + + struct TTuple { + TVector<TTypeRef> ItemTypes; + }; + + TVector<TEnumItem> EnumItems_; + THashMap<i16, TString> EnumValueToName_; + THashMap<TString, i16> EnumNameToValue_; + + const ECode Code_; + union { + TArray* Array_; + TNullable* Nullable_; + TTuple* Tuple_; + int StringSize_; + }; + }; + + template <> + inline TTypeRef TType::CreateSimple<i8>() { + return TTypeRef(new TType(Int8)); + } + + template <> + inline TTypeRef TType::CreateSimple<i16>() { + return TTypeRef(new TType(Int16)); + } + + template <> + inline TTypeRef TType::CreateSimple<i32>() { + return TTypeRef(new TType(Int32)); + } + + template <> + inline TTypeRef TType::CreateSimple<i64>() { + return TTypeRef(new TType(Int64)); + } + + template <> + inline TTypeRef TType::CreateSimple<ui8>() { + return TTypeRef(new TType(UInt8)); + } + + template <> + inline TTypeRef TType::CreateSimple<ui16>() { + return TTypeRef(new TType(UInt16)); + } + + template <> + inline TTypeRef TType::CreateSimple<ui32>() { + return TTypeRef(new TType(UInt32)); + } + + template <> + inline TTypeRef TType::CreateSimple<ui64>() { + return TTypeRef(new TType(UInt64)); + } + + template <> + inline TTypeRef TType::CreateSimple<float>() { + return TTypeRef(new TType(Float32)); + } + + template <> + inline TTypeRef TType::CreateSimple<double>() { + return TTypeRef(new TType(Float64)); + } + +} diff --git a/library/cpp/clickhouse/client/types/ya.make b/library/cpp/clickhouse/client/types/ya.make new file mode 100644 index 0000000000..1c9f7f5932 --- /dev/null +++ b/library/cpp/clickhouse/client/types/ya.make @@ -0,0 +1,8 @@ +LIBRARY() + +SRCS( + type_parser.cpp + types.cpp +) + +END() diff --git a/library/cpp/clickhouse/client/ya.make b/library/cpp/clickhouse/client/ya.make new file mode 100644 index 0000000000..a07ddff2bb --- /dev/null +++ b/library/cpp/clickhouse/client/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + +SRCS( + block.cpp + client.cpp + query.cpp +) + +PEERDIR( + contrib/libs/lz4 + contrib/restricted/cityhash-1.0.2 + library/cpp/clickhouse/client/base + library/cpp/clickhouse/client/columns + library/cpp/clickhouse/client/types + library/cpp/openssl/io +) + +END() |