about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author monster <monster@ydb.tech> 2023-03-23 14:22:28 +0300
committer monster <monster@ydb.tech> 2023-03-23 14:22:28 +0300
commit e62f0606af1897a4152a75f3e4df5cf110d14930 (patch)
tree a0dfbcb76d2dcb103ed092548c74f91e695edaeb
parent 9a42b18985c1ab5da0f9c851288ca0bfa0f703f7 (diff)
download ydb-e62f0606af1897a4152a75f3e4df5cf110d14930.tar.gz
support coercion for pg array types
-rw-r--r-- ydb/core/kqp/ut/pg/kqp_pg_ut.cpp 67
-rw-r--r-- ydb/library/yql/parser/pg_wrapper/comp_factory.cpp 105
2 files changed, 137 insertions, 35 deletions
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index ade78d357e..9f2d030874 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -24,6 +24,7 @@ namespace {
TString TypeMod;
bool ShouldPass;
std::function<TString()> TextIn, TextOut;
+ std::function<TString(TString)> ArrayPrint = [] (auto s) { return Sprintf("{%s,%s}", s.c_str(), s.c_str()); };
};
}
@@ -417,7 +418,8 @@ Y_UNIT_TEST_SUITE(KqpPg) {
BPCHAROID, "6",
SUCCESS,
[] () { return TString("abcd"); },
- [] () { return TString("abcd "); }
+ [] () { return TString("abcd "); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
{
@@ -562,69 +564,80 @@ Y_UNIT_TEST_SUITE(KqpPg) {
TIMESTAMPOID, "2",
SUCCESS,
[] () { return TString("2001-01-01 01:02:03.1234"); },
- [] () { return TString("2001-01-01 01:02:03.12"); }
+ [] () { return TString("2001-01-01 01:02:03.12"); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
{
TIMESTAMPOID, "4",
SUCCESS,
[] () { return TString("2001-01-01 01:02:03.1234"); },
- [] () { return TString("2001-01-01 01:02:03.1234"); }
+ [] () { return TString("2001-01-01 01:02:03.1234"); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
{
TIMESTAMPOID, "6",
SUCCESS,
[] () { return TString("2001-01-01 01:02:03.1234"); },
- [] () { return TString("2001-01-01 01:02:03.1234"); }
+ [] () { return TString("2001-01-01 01:02:03.1234"); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
{
TIMESTAMPTZOID, "2",
SUCCESS,
[] () { return TString("2001-01-01 01:02:03.1234+00"); },
- [] () { return TString("2001-01-01 01:02:03.12+00"); }
+ [] () { return TString("2001-01-01 01:02:03.12+00"); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
{
TIMESTAMPTZOID, "4",
SUCCESS,
[] () { return TString("2001-01-01 01:02:03.1234+00"); },
- [] () { return TString("2001-01-01 01:02:03.1234+00"); }
+ [] () { return TString("2001-01-01 01:02:03.1234+00"); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
{
TIMESTAMPTZOID, "6",
SUCCESS,
[] () { return TString("2001-01-01 01:02:03.1234+00"); },
- [] () { return TString("2001-01-01 01:02:03.1234+00"); }
+ [] () { return TString("2001-01-01 01:02:03.1234+00"); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
{
INTERVALOID, "day",
SUCCESS,
[] () { return TString("100 01:02:03.1234"); },
- [] () { return TString("100 days"); }
+ [] () { return TString("100 days"); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
{
INTERVALOID, "day to minute",
SUCCESS,
[] () { return TString("100 01:02:03.1234"); },
- [] () { return TString("100 days 01:02:00"); }
+ [] () { return TString("100 days 01:02:00"); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
{
INTERVALOID, "day to second,2",
SUCCESS,
[] () { return TString("100 01:02:03.1234"); },
- [] () { return TString("100 days 01:02:03.12"); }
+ [] () { return TString("100 days 01:02:03.12"); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
{
INTERVALOID, "day to second,6",
SUCCESS,
[] () { return TString("100 01:02:03.1234"); },
- [] () { return TString("100 days 01:02:03.1234"); }
+ [] () { return TString("100 days 01:02:03.1234"); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
{
INTERVALOID, "day to second,6",
SUCCESS,
[] () { return TString("100 01:02:03.1234"); },
- [] () { return TString("100 days 01:02:03.1234"); }
+ [] () { return TString("100 days 01:02:03.1234"); },
+ [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }
},
};
@@ -836,7 +849,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
Y_UNIT_TEST(TypeCoercionBulkUpsert) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
- auto testType = [&kikimr] (const TPgTypeCoercionTestSpec& spec) {
+ auto testSingleType = [&kikimr] (const TPgTypeCoercionTestSpec& spec) {
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -872,16 +885,42 @@ Y_UNIT_TEST_SUITE(KqpPg) {
auto result = NPg::PgNativeTextFromNativeBinary(c.GetPg().Content_, spec.TypeId);
UNIT_ASSERT_C(!result.Error, *result.Error);
UNIT_ASSERT_VALUES_EQUAL(expected, result.Str);
- Cerr << expected << Endl;
+ Cerr << result.Str << Endl;
}
}
session.Close().GetValueSync();
};
+ auto testType = [&] (const TPgTypeCoercionTestSpec& spec) {
+ auto textInArray = [&spec] () {
+ auto str = spec.TextIn();
+ return spec.ArrayPrint(str);
+ };
+
+ auto textOutArray = [&spec] () {
+ auto str = spec.TextOut();
+ return spec.ArrayPrint(str);
+ };
+
+ auto arrayTypeId = NYql::NPg::LookupType(spec.TypeId).ArrayTypeId;
+ TPgTypeCoercionTestSpec arraySpec{arrayTypeId, spec.TypeMod, spec.ShouldPass, textInArray, textOutArray};
+
+ testSingleType(spec);
+ testSingleType(arraySpec);
+ };
+
for (const auto& spec : typeCoercionSpecs) {
testType(spec);
}
+
+ TPgTypeCoercionTestSpec partialArrayCoerce{
+ NUMERICARRAYOID, "2",
+ false,
+ [] () { return TString("{99,99,9999,99}"); },
+ [] () { return TString(""); }
+ };
+ testSingleType(partialArrayCoerce);
}
Y_UNIT_TEST(EmptyQuery) {
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index 6946d9748b..408deff7fc 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -2962,13 +2962,17 @@ public:
if (typeDesc.OutFuncId) {
OutFuncId = NYql::NPg::LookupProc("array_out", { 0 }).ProcId;
}
+ if (NYql::NPg::HasCast(ElementTypeId, ElementTypeId) && typeDesc.TypeModInFuncId) {
+ NeedsCoercion = true;
+ TypeModInFuncId = typeDesc.TypeModInFuncId;
+ }
} else {
YdbTypeName = TString("pg") + desc.Name;
StoredSize = TypeLen < 0 ? 0 : TypeLen;
if (TypeId == NAMEOID) {
StoredSize = 0; // store 'name' as usual string
}
- if (NYql::NPg::HasCast(TypeId, TypeId) && TypeModInFuncId != 0) {
+ if (NYql::NPg::HasCast(TypeId, TypeId) && TypeModInFuncId) {
NeedsCoercion = true;
}
}
@@ -3166,7 +3170,7 @@ public:
dvalues.reserve(params.size());
TString textNumberParam;
- if (TypeId == INTERVALOID) {
+ if (TypeId == INTERVALOID || TypeId == INTERVALARRAYOID) {
i32 typmod = -1;
auto ok = NYql::ParsePgIntervalModifier(params[0], typmod);
if (!ok) {
@@ -3274,16 +3278,25 @@ public:
NMiniKQL::TPAllocScope scope;
Datum datum = 0;
Datum datumCasted = 0;
+ TVector<Datum> elems;
+ TVector<bool> nulls;
+ TVector<Datum> castedElements;
+ bool passByValueElem = false;
text* serialized = nullptr;
Y_DEFER {
if (!PassByValue) {
if (datum) {
pfree((void*)datum);
}
- if (datumCasted && datumCasted != datum) {
+ if (datumCasted) {
pfree((void*)datumCasted);
}
}
+ if (IsArray() && !passByValueElem) {
+ for (ui32 i = 0; i < castedElements.size(); ++i) {
+ pfree((void*)castedElements[i]);
+ }
+ }
if (serialized) {
pfree(serialized);
}
@@ -3292,23 +3305,51 @@ public:
{
datum = Receive(binary.Data(), binary.Size());
- const auto& cast = NYql::NPg::LookupCast(TypeId, TypeId);
- FmgrInfo finfo;
- InitFunc(cast.FunctionId, &finfo, 2, 3);
- LOCAL_FCINFO(callInfo, 3);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 3;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { datum, false };
- callInfo->args[1] = { Int32GetDatum(typmod), false };
- callInfo->args[2] = { BoolGetDatum(false), false };
+ if (IsArray()) {
+ const auto& typeDesc = NYql::NPg::LookupType(ElementTypeId);
+ passByValueElem = typeDesc.PassByValue;
+
+ auto arr = (ArrayType*)DatumGetPointer(datum);
+ auto ndim = ARR_NDIM(arr);
+ auto dims = ARR_DIMS(arr);
+ auto lb = ARR_LBOUND(arr);
+ auto nitems = ArrayGetNItems(ndim, dims);
+
+ elems.resize(nitems);
+ nulls.resize(nitems);
+ castedElements.reserve(nitems);
+
+ array_iter iter;
+ array_iter_setup(&iter, (AnyArrayType*)arr);
+ for (ui32 i = 0; i < nitems; ++i) {
+ bool isNull;
+ auto datum = array_iter_next(&iter, &isNull, i,
+ typeDesc.TypeLen, typeDesc.PassByValue, typeDesc.TypeAlign);
+ if (isNull) {
+ elems[i] = 0;
+ nulls[i] = true;
+ continue;
+ }
+ elems[i] = CoerceOne(ElementTypeId, datum, typmod);
+ nulls[i] = false;
+ if (elems[i] != datum) {
+ castedElements.push_back(elems[i]);
+ }
+ }
- datumCasted = finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
+ if (!castedElements.empty()) {
+ auto newArray = construct_md_array(elems.data(), nulls.data(), ndim, dims, lb,
+ typeDesc.TypeId, typeDesc.TypeLen, typeDesc.PassByValue, typeDesc.TypeAlign);
+ datumCasted = PointerGetDatum(newArray);
+ }
+ } else {
+ datumCasted = CoerceOne(TypeId, datum, typmod);
+ if (datumCasted == datum) {
+ datumCasted = 0;
+ }
+ }
- if (datum == datumCasted) {
+ if (!datumCasted) {
return {{}, {}};
} else {
FmgrInfo finfo;
@@ -3330,9 +3371,7 @@ public:
{
auto error_data = CopyErrorData();
TStringBuilder errMsg;
- errMsg << "Error in 'cast' function: "
- << NYql::NPg::LookupProc(ReceiveFuncId).Name
- << ", reason: " << error_data->message;
+ errMsg << "Error while coercing value, reason: " << error_data->message;
FreeErrorData(error_data);
FlushErrorState();
return {{}, errMsg};
@@ -3341,6 +3380,26 @@ public:
}
private:
+ Datum CoerceOne(ui32 typeId, Datum datum, i32 typmod) const {
+ const auto& cast = NYql::NPg::LookupCast(typeId, typeId);
+
+ FmgrInfo finfo;
+ InitFunc(cast.FunctionId, &finfo, 2, 3);
+ LOCAL_FCINFO(callInfo, 3);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 3;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { datum, false };
+ callInfo->args[1] = { Int32GetDatum(typmod), false };
+ callInfo->args[2] = { BoolGetDatum(false), false };
+
+ auto result = finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ return result;
+ }
+
Datum Receive(const char* data, size_t size) const {
StringInfoData stringInfo;
stringInfo.data = (char*)data;
@@ -3365,6 +3424,10 @@ private:
return result;
}
+ bool IsArray() {
+ return TypeId == ArrayTypeId;
+ }
+
static inline void InitFunc(ui32 funcId, FmgrInfo* info, ui32 argCountMin, ui32 argCountMax) {
Zero(*info);
Y_ENSURE(funcId);