diff options
author | monster <monster@ydb.tech> | 2023-01-26 20:40:15 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2023-01-26 20:40:15 +0300 |
commit | 0996e5e1336dceacb4b56248a3ef421cec64ff4b (patch) | |
tree | 69efebbcd6917bda89b54b19802182a7552b67a4 | |
parent | bcb5d533a29a90c4099f725e73442909cc462635 (diff) | |
download | ydb-0996e5e1336dceacb4b56248a3ef421cec64ff4b.tar.gz |
support text pg value format in bulk upsert
-rw-r--r-- | ydb/core/grpc_services/rpc_load_rows.cpp | 18 | ||||
-rw-r--r-- | ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 49 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/comp_factory.cpp | 30 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/interface/type_desc.h | 10 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp | 12 |
5 files changed, 86 insertions, 33 deletions
diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp index 7e6467d5503..8092d80428e 100644 --- a/ydb/core/grpc_services/rpc_load_rows.cpp +++ b/ydb/core/grpc_services/rpc_load_rows.cpp @@ -272,8 +272,22 @@ private: break; } case NScheme::NTypeIds::Pg : { - TString v = val.Getbytes_value(); - c = TCell(v.data(), v.size()); + TString text = val.Gettext_value(); + if (!text.empty()) { + auto desc = type.GetTypeDesc(); + auto id = NPg::PgTypeIdFromTypeDesc(desc); + auto result = NPg::PgNativeBinaryFromNativeText(text, id); + if (!result.Error.empty()) { + err = TStringBuilder() << "Invalid text value for " + << NPg::PgTypeNameFromTypeDesc(desc) << ": " << result.Error; + return false; + } + const auto valueInPool = valueDataPool.AppendString(TStringBuf(result.Str)); + c = TCell(valueInPool.data(), valueInPool.size()); + } else { + TString binary = val.Getbytes_value(); + c = TCell(binary.data(), binary.size()); + } break; } default: diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 1fd50c06108..a3f1a6e1090 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -298,11 +298,13 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } }; + auto createTable = [] ( NYdb::NTable::TTableClient& db, NYdb::NTable::TSession& session, ui32 id, bool isKey, + bool isText, std::function<TString(size_t)> textIn, TString setTableName = "", ui16 rowCount = 10 @@ -316,27 +318,29 @@ Y_UNIT_TEST_SUITE(KqpPg) { builder.AddNullableColumn("value", makePgType(id)); builder.SetPrimaryKeyColumn("key"); - auto tableName = (setTableName.empty()) ? Sprintf("/Root/Pg%u", id) : setTableName; + auto tableName = (setTableName.empty()) ? + Sprintf("/Root/Pg%u_%s", id, isText ? "t" : "b") : setTableName; + auto result = session.CreateTable(tableName, builder.Build()).GetValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); NYdb::TValueBuilder rows; rows.BeginList(); for (size_t i = 0; i < rowCount; ++i) { - auto str = NPg::PgNativeBinaryFromNativeText(textIn(i), id); + auto str = isText ? textIn(i) : NPg::PgNativeBinaryFromNativeText(textIn(i), id).Str; + auto mode = isText ? TPgValue::VK_TEXT : TPgValue::VK_BINARY; if (isKey) { rows.AddListItem() .BeginStruct() - .AddMember("key").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id))) - .AddMember("value").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id))) + .AddMember("key").Pg(TPgValue(mode, str, makePgType(id))) + .AddMember("value").Pg(TPgValue(mode, str, makePgType(id))) .EndStruct(); } else { - auto int2Val = (i16)i; - TString int2Str((const char*)&int2Val, sizeof(int2Val)); + auto int2Str = NPg::PgNativeBinaryFromNativeText(Sprintf("%u", i), INT2OID).Str; rows.AddListItem() .BeginStruct() .AddMember("key").Pg(TPgValue(TPgValue::VK_BINARY, int2Str, makePgType(INT2OID))) - .AddMember("value").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id))) + .AddMember("value").Pg(TPgValue(mode, str, makePgType(id))) .EndStruct(); } } @@ -357,13 +361,13 @@ Y_UNIT_TEST_SUITE(KqpPg) { Y_UNIT_TEST(CreateTableBulkUpsertAndRead) { TKikimrRunner kikimr; - auto testSingleType = [&kikimr] (ui32 id, bool isKey, + auto testSingleType = [&kikimr] (ui32 id, bool isKey, bool isText, std::function<TString(size_t)> textIn, std::function<TString(size_t)> textOut) { auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - auto tableName = createTable(db, session, id, isKey, textIn); + auto tableName = createTable(db, session, id, isKey, isText, textIn); auto readSettings = TReadTableSettings() .AppendColumns("key") @@ -385,7 +389,9 @@ Y_UNIT_TEST_SUITE(KqpPg) { for (size_t i = 0; parser.TryNextRow(); ++i) { auto check = [&parser, &id, &i] (const TString& column, const TString& expected) { auto& c = parser.ColumnParser(column); - UNIT_ASSERT_VALUES_EQUAL(expected, NPg::PgNativeTextFromNativeBinary(c.GetPg().Content_, id)); + auto result = NPg::PgNativeTextFromNativeBinary(c.GetPg().Content_, id); + UNIT_ASSERT_C(result.Error.empty(), result.Error); + UNIT_ASSERT_VALUES_EQUAL(expected, result.Str); Cerr << expected << Endl; }; auto expected = textOut(i); @@ -401,7 +407,8 @@ Y_UNIT_TEST_SUITE(KqpPg) { auto testType = [&] (ui32 id, const TPgTypeTestSpec& typeSpec) { - testSingleType(id, typeSpec.IsKey, typeSpec.TextIn, typeSpec.TextOut); + testSingleType(id, typeSpec.IsKey, false, typeSpec.TextIn, typeSpec.TextOut); + testSingleType(id, typeSpec.IsKey, true, typeSpec.TextIn, typeSpec.TextOut); auto arrayId = NYql::NPg::LookupType(id).ArrayTypeId; @@ -415,15 +422,24 @@ Y_UNIT_TEST_SUITE(KqpPg) { return typeSpec.ArrayPrint(str); }; - testSingleType(arrayId, false, textInArray, textOutArray); + testSingleType(arrayId, false, false, textInArray, textOutArray); + testSingleType(arrayId, false, true, textInArray, textOutArray); }; auto testByteaType = [&] () { - testSingleType(BYTEAOID, true, + testSingleType(BYTEAOID, true, false, [] (auto i) { return Sprintf("bytea %u", i); }, [] (auto i) { return Sprintf("\\x627974656120%x", i + 48); }); - testSingleType(BYTEAARRAYOID, false, + testSingleType(BYTEAOID, true, true, + [] (auto i) { return Sprintf("bytea %u", i); }, + [] (auto i) { return Sprintf("\\x627974656120%x", i + 48); }); + + testSingleType(BYTEAARRAYOID, true, false, + [] (auto i) { return Sprintf("{a%u, b%u}", i, i + 10); }, + [] (auto i) { return Sprintf("{\"\\\\x61%x\",\"\\\\x6231%x\"}", i + 48, i + 48); }); + + testSingleType(BYTEAARRAYOID, true, true, [] (auto i) { return Sprintf("{a%u, b%u}", i, i + 10); }, [] (auto i) { return Sprintf("{\"\\\\x61%x\",\"\\\\x6231%x\"}", i + 48, i + 48); }); }; @@ -433,6 +449,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { for (const auto& [oid, spec] : typeSpecs) { testType(oid, spec); } + // TODO: varchar as a key // TODO: native range/multirange types (use get_range_io_data()) } @@ -479,7 +496,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { { auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - auto tableName = createTable(db, session, id, isKey, textIn); + auto tableName = createTable(db, session, id, isKey, false, textIn); session.Close().GetValueSync(); NYdb::NScripting::TScriptingClient client(kikimr.GetDriver()); auto result = client.ExecuteYqlScript( @@ -548,7 +565,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { Y_UNIT_TEST(ReadPgArray) { NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__); - auto binaryStr = NPg::PgNativeBinaryFromNativeText("{1,1}", INT2ARRAYOID); + auto binaryStr = NPg::PgNativeBinaryFromNativeText("{1,1}", INT2ARRAYOID).Str; Y_ENSURE(binaryStr.Size() == 32); auto value = NYql::NCommon::PgValueFromNativeBinary(binaryStr, INT2ARRAYOID); } diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index 476cf1cc083..7e0a7553517 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -3042,7 +3042,7 @@ public: return 0; } - TString NativeBinaryFromNativeText(const TString& str) const { + TConvertResult NativeBinaryFromNativeText(const TString& str) const { NMiniKQL::TScopedAlloc alloc(__LOCATION__); NMiniKQL::TPAllocScope scope; Datum datum = 0; @@ -3085,18 +3085,21 @@ public: serialized = (text*)finfo.fn_addr(callInfo); Y_ENSURE(!callInfo->isnull); - return TString(NMiniKQL::GetVarBuf(serialized)); + return {TString(NMiniKQL::GetVarBuf(serialized)), ""}; } PG_CATCH(); { - // TODO - Y_FAIL("PG error in NativeBinaryFromNativeText"); + auto error_data = CopyErrorData(); + TStringBuilder errMsg; + errMsg << "Error while converting text to binary: " << error_data->message; + FreeErrorData(error_data); + FlushErrorState(); + return {"", errMsg}; } PG_END_TRY(); - return 0; } - TString NativeTextFromNativeBinary(const TString& binary) const { + TConvertResult NativeTextFromNativeBinary(const TString& binary) const { NMiniKQL::TScopedAlloc alloc(__LOCATION__); NMiniKQL::TPAllocScope scope; Datum datum = 0; @@ -3124,15 +3127,18 @@ public: str = (char*)finfo.fn_addr(callInfo); Y_ENSURE(!callInfo->isnull); - return TString(str); + return {TString(str), ""}; } PG_CATCH(); { - // TODO - Y_FAIL("PG error in NativeTextFromNativeBinary"); + auto error_data = CopyErrorData(); + TStringBuilder errMsg; + errMsg << "Error while converting binary to text: " << error_data->message; + FreeErrorData(error_data); + FlushErrorState(); + return {"", errMsg}; } PG_END_TRY(); - return 0; } private: @@ -3257,13 +3263,13 @@ ui64 PgNativeBinaryHash(const char* data, size_t size, void* typeDesc) { return static_cast<TPgTypeDescriptor*>(typeDesc)->Hash(data, size); } -TString PgNativeBinaryFromNativeText(const TString& str, ui32 pgTypeId) { +TConvertResult PgNativeBinaryFromNativeText(const TString& str, ui32 pgTypeId) { auto* typeDesc = TypeDescFromPgTypeId(pgTypeId); Y_VERIFY(typeDesc); return static_cast<TPgTypeDescriptor*>(typeDesc)->NativeBinaryFromNativeText(str); } -TString PgNativeTextFromNativeBinary(const TString& binary, ui32 pgTypeId) { +TConvertResult PgNativeTextFromNativeBinary(const TString& binary, ui32 pgTypeId) { auto* typeDesc = TypeDescFromPgTypeId(pgTypeId); Y_VERIFY(typeDesc); return static_cast<TPgTypeDescriptor*>(typeDesc)->NativeTextFromNativeBinary(binary); diff --git a/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h b/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h index 277f9faf334..831ffac1440 100644 --- a/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h +++ b/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h @@ -17,8 +17,12 @@ int PgNativeBinaryCompare(const char* dataL, size_t sizeL, const char* dataR, si ui64 PgNativeBinaryHash(const char* data, size_t size, void* typeDesc); -// for tests -TString PgNativeBinaryFromNativeText(const TString& str, ui32 pgTypeId); -TString PgNativeTextFromNativeBinary(const TString& binary, ui32 pgTypeId); +struct TConvertResult { + TString Str; + TString Error; +}; + +TConvertResult PgNativeBinaryFromNativeText(const TString& str, ui32 pgTypeId); +TConvertResult PgNativeTextFromNativeBinary(const TString& binary, ui32 pgTypeId); } // namespace NKikimr::NPg diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 5b4376d75b7..ac9404e30d9 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -321,4 +321,16 @@ ui64 PgNativeBinaryHash(const char* data, size_t size, void* typeDesc) { throw yexception() << "PG types are not supported"; } +TConvertResult PgNativeBinaryFromNativeText(const TString& str, ui32 pgTypeId) { + Y_UNUSED(str); + Y_UNUSED(pgTypeId); + throw yexception() << "PG types are not supported"; +} + +TConvertResult PgNativeTextFromNativeBinary(const TString& binary, ui32 pgTypeId) { + Y_UNUSED(binary); + Y_UNUSED(pgTypeId); + throw yexception() << "PG types are not supported"; +} + } // namespace NKikimr::NPg |