diff options
author | vvvv <vvvv@ydb.tech> | 2023-06-27 14:41:54 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-06-27 14:41:54 +0300 |
commit | 65276d5feee9170231b2ff6e033f715b2c82a589 (patch) | |
tree | f98ce0001ab7818e7bbad9a9b5a528b816084b3b | |
parent | 6c808b1dcbe96e06cbabe037825ebd3ac8b81ff9 (diff) | |
download | ydb-65276d5feee9170231b2ff6e033f715b2c82a589.tar.gz |
Check the catalog's TypeLen (== -2) instead of comparing directly against CSTRINGOID
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/arrow.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/arrow.h | 28 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/comp_factory.cpp | 29 |
3 files changed, 32 insertions, 27 deletions
diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.cpp b/ydb/library/yql/parser/pg_wrapper/arrow.cpp index 7846d3c4a4..660e2b2054 100644 --- a/ydb/library/yql/parser/pg_wrapper/arrow.cpp +++ b/ydb/library/yql/parser/pg_wrapper/arrow.cpp @@ -212,7 +212,7 @@ TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& return {}; } - return [primaryType, originalType, isCString = targetType->GetTypeId() == CSTRINGOID](const std::shared_ptr<arrow::Array>& value) { + return [primaryType, originalType, isCString = NPg::LookupType(targetType->GetTypeId()).TypeLen == -2](const std::shared_ptr<arrow::Array>& value) { auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType)); if (isCString) { return PgConvertString<true>(res); diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.h b/ydb/library/yql/parser/pg_wrapper/arrow.h index aaa8fe082e..9389252d44 100644 --- a/ydb/library/yql/parser/pg_wrapper/arrow.h +++ b/ydb/library/yql/parser/pg_wrapper/arrow.h @@ -585,6 +585,7 @@ private: , SerializeFunc_(serializeFunc) , ArgsColumns_(argsColumns) , AggDesc_(aggDesc) + , IsCStringTransType_(NPg::LookupType(this->AggDesc_.TransTypeId).TypeLen == -2) { if (!HasInitValue && IsTransStrict) { Y_ENSURE(AggDesc_.ArgTypes.size() == 1); @@ -636,6 +637,7 @@ private: const TSerializeFunc SerializeFunc_; const std::vector<ui32> ArgsColumns_; const NPg::TAggregateDesc& AggDesc_; + const bool IsCStringTransType_; std::vector<bool> IsFixedArg_; FmgrInfo TransFuncInfo_; FmgrInfo SerializeFuncInfo_; @@ -668,7 +670,7 @@ private: if constexpr (HasInitValue) { auto datum = IsTransTypeFixed ? 
ScalarDatumFromPod(this->PreparedInitValue_) : PointerDatumFromPod(this->PreparedInitValue_); typedState->isnull = false; - typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->AggDesc_.TransTypeId == CSTRINGOID); + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->IsCStringTransType_); } } @@ -816,7 +818,7 @@ SkipCall:; } CopyState<IsTransTypeFixed>(transCallInfo->args[0], *typedState); - SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID); + SaveToAggContext<IsTransTypeFixed>(*typedState, this->IsCStringTransType_); } NUdf::TUnboxedValue FinishOne(const void* state) final { @@ -894,7 +896,7 @@ SkipCall:; if constexpr (HasInitValue) { auto datum = IsTransTypeFixed ? ScalarDatumFromPod(this->PreparedInitValue_) : PointerDatumFromPod(this->PreparedInitValue_); typedState->isnull = false; - typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->AggDesc_.TransTypeId == CSTRINGOID); + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->IsCStringTransType_); } UpdateKey(state, batchNum, columns, row); @@ -962,7 +964,7 @@ SkipCall:; if (!HasInitValue && IsTransStrict) { if (transCallInfo->args[0].isnull) { typedState->isnull = false; - typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(transCallInfo->args[1].value, this->AggDesc_.TransTypeId == CSTRINGOID); + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(transCallInfo->args[1].value, this->IsCStringTransType_); return; } } @@ -971,7 +973,7 @@ SkipCall:; ret = this->TransFunc_(transCallInfo); CopyState<IsTransTypeFixed>({ret, transCallInfo->isnull}, *typedState); - SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID); + SaveToAggContext<IsTransTypeFixed>(*typedState, this->IsCStringTransType_); } std::unique_ptr<NKikimr::NMiniKQL::IAggColumnBuilder> MakeStateBuilder(ui64 size) final { @@ -979,7 +981,7 @@ SkipCall:; return 
std::make_unique<TAggColumnBuilder<true, false, HasSerialize, TSerializeFunc, true, NYql::NUdf::TFixedSizeArrayBuilder<ui64, true>>>( this->AggDesc_.Name, this->SerializeFunc_, size, &this->SerializeFuncInfo_, arrow::uint64(), this->Ctx_); } else { - if (SerializedType_ == CSTRINGOID) { + if (NPg::LookupType(SerializedType_).TypeLen == -2) { return std::make_unique<TAggColumnBuilder<false, true, HasSerialize, TSerializeFunc, true, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true, NYql::NUdf::EPgStringType::CString>>>( this->AggDesc_.Name, this->SerializeFunc_, size, &this->SerializeFuncInfo_, arrow::binary(), this->Ctx_); } else { @@ -1008,6 +1010,7 @@ SkipCall:; , AggDesc_(aggDesc) , SerializedType_(HasSerialize ? NPg::LookupProc(this->AggDesc_.SerializeFuncId).ResultType : this->AggDesc_.TransTypeId) , FinalType_(HasFinal ? NPg::LookupProc(this->AggDesc_.FinalFuncId).ResultType : this->AggDesc_.TransTypeId) + , IsCStringTransType_(NPg::LookupType(this->AggDesc_.TransTypeId).TypeLen == -2) { Values_.reserve(1); } @@ -1065,12 +1068,12 @@ SkipCall:; if constexpr (!HasDeserialize) { typedState->isnull = false; - typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID); + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->IsCStringTransType_); } else { Deserialize(d.value, *typedState); } - SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID); + SaveToAggContext<IsTransTypeFixed>(*typedState, this->IsCStringTransType_); } void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { @@ -1100,7 +1103,7 @@ SkipCall:; if constexpr (!HasDeserialize) { if (IsCombineStrict && typedState->isnull) { typedState->isnull = false; - typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID); + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, 
this->IsCStringTransType_); return; } @@ -1126,13 +1129,13 @@ SkipCall:; if constexpr (!HasDeserialize) { if (!combineCallInfo->isnull && ret == d.value) { typedState->isnull = false; - typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID); + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->IsCStringTransType_); return; } } CopyState<IsTransTypeFixed>({ret, combineCallInfo->isnull}, *typedState); - SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID); + SaveToAggContext<IsTransTypeFixed>(*typedState, this->IsCStringTransType_); } std::unique_ptr<NKikimr::NMiniKQL::IAggColumnBuilder> MakeResultBuilder(ui64 size) final { @@ -1140,7 +1143,7 @@ SkipCall:; return std::make_unique<TAggColumnBuilder<true, false, HasFinal, TFinalFunc, IsFinalStrict, NYql::NUdf::TFixedSizeArrayBuilder<ui64, true>>>( this->AggDesc_.Name, this->FinalFunc_, size, &this->FinalFuncInfo_, arrow::uint64(), this->Ctx_); } else { - if (FinalType_ == CSTRINGOID) { + if (NPg::LookupType(FinalType_).TypeLen == -2) { return std::make_unique<TAggColumnBuilder<false, true, HasFinal, TFinalFunc, IsFinalStrict, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true>>>( this->AggDesc_.Name, this->FinalFunc_, size, &this->FinalFuncInfo_, arrow::binary(), this->Ctx_); } else { @@ -1157,6 +1160,7 @@ SkipCall:; const NPg::TAggregateDesc& AggDesc_; const ui32 SerializedType_; const ui32 FinalType_; + const bool IsCStringTransType_; ui64 BatchNum_ = Max<ui64>(); std::vector<arrow::Datum> Values_; TInputArgsAccessor<TDeserializeArgsPolicy> DeserializeAccessor_; diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index bff63c78ab..678ad34d92 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -619,15 +619,15 @@ public: if (!NPg::HasCast(SourceElemDesc.TypeId, 
TargetElemDesc.TypeId) || (IsSourceArray != IsTargetArray)) { ArrayCast = IsSourceArray && IsTargetArray; if (IsSourceArray && !IsTargetArray) { - Y_ENSURE(TargetTypeDesc.Category == 'S'); + Y_ENSURE(TargetTypeDesc.Category == 'S' || TargetId == UNKNOWNOID); funcId = NPg::LookupProc("array_out", { 0 }).ProcId; } else if (IsTargetArray && !IsSourceArray) { - Y_ENSURE(SourceElemDesc.Category == 'S'); + Y_ENSURE(SourceElemDesc.Category == 'S' || SourceId == UNKNOWNOID); funcId = NPg::LookupProc("array_in", { 0,0,0 }).ProcId; - } else if (SourceElemDesc.Category == 'S') { + } else if (SourceElemDesc.Category == 'S' || SourceId == UNKNOWNOID) { funcId = TargetElemDesc.InFuncId; } else { - Y_ENSURE(TargetTypeDesc.Category == 'S'); + Y_ENSURE(TargetTypeDesc.Category == 'S' || TargetId == UNKNOWNOID); funcId = SourceElemDesc.OutFuncId; } } else { @@ -658,7 +658,7 @@ public: Y_ENSURE(FInfo1.fn_nargs >= 1 && FInfo1.fn_nargs <= 3); Func1Lookup = NPg::LookupProc(funcId); Y_ENSURE(Func1Lookup.ArgTypes.size() >= 1 && Func1Lookup.ArgTypes.size() <= 3); - if (Func1Lookup.ArgTypes[0] == CSTRINGOID && SourceElemDesc.Category == 'S') { + if (NPg::LookupType(Func1Lookup.ArgTypes[0]).TypeLen == -2 && SourceElemDesc.Category == 'S') { ConvertArgToCString = true; } @@ -673,16 +673,16 @@ public: } if (!funcId2) { - if (Func1Lookup.ResultType == CSTRINGOID && TargetElemDesc.Category == 'S') { + if (NPg::LookupType(Func1Lookup.ResultType).TypeLen == -2 && TargetElemDesc.Category == 'S') { ConvertResFromCString = true; } } else { const auto& Func2ArgType = NPg::LookupType(Func2Lookup.ArgTypes[0]); - if (Func1Lookup.ResultType == CSTRINGOID && Func2ArgType.Category == 'S') { + if (NPg::LookupType(Func1Lookup.ResultType).TypeLen == -2 && Func2ArgType.Category == 'S') { ConvertResFromCString = true; } - if (Func2Lookup.ResultType == CSTRINGOID && TargetElemDesc.Category == 'S') { + if (NPg::LookupType(Func2Lookup.ResultType).TypeLen == -2 && TargetElemDesc.Category == 'S') { 
ConvertResFromCString2 = true; } } @@ -1389,6 +1389,7 @@ private: struct TFromPgExec { TFromPgExec(ui32 sourceId) : SourceId(sourceId) + , IsCString(NPg::LookupType(sourceId).TypeLen == -2) {} arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const { @@ -1460,7 +1461,7 @@ struct TFromPgExec { ui32 len; const char* ptr = item.AsStringRef().Data() + sizeof(void*); - if (SourceId == CSTRINGOID) { + if (IsCString) { len = strlen(ptr); } else { len = GetCleanVarSize((const text*)ptr); @@ -1481,6 +1482,7 @@ struct TFromPgExec { } const ui32 SourceId; + const bool IsCString; }; std::shared_ptr<arrow::compute::ScalarKernel> MakeFromPgKernel(TType* inputType, TType* resultType, ui32 sourceId) { @@ -1518,6 +1520,7 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakeFromPgKernel(TType* inputType, struct TToPgExec { TToPgExec(ui32 targetId) : TargetId(targetId) + , IsCString(NPg::LookupType(targetId).TypeLen == -2) {} arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const { @@ -1589,7 +1592,7 @@ struct TToPgExec { } ui32 len; - if (TargetId == CSTRINGOID) { + if (IsCString) { len = sizeof(void*) + 1 + item.AsStringRef().Size(); if (Y_UNLIKELY(len < item.AsStringRef().Size())) { ythrow yexception() << "Too long string"; @@ -1632,6 +1635,7 @@ struct TToPgExec { } const ui32 TargetId; + const bool IsCString; }; std::shared_ptr<arrow::compute::ScalarKernel> MakeToPgKernel(TType* inputType, TType* resultType, ui32 targetId) { @@ -1687,7 +1691,7 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakePgKernel(TVector<TType*> argTy const auto& retTypeDesc = NPg::LookupType(procDesc.ResultType); state->Name = procDesc.Name; state->IsFixedResult = retTypeDesc.PassByValue; - state->IsCStringResult = procDesc.ResultType == CSTRINGOID; + state->IsCStringResult = NPg::LookupType(procDesc.ResultType).TypeLen == -2; for (const auto& argTypeId : procDesc.ArgTypes) { 
const auto& argTypeDesc = NPg::LookupType(argTypeId); state->IsFixedArg.push_back(argTypeDesc.PassByValue); @@ -1771,9 +1775,6 @@ TComputationNodeFactory GetPgFactory() { ui32 sourceId = 0; if (!inputType->IsNull()) { sourceId = AS_TYPE(TPgType, inputType)->GetTypeId(); - if (sourceId == UNKNOWNOID) { - sourceId = TEXTOID; - } } auto returnType = callable.GetType()->GetReturnType(); |