aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2023-01-26 20:40:15 +0300
committermonster <monster@ydb.tech>2023-01-26 20:40:15 +0300
commit0996e5e1336dceacb4b56248a3ef421cec64ff4b (patch)
tree69efebbcd6917bda89b54b19802182a7552b67a4
parentbcb5d533a29a90c4099f725e73442909cc462635 (diff)
downloadydb-0996e5e1336dceacb4b56248a3ef421cec64ff4b.tar.gz
support text pg value format in bulk upsert
-rw-r--r--ydb/core/grpc_services/rpc_load_rows.cpp18
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp49
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp30
-rw-r--r--ydb/library/yql/parser/pg_wrapper/interface/type_desc.h10
-rw-r--r--ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp12
5 files changed, 86 insertions, 33 deletions
diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp
index 7e6467d5503..8092d80428e 100644
--- a/ydb/core/grpc_services/rpc_load_rows.cpp
+++ b/ydb/core/grpc_services/rpc_load_rows.cpp
@@ -272,8 +272,22 @@ private:
break;
}
case NScheme::NTypeIds::Pg : {
- TString v = val.Getbytes_value();
- c = TCell(v.data(), v.size());
+ TString text = val.Gettext_value();
+ if (!text.empty()) {
+ auto desc = type.GetTypeDesc();
+ auto id = NPg::PgTypeIdFromTypeDesc(desc);
+ auto result = NPg::PgNativeBinaryFromNativeText(text, id);
+ if (!result.Error.empty()) {
+ err = TStringBuilder() << "Invalid text value for "
+ << NPg::PgTypeNameFromTypeDesc(desc) << ": " << result.Error;
+ return false;
+ }
+ const auto valueInPool = valueDataPool.AppendString(TStringBuf(result.Str));
+ c = TCell(valueInPool.data(), valueInPool.size());
+ } else {
+ TString binary = val.Getbytes_value();
+ c = TCell(binary.data(), binary.size());
+ }
break;
}
default:
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index 1fd50c06108..a3f1a6e1090 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -298,11 +298,13 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
}
};
+
auto createTable = [] (
NYdb::NTable::TTableClient& db,
NYdb::NTable::TSession& session,
ui32 id,
bool isKey,
+ bool isText,
std::function<TString(size_t)> textIn,
TString setTableName = "",
ui16 rowCount = 10
@@ -316,27 +318,29 @@ Y_UNIT_TEST_SUITE(KqpPg) {
builder.AddNullableColumn("value", makePgType(id));
builder.SetPrimaryKeyColumn("key");
- auto tableName = (setTableName.empty()) ? Sprintf("/Root/Pg%u", id) : setTableName;
+ auto tableName = (setTableName.empty()) ?
+ Sprintf("/Root/Pg%u_%s", id, isText ? "t" : "b") : setTableName;
+
auto result = session.CreateTable(tableName, builder.Build()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
NYdb::TValueBuilder rows;
rows.BeginList();
for (size_t i = 0; i < rowCount; ++i) {
- auto str = NPg::PgNativeBinaryFromNativeText(textIn(i), id);
+ auto str = isText ? textIn(i) : NPg::PgNativeBinaryFromNativeText(textIn(i), id).Str;
+ auto mode = isText ? TPgValue::VK_TEXT : TPgValue::VK_BINARY;
if (isKey) {
rows.AddListItem()
.BeginStruct()
- .AddMember("key").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id)))
- .AddMember("value").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id)))
+ .AddMember("key").Pg(TPgValue(mode, str, makePgType(id)))
+ .AddMember("value").Pg(TPgValue(mode, str, makePgType(id)))
.EndStruct();
} else {
- auto int2Val = (i16)i;
- TString int2Str((const char*)&int2Val, sizeof(int2Val));
+ auto int2Str = NPg::PgNativeBinaryFromNativeText(Sprintf("%u", i), INT2OID).Str;
rows.AddListItem()
.BeginStruct()
.AddMember("key").Pg(TPgValue(TPgValue::VK_BINARY, int2Str, makePgType(INT2OID)))
- .AddMember("value").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id)))
+ .AddMember("value").Pg(TPgValue(mode, str, makePgType(id)))
.EndStruct();
}
}
@@ -357,13 +361,13 @@ Y_UNIT_TEST_SUITE(KqpPg) {
Y_UNIT_TEST(CreateTableBulkUpsertAndRead) {
TKikimrRunner kikimr;
- auto testSingleType = [&kikimr] (ui32 id, bool isKey,
+ auto testSingleType = [&kikimr] (ui32 id, bool isKey, bool isText,
std::function<TString(size_t)> textIn,
std::function<TString(size_t)> textOut)
{
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
- auto tableName = createTable(db, session, id, isKey, textIn);
+ auto tableName = createTable(db, session, id, isKey, isText, textIn);
auto readSettings = TReadTableSettings()
.AppendColumns("key")
@@ -385,7 +389,9 @@ Y_UNIT_TEST_SUITE(KqpPg) {
for (size_t i = 0; parser.TryNextRow(); ++i) {
auto check = [&parser, &id, &i] (const TString& column, const TString& expected) {
auto& c = parser.ColumnParser(column);
- UNIT_ASSERT_VALUES_EQUAL(expected, NPg::PgNativeTextFromNativeBinary(c.GetPg().Content_, id));
+ auto result = NPg::PgNativeTextFromNativeBinary(c.GetPg().Content_, id);
+ UNIT_ASSERT_C(result.Error.empty(), result.Error);
+ UNIT_ASSERT_VALUES_EQUAL(expected, result.Str);
Cerr << expected << Endl;
};
auto expected = textOut(i);
@@ -401,7 +407,8 @@ Y_UNIT_TEST_SUITE(KqpPg) {
auto testType = [&] (ui32 id, const TPgTypeTestSpec& typeSpec)
{
- testSingleType(id, typeSpec.IsKey, typeSpec.TextIn, typeSpec.TextOut);
+ testSingleType(id, typeSpec.IsKey, false, typeSpec.TextIn, typeSpec.TextOut);
+ testSingleType(id, typeSpec.IsKey, true, typeSpec.TextIn, typeSpec.TextOut);
auto arrayId = NYql::NPg::LookupType(id).ArrayTypeId;
@@ -415,15 +422,24 @@ Y_UNIT_TEST_SUITE(KqpPg) {
return typeSpec.ArrayPrint(str);
};
- testSingleType(arrayId, false, textInArray, textOutArray);
+ testSingleType(arrayId, false, false, textInArray, textOutArray);
+ testSingleType(arrayId, false, true, textInArray, textOutArray);
};
auto testByteaType = [&] () {
- testSingleType(BYTEAOID, true,
+ testSingleType(BYTEAOID, true, false,
[] (auto i) { return Sprintf("bytea %u", i); },
[] (auto i) { return Sprintf("\\x627974656120%x", i + 48); });
- testSingleType(BYTEAARRAYOID, false,
+ testSingleType(BYTEAOID, true, true,
+ [] (auto i) { return Sprintf("bytea %u", i); },
+ [] (auto i) { return Sprintf("\\x627974656120%x", i + 48); });
+
+ testSingleType(BYTEAARRAYOID, true, false,
+ [] (auto i) { return Sprintf("{a%u, b%u}", i, i + 10); },
+ [] (auto i) { return Sprintf("{\"\\\\x61%x\",\"\\\\x6231%x\"}", i + 48, i + 48); });
+
+ testSingleType(BYTEAARRAYOID, true, true,
[] (auto i) { return Sprintf("{a%u, b%u}", i, i + 10); },
[] (auto i) { return Sprintf("{\"\\\\x61%x\",\"\\\\x6231%x\"}", i + 48, i + 48); });
};
@@ -433,6 +449,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
for (const auto& [oid, spec] : typeSpecs) {
testType(oid, spec);
}
+
// TODO: varchar as a key
// TODO: native range/multirange types (use get_range_io_data())
}
@@ -479,7 +496,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
{
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
- auto tableName = createTable(db, session, id, isKey, textIn);
+ auto tableName = createTable(db, session, id, isKey, false, textIn);
session.Close().GetValueSync();
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
auto result = client.ExecuteYqlScript(
@@ -548,7 +565,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
Y_UNIT_TEST(ReadPgArray) {
NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__);
- auto binaryStr = NPg::PgNativeBinaryFromNativeText("{1,1}", INT2ARRAYOID);
+ auto binaryStr = NPg::PgNativeBinaryFromNativeText("{1,1}", INT2ARRAYOID).Str;
Y_ENSURE(binaryStr.Size() == 32);
auto value = NYql::NCommon::PgValueFromNativeBinary(binaryStr, INT2ARRAYOID);
}
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index 476cf1cc083..7e0a7553517 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -3042,7 +3042,7 @@ public:
return 0;
}
- TString NativeBinaryFromNativeText(const TString& str) const {
+ TConvertResult NativeBinaryFromNativeText(const TString& str) const {
NMiniKQL::TScopedAlloc alloc(__LOCATION__);
NMiniKQL::TPAllocScope scope;
Datum datum = 0;
@@ -3085,18 +3085,21 @@ public:
serialized = (text*)finfo.fn_addr(callInfo);
Y_ENSURE(!callInfo->isnull);
- return TString(NMiniKQL::GetVarBuf(serialized));
+ return {TString(NMiniKQL::GetVarBuf(serialized)), ""};
}
PG_CATCH();
{
- // TODO
- Y_FAIL("PG error in NativeBinaryFromNativeText");
+ auto error_data = CopyErrorData();
+ TStringBuilder errMsg;
+ errMsg << "Error while converting text to binary: " << error_data->message;
+ FreeErrorData(error_data);
+ FlushErrorState();
+ return {"", errMsg};
}
PG_END_TRY();
- return 0;
}
- TString NativeTextFromNativeBinary(const TString& binary) const {
+ TConvertResult NativeTextFromNativeBinary(const TString& binary) const {
NMiniKQL::TScopedAlloc alloc(__LOCATION__);
NMiniKQL::TPAllocScope scope;
Datum datum = 0;
@@ -3124,15 +3127,18 @@ public:
str = (char*)finfo.fn_addr(callInfo);
Y_ENSURE(!callInfo->isnull);
- return TString(str);
+ return {TString(str), ""};
}
PG_CATCH();
{
- // TODO
- Y_FAIL("PG error in NativeTextFromNativeBinary");
+ auto error_data = CopyErrorData();
+ TStringBuilder errMsg;
+ errMsg << "Error while converting binary to text: " << error_data->message;
+ FreeErrorData(error_data);
+ FlushErrorState();
+ return {"", errMsg};
}
PG_END_TRY();
- return 0;
}
private:
@@ -3257,13 +3263,13 @@ ui64 PgNativeBinaryHash(const char* data, size_t size, void* typeDesc) {
return static_cast<TPgTypeDescriptor*>(typeDesc)->Hash(data, size);
}
-TString PgNativeBinaryFromNativeText(const TString& str, ui32 pgTypeId) {
+TConvertResult PgNativeBinaryFromNativeText(const TString& str, ui32 pgTypeId) {
auto* typeDesc = TypeDescFromPgTypeId(pgTypeId);
Y_VERIFY(typeDesc);
return static_cast<TPgTypeDescriptor*>(typeDesc)->NativeBinaryFromNativeText(str);
}
-TString PgNativeTextFromNativeBinary(const TString& binary, ui32 pgTypeId) {
+TConvertResult PgNativeTextFromNativeBinary(const TString& binary, ui32 pgTypeId) {
auto* typeDesc = TypeDescFromPgTypeId(pgTypeId);
Y_VERIFY(typeDesc);
return static_cast<TPgTypeDescriptor*>(typeDesc)->NativeTextFromNativeBinary(binary);
diff --git a/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h b/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h
index 277f9faf334..831ffac1440 100644
--- a/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h
+++ b/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h
@@ -17,8 +17,12 @@ int PgNativeBinaryCompare(const char* dataL, size_t sizeL, const char* dataR, si
ui64 PgNativeBinaryHash(const char* data, size_t size, void* typeDesc);
-// for tests
-TString PgNativeBinaryFromNativeText(const TString& str, ui32 pgTypeId);
-TString PgNativeTextFromNativeBinary(const TString& binary, ui32 pgTypeId);
+struct TConvertResult {
+ TString Str;
+ TString Error;
+};
+
+TConvertResult PgNativeBinaryFromNativeText(const TString& str, ui32 pgTypeId);
+TConvertResult PgNativeTextFromNativeBinary(const TString& binary, ui32 pgTypeId);
} // namespace NKikimr::NPg
diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
index 5b4376d75b7..ac9404e30d9 100644
--- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
+++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
@@ -321,4 +321,16 @@ ui64 PgNativeBinaryHash(const char* data, size_t size, void* typeDesc) {
throw yexception() << "PG types are not supported";
}
+TConvertResult PgNativeBinaryFromNativeText(const TString& str, ui32 pgTypeId) {
+ Y_UNUSED(str);
+ Y_UNUSED(pgTypeId);
+ throw yexception() << "PG types are not supported";
+}
+
+TConvertResult PgNativeTextFromNativeBinary(const TString& binary, ui32 pgTypeId) {
+ Y_UNUSED(binary);
+ Y_UNUSED(pgTypeId);
+ throw yexception() << "PG types are not supported";
+}
+
} // namespace NKikimr::NPg