author     aneporada <[email protected]>    2022-11-15 12:01:45 +0300
committer  aneporada <[email protected]>    2022-11-15 12:01:45 +0300
commit     660fa18bb981ed3fa5b10b6ec2d9248d80e33f87 (patch)
tree       082309d424b2ff5abab3b6a1b40765f9bb36899e
parent     701a5107da136849169c2168f1aecee07500b711 (diff)
Implement BlockCompress
-rw-r--r--  ydb/library/yql/core/type_ann/type_ann_blocks.cpp                  48
-rw-r--r--  ydb/library/yql/core/type_ann/type_ann_blocks.h                     1
-rw-r--r--  ydb/library/yql/core/type_ann/type_ann_core.cpp                     1
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/CMakeLists.txt                   1
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h               114
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp        492
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/mkql_block_compress.h           11
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp                 2
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp        67
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp   59
-rw-r--r--  ydb/library/yql/minikql/mkql_program_builder.cpp                   24
-rw-r--r--  ydb/library/yql/minikql/mkql_program_builder.h                      1
-rw-r--r--  ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp        6
13 files changed, 827 insertions, 0 deletions
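The new BlockCompress callable filters a wide block flow by a Bool bitmap column: rows where the bitmap is false are dropped, the bitmap column itself is removed from the output, and the trailing scalar block-length column is rewritten to the number of surviving rows. A minimal sketch of these semantics on plain vectors (illustrative only; the real implementation below works on Arrow buffers, not std::vector):

#include <cstdint>
#include <vector>

// Simplified model of BlockCompress: keep rows where the bitmap is true,
// drop the bitmap column, and report the new block length (the popcount
// of the bitmap).
struct ModelBlock {
    std::vector<bool>     Bitmap;  // column selected by the bitmap index
    std::vector<uint64_t> Payload; // any other fixed-width block column
};

struct ModelResult {
    std::vector<uint64_t> Payload; // compressed payload column
    uint64_t              Length;  // new block length
};

ModelResult CompressModel(const ModelBlock& in) {
    ModelResult out;
    for (size_t i = 0; i < in.Bitmap.size(); ++i) {
        if (in.Bitmap[i]) {
            out.Payload.push_back(in.Payload[i]); // surviving row
        }
    }
    out.Length = out.Payload.size();
    return out;
}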
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
index 384a8aac516..b9867b9bac7 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
@@ -26,6 +26,54 @@ IGraphTransformer::TStatus AsScalarWrapper(const TExprNode::TPtr& input, TExprNo
return IGraphTransformer::TStatus::Ok;
}
+IGraphTransformer::TStatus BlockCompressWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 2U, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TTypeAnnotationNode::TListType blockItemTypes;
+ if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (blockItemTypes.size() < 2) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), TStringBuilder() << "Expected at least two columns, got " << blockItemTypes.size()));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ ui32 index = 0;
+ if (!TryFromString(input->Child(1)->Content(), index)) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()),
+ TStringBuilder() << "Failed to convert to integer: " << input->Child(1)->Content()));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (index >= blockItemTypes.size() - 1) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()),
+ TStringBuilder() << "Index out of range. Index: " << index << ", maximum is: " << blockItemTypes.size() - 1));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto bitmapType = blockItemTypes[index];
+ if (bitmapType->GetKind() != ETypeAnnotationKind::Data || bitmapType->Cast<TDataExprType>()->GetSlot() != NUdf::EDataSlot::Bool) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Head().Pos()),
+ TStringBuilder() << "Expecting Bool as bitmap column type, but got: " << *bitmapType));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto flowItemTypes = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
+ flowItemTypes.erase(flowItemTypes.begin() + index);
+
+ auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(flowItemTypes);
+ input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+ return IGraphTransformer::TStatus::Ok;
+}
+
IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
Y_UNUSED(output);
if (!EnsureMinArgsCount(*input, 1U, ctx.Expr)) {
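For reference, the type transformation BlockCompressWrapper derives, mirroring the unit test added later in this commit (types spelled informally; this is an illustration, not the exact type printer output):

//   input : Flow<Multi<Block<Bool>, Block<Uint64>, Block<Bool>, Scalar<Uint64>>>, index = 0
//   output: Flow<Multi<Block<Uint64>, Block<Bool>, Scalar<Uint64>>>
// The bitmap column at `index` must be Bool and is erased from the item list;
// the trailing scalar Uint64 block-length column stays as the last element.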
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h
index bc677ba5d16..613a8febfeb 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.h
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h
@@ -9,6 +9,7 @@ namespace NYql {
namespace NTypeAnnImpl {
IGraphTransformer::TStatus AsScalarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ IGraphTransformer::TStatus BlockCompressWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus BlockBitCastWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 2ae4f45ece3..2c7060bee6e 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -11775,6 +11775,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["WideSkipBlocks"] = &WideSkipTakeBlocksWrapper;
Functions["WideTakeBlocks"] = &WideSkipTakeBlocksWrapper;
Functions["AsScalar"] = &AsScalarWrapper;
+ Functions["BlockCompress"] = &BlockCompressWrapper;
ExtFunctions["BlockFunc"] = &BlockFuncWrapper;
ExtFunctions["BlockBitCast"] = &BlockBitCastWrapper;
ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper;
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt
index 3e1f4f514be..5ca69764ccb 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt
@@ -39,6 +39,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h b/ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h
new file mode 100644
index 00000000000..35df9506631
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h
@@ -0,0 +1,114 @@
+#pragma once
+#include <util/system/types.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+inline ui64 SaturationSub(ui64 x, ui64 y) {
+ // simple code produces cmov (same result as the commented-out variant below)
+ return (x > y) ? x - y : 0;
+ //ui64 res = x - y;
+ //res &= -(res <= x);
+ //return res;
+}
+
+inline ui8 LoadByteUnaligned(const ui8* bitmap, size_t bitmapOffset) {
+ size_t byteOffset = bitmapOffset >> 3;
+ ui8 bit = ui8(bitmapOffset & 7u);
+
+ ui8 first = bitmap[byteOffset];
+ // extend to ui32 to avoid UB from left-shifting a byte by 8 bits
+ ui32 second = bitmap[byteOffset + (bit != 0)];
+
+ return (first >> bit) | ui8(second << (8 - bit));
+}
+
+inline ui8 CompressByte(ui8 x, ui8 m) {
+ // algorithm 7-4 (Compress, or Generalized Extract) from the second edition of "Hacker's Delight" (single-byte version)
+ // compresses bits from x according to mask m
+ // X: 01101011
+ // M: 00110010
+ // MASKED VALUES: --10--1-
+ // RESULT: 00000101
+ // TODO: should be replaced by PEXT instruction from BMI2 instruction set
+ ui8 mk, mp, mv, t;
+ x = x & m; // Clear irrelevant bits.
+ mk = ~m << 1; // We will count 0's to right.
+ for (ui8 i = 0; i < 3; i++) {
+ mp = mk ^ (mk << 1); // Parallel suffix.
+ mp = mp ^ (mp << 2);
+ mp = mp ^ (mp << 4);
+ mv = mp & m; // Bits to move.
+ m = m ^ mv | (mv >> (1 << i)); // Compress m.
+ t = x & mv;
+ x = x ^ t | (t >> (1 << i)); // Compress x.
+ mk = mk & ~mp;
+ }
+ return x;
+}
+
+inline ui8 PopCountByte(ui8 value) {
+ static constexpr uint8_t bytePopCounts[] = {
+ 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3,
+ 4, 4, 5, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4,
+ 4, 5, 4, 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4,
+ 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5,
+ 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2,
+ 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5,
+ 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4,
+ 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6,
+ 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8};
+ return bytePopCounts[value];
+}
+
+inline size_t CompressBitmap(const ui8* src, size_t srcOffset,
+ const ui8* bitmap, size_t bitmapOffset, ui8* dst, size_t dstOffset, size_t count)
+{
+ // TODO: 1) aligned version (srcOffset % 8 == 0, dstOffset % 8 == 0)
+ // 2) 64 bit processing (instead of 8)
+ ui8* target = dst + (dstOffset >> 3);
+ ui8 state = *target;
+ ui8 stateBits = dstOffset & 7u;
+ state &= (ui32(1) << stateBits) - 1u;
+ while (count) {
+ ui8 srcByte = LoadByteUnaligned(src, srcOffset);
+ ui8 bitmapByte = LoadByteUnaligned(bitmap, bitmapOffset);
+
+ // zero all bits outside of input range
+ bitmapByte &= ui8(0xff) >> ui8(SaturationSub(8u, count));
+
+ ui8 compressed = CompressByte(srcByte, bitmapByte);
+ ui8 compressedBits = PopCountByte(bitmapByte);
+
+ state |= (compressed << stateBits);
+ *target = state;
+ stateBits += compressedBits;
+ target += (stateBits >> 3);
+
+ ui8 overflowState = ui32(compressed) >> (compressedBits - SaturationSub(stateBits, 8));
+
+ ui8 mask = (stateBits >> 3) - 1;
+ state = (state & mask) | (overflowState & ~mask);
+
+ stateBits &= 7;
+ dstOffset += compressedBits;
+ srcOffset += 8;
+ bitmapOffset += 8;
+ count = SaturationSub(count, 8u);
+ }
+ *target = state;
+ return dstOffset;
+}
+
+template<typename T>
+inline T* CompressArray(const T* src, const ui8* bitmap, size_t bitmapOffset, T* dst, size_t count) {
+ while (count--) {
+ *dst = *src++;
+ dst += (bitmap[bitmapOffset >> 3] >> ui8(bitmapOffset & 7u)) & 1u;
+ bitmapOffset++;
+ }
+ return dst;
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr
\ No newline at end of file
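A quick worked example of the helpers in this header; the values are taken from the CompressByte comment above and from mkql_bit_utils_ut.cpp added later in this commit (assumes the header is included and C++14 binary literals):

#include <cassert>
// #include "mkql_bit_utils.h"  // assumed to be on the include path

void BitUtilsExample() {
    using namespace NKikimr::NMiniKQL;

    // CompressByte packs the x-bits selected by mask m into the low bits:
    // x = 01101011, m = 00110010 -> masked values --10--1- -> 00000101
    assert(CompressByte(0b01101011, 0b00110010) == 0b00000101);

    // LoadByteUnaligned reads 8 bits starting at an arbitrary bit offset
    // (LSB-first), crossing a byte boundary when needed.
    const ui8 src[] = {0b01110100, 0b11011101, 0b01101011};
    assert(LoadByteUnaligned(src, 10) == 0b11110111);
    assert(LoadByteUnaligned(src, 16) == 0b01101011);
}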
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
new file mode 100644
index 00000000000..933963e3c39
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
@@ -0,0 +1,492 @@
+#include "mkql_block_compress.h"
+#include "mkql_bit_utils.h"
+
+#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
+#include <ydb/library/yql/minikql/mkql_type_builder.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/mkql_node_builder.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+
+#include <arrow/util/bitmap.h>
+#include <arrow/util/bit_util.h>
+#include <arrow/array/array_primitive.h>
+
+#include <util/generic/size_literals.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+constexpr size_t MaxBlockSizeInBytes = 1_MB;
+
+class TCompressWithScalarBitmap : public TStatefulWideFlowComputationNode<TCompressWithScalarBitmap> {
+public:
+ TCompressWithScalarBitmap(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, ui32 width)
+ : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ , Flow_(flow)
+ , BitmapIndex_(bitmapIndex)
+ , Width_(width)
+ {
+ }
+
+ EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const
+ {
+ auto& s = GetState(state, ctx);
+ if (s.SkipAll_.Defined() && *s.SkipAll_) {
+ return EFetchResult::Finish;
+ }
+
+ for (ui32 i = 0, outIndex = 0; i < Width_; ++i) {
+ if (i != BitmapIndex_) {
+ s.ValuePointers_[i] = output[outIndex++];
+ }
+ }
+
+ EFetchResult result = Flow_->FetchValues(ctx, s.ValuePointers_.data());
+ if (result == EFetchResult::One) {
+ bool bitmapValue = TArrowBlock::From(s.Bitmap_).GetDatum().scalar_as<arrow::BooleanScalar>().value;
+ if (!s.SkipAll_.Defined()) {
+ s.SkipAll_ = !bitmapValue;
+ } else {
+ Y_VERIFY(bitmapValue != *s.SkipAll_);
+ }
+ if (*s.SkipAll_) {
+ result = EFetchResult::Finish;
+ }
+ }
+ return result;
+ }
+private:
+ struct TState : public TComputationValue<TState> {
+ TVector<NUdf::TUnboxedValue*> ValuePointers_;
+ NUdf::TUnboxedValue Bitmap_;
+ TMaybe<bool> SkipAll_;
+
+ TState(TMemoryUsageInfo* memInfo, ui32 width, ui32 bitmapIndex)
+ : TComputationValue(memInfo)
+ , ValuePointers_(width, nullptr)
+ {
+ ValuePointers_[bitmapIndex] = &Bitmap_;
+ }
+ };
+
+private:
+ void RegisterDependencies() const final {
+ FlowDependsOn(Flow_);
+ }
+
+ TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
+ if (!state.HasValue()) {
+ state = ctx.HolderFactory.Create<TState>(Width_, BitmapIndex_);
+ }
+ return *static_cast<TState*>(state.AsBoxed().Get());
+ }
+
+private:
+ IComputationWideFlowNode* Flow_;
+ const ui32 BitmapIndex_;
+ const ui32 Width_;
+};
+
+class TCompressScalars : public TStatefulWideFlowComputationNode<TCompressScalars> {
+public:
+ TCompressScalars(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, ui32 width)
+ : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ , Flow_(flow)
+ , BitmapIndex_(bitmapIndex)
+ , Width_(width)
+ {
+ }
+
+ EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const
+ {
+ auto& s = GetState(state, ctx);
+
+ for (ui32 i = 0, outIndex = 0; i < Width_; ++i) {
+ if (i != BitmapIndex_) {
+ s.ValuePointers_[i] = output[outIndex++];
+ }
+ }
+
+ EFetchResult result;
+ for (;;) {
+ result = Flow_->FetchValues(ctx, s.ValuePointers_.data());
+ if (result != EFetchResult::One) {
+ break;
+ }
+
+ arrow::BooleanArray arr(TArrowBlock::From(s.Bitmap_).GetDatum().array());
+ ui64 popCount = (ui64)arr.true_count();
+ if (popCount != 0) {
+ MKQL_ENSURE(output[Width_ - 2], "Block size should not be marked as unused");
+ *output[Width_ - 2] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(popCount)));
+ break;
+ }
+ }
+ return result;
+ }
+private:
+ struct TState : public TComputationValue<TState> {
+ TVector<NUdf::TUnboxedValue*> ValuePointers_;
+ NUdf::TUnboxedValue Bitmap_;
+
+ TState(TMemoryUsageInfo* memInfo, ui32 width, ui32 bitmapIndex)
+ : TComputationValue(memInfo)
+ , ValuePointers_(width)
+ {
+ ValuePointers_[bitmapIndex] = &Bitmap_;
+ }
+ };
+
+private:
+ void RegisterDependencies() const final {
+ FlowDependsOn(Flow_);
+ }
+
+ TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
+ if (!state.HasValue()) {
+ state = ctx.HolderFactory.Create<TState>(Width_, BitmapIndex_);
+ }
+ return *static_cast<TState*>(state.AsBoxed().Get());
+ }
+
+private:
+ IComputationWideFlowNode* Flow_;
+ const ui32 BitmapIndex_;
+ const ui32 Width_;
+};
+
+struct TBlockCompressDescriptor {
+ NUdf::EDataSlot Slot_;
+ bool IsOptional_;
+ std::shared_ptr<arrow::DataType> ArrowType_;
+ bool IsScalar_;
+ TMaybe<size_t> BitWidth_; // only set for arrays with fixed-width data
+};
+
+class TCompressBlocks : public TStatefulWideFlowComputationNode<TCompressBlocks> {
+public:
+ TCompressBlocks(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, TVector<TBlockCompressDescriptor>&& descriptors)
+ : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ , Flow_(flow)
+ , BitmapIndex_(bitmapIndex)
+ , Descriptors_(std::move(descriptors))
+ , Width_(Descriptors_.size())
+ , DesiredLen_(CalcDesiredLen(Descriptors_, BitmapIndex_))
+ {
+ }
+
+ EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const
+ {
+ auto& s = GetState(state, ctx, output);
+
+ EFetchResult result = EFetchResult::One;
+ for (;;) {
+ if (!s.InputPopCount_) {
+ result = s.Finish_ ? EFetchResult::Finish : Flow_->FetchValues(ctx, s.ValuePointers_.data());
+ if (result != EFetchResult::One) {
+ break;
+ }
+ auto& bitmap = s.InputValues_[BitmapIndex_];
+ arrow::BooleanArray arr(TArrowBlock::From(bitmap).GetDatum().array());
+ s.InputSize_ = (size_t)arr.length();
+ s.InputPopCount_ = (size_t)arr.true_count();
+ continue;
+ }
+
+ if (!s.HaveBlocks_) {
+ // client is not interested in any block columns
+ Y_VERIFY_DEBUG(s.OutputPos_ == 0);
+ Y_VERIFY_DEBUG(s.OutputSize_ == 0);
+ WriteOutputBlockSize(s.InputPopCount_, ctx, output);
+ s.InputSize_ = s.InputPopCount_ = 0;
+ break;
+ }
+
+ if (s.OutputSize_ == 0) {
+ AllocateBuffers(s, ctx, output);
+ }
+
+ if (s.InputPopCount_ + s.OutputPos_ <= s.OutputSize_) {
+ AppendInput(s, ctx, output);
+ } else {
+ FlushBuffers(s, ctx, output);
+ break;
+ }
+ }
+
+ if (result == EFetchResult::Finish) {
+ s.Finish_ = true;
+ if (s.OutputPos_ > 0) {
+ FlushBuffers(s, ctx, output);
+ result = EFetchResult::One;
+ }
+ }
+ return result;
+ }
+private:
+ struct TOutputBuffers {
+ std::shared_ptr<arrow::Buffer> Data_;
+ std::shared_ptr<arrow::Buffer> Nulls_;
+ size_t MaxObjCount_ = 0;
+
+ static std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, TComputationContext& ctx) {
+ // align up to 64 bit
+ bitCount = (bitCount + 63u) & ~size_t(63u);
+ // this simplifies the compression code - we can write a single 64-bit word past the array boundary
+ bitCount += 64;
+ return ARROW_RESULT(arrow::AllocateBitmap(bitCount, &ctx.ArrowMemoryPool));
+ }
+
+ static std::shared_ptr<arrow::Buffer> AllocateBufferWithReserve(size_t objCount, size_t objSize, TComputationContext& ctx) {
+ // this simplifies the compression code - we can write a single object past the array boundary
+ return ARROW_RESULT(arrow::AllocateBuffer((objCount + 1) * objSize, &ctx.ArrowMemoryPool));
+ }
+
+ void Allocate(size_t count, size_t size, TComputationContext& ctx) {
+ Y_VERIFY_DEBUG(!Data_);
+ Y_VERIFY_DEBUG(!Nulls_);
+ Y_VERIFY_DEBUG(!MaxObjCount_);
+ MaxObjCount_ = count;
+ Data_ = (size == 0) ? AllocateBitmapWithReserve(count, ctx) : AllocateBufferWithReserve(count, size, ctx);
+ // Nulls_ allocation will be delayed until actual nulls are encountered
+ }
+
+ NUdf::TUnboxedValue PullUnboxedValue(std::shared_ptr<arrow::DataType> type, size_t length, TComputationContext& ctx) {
+ auto arrayData = arrow::ArrayData::Make(type, length, { Nulls_, Data_ });
+ Data_.reset();
+ Nulls_.reset();
+ MaxObjCount_ = 0;
+ return ctx.HolderFactory.CreateArrowBlock(arrow::Datum(arrayData));
+ }
+ };
+
+ struct TState : public TComputationValue<TState> {
+ TVector<NUdf::TUnboxedValue*> ValuePointers_;
+ TVector<NUdf::TUnboxedValue> InputValues_;
+ size_t InputPopCount_ = 0;
+ size_t InputSize_ = 0;
+
+ TVector<TOutputBuffers> OutputBuffers_;
+ size_t OutputPos_ = 0;
+ size_t OutputSize_ = 0;
+
+ bool Finish_ = false;
+ bool HaveBlocks_ = false;
+
+ TState(TMemoryUsageInfo* memInfo, ui32 width, ui32 bitmapIndex, NUdf::TUnboxedValue*const* output,
+ const TVector<TBlockCompressDescriptor>& descriptors)
+ : TComputationValue(memInfo)
+ , ValuePointers_(width)
+ , InputValues_(width)
+ , OutputBuffers_(width)
+ {
+ for (ui32 i = 0, outIndex = 0; i < width; ++i) {
+ ValuePointers_[i] = &InputValues_[i];
+ if (i != bitmapIndex) {
+ HaveBlocks_ = HaveBlocks_ || (!descriptors[i].IsScalar_ && output[outIndex] != nullptr);
+ outIndex++;
+ }
+ }
+ }
+ };
+
+ void RegisterDependencies() const final {
+ FlowDependsOn(Flow_);
+ }
+
+ TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ if (!state.HasValue()) {
+ state = ctx.HolderFactory.Create<TState>(Width_, BitmapIndex_, output, Descriptors_);
+ }
+ return *static_cast<TState*>(state.AsBoxed().Get());
+ }
+
+ static size_t CalcDesiredLen(const TVector<TBlockCompressDescriptor>& descriptors, ui32 bitmapIndex) {
+ size_t result = std::numeric_limits<size_t>::max();
+ for (ui32 i = 0; i < descriptors.size(); ++i) {
+ if (i != bitmapIndex && !descriptors[i].IsScalar_) {
+ size_t len = 8 * MaxBlockSizeInBytes / *descriptors[i].BitWidth_;
+ result = std::min(result, len);
+ }
+ }
+
+ MKQL_ENSURE(result != std::numeric_limits<size_t>::max(), "Missing block columns - TCompressScalars should be used");
+ return result;
+ }
+
+ void WriteOutputBlockSize(size_t size, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ MKQL_ENSURE(output[Width_ - 2], "Block size should not be marked as unused");
+ *output[Width_ - 2] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(size)));
+ }
+
+ void AllocateBuffers(TState& s, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ Y_VERIFY_DEBUG(s.OutputSize_ == 0);
+ Y_VERIFY_DEBUG(s.OutputPos_ == 0);
+ Y_VERIFY_DEBUG(s.InputPopCount_ > 0);
+ const size_t count = std::max(s.InputPopCount_, DesiredLen_);
+ for (ui32 i = 0, outIndex = 0; i < Width_; ++i) {
+ auto& desc = Descriptors_[i];
+ if (i != BitmapIndex_ && !desc.IsScalar_ && output[outIndex]) {
+ auto& buffers = s.OutputBuffers_[i];
+ size_t size = (*desc.BitWidth_ == 1) ? 0 : (size_t)arrow::BitUtil::BytesForBits(*desc.BitWidth_);
+ buffers.Allocate(count, size, ctx);
+ }
+ if (i != BitmapIndex_) {
+ outIndex++;
+ }
+ }
+ s.OutputSize_ = count;
+ }
+
+ void FlushBuffers(TState& s, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ Y_VERIFY_DEBUG(s.OutputPos_ > 0);
+ for (ui32 i = 0, outIndex = 0; i < Width_; ++i) {
+ auto& desc = Descriptors_[i];
+ if (i != BitmapIndex_ && output[outIndex]) {
+ *output[outIndex] = desc.IsScalar_ ? s.InputValues_[i] : s.OutputBuffers_[i].PullUnboxedValue(desc.ArrowType_, s.OutputPos_, ctx);
+ }
+ if (i != BitmapIndex_) {
+ outIndex++;
+ }
+ }
+
+ WriteOutputBlockSize(s.OutputPos_, ctx, output);
+ s.OutputPos_ = s.OutputSize_ = 0;
+ }
+
+ void AppendInput(TState& s, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ Y_VERIFY_DEBUG(s.InputPopCount_ > 0);
+ Y_VERIFY_DEBUG(s.InputSize_ >= s.InputPopCount_);
+
+ auto bitmap = TArrowBlock::From(s.InputValues_[BitmapIndex_]).GetDatum().array();
+ for (ui32 i = 0, outIndex = 0; i < Width_; ++i) {
+ auto& desc = Descriptors_[i];
+ if (i != BitmapIndex_ && !desc.IsScalar_ && output[outIndex]) {
+ auto array = TArrowBlock::From(s.InputValues_[i]).GetDatum().array();
+ Y_VERIFY_DEBUG(array->length == s.InputSize_);
+ AppendArray(array, bitmap, s.OutputBuffers_[i], s.OutputPos_, Descriptors_[i].Slot_, ctx);
+ }
+
+ if (i != BitmapIndex_) {
+ outIndex++;
+ }
+ }
+
+ s.OutputPos_ += s.InputPopCount_;
+ s.InputSize_ = s.InputPopCount_ = 0;
+ }
+
+ void AppendArray(const std::shared_ptr<arrow::ArrayData>& src, const std::shared_ptr<arrow::ArrayData>& bitmap,
+ TOutputBuffers& dst, size_t dstPos, NUdf::EDataSlot slot, TComputationContext& ctx) const
+ {
+ const ui8* bitmapData = bitmap->GetValues<ui8>(1, 0);
+ const size_t bitmapOffset = bitmap->offset;
+ Y_VERIFY_DEBUG(bitmap->length == src->length);
+ const size_t length = src->length;
+ ui8* mutDstBase = dst.Data_->mutable_data();
+
+ switch (slot) {
+ case NUdf::EDataSlot::Bool:
+ // special handling for bools
+ CompressBitmap(src->GetValues<ui8>(1, 0), src->offset, bitmapData, bitmapOffset, mutDstBase, dstPos, length); break;
+ case NUdf::EDataSlot::Int8:
+ CompressArray(src->GetValues<i8>(1), bitmapData, bitmapOffset, (i8*)mutDstBase + dstPos, length); break;
+ case NUdf::EDataSlot::Uint8:
+ CompressArray(src->GetValues<ui8>(1), bitmapData, bitmapOffset, (ui8*)mutDstBase + dstPos, length); break;
+ case NUdf::EDataSlot::Int16:
+ CompressArray(src->GetValues<i16>(1), bitmapData, bitmapOffset, (i16*)mutDstBase + dstPos, length); break;
+ case NUdf::EDataSlot::Uint16:
+ CompressArray(src->GetValues<ui16>(1), bitmapData, bitmapOffset, (ui16*)mutDstBase + dstPos, length); break;
+ case NUdf::EDataSlot::Int32:
+ CompressArray(src->GetValues<i32>(1), bitmapData, bitmapOffset, (i32*)mutDstBase + dstPos, length); break;
+ case NUdf::EDataSlot::Uint32:
+ CompressArray(src->GetValues<ui32>(1), bitmapData, bitmapOffset, (ui32*)mutDstBase + dstPos, length); break;
+ case NUdf::EDataSlot::Int64:
+ CompressArray(src->GetValues<i64>(1), bitmapData, bitmapOffset, (i64*)mutDstBase + dstPos, length); break;
+ case NUdf::EDataSlot::Uint64:
+ CompressArray(src->GetValues<ui64>(1), bitmapData, bitmapOffset, (ui64*)mutDstBase + dstPos, length); break;
+ default:
+ MKQL_ENSURE(false, "Unsupported data slot");
+ }
+
+ if (src->GetNullCount()) {
+ if (!dst.Nulls_) {
+ dst.Nulls_ = TOutputBuffers::AllocateBitmapWithReserve(dst.MaxObjCount_, ctx);
+ arrow::BitUtil::SetBitsTo(dst.Nulls_->mutable_data(), 0, dstPos, false);
+ }
+ CompressBitmap(src->GetValues<ui8>(0, 0), src->offset, bitmapData, bitmapOffset,
+ dst.Nulls_->mutable_data(), dstPos, length);
+ }
+ }
+
+private:
+ IComputationWideFlowNode* Flow_;
+ const ui32 BitmapIndex_;
+ const TVector<TBlockCompressDescriptor> Descriptors_;
+ const ui32 Width_;
+ const size_t DesiredLen_;
+};
+
+} // namespace
+
+IComputationNode* WrapBlockCompress(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args, got " << callable.GetInputsCount());
+
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+ const ui32 width = tupleType->GetElementsCount();
+ MKQL_ENSURE(width > 1, "Expected at least two columns");
+
+ const auto indexData = AS_VALUE(TDataLiteral, callable.GetInput(1U));
+ const auto index = indexData->AsValue().Get<ui32>();
+ MKQL_ENSURE(index < width - 1, "Bad bitmap index");
+
+ TVector<TBlockCompressDescriptor> descriptors;
+ bool bitmapIsScalar = false;
+ bool allScalars = true;
+ for (ui32 i = 0; i < width; ++i) {
+ descriptors.emplace_back();
+ auto& descriptor = descriptors.back();
+
+ const auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(i));
+ TDataType* unpacked = UnpackOptionalData(blockType->GetItemType(), descriptor.IsOptional_);
+ descriptor.Slot_ = *unpacked->GetDataSlot();
+
+ bool isOptional;
+ MKQL_ENSURE(ConvertArrowType(unpacked, isOptional, descriptor.ArrowType_), "Unsupported type");
+
+ descriptor.IsScalar_ = blockType->GetShape() == TBlockType::EShape::Scalar;
+ if (i == width - 1) {
+ MKQL_ENSURE(descriptor.IsScalar_, "Expecting scalar block size as last column");
+ MKQL_ENSURE(!descriptor.IsOptional_ && descriptor.Slot_ == NUdf::EDataSlot::Uint64, "Expecting Uint64 as last column");
+ } else if (i == index) {
+ MKQL_ENSURE(!descriptor.IsOptional_ && descriptor.Slot_ == NUdf::EDataSlot::Bool, "Expecting Bool as bitmap column");
+ bitmapIsScalar = descriptor.IsScalar_;
+ } else {
+ allScalars = allScalars && descriptor.IsScalar_;
+ }
+
+ if (!descriptor.IsScalar_) {
+ const auto* fixedType = dynamic_cast<const arrow::FixedWidthType*>(descriptor.ArrowType_.get());
+ MKQL_ENSURE(fixedType, "Only fixed width types are currently supported");
+ descriptor.BitWidth_ = (size_t)fixedType->bit_width();
+ }
+ }
+
+ auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+
+ if (bitmapIsScalar) {
+ return new TCompressWithScalarBitmap(ctx.Mutables, wideFlow, index, width);
+ } else if (allScalars) {
+ return new TCompressScalars(ctx.Mutables, wideFlow, index, width);
+ }
+
+ return new TCompressBlocks(ctx.Mutables, wideFlow, index, std::move(descriptors));
+}
+
+
+} // namespace NMiniKQL
+} // namespace NKikimr
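To see how TCompressBlocks sizes its output, here is the CalcDesiredLen arithmetic for a flow with Uint64 and Bool payload columns, a sketch that only assumes the 1 MiB MaxBlockSizeInBytes constant defined in this file:

#include <algorithm>
#include <cstddef>

// Worked example of CalcDesiredLen: the smallest per-column row budget wins,
// so the widest fixed-width column determines how many rows fit into ~1 MiB.
constexpr size_t MaxBlockBytes = 1048576; // 1_MB, as in mkql_block_compress.cpp

constexpr size_t DesiredLenFor(size_t bitWidth) {
    return 8 * MaxBlockBytes / bitWidth;
}

static_assert(DesiredLenFor(64) == 131072, "Uint64 column: 128K rows per output block");
static_assert(DesiredLenFor(1) == 8388608, "Bool column alone would allow 8M rows");
static_assert(std::min(DesiredLenFor(64), DesiredLenFor(1)) == 131072,
              "the Uint64 column caps DesiredLen_");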
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.h
new file mode 100644
index 00000000000..13b76a243d9
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+IComputationNode* WrapBlockCompress(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
index 8cd9773020c..7b021e30798 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
@@ -7,6 +7,7 @@
#include "mkql_block_func.h"
#include "mkql_blocks.h"
#include "mkql_block_agg.h"
+#include "mkql_block_compress.h"
#include "mkql_block_skiptake.h"
#include "mkql_callable.h"
#include "mkql_chain_map.h"
@@ -272,6 +273,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"WideSkipBlocks", &WrapWideSkipBlocks},
{"WideTakeBlocks", &WrapWideTakeBlocks},
{"AsScalar", &WrapAsScalar},
+ {"BlockCompress", &WrapBlockCompress},
{"BlockCombineAll", &WrapBlockCombineAll},
{"MakeHeap", &WrapMakeHeap},
{"PushHeap", &WrapPushHeap},
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp
new file mode 100644
index 00000000000..919ee317a3b
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp
@@ -0,0 +1,67 @@
+#include "mkql_computation_node_ut.h"
+
+#include <ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+ui8 NaiveCompressByte(ui8 value, ui8 mask) {
+ ui8 result = 0;
+ ui8 outPos = 0;
+ for (ui8 i = 0; i < 8; ++i) {
+ if (mask & (1 << i)) {
+ ui8 bit = (value & (1 << i)) != 0;
+ result |= (bit << outPos);
+ ++outPos;
+ }
+ }
+ return result;
+}
+
+} // namespace
+
+
+Y_UNIT_TEST_SUITE(TMiniKQLBitUtilsTest) {
+Y_UNIT_TEST(TestCompressByte) {
+ for (size_t value = 0; value < 256; ++value) {
+ for (size_t mask = 0; mask < 256; ++mask) {
+ UNIT_ASSERT_EQUAL(NaiveCompressByte(value, mask), CompressByte(value, mask));
+ }
+ }
+}
+
+Y_UNIT_TEST(TestLoad) {
+ const ui8 src[] = {0b01110100, 0b11011101, 0b01101011};
+ UNIT_ASSERT_EQUAL(LoadByteUnaligned(src, 10), 0b11110111);
+ UNIT_ASSERT_EQUAL(LoadByteUnaligned(src, 16), 0b01101011);
+}
+
+Y_UNIT_TEST(CompressAligned) {
+ const ui8 data[] = {0b01110100, 0b11011101, 0b01101011};
+ const ui8 mask[] = {0b11101100, 0b10111010, 0b10001111};
+ ui8 result[100];
+ auto res = CompressBitmap(data, 0, mask, 0, result, 0, 24);
+ UNIT_ASSERT_EQUAL(res, 15);
+ UNIT_ASSERT_EQUAL(result[0], 0b11001101);
+ UNIT_ASSERT_EQUAL(result[1] & 0x7fu, 0b00101110);
+}
+
+Y_UNIT_TEST(CompressUnalignedOutput) {
+ const ui8 data[] = {0b01110100, 0b11011101, 0b01101011};
+ const ui8 mask[] = {0b11101100, 0b10111010, 0b10001111};
+ ui8 result[100];
+ result[0] = 0b101;
+ auto res = CompressBitmap(data, 0, mask, 0, result, 3, 24);
+ UNIT_ASSERT_EQUAL(res, 18);
+ UNIT_ASSERT_EQUAL(result[0], 0b01101101);
+ UNIT_ASSERT_EQUAL(result[1], 0b01110110);
+ UNIT_ASSERT_EQUAL(result[2] & 0x3, 0b01);
+}
+
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr
+
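The suite above checks CompressBitmap against hand-computed bytes; a naive bit-by-bit reference (a hypothetical helper, not part of this commit) could be used to extend these tests to randomized inputs:

// Naive reference for CompressBitmap: copy src bits selected by the bitmap,
// one bit at a time, starting at dstOffset; returns the new dst bit offset.
inline size_t NaiveCompressBitmap(const ui8* src, size_t srcOffset,
    const ui8* bitmap, size_t bitmapOffset, ui8* dst, size_t dstOffset, size_t count)
{
    auto getBit = [](const ui8* p, size_t i) { return (p[i >> 3] >> (i & 7u)) & 1u; };
    for (size_t i = 0; i < count; ++i) {
        if (getBit(bitmap, bitmapOffset + i)) {
            const ui8 bit = getBit(src, srcOffset + i);
            ui8& byte = dst[dstOffset >> 3];
            byte = (byte & ~(ui8(1) << (dstOffset & 7u))) | (bit << (dstOffset & 7u));
            ++dstOffset;
        }
    }
    return dstOffset;
}

Feeding random data/mask pairs through both implementations and comparing the written bytes and returned offsets would cover the unaligned paths more thoroughly than the fixed vectors above.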
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp
new file mode 100644
index 00000000000..2c3a8896f59
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp
@@ -0,0 +1,59 @@
+#include "mkql_computation_node_ut.h"
+
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+Y_UNIT_TEST_SUITE(TMiniKQLBlockCompressTest) {
+Y_UNIT_TEST(CompressBasic) {
+ TSetup<false> setup;
+ auto& pb = *setup.PgmBuilder;
+
+ const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+ const auto boolType = pb.NewDataType(NUdf::TDataType<bool>::Id);
+ const auto tupleType = pb.NewTupleType({boolType, ui64Type, boolType});
+
+ const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(false), pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<bool>(true)});
+ const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(true), pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<bool>(false)});
+ const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(false), pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<bool>(true)});
+ const auto data4 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(false), pb.NewDataLiteral<ui64>(4), pb.NewDataLiteral<bool>(true)});
+ const auto data5 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(true), pb.NewDataLiteral<ui64>(5), pb.NewDataLiteral<bool>(false)});
+ const auto data6 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(true), pb.NewDataLiteral<ui64>(6), pb.NewDataLiteral<bool>(true)});
+ const auto data7 = pb.NewTuple(tupleType, {pb.NewDataLiteral<bool>(false), pb.NewDataLiteral<ui64>(7), pb.NewDataLiteral<bool>(true)});
+
+ const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7});
+ const auto flow = pb.ToFlow(list);
+
+ const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
+ return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)};
+ });
+ const auto compressedFlow = pb.WideFromBlocks(pb.BlockCompress(pb.WideToBlocks(wideFlow), 0));
+ const auto narrowFlow = pb.NarrowMap(compressedFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
+ return pb.NewTuple({items[0], items[1]});
+ });
+
+ const auto pgmReturn = pb.ForwardList(narrowFlow);
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.GetElement(0).Get<ui64>(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(item.GetElement(1).Get<bool>(), false);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.GetElement(0).Get<ui64>(), 5);
+ UNIT_ASSERT_VALUES_EQUAL(item.GetElement(1).Get<bool>(), false);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.GetElement(0).Get<ui64>(), 6);
+ UNIT_ASSERT_VALUES_EQUAL(item.GetElement(1).Get<bool>(), true);
+
+ UNIT_ASSERT(!iterator.Next(item));
+}
+
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 859aa2b8597..70678989f32 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -1489,6 +1489,30 @@ TRuntimeNode TProgramBuilder::AsScalar(TRuntimeNode value) {
return TRuntimeNode(callableBuilder.Build(), false);
}
+TRuntimeNode TProgramBuilder::BlockCompress(TRuntimeNode flow, ui32 bitmapIndex) {
+ auto blockItemTypes = ValidateBlockFlowType(flow.GetStaticType());
+
+ MKQL_ENSURE(blockItemTypes.size() >= 2, "Expected at least two input columns");
+ MKQL_ENSURE(bitmapIndex < blockItemTypes.size() - 1, "Invalid bitmap index");
+ MKQL_ENSURE(AS_TYPE(TDataType, blockItemTypes[bitmapIndex])->GetSchemeType() == NUdf::TDataType<bool>::Id, "Expected Bool as bitmap column type");
+
+
+ const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ MKQL_ENSURE(inputTupleType->GetElementsCount() == blockItemTypes.size(), "Unexpected tuple size");
+ std::vector<TType*> flowItems;
+ for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) {
+ if (i == bitmapIndex) {
+ continue;
+ }
+ flowItems.push_back(inputTupleType->GetElementType(i));
+ }
+
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(flowItems)));
+ callableBuilder.Add(flow);
+ callableBuilder.Add(NewDataLiteral<ui32>(bitmapIndex));
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
TRuntimeNode TProgramBuilder::ListFromRange(TRuntimeNode start, TRuntimeNode end, TRuntimeNode step) {
MKQL_ENSURE(start.GetStaticType()->IsData(), "Expected data");
MKQL_ENSURE(end.GetStaticType()->IsSameType(*start.GetStaticType()), "Mismatch type");
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 00f690d4397..4d30a51f101 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -246,6 +246,7 @@ public:
TRuntimeNode WideSkipBlocks(TRuntimeNode flow, TRuntimeNode count);
TRuntimeNode WideTakeBlocks(TRuntimeNode flow, TRuntimeNode count);
TRuntimeNode AsScalar(TRuntimeNode value);
+ TRuntimeNode BlockCompress(TRuntimeNode flow, ui32 bitmapIndex);
TRuntimeNode BlockFunc(const std::string_view& funcName, TType* returnType, const TArrayRef<const TRuntimeNode>& args);
TRuntimeNode BlockBitCast(TRuntimeNode value, TType* targetType);
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index 24177549560..2f47985e669 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -2421,6 +2421,12 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.BlockCombineAll(arg, countColumn, filterColumn, aggs, returnType);
});
+ AddCallable("BlockCompress", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+ const auto flow = MkqlBuildExpr(node.Head(), ctx);
+ const auto index = FromString<ui32>(node.Child(1)->Content());
+ return ctx.ProgramBuilder.BlockCompress(flow, index);
+ });
+
AddCallable("PgArray", [](const TExprNode& node, TMkqlBuildContext& ctx) {
std::vector<TRuntimeNode> args;
args.reserve(node.ChildrenSize());