aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-06-01 20:30:53 +0300
committervvvv <vvvv@ydb.tech>2023-06-01 20:30:53 +0300
commit65144ea2f27d134ddba08749527839776f379501 (patch)
tree521cfe77df1fc757f85b258eb20d73eadebe7dda
parentf10160e4b83003ae269ced1b69fc9fbad1ff32c2 (diff)
downloadydb-65144ea2f27d134ddba08749527839776f379501.tar.gz
Block PG aggregations over keys, tuned layout of palloc-ed datums in arrow Scalar/Array
-rw-r--r--ydb/library/yql/core/yql_aggregate_expander.cpp11
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp16
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp21
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h8
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp42
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp14
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp28
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp10
-rw-r--r--ydb/library/yql/parser/pg_wrapper/arrow.cpp2
-rw-r--r--ydb/library/yql/parser/pg_wrapper/arrow.h708
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp29
-rw-r--r--ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp9
-rw-r--r--ydb/library/yql/parser/pg_wrapper/utils.h2
-rw-r--r--ydb/library/yql/public/udf/arrow/block_builder.h12
-rw-r--r--ydb/library/yql/public/udf/arrow/util.h12
15 files changed, 735 insertions, 189 deletions
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 46fc98e3284..17ee8b5d51c 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -2900,7 +2900,16 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
.Callable("ShuffleByKeys")
.Add(0, AggList)
.Add(1, keySelector)
- .Add(2, lambdaStream)
+ .Lambda(2)
+ .Param("stream")
+ .Apply(GetContextLambda())
+ .With(0)
+ .Apply(lambdaStream)
+ .With(0, "stream")
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal()
.Seal()
.Build();
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index 1b49d64d9cd..1992d6e602e 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -677,6 +677,7 @@ public:
if (result == EFetchResult::Yield) {
return result;
} else if (result == EFetchResult::One) {
+ ++s.BatchNum_;
ui64 batchLength = GetBatchLength(s.Values_.data());
if (!batchLength) {
continue;
@@ -824,6 +825,7 @@ private:
using TSetImpl = THashSetImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>>;
using TFixedMapImpl = TFixedHashMapImpl<TKey, TFixedAggState, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>>;
+ ui64 BatchNum_ = 0;
TVector<NUdf::TUnboxedValue> Values_;
TVector<NUdf::TUnboxedValue*> ValuePointers_;
TVector<std::unique_ptr<TAggregator>> Aggs_;
@@ -969,16 +971,16 @@ private:
for (auto i : Streams_[currentStreamIndex]) {
if (output[Keys_.size() + i]) {
- s.Aggs_[i]->LoadState(ptr + s.AggStateOffsets_[i], s.UnwrappedValues_.data(), row);
+ s.Aggs_[i]->LoadState(ptr + s.AggStateOffsets_[i], s.BatchNum_, s.UnwrappedValues_.data(), row);
}
}
} else {
for (size_t i = 0; i < s.Aggs_.size(); ++i) {
if (output[Keys_.size() + i]) {
if constexpr (Finalize) {
- s.Aggs_[i]->LoadState(ptr, s.Values_.data(), row);
+ s.Aggs_[i]->LoadState(ptr, s.BatchNum_, s.Values_.data(), row);
} else {
- s.Aggs_[i]->InitKey(ptr, s.Values_.data(), row);
+ s.Aggs_[i]->InitKey(ptr, s.BatchNum_, s.Values_.data(), row);
}
}
@@ -1009,9 +1011,9 @@ private:
for (auto i : Streams_[currentStreamIndex]) {
if (output[Keys_.size() + i]) {
if (isNewStream) {
- s.Aggs_[i]->LoadState(ptr + s.AggStateOffsets_[i], s.UnwrappedValues_.data(), row);
+ s.Aggs_[i]->LoadState(ptr + s.AggStateOffsets_[i], s.BatchNum_, s.UnwrappedValues_.data(), row);
} else {
- s.Aggs_[i]->UpdateState(ptr + s.AggStateOffsets_[i], s.UnwrappedValues_.data(), row);
+ s.Aggs_[i]->UpdateState(ptr + s.AggStateOffsets_[i], s.BatchNum_, s.UnwrappedValues_.data(), row);
}
}
}
@@ -1019,9 +1021,9 @@ private:
for (size_t i = 0; i < s.Aggs_.size(); ++i) {
if (output[Keys_.size() + i]) {
if constexpr (Finalize) {
- s.Aggs_[i]->UpdateState(ptr, s.Values_.data(), row);
+ s.Aggs_[i]->UpdateState(ptr, s.BatchNum_, s.Values_.data(), row);
} else {
- s.Aggs_[i]->UpdateKey(ptr, s.Values_.data(), row);
+ s.Aggs_[i]->UpdateKey(ptr, s.BatchNum_, s.Values_.data(), row);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
index 20e43e80190..6a5d358daca 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
@@ -92,9 +92,9 @@ public:
Y_UNUSED(argColumn);
}
- void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TState();
- UpdateKey(state, columns, row);
+ UpdateKey(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -102,7 +102,8 @@ public:
Y_UNUSED(state);
}
- void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
Y_UNUSED(columns);
Y_UNUSED(row);
auto typedState = static_cast<TState*>(state);
@@ -125,9 +126,9 @@ public:
{
}
- void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TState();
- UpdateState(state, columns, row);
+ UpdateState(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -135,7 +136,8 @@ public:
Y_UNUSED(state);
}
- void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TState*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
@@ -234,9 +236,9 @@ public:
{
}
- void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TState();
- UpdateKey(state, columns, row);
+ UpdateKey(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -244,7 +246,8 @@ public:
Y_UNUSED(state);
}
- void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TState*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
index 6a3b284440f..09768e9c66e 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
@@ -44,9 +44,9 @@ public:
class IBlockAggregatorCombineKeys : public IBlockAggregatorBase {
public:
- virtual void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
+ virtual void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
- virtual void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
+ virtual void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
virtual std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) = 0;
@@ -57,9 +57,9 @@ public:
class IBlockAggregatorFinalizeKeys : public IBlockAggregatorBase {
public:
- virtual void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
+ virtual void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
- virtual void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
+ virtual void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
virtual std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) = 0;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
index 6c34a3fc6b6..923236b999b 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
@@ -298,9 +298,9 @@ public:
{
}
- void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TGenericState();
- UpdateKey(state, columns, row);
+ UpdateKey(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -309,7 +309,8 @@ public:
*typedState = TGenericState();
}
- void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TGenericState*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_);
@@ -342,9 +343,9 @@ public:
{
}
- void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TGenericState();
- UpdateState(state, columns, row);
+ UpdateState(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -353,7 +354,8 @@ public:
*typedState = TGenericState();
}
- void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TGenericState*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_);
@@ -534,9 +536,9 @@ public:
{
}
- void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TGenericState();
- UpdateKey(state, columns, row);
+ UpdateKey(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -545,7 +547,8 @@ public:
*typedState = TGenericState();
}
- void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TGenericState*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
PushValueToState<TStringType, IsMin>(typedState, datum, row);
@@ -572,9 +575,9 @@ public:
{
}
- void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TGenericState();
- UpdateState(state, columns, row);
+ UpdateState(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -583,7 +586,8 @@ public:
*typedState = TGenericState();
}
- void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TGenericState*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
PushValueToState<TStringType, IsMin>(typedState, datum, row);
@@ -758,9 +762,9 @@ public:
{
}
- void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TStateType();
- UpdateKey(state, columns, row);
+ UpdateKey(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -768,7 +772,8 @@ public:
Y_UNUSED(state);
}
- void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TStateType*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState, datum, row);
@@ -797,9 +802,9 @@ public:
{
}
- void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TStateType();
- UpdateState(state, columns, row);
+ UpdateState(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -807,7 +812,8 @@ public:
Y_UNUSED(state);
}
- void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TStateType*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState, datum, row);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
index 005b79219e5..58cedbfaeb1 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
@@ -138,9 +138,9 @@ public:
{
}
- void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TGenericState();
- UpdateKey(state, columns, row);
+ UpdateKey(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -149,7 +149,8 @@ public:
*typedState = TGenericState();
}
- void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TGenericState*>(state);
if (*typedState) {
return;
@@ -184,9 +185,9 @@ public:
{
}
- void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TGenericState();
- UpdateState(state, columns, row);
+ UpdateState(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -195,7 +196,8 @@ public:
*typedState = TGenericState();
}
- void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TGenericState*>(state);
if (*typedState) {
return;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
index 67941c14933..cf499b45dc4 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
@@ -284,9 +284,9 @@ public:
{
}
- void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TStateType();
- UpdateKey(state, columns, row);
+ UpdateKey(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -294,7 +294,8 @@ public:
Y_UNUSED(state);
}
- void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TStateType*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
PushValueToState<IsNullable, IsScalar, TIn, TSum>(typedState, datum, row);
@@ -322,9 +323,9 @@ public:
{
}
- void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TStateType();
- UpdateState(state, columns, row);
+ UpdateState(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -332,7 +333,8 @@ public:
Y_UNUSED(state);
}
- void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TStateType*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
PushValueToState<IsNullable, IsScalar, TIn, TSum>(typedState, datum, row);
@@ -464,9 +466,9 @@ public:
{
}
- void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TAvgState();
- UpdateKey(state, columns, row);
+ UpdateKey(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -474,7 +476,8 @@ public:
Y_UNUSED(state);
}
- void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TAvgState*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
@@ -517,9 +520,9 @@ public:
{
}
- void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TAvgState();
- UpdateState(state, columns, row);
+ UpdateState(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
@@ -527,7 +530,8 @@ public:
Y_UNUSED(state);
}
- void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(batchNum);
auto typedState = static_cast<TAvgState*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
index 90b1c8047f1..e1960766f88 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
@@ -56,9 +56,9 @@ public:
}
if constexpr (PgString == NUdf::EPgStringType::CString) {
- return PgBuilder->MakeCString(item.AsStringRef().Data()).Release();
+ return PgBuilder->MakeCString(item.AsStringRef().Data() + sizeof(void*)).Release();
} else if constexpr (PgString == NUdf::EPgStringType::Text) {
- return PgBuilder->MakeText(item.AsStringRef().Data()).Release();
+ return PgBuilder->MakeText(item.AsStringRef().Data()+ sizeof(void*)).Release();
} else {
return MakeString(item.AsStringRef());
}
@@ -72,9 +72,11 @@ public:
}
if constexpr (PgString == NUdf::EPgStringType::CString) {
- return TBlockItem(PgBuilder->AsCStringBuffer(value));
+ auto buf = PgBuilder->AsCStringBuffer(value);
+ return TBlockItem(NYql::NUdf::TStringRef(buf.Data() - sizeof(void*), buf.Size() + sizeof(void*)));
} else if constexpr (PgString == NUdf::EPgStringType::Text) {
- return TBlockItem(PgBuilder->AsTextBuffer(value));
+ auto buf = PgBuilder->AsTextBuffer(value);
+ return TBlockItem(NYql::NUdf::TStringRef(buf.Data() - sizeof(void*), buf.Size() + sizeof(void*)));
} else {
return TBlockItem(value.AsStringRef());
}
diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.cpp b/ydb/library/yql/parser/pg_wrapper/arrow.cpp
index 1b318585465..e8494512235 100644
--- a/ydb/library/yql/parser/pg_wrapper/arrow.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/arrow.cpp
@@ -48,7 +48,7 @@ const NPg::TAggregateDesc& ResolveAggregation(const TString& name, NKikimr::NMin
if (returnType) {
MKQL_ENSURE(argsColumns.size() == 1, "Expected one column");
TType* stateType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]))->GetItemType();
- TType* returnItemType = AS_TYPE(TBlockType, returnType);
+ TType* returnItemType = AS_TYPE(TBlockType, returnType)->GetItemType();
return NPg::LookupAggregation(name, AS_TYPE(TPgType, stateType)->GetTypeId(), AS_TYPE(TPgType, returnItemType)->GetTypeId());
} else {
TVector<ui32> argTypeIds;
diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.h b/ydb/library/yql/parser/pg_wrapper/arrow.h
index 1ef5710cf22..52bdce5c70f 100644
--- a/ydb/library/yql/parser/pg_wrapper/arrow.h
+++ b/ydb/library/yql/parser/pg_wrapper/arrow.h
@@ -54,33 +54,51 @@ struct TPgIndirectFunc {
};
template <bool IsFixed>
-void SaveNullableDatum(const NullableDatum& from, NullableDatum& to, bool isCString) {
- bool wasNull = to.isnull;
- to.isnull = from.isnull;
- if (!to.isnull) {
- if constexpr (IsFixed) {
- to.value = from.value;
- } else {
- if (!wasNull) {
- if (to.value == from.value) {
- return;
- }
-
- pfree((void*)to.value);
- }
+Datum CloneDatumToAggContext(Datum src, bool isCString) {
+ if constexpr (IsFixed) {
+ return src;
+ } else {
+ Y_ENSURE(NKikimr::NMiniKQL::TlsAllocState->CurrentContext);
+ auto length = isCString ? 1 + strlen((const char*)src) : GetFullVarSize((const text*)src);
+ auto ret = (Datum)palloc(length);
+ memcpy((void*)ret, (const void*)src, length);
+ return ret;
+ }
+}
- auto length = isCString ? strlen((const char*)from.value) : GetFullVarSize((const text*)from.value);
- to.value = (Datum)palloc(length);
- memcpy((void*)to.value, (const void*)from.value, length);
- }
+template <bool IsFixed>
+void CopyState(NullableDatum src, NullableDatum& dst) {
+ if constexpr (IsFixed) {
+ dst = src;
} else {
- if (!wasNull) {
- if constexpr (!IsFixed) {
- pfree((void*)to.value);
- to.value = 0;
- }
+ if (src.isnull == dst.isnull && src.value == dst.value) {
+ return;
}
+
+ if (!dst.isnull) {
+ pfree((void*)dst.value);
+ }
+
+ dst = src;
+ }
+}
+
+template <bool IsFixed>
+void SaveToAggContext(NullableDatum& d, bool isCString) {
+ if constexpr (IsFixed) {
+ return;
}
+
+ if (d.isnull) {
+ return;
+ }
+
+ // arrow Scalars/Arrays have null memory context
+ if (NUdf::GetMemoryContext((void*)d.value)) {
+ return;
+ }
+
+ d.value = CloneDatumToAggContext<false>(d.value, isCString);
}
template <typename TArgsPolicy>
@@ -110,7 +128,7 @@ struct TInputArgsAccessor {
Scalars[j].value = (Datum)*static_cast<const ui64*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data());
} else {
auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value;
- Scalars[j].value = (Datum)buffer->data();
+ Scalars[j].value = (Datum)(buffer->data() + sizeof(void*));
}
}
} else {
@@ -153,7 +171,7 @@ void FillScalarItem(const arrow::Scalar& scalar, NullableDatum& d) {
d.isnull = true;
} else {
d.isnull = false;
- d.value = (Datum)item.AsStringRef().Data();
+ d.value = (Datum)(item.AsStringRef().Data() + sizeof(void*));
}
}
}
@@ -176,7 +194,7 @@ void FillArrayItem(const arrow::ArrayData& array, size_t i, NullableDatum& d) {
d.isnull = true;
} else {
d.isnull = false;
- d.value = (Datum)item.AsStringRef().Data();
+ d.value = (Datum)(item.AsStringRef().Data() + sizeof(void*));
}
}
}
@@ -272,7 +290,7 @@ struct TGenericExec {
NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
*res = Dispatch2<true, true>(batch, length, state, builder);
} else {
- NUdf::TStringArrayBuilder<arrow::BinaryType, true, NUdf::EPgStringType::None> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
+ NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
*res = Dispatch2<true, true>(batch, length, state, builder);
}
} else {
@@ -280,7 +298,7 @@ struct TGenericExec {
NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
*res = Dispatch2<true, false>(batch, length, state, builder);
} else {
- NUdf::TStringArrayBuilder<arrow::BinaryType, true, NUdf::EPgStringType::None> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
+ NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
*res = Dispatch2<true, false>(batch, length, state, builder);
}
}
@@ -362,7 +380,7 @@ struct TGenericExec {
if (v) {
d.value = (Datum)inputArgsAccessor.FixedArrays[j][i];
} else {
- d.value = (Datum)(inputArgsAccessor.StringOffsetsArrays[j][i] + inputArgsAccessor.StringDataArrays[j]);
+ d.value = (Datum)(sizeof(void*) + inputArgsAccessor.StringOffsetsArrays[j][i] + inputArgsAccessor.StringDataArrays[j]);
}
}
@@ -419,9 +437,10 @@ struct TGenericExec {
if (fcinfo->isnull) {
builder.Add(NUdf::TBlockItem{});
} else {
- auto ptr = (const char*)ret;
+ auto ptr = (char*)ret;
auto len = state.IsCStringResult ? 1 + strlen(ptr) : VARHDRSZ + VARSIZE((const text*)ptr);
- builder.Add(NUdf::TBlockItem(NUdf::TStringRef(ptr, len)));
+ NUdf::ZeroMemoryContext(ptr);
+ builder.Add(NUdf::TBlockItem(NUdf::TStringRef(ptr - sizeof(void*), len + sizeof(void*))));
}
}
SkipCall:;
@@ -440,6 +459,110 @@ TExecFunc MakeIndirectExec(PGFunction pgFunc) {
return TGenericExec<TPgIndirectFunc, IsStrict, IsFixedResult>(TPgIndirectFunc(pgFunc));
}
+template <bool IsFixed, typename TArgsPolicy>
+NullableDatum GetInputValue(const TInputArgsAccessor<TArgsPolicy>& accessor, ui32 j, ui64 row) {
+ static_assert(!TArgsPolicy::VarArgs);
+ if (accessor.IsScalar[j]) {
+ return accessor.Scalars[j];
+ } else {
+ NullableDatum d;
+ ui64 fullIndex = (row + accessor.Offsets[j]) & accessor.ValidOffsetMask[j];
+ d.isnull = ((accessor.ValidMasks[j][fullIndex >> 3] >> (fullIndex & 0x07)) & 1) == 0;
+
+ if constexpr (IsFixed) {
+ d.value = (Datum)accessor.FixedArrays[j][row];
+ } else {
+ d.value = (Datum)(sizeof(void*) + accessor.StringOffsetsArrays[j][row] + accessor.StringDataArrays[j]);
+ }
+
+ return d;
+ }
+}
+
+template <bool IsFixed>
+NullableDatum GetInputValueSlow(const std::vector<arrow::Datum>& values, ui32 j, ui64 row) {
+ NullableDatum d;
+ if (values[j].is_scalar()) {
+ if constexpr (IsFixed) {
+ FillScalarItem<true, true>(*values[j].scalar(), d);
+ } else {
+ FillScalarItem<true, false>(*values[j].scalar(), d);
+ }
+ } else {
+ if constexpr (IsFixed) {
+ FillArrayItem<true, true>(*values[j].array(), row, d);
+ } else {
+ FillArrayItem<true, false>(*values[j].array(), row, d);
+ }
+ }
+
+ return d;
+}
+
+template <bool IsFixed, bool IsCString, bool HasFunc, typename TFunc, bool IsStrict, typename TBuilder>
+class TAggColumnBuilder : public NKikimr::NMiniKQL::IAggColumnBuilder {
+public:
+ TAggColumnBuilder(const TString& name, TFunc func, ui64 size, FmgrInfo* funcInfo, const std::shared_ptr<arrow::DataType>& dataType,
+ NKikimr::NMiniKQL::TComputationContext& ctx)
+ : Name_(name)
+ , Func_(func)
+ , FuncInfo_(funcInfo)
+ , Builder_(NKikimr::NMiniKQL::TTypeInfoHelper(), dataType, ctx.ArrowMemoryPool, size)
+ , Ctx_(ctx)
+ {
+ }
+
+ void Add(const void* state) final {
+ auto typedState = (NullableDatum*)state;
+ auto ret = *typedState;
+ if constexpr (HasFunc) {
+ if (!IsStrict || !typedState->isnull) {
+ WithPgTry(Name_, [&]() {
+ LOCAL_FCINFO(callInfo, 1);
+ callInfo->flinfo = FuncInfo_;
+ callInfo->nargs = 1;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
+ callInfo->isnull = false;
+ callInfo->args[0].isnull = typedState->isnull;
+ callInfo->args[0].value = typedState->value;
+ ret.value = Func_(callInfo);
+ ret.isnull = callInfo->isnull;
+ });
+ }
+ }
+
+ if (ret.isnull) {
+ Builder_.Add(NYql::NUdf::TBlockItem());
+ } else {
+ if constexpr (IsFixed) {
+ Builder_.Add(NYql::NUdf::TBlockItem(ui64(ret.value)));
+ } else if constexpr (IsCString) {
+ auto ptr = (char*)ret.value;
+ ui32 len = 1 + strlen(ptr);
+ NUdf::ZeroMemoryContext(ptr);
+ Builder_.Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef(ptr - sizeof(void*), len + sizeof(void*))));
+ } else {
+ auto ptr = (char*)ret.value;
+ ui32 len = GetFullVarSize((const text*)ptr);
+ NUdf::ZeroMemoryContext(ptr);
+ Builder_.Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef(ptr - sizeof(void*), len + sizeof(void*))));
+ }
+ }
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true));
+ }
+
+private:
+ const TString Name_;
+ const TFunc Func_;
+ FmgrInfo* FuncInfo_;
+ TBuilder Builder_;
+ NKikimr::NMiniKQL::TComputationContext& Ctx_;
+};
+
template <typename TTransFunc, bool IsTransStrict, typename TTransArgsPolicy,
typename TCombineFunc, bool IsCombineStrict, typename TCombineArgsPolicy,
bool HasSerialize, typename TSerializeFunc, typename TSerializeArgsPolicy,
@@ -458,61 +581,91 @@ public:
{}
private:
- template <bool HasFilter>
- class TCombineAllAggregator : public NKikimr::NMiniKQL::TCombineAllTag::TBase {
- public:
- using TBase = NKikimr::NMiniKQL::TCombineAllTag::TBase;
- TCombineAllAggregator(TTransFunc transFunc, TSerializeFunc serializeFunc, const std::vector<ui32>& argsColumns,
+ template <typename TAggregatorBase>
+ class TCombineAggregatorBase: public TAggregatorBase {
+ protected:
+ TCombineAggregatorBase(TTransFunc transFunc, TSerializeFunc serializeFunc, const std::vector<ui32>& argsColumns,
std::optional<ui32> filterColumn, const NPg::TAggregateDesc& aggDesc, NKikimr::NMiniKQL::TComputationContext& ctx)
- : TBase(sizeof(NullableDatum), std::optional<ui32>(), ctx)
- , TransFunc(transFunc)
- , SerializeFunc(serializeFunc)
- , ArgsColumns(argsColumns)
- , FilterColumn(filterColumn)
- , AggDesc(aggDesc)
+ : TAggregatorBase(sizeof(NullableDatum), filterColumn, ctx)
+ , TransFunc_(transFunc)
+ , SerializeFunc_(serializeFunc)
+ , ArgsColumns_(argsColumns)
+ , AggDesc_(aggDesc)
{
if (!HasInitValue && IsTransStrict) {
- Y_ENSURE(AggDesc.ArgTypes.size() == 1);
+ Y_ENSURE(AggDesc_.ArgTypes.size() == 1);
}
- Y_ENSURE(HasFilter == FilterColumn.has_value());
-
- const auto& transDesc = NPg::LookupProc(AggDesc.TransFuncId);
+ const auto& transDesc = NPg::LookupProc(AggDesc_.TransFuncId);
for (ui32 i = 1; i < transDesc.ArgTypes.size(); ++i) {
- IsFixedArg.push_back(NPg::LookupType(transDesc.ArgTypes[i]).PassByValue);
+ IsFixedArg_.push_back(NPg::LookupType(transDesc.ArgTypes[i]).PassByValue);
}
- Zero(TransFuncInfo);
- fmgr_info(AggDesc.TransFuncId, &TransFuncInfo);
- Y_ENSURE(TransFuncInfo.fn_addr);
- auto nargs = NPg::LookupProc(AggDesc.TransFuncId).ArgTypes.size();
+ Zero(TransFuncInfo_);
+ fmgr_info(AggDesc_.TransFuncId, &TransFuncInfo_);
+ Y_ENSURE(TransFuncInfo_.fn_addr);
+ auto nargs = NPg::LookupProc(AggDesc_.TransFuncId).ArgTypes.size();
if constexpr (HasSerialize) {
- Zero(SerializeFuncInfo);
- fmgr_info(AggDesc.SerializeFuncId, &SerializeFuncInfo);
- Y_ENSURE(SerializeFuncInfo.fn_addr);
+ Zero(SerializeFuncInfo_);
+ fmgr_info(AggDesc_.SerializeFuncId, &SerializeFuncInfo_);
+ Y_ENSURE(SerializeFuncInfo_.fn_addr);
}
+
if constexpr (HasInitValue) {
- Zero(InFuncInfo);
- const auto& transTypeDesc = NPg::LookupType(AggDesc.TransTypeId);
+ Zero(InFuncInfo_);
+ const auto& transTypeDesc = NPg::LookupType(AggDesc_.TransTypeId);
auto inFuncId = transTypeDesc.InFuncId;
if (transTypeDesc.TypeId == transTypeDesc.ArrayTypeId) {
inFuncId = NPg::LookupProc("array_in", { 0,0,0 }).ProcId;
}
- TypeIOParam = MakeTypeIOParam(transTypeDesc);
- fmgr_info(inFuncId, &InFuncInfo);
- Y_ENSURE(InFuncInfo.fn_addr);
+ TypeIOParam_ = MakeTypeIOParam(transTypeDesc);
+ fmgr_info(inFuncId, &InFuncInfo_);
+ Y_ENSURE(InFuncInfo_.fn_addr);
+
+ WithPgTry(this->AggDesc_.Name, [&]() {
+ LOCAL_FCINFO(inCallInfo, 3);
+ inCallInfo->flinfo = &this->InFuncInfo_;
+ inCallInfo->nargs = 3;
+ inCallInfo->fncollation = DEFAULT_COLLATION_OID;
+ inCallInfo->isnull = false;
+ inCallInfo->args[0] = { (Datum)this->AggDesc_.InitValue.c_str(), false };
+ inCallInfo->args[1] = { ObjectIdGetDatum(this->TypeIOParam_), false };
+ inCallInfo->args[2] = { Int32GetDatum(-1), false };
+
+ auto state = this->InFuncInfo_.fn_addr(inCallInfo);
+ Y_ENSURE(!inCallInfo->isnull);
+ PreparedInitValue_ = AnyDatumToPod(state, IsTransTypeFixed);
+ });
}
}
+ const TTransFunc TransFunc_;
+ const TSerializeFunc SerializeFunc_;
+ const std::vector<ui32> ArgsColumns_;
+ const NPg::TAggregateDesc& AggDesc_;
+ std::vector<bool> IsFixedArg_;
+ FmgrInfo TransFuncInfo_;
+ FmgrInfo SerializeFuncInfo_;
+ FmgrInfo InFuncInfo_;
+ ui32 TypeIOParam_ = 0;
+ NKikimr::NUdf::TUnboxedValue PreparedInitValue_;
+ };
+
+ template <bool HasFilter>
+ class TCombineAllAggregator : public TCombineAggregatorBase<NKikimr::NMiniKQL::TCombineAllTag::TBase> {
+ public:
+ using TBase = TCombineAggregatorBase<NKikimr::NMiniKQL::TCombineAllTag::TBase>;
+ TCombineAllAggregator(TTransFunc transFunc, TSerializeFunc serializeFunc, const std::vector<ui32>& argsColumns,
+ std::optional<ui32> filterColumn, const NPg::TAggregateDesc& aggDesc, NKikimr::NMiniKQL::TComputationContext& ctx)
+ : TBase(transFunc, serializeFunc, argsColumns, filterColumn, aggDesc, ctx)
+ {
+ Y_ENSURE(HasFilter == filterColumn.has_value());
+ }
+
private:
void DestroyState(void* state) noexcept final {
- auto typedState = (NullableDatum*)state;
- if constexpr (!IsTransTypeFixed) {
- if (!typedState->isnull) {
- pfree((void*)typedState->value);
- }
- }
+ Y_UNUSED(state);
}
void InitState(void* state) final {
@@ -521,29 +674,17 @@ private:
typedState->isnull = true;
typedState->value = 0;
if constexpr (HasInitValue) {
- WithPgTry(AggDesc.Name, [&]() {
- LOCAL_FCINFO(inCallInfo, 3);
- inCallInfo->flinfo = &InFuncInfo;
- inCallInfo->nargs = 3;
- inCallInfo->fncollation = DEFAULT_COLLATION_OID;
- inCallInfo->isnull = false;
- inCallInfo->args[0] = { (Datum)AggDesc.InitValue.c_str(), false };
- inCallInfo->args[1] = { ObjectIdGetDatum(TypeIOParam), false };
- inCallInfo->args[2] = { Int32GetDatum(-1), false };
-
- auto state = InFuncInfo.fn_addr(inCallInfo);
- Y_ENSURE(!inCallInfo->isnull);
- typedState->value = state;
- typedState->isnull = false;
- });
+ auto datum = IsTransTypeFixed ? ScalarDatumFromPod(this->PreparedInitValue_) : PointerDatumFromPod(this->PreparedInitValue_);
+ typedState->isnull = false;
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->AggDesc_.TransTypeId == CSTRINGOID);
}
}
void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
auto typedState = (NullableDatum*)state;
std::vector<arrow::Datum> values;
- values.reserve(ArgsColumns.size());
- for (auto col : ArgsColumns) {
+ values.reserve(this->ArgsColumns_.size());
+ for (auto col : this->ArgsColumns_) {
values.push_back(NKikimr::NMiniKQL::TArrowBlock::From(columns[col]).GetDatum());
}
@@ -564,13 +705,13 @@ private:
const ui8* filterBitmap = nullptr;
if constexpr(HasFilter) {
- const auto& filterDatum = NKikimr::NMiniKQL::TArrowBlock::From(columns[*FilterColumn]).GetDatum();
+ const auto& filterDatum = NKikimr::NMiniKQL::TArrowBlock::From(columns[*this->FilterColumn_]).GetDatum();
const auto& filterArray = filterDatum.array();
Y_ENSURE(filterArray->GetNullCount() == 0);
filterBitmap = filterArray->template GetValues<uint8_t>(1);
}
- WithPgTry(AggDesc.Name, [&]() {
+ WithPgTry(this->AggDesc_.Name, [&]() {
if (hasNulls) {
if (hasScalars) {
AddManyImpl<true, true>(typedState, values, batchLength, filterBitmap);
@@ -590,8 +731,13 @@ private:
template <bool HasNulls, bool HasScalars>
void AddManyImpl(NullableDatum* typedState, const std::vector<arrow::Datum>& values, ui64 batchLength, const ui8* filterBitmap) {
LOCAL_FCINFO(transCallInfo, FUNC_MAX_ARGS);
- transCallInfo->flinfo = &TransFuncInfo;
- transCallInfo->nargs = 1;
+ transCallInfo->flinfo = &this->TransFuncInfo_;
+ if constexpr (!TTransArgsPolicy::VarArgs) {
+ transCallInfo->nargs = TTransArgsPolicy::IsFixedArg.size();
+ } else {
+ transCallInfo->nargs = 1 + values.size();
+ }
+
transCallInfo->fncollation = DEFAULT_COLLATION_OID;
transCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
@@ -627,7 +773,7 @@ private:
if (v) {
d.value = (Datum)inputArgsAccessor.FixedArrays[j][i];
} else {
- d.value = (Datum)(inputArgsAccessor.StringOffsetsArrays[j][i] + inputArgsAccessor.StringDataArrays[j]);
+ d.value = (Datum)(sizeof(void*) + inputArgsAccessor.StringOffsetsArrays[j][i] + inputArgsAccessor.StringDataArrays[j]);
}
}
@@ -644,13 +790,13 @@ private:
for (size_t j = 0; j < values.size(); ++j) {
NullableDatum d;
if (HasScalars && values[j].is_scalar()) {
- if (IsFixedArg[j]) {
+ if (this->IsFixedArg_[j]) {
FillScalarItem<HasNulls, true>(*values[j].scalar(), d);
} else {
FillScalarItem<HasNulls, false>(*values[j].scalar(), d);
}
} else {
- if (IsFixedArg[j]) {
+ if (this->IsFixedArg_[j]) {
FillArrayItem<HasNulls, true>(*values[j].array(), i, d);
} else {
FillArrayItem<HasNulls, false>(*values[j].array(), i, d);
@@ -673,13 +819,14 @@ private:
}
transCallInfo->isnull = false;
- ret = TransFunc(transCallInfo);
+ ret = this->TransFunc_(transCallInfo);
transCallInfo->args[0].value = ret;
transCallInfo->args[0].isnull = transCallInfo->isnull;
SkipCall:;
}
- SaveNullableDatum<IsTransTypeFixed>(transCallInfo->args[0], *typedState, AggDesc.TransTypeId == CSTRINGOID);
+ CopyState<IsTransTypeFixed>(transCallInfo->args[0], *typedState);
+ SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID);
}
NUdf::TUnboxedValue FinishOne(const void* state) final {
@@ -690,16 +837,16 @@ SkipCall:;
if constexpr (HasSerialize) {
NUdf::TUnboxedValue ret;
- WithPgTry(AggDesc.Name, [&]() {
+ WithPgTry(this->AggDesc_.Name, [&]() {
LOCAL_FCINFO(serializeCallInfo, 1);
- serializeCallInfo->flinfo = &SerializeFuncInfo;
+ serializeCallInfo->flinfo = &this->SerializeFuncInfo_;
serializeCallInfo->nargs = 1;
serializeCallInfo->fncollation = DEFAULT_COLLATION_OID;
serializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
serializeCallInfo->isnull = false;
serializeCallInfo->args[0].isnull = false;
serializeCallInfo->args[0].value = typedState->value;
- auto ser = SerializeFunc(serializeCallInfo);
+ auto ser = this->SerializeFunc_(serializeCallInfo);
Y_ENSURE(!serializeCallInfo->isnull);
if constexpr (IsSerializedTypeFixed) {
ret = ScalarDatumToPod(ser);
@@ -724,18 +871,319 @@ SkipCall:;
}
}
}
+ };
- const TTransFunc TransFunc;
- const TSerializeFunc SerializeFunc;
- const std::vector<ui32> ArgsColumns;
- const std::optional<ui32> FilterColumn;
- const NPg::TAggregateDesc& AggDesc;
- std::vector<bool> IsFixedArg;
- bool IsTransTypeCString;
- FmgrInfo TransFuncInfo;
- FmgrInfo SerializeFuncInfo;
- FmgrInfo InFuncInfo;
- ui32 TypeIOParam = 0;
+ class TCombineKeysAggregator : public TCombineAggregatorBase<NKikimr::NMiniKQL::TCombineKeysTag::TBase> {
+ public:
+ using TBase = TCombineAggregatorBase<NKikimr::NMiniKQL::TCombineKeysTag::TBase>;
+ TCombineKeysAggregator(TTransFunc transFunc, TSerializeFunc serializeFunc, const std::vector<ui32>& argsColumns,
+ const NPg::TAggregateDesc& aggDesc, NKikimr::NMiniKQL::TComputationContext& ctx)
+ : TBase(transFunc, serializeFunc, argsColumns, std::optional<ui32>(), aggDesc, ctx)
+ , SerializedType_(HasSerialize ? NPg::LookupProc(this->AggDesc_.SerializeFuncId).ResultType : this->AggDesc_.TransTypeId)
+ {
+ Values_.reserve(this->IsFixedArg_.size());
+ }
+
+ void DestroyState(void* state) noexcept final {
+ Y_UNUSED(state);
+ }
+
+ void PrepareBatch(ui64 batchNum, const NKikimr::NUdf::TUnboxedValue* columns) {
+ Values_.clear();
+ for (auto col : this->ArgsColumns_) {
+ Values_.push_back(NKikimr::NMiniKQL::TArrowBlock::From(columns[col]).GetDatum());
+ }
+
+ InputArgsAccessor_.Bind(Values_, 1);
+ BatchNum_ = batchNum;
+ }
+
+ void InitKey(void* state, ui64 batchNum, const NKikimr::NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) NullableDatum();
+ auto typedState = (NullableDatum*)state;
+ typedState->isnull = true;
+ typedState->value = 0;
+ if constexpr (HasInitValue) {
+ auto datum = IsTransTypeFixed ? ScalarDatumFromPod(this->PreparedInitValue_) : PointerDatumFromPod(this->PreparedInitValue_);
+ typedState->isnull = false;
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->AggDesc_.TransTypeId == CSTRINGOID);
+ }
+
+ UpdateKey(state, batchNum, columns, row);
+ }
+
+ void UpdateKey(void* state, ui64 batchNum, const NKikimr::NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = (NullableDatum*)state;
+ if (batchNum != BatchNum_) {
+ PrepareBatch(batchNum, columns);
+ }
+
+ LOCAL_FCINFO(transCallInfo, FUNC_MAX_ARGS);
+ transCallInfo->flinfo = &this->TransFuncInfo_;
+ if constexpr (!TTransArgsPolicy::VarArgs) {
+ transCallInfo->nargs = TTransArgsPolicy::IsFixedArg.size();
+ } else {
+ transCallInfo->nargs = 1 + Values_.size();
+ }
+
+ transCallInfo->fncollation = DEFAULT_COLLATION_OID;
+ transCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
+
+ transCallInfo->args[0] = *typedState;
+
+ Datum ret;
+ if constexpr (!TTransArgsPolicy::VarArgs) {
+ if (!constexpr_for_tuple([&](auto const& j, auto const& v) {
+ if (j == 0) {
+ return true;
+ }
+
+ NullableDatum d;
+ if (v) {
+ d = GetInputValue<true>(InputArgsAccessor_, j, row);
+ } else {
+ d = GetInputValue<false>(InputArgsAccessor_, j, row);
+ }
+
+ if (IsTransStrict && d.isnull) {
+ return false;
+ }
+
+ transCallInfo->args[j] = d;
+ return true;
+ }, TTransArgsPolicy::IsFixedArg)) {
+ return;
+ }
+ } else {
+ for (size_t j = 0; j < Values_.size(); ++j) {
+ NullableDatum d;
+ if (this->IsFixedArg_[j]) {
+ d = GetInputValueSlow<true>(Values_, j, row);
+ } else {
+ d = GetInputValueSlow<false>(Values_, j, row);
+ }
+
+ if (IsTransStrict && d.isnull) {
+ return;
+ }
+
+ transCallInfo->args[1 + j] = d;
+ }
+ }
+
+ if (!HasInitValue && IsTransStrict) {
+ if (transCallInfo->args[0].isnull) {
+ typedState->isnull = false;
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(transCallInfo->args[1].value, this->AggDesc_.TransTypeId == CSTRINGOID);
+ return;
+ }
+ }
+
+ transCallInfo->isnull = false;
+ WithPgTry(this->AggDesc_.Name, [&]() {
+ ret = this->TransFunc_(transCallInfo);
+ });
+
+ CopyState<IsTransTypeFixed>({ret, transCallInfo->isnull}, *typedState);
+ SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID);
+ }
+
+ std::unique_ptr<NKikimr::NMiniKQL::IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
+ if constexpr (IsSerializedTypeFixed) {
+ return std::make_unique<TAggColumnBuilder<true, false, HasSerialize, TSerializeFunc, true, NYql::NUdf::TFixedSizeArrayBuilder<ui64, true>>>(
+ this->AggDesc_.Name, this->SerializeFunc_, size, &this->SerializeFuncInfo_, arrow::uint64(), this->Ctx_);
+ } else {
+ if (SerializedType_ == CSTRINGOID) {
+ return std::make_unique<TAggColumnBuilder<false, true, HasSerialize, TSerializeFunc, true, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true, NYql::NUdf::EPgStringType::CString>>>(
+ this->AggDesc_.Name, this->SerializeFunc_, size, &this->SerializeFuncInfo_, arrow::binary(), this->Ctx_);
+ } else {
+ return std::make_unique<TAggColumnBuilder<false, false, HasSerialize, TSerializeFunc, true, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true, NYql::NUdf::EPgStringType::Text>>>(
+ this->AggDesc_.Name, this->SerializeFunc_, size, &this->SerializeFuncInfo_, arrow::binary(), this->Ctx_);
+ }
+ }
+ }
+
+ const ui32 SerializedType_;
+ ui64 BatchNum_ = Max<ui64>();
+ std::vector<arrow::Datum> Values_;
+ TInputArgsAccessor<TTransArgsPolicy> InputArgsAccessor_;
+ };
+
+ class TFinalizeKeysAggregator : public NKikimr::NMiniKQL::TFinalizeKeysTag::TBase {
+ public:
+ using TBase = NKikimr::NMiniKQL::TFinalizeKeysTag::TBase;
+ TFinalizeKeysAggregator(TDeserializeFunc deserializeFunc, TCombineFunc combineFunc, TFinalFunc finalFunc,
+ ui32 stateColumn, const NPg::TAggregateDesc& aggDesc, NKikimr::NMiniKQL::TComputationContext& ctx)
+ : TBase(sizeof(NullableDatum), std::optional<ui32>(), ctx)
+ , DeserializeFunc_(deserializeFunc)
+ , CombineFunc_(combineFunc)
+ , FinalFunc_(finalFunc)
+ , StateColumn_(stateColumn)
+ , AggDesc_(aggDesc)
+ , SerializedType_(HasSerialize ? NPg::LookupProc(this->AggDesc_.SerializeFuncId).ResultType : this->AggDesc_.TransTypeId)
+ , FinalType_(HasFinal ? NPg::LookupProc(this->AggDesc_.FinalFuncId).ResultType : this->AggDesc_.TransTypeId)
+ {
+ Values_.reserve(1);
+ }
+
+ void DestroyState(void* state) noexcept final {
+ Y_UNUSED(state);
+ }
+
+ void PrepareBatch(ui64 batchNum, const NUdf::TUnboxedValue* columns) {
+ Values_.clear();
+ Values_.push_back(NKikimr::NMiniKQL::TArrowBlock::From(columns[StateColumn_]).GetDatum());
+ if constexpr (HasDeserialize) {
+ DeserializeAccessor_.Bind(Values_, 0);
+ } else {
+ CombineAccessor_.Bind(Values_, 1);
+ }
+
+ BatchNum_ = batchNum;
+ }
+
+ void Deserialize(Datum ser, NullableDatum& result) {
+ WithPgTry(this->AggDesc_.Name, [&]() {
+ LOCAL_FCINFO(deserializeCallInfo, 1);
+ deserializeCallInfo->flinfo = &this->DeserializeFuncInfo_;
+ deserializeCallInfo->nargs = 1;
+ deserializeCallInfo->fncollation = DEFAULT_COLLATION_OID;
+ deserializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
+ deserializeCallInfo->isnull = false;
+ deserializeCallInfo->args[0].isnull = false;
+ deserializeCallInfo->args[0].value = ser;
+ result.value = this->DeserializeFunc_(deserializeCallInfo);
+ result.isnull = deserializeCallInfo->isnull;
+ });
+ }
+
+ void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) NullableDatum();
+ auto typedState = (NullableDatum*)state;
+ typedState->isnull = true;
+ typedState->value = 0;
+ if (BatchNum_ != batchNum) {
+ PrepareBatch(batchNum, columns);
+ }
+
+ NullableDatum d;
+ if constexpr (HasDeserialize && !TDeserializeArgsPolicy::VarArgs) {
+ d = GetInputValue<IsSerializedTypeFixed>(DeserializeAccessor_, 0, row);
+ } else if constexpr (!HasDeserialize && !TCombineArgsPolicy::VarArgs) {
+ d = GetInputValue<IsSerializedTypeFixed>(CombineAccessor_, 1, row);
+ } else {
+ d = GetInputValueSlow<IsSerializedTypeFixed>(Values_, 0, row);
+ }
+
+ if (d.isnull) {
+ return;
+ }
+
+ if constexpr (!HasDeserialize) {
+ typedState->isnull = false;
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID);
+ } else {
+ Deserialize(d.value, *typedState);
+ }
+
+ SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID);
+ }
+
+ void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = (NullableDatum*)state;
+ if (BatchNum_ != batchNum) {
+ PrepareBatch(batchNum, columns);
+ }
+
+ NullableDatum d;
+ if constexpr (HasDeserialize && !TDeserializeArgsPolicy::VarArgs) {
+ d = GetInputValue<IsSerializedTypeFixed>(DeserializeAccessor_, 0, row);
+ } else if constexpr (!HasDeserialize && !TCombineArgsPolicy::VarArgs) {
+ d = GetInputValue<IsSerializedTypeFixed>(CombineAccessor_, 1, row);
+ } else {
+ d = GetInputValueSlow<IsSerializedTypeFixed>(Values_, 0, row);
+ }
+
+ if (IsCombineStrict && d.isnull) {
+ return;
+ }
+
+ NullableDatum deser;
+ if (d.isnull) {
+ deser.isnull = true;
+ deser.value = 0;
+ } else {
+ if constexpr (!HasDeserialize) {
+ if (IsCombineStrict && typedState->isnull) {
+ typedState->isnull = false;
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID);
+ return;
+ }
+
+ deser = d;
+ } else {
+ Deserialize(d.value, deser);
+ if (IsCombineStrict && typedState->isnull) {
+ *typedState = deser;
+ return;
+ }
+ }
+ }
+
+ WithPgTry(this->AggDesc_.Name, [&]() {
+ LOCAL_FCINFO(combineCallInfo, 2);
+ combineCallInfo->flinfo = &this->CombineFuncInfo_;
+ combineCallInfo->nargs = 2;
+ combineCallInfo->fncollation = DEFAULT_COLLATION_OID;
+ combineCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
+ combineCallInfo->isnull = false;
+ combineCallInfo->args[0] = *typedState;
+ combineCallInfo->args[1] = deser;
+ auto ret = this->CombineFunc_(combineCallInfo);
+ if constexpr (!HasDeserialize) {
+ if (!combineCallInfo->isnull && ret == d.value) {
+ typedState->isnull = false;
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID);
+ return;
+ }
+ }
+
+ CopyState<IsTransTypeFixed>({ret, combineCallInfo->isnull}, *typedState);
+ });
+
+
+ SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID);
+ }
+
+ std::unique_ptr<NKikimr::NMiniKQL::IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
+ if constexpr (IsFinalTypeFixed) {
+ return std::make_unique<TAggColumnBuilder<true, false, HasFinal, TFinalFunc, IsFinalStrict, NYql::NUdf::TFixedSizeArrayBuilder<ui64, true>>>(
+ this->AggDesc_.Name, this->FinalFunc_, size, &this->FinalFuncInfo_, arrow::uint64(), this->Ctx_);
+ } else {
+ if (FinalType_ == CSTRINGOID) {
+ return std::make_unique<TAggColumnBuilder<false, true, HasFinal, TFinalFunc, IsFinalStrict, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true>>>(
+ this->AggDesc_.Name, this->FinalFunc_, size, &this->FinalFuncInfo_, arrow::binary(), this->Ctx_);
+ } else {
+ return std::make_unique<TAggColumnBuilder<false, false, HasFinal, TFinalFunc, IsFinalStrict, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true>>>(
+ this->AggDesc_.Name, this->FinalFunc_, size, &this->FinalFuncInfo_, arrow::binary(), this->Ctx_);
+ }
+ }
+ }
+
+ const TDeserializeFunc DeserializeFunc_;
+ const TCombineFunc CombineFunc_;
+ const TFinalFunc FinalFunc_;
+ const ui32 StateColumn_;
+ const NPg::TAggregateDesc& AggDesc_;
+ const ui32 SerializedType_;
+ const ui32 FinalType_;
+ ui64 BatchNum_ = Max<ui64>();
+ std::vector<arrow::Datum> Values_;
+ TInputArgsAccessor<TDeserializeArgsPolicy> DeserializeAccessor_;
+ TInputArgsAccessor<TCombineArgsPolicy> CombineAccessor_;
+ FmgrInfo DeserializeFuncInfo_;
+ FmgrInfo CombineFuncInfo_;
+ FmgrInfo FinalFuncInfo_;
};
class TPreparedCombineAllAggregator : public NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorCombineAll>{
@@ -766,6 +1214,52 @@ SkipCall:;
const NPg::TAggregateDesc& AggDesc;
};
+ class TPreparedCombineKeysAggregator : public NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorCombineKeys>{
+ public:
+ TPreparedCombineKeysAggregator(TTransFunc transFunc, TSerializeFunc serializeFunc, const std::vector<ui32>& argsColumns,
+ const NPg::TAggregateDesc& aggDesc)
+ : IPreparedBlockAggregator(sizeof(NullableDatum))
+ , TransFunc(transFunc)
+ , SerializeFunc(serializeFunc)
+ , ArgsColumns(argsColumns)
+ , AggDesc(aggDesc)
+ {}
+
+ private:
+ std::unique_ptr<NKikimr::NMiniKQL::IBlockAggregatorCombineKeys> Make(NKikimr::NMiniKQL::TComputationContext& ctx) const {
+ return std::make_unique<TCombineKeysAggregator>(TransFunc, SerializeFunc, ArgsColumns, AggDesc, ctx);
+ }
+
+ const TTransFunc TransFunc;
+ const TSerializeFunc SerializeFunc;
+ const std::vector<ui32> ArgsColumns;
+ const NPg::TAggregateDesc& AggDesc;
+ };
+
+ class TPreparedFinalizeKeysAggregator : public NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorFinalizeKeys>{
+ public:
+ TPreparedFinalizeKeysAggregator(TDeserializeFunc deserializeFunc, TCombineFunc combineFunc, TFinalFunc finalFunc, ui32 stateColumn,
+ const NPg::TAggregateDesc& aggDesc)
+ : IPreparedBlockAggregator(sizeof(NullableDatum))
+ , DeserializeFunc(deserializeFunc)
+ , CombineFunc(combineFunc)
+ , FinalFunc(finalFunc)
+ , StateColumn(stateColumn)
+ , AggDesc(aggDesc)
+ {}
+
+ private:
+ std::unique_ptr<NKikimr::NMiniKQL::IBlockAggregatorFinalizeKeys> Make(NKikimr::NMiniKQL::TComputationContext& ctx) const {
+ return std::make_unique<TFinalizeKeysAggregator>(DeserializeFunc, CombineFunc, FinalFunc, StateColumn, AggDesc, ctx);
+ }
+
+ const TDeserializeFunc DeserializeFunc;
+ const TCombineFunc CombineFunc;
+ const TFinalFunc FinalFunc;
+ const ui32 StateColumn;
+ const NPg::TAggregateDesc& AggDesc;
+ };
+
public:
std::unique_ptr<NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorCombineAll>> PrepareCombineAll(
std::optional<ui32> filterColumn,
@@ -777,13 +1271,13 @@ public:
std::unique_ptr<NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorCombineKeys>> PrepareCombineKeys(
const std::vector<ui32>& argsColumns,
const NPg::TAggregateDesc& aggDesc) {
- ythrow yexception() << "Not implemented";
+ return std::make_unique<TPreparedCombineKeysAggregator>(TransFunc, SerializeFunc, argsColumns, aggDesc);
}
std::unique_ptr<NKikimr::NMiniKQL::IPreparedBlockAggregator<NKikimr::NMiniKQL::IBlockAggregatorFinalizeKeys>> PrepareFinalizeKeys(
ui32 stateColumn,
const NPg::TAggregateDesc& aggDesc) {
- ythrow yexception() << "Not implemented";
+ return std::make_unique<TPreparedFinalizeKeysAggregator>(DeserializeFunc, CombineFunc, FinalFunc, stateColumn, aggDesc);
}
private:
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index 5cb4fc78d65..931da4dc1fc 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -1510,7 +1510,7 @@ struct TFromPgExec {
case BYTEAOID:
case CSTRINGOID: {
NUdf::TStringBlockReader<arrow::BinaryType, true> reader;
- NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
+ NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), SourceId == BYTEAOID ? arrow::binary() : arrow::utf8(), *ctx->memory_pool(), length);
for (size_t i = 0; i < length; ++i) {
auto item = reader.GetItem(array, i);
if (!item) {
@@ -1519,12 +1519,12 @@ struct TFromPgExec {
}
ui32 len;
- const char* ptr = item.AsStringRef().Data();
+ const char* ptr = item.AsStringRef().Data() + sizeof(void*);
if (SourceId == CSTRINGOID) {
- len = strlen(item.AsStringRef().Data());
+ len = strlen(ptr);
} else {
- len = GetCleanVarSize((const text*)item.AsStringRef().Data());
- Y_ENSURE(len + VARHDRSZ == item.AsStringRef().Size());
+ len = GetCleanVarSize((const text*)ptr);
+ Y_ENSURE(len + VARHDRSZ + sizeof(void*) == item.AsStringRef().Size());
ptr += VARHDRSZ;
}
@@ -1639,7 +1639,7 @@ struct TToPgExec {
case BYTEAOID:
case CSTRINGOID: {
NUdf::TStringBlockReader<arrow::BinaryType, true> reader;
- NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
+ NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length);
std::vector<char> tmp;
for (size_t i = 0; i < length; ++i) {
auto item = reader.GetItem(array, i);
@@ -1650,7 +1650,7 @@ struct TToPgExec {
ui32 len;
if (TargetId == CSTRINGOID) {
- len = 1 + item.AsStringRef().Size();
+ len = sizeof(void*) + 1 + item.AsStringRef().Size();
if (Y_UNLIKELY(len < item.AsStringRef().Size())) {
ythrow yexception() << "Too long string";
}
@@ -1660,10 +1660,11 @@ struct TToPgExec {
}
tmp.resize(len);
- memcpy(tmp.data(), item.AsStringRef().Data(), len - 1);
+ NUdf::ZeroMemoryContext(tmp.data() + sizeof(void*));
+ memcpy(tmp.data() + sizeof(void*), item.AsStringRef().Data(), len - 1 - sizeof(void*));
tmp[len - 1] = 0;
} else {
- len = VARHDRSZ + item.AsStringRef().Size();
+ len = sizeof(void*) + VARHDRSZ + item.AsStringRef().Size();
if (Y_UNLIKELY(len < item.AsStringRef().Size())) {
ythrow yexception() << "Too long string";
}
@@ -1673,8 +1674,9 @@ struct TToPgExec {
}
tmp.resize(len);
- memcpy(tmp.data() + VARHDRSZ, item.AsStringRef().Data(), len - VARHDRSZ);
- UpdateCleanVarSize((text*)tmp.data(), item.AsStringRef().Size());
+ NUdf::ZeroMemoryContext(tmp.data() + sizeof(void*));
+ memcpy(tmp.data() + sizeof(void*) + VARHDRSZ, item.AsStringRef().Data(), len - VARHDRSZ);
+ UpdateCleanVarSize((text*)(tmp.data() + sizeof(void*)), item.AsStringRef().Size());
}
builder.Add(NUdf::TBlockItem(NUdf::TStringRef(tmp.data(), len)));
@@ -2561,8 +2563,9 @@ arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf:
size = strlen(ptr) + 1;
}
- std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(size, &pool)));
- std::memcpy(buffer->mutable_data(), ptr, size);
+ std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(size + sizeof(void*), &pool)));
+ NUdf::ZeroMemoryContext(buffer->mutable_data() + sizeof(void*));
+ std::memcpy(buffer->mutable_data() + sizeof(void*), ptr, size);
return arrow::Datum(std::make_shared<arrow::BinaryScalar>(buffer));
}
}
diff --git a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp
index 5c8c680fe2d..1d9388de199 100644
--- a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp
@@ -134,8 +134,8 @@ Y_UNIT_TEST_SUITE(TPgCodegen) {
builder.Add(NUdf::TBlockItem{});
} else {
auto s = item.AsStringRef();
- size_t len = s.Size() - VARHDRSZ;
- const char* ptr = s.Data() + VARHDRSZ;
+ size_t len = s.Size() - VARHDRSZ - sizeof(void*);
+ const char* ptr = s.Data() + VARHDRSZ + sizeof(void*);
builder.Add(NUdf::TBlockItem{NUdf::TStringRef(ptr, len)});
}
}
@@ -206,8 +206,9 @@ Y_UNIT_TEST_SUITE(TPgCodegen) {
arrow::BinaryBuilder builder;
ARROW_OK(builder.Reserve(N));
for (size_t i = 0; i < N; ++i) {
- std::string s(VARHDRSZ + 500, 'A' + i % 26);
- auto t = (text*)s.data();
+ std::string s(sizeof(void*) + VARHDRSZ + 500, 'A' + i % 26);
+ NUdf::ZeroMemoryContext(s.data() + sizeof(void*));
+ auto t = (text*)(s.data() + sizeof(void*));
SET_VARSIZE(t, VARHDRSZ + 500);
builder.Append(s);
}
diff --git a/ydb/library/yql/parser/pg_wrapper/utils.h b/ydb/library/yql/parser/pg_wrapper/utils.h
index 66de4e890ae..dd26d558ab3 100644
--- a/ydb/library/yql/parser/pg_wrapper/utils.h
+++ b/ydb/library/yql/parser/pg_wrapper/utils.h
@@ -78,7 +78,7 @@ inline Datum PointerDatumFromPod(const NKikimr::NUdf::TUnboxedValuePod& value) {
}
inline Datum PointerDatumFromItem(const NKikimr::NUdf::TBlockItem& value) {
- return (Datum)value.AsStringRef().Data();
+ return (Datum)(value.AsStringRef().Data() + sizeof(void*));
}
inline ui32 GetFullVarSize(const text* s) {
diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h
index 49865bcf5fd..acbeff59905 100644
--- a/ydb/library/yql/public/udf/arrow/block_builder.h
+++ b/ydb/library/yql/public/udf/arrow/block_builder.h
@@ -506,10 +506,18 @@ public:
if constexpr (PgString == EPgStringType::CString) {
static_assert(Nullable);
- DoAdd(TBlockItem(PgBuilder->AsCStringBuffer(value)));
+ auto buf = PgBuilder->AsCStringBuffer(value);
+ auto prevCtx = GetMemoryContext(buf.Data());
+ ZeroMemoryContext((char*)buf.Data());
+ DoAdd(TBlockItem(TStringRef(buf.Data() - sizeof(void*), buf.Size() + sizeof(void*))));
+ SetMemoryContext((char*)buf.Data(), prevCtx);
} else if constexpr (PgString == EPgStringType::Text) {
static_assert(Nullable);
- DoAdd(TBlockItem(PgBuilder->AsTextBuffer(value)));
+ auto buf = PgBuilder->AsTextBuffer(value);
+ auto prevCtx = GetMemoryContext(buf.Data());
+ ZeroMemoryContext((char*)buf.Data());
+ DoAdd(TBlockItem(TStringRef(buf.Data() - sizeof(void*), buf.Size() + sizeof(void*))));
+ SetMemoryContext((char*)buf.Data(), prevCtx);
} else {
DoAdd(TBlockItem(value.AsStringRef()));
}
diff --git a/ydb/library/yql/public/udf/arrow/util.h b/ydb/library/yql/public/udf/arrow/util.h
index ba7fd7de2ee..f4ce026149e 100644
--- a/ydb/library/yql/public/udf/arrow/util.h
+++ b/ydb/library/yql/public/udf/arrow/util.h
@@ -125,5 +125,17 @@ private:
size_t Len = 0;
};
+inline void* GetMemoryContext(const void* ptr) {
+ return *(void**)((char*)ptr - sizeof(void*));
+}
+
+inline void SetMemoryContext(void* ptr, void* ctx) {
+ *(void**)((char*)ptr - sizeof(void*)) = ctx;
+}
+
+inline void ZeroMemoryContext(void* ptr) {
+ SetMemoryContext(ptr, nullptr);
+}
+
}
}