aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.ru>2022-03-11 02:16:32 +0300
committervvvv <vvvv@yandex-team.ru>2022-03-11 02:16:32 +0300
commit242a4a18a13a75c2b834eaf073941940d27410a3 (patch)
treee34f52462d748332431af13730690fc4941382da
parent6082ae315c8cde5673e85919fe49daf7ece09c85 (diff)
downloadydb-242a4a18a13a75c2b834eaf073941940d27410a3.tar.gz
YQL-13710 table IO via skiff, disable llvm
ref:2d6ea55909bffc16ec1c3e684c977afd2ea39ca6
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp219
-rw-r--r--ydb/library/yql/providers/common/codec/yql_codec.cpp4
-rw-r--r--ydb/library/yql/providers/common/codec/yql_codec.h2
-rw-r--r--ydb/library/yql/providers/common/schema/skiff/yql_skiff_schema.cpp20
-rw-r--r--ydb/library/yql/providers/config/yql_config_provider.cpp3
-rw-r--r--ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp13
6 files changed, 259 insertions, 2 deletions
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index e502396cd20..daf3cf83839 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -142,6 +142,13 @@ void *MkqlAllocSetAlloc(MemoryContext context, Size size) {
void MkqlAllocSetFree(MemoryContext context, void* pointer) {
if (pointer) {
auto original = (void*)((char*)pointer - PallocHdrSize);
+ if (PAllocList) {
+ // remove this block from list
+ auto current = (TPAllocListItem*)original;
+ current->Prev->Next = current->Next;
+ current->Next->Prev = current->Prev;
+ }
+
MKQLFreeDeprecated(original);
}
}
@@ -922,6 +929,218 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) {
}
}
+NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf) {
+ auto marker = buf.Read();
+ if (!marker) {
+ return NUdf::TUnboxedValue();
+ }
+
+ switch (type->GetTypeId()) {
+ case BOOLOID: {
+ auto x = buf.Read();
+ return ScalarDatumToPod(BoolGetDatum(x != 0));
+ }
+ case INT2OID: {
+ i64 x;
+ buf.ReadMany((char*)&x, sizeof(x));
+ return ScalarDatumToPod(Int16GetDatum((i16)x));
+ }
+ case INT4OID: {
+ i64 x;
+ buf.ReadMany((char*)&x, sizeof(x));
+ return ScalarDatumToPod(Int32GetDatum((i32)x));
+ }
+ case INT8OID: {
+ i64 x;
+ buf.ReadMany((char*)&x, sizeof(x));
+ return ScalarDatumToPod(Int64GetDatum(x));
+ }
+ case FLOAT4OID: {
+ double x;
+ buf.ReadMany((char*)&x, sizeof(x));
+ return ScalarDatumToPod(Float4GetDatum((float)x));
+ }
+ case FLOAT8OID: {
+ double x;
+ buf.ReadMany((char*)&x, sizeof(x));
+ return ScalarDatumToPod(Float8GetDatum(x));
+ }
+ case BYTEAOID:
+ case VARCHAROID:
+ case TEXTOID: {
+ ui32 size;
+ buf.ReadMany((char*)&size, sizeof(size));
+ CHECK_STRING_LENGTH_UNSIGNED(size);
+ TString s;
+ if (size) {
+ s = TString::Uninitialized(size);
+ buf.ReadMany(s.begin(), size);
+ }
+
+ SET_MEMORY_CONTEXT;
+ auto ret = cstring_to_text_with_len(s.Data(), s.Size());
+ return PointerDatumToPod((Datum)ret);
+ }
+ case CSTRINGOID: {
+ ui32 size;
+ buf.ReadMany((char*)&size, sizeof(size));
+ CHECK_STRING_LENGTH_UNSIGNED(size);
+ TString s;
+ if (size) {
+ s = TString::Uninitialized(size);
+ buf.ReadMany(s.begin(), size);
+ }
+
+ SET_MEMORY_CONTEXT;
+ auto ret = (char*)palloc(s.Size() + 1);
+ memcpy(ret, s.Data(), s.Size());
+ ret[s.Size()] = '\0';
+ return PointerDatumToPod((Datum)ret);
+ }
+ default:
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ ui32 size;
+ buf.ReadMany((char*)&size, sizeof(size));
+ CHECK_STRING_LENGTH_UNSIGNED(size);
+ TString s;
+ if (size) {
+ s = TString::Uninitialized(size);
+ buf.ReadMany(s.begin(), size);
+ }
+
+ StringInfoData stringInfo;
+ stringInfo.data = (char*)s.Data();
+ stringInfo.len = s.Size();
+ stringInfo.maxlen = s.Size();
+ stringInfo.cursor = 0;
+
+ const auto& typeInfo = NPg::LookupType(type->GetTypeId());
+ auto typeIOParam = MakeTypeIOParam(typeInfo);
+ Y_ENSURE(typeInfo.ReceiveFuncId);
+ FmgrInfo finfo;
+ Zero(finfo);
+ fmgr_info(typeInfo.ReceiveFuncId, &finfo);
+ Y_ENSURE(!finfo.fn_retset);
+ Y_ENSURE(finfo.fn_addr);
+ Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
+ LOCAL_FCINFO(callInfo, 3);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 3;
+ callInfo->isnull = false;
+ callInfo->args[0] = { (Datum)&stringInfo, false };
+ callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
+ callInfo->args[2] = { Int32GetDatum(-1), false };
+
+ auto x = finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ Y_ENSURE(stringInfo.cursor == stringInfo.len);
+ return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x);
+ }
+}
+
+void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
+ if (!value) {
+ buf.Write('\0');
+ return;
+ }
+
+ buf.Write('\1');
+ switch (type->GetTypeId()) {
+ case BOOLOID: {
+ char x = DatumGetBool(ScalarDatumFromPod(value));
+ buf.Write(x);
+ break;
+ }
+ case INT2OID: {
+ i64 x = DatumGetInt16(ScalarDatumFromPod(value));
+ buf.WriteMany((const char*)&x, sizeof(x));
+ break;
+ }
+ case INT4OID: {
+ i64 x = DatumGetInt32(ScalarDatumFromPod(value));
+ buf.WriteMany((const char*)&x, sizeof(x));
+ break;
+ }
+ case INT8OID: {
+ i64 x = DatumGetInt64(ScalarDatumFromPod(value));
+ buf.WriteMany((const char*)&x, sizeof(x));
+ break;
+ }
+ case FLOAT4OID: {
+ double x = DatumGetFloat4(ScalarDatumFromPod(value));
+ buf.WriteMany((const char*)&x, sizeof(x));
+ break;
+ }
+ case FLOAT8OID: {
+ double x = DatumGetFloat8(ScalarDatumFromPod(value));
+ buf.WriteMany((const char*)&x, sizeof(x));
+ break;
+ }
+ case BYTEAOID:
+ case VARCHAROID:
+ case TEXTOID: {
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ const auto x = (const text*)PointerDatumFromPod(value, true);
+ ui32 len = VARSIZE_ANY_EXHDR(x);
+ buf.WriteMany((const char*)&len, sizeof(len));
+ TString s;
+ if (len) {
+ s = TString::Uninitialized(len);
+ text_to_cstring_buffer(x, s.begin(), len + 1);
+ }
+
+ buf.WriteMany(s.Data(), s.size());
+ break;
+ }
+ case CSTRINGOID: {
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ const auto x = (const char*)PointerDatumFromPod(value, false);
+ ui32 len = strlen(x);
+ buf.WriteMany((const char*)&len, sizeof(len));
+ buf.WriteMany(x, len);
+ break;
+ }
+ default:
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ const auto& typeInfo = NPg::LookupType(type->GetTypeId());
+ Y_ENSURE(typeInfo.SendFuncId);
+ FmgrInfo finfo;
+ Zero(finfo);
+ fmgr_info(typeInfo.SendFuncId, &finfo);
+ Y_ENSURE(!finfo.fn_retset);
+ Y_ENSURE(finfo.fn_addr);
+ Y_ENSURE(finfo.fn_nargs == 1);
+ LOCAL_FCINFO(callInfo, 1);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 1;
+ callInfo->isnull = false;
+ callInfo->args[0] = { typeInfo.PassByValue ?
+ ScalarDatumFromPod(value) :
+ PointerDatumFromPod(value, typeInfo.TypeLen == -1), false };
+ auto x = (text*)finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ Y_DEFER{
+ pfree(x);
+ };
+
+ ui32 len = VARSIZE_ANY_EXHDR(x);
+ buf.WriteMany((const char*)&len, sizeof(len));
+ TString s;
+ if (len) {
+ s = TString::Uninitialized(len);
+ text_to_cstring_buffer(x, s.begin(), len + 1);
+ }
+
+ buf.WriteMany(s.Data(), s.size());
+ }
+}
+
} // namespace NCommon
TMaybe<ui32> ConvertToPgType(NUdf::EDataSlot slot) {
diff --git a/ydb/library/yql/providers/common/codec/yql_codec.cpp b/ydb/library/yql/providers/common/codec/yql_codec.cpp
index 28f8ce8c926..d7129722643 100644
--- a/ydb/library/yql/providers/common/codec/yql_codec.cpp
+++ b/ydb/library/yql/providers/common/codec/yql_codec.cpp
@@ -1509,6 +1509,10 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* ty
return ReadSkiffData(type, nativeYtTypeFlags, buf);
}
+ if (type->IsPg()) {
+ return ReadSkiffPg(static_cast<TPgType*>(type), buf);
+ }
+
if (type->IsOptional()) {
auto marker = buf.Read();
if (!marker) {
diff --git a/ydb/library/yql/providers/common/codec/yql_codec.h b/ydb/library/yql/providers/common/codec/yql_codec.h
index 512e01224c4..59718c14153 100644
--- a/ydb/library/yql/providers/common/codec/yql_codec.h
+++ b/ydb/library/yql/providers/common/codec/yql_codec.h
@@ -80,6 +80,7 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* ty
const NKikimr::NMiniKQL::THolderFactory& holderFactory, TInputBuf& buf);
NKikimr::NUdf::TUnboxedValue ReadSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf);
+NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf);
extern "C" void ReadContainerNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
const NKikimr::NMiniKQL::THolderFactory& holderFactory, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf,
bool wrapOptional);
@@ -88,6 +89,7 @@ extern "C" void WriteYsonContainerValue(NKikimr::NMiniKQL::TType* type,
const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf);
void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf);
+void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf);
void WriteSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf);
diff --git a/ydb/library/yql/providers/common/schema/skiff/yql_skiff_schema.cpp b/ydb/library/yql/providers/common/schema/skiff/yql_skiff_schema.cpp
index c814cc5ae20..83998a7e55a 100644
--- a/ydb/library/yql/providers/common/schema/skiff/yql_skiff_schema.cpp
+++ b/ydb/library/yql/providers/common/schema/skiff/yql_skiff_schema.cpp
@@ -84,8 +84,24 @@ struct TSkiffTypeLoader {
ythrow yexception() << "Unsupported data type" << NUdf::GetDataTypeInfo(*slot).Name;
}
- TMaybe<TType> LoadPgType(const TString& /*pgType*/, ui32 /*level*/) {
- ythrow yexception() << "Unsupported PG type";
+ TMaybe<TType> LoadPgType(const TString& pgType, ui32 /*level*/) {
+ TType itemType;
+ if (pgType == "bool") {
+ itemType = NYT::TNode()("wire_type", "boolean");
+ } else if (pgType == "int2" || pgType == "int4" || pgType == "int8") {
+ itemType = NYT::TNode()("wire_type", "int64");
+ } else if (pgType == "float4" || pgType == "float8") {
+ itemType = NYT::TNode()("wire_type", "double");
+ } else {
+ itemType = NYT::TNode()("wire_type", "string32");
+ }
+
+ return NYT::TNode()
+ ("wire_type", "variant8")
+ ("children", NYT::TNode()
+ .Add(NYT::TNode()("wire_type", "nothing"))
+ .Add(std::move(itemType))
+ );
}
TMaybe<TType> LoadDataTypeParams(const TString& dataType, const TString& paramOne, const TString& /*paramTwo*/, ui32 /*level*/) {
diff --git a/ydb/library/yql/providers/config/yql_config_provider.cpp b/ydb/library/yql/providers/config/yql_config_provider.cpp
index 3df2eba81c7..98e7fdced8d 100644
--- a/ydb/library/yql/providers/config/yql_config_provider.cpp
+++ b/ydb/library/yql/providers/config/yql_config_provider.cpp
@@ -742,6 +742,9 @@ namespace {
}
Types.PgTypes = (name == "PgTypes");
+ if (Types.PgTypes) {
+ Types.OptLLVM = "OFF";
+ }
}
else if (name == "FolderSubDirsLimit") {
if (args.size() != 1) {
diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
index 7af490dac9c..bacf7064592 100644
--- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
+++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
@@ -41,6 +41,19 @@ NUdf::TUnboxedValue ReadYsonValuePg(NKikimr::NMiniKQL::TPgType* type, char cmd,
throw yexception() << "PG types are not supported";
}
+NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf) {
+ Y_UNUSED(type);
+ Y_UNUSED(buf);
+ throw yexception() << "PG types are not supported";
+}
+
+void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
+ Y_UNUSED(type);
+ Y_UNUSED(value);
+ Y_UNUSED(buf);
+ throw yexception() << "PG types are not supported";
+}
+
} // namespace NCommon
} // NYql