diff options
author | vvvv <vvvv@ydb.tech> | 2023-06-01 20:30:53 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-06-01 20:30:53 +0300 |
commit | 65144ea2f27d134ddba08749527839776f379501 (patch) | |
tree | 521cfe77df1fc757f85b258eb20d73eadebe7dda | |
parent | f10160e4b83003ae269ced1b69fc9fbad1ff32c2 (diff) | |
download | ydb-65144ea2f27d134ddba08749527839776f379501.tar.gz |
Block PG aggregations over keys, tuned layout of palloc-ed datums in arrow Scalar/Array
-rw-r--r-- | ydb/library/yql/core/yql_aggregate_expander.cpp | 11 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp | 16 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp | 21 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h | 8 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp | 42 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp | 14 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp | 28 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp | 10 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/arrow.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/arrow.h | 708 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/comp_factory.cpp | 29 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp | 9 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/utils.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/public/udf/arrow/block_builder.h | 12 | ||||
-rw-r--r-- | ydb/library/yql/public/udf/arrow/util.h | 12 |
15 files changed, 735 insertions, 189 deletions
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp index 46fc98e3284..17ee8b5d51c 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.cpp +++ b/ydb/library/yql/core/yql_aggregate_expander.cpp @@ -2900,7 +2900,16 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() { .Callable("ShuffleByKeys") .Add(0, AggList) .Add(1, keySelector) - .Add(2, lambdaStream) + .Lambda(2) + .Param("stream") + .Apply(GetContextLambda()) + .With(0) + .Apply(lambdaStream) + .With(0, "stream") + .Seal() + .Done() + .Seal() + .Seal() .Seal() .Build(); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp index 1b49d64d9cd..1992d6e602e 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp @@ -677,6 +677,7 @@ public: if (result == EFetchResult::Yield) { return result; } else if (result == EFetchResult::One) { + ++s.BatchNum_; ui64 batchLength = GetBatchLength(s.Values_.data()); if (!batchLength) { continue; @@ -824,6 +825,7 @@ private: using TSetImpl = THashSetImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>>; using TFixedMapImpl = TFixedHashMapImpl<TKey, TFixedAggState, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>>; + ui64 BatchNum_ = 0; TVector<NUdf::TUnboxedValue> Values_; TVector<NUdf::TUnboxedValue*> ValuePointers_; TVector<std::unique_ptr<TAggregator>> Aggs_; @@ -969,16 +971,16 @@ private: for (auto i : Streams_[currentStreamIndex]) { if (output[Keys_.size() + i]) { - s.Aggs_[i]->LoadState(ptr + s.AggStateOffsets_[i], s.UnwrappedValues_.data(), row); + s.Aggs_[i]->LoadState(ptr + s.AggStateOffsets_[i], s.BatchNum_, s.UnwrappedValues_.data(), row); } } } else { for (size_t i = 0; i < s.Aggs_.size(); ++i) { if (output[Keys_.size() + i]) { if constexpr (Finalize) { - s.Aggs_[i]->LoadState(ptr, s.Values_.data(), row); + s.Aggs_[i]->LoadState(ptr, s.BatchNum_, s.Values_.data(), row); } else { - s.Aggs_[i]->InitKey(ptr, s.Values_.data(), row); + s.Aggs_[i]->InitKey(ptr, s.BatchNum_, s.Values_.data(), row); } } @@ -1009,9 +1011,9 @@ private: for (auto i : Streams_[currentStreamIndex]) { if (output[Keys_.size() + i]) { if (isNewStream) { - s.Aggs_[i]->LoadState(ptr + s.AggStateOffsets_[i], s.UnwrappedValues_.data(), row); + s.Aggs_[i]->LoadState(ptr + s.AggStateOffsets_[i], s.BatchNum_, s.UnwrappedValues_.data(), row); } else { - s.Aggs_[i]->UpdateState(ptr + s.AggStateOffsets_[i], s.UnwrappedValues_.data(), row); + s.Aggs_[i]->UpdateState(ptr + s.AggStateOffsets_[i], s.BatchNum_, s.UnwrappedValues_.data(), row); } } } @@ -1019,9 +1021,9 @@ private: for (size_t i = 0; i < s.Aggs_.size(); ++i) { if (output[Keys_.size() + i]) { if constexpr (Finalize) { - s.Aggs_[i]->UpdateState(ptr, s.Values_.data(), row); + s.Aggs_[i]->UpdateState(ptr, s.BatchNum_, s.Values_.data(), row); } else { - s.Aggs_[i]->UpdateKey(ptr, s.Values_.data(), row); + s.Aggs_[i]->UpdateKey(ptr, s.BatchNum_, s.Values_.data(), row); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp index 20e43e80190..6a5d358daca 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp @@ -92,9 +92,9 @@ public: Y_UNUSED(argColumn); } - void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TState(); - UpdateKey(state, columns, row); + UpdateKey(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -102,7 +102,8 @@ public: Y_UNUSED(state); } - void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); Y_UNUSED(columns); Y_UNUSED(row); auto typedState = static_cast<TState*>(state); @@ -125,9 +126,9 @@ public: { } - void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TState(); - UpdateState(state, columns, row); + UpdateState(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -135,7 +136,8 @@ public: Y_UNUSED(state); } - void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TState*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); if (datum.is_scalar()) { @@ -234,9 +236,9 @@ public: { } - void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TState(); - UpdateKey(state, columns, row); + UpdateKey(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -244,7 +246,8 @@ public: Y_UNUSED(state); } - void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TState*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); if (datum.is_scalar()) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h index 6a3b284440f..09768e9c66e 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h @@ -44,9 +44,9 @@ public: class IBlockAggregatorCombineKeys : public IBlockAggregatorBase { public: - virtual void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0; + virtual void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0; - virtual void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0; + virtual void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0; virtual std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) = 0; @@ -57,9 +57,9 @@ public: class IBlockAggregatorFinalizeKeys : public IBlockAggregatorBase { public: - virtual void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0; + virtual void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0; - virtual void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0; + virtual void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0; virtual std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) = 0; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp index 6c34a3fc6b6..923236b999b 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp @@ -298,9 +298,9 @@ public: { } - void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TGenericState(); - UpdateKey(state, columns, row); + UpdateKey(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -309,7 +309,8 @@ public: *typedState = TGenericState(); } - void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TGenericState*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_); @@ -342,9 +343,9 @@ public: { } - void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TGenericState(); - UpdateState(state, columns, row); + UpdateState(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -353,7 +354,8 @@ public: *typedState = TGenericState(); } - void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TGenericState*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_); @@ -534,9 +536,9 @@ public: { } - void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TGenericState(); - UpdateKey(state, columns, row); + UpdateKey(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -545,7 +547,8 @@ public: *typedState = TGenericState(); } - void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TGenericState*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState<TStringType, IsMin>(typedState, datum, row); @@ -572,9 +575,9 @@ public: { } - void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TGenericState(); - UpdateState(state, columns, row); + UpdateState(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -583,7 +586,8 @@ public: *typedState = TGenericState(); } - void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TGenericState*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState<TStringType, IsMin>(typedState, datum, row); @@ -758,9 +762,9 @@ public: { } - void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TStateType(); - UpdateKey(state, columns, row); + UpdateKey(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -768,7 +772,8 @@ public: Y_UNUSED(state); } - void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TStateType*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState, datum, row); @@ -797,9 +802,9 @@ public: { } - void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TStateType(); - UpdateState(state, columns, row); + UpdateState(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -807,7 +812,8 @@ public: Y_UNUSED(state); } - void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TStateType*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState, datum, row); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp index 005b79219e5..58cedbfaeb1 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp @@ -138,9 +138,9 @@ public: { } - void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TGenericState(); - UpdateKey(state, columns, row); + UpdateKey(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -149,7 +149,8 @@ public: *typedState = TGenericState(); } - void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TGenericState*>(state); if (*typedState) { return; @@ -184,9 +185,9 @@ public: { } - void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TGenericState(); - UpdateState(state, columns, row); + UpdateState(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -195,7 +196,8 @@ public: *typedState = TGenericState(); } - void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TGenericState*>(state); if (*typedState) { return; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp index 67941c14933..cf499b45dc4 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp @@ -284,9 +284,9 @@ public: { } - void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TStateType(); - UpdateKey(state, columns, row); + UpdateKey(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -294,7 +294,8 @@ public: Y_UNUSED(state); } - void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TStateType*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState<IsNullable, IsScalar, TIn, TSum>(typedState, datum, row); @@ -322,9 +323,9 @@ public: { } - void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TStateType(); - UpdateState(state, columns, row); + UpdateState(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -332,7 +333,8 @@ public: Y_UNUSED(state); } - void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TStateType*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState<IsNullable, IsScalar, TIn, TSum>(typedState, datum, row); @@ -464,9 +466,9 @@ public: { } - void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TAvgState(); - UpdateKey(state, columns, row); + UpdateKey(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -474,7 +476,8 @@ public: Y_UNUSED(state); } - void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TAvgState*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); if (datum.is_scalar()) { @@ -517,9 +520,9 @@ public: { } - void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TAvgState(); - UpdateState(state, columns, row); + UpdateState(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { @@ -527,7 +530,8 @@ public: Y_UNUSED(state); } - void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(batchNum); auto typedState = static_cast<TAvgState*>(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); if (datum.is_scalar()) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp index 90b1c8047f1..e1960766f88 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp @@ -56,9 +56,9 @@ public: } if constexpr (PgString == NUdf::EPgStringType::CString) { - return PgBuilder->MakeCString(item.AsStringRef().Data()).Release(); + return PgBuilder->MakeCString(item.AsStringRef().Data() + sizeof(void*)).Release(); } else if constexpr (PgString == NUdf::EPgStringType::Text) { - return PgBuilder->MakeText(item.AsStringRef().Data()).Release(); + return PgBuilder->MakeText(item.AsStringRef().Data()+ sizeof(void*)).Release(); } else { return MakeString(item.AsStringRef()); } @@ -72,9 +72,11 @@ public: } if constexpr (PgString == NUdf::EPgStringType::CString) { - return TBlockItem(PgBuilder->AsCStringBuffer(value)); + auto buf = PgBuilder->AsCStringBuffer(value); + return TBlockItem(NYql::NUdf::TStringRef(buf.Data() - sizeof(void*), buf.Size() + sizeof(void*))); } else if constexpr (PgString == NUdf::EPgStringType::Text) { - return TBlockItem(PgBuilder->AsTextBuffer(value)); + auto buf = PgBuilder->AsTextBuffer(value); + return TBlockItem(NYql::NUdf::TStringRef(buf.Data() - sizeof(void*), buf.Size() + sizeof(void*))); } else { return TBlockItem(value.AsStringRef()); } diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.cpp b/ydb/library/yql/parser/pg_wrapper/arrow.cpp index 1b318585465..e8494512235 100644 --- a/ydb/library/yql/parser/pg_wrapper/arrow.cpp +++ b/ydb/library/yql/parser/pg_wrapper/arrow.cpp @@ -48,7 +48,7 @@ const NPg::TAggregateDesc& ResolveAggregation(const TString& name, NKikimr::NMin if (returnType) { MKQL_ENSURE(argsColumns.size() == 1, "Expected one column"); TType* stateType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]))->GetItemType(); - TType* returnItemType = AS_TYPE(TBlockType, returnType); + TType* returnItemType = AS_TYPE(TBlockType, returnType)->GetItemType(); return NPg::LookupAggregation(name, AS_TYPE(TPgType, stateType)->GetTypeId(), AS_TYPE(TPgType, returnItemType)->GetTypeId()); } else { TVector<ui32> argTypeIds; diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.h b/ydb/library/yql/parser/pg_wrapper/arrow.h index 1ef5710cf22..52bdce5c70f 100644 --- a/ydb/library/yql/parser/pg_wrapper/arrow.h +++ b/ydb/library/yql/parser/pg_wrapper/arrow.h @@ -54,33 +54,51 @@ struct TPgIndirectFunc { }; template <bool IsFixed> -void SaveNullableDatum(const NullableDatum& from, NullableDatum& to, bool isCString) { - bool wasNull = to.isnull; - to.isnull = from.isnull; - if (!to.isnull) { - if constexpr (IsFixed) { - to.value = from.value; - } else { - if (!wasNull) { - if (to.value == from.value) { - return; - } - - pfree((void*)to.value); - } +Datum CloneDatumToAggContext(Datum src, bool isCString) { + if constexpr (IsFixed) { + return src; + } else { + Y_ENSURE(NKikimr::NMiniKQL::TlsAllocState->CurrentContext); + auto length = isCString ? 1 + strlen((const char*)src) : GetFullVarSize((const text*)src); + auto ret = (Datum)palloc(length); + memcpy((void*)ret, (const void*)src, length); + return ret; + } +} - auto length = isCString ? strlen((const char*)from.value) : GetFullVarSize((const text*)from.value); - to.value = (Datum)palloc(length); - memcpy((void*)to.value, (const void*)from.value, length); - } +template <bool IsFixed> +void CopyState(NullableDatum src, NullableDatum& dst) { + if constexpr (IsFixed) { + dst = src; } else { - if (!wasNull) { - if constexpr (!IsFixed) { - pfree((void*)to.value); - to.value = 0; - } + if (src.isnull == dst.isnull && src.value == dst.value) { + return; } + + if (!dst.isnull) { + pfree((void*)dst.value); + } + + dst = src; + } +} + +template <bool IsFixed> +void SaveToAggContext(NullableDatum& d, bool isCString) { + if constexpr (IsFixed) { + return; } + + if (d.isnull) { + return; + } + + // arrow Scalars/Arrays have null memory context + if (NUdf::GetMemoryContext((void*)d.value)) { + return; + } + + d.value = CloneDatumToAggContext<false>(d.value, isCString); } template <typename TArgsPolicy> @@ -110,7 +128,7 @@ struct TInputArgsAccessor { Scalars[j].value = (Datum)*static_cast<const ui64*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()); } else { auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value; - Scalars[j].value = (Datum)buffer->data(); + Scalars[j].value = (Datum)(buffer->data() + sizeof(void*)); } } } else { @@ -153,7 +171,7 @@ void FillScalarItem(const arrow::Scalar& scalar, NullableDatum& d) { d.isnull = true; } else { d.isnull = false; - d.value = (Datum)item.AsStringRef().Data(); + d.value = (Datum)(item.AsStringRef().Data() + sizeof(void*)); } } } @@ -176,7 +194,7 @@ void FillArrayItem(const arrow::ArrayData& array, size_t i, NullableDatum& d) { d.isnull = true; } else { d.isnull = false; - d.value = (Datum)item.AsStringRef().Data(); + d.value = (Datum)(item.AsStringRef().Data() + sizeof(void*)); } } } @@ -272,7 +290,7 @@ struct TGenericExec { NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); *res = Dispatch2<true, true>(batch, length, state, builder); } else { - NUdf::TStringArrayBuilder<arrow::BinaryType, true, NUdf::EPgStringType::None> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); + NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); *res = Dispatch2<true, true>(batch, length, state, builder); } } else { @@ -280,7 +298,7 @@ struct TGenericExec { NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); *res = Dispatch2<true, false>(batch, length, state, builder); } else { - NUdf::TStringArrayBuilder<arrow::BinaryType, true, NUdf::EPgStringType::None> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); + NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); *res = Dispatch2<true, false>(batch, length, state, builder); } } @@ -362,7 +380,7 @@ struct TGenericExec { if (v) { d.value = (Datum)inputArgsAccessor.FixedArrays[j][i]; } else { - d.value = (Datum)(inputArgsAccessor.StringOffsetsArrays[j][i] + inputArgsAccessor.StringDataArrays[j]); + d.value = (Datum)(sizeof(void*) + inputArgsAccessor.StringOffsetsArrays[j][i] + inputArgsAccessor.StringDataArrays[j]); } } @@ -419,9 +437,10 @@ struct TGenericExec { if (fcinfo->isnull) { builder.Add(NUdf::TBlockItem{}); } else { - auto ptr = (const char*)ret; + auto ptr = (char*)ret; auto len = state.IsCStringResult ? 1 + strlen(ptr) : VARHDRSZ + VARSIZE((const text*)ptr); - builder.Add(NUdf::TBlockItem(NUdf::TStringRef(ptr, len))); + NUdf::ZeroMemoryContext(ptr); + builder.Add(NUdf::TBlockItem(NUdf::TStringRef(ptr - sizeof(void*), len + sizeof(void*)))); } } SkipCall:; @@ -440,6 +459,110 @@ TExecFunc MakeIndirectExec(PGFunction pgFunc) { return TGenericExec<TPgIndirectFunc, IsStrict, IsFixedResult>(TPgIndirectFunc(pgFunc)); } +template <bool IsFixed, typename TArgsPolicy> +NullableDatum GetInputValue(const TInputArgsAccessor<TArgsPolicy>& accessor, ui32 j, ui64 row) { + static_assert(!TArgsPolicy::VarArgs); + if (accessor.IsScalar[j]) { + return accessor.Scalars[j]; + } else { + NullableDatum d; + ui64 fullIndex = (row + accessor.Offsets[j]) & accessor.ValidOffsetMask[j]; + d.isnull = ((accessor.ValidMasks[j][fullIndex >> 3] >> (fullIndex & 0x07)) & 1) == 0; + + if constexpr (IsFixed) { + d.value = (Datum)accessor.FixedArrays[j][row]; + } else { + d.value = (Datum)(sizeof(void*) + accessor.StringOffsetsArrays[j][row] + accessor.StringDataArrays[j]); + } + + return d; + } +} + +template <bool IsFixed> +NullableDatum GetInputValueSlow(const std::vector<arrow::Datum>& values, ui32 j, ui64 row) { + NullableDatum d; + if (values[j].is_scalar()) { + if constexpr (IsFixed) { + FillScalarItem<true, true>(*values[j].scalar(), d); + } else { + FillScalarItem<true, false>(*values[j].scalar(), d); + } + } else { + if constexpr (IsFixed) { + FillArrayItem<true, true>(*values[j].array(), row, d); + } else { + FillArrayItem<true, false>(*values[j].array(), row, d); + } + } + + return d; +} + +template <bool IsFixed, bool IsCString, bool HasFunc, typename TFunc, bool IsStrict, typename TBuilder> +class TAggColumnBuilder : public NKikimr::NMiniKQL::IAggColumnBuilder { +public: + TAggColumnBuilder(const TString& name, TFunc func, ui64 size, FmgrInfo* funcInfo, const std::shared_ptr<arrow::DataType>& dataType, + NKikimr::NMiniKQL::TComputationContext& ctx) + : Name_(name) + , Func_(func) + , FuncInfo_(funcInfo) + , Builder_(NKikimr::NMiniKQL::TTypeInfoHelper(), dataType, ctx.ArrowMemoryPool, size) + , Ctx_(ctx) + { + } + + void Add(const void* state) final { + auto typedState = (NullableDatum*)state; + auto ret = *typedState; + if constexpr (HasFunc) { + if (!IsStrict || !typedState->isnull) { + WithPgTry(Name_, [&]() { + LOCAL_FCINFO(callInfo, 1); + callInfo->flinfo = FuncInfo_; + callInfo->nargs = 1; + callInfo->fncollation = DEFAULT_COLLATION_OID; + callInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; + callInfo->isnull = false; + callInfo->args[0].isnull = typedState->isnull; + callInfo->args[0].value = typedState->value; + ret.value = Func_(callInfo); + ret.isnull = callInfo->isnull; + }); + } + } + + if (ret.isnull) { + Builder_.Add(NYql::NUdf::TBlockItem()); + } else { + if constexpr (IsFixed) { + Builder_.Add(NYql::NUdf::TBlockItem(ui64(ret.value))); + } else if constexpr (IsCString) { + auto ptr = (char*)ret.value; + ui32 len = 1 + strlen(ptr); + NUdf::ZeroMemoryContext(ptr); + Builder_.Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef(ptr - sizeof(void*), len + sizeof(void*)))); + } else { + auto ptr = (char*)ret.value; + ui32 len = GetFullVarSize((const text*)ptr); + NUdf::ZeroMemoryContext(ptr); + Builder_.Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef(ptr - sizeof(void*), len + sizeof(void*)))); + } + } + } + + NUdf::TUnboxedValue Build() final { + return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true)); + } + +private: + const TString Name_; + const TFunc Func_; + FmgrInfo* FuncInfo_; + TBuilder Builder_; + NKikimr::NMiniKQL::TComputationContext& Ctx_; +}; + template <typename TTransFunc, bool IsTransStrict, typename TTransArgsPolicy, typename TCombineFunc, bool IsCombineStrict, typename TCombineArgsPolicy, bool HasSerialize, typename TSerializeFunc, typename TSerializeArgsPolicy, @@ -458,61 +581,91 @@ public: {} private: - template <bool HasFilter> - class TCombineAllAggregator : public NKikimr::NMiniKQL::TCombineAllTag::TBase { - public: - using TBase = NKikimr::NMiniKQL::TCombineAllTag::TBase; - TCombineAllAggregator(TTransFunc transFunc, TSerializeFunc serializeFunc, const std::vector<ui32>& argsColumns, + template <typename TAggregatorBase> + class TCombineAggregatorBase: public TAggregatorBase { + protected: + TCombineAggregatorBase(TTransFunc transFunc, TSerializeFunc serializeFunc, const std::vector<ui32>& argsColumns, std::optional<ui32> filterColumn, const NPg::TAggregateDesc& aggDesc, NKikimr::NMiniKQL::TComputationContext& ctx) - : TBase(sizeof(NullableDatum), std::optional<ui32>(), ctx) - , TransFunc(transFunc) - , SerializeFunc(serializeFunc) - , ArgsColumns(argsColumns) - , FilterColumn(filterColumn) - , AggDesc(aggDesc) + : TAggregatorBase(sizeof(NullableDatum), filterColumn, ctx) + , TransFunc_(transFunc) + , SerializeFunc_(serializeFunc) + , ArgsColumns_(argsColumns) + , AggDesc_(aggDesc) { if (!HasInitValue && IsTransStrict) { - Y_ENSURE(AggDesc.ArgTypes.size() == 1); + Y_ENSURE(AggDesc_.ArgTypes.size() == 1); } - Y_ENSURE(HasFilter == FilterColumn.has_value()); - - const auto& transDesc = NPg::LookupProc(AggDesc.TransFuncId); + const auto& transDesc = NPg::LookupProc(AggDesc_.TransFuncId); for (ui32 i = 1; i < transDesc.ArgTypes.size(); ++i) { - IsFixedArg.push_back(NPg::LookupType(transDesc.ArgTypes[i]).PassByValue); + IsFixedArg_.push_back(NPg::LookupType(transDesc.ArgTypes[i]).PassByValue); } - Zero(TransFuncInfo); - fmgr_info(AggDesc.TransFuncId, &TransFuncInfo); - Y_ENSURE(TransFuncInfo.fn_addr); - auto nargs = NPg::LookupProc(AggDesc.TransFuncId).ArgTypes.size(); + Zero(TransFuncInfo_); + fmgr_info(AggDesc_.TransFuncId, &TransFuncInfo_); + Y_ENSURE(TransFuncInfo_.fn_addr); + auto nargs = NPg::LookupProc(AggDesc_.TransFuncId).ArgTypes.size(); if constexpr (HasSerialize) { - Zero(SerializeFuncInfo); - fmgr_info(AggDesc.SerializeFuncId, &SerializeFuncInfo); - Y_ENSURE(SerializeFuncInfo.fn_addr); + Zero(SerializeFuncInfo_); + fmgr_info(AggDesc_.SerializeFuncId, &SerializeFuncInfo_); + Y_ENSURE(SerializeFuncInfo_.fn_addr); } + if constexpr (HasInitValue) { - Zero(InFuncInfo); - const auto& transTypeDesc = NPg::LookupType(AggDesc.TransTypeId); + Zero(InFuncInfo_); + const auto& transTypeDesc = NPg::LookupType(AggDesc_.TransTypeId); auto inFuncId = transTypeDesc.InFuncId; if (transTypeDesc.TypeId == transTypeDesc.ArrayTypeId) { inFuncId = NPg::LookupProc("array_in", { 0,0,0 }).ProcId; } - TypeIOParam = MakeTypeIOParam(transTypeDesc); - fmgr_info(inFuncId, &InFuncInfo); - Y_ENSURE(InFuncInfo.fn_addr); + TypeIOParam_ = MakeTypeIOParam(transTypeDesc); + fmgr_info(inFuncId, &InFuncInfo_); + Y_ENSURE(InFuncInfo_.fn_addr); + + WithPgTry(this->AggDesc_.Name, [&]() { + LOCAL_FCINFO(inCallInfo, 3); + inCallInfo->flinfo = &this->InFuncInfo_; + inCallInfo->nargs = 3; + inCallInfo->fncollation = DEFAULT_COLLATION_OID; + inCallInfo->isnull = false; + inCallInfo->args[0] = { (Datum)this->AggDesc_.InitValue.c_str(), false }; + inCallInfo->args[1] = { ObjectIdGetDatum(this->TypeIOParam_), false }; + inCallInfo->args[2] = { Int32GetDatum(-1), false }; + + auto state = this->InFuncInfo_.fn_addr(inCallInfo); + Y_ENSURE(!inCallInfo->isnull); + PreparedInitValue_ = AnyDatumToPod(state, IsTransTypeFixed); + }); } } + const TTransFunc TransFunc_; + const TSerializeFunc SerializeFunc_; + const std::vector<ui32> ArgsColumns_; + const NPg::TAggregateDesc& AggDesc_; + std::vector<bool> IsFixedArg_; + FmgrInfo TransFuncInfo_; + FmgrInfo SerializeFuncInfo_; + FmgrInfo InFuncInfo_; + ui32 TypeIOParam_ = 0; + NKikimr::NUdf::TUnboxedValue PreparedInitValue_; + }; + + template <bool HasFilter> + class TCombineAllAggregator : public TCombineAggregatorBase<NKikimr::NMiniKQL::TCombineAllTag::TBase> { + public: + using TBase = TCombineAggregatorBase<NKikimr::NMiniKQL::TCombineAllTag::TBase>; + TCombineAllAggregator(TTransFunc transFunc, TSerializeFunc serializeFunc, const std::vector<ui32>& argsColumns, + std::optional<ui32> filterColumn, const NPg::TAggregateDesc& aggDesc, NKikimr::NMiniKQL::TComputationContext& ctx) + : TBase(transFunc, serializeFunc, argsColumns, filterColumn, aggDesc, ctx) + { + Y_ENSURE(HasFilter == filterColumn.has_value()); + } + private: void DestroyState(void* state) noexcept final { - auto typedState = (NullableDatum*)state; - if constexpr (!IsTransTypeFixed) { - if (!typedState->isnull) { - pfree((void*)typedState->value); - } - } + Y_UNUSED(state); } void InitState(void* state) final { @@ -521,29 +674,17 @@ private: typedState->isnull = true; typedState->value = 0; if constexpr (HasInitValue) { - WithPgTry(AggDesc.Name, [&]() { - LOCAL_FCINFO(inCallInfo, 3); - inCallInfo->flinfo = &InFuncInfo; - inCallInfo->nargs = 3; - inCallInfo->fncollation = DEFAULT_COLLATION_OID; - inCallInfo->isnull = false; - inCallInfo->args[0] = { (Datum)AggDesc.InitValue.c_str(), false }; - inCallInfo->args[1] = { ObjectIdGetDatum(TypeIOParam), false }; - inCallInfo->args[2] = { Int32GetDatum(-1), false }; - - auto state = InFuncInfo.fn_addr(inCallInfo); - Y_ENSURE(!inCallInfo->isnull); - typedState->value = state; - typedState->isnull = false; - }); + auto datum = IsTransTypeFixed ? ScalarDatumFromPod(this->PreparedInitValue_) : PointerDatumFromPod(this->PreparedInitValue_); + typedState->isnull = false; + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->AggDesc_.TransTypeId == CSTRINGOID); } } void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { auto typedState = (NullableDatum*)state; std::vector<arrow::Datum> values; - values.reserve(ArgsColumns.size()); - for (auto col : ArgsColumns) { + values.reserve(this->ArgsColumns_.size()); + for (auto col : this->ArgsColumns_) { values.push_back(NKikimr::NMiniKQL::TArrowBlock::From(columns[col]).GetDatum()); } @@ -564,13 +705,13 @@ private: const ui8* filterBitmap = nullptr; if constexpr(HasFilter) { - const auto& filterDatum = NKikimr::NMiniKQL::TArrowBlock::From(columns[*FilterColumn]).GetDatum(); + const auto& filterDatum = NKikimr::NMiniKQL::TArrowBlock::From(columns[*this->FilterColumn_]).GetDatum(); const auto& filterArray = filterDatum.array(); Y_ENSURE(filterArray->GetNullCount() == 0); filterBitmap = filterArray->template GetValues<uint8_t>(1); } - WithPgTry(AggDesc.Name, [&]() { + WithPgTry(this->AggDesc_.Name, [&]() { if (hasNulls) { if (hasScalars) { AddManyImpl<true, true>(typedState, values, batchLength, filterBitmap); @@ -590,8 +731,13 @@ private: template <bool HasNulls, bool HasScalars> void AddManyImpl(NullableDatum* typedState, const std::vector<arrow::Datum>& values, ui64 batchLength, const ui8* filterBitmap) { LOCAL_FCINFO(transCallInfo, FUNC_MAX_ARGS); - transCallInfo->flinfo = &TransFuncInfo; - transCallInfo->nargs = 1; + transCallInfo->flinfo = &this->TransFuncInfo_; + if constexpr (!TTransArgsPolicy::VarArgs) { + transCallInfo->nargs = TTransArgsPolicy::IsFixedArg.size(); + } else { + transCallInfo->nargs = 1 + values.size(); + } + transCallInfo->fncollation = DEFAULT_COLLATION_OID; transCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; @@ -627,7 +773,7 @@ private: if (v) { d.value = (Datum)inputArgsAccessor.FixedArrays[j][i]; } else { - d.value = (Datum)(inputArgsAccessor.StringOffsetsArrays[j][i] + inputArgsAccessor.StringDataArrays[j]); + d.value = (Datum)(sizeof(void*) + inputArgsAccessor.StringOffsetsArrays[j][i] + inputArgsAccessor.StringDataArrays[j]); } } @@ -644,13 +790,13 @@ private: for (size_t j = 0; j < values.size(); ++j) { NullableDatum d; if (HasScalars && values[j].is_scalar()) { - if (IsFixedArg[j]) { + if (this->IsFixedArg_[j]) { FillScalarItem<HasNulls, true>(*values[j].scalar(), d); } else { FillScalarItem<HasNulls, false>(*values[j].scalar(), d); } } else { - if (IsFixedArg[j]) { + if (this->IsFixedArg_[j]) { FillArrayItem<HasNulls, true>(*values[j].array(), i, d); } else { FillArrayItem<HasNulls, false>(*values[j].array(), i, d); @@ -673,13 +819,14 @@ private: } transCallInfo->isnull = false; - ret = TransFunc(transCallInfo); + ret = this->TransFunc_(transCallInfo); transCallInfo->args[0].value = ret; transCallInfo->args[0].isnull = transCallInfo->isnull; SkipCall:; } - SaveNullableDatum<IsTransTypeFixed>(transCallInfo->args[0], *typedState, AggDesc.TransTypeId == CSTRINGOID); + CopyState<IsTransTypeFixed>(transCallInfo->args[0], *typedState); + SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID); } NUdf::TUnboxedValue FinishOne(const void* state) final { @@ -690,16 +837,16 @@ SkipCall:; if constexpr (HasSerialize) { NUdf::TUnboxedValue ret; - WithPgTry(AggDesc.Name, [&]() { + WithPgTry(this->AggDesc_.Name, [&]() { LOCAL_FCINFO(serializeCallInfo, 1); - serializeCallInfo->flinfo = &SerializeFuncInfo; + serializeCallInfo->flinfo = &this->SerializeFuncInfo_; serializeCallInfo->nargs = 1; serializeCallInfo->fncollation = DEFAULT_COLLATION_OID; serializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; serializeCallInfo->isnull = false; serializeCallInfo->args[0].isnull = false; serializeCallInfo->args[0].value = typedState->value; - auto ser = SerializeFunc(serializeCallInfo); + auto ser = this->SerializeFunc_(serializeCallInfo); Y_ENSURE(!serializeCallInfo->isnull); if constexpr (IsSerializedTypeFixed) { ret = ScalarDatumToPod(ser); @@ -724,18 +871,319 @@ SkipCall:; } } } + }; - const TTransFunc TransFunc; - const TSerializeFunc SerializeFunc; - const std::vector<ui32> ArgsColumns; - const std::optional<ui32> FilterColumn; - const NPg::TAggregateDesc& AggDesc; - std::vector<bool> IsFixedArg; - bool IsTransTypeCString; - FmgrInfo TransFuncInfo; - FmgrInfo SerializeFuncInfo; - FmgrInfo InFuncInfo; - ui32 TypeIOParam = 0; + class TCombineKeysAggregator : public TCombineAggregatorBase<NKikimr::NMiniKQL::TCombineKeysTag::TBase> { + public: + using TBase = TCombineAggregatorBase<NKikimr::NMiniKQL::TCombineKeysTag::TBase>; + TCombineKeysAggregator(TTransFunc transFunc, TSerializeFunc serializeFunc, const std::vector<ui32>& argsColumns, + const NPg::TAggregateDesc& aggDesc, NKikimr::NMiniKQL::TComputationContext& ctx) + : TBase(transFunc, serializeFunc, argsColumns, std::optional<ui32>(), aggDesc, ctx) + , SerializedType_(HasSerialize ? NPg::LookupProc(this->AggDesc_.SerializeFuncId).ResultType : this->AggDesc_.TransTypeId) + { + Values_.reserve(this->IsFixedArg_.size()); + } + + void DestroyState(void* state) noexcept final { + Y_UNUSED(state); + } + + void PrepareBatch(ui64 batchNum, const NKikimr::NUdf::TUnboxedValue* columns) { + Values_.clear(); + for (auto col : this->ArgsColumns_) { + Values_.push_back(NKikimr::NMiniKQL::TArrowBlock::From(columns[col]).GetDatum()); + } + + InputArgsAccessor_.Bind(Values_, 1); + BatchNum_ = batchNum; + } + + void InitKey(void* state, ui64 batchNum, const NKikimr::NUdf::TUnboxedValue* columns, ui64 row) final { + new(state) NullableDatum(); + auto typedState = (NullableDatum*)state; + typedState->isnull = true; + typedState->value = 0; + if constexpr (HasInitValue) { + auto datum = IsTransTypeFixed ? ScalarDatumFromPod(this->PreparedInitValue_) : PointerDatumFromPod(this->PreparedInitValue_); + typedState->isnull = false; + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->AggDesc_.TransTypeId == CSTRINGOID); + } + + UpdateKey(state, batchNum, columns, row); + } + + void UpdateKey(void* state, ui64 batchNum, const NKikimr::NUdf::TUnboxedValue* columns, ui64 row) final { + auto typedState = (NullableDatum*)state; + if (batchNum != BatchNum_) { + PrepareBatch(batchNum, columns); + } + + LOCAL_FCINFO(transCallInfo, FUNC_MAX_ARGS); + transCallInfo->flinfo = &this->TransFuncInfo_; + if constexpr (!TTransArgsPolicy::VarArgs) { + transCallInfo->nargs = TTransArgsPolicy::IsFixedArg.size(); + } else { + transCallInfo->nargs = 1 + Values_.size(); + } + + transCallInfo->fncollation = DEFAULT_COLLATION_OID; + transCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; + + transCallInfo->args[0] = *typedState; + + Datum ret; + if constexpr (!TTransArgsPolicy::VarArgs) { + if (!constexpr_for_tuple([&](auto const& j, auto const& v) { + if (j == 0) { + return true; + } + + NullableDatum d; + if (v) { + d = GetInputValue<true>(InputArgsAccessor_, j, row); + } else { + d = GetInputValue<false>(InputArgsAccessor_, j, row); + } + + if (IsTransStrict && d.isnull) { + return false; + } + + transCallInfo->args[j] = d; + return true; + }, TTransArgsPolicy::IsFixedArg)) { + return; + } + } else { + for (size_t j = 0; j < Values_.size(); ++j) { + NullableDatum d; + if (this->IsFixedArg_[j]) { + d = GetInputValueSlow<true>(Values_, j, row); + } else { + d = GetInputValueSlow<false>(Values_, j, row); + } + + if (IsTransStrict && d.isnull) { + return; + } + + transCallInfo->args[1 + j] = d; + } + } + + if (!HasInitValue && IsTransStrict) { + if (transCallInfo->args[0].isnull) { + typedState->isnull = false; + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(transCallInfo->args[1].value, this->AggDesc_.TransTypeId == CSTRINGOID); + return; + } + } + + transCallInfo->isnull = false; + WithPgTry(this->AggDesc_.Name, [&]() { + ret = this->TransFunc_(transCallInfo); + }); + + CopyState<IsTransTypeFixed>({ret, transCallInfo->isnull}, *typedState); + SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID); + } + + std::unique_ptr<NKikimr::NMiniKQL::IAggColumnBuilder> MakeStateBuilder(ui64 size) final { + if constexpr (IsSerializedTypeFixed) { + return std::make_unique<TAggColumnBuilder<true, false, HasSerialize, TSerializeFunc, true, NYql::NUdf::TFixedSizeArrayBuilder<ui64, true>>>( + this->AggDesc_.Name, this->SerializeFunc_, size, &this->SerializeFuncInfo_, arrow::uint64(), this->Ctx_); + } else { + if (SerializedType_ == CSTRINGOID) { + return std::make_unique<TAggColumnBuilder<false, true, HasSerialize, TSerializeFunc, true, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true, NYql::NUdf::EPgStringType::CString>>>( + this->AggDesc_.Name, this->SerializeFunc_, size, &this->SerializeFuncInfo_, arrow::binary(), this->Ctx_); + } else { + return std::make_unique<TAggColumnBuilder<false, false, HasSerialize, TSerializeFunc, true, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true, NYql::NUdf::EPgStringType::Text>>>( + this->AggDesc_.Name, this->SerializeFunc_, size, &this->SerializeFuncInfo_, arrow::binary(), this->Ctx_); + } + } + } + + const ui32 SerializedType_; + ui64 BatchNum_ = Max<ui64>(); + std::vector<arrow::Datum> Values_; + TInputArgsAccessor<TTransArgsPolicy> InputArgsAccessor_; + }; + + class TFinalizeKeysAggregator : public NKikimr::NMiniKQL::TFinalizeKeysTag::TBase { + public: + using TBase = NKikimr::NMiniKQL::TFinalizeKeysTag::TBase; + TFinalizeKeysAggregator(TDeserializeFunc deserializeFunc, TCombineFunc combineFunc, TFinalFunc finalFunc, + ui32 stateColumn, const NPg::TAggregateDesc& aggDesc, NKikimr::NMiniKQL::TComputationContext& ctx) + : TBase(sizeof(NullableDatum), std::optional<ui32>(), ctx) + , DeserializeFunc_(deserializeFunc) + , CombineFunc_(combineFunc) + , FinalFunc_(finalFunc) + , StateColumn_(stateColumn) + , AggDesc_(aggDesc) + , SerializedType_(HasSerialize ? NPg::LookupProc(this->AggDesc_.SerializeFuncId).ResultType : this->AggDesc_.TransTypeId) + , FinalType_(HasFinal ? NPg::LookupProc(this->AggDesc_.FinalFuncId).ResultType : this->AggDesc_.TransTypeId) + { + Values_.reserve(1); + } + + void DestroyState(void* state) noexcept final { + Y_UNUSED(state); + } + + void PrepareBatch(ui64 batchNum, const NUdf::TUnboxedValue* columns) { + Values_.clear(); + Values_.push_back(NKikimr::NMiniKQL::TArrowBlock::From(columns[StateColumn_]).GetDatum()); + if constexpr (HasDeserialize) { + DeserializeAccessor_.Bind(Values_, 0); + } else { + CombineAccessor_.Bind(Values_, 1); + } + + BatchNum_ = batchNum; + } + + void Deserialize(Datum ser, NullableDatum& result) { + WithPgTry(this->AggDesc_.Name, [&]() { + LOCAL_FCINFO(deserializeCallInfo, 1); + deserializeCallInfo->flinfo = &this->DeserializeFuncInfo_; + deserializeCallInfo->nargs = 1; + deserializeCallInfo->fncollation = DEFAULT_COLLATION_OID; + deserializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; + deserializeCallInfo->isnull = false; + deserializeCallInfo->args[0].isnull = false; + deserializeCallInfo->args[0].value = ser; + result.value = this->DeserializeFunc_(deserializeCallInfo); + result.isnull = deserializeCallInfo->isnull; + }); + } + + void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + new(state) NullableDatum(); + auto typedState = (NullableDatum*)state; + typedState->isnull = true; + typedState->value = 0; + if (BatchNum_ != batchNum) { + PrepareBatch(batchNum, columns); + } + + NullableDatum d; + if constexpr (HasDeserialize && !TDeserializeArgsPolicy::VarArgs) { + d = GetInputValue<IsSerializedTypeFixed>(DeserializeAccessor_, 0, row); + } else if constexpr (!HasDeserialize && !TCombineArgsPolicy::VarArgs) { + d = GetInputValue<IsSerializedTypeFixed>(CombineAccessor_, 1, row); + } else { + d = GetInputValueSlow<IsSerializedTypeFixed>(Values_, 0, row); + } + + if (d.isnull) { + return; + } + + if constexpr (!HasDeserialize) { + typedState->isnull = false; + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID); + } else { + Deserialize(d.value, *typedState); + } + + SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID); + } + + void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { + auto typedState = (NullableDatum*)state; + if (BatchNum_ != batchNum) { + PrepareBatch(batchNum, columns); + } + + NullableDatum d; + if constexpr (HasDeserialize && !TDeserializeArgsPolicy::VarArgs) { + d = GetInputValue<IsSerializedTypeFixed>(DeserializeAccessor_, 0, row); + } else if constexpr (!HasDeserialize && !TCombineArgsPolicy::VarArgs) { + d = GetInputValue<IsSerializedTypeFixed>(CombineAccessor_, 1, row); + } else { + d = GetInputValueSlow<IsSerializedTypeFixed>(Values_, 0, row); + } + + if (IsCombineStrict && d.isnull) { + return; + } + + NullableDatum deser; + if (d.isnull) { + deser.isnull = true; + deser.value = 0; + } else { + if constexpr (!HasDeserialize) { + if (IsCombineStrict && typedState->isnull) { + typedState->isnull = false; + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID); + return; + } + + deser = d; + } else { + Deserialize(d.value, deser); + if (IsCombineStrict && typedState->isnull) { + *typedState = deser; + return; + } + } + } + + WithPgTry(this->AggDesc_.Name, [&]() { + LOCAL_FCINFO(combineCallInfo, 2); + combineCallInfo->flinfo = &this->CombineFuncInfo_; + combineCallInfo->nargs = 2; + combineCallInfo->fncollation = DEFAULT_COLLATION_OID; + combineCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; + combineCallInfo->isnull = false; + combineCallInfo->args[0] = *typedState; + combineCallInfo->args[1] = deser; + auto ret = this->CombineFunc_(combineCallInfo); + if constexpr (!HasDeserialize) { + if (!combineCallInfo->isnull && ret == d.value) { + typedState->isnull = false; + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID); + return; + } + } + + CopyState<IsTransTypeFixed>({ret, combineCallInfo->isnull}, *typedState); + }); + + + SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID); + } + + std::unique_ptr<NKikimr::NMiniKQL::IAggColumnBuilder> MakeResultBuilder(ui64 size) final { + if constexpr (IsFinalTypeFixed) { + return std::make_unique<TAggColumnBuilder<true, false, HasFinal, TFinalFunc, IsFinalStrict, NYql::NUdf::TFixedSizeArrayBuilder<ui64, true>>>( + this->AggDesc_.Name, this->FinalFunc_, size, &this->FinalFuncInfo_, arrow::uint64(), this->Ctx_); + } else { + if (FinalType_ == CSTRINGOID) { + return std::make_unique<TAggColumnBuilder<false, true, HasFinal, TFinalFunc, IsFinalStrict, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true>>>( + this->AggDesc_.Name, this->FinalFunc_, size, &this->FinalFuncInfo_, arrow::binary(), this->Ctx_); + } else { + return std::make_unique<TAggColumnBuilder<false, false, HasFinal, TFinalFunc, IsFinalStrict, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true>>>( + this->AggDesc_.Name, this->FinalFunc_, size, &this->FinalFuncInfo_, arrow::binary(), this->Ctx_); + } + } + } + + const TDeserializeFunc DeserializeFunc_; + const TCombineFunc CombineFunc_; + const TFinalFunc FinalFunc_; + const ui32 StateColumn_; + const NPg::TAggregateDesc& AggDesc_; + const ui32 SerializedType_; + const ui32 FinalType_; + ui64 BatchNum_ = Max<ui64>(); + std::vector<arrow::Datum> Values_; + TInputArgsAccessor<TDeserializeArgsPolicy> DeserializeAccessor_; + TInputArgsAccessor<TCombineArgsPolicy> CombineAccessor_; + FmgrInfo DeserializeFuncInfo_; + FmgrInfo CombineFuncInfo_; + FmgrInfo FinalFuncInfo_; }; class TPreparedCombineAllAggregator : public NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorCombineAll>{ @@ -766,6 +1214,52 @@ SkipCall:; const NPg::TAggregateDesc& AggDesc; }; + class TPreparedCombineKeysAggregator : public NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorCombineKeys>{ + public: + TPreparedCombineKeysAggregator(TTransFunc transFunc, TSerializeFunc serializeFunc, const std::vector<ui32>& argsColumns, + const NPg::TAggregateDesc& aggDesc) + : IPreparedBlockAggregator(sizeof(NullableDatum)) + , TransFunc(transFunc) + , SerializeFunc(serializeFunc) + , ArgsColumns(argsColumns) + , AggDesc(aggDesc) + {} + + private: + std::unique_ptr<NKikimr::NMiniKQL::IBlockAggregatorCombineKeys> Make(NKikimr::NMiniKQL::TComputationContext& ctx) const { + return std::make_unique<TCombineKeysAggregator>(TransFunc, SerializeFunc, ArgsColumns, AggDesc, ctx); + } + + const TTransFunc TransFunc; + const TSerializeFunc SerializeFunc; + const std::vector<ui32> ArgsColumns; + const NPg::TAggregateDesc& AggDesc; + }; + + class TPreparedFinalizeKeysAggregator : public NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorFinalizeKeys>{ + public: + TPreparedFinalizeKeysAggregator(TDeserializeFunc deserializeFunc, TCombineFunc combineFunc, TFinalFunc finalFunc, ui32 stateColumn, + const NPg::TAggregateDesc& aggDesc) + : IPreparedBlockAggregator(sizeof(NullableDatum)) + , DeserializeFunc(deserializeFunc) + , CombineFunc(combineFunc) + , FinalFunc(finalFunc) + , StateColumn(stateColumn) + , AggDesc(aggDesc) + {} + + private: + std::unique_ptr<NKikimr::NMiniKQL::IBlockAggregatorFinalizeKeys> Make(NKikimr::NMiniKQL::TComputationContext& ctx) const { + return std::make_unique<TFinalizeKeysAggregator>(DeserializeFunc, CombineFunc, FinalFunc, StateColumn, AggDesc, ctx); + } + + const TDeserializeFunc DeserializeFunc; + const TCombineFunc CombineFunc; + const TFinalFunc FinalFunc; + const ui32 StateColumn; + const NPg::TAggregateDesc& AggDesc; + }; + public: std::unique_ptr<NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorCombineAll>> PrepareCombineAll( std::optional<ui32> filterColumn, @@ -777,13 +1271,13 @@ public: std::unique_ptr<NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorCombineKeys>> PrepareCombineKeys( const std::vector<ui32>& argsColumns, const NPg::TAggregateDesc& aggDesc) { - ythrow yexception() << "Not implemented"; + return std::make_unique<TPreparedCombineKeysAggregator>(TransFunc, SerializeFunc, argsColumns, aggDesc); } std::unique_ptr<NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorFinalizeKeys>> PrepareFinalizeKeys( ui32 stateColumn, const NPg::TAggregateDesc& aggDesc) { - ythrow yexception() << "Not implemented"; + return std::make_unique<TPreparedFinalizeKeysAggregator>(DeserializeFunc, CombineFunc, FinalFunc, stateColumn, aggDesc); } private: diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index 5cb4fc78d65..931da4dc1fc 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -1510,7 +1510,7 @@ struct TFromPgExec { case BYTEAOID: case CSTRINGOID: { NUdf::TStringBlockReader<arrow::BinaryType, true> reader; - NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); + NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), SourceId == BYTEAOID ? arrow::binary() : arrow::utf8(), *ctx->memory_pool(), length); for (size_t i = 0; i < length; ++i) { auto item = reader.GetItem(array, i); if (!item) { @@ -1519,12 +1519,12 @@ struct TFromPgExec { } ui32 len; - const char* ptr = item.AsStringRef().Data(); + const char* ptr = item.AsStringRef().Data() + sizeof(void*); if (SourceId == CSTRINGOID) { - len = strlen(item.AsStringRef().Data()); + len = strlen(ptr); } else { - len = GetCleanVarSize((const text*)item.AsStringRef().Data()); - Y_ENSURE(len + VARHDRSZ == item.AsStringRef().Size()); + len = GetCleanVarSize((const text*)ptr); + Y_ENSURE(len + VARHDRSZ + sizeof(void*) == item.AsStringRef().Size()); ptr += VARHDRSZ; } @@ -1639,7 +1639,7 @@ struct TToPgExec { case BYTEAOID: case CSTRINGOID: { NUdf::TStringBlockReader<arrow::BinaryType, true> reader; - NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); + NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length); std::vector<char> tmp; for (size_t i = 0; i < length; ++i) { auto item = reader.GetItem(array, i); @@ -1650,7 +1650,7 @@ struct TToPgExec { ui32 len; if (TargetId == CSTRINGOID) { - len = 1 + item.AsStringRef().Size(); + len = sizeof(void*) + 1 + item.AsStringRef().Size(); if (Y_UNLIKELY(len < item.AsStringRef().Size())) { ythrow yexception() << "Too long string"; } @@ -1660,10 +1660,11 @@ struct TToPgExec { } tmp.resize(len); - memcpy(tmp.data(), item.AsStringRef().Data(), len - 1); + NUdf::ZeroMemoryContext(tmp.data() + sizeof(void*)); + memcpy(tmp.data() + sizeof(void*), item.AsStringRef().Data(), len - 1 - sizeof(void*)); tmp[len - 1] = 0; } else { - len = VARHDRSZ + item.AsStringRef().Size(); + len = sizeof(void*) + VARHDRSZ + item.AsStringRef().Size(); if (Y_UNLIKELY(len < item.AsStringRef().Size())) { ythrow yexception() << "Too long string"; } @@ -1673,8 +1674,9 @@ struct TToPgExec { } tmp.resize(len); - memcpy(tmp.data() + VARHDRSZ, item.AsStringRef().Data(), len - VARHDRSZ); - UpdateCleanVarSize((text*)tmp.data(), item.AsStringRef().Size()); + NUdf::ZeroMemoryContext(tmp.data() + sizeof(void*)); + memcpy(tmp.data() + sizeof(void*) + VARHDRSZ, item.AsStringRef().Data(), len - VARHDRSZ); + UpdateCleanVarSize((text*)(tmp.data() + sizeof(void*)), item.AsStringRef().Size()); } builder.Add(NUdf::TBlockItem(NUdf::TStringRef(tmp.data(), len))); @@ -2561,8 +2563,9 @@ arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf: size = strlen(ptr) + 1; } - std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(size, &pool))); - std::memcpy(buffer->mutable_data(), ptr, size); + std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(size + sizeof(void*), &pool))); + NUdf::ZeroMemoryContext(buffer->mutable_data() + sizeof(void*)); + std::memcpy(buffer->mutable_data() + sizeof(void*), ptr, size); return arrow::Datum(std::make_shared<arrow::BinaryScalar>(buffer)); } } diff --git a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp index 5c8c680fe2d..1d9388de199 100644 --- a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp +++ b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp @@ -134,8 +134,8 @@ Y_UNIT_TEST_SUITE(TPgCodegen) { builder.Add(NUdf::TBlockItem{}); } else { auto s = item.AsStringRef(); - size_t len = s.Size() - VARHDRSZ; - const char* ptr = s.Data() + VARHDRSZ; + size_t len = s.Size() - VARHDRSZ - sizeof(void*); + const char* ptr = s.Data() + VARHDRSZ + sizeof(void*); builder.Add(NUdf::TBlockItem{NUdf::TStringRef(ptr, len)}); } } @@ -206,8 +206,9 @@ Y_UNIT_TEST_SUITE(TPgCodegen) { arrow::BinaryBuilder builder; ARROW_OK(builder.Reserve(N)); for (size_t i = 0; i < N; ++i) { - std::string s(VARHDRSZ + 500, 'A' + i % 26); - auto t = (text*)s.data(); + std::string s(sizeof(void*) + VARHDRSZ + 500, 'A' + i % 26); + NUdf::ZeroMemoryContext(s.data() + sizeof(void*)); + auto t = (text*)(s.data() + sizeof(void*)); SET_VARSIZE(t, VARHDRSZ + 500); builder.Append(s); } diff --git a/ydb/library/yql/parser/pg_wrapper/utils.h b/ydb/library/yql/parser/pg_wrapper/utils.h index 66de4e890ae..dd26d558ab3 100644 --- a/ydb/library/yql/parser/pg_wrapper/utils.h +++ b/ydb/library/yql/parser/pg_wrapper/utils.h @@ -78,7 +78,7 @@ inline Datum PointerDatumFromPod(const NKikimr::NUdf::TUnboxedValuePod& value) { } inline Datum PointerDatumFromItem(const NKikimr::NUdf::TBlockItem& value) { - return (Datum)value.AsStringRef().Data(); + return (Datum)(value.AsStringRef().Data() + sizeof(void*)); } inline ui32 GetFullVarSize(const text* s) { diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h index 49865bcf5fd..acbeff59905 100644 --- a/ydb/library/yql/public/udf/arrow/block_builder.h +++ b/ydb/library/yql/public/udf/arrow/block_builder.h @@ -506,10 +506,18 @@ public: if constexpr (PgString == EPgStringType::CString) { static_assert(Nullable); - DoAdd(TBlockItem(PgBuilder->AsCStringBuffer(value))); + auto buf = PgBuilder->AsCStringBuffer(value); + auto prevCtx = GetMemoryContext(buf.Data()); + ZeroMemoryContext((char*)buf.Data()); + DoAdd(TBlockItem(TStringRef(buf.Data() - sizeof(void*), buf.Size() + sizeof(void*)))); + SetMemoryContext((char*)buf.Data(), prevCtx); } else if constexpr (PgString == EPgStringType::Text) { static_assert(Nullable); - DoAdd(TBlockItem(PgBuilder->AsTextBuffer(value))); + auto buf = PgBuilder->AsTextBuffer(value); + auto prevCtx = GetMemoryContext(buf.Data()); + ZeroMemoryContext((char*)buf.Data()); + DoAdd(TBlockItem(TStringRef(buf.Data() - sizeof(void*), buf.Size() + sizeof(void*)))); + SetMemoryContext((char*)buf.Data(), prevCtx); } else { DoAdd(TBlockItem(value.AsStringRef())); } diff --git a/ydb/library/yql/public/udf/arrow/util.h b/ydb/library/yql/public/udf/arrow/util.h index ba7fd7de2ee..f4ce026149e 100644 --- a/ydb/library/yql/public/udf/arrow/util.h +++ b/ydb/library/yql/public/udf/arrow/util.h @@ -125,5 +125,17 @@ private: size_t Len = 0; }; +inline void* GetMemoryContext(const void* ptr) { + return *(void**)((char*)ptr - sizeof(void*)); +} + +inline void SetMemoryContext(void* ptr, void* ctx) { + *(void**)((char*)ptr - sizeof(void*)) = ctx; +} + +inline void ZeroMemoryContext(void* ptr) { + SetMemoryContext(ptr, nullptr); +} + } } |