aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-06-27 14:41:54 +0300
committervvvv <vvvv@ydb.tech>2023-06-27 14:41:54 +0300
commit65276d5feee9170231b2ff6e033f715b2c82a589 (patch)
treef98ce0001ab7818e7bbad9a9b5a528b816084b3b
parent6c808b1dcbe96e06cbabe037825ebd3ac8b81ff9 (diff)
downloadydb-65276d5feee9170231b2ff6e033f715b2c82a589.tar.gz
Check of catalog's typeLen instead of direct CSTRINGOID
-rw-r--r--ydb/library/yql/parser/pg_wrapper/arrow.cpp2
-rw-r--r--ydb/library/yql/parser/pg_wrapper/arrow.h28
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp29
3 files changed, 32 insertions, 27 deletions
diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.cpp b/ydb/library/yql/parser/pg_wrapper/arrow.cpp
index 7846d3c4a4..660e2b2054 100644
--- a/ydb/library/yql/parser/pg_wrapper/arrow.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/arrow.cpp
@@ -212,7 +212,7 @@ TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>&
return {};
}
- return [primaryType, originalType, isCString = targetType->GetTypeId() == CSTRINGOID](const std::shared_ptr<arrow::Array>& value) {
+ return [primaryType, originalType, isCString = NPg::LookupType(targetType->GetTypeId()).TypeLen == -2](const std::shared_ptr<arrow::Array>& value) {
auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType));
if (isCString) {
return PgConvertString<true>(res);
diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.h b/ydb/library/yql/parser/pg_wrapper/arrow.h
index aaa8fe082e..9389252d44 100644
--- a/ydb/library/yql/parser/pg_wrapper/arrow.h
+++ b/ydb/library/yql/parser/pg_wrapper/arrow.h
@@ -585,6 +585,7 @@ private:
, SerializeFunc_(serializeFunc)
, ArgsColumns_(argsColumns)
, AggDesc_(aggDesc)
+ , IsCStringTransType_(NPg::LookupType(this->AggDesc_.TransTypeId).TypeLen == -2)
{
if (!HasInitValue && IsTransStrict) {
Y_ENSURE(AggDesc_.ArgTypes.size() == 1);
@@ -636,6 +637,7 @@ private:
const TSerializeFunc SerializeFunc_;
const std::vector<ui32> ArgsColumns_;
const NPg::TAggregateDesc& AggDesc_;
+ const bool IsCStringTransType_;
std::vector<bool> IsFixedArg_;
FmgrInfo TransFuncInfo_;
FmgrInfo SerializeFuncInfo_;
@@ -668,7 +670,7 @@ private:
if constexpr (HasInitValue) {
auto datum = IsTransTypeFixed ? ScalarDatumFromPod(this->PreparedInitValue_) : PointerDatumFromPod(this->PreparedInitValue_);
typedState->isnull = false;
- typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->AggDesc_.TransTypeId == CSTRINGOID);
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->IsCStringTransType_);
}
}
@@ -816,7 +818,7 @@ SkipCall:;
}
CopyState<IsTransTypeFixed>(transCallInfo->args[0], *typedState);
- SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID);
+ SaveToAggContext<IsTransTypeFixed>(*typedState, this->IsCStringTransType_);
}
NUdf::TUnboxedValue FinishOne(const void* state) final {
@@ -894,7 +896,7 @@ SkipCall:;
if constexpr (HasInitValue) {
auto datum = IsTransTypeFixed ? ScalarDatumFromPod(this->PreparedInitValue_) : PointerDatumFromPod(this->PreparedInitValue_);
typedState->isnull = false;
- typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->AggDesc_.TransTypeId == CSTRINGOID);
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(datum, this->IsCStringTransType_);
}
UpdateKey(state, batchNum, columns, row);
@@ -962,7 +964,7 @@ SkipCall:;
if (!HasInitValue && IsTransStrict) {
if (transCallInfo->args[0].isnull) {
typedState->isnull = false;
- typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(transCallInfo->args[1].value, this->AggDesc_.TransTypeId == CSTRINGOID);
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(transCallInfo->args[1].value, this->IsCStringTransType_);
return;
}
}
@@ -971,7 +973,7 @@ SkipCall:;
ret = this->TransFunc_(transCallInfo);
CopyState<IsTransTypeFixed>({ret, transCallInfo->isnull}, *typedState);
- SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID);
+ SaveToAggContext<IsTransTypeFixed>(*typedState, this->IsCStringTransType_);
}
std::unique_ptr<NKikimr::NMiniKQL::IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
@@ -979,7 +981,7 @@ SkipCall:;
return std::make_unique<TAggColumnBuilder<true, false, HasSerialize, TSerializeFunc, true, NYql::NUdf::TFixedSizeArrayBuilder<ui64, true>>>(
this->AggDesc_.Name, this->SerializeFunc_, size, &this->SerializeFuncInfo_, arrow::uint64(), this->Ctx_);
} else {
- if (SerializedType_ == CSTRINGOID) {
+ if (NPg::LookupType(SerializedType_).TypeLen == -2) {
return std::make_unique<TAggColumnBuilder<false, true, HasSerialize, TSerializeFunc, true, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true, NYql::NUdf::EPgStringType::CString>>>(
this->AggDesc_.Name, this->SerializeFunc_, size, &this->SerializeFuncInfo_, arrow::binary(), this->Ctx_);
} else {
@@ -1008,6 +1010,7 @@ SkipCall:;
, AggDesc_(aggDesc)
, SerializedType_(HasSerialize ? NPg::LookupProc(this->AggDesc_.SerializeFuncId).ResultType : this->AggDesc_.TransTypeId)
, FinalType_(HasFinal ? NPg::LookupProc(this->AggDesc_.FinalFuncId).ResultType : this->AggDesc_.TransTypeId)
+ , IsCStringTransType_(NPg::LookupType(this->AggDesc_.TransTypeId).TypeLen == -2)
{
Values_.reserve(1);
}
@@ -1065,12 +1068,12 @@ SkipCall:;
if constexpr (!HasDeserialize) {
typedState->isnull = false;
- typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID);
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->IsCStringTransType_);
} else {
Deserialize(d.value, *typedState);
}
- SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID);
+ SaveToAggContext<IsTransTypeFixed>(*typedState, this->IsCStringTransType_);
}
void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
@@ -1100,7 +1103,7 @@ SkipCall:;
if constexpr (!HasDeserialize) {
if (IsCombineStrict && typedState->isnull) {
typedState->isnull = false;
- typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID);
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->IsCStringTransType_);
return;
}
@@ -1126,13 +1129,13 @@ SkipCall:;
if constexpr (!HasDeserialize) {
if (!combineCallInfo->isnull && ret == d.value) {
typedState->isnull = false;
- typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID);
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->IsCStringTransType_);
return;
}
}
CopyState<IsTransTypeFixed>({ret, combineCallInfo->isnull}, *typedState);
- SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID);
+ SaveToAggContext<IsTransTypeFixed>(*typedState, this->IsCStringTransType_);
}
std::unique_ptr<NKikimr::NMiniKQL::IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
@@ -1140,7 +1143,7 @@ SkipCall:;
return std::make_unique<TAggColumnBuilder<true, false, HasFinal, TFinalFunc, IsFinalStrict, NYql::NUdf::TFixedSizeArrayBuilder<ui64, true>>>(
this->AggDesc_.Name, this->FinalFunc_, size, &this->FinalFuncInfo_, arrow::uint64(), this->Ctx_);
} else {
- if (FinalType_ == CSTRINGOID) {
+ if (NPg::LookupType(FinalType_).TypeLen == -2) {
return std::make_unique<TAggColumnBuilder<false, true, HasFinal, TFinalFunc, IsFinalStrict, NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true>>>(
this->AggDesc_.Name, this->FinalFunc_, size, &this->FinalFuncInfo_, arrow::binary(), this->Ctx_);
} else {
@@ -1157,6 +1160,7 @@ SkipCall:;
const NPg::TAggregateDesc& AggDesc_;
const ui32 SerializedType_;
const ui32 FinalType_;
+ const bool IsCStringTransType_;
ui64 BatchNum_ = Max<ui64>();
std::vector<arrow::Datum> Values_;
TInputArgsAccessor<TDeserializeArgsPolicy> DeserializeAccessor_;
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index bff63c78ab..678ad34d92 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -619,15 +619,15 @@ public:
if (!NPg::HasCast(SourceElemDesc.TypeId, TargetElemDesc.TypeId) || (IsSourceArray != IsTargetArray)) {
ArrayCast = IsSourceArray && IsTargetArray;
if (IsSourceArray && !IsTargetArray) {
- Y_ENSURE(TargetTypeDesc.Category == 'S');
+ Y_ENSURE(TargetTypeDesc.Category == 'S' || TargetId == UNKNOWNOID);
funcId = NPg::LookupProc("array_out", { 0 }).ProcId;
} else if (IsTargetArray && !IsSourceArray) {
- Y_ENSURE(SourceElemDesc.Category == 'S');
+ Y_ENSURE(SourceElemDesc.Category == 'S' || SourceId == UNKNOWNOID);
funcId = NPg::LookupProc("array_in", { 0,0,0 }).ProcId;
- } else if (SourceElemDesc.Category == 'S') {
+ } else if (SourceElemDesc.Category == 'S' || SourceId == UNKNOWNOID) {
funcId = TargetElemDesc.InFuncId;
} else {
- Y_ENSURE(TargetTypeDesc.Category == 'S');
+ Y_ENSURE(TargetTypeDesc.Category == 'S' || TargetId == UNKNOWNOID);
funcId = SourceElemDesc.OutFuncId;
}
} else {
@@ -658,7 +658,7 @@ public:
Y_ENSURE(FInfo1.fn_nargs >= 1 && FInfo1.fn_nargs <= 3);
Func1Lookup = NPg::LookupProc(funcId);
Y_ENSURE(Func1Lookup.ArgTypes.size() >= 1 && Func1Lookup.ArgTypes.size() <= 3);
- if (Func1Lookup.ArgTypes[0] == CSTRINGOID && SourceElemDesc.Category == 'S') {
+ if (NPg::LookupType(Func1Lookup.ArgTypes[0]).TypeLen == -2 && SourceElemDesc.Category == 'S') {
ConvertArgToCString = true;
}
@@ -673,16 +673,16 @@ public:
}
if (!funcId2) {
- if (Func1Lookup.ResultType == CSTRINGOID && TargetElemDesc.Category == 'S') {
+ if (NPg::LookupType(Func1Lookup.ResultType).TypeLen == -2 && TargetElemDesc.Category == 'S') {
ConvertResFromCString = true;
}
} else {
const auto& Func2ArgType = NPg::LookupType(Func2Lookup.ArgTypes[0]);
- if (Func1Lookup.ResultType == CSTRINGOID && Func2ArgType.Category == 'S') {
+ if (NPg::LookupType(Func1Lookup.ResultType).TypeLen == -2 && Func2ArgType.Category == 'S') {
ConvertResFromCString = true;
}
- if (Func2Lookup.ResultType == CSTRINGOID && TargetElemDesc.Category == 'S') {
+ if (NPg::LookupType(Func2Lookup.ResultType).TypeLen == -2 && TargetElemDesc.Category == 'S') {
ConvertResFromCString2 = true;
}
}
@@ -1389,6 +1389,7 @@ private:
struct TFromPgExec {
TFromPgExec(ui32 sourceId)
: SourceId(sourceId)
+ , IsCString(NPg::LookupType(sourceId).TypeLen == -2)
{}
arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const {
@@ -1460,7 +1461,7 @@ struct TFromPgExec {
ui32 len;
const char* ptr = item.AsStringRef().Data() + sizeof(void*);
- if (SourceId == CSTRINGOID) {
+ if (IsCString) {
len = strlen(ptr);
} else {
len = GetCleanVarSize((const text*)ptr);
@@ -1481,6 +1482,7 @@ struct TFromPgExec {
}
const ui32 SourceId;
+ const bool IsCString;
};
std::shared_ptr<arrow::compute::ScalarKernel> MakeFromPgKernel(TType* inputType, TType* resultType, ui32 sourceId) {
@@ -1518,6 +1520,7 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakeFromPgKernel(TType* inputType,
struct TToPgExec {
TToPgExec(ui32 targetId)
: TargetId(targetId)
+ , IsCString(NPg::LookupType(targetId).TypeLen == -2)
{}
arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const {
@@ -1589,7 +1592,7 @@ struct TToPgExec {
}
ui32 len;
- if (TargetId == CSTRINGOID) {
+ if (IsCString) {
len = sizeof(void*) + 1 + item.AsStringRef().Size();
if (Y_UNLIKELY(len < item.AsStringRef().Size())) {
ythrow yexception() << "Too long string";
@@ -1632,6 +1635,7 @@ struct TToPgExec {
}
const ui32 TargetId;
+ const bool IsCString;
};
std::shared_ptr<arrow::compute::ScalarKernel> MakeToPgKernel(TType* inputType, TType* resultType, ui32 targetId) {
@@ -1687,7 +1691,7 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakePgKernel(TVector<TType*> argTy
const auto& retTypeDesc = NPg::LookupType(procDesc.ResultType);
state->Name = procDesc.Name;
state->IsFixedResult = retTypeDesc.PassByValue;
- state->IsCStringResult = procDesc.ResultType == CSTRINGOID;
+ state->IsCStringResult = NPg::LookupType(procDesc.ResultType).TypeLen == -2;
for (const auto& argTypeId : procDesc.ArgTypes) {
const auto& argTypeDesc = NPg::LookupType(argTypeId);
state->IsFixedArg.push_back(argTypeDesc.PassByValue);
@@ -1771,9 +1775,6 @@ TComputationNodeFactory GetPgFactory() {
ui32 sourceId = 0;
if (!inputType->IsNull()) {
sourceId = AS_TYPE(TPgType, inputType)->GetTypeId();
- if (sourceId == UNKNOWNOID) {
- sourceId = TEXTOID;
- }
}
auto returnType = callable.GetType()->GetReturnType();