author     aneporada <[email protected]>  2022-11-15 12:01:45 +0300
committer  aneporada <[email protected]>  2022-11-15 12:01:45 +0300
commit     660fa18bb981ed3fa5b10b6ec2d9248d80e33f87
tree       082309d424b2ff5abab3b6a1b40765f9bb36899e
parent     701a5107da136849169c2168f1aecee07500b711
Implement BlockCompress
13 files changed, 827 insertions, 0 deletions
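At the expression level, `BlockCompress` consumes a wide block flow together with the index of a `Bool` bitmap column and produces the same flow with the bitmap column removed and only the rows where the bitmap is true retained; the last column remains the scalar `Uint64` block length. The row-at-a-time sketch below (not part of the commit, function name invented) only illustrates that semantics — the implementation in the diff works on whole Arrow blocks instead of individual rows:

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Illustrative only: keep a row iff its bitmap column is true and drop the
// bitmap column itself; BlockCompress does the same per block, not per row.
std::vector<uint64_t> CompressRows(const std::vector<std::pair<bool, uint64_t>>& rows) {
    std::vector<uint64_t> out;
    for (const auto& [keep, payload] : rows) {
        if (keep) {
            out.push_back(payload);
        }
    }
    return out;
}
```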
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp index 384a8aac516..b9867b9bac7 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp @@ -26,6 +26,54 @@ IGraphTransformer::TStatus AsScalarWrapper(const TExprNode::TPtr& input, TExprNo return IGraphTransformer::TStatus::Ok; } +IGraphTransformer::TStatus BlockCompressWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + if (!EnsureArgsCount(*input, 2U, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TTypeAnnotationNode::TListType blockItemTypes; + if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (blockItemTypes.size() < 2) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), TStringBuilder() << "Expected at least two columns, got " << blockItemTypes.size())); + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureAtom(*input->Child(1), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + ui32 index = 0; + if (!TryFromString(input->Child(1)->Content(), index)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()), + TStringBuilder() << "Failed to convert to integer: " << input->Child(1)->Content())); + return IGraphTransformer::TStatus::Error; + } + + if (index >= blockItemTypes.size() - 1) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()), + TStringBuilder() << "Index out of range. Index: " << index << ", maximum is: " << blockItemTypes.size() - 1)); + return IGraphTransformer::TStatus::Error; + } + + auto bitmapType = blockItemTypes[index]; + if (bitmapType->GetKind() != ETypeAnnotationKind::Data || bitmapType->Cast<TDataExprType>()->GetSlot() != NUdf::EDataSlot::Bool) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Head().Pos()), + TStringBuilder() << "Expecting Bool as bitmap column type, but got: " << *bitmapType)); + return IGraphTransformer::TStatus::Error; + } + + auto flowItemTypes = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->Cast<TMultiExprType>()->GetItems(); + flowItemTypes.erase(flowItemTypes.begin() + index); + + auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(flowItemTypes); + input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); + return IGraphTransformer::TStatus::Ok; +} + IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { Y_UNUSED(output); if (!EnsureMinArgsCount(*input, 1U, ctx.Expr)) { diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h index bc677ba5d16..613a8febfeb 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.h +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h @@ -9,6 +9,7 @@ namespace NYql { namespace NTypeAnnImpl { IGraphTransformer::TStatus AsScalarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus BlockCompressWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus BlockBitCastWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, 
TExtContext& ctx); diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 2ae4f45ece3..2c7060bee6e 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -11775,6 +11775,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["WideSkipBlocks"] = &WideSkipTakeBlocksWrapper; Functions["WideTakeBlocks"] = &WideSkipTakeBlocksWrapper; Functions["AsScalar"] = &AsScalarWrapper; + Functions["BlockCompress"] = &BlockCompressWrapper; ExtFunctions["BlockFunc"] = &BlockFuncWrapper; ExtFunctions["BlockBitCast"] = &BlockBitCastWrapper; ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper; diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt index 3e1f4f514be..5ca69764ccb 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt @@ -39,6 +39,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h b/ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h new file mode 100644 index 00000000000..35df9506631 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h @@ -0,0 +1,114 @@ +#pragma once +#include <util/system/types.h> + +namespace NKikimr { +namespace NMiniKQL { + +inline ui64 SaturationSub(ui64 x, ui64 y) { + // simple code produces cmov (same result as commented one) + return (x > y) ? x - y : 0; + //ui64 res = x - y; + //res &= -(res <= x); + //return res; +} + +inline ui8 LoadByteUnaligned(const ui8* bitmap, size_t bitmapOffset) { + size_t byteOffset = bitmapOffset >> 3; + ui8 bit = ui8(bitmapOffset & 7u); + + ui8 first = bitmap[byteOffset]; + // extend to ui32 to avoid left UB in case of left shift of byte by 8 bit + ui32 second = bitmap[byteOffset + (bit != 0)]; + + return (first >> bit) | ui8(second << (8 - bit)); +} + +inline ui8 CompressByte(ui8 x, ui8 m) { + // algorithm 7-4 (Compress or Generalized Extract) from second edition of "Hacker's Delight" (single byte version) + // compresses bits from x according to mask m + // X: 01101011 + // M: 00110010 + // MASKED VALUES: --10--1- + // RESULT: 00000101 + // TODO: should be replaced by PEXT instruction from BMI2 instruction set + ui8 mk, mp, mv, t; + x = x & m; // Clear irrelevant bits. + mk = ~m << 1; // We will count 0's to right. + for (ui8 i = 0; i < 3; i++) { + mp = mk ^ (mk << 1); // Parallel suffix. + mp = mp ^ (mp << 2); + mp = mp ^ (mp << 4); + mv = mp & m; // Bits to move. + m = m ^ mv | (mv >> (1 << i)); // Compress m. + t = x & mv; + x = x ^ t | (t >> (1 << i)); // Compress x. 
+ mk = mk & ~mp; + } + return x; +} + +inline ui8 PopCountByte(ui8 value) { + static constexpr uint8_t bytePopCounts[] = { + 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, + 4, 4, 5, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, + 4, 5, 4, 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, + 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, + 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, + 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, + 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, + 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8}; + return bytePopCounts[value]; +} + +inline size_t CompressBitmap(const ui8* src, size_t srcOffset, + const ui8* bitmap, size_t bitmapOffset, ui8* dst, size_t dstOffset, size_t count) +{ + // TODO: 1) aligned version (srcOffset % 8 == 0), (srcOffset % 8 == 0) + // 2) 64 bit processing (instead of 8) + ui8* target = dst + (dstOffset >> 3); + ui8 state = *target; + ui8 stateBits = dstOffset & 7u; + state &= (ui32(1) << stateBits) - 1u; + while (count) { + ui8 srcByte = LoadByteUnaligned(src, srcOffset); + ui8 bitmapByte = LoadByteUnaligned(bitmap, bitmapOffset); + + // zero all bits outside of input range + bitmapByte &= ui8(0xff) >> ui8(SaturationSub(8u, count)); + + ui8 compressed = CompressByte(srcByte, bitmapByte); + ui8 compressedBits = PopCountByte(bitmapByte); + + state |= (compressed << stateBits); + *target = state; + stateBits += compressedBits; + target += (stateBits >> 3); + + ui8 overflowState = ui32(compressed) >> (compressedBits - SaturationSub(stateBits, 8)); + + ui8 mask = (stateBits >> 3) - 1; + state = (state & mask) | (overflowState & ~mask); + + stateBits &= 7; + dstOffset += compressedBits; + srcOffset += 8; + bitmapOffset += 8; + count = SaturationSub(count, 8u); + } + *target = state; + return dstOffset; +} + +template<typename T> +inline T* CompressArray(const T* src, const ui8* bitmap, size_t bitmapOffset, T* dst, size_t count) { + while (count--) { + *dst = *src++; + dst += (bitmap[bitmapOffset >> 3] >> ui8(bitmapOffset & 7u)) & 1u; + bitmapOffset++; + } + return dst; +} + +} // namespace NMiniKQL +} // namespace NKikimr
\ No newline at end of file diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp new file mode 100644 index 00000000000..933963e3c39 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp @@ -0,0 +1,492 @@ +#include "mkql_block_compress.h" +#include "mkql_bit_utils.h" + +#include <ydb/library/yql/minikql/arrow/arrow_defs.h> +#include <ydb/library/yql/minikql/mkql_type_builder.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/mkql_node_builder.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> + +#include <arrow/util/bitmap.h> +#include <arrow/util/bit_util.h> +#include <arrow/array/array_primitive.h> + +#include <util/generic/size_literals.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +constexpr size_t MaxBlockSizeInBytes = 1_MB; + +class TCompressWithScalarBitmap : public TStatefulWideFlowComputationNode<TCompressWithScalarBitmap> { +public: + TCompressWithScalarBitmap(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, ui32 width) + : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) + , Flow_(flow) + , BitmapIndex_(bitmapIndex) + , Width_(width) + { + } + + EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const + { + auto& s = GetState(state, ctx); + if (s.SkipAll_.Defined() && *s.SkipAll_) { + return EFetchResult::Finish; + } + + for (ui32 i = 0, outIndex = 0; i < Width_; ++i) { + if (i != BitmapIndex_) { + s.ValuePointers_[i] = output[outIndex++]; + } + } + + EFetchResult result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); + if (result == EFetchResult::One) { + bool bitmapValue = TArrowBlock::From(s.Bitmap_).GetDatum().scalar_as<arrow::BooleanScalar>().value; + if (!s.SkipAll_.Defined()) { + s.SkipAll_ = !bitmapValue; + } else { + Y_VERIFY(bitmapValue != *s.SkipAll_); + } + if (*s.SkipAll_) { + result = EFetchResult::Finish; + } + } + return result; + } +private: + struct TState : public TComputationValue<TState> { + TVector<NUdf::TUnboxedValue*> ValuePointers_; + NUdf::TUnboxedValue Bitmap_; + TMaybe<bool> SkipAll_; + + TState(TMemoryUsageInfo* memInfo, ui32 width, ui32 bitmapIndex) + : TComputationValue(memInfo) + , ValuePointers_(width, nullptr) + { + ValuePointers_[bitmapIndex] = &Bitmap_; + } + }; + +private: + void RegisterDependencies() const final { + FlowDependsOn(Flow_); + } + + TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { + if (!state.HasValue()) { + state = ctx.HolderFactory.Create<TState>(Width_, BitmapIndex_); + } + return *static_cast<TState*>(state.AsBoxed().Get()); + } + +private: + IComputationWideFlowNode* Flow_; + const ui32 BitmapIndex_; + const ui32 Width_; +}; + +class TCompressScalars : public TStatefulWideFlowComputationNode<TCompressScalars> { +public: + TCompressScalars(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, ui32 width) + : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) + , Flow_(flow) + , BitmapIndex_(bitmapIndex) + , Width_(width) + { + } + + EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const + { + auto& s = GetState(state, ctx); + + for (ui32 i = 0, outIndex = 0; i < Width_; ++i) { + if (i != BitmapIndex_) { + s.ValuePointers_[i] = output[outIndex++]; + } + } + + 
EFetchResult result; + for (;;) { + result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); + if (result != EFetchResult::One) { + break; + } + + arrow::BooleanArray arr(TArrowBlock::From(s.Bitmap_).GetDatum().array()); + ui64 popCount = (ui64)arr.true_count(); + if (popCount != 0) { + MKQL_ENSURE(output[Width_ - 2], "Block size should not be marked as unused"); + *output[Width_ - 2] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(popCount))); + break; + } + } + return result; + } +private: + struct TState : public TComputationValue<TState> { + TVector<NUdf::TUnboxedValue*> ValuePointers_; + NUdf::TUnboxedValue Bitmap_; + + TState(TMemoryUsageInfo* memInfo, ui32 width, ui32 bitmapIndex) + : TComputationValue(memInfo) + , ValuePointers_(width) + { + ValuePointers_[bitmapIndex] = &Bitmap_; + } + }; + +private: + void RegisterDependencies() const final { + FlowDependsOn(Flow_); + } + + TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { + if (!state.HasValue()) { + state = ctx.HolderFactory.Create<TState>(Width_, BitmapIndex_); + } + return *static_cast<TState*>(state.AsBoxed().Get()); + } + +private: + IComputationWideFlowNode* Flow_; + const ui32 BitmapIndex_; + const ui32 Width_; +}; + +struct TBlockCompressDescriptor { + NUdf::EDataSlot Slot_; + bool IsOptional_; + std::shared_ptr<arrow::DataType> ArrowType_; + bool IsScalar_; + TMaybe<size_t> BitWidth_; // only set for arrays with fixed-width data +}; + +class TCompressBlocks : public TStatefulWideFlowComputationNode<TCompressBlocks> { +public: + TCompressBlocks(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, TVector<TBlockCompressDescriptor>&& descriptors) + : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) + , Flow_(flow) + , BitmapIndex_(bitmapIndex) + , Descriptors_(std::move(descriptors)) + , Width_(Descriptors_.size()) + , DesiredLen_(CalcDesiredLen(Descriptors_, BitmapIndex_)) + { + } + + EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const + { + auto& s = GetState(state, ctx, output); + + EFetchResult result = EFetchResult::One; + for (;;) { + if (!s.InputPopCount_) { + result = s.Finish_ ? 
EFetchResult::Finish : Flow_->FetchValues(ctx, s.ValuePointers_.data()); + if (result != EFetchResult::One) { + break; + } + auto& bitmap = s.InputValues_[BitmapIndex_]; + arrow::BooleanArray arr(TArrowBlock::From(bitmap).GetDatum().array()); + s.InputSize_ = (size_t)arr.length(); + s.InputPopCount_ = (size_t)arr.true_count(); + continue; + } + + if (!s.HaveBlocks_) { + // client is not interested in any block columns + Y_VERIFY_DEBUG(s.OutputPos_ == 0); + Y_VERIFY_DEBUG(s.OutputSize_ == 0); + WriteOutputBlockSize(s.InputPopCount_, ctx, output); + s.InputSize_ = s.InputPopCount_ = 0; + break; + } + + if (s.OutputSize_ == 0) { + AllocateBuffers(s, ctx, output); + } + + if (s.InputPopCount_ + s.OutputPos_ <= s.OutputSize_) { + AppendInput(s, ctx, output); + } else { + FlushBuffers(s, ctx, output); + break; + } + } + + if (result == EFetchResult::Finish) { + s.Finish_ = true; + if (s.OutputPos_ > 0) { + FlushBuffers(s, ctx, output); + result = EFetchResult::One; + } + } + return result; + } +private: + struct TOutputBuffers { + std::shared_ptr<arrow::Buffer> Data_; + std::shared_ptr<arrow::Buffer> Nulls_; + size_t MaxObjCount_ = 0; + + static std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, TComputationContext& ctx) { + // align up to 64 bit + bitCount = (bitCount + 63u) & ~size_t(63u); + // this simplifies code compression code - we can write single 64 bit word after array boundaries + bitCount += 64; + return ARROW_RESULT(arrow::AllocateBitmap(bitCount, &ctx.ArrowMemoryPool)); + } + + static std::shared_ptr<arrow::Buffer> AllocateBufferWithReserve(size_t objCount, size_t objSize, TComputationContext& ctx) { + // this simplifies code compression code - we can write single object after array boundaries + return ARROW_RESULT(arrow::AllocateBuffer((objCount + 1) * objSize, &ctx.ArrowMemoryPool)); + } + + void Allocate(size_t count, size_t size, TComputationContext& ctx) { + Y_VERIFY_DEBUG(!Data_); + Y_VERIFY_DEBUG(!Nulls_); + Y_VERIFY_DEBUG(!MaxObjCount_); + MaxObjCount_ = count; + Data_ = (size == 0) ? 
AllocateBitmapWithReserve(count, ctx) : AllocateBufferWithReserve(count, size, ctx); + // Nulls_ allocation will be delayed until actual nulls are encountered + } + + NUdf::TUnboxedValue PullUnboxedValue(std::shared_ptr<arrow::DataType> type, size_t length, TComputationContext& ctx) { + auto arrayData = arrow::ArrayData::Make(type, length, { Nulls_, Data_ }); + Data_.reset(); + Nulls_.reset(); + MaxObjCount_ = 0; + return ctx.HolderFactory.CreateArrowBlock(arrow::Datum(arrayData)); + } + }; + + struct TState : public TComputationValue<TState> { + TVector<NUdf::TUnboxedValue*> ValuePointers_; + TVector<NUdf::TUnboxedValue> InputValues_; + size_t InputPopCount_ = 0; + size_t InputSize_ = 0; + + TVector<TOutputBuffers> OutputBuffers_; + size_t OutputPos_ = 0; + size_t OutputSize_ = 0; + + bool Finish_ = false; + bool HaveBlocks_ = false; + + TState(TMemoryUsageInfo* memInfo, ui32 width, ui32 bitmapIndex, NUdf::TUnboxedValue*const* output, + const TVector<TBlockCompressDescriptor>& descriptors) + : TComputationValue(memInfo) + , ValuePointers_(width) + , InputValues_(width) + , OutputBuffers_(width) + { + for (ui32 i = 0, outIndex = 0; i < width; ++i) { + ValuePointers_[i] = &InputValues_[i]; + if (i != bitmapIndex) { + HaveBlocks_ = HaveBlocks_ || (!descriptors[i].IsScalar_ && output[outIndex] != nullptr); + outIndex++; + } + } + } + }; + + void RegisterDependencies() const final { + FlowDependsOn(Flow_); + } + + TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + if (!state.HasValue()) { + state = ctx.HolderFactory.Create<TState>(Width_, BitmapIndex_, output, Descriptors_); + } + return *static_cast<TState*>(state.AsBoxed().Get()); + } + + static size_t CalcDesiredLen(const TVector<TBlockCompressDescriptor>& descriptors, ui32 bitmapIndex) { + size_t result = std::numeric_limits<size_t>::max(); + for (ui32 i = 0; i < descriptors.size(); ++i) { + if (i != bitmapIndex && !descriptors[i].IsScalar_) { + size_t len = 8 * MaxBlockSizeInBytes / *descriptors[i].BitWidth_; + result = std::min(result, len); + } + } + + MKQL_ENSURE(result != std::numeric_limits<size_t>::max(), "Missing block columns - TCompressScalars should be used"); + return result; + } + + void WriteOutputBlockSize(size_t size, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + MKQL_ENSURE(output[Width_ - 2], "Block size should not be marked as unused"); + *output[Width_ - 2] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(size))); + } + + void AllocateBuffers(TState& s, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + Y_VERIFY_DEBUG(s.OutputSize_ == 0); + Y_VERIFY_DEBUG(s.OutputPos_ == 0); + Y_VERIFY_DEBUG(s.InputPopCount_ > 0); + const size_t count = std::max(s.InputPopCount_, DesiredLen_); + for (ui32 i = 0, outIndex = 0; i < Width_; ++i) { + auto& desc = Descriptors_[i]; + if (i != BitmapIndex_ && !desc.IsScalar_ && output[outIndex]) { + auto& buffers = s.OutputBuffers_[i]; + size_t size = (*desc.BitWidth_ == 1) ? 
0 : (size_t)arrow::BitUtil::BytesForBits(*desc.BitWidth_); + buffers.Allocate(count, size, ctx); + } + if (i != BitmapIndex_) { + outIndex++; + } + } + s.OutputSize_ = count; + } + + void FlushBuffers(TState& s, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + Y_VERIFY_DEBUG(s.OutputPos_ > 0); + for (ui32 i = 0, outIndex = 0; i < Width_; ++i) { + auto& desc = Descriptors_[i]; + if (i != BitmapIndex_ && output[outIndex]) { + *output[outIndex] = desc.IsScalar_ ? s.InputValues_[i] : s.OutputBuffers_[i].PullUnboxedValue(desc.ArrowType_, s.OutputPos_, ctx); + } + if (i != BitmapIndex_) { + outIndex++; + } + } + + WriteOutputBlockSize(s.OutputPos_, ctx, output); + s.OutputPos_ = s.OutputSize_ = 0; + } + + void AppendInput(TState& s, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + Y_VERIFY_DEBUG(s.InputPopCount_ > 0); + Y_VERIFY_DEBUG(s.InputSize_ >= s.InputPopCount_); + + auto bitmap = TArrowBlock::From(s.InputValues_[BitmapIndex_]).GetDatum().array(); + for (ui32 i = 0, outIndex = 0; i < Width_; ++i) { + auto& desc = Descriptors_[i]; + if (i != BitmapIndex_ && !desc.IsScalar_ && output[outIndex]) { + auto array = TArrowBlock::From(s.InputValues_[i]).GetDatum().array(); + Y_VERIFY_DEBUG(array->length == s.InputSize_); + AppendArray(array, bitmap, s.OutputBuffers_[i], s.OutputPos_, Descriptors_[i].Slot_, ctx); + } + + if (i != BitmapIndex_) { + outIndex++; + } + } + + s.OutputPos_ += s.InputPopCount_; + s.InputSize_ = s.InputPopCount_ = 0; + } + + void AppendArray(const std::shared_ptr<arrow::ArrayData>& src, const std::shared_ptr<arrow::ArrayData>& bitmap, + TOutputBuffers& dst, size_t dstPos, NUdf::EDataSlot slot, TComputationContext& ctx) const + { + const ui8* bitmapData = bitmap->GetValues<ui8>(1, 0); + const size_t bitmapOffset = bitmap->offset; + Y_VERIFY_DEBUG(bitmap->length == src->length); + const size_t length = src->length; + ui8* mutDstBase = dst.Data_->mutable_data(); + + switch (slot) { + case NUdf::EDataSlot::Bool: + // special handling for bools + CompressBitmap(src->GetValues<ui8>(1, 0), src->offset, bitmapData, bitmapOffset, mutDstBase, dstPos, length); break; + case NUdf::EDataSlot::Int8: + CompressArray(src->GetValues<i8>(1), bitmapData, bitmapOffset, (i8*)mutDstBase + dstPos, length); break; + case NUdf::EDataSlot::Uint8: + CompressArray(src->GetValues<ui8>(1), bitmapData, bitmapOffset, (ui8*)mutDstBase + dstPos, length); break; + case NUdf::EDataSlot::Int16: + CompressArray(src->GetValues<i16>(1), bitmapData, bitmapOffset, (i16*)mutDstBase + dstPos, length); break; + case NUdf::EDataSlot::Uint16: + CompressArray(src->GetValues<ui16>(1), bitmapData, bitmapOffset, (ui16*)mutDstBase + dstPos, length); break; + case NUdf::EDataSlot::Int32: + CompressArray(src->GetValues<i32>(1), bitmapData, bitmapOffset, (i32*)mutDstBase + dstPos, length); break; + case NUdf::EDataSlot::Uint32: + CompressArray(src->GetValues<ui32>(1), bitmapData, bitmapOffset, (ui32*)mutDstBase + dstPos, length); break; + case NUdf::EDataSlot::Int64: + CompressArray(src->GetValues<i64>(1), bitmapData, bitmapOffset, (i64*)mutDstBase + dstPos, length); break; + case NUdf::EDataSlot::Uint64: + CompressArray(src->GetValues<ui64>(1), bitmapData, bitmapOffset, (ui64*)mutDstBase + dstPos, length); break; + default: + MKQL_ENSURE(false, "Unsupported data slot"); + } + + if (src->GetNullCount()) { + if (!dst.Nulls_) { + dst.Nulls_ = TOutputBuffers::AllocateBitmapWithReserve(dst.MaxObjCount_, ctx); + arrow::BitUtil::SetBitsTo(dst.Nulls_->mutable_data(), 0, dstPos, false); + } 
+ CompressBitmap(src->GetValues<ui8>(0, 0), src->offset, bitmapData, bitmapOffset, + dst.Nulls_->mutable_data(), dstPos, length); + } + } + +private: + IComputationWideFlowNode* Flow_; + const ui32 BitmapIndex_; + const TVector<TBlockCompressDescriptor> Descriptors_; + const ui32 Width_; + const size_t DesiredLen_; +}; + +} // namespace + +IComputationNode* WrapBlockCompress(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args, got " << callable.GetInputsCount()); + + const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); + const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + const ui32 width = tupleType->GetElementsCount(); + MKQL_ENSURE(width > 1, "Expected at least two columns"); + + const auto indexData = AS_VALUE(TDataLiteral, callable.GetInput(1U)); + const auto index = indexData->AsValue().Get<ui32>(); + MKQL_ENSURE(index < width - 1, "Bad bitmap index"); + + TVector<TBlockCompressDescriptor> descriptors; + bool bitmapIsScalar = false; + bool allScalars = true; + for (ui32 i = 0; i < width; ++i) { + descriptors.emplace_back(); + auto& descriptor = descriptors.back(); + + const auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(i)); + TDataType* unpacked = UnpackOptionalData(blockType->GetItemType(), descriptor.IsOptional_); + descriptor.Slot_ = *unpacked->GetDataSlot(); + + bool isOptional; + MKQL_ENSURE(ConvertArrowType(unpacked, isOptional, descriptor.ArrowType_), "Unsupported type"); + + descriptor.IsScalar_ = blockType->GetShape() == TBlockType::EShape::Scalar; + if (i == width - 1) { + MKQL_ENSURE(descriptor.IsScalar_, "Expecting scalar block size as last column"); + MKQL_ENSURE(!descriptor.IsOptional_ && descriptor.Slot_ == NUdf::EDataSlot::Uint64, "Expecting Uint64 as last column"); + } else if (i == index) { + MKQL_ENSURE(!descriptor.IsOptional_ && descriptor.Slot_ == NUdf::EDataSlot::Bool, "Expecting Bool as bitmap column"); + bitmapIsScalar = descriptor.IsScalar_; + } else { + allScalars = allScalars && descriptor.IsScalar_; + } + + if (!descriptor.IsScalar_) { + const auto* fixedType = dynamic_cast<const arrow::FixedWidthType*>(descriptor.ArrowType_.get()); + MKQL_ENSURE(fixedType, "Only fixed width types are currently supported"); + descriptor.BitWidth_ = (size_t)fixedType->bit_width(); + } + } + + auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); + MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + + if (bitmapIsScalar) { + return new TCompressWithScalarBitmap(ctx.Mutables, wideFlow, index, width); + } else if (allScalars) { + return new TCompressScalars(ctx.Mutables, wideFlow, index, width); + } + + return new TCompressBlocks(ctx.Mutables, wideFlow, index, std::move(descriptors)); +} + + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.h new file mode 100644 index 00000000000..13b76a243d9 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.h @@ -0,0 +1,11 @@ +#pragma once + +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> + +namespace NKikimr { +namespace NMiniKQL { + +IComputationNode* WrapBlockCompress(TCallable& callable, const TComputationNodeFactoryContext& ctx); + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index 
8cd9773020c..7b021e30798 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -7,6 +7,7 @@ #include "mkql_block_func.h" #include "mkql_blocks.h" #include "mkql_block_agg.h" +#include "mkql_block_compress.h" #include "mkql_block_skiptake.h" #include "mkql_callable.h" #include "mkql_chain_map.h" @@ -272,6 +273,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"WideSkipBlocks", &WrapWideSkipBlocks}, {"WideTakeBlocks", &WrapWideTakeBlocks}, {"AsScalar", &WrapAsScalar}, + {"BlockCompress", &WrapBlockCompress}, {"BlockCombineAll", &WrapBlockCombineAll}, {"MakeHeap", &WrapMakeHeap}, {"PushHeap", &WrapPushHeap}, diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp new file mode 100644 index 00000000000..919ee317a3b --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp @@ -0,0 +1,67 @@ +#include "mkql_computation_node_ut.h" + +#include <ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +ui8 NaiveCompressByte(ui8 value, ui8 mask) { + ui8 result = 0; + ui8 outPos = 0; + for (ui8 i = 0; i < 8; ++i) { + if (mask & (1 << i)) { + ui8 bit = (value & (1 << i)) != 0; + result |= (bit << outPos); + ++outPos; + } + } + return result; +} + +} // namespace + + +Y_UNIT_TEST_SUITE(TMiniKQLBitUtilsTest) { +Y_UNIT_TEST(TestCompressByte) { + for (size_t value = 0; value < 256; ++value) { + for (size_t mask = 0; mask < 256; ++mask) { + UNIT_ASSERT_EQUAL(NaiveCompressByte(value, mask), CompressByte(value, mask)); + } + } +} + +Y_UNIT_TEST(TestLoad) { + const ui8 src[] = {0b01110100, 0b11011101, 0b01101011}; + UNIT_ASSERT_EQUAL(LoadByteUnaligned(src, 10), 0b11110111); + UNIT_ASSERT_EQUAL(LoadByteUnaligned(src, 16), 0b01101011); +} + +Y_UNIT_TEST(CompressAligned) { + const ui8 data[] = {0b01110100, 0b11011101, 0b01101011}; + const ui8 mask[] = {0b11101100, 0b10111010, 0b10001111}; + ui8 result[100]; + auto res = CompressBitmap(data, 0, mask, 0, result, 0, 24); + UNIT_ASSERT_EQUAL(res, 15); + UNIT_ASSERT_EQUAL(result[0], 0b11001101); + UNIT_ASSERT_EQUAL(result[1] & 0x7fu, 0b00101110); +} + +Y_UNIT_TEST(CompressUnalignedOutput) { + const ui8 data[] = {0b01110100, 0b11011101, 0b01101011}; + const ui8 mask[] = {0b11101100, 0b10111010, 0b10001111}; + ui8 result[100]; + result[0] = 0b101; + auto res = CompressBitmap(data, 0, mask, 0, result, 3, 24); + UNIT_ASSERT_EQUAL(res, 18); + UNIT_ASSERT_EQUAL(result[0], 0b01101101); + UNIT_ASSERT_EQUAL(result[1], 0b01110110); + UNIT_ASSERT_EQUAL(result[2] & 0x3, 0b01); +} + +} + +} // namespace NMiniKQL +} // namespace NKikimr + diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp new file mode 100644 index 00000000000..2c3a8896f59 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp @@ -0,0 +1,59 @@ +#include "mkql_computation_node_ut.h" + +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> + +namespace NKikimr { +namespace NMiniKQL { +Y_UNIT_TEST_SUITE(TMiniKQLBlockCompressTest) { +Y_UNIT_TEST(CompressBasic) { + TSetup<false> setup; + auto& pb = *setup.PgmBuilder; + + const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); + const auto boolType = pb.NewDataType(NUdf::TDataType<bool>::Id); + const auto tupleType = pb.NewTupleType({boolType, ui64Type, boolType}); + + const auto 
data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(false), pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<bool>(true)}); + const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(true), pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<bool>(false)}); + const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(false), pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<bool>(true)}); + const auto data4 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(false), pb.NewDataLiteral<ui64>(4), pb.NewDataLiteral<bool>(true)}); + const auto data5 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(true), pb.NewDataLiteral<ui64>(5), pb.NewDataLiteral<bool>(false)}); + const auto data6 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(true), pb.NewDataLiteral<ui64>(6), pb.NewDataLiteral<bool>(true)}); + const auto data7 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(false), pb.NewDataLiteral<ui64>(7), pb.NewDataLiteral<bool>(true)}); + + const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7}); + const auto flow = pb.ToFlow(list); + + const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { + return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; + }); + const auto compressedFlow = pb.WideFromBlocks(pb.BlockCompress(pb.WideToBlocks(wideFlow), 0)); + const auto narrowFlow = pb.NarrowMap(compressedFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { + return pb.NewTuple({items[0], items[1]}); + }); + + const auto pgmReturn = pb.ForwardList(narrowFlow); + + const auto graph = setup.BuildGraph(pgmReturn); + const auto iterator = graph->GetValue().GetListIterator(); + + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.GetElement(0).Get<ui64>(), 2); + UNIT_ASSERT_VALUES_EQUAL(item.GetElement(1).Get<bool>(), false); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.GetElement(0).Get<ui64>(), 5); + UNIT_ASSERT_VALUES_EQUAL(item.GetElement(1).Get<bool>(), false); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.GetElement(0).Get<ui64>(), 6); + UNIT_ASSERT_VALUES_EQUAL(item.GetElement(1).Get<bool>(), true); + + UNIT_ASSERT(!iterator.Next(item)); +} + +} + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 859aa2b8597..70678989f32 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -1489,6 +1489,30 @@ TRuntimeNode TProgramBuilder::AsScalar(TRuntimeNode value) { return TRuntimeNode(callableBuilder.Build(), false); } +TRuntimeNode TProgramBuilder::BlockCompress(TRuntimeNode flow, ui32 bitmapIndex) { + auto blockItemTypes = ValidateBlockFlowType(flow.GetStaticType()); + + MKQL_ENSURE(blockItemTypes.size() >= 2, "Expected at least two input columns"); + MKQL_ENSURE(bitmapIndex < blockItemTypes.size() - 1, "Invalid bitmap index"); + MKQL_ENSURE(AS_TYPE(TDataType, blockItemTypes[bitmapIndex])->GetSchemeType() == NUdf::TDataType<bool>::Id, "Expected Bool as bitmap column type"); + + + const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + MKQL_ENSURE(inputTupleType->GetElementsCount() == blockItemTypes.size(), "Unexpected tuple size"); + std::vector<TType*> flowItems; + for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) { + if (i == bitmapIndex) { + continue; + } + 
flowItems.push_back(inputTupleType->GetElementType(i)); + } + + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(flowItems))); + callableBuilder.Add(flow); + callableBuilder.Add(NewDataLiteral<ui32>(bitmapIndex)); + return TRuntimeNode(callableBuilder.Build(), false); +} + TRuntimeNode TProgramBuilder::ListFromRange(TRuntimeNode start, TRuntimeNode end, TRuntimeNode step) { MKQL_ENSURE(start.GetStaticType()->IsData(), "Expected data"); MKQL_ENSURE(end.GetStaticType()->IsSameType(*start.GetStaticType()), "Mismatch type"); diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 00f690d4397..4d30a51f101 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -246,6 +246,7 @@ public: TRuntimeNode WideSkipBlocks(TRuntimeNode flow, TRuntimeNode count); TRuntimeNode WideTakeBlocks(TRuntimeNode flow, TRuntimeNode count); TRuntimeNode AsScalar(TRuntimeNode value); + TRuntimeNode BlockCompress(TRuntimeNode flow, ui32 bitmapIndex); TRuntimeNode BlockFunc(const std::string_view& funcName, TType* returnType, const TArrayRef<const TRuntimeNode>& args); TRuntimeNode BlockBitCast(TRuntimeNode value, TType* targetType); diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 24177549560..2f47985e669 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -2421,6 +2421,12 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.BlockCombineAll(arg, countColumn, filterColumn, aggs, returnType); }); + AddCallable("BlockCompress", [](const TExprNode& node, TMkqlBuildContext& ctx) { + const auto flow = MkqlBuildExpr(node.Head(), ctx); + const auto index = FromString<ui32>(node.Child(1)->Content()); + return ctx.ProgramBuilder.BlockCompress(flow, index); + }); + AddCallable("PgArray", [](const TExprNode& node, TMkqlBuildContext& ctx) { std::vector<TRuntimeNode> args; args.reserve(node.ChildrenSize()); |
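For readers following `CompressBitmap` in `mkql_bit_utils.h`: its contract is "for every set bit of the selection bitmap, append the corresponding source bit at the destination offset, and return the new destination offset". A naive per-bit restatement of that contract (a reference sketch written against the same header, not code from the commit) looks like this:

```cpp
#include <ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h>

namespace {

// Bit-at-a-time reference for CompressBitmap: for every set bit in `bitmap`,
// copy the matching bit of `src` into `dst` at the current destination offset
// and advance it. Returns the final destination offset, mirroring the contract
// of CompressBitmap in mkql_bit_utils.h.
size_t NaiveCompressBitmap(const ui8* src, size_t srcOffset,
                           const ui8* bitmap, size_t bitmapOffset,
                           ui8* dst, size_t dstOffset, size_t count)
{
    for (size_t i = 0; i < count; ++i) {
        const bool selected = (bitmap[(bitmapOffset + i) >> 3] >> ((bitmapOffset + i) & 7)) & 1;
        if (!selected) {
            continue;
        }
        const bool bit = (src[(srcOffset + i) >> 3] >> ((srcOffset + i) & 7)) & 1;
        const ui8 mask = ui8(1u << (dstOffset & 7));
        if (bit) {
            dst[dstOffset >> 3] |= mask;
        } else {
            dst[dstOffset >> 3] &= ui8(~mask);
        }
        ++dstOffset;
    }
    return dstOffset;
}

} // namespace
```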
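`CompressArray` is the value-side counterpart of `CompressBitmap`: it unconditionally writes `src[i]` into the current output slot but only advances the output pointer when bit `i` of the bitmap is set, which is why the output buffers are allocated with one extra object of reserve. A small usage sketch (input values and the helper function are made up for illustration):

```cpp
#include <ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h>

using namespace NKikimr::NMiniKQL;

void CompressArrayExample() {
    const i32 src[] = {10, 20, 30, 40, 50, 60, 70, 80};
    const ui8 bitmap[] = {0b00100110}; // bits 1, 2 and 5 are set
    i32 dst[8] = {};

    // Returns a pointer one past the last surviving element, so end - dst is
    // the number of kept values: 3 here, namely dst[0..2] == {20, 30, 60}.
    // Note that the slot right after the last kept value may also be written.
    i32* end = CompressArray(src, bitmap, /*bitmapOffset=*/0, dst, /*count=*/8);
    // end - dst == 3
}
```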
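The output block length chosen by `TCompressBlocks` follows from `CalcDesiredLen`: every fixed-width output column has to stay within `MaxBlockSizeInBytes` (1 MB), so the desired row count is the minimum of `8 * MaxBlockSizeInBytes / bitWidth` over the non-scalar columns, and the buffers are then sized to `max(InputPopCount_, DesiredLen_)`. A standalone sketch of that arithmetic (helper name invented, constant copied from the diff):

```cpp
#include <algorithm>
#include <cstddef>
#include <limits>
#include <vector>

constexpr size_t MaxBlockSizeInBytes = 1u << 20; // 1_MB in the diff

// Minimum over all fixed-width (non-scalar) columns of how many rows fit in 1 MB.
size_t DesiredOutputLen(const std::vector<size_t>& columnBitWidths) {
    size_t result = std::numeric_limits<size_t>::max();
    for (size_t bitWidth : columnBitWidths) {
        result = std::min(result, 8 * MaxBlockSizeInBytes / bitWidth);
    }
    return result;
}

// E.g. a ui64 column (64 bits) next to a Bool column (1 bit):
// DesiredOutputLen({64, 1}) == 131072, bounded by the ui64 column.
```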