aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoratarasov5 <atarasov5@yandex-team.com>2025-07-10 15:30:05 +0300
committeratarasov5 <atarasov5@yandex-team.com>2025-07-10 15:51:42 +0300
commit5ce6e1b18f1023a06b261707a585835825e612f9 (patch)
treee60cbc5135a27d0b684931015e2fc1e8d588f908
parentfb7f6895a3d8244444609433b725d603dca23631 (diff)
downloadydb-5ce6e1b18f1023a06b261707a585835825e612f9.tar.gz
YQL-20102: Enable debug arrow validation
В данном PR включена поддержка валидации Datum'ов на соответствие `MKQL type <-> arrow type <-> arrow array data structure`. Выявленные проблемы: 1. `AllocateResizableBuffer(size_t size)` возвращает массив длины 0 вместо size, из-за чего в некоторых нодах возникает ошибка при работе с датами. commit_hash:122f2bd114dec50993131391a3793d9540877cb4
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_coalesce.cpp2
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_container.cpp2
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_decimal.cpp18
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_exists.cpp2
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_func.cpp4
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_getelem.cpp2
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_if.cpp2
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_just.cpp2
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_logical.cpp4
-rw-r--r--yql/essentials/minikql/computation/mkql_block_impl.cpp18
-rw-r--r--yql/essentials/minikql/computation/mkql_block_impl.h13
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_holders.h2
-rw-r--r--yql/essentials/minikql/computation/mkql_datum_validate.cpp132
-rw-r--r--yql/essentials/minikql/computation/mkql_datum_validate.h19
-rw-r--r--yql/essentials/minikql/computation/ya.make1
-rw-r--r--yql/essentials/minikql/invoke_builtins/mkql_builtins_impl.cpp24
-rw-r--r--yql/essentials/parser/pg_wrapper/comp_factory.cpp6
-rw-r--r--yql/essentials/public/udf/arrow/util.h5
-rw-r--r--yql/essentials/public/udf/udf_validate.cpp11
-rw-r--r--yql/essentials/public/udf/udf_validate.h9
20 files changed, 231 insertions, 47 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_coalesce.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_coalesce.cpp
index 50f5ebf151d..3977655154e 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_coalesce.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_coalesce.cpp
@@ -271,7 +271,7 @@ IComputationNode* WrapBlockCoalesce(TCallable& callable, const TComputationNodeF
TVector<TType*> argsTypes = {firstType, secondType};
auto kernel = MakeBlockCoalesceKernel(argsTypes, secondType, needUnwrapFirst);
- return new TBlockFuncNode(ctx.Mutables, "Coalesce", std::move(argsNodes), argsTypes, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), "Coalesce", std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel);
}
} // namespace NKikimr::NMiniKQL
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_container.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_container.cpp
index 3dd4135e72c..ff7c6c68d14 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_container.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_container.cpp
@@ -90,7 +90,7 @@ IComputationNode* WrapBlockAsContainer(TCallable& callable, const TComputationNo
}
auto kernel = MakeBlockAsContainerKernel(argsTypes, callable.GetType()->GetReturnType());
- return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel);
}
} // namespace NMiniKQL
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_decimal.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_decimal.cpp
index 0d60f119714..225a2515529 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_decimal.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_decimal.cpp
@@ -26,12 +26,12 @@ struct TDecimalBlockExec {
const U* GetScalarValue(const arrow::Scalar& scalar) const {
return reinterpret_cast<const U*>(GetPrimitiveScalarValuePtr(scalar));
}
-
+
template<>
const NYql::NDecimal::TInt128* GetScalarValue<NYql::NDecimal::TInt128>(const arrow::Scalar& scalar) const {
return reinterpret_cast<const NYql::NDecimal::TInt128*>(GetStringScalarValue(scalar).data());
}
-
+
void ArrayScalarCore(
const NYql::NDecimal::TInt128* val1Ptr,
const ui8* valid1,
@@ -103,7 +103,7 @@ struct TDecimalBlockExec {
}
arrow::Status ExecScalarScalar(arrow::compute::KernelContext* kernelCtx,
- const arrow::compute::ExecBatch& batch, arrow::Datum* res) const
+ const arrow::compute::ExecBatch& batch, arrow::Datum* res) const
{
MKQL_ENSURE(batch.values.size() == 2, "Expected 2 args");
const auto& arg1 = batch.values[0];
@@ -119,7 +119,7 @@ struct TDecimalBlockExec {
*mem = Apply(*val1Ptr, *val2Ptr);
*res = resDatum;
}
-
+
return arrow::Status::OK();
}
@@ -142,7 +142,7 @@ struct TDecimalBlockExec {
} else {
GetBitmap(resArr, 0).SetBitsTo(false);
}
-
+
return arrow::Status::OK();
}
@@ -266,12 +266,12 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockKernel(const TVector<TTyp
MKQL_ENSURE(precision >= 1&& precision <= 35, TStringBuilder() << "Wrong precision: " << (int)precision);
auto createKernel = [&](auto exec) {
- auto k = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType),
+ auto k = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType),
[exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
return exec->Exec(ctx, batch, res);
});
k->null_handling = arrow::compute::NullHandling::INTERSECTION;
- return k;
+ return k;
};
switch (dataType2->GetSchemeType()) {
@@ -283,7 +283,7 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockKernel(const TVector<TTyp
return createKernel(std::make_shared<TExec<type>>(precision, scale)); \
}
INTEGRAL_VALUE_TYPES(MAKE_PRIMITIVE_TYPE_MUL)
-#undef MAKE_PRIMITIVE_TYPE_MUL
+#undef MAKE_PRIMITIVE_TYPE_MUL
default:
Y_ABORT("Unupported type.");
}
@@ -305,7 +305,7 @@ IComputationNode* WrapBlockDecimal(TStringBuf name, TCallable& callable, const T
TVector<TType*> argsTypes = { firstType, secondType };
std::shared_ptr<arrow::compute::ScalarKernel> kernel = MakeBlockKernel<TExec>(argsTypes, callable.GetType()->GetReturnType());
- return new TBlockFuncNode(ctx.Mutables, name, std::move(argsNodes), argsTypes, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), name, std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel);
}
}
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_exists.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_exists.cpp
index f49d3f6b56e..9636817cd73 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_exists.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_exists.cpp
@@ -55,7 +55,7 @@ IComputationNode* WrapBlockExists(TCallable& callable, const TComputationNodeFac
TComputationNodePtrVector argsNodes = { compute };
TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType() };
auto kernel = MakeBlockExistsKernel(argsTypes, callable.GetType()->GetReturnType());
- return new TBlockFuncNode(ctx.Mutables, "Exists", std::move(argsNodes), argsTypes, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), "Exists", std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel);
}
} // namespace NMiniKQL
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_func.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_func.cpp
index d94f0ff400a..c1eba56e5cf 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_func.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_func.cpp
@@ -53,9 +53,9 @@ IComputationNode* WrapBlockFunc(TCallable& callable, const TComputationNodeFacto
const TKernel& kernel = ResolveKernel(*ctx.FunctionRegistry.GetBuiltins(), funcName, argsTypes, callableType->GetReturnType());
if (kernel.IsPolymorphic()) {
auto arrowKernel = kernel.MakeArrowKernel(argsTypes, callableType->GetReturnType());
- return new TBlockFuncNode(ctx.Mutables, funcName, std::move(argsNodes), argsTypes, *arrowKernel, arrowKernel, kernel.Family.FunctionOptions);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), funcName, std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *arrowKernel, arrowKernel, kernel.Family.FunctionOptions);
} else {
- return new TBlockFuncNode(ctx.Mutables, funcName, std::move(argsNodes), argsTypes, kernel.GetArrowKernel(), {}, kernel.Family.FunctionOptions);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), funcName, std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), kernel.GetArrowKernel(), {}, kernel.Family.FunctionOptions);
}
}
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_getelem.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_getelem.cpp
index cb164d3185a..636a22eabee 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_getelem.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_getelem.cpp
@@ -102,7 +102,7 @@ IComputationNode* WrapBlockGetElement(TCallable& callable, const TComputationNod
TComputationNodePtrVector argsNodes = { objectNode };
TVector<TType*> argsTypes = { blockType };
auto kernel = MakeBlockGetElementKernel(argsTypes, callable.GetType()->GetReturnType(), index, isOptional, needExternalOptional);
- return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel);
}
} // namespace
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_if.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_if.cpp
index 4ec511fa51f..3701fbe7741 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_if.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_if.cpp
@@ -248,7 +248,7 @@ IComputationNode* WrapBlockIf(TCallable& callable, const TComputationNodeFactory
kernel = MakeBlockIfKernel<false, false>(argsTypes, thenType);
}
- return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel);
}
}
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_just.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_just.cpp
index d3ecee9bc66..4c4a1e8e27b 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_just.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_just.cpp
@@ -81,7 +81,7 @@ IComputationNode* WrapBlockJust(TCallable& callable, const TComputationNodeFacto
kernel = MakeBlockJustKernel<true>(argsTypes, callable.GetType()->GetReturnType());
}
- return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel);
}
}
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_logical.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_logical.cpp
index 968254a33c1..e8ac91d3d76 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_logical.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_logical.cpp
@@ -529,7 +529,7 @@ IComputationNode* WrapBlockLogical(std::string_view name, TCallable& callable, c
kernel = MakeKernel<TXorBlockExec>(argsTypes, callable.GetType()->GetReturnType());
}
- return new TBlockFuncNode(ctx.Mutables, name, std::move(argsNodes), argsTypes, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), name, std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel);
}
} // namespace
@@ -559,7 +559,7 @@ IComputationNode* WrapBlockNot(TCallable& callable, const TComputationNodeFactor
TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType() };
auto kernel = MakeKernel<TNotBlockExec>(argsTypes, argsTypes[0]);
- return new TBlockFuncNode(ctx.Mutables, "Not", std::move(argsNodes), argsTypes, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), "Not", std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel);
}
diff --git a/yql/essentials/minikql/computation/mkql_block_impl.cpp b/yql/essentials/minikql/computation/mkql_block_impl.cpp
index 91292a3df81..72b4294761c 100644
--- a/yql/essentials/minikql/computation/mkql_block_impl.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_impl.cpp
@@ -3,6 +3,7 @@
#include "mkql_block_reader.h"
#include <yql/essentials/minikql/arrow/mkql_functions.h>
+#include <yql/essentials/minikql/computation/mkql_datum_validate.h>
#include <yql/essentials/minikql/mkql_node_builder.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
#include <yql/essentials/minikql/arrow/arrow_util.h>
@@ -247,14 +248,16 @@ NUdf::TUnboxedValuePod MakeBlockCount(const THolderFactory& holderFactory, const
return holderFactory.CreateArrowBlock(arrow::Datum(count));
}
-TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes,
- const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel,
- std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder,
- const arrow::compute::FunctionOptions* functionOptions)
+TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, NYql::NUdf::EValidateDatumMode validateDatumMode, TStringBuf name, TComputationNodePtrVector&& argsNodes,
+ const TVector<TType*>& argsTypes, TType* outputType, const arrow::compute::ScalarKernel& kernel,
+ std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder,
+ const arrow::compute::FunctionOptions* functionOptions)
: TMutableComputationNode(mutables)
+ , ValidateDatumMode_(validateDatumMode)
, StateIndex_(mutables.CurValueIndex++)
, ArgsNodes_(std::move(argsNodes))
, ArgsValuesDescr_(ToValueDescr(argsTypes))
+ , OutValueDescr_(ToValueDescr(outputType))
, Kernel_(kernel)
, KernelHolder_(std::move(kernelHolder))
, Options_(functionOptions)
@@ -270,7 +273,7 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con
for (ui32 i = 0; i < ArgsNodes_.size(); ++i) {
const auto& value = ArgsNodes_[i]->GetValue(ctx);
argDatums.emplace_back(TArrowBlock::From(value).GetDatum());
- ARROW_DEBUG_CHECK_DATUM_TYPES(ArgsValuesDescr_[i], argDatums.back().descr());
+ ValidateDatum(argDatums.back(), ArgsValuesDescr_[i], ValidateDatumMode_);
}
if (ScalarOutput_) {
@@ -297,8 +300,9 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con
ForEachArrayData(output, [&](const auto& arr) { arrays.push_back(arr); });
}
-
- return ctx.HolderFactory.CreateArrowBlock(MakeArray(arrays));
+ auto resultArray = MakeArray(arrays);
+ ValidateDatum(resultArray, OutValueDescr_, ValidateDatumMode_);
+ return ctx.HolderFactory.CreateArrowBlock(std::move(resultArray));
}
diff --git a/yql/essentials/minikql/computation/mkql_block_impl.h b/yql/essentials/minikql/computation/mkql_block_impl.h
index 4bcdc9514fb..509ed62cb09 100644
--- a/yql/essentials/minikql/computation/mkql_block_impl.h
+++ b/yql/essentials/minikql/computation/mkql_block_impl.h
@@ -29,13 +29,12 @@ arrow::compute::OutputType ConvertToOutputType(TType* output);
NUdf::TUnboxedValuePod MakeBlockCount(const THolderFactory& holderFactory, const uint64_t count);
-class TBlockFuncNode : public TMutableComputationNode<TBlockFuncNode> {
-
+class TBlockFuncNode: public TMutableComputationNode<TBlockFuncNode> {
public:
- TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes,
- const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel,
- std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder = {},
- const arrow::compute::FunctionOptions* functionOptions = nullptr);
+ TBlockFuncNode(TComputationMutables& mutables, NYql::NUdf::EValidateDatumMode validateDatumMode, TStringBuf name, TComputationNodePtrVector&& argsNodes,
+ const TVector<TType*>& argsTypes, TType* outputType, const arrow::compute::ScalarKernel& kernel,
+ std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder = {},
+ const arrow::compute::FunctionOptions* functionOptions = nullptr);
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const;
private:
@@ -79,9 +78,11 @@ private:
std::unique_ptr<IArrowKernelComputationNode> PrepareArrowKernelComputationNode(TComputationContext& ctx) const final;
private:
+ NYql::NUdf::EValidateDatumMode ValidateDatumMode_ = NYql::NUdf::EValidateDatumMode::None;
const ui32 StateIndex_;
const TComputationNodePtrVector ArgsNodes_;
const std::vector<arrow::ValueDescr> ArgsValuesDescr_;
+ arrow::ValueDescr OutValueDescr_;
const arrow::compute::ScalarKernel& Kernel_;
const std::shared_ptr<arrow::compute::ScalarKernel> KernelHolder_;
const arrow::compute::FunctionOptions* const Options_;
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_holders.h b/yql/essentials/minikql/computation/mkql_computation_node_holders.h
index 9ed032632b4..610d7a24376 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_holders.h
+++ b/yql/essentials/minikql/computation/mkql_computation_node_holders.h
@@ -7,6 +7,7 @@
#include <yql/essentials/minikql/aligned_page_pool.h>
#include <yql/essentials/minikql/compact_hash.h>
+#include <yql/essentials/minikql/computation/mkql_datum_validate.h>
#include <yql/essentials/minikql/mkql_node_serialization.h>
#include <yql/essentials/minikql/mkql_type_ops.h>
#include <yql/essentials/minikql/mkql_type_builder.h>
@@ -586,6 +587,7 @@ public:
: TComputationValue(memInfo)
, Datum_(std::move(datum))
{
+ VALIDATE_DATUM_ARROW_BLOCK_CONSTRUCTOR(Datum_);
}
inline static const TArrowBlock& From(const NUdf::TUnboxedValuePod& value) {
diff --git a/yql/essentials/minikql/computation/mkql_datum_validate.cpp b/yql/essentials/minikql/computation/mkql_datum_validate.cpp
new file mode 100644
index 00000000000..b8ad4ced262
--- /dev/null
+++ b/yql/essentials/minikql/computation/mkql_datum_validate.cpp
@@ -0,0 +1,132 @@
+#include "mkql_datum_validate.h"
+
+#include <yql/essentials/minikql/defs.h>
+#include <yql/essentials/public/udf/arrow/args_dechunker.h>
+
+#include <util/string/builder.h>
+
+#include <arrow/array/validate.h>
+#include <arrow/util/config.h>
+
+namespace NKikimr::NMiniKQL {
+namespace {
+// In order to take a subarray for any nesting depth of a tuple,
+// you only need to change the offset of the top-most ArrayData representative.
+// All other children should remain as is, without changing their offsets.
+// However, in YQL, we recursively traverse all nested ArrayData and change the offset there as well.
+// Accordingly, this creates a difference between the classic Arrow ArrayData representation and what we have.
+
+// E.g.
+// We have a Tuple<Int> array with the following structure.
+// {
+// len = 10
+// offset = 0
+// children = {
+// len = 10
+// offset = 5
+// }
+// }
+// To create a slice with offset == 3, Apache Arrow only needs to change the outer offset.
+// {
+// len = 10
+// offset = 0 + 3
+// children = {
+// len = 10
+// offset = 5
+// }
+// }
+// But in YQL we change both the outer and the inner offsets.
+// {
+// len = 10
+// offset = 0 + 3
+// children = {
+// len = 10
+// offset = 5 + 3
+
+// }
+// }
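+// The helper below works around this difference before validation.
+// It resets the offset to 0 (and the null count to unknown) for struct and union arrays, recursing into children.
+// Before resetting, it checks that the validity bitmap, if present, covers offset + length bits.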
+//
+// FIXME(YQL-20162): Change the validation algorithm.
+std::shared_ptr<arrow::ArrayData> ConvertYqlOffsetsToArrowStandard(
+ const arrow::ArrayData& arrayData) {
+ auto result = arrayData.Copy();
+ if (result->type->id() == arrow::Type::STRUCT ||
+ result->type->id() == arrow::Type::DENSE_UNION ||
+ result->type->id() == arrow::Type::SPARSE_UNION) {
+ if (result->buffers[0]) {
+ auto actualSize = arrow::BitUtil::BytesForBits(result->length + result->offset);
+ MKQL_ENSURE(result->buffers[0]->size() >= actualSize, "Bitmask is invalid.");
+ }
+ result->offset = 0;
+ result->null_count = arrow::kUnknownNullCount;
+ }
+
+ std::vector<std::shared_ptr<arrow::ArrayData>> children;
+ for (const auto& child : result->child_data) {
+ children.push_back(ConvertYqlOffsetsToArrowStandard(*child));
+ }
+ result->child_data = children;
+ return result;
+}
+
+arrow::Status ValidateArrayCheap(arrow::Datum datum) {
+ auto array = ConvertYqlOffsetsToArrowStandard(*datum.array());
+ arrow::Status status = arrow::internal::ValidateArray(*array);
+ return status;
+}
+
+arrow::Status ValidateArrayExpensive(arrow::Datum datum) {
+ ARROW_RETURN_NOT_OK(ValidateArrayCheap(datum));
+ auto array = ConvertYqlOffsetsToArrowStandard(*datum.array());
+ return arrow::internal::ValidateArrayFull(*array);
+}
+
+arrow::Status ValidateDatum(arrow::Datum datum, NYql::NUdf::EValidateDatumMode validateMode) {
+ if (validateMode == NYql::NUdf::EValidateDatumMode::None) {
+ return arrow::Status::OK();
+ }
+ if (datum.is_arraylike()) {
+ NYql::NUdf::TArgsDechunker dechunker({datum});
+ std::vector<arrow::Datum> chunk;
+ while (dechunker.Next(chunk)) {
+ Y_ENSURE(chunk[0].is_array());
+ switch (validateMode) {
+ case NYql::NUdf::EValidateDatumMode::None:
+ break;
+ case NYql::NUdf::EValidateDatumMode::Cheap:
+ if (auto status = ValidateArrayCheap(chunk[0]); !status.ok()) {
+ return status;
+ }
+ break;
+ case NYql::NUdf::EValidateDatumMode::Expensive:
+ if (auto status = ValidateArrayExpensive(chunk[0]); !status.ok()) {
+ return status;
+ }
+ break;
+ }
+ }
+ } else if (datum.is_scalar()) {
+ // Scalar validation is supported in ARROW-13132.
+ // Add scalar support after library update (this is very similar to above array validation).
+ static_assert(ARROW_VERSION_MAJOR == 5, "If you see this message please notify owners about update and remove this assert.");
+ } else {
+ // Must be either arraylike or scalar.
+ Y_UNREACHABLE();
+ }
+ return arrow::Status::OK();
+}
+
+} // namespace
+
+void ValidateDatum(arrow::Datum datum, TMaybe<arrow::ValueDescr> expectedDescription, NYql::NUdf::EValidateDatumMode validateMode) {
+ if (expectedDescription) {
+ ARROW_DEBUG_CHECK_DATUM_TYPES(*expectedDescription, datum.descr());
+ }
+ auto status = ValidateDatum(datum, validateMode);
+ Y_DEBUG_ABORT_UNLESS(status.ok(), "%s", (TStringBuilder() << "Type: " << datum.descr().ToString() << ". Original error is: " << status.message()).c_str());
+}
+
+} // namespace NKikimr::NMiniKQL
diff --git a/yql/essentials/minikql/computation/mkql_datum_validate.h b/yql/essentials/minikql/computation/mkql_datum_validate.h
new file mode 100644
index 00000000000..ad27be3cfc5
--- /dev/null
+++ b/yql/essentials/minikql/computation/mkql_datum_validate.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <yql/essentials/public/udf/udf_validate.h>
+
+#include <util/generic/fwd.h>
+
+#include <arrow/datum.h>
+
+namespace NKikimr::NMiniKQL {
+
+void ValidateDatum(arrow::Datum datum, TMaybe<arrow::ValueDescr> expectedDescription, NYql::NUdf::EValidateDatumMode validateMode);
+
+} // namespace NKikimr::NMiniKQL
+
+#if !defined(NDEBUG)
+ #define VALIDATE_DATUM_ARROW_BLOCK_CONSTRUCTOR(datum) ValidateDatum((datum), Nothing(), NYql::NUdf::EValidateDatumMode::Cheap);
+#else // !defined(NDEBUG)
+ #define VALIDATE_DATUM_ARROW_BLOCK_CONSTRUCTOR(datum)
+#endif // !defined(NDEBUG)
diff --git a/yql/essentials/minikql/computation/ya.make b/yql/essentials/minikql/computation/ya.make
index 3524ff4bb29..12f1e9d0ad1 100644
--- a/yql/essentials/minikql/computation/ya.make
+++ b/yql/essentials/minikql/computation/ya.make
@@ -7,6 +7,7 @@ SRCS(
mkql_block_transport.cpp
mkql_block_trimmer.cpp
mkql_computation_node.cpp
+ mkql_datum_validate.cpp
mkql_computation_node_holders.cpp
mkql_computation_node_impl.cpp
mkql_computation_node_pack.cpp
diff --git a/yql/essentials/minikql/invoke_builtins/mkql_builtins_impl.cpp b/yql/essentials/minikql/invoke_builtins/mkql_builtins_impl.cpp
index 6c298bc2607..e6e3c8b57e8 100644
--- a/yql/essentials/minikql/invoke_builtins/mkql_builtins_impl.cpp
+++ b/yql/essentials/minikql/invoke_builtins/mkql_builtins_impl.cpp
@@ -3,7 +3,15 @@
namespace NKikimr {
namespace NMiniKQL {
+namespace {
+std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBufferAndResize(size_t size, arrow::MemoryPool* pool) {
+ auto result = NYql::NUdf::AllocateResizableBuffer(size, pool);
+ ARROW_OK(result->Resize(size));
+ return result;
+}
+
+}
template <typename T>
arrow::compute::InputType GetPrimitiveInputArrowType(bool tz) {
return arrow::compute::InputType(AddTzType(tz, GetPrimitiveDataType<T>()), arrow::ValueDescr::ANY);
@@ -168,7 +176,7 @@ std::shared_ptr<arrow::Scalar> WithTz(EPropagateTz propagateTz,
const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(propagateTz == EPropagateTz::FromLeft ? *input1 : *input2);
const auto tzId = structScalar.value[1];
return std::make_shared<arrow::StructScalar>(arrow::StructScalar::ValueType{value,tzId}, propagateTz == EPropagateTz::FromLeft ? input1->type : input2->type);
-}
+}
std::shared_ptr<arrow::ArrayData> CopyTzImpl(const std::shared_ptr<arrow::ArrayData>& res, bool propagateTz,
const std::shared_ptr<arrow::ArrayData>& input, arrow::MemoryPool* pool,
@@ -178,7 +186,7 @@ std::shared_ptr<arrow::ArrayData> CopyTzImpl(const std::shared_ptr<arrow::ArrayD
}
Y_ENSURE(res->child_data.empty());
- std::shared_ptr<arrow::Buffer> buffer(NUdf::AllocateResizableBuffer(sizeOf * res->length, pool));
+ std::shared_ptr<arrow::Buffer> buffer(AllocateResizableBufferAndResize(sizeOf * res->length, pool));
res->child_data.push_back(arrow::ArrayData::Make(outputType, res->length, { nullptr, buffer }));
res->child_data.push_back(input->child_data[1]);
return res->child_data[0];
@@ -194,7 +202,7 @@ std::shared_ptr<arrow::ArrayData> CopyTzImpl(const std::shared_ptr<arrow::ArrayD
}
Y_ENSURE(res->child_data.empty());
- std::shared_ptr<arrow::Buffer> buffer(NUdf::AllocateResizableBuffer(sizeOf * res->length, pool));
+ std::shared_ptr<arrow::Buffer> buffer(AllocateResizableBufferAndResize(sizeOf * res->length, pool));
res->child_data.push_back(arrow::ArrayData::Make(outputType, res->length, { nullptr, buffer }));
if (propagateTz == EPropagateTz::FromLeft) {
res->child_data.push_back(input1->child_data[1]);
@@ -217,7 +225,7 @@ std::shared_ptr<arrow::ArrayData> CopyTzImpl(const std::shared_ptr<arrow::ArrayD
}
Y_ENSURE(res->child_data.empty());
- std::shared_ptr<arrow::Buffer> buffer(NUdf::AllocateResizableBuffer(sizeOf * res->length, pool));
+ std::shared_ptr<arrow::Buffer> buffer(AllocateResizableBufferAndResize(sizeOf * res->length, pool));
res->child_data.push_back(arrow::ArrayData::Make(outputType, res->length, { nullptr, buffer }));
if (propagateTz == EPropagateTz::FromLeft) {
const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(*input1);
@@ -240,7 +248,7 @@ std::shared_ptr<arrow::ArrayData> CopyTzImpl(const std::shared_ptr<arrow::ArrayD
}
Y_ENSURE(res->child_data.empty());
- std::shared_ptr<arrow::Buffer> buffer(NUdf::AllocateResizableBuffer(sizeOf * res->length, pool));
+ std::shared_ptr<arrow::Buffer> buffer(AllocateResizableBufferAndResize(sizeOf * res->length, pool));
res->child_data.push_back(arrow::ArrayData::Make(outputType, res->length, { nullptr, buffer }));
if (propagateTz == EPropagateTz::FromLeft) {
res->child_data.push_back(input1->child_data[1]);
@@ -251,7 +259,7 @@ std::shared_ptr<arrow::ArrayData> CopyTzImpl(const std::shared_ptr<arrow::ArrayD
return res->child_data[0];
}
-TPlainKernel::TPlainKernel(const TKernelFamily& family, const std::vector<NUdf::TDataTypeId>& argTypes,
+TPlainKernel::TPlainKernel(const TKernelFamily& family, const std::vector<NUdf::TDataTypeId>& argTypes,
NUdf::TDataTypeId returnType, std::unique_ptr<arrow::compute::ScalarKernel>&& arrowKernel,
TKernel::ENullMode nullMode)
: TKernel(family, argTypes, returnType, nullMode)
@@ -271,7 +279,7 @@ bool TPlainKernel::IsPolymorphic() const {
return false;
}
-TDecimalKernel::TDecimalKernel(const TKernelFamily& family, const std::vector<NUdf::TDataTypeId>& argTypes,
+TDecimalKernel::TDecimalKernel(const TKernelFamily& family, const std::vector<NUdf::TDataTypeId>& argTypes,
NUdf::TDataTypeId returnType, TStatelessArrayKernelExec exec,
TKernel::ENullMode nullMode)
: TKernel(family, argTypes, returnType, nullMode)
@@ -296,7 +304,7 @@ std::shared_ptr<arrow::compute::ScalarKernel> TDecimalKernel::MakeArrowKernel(co
MKQL_ENSURE(*dataType1->GetDataSlot() == NUdf::EDataSlot::Decimal, "Require decimal");
MKQL_ENSURE(*dataType2->GetDataSlot() == NUdf::EDataSlot::Decimal, "Require decimal");
-
+
auto decimalType1 = static_cast<TDataDecimalType*>(dataType1);
auto decimalType2 = static_cast<TDataDecimalType*>(dataType2);
diff --git a/yql/essentials/parser/pg_wrapper/comp_factory.cpp b/yql/essentials/parser/pg_wrapper/comp_factory.cpp
index e692c96d601..214650f7db9 100644
--- a/yql/essentials/parser/pg_wrapper/comp_factory.cpp
+++ b/yql/essentials/parser/pg_wrapper/comp_factory.cpp
@@ -3337,7 +3337,7 @@ TComputationNodeFactory GetPgFactory() {
auto execFunc = FindExec(id);
YQL_ENSURE(execFunc);
auto kernel = MakePgKernel(argTypes, returnType, execFunc, id);
- return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argNodes), argTypes, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), std::move(argNodes), argTypes, returnType, *kernel, kernel);
}
if (name == "PgCast") {
@@ -3399,7 +3399,7 @@ TComputationNodeFactory GetPgFactory() {
auto returnType = callable.GetType()->GetReturnType();
ui32 sourceId = AS_TYPE(TPgType, AS_TYPE(TBlockType, inputType)->GetItemType())->GetTypeId();
auto kernel = MakeFromPgKernel(inputType, returnType, sourceId);
- return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), { arg }, { inputType }, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), { arg }, { inputType }, returnType, *kernel, kernel);
}
if (name == "ToPg") {
@@ -3496,7 +3496,7 @@ TComputationNodeFactory GetPgFactory() {
auto returnType = callable.GetType()->GetReturnType();
auto targetId = AS_TYPE(TPgType, AS_TYPE(TBlockType, returnType)->GetItemType())->GetTypeId();
auto kernel = MakeToPgKernel(inputType, returnType, *sourceDataSlot);
- return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), { arg }, { inputType }, *kernel, kernel);
+ return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), {arg}, {inputType}, returnType, *kernel, kernel);
}
if (name == "PgArray") {
diff --git a/yql/essentials/public/udf/arrow/util.h b/yql/essentials/public/udf/arrow/util.h
index ba0e16f5f1c..524aaf4da78 100644
--- a/yql/essentials/public/udf/arrow/util.h
+++ b/yql/essentials/public/udf/arrow/util.h
@@ -108,11 +108,10 @@ private:
arrow::MemoryPool* Pool_;
};
-/// \brief same as arrow::AllocateResizableBuffer, but allows to control zero padding
template<typename TBuffer = TResizeableBuffer>
-std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBuffer(size_t size, arrow::MemoryPool* pool, bool zeroPad = false) {
+std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBuffer(size_t capacity, arrow::MemoryPool* pool, bool zeroPad = false) {
std::unique_ptr<TBuffer> result = std::make_unique<TBuffer>(pool);
- ARROW_OK(result->Reserve(size));
+ ARROW_OK(result->Reserve(capacity));
if (zeroPad) {
result->ZeroPadding();
}
diff --git a/yql/essentials/public/udf/udf_validate.cpp b/yql/essentials/public/udf/udf_validate.cpp
index be61b79cf29..d6fe5524f9a 100644
--- a/yql/essentials/public/udf/udf_validate.cpp
+++ b/yql/essentials/public/udf/udf_validate.cpp
@@ -52,5 +52,16 @@ EValidatePolicy ValidatePolicyByStr(const TString& validatePolicyStr) {
ythrow yexception() << "Unknown udf validate policy: " << validatePolicyStr;
}
+EValidateDatumMode ToDatumValidateMode(EValidateMode validateMode) {
+ switch (validateMode) {
+ case EValidateMode::None:
+ return EValidateDatumMode::None;
+ case EValidateMode::Lazy:
+ return EValidateDatumMode::Cheap;
+ case EValidateMode::Greedy:
+ case EValidateMode::Max:
+ return EValidateDatumMode::Expensive;
+ }
+}
} // namspace NUdf
} // namspace NYql
diff --git a/yql/essentials/public/udf/udf_validate.h b/yql/essentials/public/udf/udf_validate.h
index 6e9957ec251..361615770df 100644
--- a/yql/essentials/public/udf/udf_validate.h
+++ b/yql/essentials/public/udf/udf_validate.h
@@ -15,7 +15,13 @@ namespace NUdf {
#define UDF_VALIDATE_POLICY(XX) \
XX(Fail, 0) \
XX(Exception, 1) \
- XX(Max, 2) \
+ XX(Max, 2)
+
+enum class EValidateDatumMode {
+ None,
+ Cheap,
+ Expensive,
+};
enum class EValidateMode : ui8 {
UDF_VALIDATE_MODE(ENUM_VALUE_GEN)
@@ -31,5 +37,6 @@ EValidateMode ValidateModeByStr(const TString& verifyModeStr);
TStringBuf ValidatePolicyAsStr(EValidatePolicy verifyPolicy);
EValidatePolicy ValidatePolicyByStr(const TString& verifyPolicy);
+EValidateDatumMode ToDatumValidateMode(EValidateMode validateMode);
} // namspace NUdf
} // namspace NYql