diff options
author | vvvv <vvvv@yandex-team.ru> | 2022-03-11 02:16:32 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.ru> | 2022-03-11 02:16:32 +0300 |
commit | 242a4a18a13a75c2b834eaf073941940d27410a3 (patch) | |
tree | e34f52462d748332431af13730690fc4941382da | |
parent | 6082ae315c8cde5673e85919fe49daf7ece09c85 (diff) | |
download | ydb-242a4a18a13a75c2b834eaf073941940d27410a3.tar.gz |
YQL-13710 table IO via skiff, disable llvm
ref:2d6ea55909bffc16ec1c3e684c977afd2ea39ca6
6 files changed, 259 insertions, 2 deletions
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index e502396cd20..daf3cf83839 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -142,6 +142,13 @@ void *MkqlAllocSetAlloc(MemoryContext context, Size size) { void MkqlAllocSetFree(MemoryContext context, void* pointer) { if (pointer) { auto original = (void*)((char*)pointer - PallocHdrSize); + if (PAllocList) { + // remove this block from list + auto current = (TPAllocListItem*)original; + current->Prev->Next = current->Next; + current->Next->Prev = current->Prev; + } + MKQLFreeDeprecated(original); } } @@ -922,6 +929,218 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) { } } +NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf) { + auto marker = buf.Read(); + if (!marker) { + return NUdf::TUnboxedValue(); + } + + switch (type->GetTypeId()) { + case BOOLOID: { + auto x = buf.Read(); + return ScalarDatumToPod(BoolGetDatum(x != 0)); + } + case INT2OID: { + i64 x; + buf.ReadMany((char*)&x, sizeof(x)); + return ScalarDatumToPod(Int16GetDatum((i16)x)); + } + case INT4OID: { + i64 x; + buf.ReadMany((char*)&x, sizeof(x)); + return ScalarDatumToPod(Int32GetDatum((i32)x)); + } + case INT8OID: { + i64 x; + buf.ReadMany((char*)&x, sizeof(x)); + return ScalarDatumToPod(Int64GetDatum(x)); + } + case FLOAT4OID: { + double x; + buf.ReadMany((char*)&x, sizeof(x)); + return ScalarDatumToPod(Float4GetDatum((float)x)); + } + case FLOAT8OID: { + double x; + buf.ReadMany((char*)&x, sizeof(x)); + return ScalarDatumToPod(Float8GetDatum(x)); + } + case BYTEAOID: + case VARCHAROID: + case TEXTOID: { + ui32 size; + buf.ReadMany((char*)&size, sizeof(size)); + CHECK_STRING_LENGTH_UNSIGNED(size); + TString s; + if (size) { + s = TString::Uninitialized(size); + buf.ReadMany(s.begin(), size); + } + + SET_MEMORY_CONTEXT; + auto ret = cstring_to_text_with_len(s.Data(), s.Size()); + return PointerDatumToPod((Datum)ret); + } + case CSTRINGOID: { + ui32 size; + buf.ReadMany((char*)&size, sizeof(size)); + CHECK_STRING_LENGTH_UNSIGNED(size); + TString s; + if (size) { + s = TString::Uninitialized(size); + buf.ReadMany(s.begin(), size); + } + + SET_MEMORY_CONTEXT; + auto ret = (char*)palloc(s.Size() + 1); + memcpy(ret, s.Data(), s.Size()); + ret[s.Size()] = '\0'; + return PointerDatumToPod((Datum)ret); + } + default: + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + ui32 size; + buf.ReadMany((char*)&size, sizeof(size)); + CHECK_STRING_LENGTH_UNSIGNED(size); + TString s; + if (size) { + s = TString::Uninitialized(size); + buf.ReadMany(s.begin(), size); + } + + StringInfoData stringInfo; + stringInfo.data = (char*)s.Data(); + stringInfo.len = s.Size(); + stringInfo.maxlen = s.Size(); + stringInfo.cursor = 0; + + const auto& typeInfo = NPg::LookupType(type->GetTypeId()); + auto typeIOParam = MakeTypeIOParam(typeInfo); + Y_ENSURE(typeInfo.ReceiveFuncId); + FmgrInfo finfo; + Zero(finfo); + fmgr_info(typeInfo.ReceiveFuncId, &finfo); + Y_ENSURE(!finfo.fn_retset); + Y_ENSURE(finfo.fn_addr); + Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3); + LOCAL_FCINFO(callInfo, 3); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 3; + callInfo->isnull = false; + callInfo->args[0] = { (Datum)&stringInfo, false }; + callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false }; + callInfo->args[2] = { Int32GetDatum(-1), false }; + + auto x = finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + Y_ENSURE(stringInfo.cursor == stringInfo.len); + return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x); + } +} + +void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { + if (!value) { + buf.Write('\0'); + return; + } + + buf.Write('\1'); + switch (type->GetTypeId()) { + case BOOLOID: { + char x = DatumGetBool(ScalarDatumFromPod(value)); + buf.Write(x); + break; + } + case INT2OID: { + i64 x = DatumGetInt16(ScalarDatumFromPod(value)); + buf.WriteMany((const char*)&x, sizeof(x)); + break; + } + case INT4OID: { + i64 x = DatumGetInt32(ScalarDatumFromPod(value)); + buf.WriteMany((const char*)&x, sizeof(x)); + break; + } + case INT8OID: { + i64 x = DatumGetInt64(ScalarDatumFromPod(value)); + buf.WriteMany((const char*)&x, sizeof(x)); + break; + } + case FLOAT4OID: { + double x = DatumGetFloat4(ScalarDatumFromPod(value)); + buf.WriteMany((const char*)&x, sizeof(x)); + break; + } + case FLOAT8OID: { + double x = DatumGetFloat8(ScalarDatumFromPod(value)); + buf.WriteMany((const char*)&x, sizeof(x)); + break; + } + case BYTEAOID: + case VARCHAROID: + case TEXTOID: { + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + const auto x = (const text*)PointerDatumFromPod(value, true); + ui32 len = VARSIZE_ANY_EXHDR(x); + buf.WriteMany((const char*)&len, sizeof(len)); + TString s; + if (len) { + s = TString::Uninitialized(len); + text_to_cstring_buffer(x, s.begin(), len + 1); + } + + buf.WriteMany(s.Data(), s.size()); + break; + } + case CSTRINGOID: { + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + const auto x = (const char*)PointerDatumFromPod(value, false); + ui32 len = strlen(x); + buf.WriteMany((const char*)&len, sizeof(len)); + buf.WriteMany(x, len); + break; + } + default: + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + const auto& typeInfo = NPg::LookupType(type->GetTypeId()); + Y_ENSURE(typeInfo.SendFuncId); + FmgrInfo finfo; + Zero(finfo); + fmgr_info(typeInfo.SendFuncId, &finfo); + Y_ENSURE(!finfo.fn_retset); + Y_ENSURE(finfo.fn_addr); + Y_ENSURE(finfo.fn_nargs == 1); + LOCAL_FCINFO(callInfo, 1); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 1; + callInfo->isnull = false; + callInfo->args[0] = { typeInfo.PassByValue ? + ScalarDatumFromPod(value) : + PointerDatumFromPod(value, typeInfo.TypeLen == -1), false }; + auto x = (text*)finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + Y_DEFER{ + pfree(x); + }; + + ui32 len = VARSIZE_ANY_EXHDR(x); + buf.WriteMany((const char*)&len, sizeof(len)); + TString s; + if (len) { + s = TString::Uninitialized(len); + text_to_cstring_buffer(x, s.begin(), len + 1); + } + + buf.WriteMany(s.Data(), s.size()); + } +} + } // namespace NCommon TMaybe<ui32> ConvertToPgType(NUdf::EDataSlot slot) { diff --git a/ydb/library/yql/providers/common/codec/yql_codec.cpp b/ydb/library/yql/providers/common/codec/yql_codec.cpp index 28f8ce8c926..d7129722643 100644 --- a/ydb/library/yql/providers/common/codec/yql_codec.cpp +++ b/ydb/library/yql/providers/common/codec/yql_codec.cpp @@ -1509,6 +1509,10 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* ty return ReadSkiffData(type, nativeYtTypeFlags, buf); } + if (type->IsPg()) { + return ReadSkiffPg(static_cast<TPgType*>(type), buf); + } + if (type->IsOptional()) { auto marker = buf.Read(); if (!marker) { diff --git a/ydb/library/yql/providers/common/codec/yql_codec.h b/ydb/library/yql/providers/common/codec/yql_codec.h index 512e01224c4..59718c14153 100644 --- a/ydb/library/yql/providers/common/codec/yql_codec.h +++ b/ydb/library/yql/providers/common/codec/yql_codec.h @@ -80,6 +80,7 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* ty const NKikimr::NMiniKQL::THolderFactory& holderFactory, TInputBuf& buf); NKikimr::NUdf::TUnboxedValue ReadSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf); +NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf); extern "C" void ReadContainerNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NMiniKQL::THolderFactory& holderFactory, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf, bool wrapOptional); @@ -88,6 +89,7 @@ extern "C" void WriteYsonContainerValue(NKikimr::NMiniKQL::TType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); +void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); void WriteSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); diff --git a/ydb/library/yql/providers/common/schema/skiff/yql_skiff_schema.cpp b/ydb/library/yql/providers/common/schema/skiff/yql_skiff_schema.cpp index c814cc5ae20..83998a7e55a 100644 --- a/ydb/library/yql/providers/common/schema/skiff/yql_skiff_schema.cpp +++ b/ydb/library/yql/providers/common/schema/skiff/yql_skiff_schema.cpp @@ -84,8 +84,24 @@ struct TSkiffTypeLoader { ythrow yexception() << "Unsupported data type" << NUdf::GetDataTypeInfo(*slot).Name; } - TMaybe<TType> LoadPgType(const TString& /*pgType*/, ui32 /*level*/) { - ythrow yexception() << "Unsupported PG type"; + TMaybe<TType> LoadPgType(const TString& pgType, ui32 /*level*/) { + TType itemType; + if (pgType == "bool") { + itemType = NYT::TNode()("wire_type", "boolean"); + } else if (pgType == "int2" || pgType == "int4" || pgType == "int8") { + itemType = NYT::TNode()("wire_type", "int64"); + } else if (pgType == "float4" || pgType == "float8") { + itemType = NYT::TNode()("wire_type", "double"); + } else { + itemType = NYT::TNode()("wire_type", "string32"); + } + + return NYT::TNode() + ("wire_type", "variant8") + ("children", NYT::TNode() + .Add(NYT::TNode()("wire_type", "nothing")) + .Add(std::move(itemType)) + ); } TMaybe<TType> LoadDataTypeParams(const TString& dataType, const TString& paramOne, const TString& /*paramTwo*/, ui32 /*level*/) { diff --git a/ydb/library/yql/providers/config/yql_config_provider.cpp b/ydb/library/yql/providers/config/yql_config_provider.cpp index 3df2eba81c7..98e7fdced8d 100644 --- a/ydb/library/yql/providers/config/yql_config_provider.cpp +++ b/ydb/library/yql/providers/config/yql_config_provider.cpp @@ -742,6 +742,9 @@ namespace { } Types.PgTypes = (name == "PgTypes"); + if (Types.PgTypes) { + Types.OptLLVM = "OFF"; + } } else if (name == "FolderSubDirsLimit") { if (args.size() != 1) { diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 7af490dac9c..bacf7064592 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -41,6 +41,19 @@ NUdf::TUnboxedValue ReadYsonValuePg(NKikimr::NMiniKQL::TPgType* type, char cmd, throw yexception() << "PG types are not supported"; } +NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf) { + Y_UNUSED(type); + Y_UNUSED(buf); + throw yexception() << "PG types are not supported"; +} + +void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { + Y_UNUSED(type); + Y_UNUSED(value); + Y_UNUSED(buf); + throw yexception() << "PG types are not supported"; +} + } // namespace NCommon } // NYql |