diff options
author | atarasov5 <atarasov5@yandex-team.com> | 2025-07-10 15:30:05 +0300 |
---|---|---|
committer | atarasov5 <atarasov5@yandex-team.com> | 2025-07-10 15:51:42 +0300 |
commit | 5ce6e1b18f1023a06b261707a585835825e612f9 (patch) | |
tree | e60cbc5135a27d0b684931015e2fc1e8d588f908 | |
parent | fb7f6895a3d8244444609433b725d603dca23631 (diff) | |
download | ydb-5ce6e1b18f1023a06b261707a585835825e612f9.tar.gz |
YQL-20102: Enable debug arrow validation
В данном PR включена поддержка валидации Datum'ов на соответствие `MKQL type <-> arrow type <-> arrow array data structure`.
Выявленные проблемы:
1\. `AllocateResizableBuffer(size_t size)` возвращает буфер длины 0 вместо `size`. Из-за этого в некоторых нодах возникает ошибка при работе с датами.
commit_hash:122f2bd114dec50993131391a3793d9540877cb4
20 files changed, 231 insertions, 47 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_coalesce.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_coalesce.cpp index 50f5ebf151d..3977655154e 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_coalesce.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_coalesce.cpp @@ -271,7 +271,7 @@ IComputationNode* WrapBlockCoalesce(TCallable& callable, const TComputationNodeF TVector<TType*> argsTypes = {firstType, secondType}; auto kernel = MakeBlockCoalesceKernel(argsTypes, secondType, needUnwrapFirst); - return new TBlockFuncNode(ctx.Mutables, "Coalesce", std::move(argsNodes), argsTypes, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), "Coalesce", std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel); } } // namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_container.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_container.cpp index 3dd4135e72c..ff7c6c68d14 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_container.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_container.cpp @@ -90,7 +90,7 @@ IComputationNode* WrapBlockAsContainer(TCallable& callable, const TComputationNo } auto kernel = MakeBlockAsContainerKernel(argsTypes, callable.GetType()->GetReturnType()); - return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel); } } // namespace NMiniKQL diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_decimal.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_decimal.cpp index 0d60f119714..225a2515529 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_decimal.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_decimal.cpp @@ -26,12 
+26,12 @@ struct TDecimalBlockExec { const U* GetScalarValue(const arrow::Scalar& scalar) const { return reinterpret_cast<const U*>(GetPrimitiveScalarValuePtr(scalar)); } - + template<> const NYql::NDecimal::TInt128* GetScalarValue<NYql::NDecimal::TInt128>(const arrow::Scalar& scalar) const { return reinterpret_cast<const NYql::NDecimal::TInt128*>(GetStringScalarValue(scalar).data()); } - + void ArrayScalarCore( const NYql::NDecimal::TInt128* val1Ptr, const ui8* valid1, @@ -103,7 +103,7 @@ struct TDecimalBlockExec { } arrow::Status ExecScalarScalar(arrow::compute::KernelContext* kernelCtx, - const arrow::compute::ExecBatch& batch, arrow::Datum* res) const + const arrow::compute::ExecBatch& batch, arrow::Datum* res) const { MKQL_ENSURE(batch.values.size() == 2, "Expected 2 args"); const auto& arg1 = batch.values[0]; @@ -119,7 +119,7 @@ struct TDecimalBlockExec { *mem = Apply(*val1Ptr, *val2Ptr); *res = resDatum; } - + return arrow::Status::OK(); } @@ -142,7 +142,7 @@ struct TDecimalBlockExec { } else { GetBitmap(resArr, 0).SetBitsTo(false); } - + return arrow::Status::OK(); } @@ -266,12 +266,12 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockKernel(const TVector<TTyp MKQL_ENSURE(precision >= 1&& precision <= 35, TStringBuilder() << "Wrong precision: " << (int)precision); auto createKernel = [&](auto exec) { - auto k = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType), + auto k = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType), [exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { return exec->Exec(ctx, batch, res); }); k->null_handling = arrow::compute::NullHandling::INTERSECTION; - return k; + return k; }; switch (dataType2->GetSchemeType()) { @@ -283,7 +283,7 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockKernel(const TVector<TTyp return 
createKernel(std::make_shared<TExec<type>>(precision, scale)); \ } INTEGRAL_VALUE_TYPES(MAKE_PRIMITIVE_TYPE_MUL) -#undef MAKE_PRIMITIVE_TYPE_MUL +#undef MAKE_PRIMITIVE_TYPE_MUL default: Y_ABORT("Unupported type."); } @@ -305,7 +305,7 @@ IComputationNode* WrapBlockDecimal(TStringBuf name, TCallable& callable, const T TVector<TType*> argsTypes = { firstType, secondType }; std::shared_ptr<arrow::compute::ScalarKernel> kernel = MakeBlockKernel<TExec>(argsTypes, callable.GetType()->GetReturnType()); - return new TBlockFuncNode(ctx.Mutables, name, std::move(argsNodes), argsTypes, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), name, std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel); } } diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_exists.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_exists.cpp index f49d3f6b56e..9636817cd73 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_exists.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_exists.cpp @@ -55,7 +55,7 @@ IComputationNode* WrapBlockExists(TCallable& callable, const TComputationNodeFac TComputationNodePtrVector argsNodes = { compute }; TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType() }; auto kernel = MakeBlockExistsKernel(argsTypes, callable.GetType()->GetReturnType()); - return new TBlockFuncNode(ctx.Mutables, "Exists", std::move(argsNodes), argsTypes, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), "Exists", std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel); } } // namespace NMiniKQL diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_func.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_func.cpp index d94f0ff400a..c1eba56e5cf 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_func.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_func.cpp @@ -53,9 +53,9 @@ IComputationNode* 
WrapBlockFunc(TCallable& callable, const TComputationNodeFacto const TKernel& kernel = ResolveKernel(*ctx.FunctionRegistry.GetBuiltins(), funcName, argsTypes, callableType->GetReturnType()); if (kernel.IsPolymorphic()) { auto arrowKernel = kernel.MakeArrowKernel(argsTypes, callableType->GetReturnType()); - return new TBlockFuncNode(ctx.Mutables, funcName, std::move(argsNodes), argsTypes, *arrowKernel, arrowKernel, kernel.Family.FunctionOptions); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), funcName, std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *arrowKernel, arrowKernel, kernel.Family.FunctionOptions); } else { - return new TBlockFuncNode(ctx.Mutables, funcName, std::move(argsNodes), argsTypes, kernel.GetArrowKernel(), {}, kernel.Family.FunctionOptions); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), funcName, std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), kernel.GetArrowKernel(), {}, kernel.Family.FunctionOptions); } } diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_getelem.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_getelem.cpp index cb164d3185a..636a22eabee 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_getelem.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_getelem.cpp @@ -102,7 +102,7 @@ IComputationNode* WrapBlockGetElement(TCallable& callable, const TComputationNod TComputationNodePtrVector argsNodes = { objectNode }; TVector<TType*> argsTypes = { blockType }; auto kernel = MakeBlockGetElementKernel(argsTypes, callable.GetType()->GetReturnType(), index, isOptional, needExternalOptional); - return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel); } } 
// namespace diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_if.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_if.cpp index 4ec511fa51f..3701fbe7741 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_if.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_if.cpp @@ -248,7 +248,7 @@ IComputationNode* WrapBlockIf(TCallable& callable, const TComputationNodeFactory kernel = MakeBlockIfKernel<false, false>(argsTypes, thenType); } - return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel); } } diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_just.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_just.cpp index d3ecee9bc66..4c4a1e8e27b 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_just.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_just.cpp @@ -81,7 +81,7 @@ IComputationNode* WrapBlockJust(TCallable& callable, const TComputationNodeFacto kernel = MakeBlockJustKernel<true>(argsTypes, callable.GetType()->GetReturnType()); } - return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel); } } diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_logical.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_logical.cpp index 968254a33c1..e8ac91d3d76 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_logical.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_logical.cpp @@ -529,7 +529,7 @@ IComputationNode* WrapBlockLogical(std::string_view name, TCallable& callable, c kernel = 
MakeKernel<TXorBlockExec>(argsTypes, callable.GetType()->GetReturnType()); } - return new TBlockFuncNode(ctx.Mutables, name, std::move(argsNodes), argsTypes, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), name, std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel); } } // namespace @@ -559,7 +559,7 @@ IComputationNode* WrapBlockNot(TCallable& callable, const TComputationNodeFactor TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType() }; auto kernel = MakeKernel<TNotBlockExec>(argsTypes, argsTypes[0]); - return new TBlockFuncNode(ctx.Mutables, "Not", std::move(argsNodes), argsTypes, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), "Not", std::move(argsNodes), argsTypes, callable.GetType()->GetReturnType(), *kernel, kernel); } diff --git a/yql/essentials/minikql/computation/mkql_block_impl.cpp b/yql/essentials/minikql/computation/mkql_block_impl.cpp index 91292a3df81..72b4294761c 100644 --- a/yql/essentials/minikql/computation/mkql_block_impl.cpp +++ b/yql/essentials/minikql/computation/mkql_block_impl.cpp @@ -3,6 +3,7 @@ #include "mkql_block_reader.h" #include <yql/essentials/minikql/arrow/mkql_functions.h> +#include <yql/essentials/minikql/computation/mkql_datum_validate.h> #include <yql/essentials/minikql/mkql_node_builder.h> #include <yql/essentials/minikql/mkql_node_cast.h> #include <yql/essentials/minikql/arrow/arrow_util.h> @@ -247,14 +248,16 @@ NUdf::TUnboxedValuePod MakeBlockCount(const THolderFactory& holderFactory, const return holderFactory.CreateArrowBlock(arrow::Datum(count)); } -TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes, - const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel, - std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder, - const arrow::compute::FunctionOptions* functionOptions) 
+TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, NYql::NUdf::EValidateDatumMode validateDatumMode, TStringBuf name, TComputationNodePtrVector&& argsNodes, + const TVector<TType*>& argsTypes, TType* outputType, const arrow::compute::ScalarKernel& kernel, + std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder, + const arrow::compute::FunctionOptions* functionOptions) : TMutableComputationNode(mutables) + , ValidateDatumMode_(validateDatumMode) , StateIndex_(mutables.CurValueIndex++) , ArgsNodes_(std::move(argsNodes)) , ArgsValuesDescr_(ToValueDescr(argsTypes)) + , OutValueDescr_(ToValueDescr(outputType)) , Kernel_(kernel) , KernelHolder_(std::move(kernelHolder)) , Options_(functionOptions) @@ -270,7 +273,7 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con for (ui32 i = 0; i < ArgsNodes_.size(); ++i) { const auto& value = ArgsNodes_[i]->GetValue(ctx); argDatums.emplace_back(TArrowBlock::From(value).GetDatum()); - ARROW_DEBUG_CHECK_DATUM_TYPES(ArgsValuesDescr_[i], argDatums.back().descr()); + ValidateDatum(argDatums.back(), ArgsValuesDescr_[i], ValidateDatumMode_); } if (ScalarOutput_) { @@ -297,8 +300,9 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con ForEachArrayData(output, [&](const auto& arr) { arrays.push_back(arr); }); } - - return ctx.HolderFactory.CreateArrowBlock(MakeArray(arrays)); + auto resultArray = MakeArray(arrays); + ValidateDatum(resultArray, OutValueDescr_, ValidateDatumMode_); + return ctx.HolderFactory.CreateArrowBlock(std::move(resultArray)); } diff --git a/yql/essentials/minikql/computation/mkql_block_impl.h b/yql/essentials/minikql/computation/mkql_block_impl.h index 4bcdc9514fb..509ed62cb09 100644 --- a/yql/essentials/minikql/computation/mkql_block_impl.h +++ b/yql/essentials/minikql/computation/mkql_block_impl.h @@ -29,13 +29,12 @@ arrow::compute::OutputType ConvertToOutputType(TType* output); NUdf::TUnboxedValuePod MakeBlockCount(const THolderFactory& 
holderFactory, const uint64_t count); -class TBlockFuncNode : public TMutableComputationNode<TBlockFuncNode> { - +class TBlockFuncNode: public TMutableComputationNode<TBlockFuncNode> { public: - TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes, - const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel, - std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder = {}, - const arrow::compute::FunctionOptions* functionOptions = nullptr); + TBlockFuncNode(TComputationMutables& mutables, NYql::NUdf::EValidateDatumMode validateDatumMode, TStringBuf name, TComputationNodePtrVector&& argsNodes, + const TVector<TType*>& argsTypes, TType* outputType, const arrow::compute::ScalarKernel& kernel, + std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder = {}, + const arrow::compute::FunctionOptions* functionOptions = nullptr); NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const; private: @@ -79,9 +78,11 @@ private: std::unique_ptr<IArrowKernelComputationNode> PrepareArrowKernelComputationNode(TComputationContext& ctx) const final; private: + NYql::NUdf::EValidateDatumMode ValidateDatumMode_ = NYql::NUdf::EValidateDatumMode::None; const ui32 StateIndex_; const TComputationNodePtrVector ArgsNodes_; const std::vector<arrow::ValueDescr> ArgsValuesDescr_; + arrow::ValueDescr OutValueDescr_; const arrow::compute::ScalarKernel& Kernel_; const std::shared_ptr<arrow::compute::ScalarKernel> KernelHolder_; const arrow::compute::FunctionOptions* const Options_; diff --git a/yql/essentials/minikql/computation/mkql_computation_node_holders.h b/yql/essentials/minikql/computation/mkql_computation_node_holders.h index 9ed032632b4..610d7a24376 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_holders.h +++ b/yql/essentials/minikql/computation/mkql_computation_node_holders.h @@ -7,6 +7,7 @@ #include <yql/essentials/minikql/aligned_page_pool.h> #include 
<yql/essentials/minikql/compact_hash.h> +#include <yql/essentials/minikql/computation/mkql_datum_validate.h> #include <yql/essentials/minikql/mkql_node_serialization.h> #include <yql/essentials/minikql/mkql_type_ops.h> #include <yql/essentials/minikql/mkql_type_builder.h> @@ -586,6 +587,7 @@ public: : TComputationValue(memInfo) , Datum_(std::move(datum)) { + VALIDATE_DATUM_ARROW_BLOCK_CONSTRUCTOR(Datum_); } inline static const TArrowBlock& From(const NUdf::TUnboxedValuePod& value) { diff --git a/yql/essentials/minikql/computation/mkql_datum_validate.cpp b/yql/essentials/minikql/computation/mkql_datum_validate.cpp new file mode 100644 index 00000000000..b8ad4ced262 --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_datum_validate.cpp @@ -0,0 +1,132 @@ +#include "mkql_datum_validate.h" + +#include <yql/essentials/minikql/defs.h> +#include <yql/essentials/public/udf/arrow/args_dechunker.h> + +#include <util/string/builder.h> + +#include <arrow/array/validate.h> +#include <arrow/util/config.h> + +namespace NKikimr::NMiniKQL { +namespace { +// In order to take a subarray for any nesting depth of a tuple, +// you only need to change the offset of the top-most ArrayData representative. +// All other children should remain as is, without changing their offsets. +// However, in YQL, we recursively traverse all nested ArrayData and change the offset there as well. +// Accordingly, this creates a difference between the classic Arrow ArrayData representation and what we have. + +// E.g. +// We have Tuple<Int> array with following structure. +// { +// len = 10 +// offset = 0 +// children = { +// len = 10 +// offset = 5 +// } +// } +// To create a slice with offset == 3 apache arrow need only to change outer offset. +// { +// len = 10 +// offset = 0 + 3 +// children = { +// len = 10 +// offset = 5 +// } +// } +// But in YQL we change both: outer and inner offset. 
+// { +// len = 10 +// offset = 0 + 3 +// children = { +// len = 10 +// offset = 5 + 3 + +// } +// } +// So here is the helper that can help fix this. +// It simply sets the offset to 0 for types that have this problem. +// Also check bitmask before fixing. +// +// FIXME(YQL-20162): Change the validation algorithm. +std::shared_ptr<arrow::ArrayData> ConvertYqlOffsetsToArrowStandard( + const arrow::ArrayData& arrayData) { + auto result = arrayData.Copy(); + if (result->type->id() == arrow::Type::STRUCT || + result->type->id() == arrow::Type::DENSE_UNION || + result->type->id() == arrow::Type::SPARSE_UNION) { + if (result->buffers[0]) { + auto actualSize = arrow::BitUtil::BytesForBits(result->length + result->offset); + MKQL_ENSURE(result->buffers[0]->size() >= actualSize, "Bitmask is invalid."); + } + result->offset = 0; + result->null_count = arrow::kUnknownNullCount; + } + + std::vector<std::shared_ptr<arrow::ArrayData>> children; + for (const auto& child : result->child_data) { + children.push_back(ConvertYqlOffsetsToArrowStandard(*child)); + } + result->child_data = children; + return result; +} + +arrow::Status ValidateArrayCheap(arrow::Datum datum) { + auto array = ConvertYqlOffsetsToArrowStandard(*datum.array()); + arrow::Status status = arrow::internal::ValidateArray(*array); + return status; +} + +arrow::Status ValidateArrayExpensive(arrow::Datum datum) { + ARROW_RETURN_NOT_OK(ValidateArrayCheap(datum)); + auto array = ConvertYqlOffsetsToArrowStandard(*datum.array()); + return arrow::internal::ValidateArrayFull(*array); +} + +arrow::Status ValidateDatum(arrow::Datum datum, NYql::NUdf::EValidateDatumMode validateMode) { + if (validateMode == NYql::NUdf::EValidateDatumMode::None) { + return arrow::Status::OK(); + } + if (datum.is_arraylike()) { + NYql::NUdf::TArgsDechunker dechunker({datum}); + std::vector<arrow::Datum> chunk; + while (dechunker.Next(chunk)) { + Y_ENSURE(chunk[0].is_array()); + switch (validateMode) { + case 
NYql::NUdf::EValidateDatumMode::None: + break; + case NYql::NUdf::EValidateDatumMode::Cheap: + if (auto status = ValidateArrayCheap(chunk[0]); !status.ok()) { + return status; + } + break; + case NYql::NUdf::EValidateDatumMode::Expensive: + if (auto status = ValidateArrayExpensive(chunk[0]); !status.ok()) { + return status; + } + break; + } + } + } else if (datum.is_scalar()) { + // Scalar validation is supported in ARROW-13132. + // Add scalar support after library update (this is very similar to above array validation). + static_assert(ARROW_VERSION_MAJOR == 5, "If you see this message please notify owners about update and remove this assert."); + } else { + // Must be either arraylike or scalar. + Y_UNREACHABLE(); + } + return arrow::Status::OK(); +} + +} // namespace + +void ValidateDatum(arrow::Datum datum, TMaybe<arrow::ValueDescr> expectedDescription, NYql::NUdf::EValidateDatumMode validateMode) { + if (expectedDescription) { + ARROW_DEBUG_CHECK_DATUM_TYPES(*expectedDescription, datum.descr()); + } + auto status = ValidateDatum(datum, validateMode); + Y_DEBUG_ABORT_UNLESS(status.ok(), "%s", (TStringBuilder() << "Type: " << datum.descr().ToString() << ". 
Original error is: " << status.message()).c_str()); +} + +} // namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/computation/mkql_datum_validate.h b/yql/essentials/minikql/computation/mkql_datum_validate.h new file mode 100644 index 00000000000..ad27be3cfc5 --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_datum_validate.h @@ -0,0 +1,19 @@ +#pragma once + +#include <yql/essentials/public/udf/udf_validate.h> + +#include <util/generic/fwd.h> + +#include <arrow/datum.h> + +namespace NKikimr::NMiniKQL { + +void ValidateDatum(arrow::Datum datum, TMaybe<arrow::ValueDescr> expectedDescription, NYql::NUdf::EValidateDatumMode validateMode); + +} // namespace NKikimr::NMiniKQL + +#if !defined(NDEBUG) + #define VALIDATE_DATUM_ARROW_BLOCK_CONSTRUCTOR(datum) ValidateDatum((datum), Nothing(), NYql::NUdf::EValidateDatumMode::Cheap); +#else //! defined(NDEBUG) + #define VALIDATE_DATUM_ARROW_BLOCK_CONSTRUCTOR(datum) +#endif // !defined(NDEBUG) diff --git a/yql/essentials/minikql/computation/ya.make b/yql/essentials/minikql/computation/ya.make index 3524ff4bb29..12f1e9d0ad1 100644 --- a/yql/essentials/minikql/computation/ya.make +++ b/yql/essentials/minikql/computation/ya.make @@ -7,6 +7,7 @@ SRCS( mkql_block_transport.cpp mkql_block_trimmer.cpp mkql_computation_node.cpp + mkql_datum_validate.cpp mkql_computation_node_holders.cpp mkql_computation_node_impl.cpp mkql_computation_node_pack.cpp diff --git a/yql/essentials/minikql/invoke_builtins/mkql_builtins_impl.cpp b/yql/essentials/minikql/invoke_builtins/mkql_builtins_impl.cpp index 6c298bc2607..e6e3c8b57e8 100644 --- a/yql/essentials/minikql/invoke_builtins/mkql_builtins_impl.cpp +++ b/yql/essentials/minikql/invoke_builtins/mkql_builtins_impl.cpp @@ -3,7 +3,15 @@ namespace NKikimr { namespace NMiniKQL { +namespace { +std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBufferAndResize(size_t size, arrow::MemoryPool* pool) { + auto result = NYql::NUdf::AllocateResizableBuffer(size, pool); + 
ARROW_OK(result->Resize(size)); + return result; +} + +} template <typename T> arrow::compute::InputType GetPrimitiveInputArrowType(bool tz) { return arrow::compute::InputType(AddTzType(tz, GetPrimitiveDataType<T>()), arrow::ValueDescr::ANY); @@ -168,7 +176,7 @@ std::shared_ptr<arrow::Scalar> WithTz(EPropagateTz propagateTz, const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(propagateTz == EPropagateTz::FromLeft ? *input1 : *input2); const auto tzId = structScalar.value[1]; return std::make_shared<arrow::StructScalar>(arrow::StructScalar::ValueType{value,tzId}, propagateTz == EPropagateTz::FromLeft ? input1->type : input2->type); -} +} std::shared_ptr<arrow::ArrayData> CopyTzImpl(const std::shared_ptr<arrow::ArrayData>& res, bool propagateTz, const std::shared_ptr<arrow::ArrayData>& input, arrow::MemoryPool* pool, @@ -178,7 +186,7 @@ std::shared_ptr<arrow::ArrayData> CopyTzImpl(const std::shared_ptr<arrow::ArrayD } Y_ENSURE(res->child_data.empty()); - std::shared_ptr<arrow::Buffer> buffer(NUdf::AllocateResizableBuffer(sizeOf * res->length, pool)); + std::shared_ptr<arrow::Buffer> buffer(AllocateResizableBufferAndResize(sizeOf * res->length, pool)); res->child_data.push_back(arrow::ArrayData::Make(outputType, res->length, { nullptr, buffer })); res->child_data.push_back(input->child_data[1]); return res->child_data[0]; @@ -194,7 +202,7 @@ std::shared_ptr<arrow::ArrayData> CopyTzImpl(const std::shared_ptr<arrow::ArrayD } Y_ENSURE(res->child_data.empty()); - std::shared_ptr<arrow::Buffer> buffer(NUdf::AllocateResizableBuffer(sizeOf * res->length, pool)); + std::shared_ptr<arrow::Buffer> buffer(AllocateResizableBufferAndResize(sizeOf * res->length, pool)); res->child_data.push_back(arrow::ArrayData::Make(outputType, res->length, { nullptr, buffer })); if (propagateTz == EPropagateTz::FromLeft) { res->child_data.push_back(input1->child_data[1]); @@ -217,7 +225,7 @@ std::shared_ptr<arrow::ArrayData> CopyTzImpl(const 
std::shared_ptr<arrow::ArrayD } Y_ENSURE(res->child_data.empty()); - std::shared_ptr<arrow::Buffer> buffer(NUdf::AllocateResizableBuffer(sizeOf * res->length, pool)); + std::shared_ptr<arrow::Buffer> buffer(AllocateResizableBufferAndResize(sizeOf * res->length, pool)); res->child_data.push_back(arrow::ArrayData::Make(outputType, res->length, { nullptr, buffer })); if (propagateTz == EPropagateTz::FromLeft) { const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(*input1); @@ -240,7 +248,7 @@ std::shared_ptr<arrow::ArrayData> CopyTzImpl(const std::shared_ptr<arrow::ArrayD } Y_ENSURE(res->child_data.empty()); - std::shared_ptr<arrow::Buffer> buffer(NUdf::AllocateResizableBuffer(sizeOf * res->length, pool)); + std::shared_ptr<arrow::Buffer> buffer(AllocateResizableBufferAndResize(sizeOf * res->length, pool)); res->child_data.push_back(arrow::ArrayData::Make(outputType, res->length, { nullptr, buffer })); if (propagateTz == EPropagateTz::FromLeft) { res->child_data.push_back(input1->child_data[1]); @@ -251,7 +259,7 @@ std::shared_ptr<arrow::ArrayData> CopyTzImpl(const std::shared_ptr<arrow::ArrayD return res->child_data[0]; } -TPlainKernel::TPlainKernel(const TKernelFamily& family, const std::vector<NUdf::TDataTypeId>& argTypes, +TPlainKernel::TPlainKernel(const TKernelFamily& family, const std::vector<NUdf::TDataTypeId>& argTypes, NUdf::TDataTypeId returnType, std::unique_ptr<arrow::compute::ScalarKernel>&& arrowKernel, TKernel::ENullMode nullMode) : TKernel(family, argTypes, returnType, nullMode) @@ -271,7 +279,7 @@ bool TPlainKernel::IsPolymorphic() const { return false; } -TDecimalKernel::TDecimalKernel(const TKernelFamily& family, const std::vector<NUdf::TDataTypeId>& argTypes, +TDecimalKernel::TDecimalKernel(const TKernelFamily& family, const std::vector<NUdf::TDataTypeId>& argTypes, NUdf::TDataTypeId returnType, TStatelessArrayKernelExec exec, TKernel::ENullMode nullMode) : TKernel(family, argTypes, returnType, nullMode) @@ -296,7 
+304,7 @@ std::shared_ptr<arrow::compute::ScalarKernel> TDecimalKernel::MakeArrowKernel(co MKQL_ENSURE(*dataType1->GetDataSlot() == NUdf::EDataSlot::Decimal, "Require decimal"); MKQL_ENSURE(*dataType2->GetDataSlot() == NUdf::EDataSlot::Decimal, "Require decimal"); - + auto decimalType1 = static_cast<TDataDecimalType*>(dataType1); auto decimalType2 = static_cast<TDataDecimalType*>(dataType2); diff --git a/yql/essentials/parser/pg_wrapper/comp_factory.cpp b/yql/essentials/parser/pg_wrapper/comp_factory.cpp index e692c96d601..214650f7db9 100644 --- a/yql/essentials/parser/pg_wrapper/comp_factory.cpp +++ b/yql/essentials/parser/pg_wrapper/comp_factory.cpp @@ -3337,7 +3337,7 @@ TComputationNodeFactory GetPgFactory() { auto execFunc = FindExec(id); YQL_ENSURE(execFunc); auto kernel = MakePgKernel(argTypes, returnType, execFunc, id); - return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argNodes), argTypes, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), std::move(argNodes), argTypes, returnType, *kernel, kernel); } if (name == "PgCast") { @@ -3399,7 +3399,7 @@ TComputationNodeFactory GetPgFactory() { auto returnType = callable.GetType()->GetReturnType(); ui32 sourceId = AS_TYPE(TPgType, AS_TYPE(TBlockType, inputType)->GetItemType())->GetTypeId(); auto kernel = MakeFromPgKernel(inputType, returnType, sourceId); - return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), { arg }, { inputType }, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), { arg }, { inputType }, returnType, *kernel, kernel); } if (name == "ToPg") { @@ -3496,7 +3496,7 @@ TComputationNodeFactory GetPgFactory() { auto returnType = callable.GetType()->GetReturnType(); auto targetId = AS_TYPE(TPgType, AS_TYPE(TBlockType, returnType)->GetItemType())->GetTypeId(); auto kernel = MakeToPgKernel(inputType, 
returnType, *sourceDataSlot); - return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), { arg }, { inputType }, *kernel, kernel); + return new TBlockFuncNode(ctx.Mutables, ToDatumValidateMode(ctx.ValidateMode), callable.GetType()->GetName(), {arg}, {inputType}, returnType, *kernel, kernel); } if (name == "PgArray") { diff --git a/yql/essentials/public/udf/arrow/util.h b/yql/essentials/public/udf/arrow/util.h index ba0e16f5f1c..524aaf4da78 100644 --- a/yql/essentials/public/udf/arrow/util.h +++ b/yql/essentials/public/udf/arrow/util.h @@ -108,11 +108,10 @@ private: arrow::MemoryPool* Pool_; }; -/// \brief same as arrow::AllocateResizableBuffer, but allows to control zero padding template<typename TBuffer = TResizeableBuffer> -std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBuffer(size_t size, arrow::MemoryPool* pool, bool zeroPad = false) { +std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBuffer(size_t capacity, arrow::MemoryPool* pool, bool zeroPad = false) { std::unique_ptr<TBuffer> result = std::make_unique<TBuffer>(pool); - ARROW_OK(result->Reserve(size)); + ARROW_OK(result->Reserve(capacity)); if (zeroPad) { result->ZeroPadding(); } diff --git a/yql/essentials/public/udf/udf_validate.cpp b/yql/essentials/public/udf/udf_validate.cpp index be61b79cf29..d6fe5524f9a 100644 --- a/yql/essentials/public/udf/udf_validate.cpp +++ b/yql/essentials/public/udf/udf_validate.cpp @@ -52,5 +52,16 @@ EValidatePolicy ValidatePolicyByStr(const TString& validatePolicyStr) { ythrow yexception() << "Unknown udf validate policy: " << validatePolicyStr; } +EValidateDatumMode ToDatumValidateMode(EValidateMode validateMode) { + switch (validateMode) { + case EValidateMode::None: + return EValidateDatumMode::None; + case EValidateMode::Lazy: + return EValidateDatumMode::Cheap; + case EValidateMode::Greedy: + case EValidateMode::Max: + return EValidateDatumMode::Expensive; + } +} } // namspace NUdf } // namspace NYql diff --git 
a/yql/essentials/public/udf/udf_validate.h b/yql/essentials/public/udf/udf_validate.h index 6e9957ec251..361615770df 100644 --- a/yql/essentials/public/udf/udf_validate.h +++ b/yql/essentials/public/udf/udf_validate.h @@ -15,7 +15,13 @@ namespace NUdf { #define UDF_VALIDATE_POLICY(XX) \ XX(Fail, 0) \ XX(Exception, 1) \ - XX(Max, 2) \ + XX(Max, 2) + +enum class EValidateDatumMode { + None, + Cheap, + Expensive, +}; enum class EValidateMode : ui8 { UDF_VALIDATE_MODE(ENUM_VALUE_GEN) @@ -31,5 +37,6 @@ EValidateMode ValidateModeByStr(const TString& verifyModeStr); TStringBuf ValidatePolicyAsStr(EValidatePolicy verifyPolicy); EValidatePolicy ValidatePolicyByStr(const TString& verifyPolicy); +EValidateDatumMode ToDatumValidateMode(EValidateMode validateMode); } // namspace NUdf } // namspace NYql |