diff options
author | vvvv <vvvv@yandex-team.com> | 2024-11-07 04:19:26 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.com> | 2024-11-07 04:29:50 +0300 |
commit | 2661be00f3bc47590fda9218bf0386d6355c8c88 (patch) | |
tree | 3d316c07519191283d31c5f537efc6aabb42a2f0 /yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h | |
parent | cf2a23963ac10add28c50cc114fbf48953eca5aa (diff) | |
download | ydb-2661be00f3bc47590fda9218bf0386d6355c8c88.tar.gz |
Moved yql/minikql YQL-19206
init
[nodiff:caesar]
commit_hash:d1182ef7d430ccf7e4d37ed933c7126d7bd5d6e4
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h')
-rw-r--r-- | yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h | 255 |
1 files changed, 255 insertions, 0 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h b/yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h new file mode 100644 index 0000000000..bb675423fe --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h @@ -0,0 +1,255 @@ +#pragma once + +#include <yql/essentials/minikql/defs.h> +#include <yql/essentials/minikql/mkql_node.h> +#include <yql/essentials/minikql/pack_num.h> +#include <yql/essentials/public/decimal/yql_decimal_serialize.h> +#include <yql/essentials/public/udf/udf_value.h> + +#include <library/cpp/packedtypes/zigzag.h> +#include <contrib/ydb/library/actors/util/rope.h> + +#include <util/generic/buffer.h> +#include <util/generic/strbuf.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace NDetails { + +template<typename TBuf> +inline void PackUInt64(ui64 val, TBuf& buf) { + buf.Advance(MAX_PACKED64_SIZE); + char* dst = buf.Pos() - MAX_PACKED64_SIZE; + buf.EraseBack(MAX_PACKED64_SIZE - Pack64(val, dst)); +} + +template<typename TBuf> +inline void PackInt64(i64 val, TBuf& buf) { + PackUInt64(ZigZagEncode(val), buf); +} + +template<typename TBuf> +inline void PackUInt32(ui32 val, TBuf& buf) { + buf.Advance(MAX_PACKED32_SIZE); + char* dst = buf.Pos() - MAX_PACKED32_SIZE; + buf.EraseBack(MAX_PACKED32_SIZE - Pack32(val, dst)); +} + +template<typename TBuf> +inline void PackInt32(i32 val, TBuf& buf) { + PackUInt32(ZigZagEncode(val), buf); +} + +template<typename TBuf> +inline void PackUInt16(ui16 val, TBuf& buf) { + buf.Advance(MAX_PACKED32_SIZE); + char* dst = buf.Pos() - MAX_PACKED32_SIZE; + buf.EraseBack(MAX_PACKED32_SIZE - Pack32(val, dst)); +} + +template<typename TBuf> +inline void PackInt16(i16 val, TBuf& buf) { + PackUInt16(ZigZagEncode(val), buf); +} + +template <typename T, typename TBuf> +void PutRawData(T val, TBuf& buf) { + buf.Advance(sizeof(T)); + std::memcpy(buf.Pos() - sizeof(T), &val, sizeof(T)); +} + +constexpr size_t MAX_PACKED_DECIMAL_SIZE = sizeof(NYql::NDecimal::TInt128); +template<typename TBuf> +void PackDecimal(NYql::NDecimal::TInt128 val, TBuf& buf) { + buf.Advance(MAX_PACKED_DECIMAL_SIZE); + char* dst = buf.Pos() - MAX_PACKED_DECIMAL_SIZE; + buf.EraseBack(MAX_PACKED_DECIMAL_SIZE - NYql::NDecimal::Serialize(val, dst)); +} + +class TChunkedInputBuffer : private TNonCopyable { +public: + explicit TChunkedInputBuffer(TRope&& rope) + : Rope_(std::move(rope)) + { + Next(); + } + + explicit TChunkedInputBuffer(TStringBuf input) + : Rope_(TRope{}) + , Data_(input.data()) + , Len_(input.size()) + { + } + + inline const char* data() const { + return Data_; + } + + inline size_t length() const { + return Len_; + } + + inline size_t size() const { + return Len_; + } + + inline void Skip(size_t size) { + Y_DEBUG_ABORT_UNLESS(size <= Len_); + Data_ += size; + Len_ -= size; + } + + bool IsEmpty() { + if (size()) { + return false; + } + Next(); + return size() == 0; + } + + inline void CopyTo(char* dst, size_t toCopy) { + if (Y_LIKELY(toCopy <= size())) { + std::memcpy(dst, data(), toCopy); + Skip(toCopy); + } else { + CopyToChunked(dst, toCopy); + } + } + + inline TRope ReleaseRope() { + Y_DEBUG_ABORT_UNLESS(OriginalLen_ >= Len_); + Rope_.EraseFront(OriginalLen_ - Len_); + TRope result = std::move(Rope_); + + Data_ = nullptr; + Len_ = OriginalLen_ = 0; + Rope_.clear(); + + return result; + } + + void Next() { + Y_DEBUG_ABORT_UNLESS(Len_ == 0); + Rope_.EraseFront(OriginalLen_); + if (!Rope_.IsEmpty()) { + Len_ = OriginalLen_ = Rope_.begin().ContiguousSize(); + Data_ = Rope_.begin().ContiguousData(); + Y_DEBUG_ABORT_UNLESS(Len_ > 0); + } else { + Len_ = OriginalLen_ = 0; + Data_ = nullptr; + } + } + +private: + void CopyToChunked(char* dst, size_t toCopy) { + while (toCopy) { + size_t chunkSize = std::min(size(), toCopy); + std::memcpy(dst, data(), chunkSize); + Skip(chunkSize); + dst += chunkSize; + toCopy -= chunkSize; + if (toCopy) { + Next(); + MKQL_ENSURE(size(), "Unexpected end of buffer"); + } + } + } + + TRope Rope_; + const char* Data_ = nullptr; + size_t Len_ = 0; + size_t OriginalLen_ = 0; +}; + +template <typename T> +T GetRawData(TChunkedInputBuffer& buf) { + T val; + buf.CopyTo(reinterpret_cast<char*>(&val), sizeof(val)); + return val; +} + +template<typename T> +T UnpackInteger(TChunkedInputBuffer& buf) { + T res; + size_t read; + if constexpr (std::is_same_v<T, NYql::NDecimal::TInt128>) { + std::tie(res, read) = NYql::NDecimal::Deserialize(buf.data(), buf.size()); + Y_DEBUG_ABORT_UNLESS((read != 0) xor (NYql::NDecimal::IsError(res))); + } else if constexpr (std::is_same_v<T, ui64>) { + read = Unpack64(buf.data(), buf.size(), res); + } else { + static_assert(std::is_same_v<T, ui32>, "Only ui32/ui64/TInt128 are supported"); + read = Unpack32(buf.data(), buf.size(), res); + } + + if (Y_LIKELY(read > 0)) { + buf.Skip(read); + return res; + } + + static_assert(MAX_PACKED_DECIMAL_SIZE > MAX_PACKED64_SIZE); + char tmpBuf[MAX_PACKED_DECIMAL_SIZE]; + Y_DEBUG_ABORT_UNLESS(buf.size() < MAX_PACKED_DECIMAL_SIZE); + std::memcpy(tmpBuf, buf.data(), buf.size()); + size_t pos = buf.size(); + buf.Skip(buf.size()); + + for (;;) { + if (buf.size() == 0) { + buf.Next(); + MKQL_ENSURE(buf.size() > 0, (std::is_same_v<T, NYql::NDecimal::TInt128> ? "Bad decimal packed data" : "Bad uint packed data")); + } + Y_DEBUG_ABORT_UNLESS(pos < MAX_PACKED_DECIMAL_SIZE); + tmpBuf[pos++] = *buf.data(); + buf.Skip(1); + if constexpr (std::is_same_v<T, NYql::NDecimal::TInt128>) { + std::tie(res, read) = NYql::NDecimal::Deserialize(tmpBuf, pos); + Y_DEBUG_ABORT_UNLESS((read != 0) xor (NYql::NDecimal::IsError(res))); + } else if constexpr (std::is_same_v<T, ui64>) { + read = Unpack64(tmpBuf, pos, res); + } else { + read = Unpack32(tmpBuf, pos, res); + } + if (read) { + break; + } + } + return res; +} + +inline NYql::NDecimal::TInt128 UnpackDecimal(TChunkedInputBuffer& buf) { + return UnpackInteger<NYql::NDecimal::TInt128>(buf); +} + +inline ui64 UnpackUInt64(TChunkedInputBuffer& buf) { + return UnpackInteger<ui64>(buf); +} + +inline i64 UnpackInt64(TChunkedInputBuffer& buf) { + return ZigZagDecode(UnpackUInt64(buf)); +} + +inline ui32 UnpackUInt32(TChunkedInputBuffer& buf) { + return UnpackInteger<ui32>(buf); +} + +inline i32 UnpackInt32(TChunkedInputBuffer& buf) { + return ZigZagDecode(UnpackUInt32(buf)); +} + +inline ui16 UnpackUInt16(TChunkedInputBuffer& buf) { + ui32 res = UnpackUInt32(buf); + MKQL_ENSURE(res <= Max<ui16>(), "Corrupted data"); + return res; +} + +inline i16 UnpackInt16(TChunkedInputBuffer& buf) { + return ZigZagDecode(UnpackUInt16(buf)); +} + +} // NDetails + +} +} |