aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-06-30 12:43:07 +0300
committerxenoxeno <xeno@ydb.tech>2023-06-30 12:43:07 +0300
commit1962b16a839f1b905bba4a0606c1f352cfb12d59 (patch)
treefb29ee8d48e9838187b84e9e4c3f669f88cae9ab
parentcf5fa593b5636b86e70e99e0ef5aedd487f10c17 (diff)
downloadydb-1962b16a839f1b905bba4a0606c1f352cfb12d59.tar.gz
support NULL values correctly
%% xenoxeno@mr-nvme-testing-01:~/psycopg2$ python3 -m unittest tests.test_ipaddress.NetworkingTestCase.test_cidr_cast . ---------------------------------------------------------------------- Ran 1 test in 0.465s OK xenoxeno@mr-nvme-testing-01:~/psycopg2$ %%
-rw-r--r--ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/apps/pgwire/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/apps/pgwire/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/apps/pgwire/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/apps/pgwire/pg_ydb_connection.cpp130
-rw-r--r--ydb/apps/pgwire/ya.make1
-rw-r--r--ydb/core/local_pgwire/local_pgwire_util.h50
-rw-r--r--ydb/core/local_pgwire/pgwire_kqp_proxy.cpp23
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp63
-rw-r--r--ydb/core/pgproxy/pg_proxy_events.h10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_value/value.h2
11 files changed, 138 insertions, 145 deletions
diff --git a/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt b/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt
index 3bf3f3c1829..2f1cd4cd704 100644
--- a/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt
+++ b/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt
@@ -14,6 +14,7 @@ target_link_libraries(pgwire PUBLIC
library-cpp-cpuid_check
ydb-core-base
ydb-core-pgproxy
+ ydb-core-local_pgwire
ydb-core-protos
api-grpc
cpp-client-ydb_driver
diff --git a/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt b/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt
index e0ba1cb02d7..7fef81e5825 100644
--- a/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt
+++ b/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt
@@ -14,6 +14,7 @@ target_link_libraries(pgwire PUBLIC
yutil
ydb-core-base
ydb-core-pgproxy
+ ydb-core-local_pgwire
ydb-core-protos
api-grpc
cpp-client-ydb_driver
diff --git a/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt b/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt
index 05207659030..66d411ce3a8 100644
--- a/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt
+++ b/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt
@@ -15,6 +15,7 @@ target_link_libraries(pgwire PUBLIC
library-cpp-cpuid_check
ydb-core-base
ydb-core-pgproxy
+ ydb-core-local_pgwire
ydb-core-protos
api-grpc
cpp-client-ydb_driver
diff --git a/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt b/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt
index 8c2c707343c..30ace1f060a 100644
--- a/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt
+++ b/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt
@@ -14,6 +14,7 @@ target_link_libraries(pgwire PUBLIC
library-cpp-cpuid_check
ydb-core-base
ydb-core-pgproxy
+ ydb-core-local_pgwire
ydb-core-protos
api-grpc
cpp-client-ydb_driver
diff --git a/ydb/apps/pgwire/pg_ydb_connection.cpp b/ydb/apps/pgwire/pg_ydb_connection.cpp
index 96786dfcf94..ce8ccf53108 100644
--- a/ydb/apps/pgwire/pg_ydb_connection.cpp
+++ b/ydb/apps/pgwire/pg_ydb_connection.cpp
@@ -2,6 +2,7 @@
#include "pg_ydb_connection.h"
#include "log_impl.h"
#include <ydb/core/pgproxy/pg_proxy_events.h>
+#include <ydb/core/local_pgwire/local_pgwire_util.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
@@ -70,97 +71,6 @@ public:
Become(&TPgYdbConnection::StateWork);
}
- static TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser) {
- switch (valueParser.GetPrimitiveType()) {
- case NYdb::EPrimitiveType::Bool:
- return TStringBuilder() << valueParser.GetBool();
- case NYdb::EPrimitiveType::Int8:
- return TStringBuilder() << valueParser.GetInt8();
- case NYdb::EPrimitiveType::Uint8:
- return TStringBuilder() << valueParser.GetUint8();
- case NYdb::EPrimitiveType::Int16:
- return TStringBuilder() << valueParser.GetInt16();
- case NYdb::EPrimitiveType::Uint16:
- return TStringBuilder() << valueParser.GetUint16();
- case NYdb::EPrimitiveType::Int32:
- return TStringBuilder() << valueParser.GetInt32();
- case NYdb::EPrimitiveType::Uint32:
- return TStringBuilder() << valueParser.GetUint32();
- case NYdb::EPrimitiveType::Int64:
- return TStringBuilder() << valueParser.GetInt64();
- case NYdb::EPrimitiveType::Uint64:
- return TStringBuilder() << valueParser.GetUint64();
- case NYdb::EPrimitiveType::Float:
- return TStringBuilder() << valueParser.GetFloat();
- case NYdb::EPrimitiveType::Double:
- return TStringBuilder() << valueParser.GetDouble();
- case NYdb::EPrimitiveType::Utf8:
- return TStringBuilder() << valueParser.GetUtf8();
- case NYdb::EPrimitiveType::Date:
- return valueParser.GetDate().ToString();
- case NYdb::EPrimitiveType::Datetime:
- return valueParser.GetDatetime().ToString();
- case NYdb::EPrimitiveType::Timestamp:
- return valueParser.GetTimestamp().ToString();
- case NYdb::EPrimitiveType::Interval:
- return TStringBuilder() << valueParser.GetInterval();
- case NYdb::EPrimitiveType::TzDate:
- return valueParser.GetTzDate();
- case NYdb::EPrimitiveType::TzDatetime:
- return valueParser.GetTzDatetime();
- case NYdb::EPrimitiveType::TzTimestamp:
- return valueParser.GetTzTimestamp();
- case NYdb::EPrimitiveType::String:
- return Base64Encode(valueParser.GetString());
- case NYdb::EPrimitiveType::Yson:
- return valueParser.GetYson();
- case NYdb::EPrimitiveType::Json:
- return valueParser.GetJson();
- case NYdb::EPrimitiveType::JsonDocument:
- return valueParser.GetJsonDocument();
- case NYdb::EPrimitiveType::DyNumber:
- return valueParser.GetDyNumber();
- case NYdb::EPrimitiveType::Uuid:
- return {};
- }
- return {};
- }
-
- static TString ColumnValueToString(NYdb::TValueParser& valueParser) {
- switch (valueParser.GetKind()) {
- case NYdb::TTypeParser::ETypeKind::Primitive:
- return ColumnPrimitiveValueToString(valueParser);
- case NYdb::TTypeParser::ETypeKind::Optional: {
- TString value;
- valueParser.OpenOptional();
- if (valueParser.IsNull()) {
- value = "NULL";
- } else {
- value = ColumnValueToString(valueParser);
- }
- valueParser.CloseOptional();
- return value;
- }
- case NYdb::TTypeParser::ETypeKind::Tuple: {
- TString value;
- valueParser.OpenTuple();
- while (valueParser.TryNextElement()) {
- if (!value.empty()) {
- value += ',';
- }
- value += ColumnValueToString(valueParser);
- }
- valueParser.CloseTuple();
- return value;
- }
- case NYdb::TTypeParser::ETypeKind::Pg: {
- return valueParser.GetPg().Content_;
- }
- default:
- return {};
- }
- }
-
static TString ToPgSyntax(TStringBuf query) {
return TStringBuilder() << "--!syntax_pg\n" << query;
}
@@ -200,10 +110,19 @@ public:
{
for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) {
- // TODO: fill data types and sizes
- response->DataFields.push_back({
- .Name = column.Name,
- });
+ std::optional<NYdb::TPgType> pgType = NLocalPgWire::GetPgTypeFromYdbType(column.Type);
+ if (pgType.has_value()) {
+ response->DataFields.push_back({
+ .Name = column.Name,
+ .DataType = pgType->Oid,
+ .DataTypeSize = pgType->Typlen,
+ .DataTypeModifier = pgType->Typmod,
+ });
+ } else {
+ response->DataFields.push_back({
+ .Name = column.Name
+ });
+ }
}
}
@@ -214,7 +133,7 @@ public:
auto& row = response->DataRows.back();
row.resize(parser.ColumnsCount());
for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
- row[index] = ColumnValueToString(parser.ColumnParser(index));
+ row[index] = NLocalPgWire::ColumnValueToRowValueField(parser.ColumnParser(index));
}
}
}
@@ -369,10 +288,19 @@ public:
{
for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) {
- // TODO: fill data types and sizes
- response->DataFields.push_back({
- .Name = column.Name,
- });
+ std::optional<NYdb::TPgType> pgType = NLocalPgWire::GetPgTypeFromYdbType(column.Type);
+ if (pgType.has_value()) {
+ response->DataFields.push_back({
+ .Name = column.Name,
+ .DataType = pgType->Oid,
+ .DataTypeSize = pgType->Typlen,
+ .DataTypeModifier = pgType->Typmod,
+ });
+ } else {
+ response->DataFields.push_back({
+ .Name = column.Name
+ });
+ }
}
}
}
@@ -465,7 +393,7 @@ public:
auto& row = response->DataRows.back();
row.resize(parser.ColumnsCount());
for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
- row[index] = ColumnValueToString(parser.ColumnParser(index));
+ row[index] = NLocalPgWire::ColumnValueToRowValueField(parser.ColumnParser(index));
}
if (maxRows != 0) {
if (--maxRows == 0) {
diff --git a/ydb/apps/pgwire/ya.make b/ydb/apps/pgwire/ya.make
index 8d0229c1d8b..85245fd539e 100644
--- a/ydb/apps/pgwire/ya.make
+++ b/ydb/apps/pgwire/ya.make
@@ -16,6 +16,7 @@ SRCS(
PEERDIR(
ydb/core/base
ydb/core/pgproxy
+ ydb/core/local_pgwire
ydb/core/protos
ydb/public/api/grpc
ydb/public/sdk/cpp/client/ydb_driver
diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h
index cfe4613ae55..1891500b41c 100644
--- a/ydb/core/local_pgwire/local_pgwire_util.h
+++ b/ydb/core/local_pgwire/local_pgwire_util.h
@@ -2,6 +2,7 @@
#include <library/cpp/string_utils/base64/base64.h>
#include <ydb/core/pgproxy/pg_proxy_types.h>
+#include <ydb/core/pgproxy/pg_proxy_events.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
@@ -156,6 +157,44 @@ inline TString ColumnValueToString(NYdb::TValueParser& valueParser) {
}
}
+inline NPG::TEvPGEvents::TRowValueField ColumnValueToRowValueField(NYdb::TValueParser& valueParser) {
+ switch (valueParser.GetKind()) {
+ case NYdb::TTypeParser::ETypeKind::Primitive:
+ return {.Value = ColumnPrimitiveValueToString(valueParser)};
+ case NYdb::TTypeParser::ETypeKind::Optional: {
+ NPG::TEvPGEvents::TRowValueField value;
+ valueParser.OpenOptional();
+ if (!valueParser.IsNull()) {
+ value = ColumnValueToRowValueField(valueParser);
+ }
+ valueParser.CloseOptional();
+ return value;
+ }
+ case NYdb::TTypeParser::ETypeKind::Tuple: {
+ TString value;
+ valueParser.OpenTuple();
+ while (valueParser.TryNextElement()) {
+ if (!value.empty()) {
+ value += ',';
+ }
+ value += ColumnValueToString(valueParser);
+ }
+ valueParser.CloseTuple();
+ return {.Value = value};
+ }
+ case NYdb::TTypeParser::ETypeKind::Pg: {
+ auto pg = valueParser.GetPg();
+ if (!pg.IsNull()) {
+ return {.Value = valueParser.GetPg().Content_};
+ } else {
+ return {};
+ }
+ }
+ default:
+ return {};
+ }
+}
+
inline uint32_t GetPgOidFromYdbType(NYdb::TType type) {
NYdb::TTypeParser parser(type);
switch (parser.GetKind()) {
@@ -167,6 +206,17 @@ inline uint32_t GetPgOidFromYdbType(NYdb::TType type) {
}
}
+inline std::optional<NYdb::TPgType> GetPgTypeFromYdbType(NYdb::TType type) {
+ NYdb::TTypeParser parser(type);
+ switch (parser.GetKind()) {
+ case NYdb::TTypeParser::ETypeKind::Pg: {
+ return parser.GetPg();
+ default:
+ return {};
+ }
+ }
+}
+
inline TString ToPgSyntax(TStringBuf query, const std::unordered_map<TString, TString>& connectionParams) {
auto itOptions = connectionParams.find("options");
if (itOptions == connectionParams.end()) {
diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
index d1e78e02844..3fd3f6b2462 100644
--- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
+++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
@@ -169,12 +169,19 @@ public:
void FillMeta(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvQueryResponse* response) {
for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) {
- // TODO: fill data sizes
- response->DataFields.push_back({
- .Name = column.Name,
- .DataType = GetPgOidFromYdbType(column.Type),
- // .DataTypeSize = column.Type.GetProto().Getpg_type().Gettyplen()
- });
+ std::optional<NYdb::TPgType> pgType = GetPgTypeFromYdbType(column.Type);
+ if (pgType.has_value()) {
+ response->DataFields.push_back({
+ .Name = column.Name,
+ .DataType = pgType->Oid,
+ .DataTypeSize = pgType->Typlen,
+ .DataTypeModifier = pgType->Typmod,
+ });
+ } else {
+ response->DataFields.push_back({
+ .Name = column.Name
+ });
+ }
}
}
@@ -185,7 +192,7 @@ public:
auto& row = response->DataRows.back();
row.resize(parser.ColumnsCount());
for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
- row[index] = ColumnValueToString(parser.ColumnParser(index));
+ row[index] = ColumnValueToRowValueField(parser.ColumnParser(index));
}
}
}
@@ -373,7 +380,7 @@ public:
auto& row = response->DataRows.back();
row.resize(parser.ColumnsCount());
for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
- row[index] = ColumnValueToString(parser.ColumnParser(index));
+ row[index] = ColumnValueToRowValueField(parser.ColumnParser(index));
}
}
}
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index e9529d2389a..bf781d86e5a 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -430,6 +430,33 @@ protected:
void HandleMessage(const TPGFlush*) {
}
+ static void FillDataRow(TPGStreamOutput<TPGDataRow>& dataOut, const TEvPGEvents::TDataRow& dataIn) {
+ dataOut << uint16_t(dataIn.size()); // number of fields
+ for (const auto& item : dataIn) {
+ if (item.Value) {
+ const auto& value(item.Value.value());
+ dataOut << uint32_t(value.size()) << value;
+ } else {
+ dataOut << uint32_t(-1);
+ }
+ }
+ }
+
+ static void FillDataRowDescription(TPGStreamOutput<TPGRowDescription>& dataOut, const std::vector<TEvPGEvents::TRowDescriptionField>& dataIn) {
+ dataOut << uint16_t(dataIn.size()); // number of fields
+ for (const auto& field : dataIn) {
+ dataOut
+ << TStringBuf(field.Name) << '\0'
+ << uint32_t(field.TableId)
+ << uint16_t(field.ColumnId)
+ << uint32_t(field.DataType)
+ << uint16_t(field.DataTypeSize)
+ << uint32_t(field.DataTypeModifier)
+ << uint16_t(0) // format text
+ ;
+ }
+ }
+
bool FlushAndPoll() {
if (FlushOutput()) {
RequestPoller();
@@ -497,27 +524,13 @@ protected:
} else {
if (!ev->Get()->DataFields.empty()) { // rowDescription
TPGStreamOutput<TPGRowDescription> rowDescription;
- rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields
- for (const auto& field : ev->Get()->DataFields) {
- rowDescription
- << TStringBuf(field.Name) << '\0'
- << uint32_t(field.TableId)
- << uint16_t(field.ColumnId)
- << uint32_t(field.DataType)
- << uint16_t(field.DataTypeSize)
- << uint32_t(0xffffffff) // type modifier
- << uint16_t(0) // format text
- ;
- }
+ FillDataRowDescription(rowDescription, ev->Get()->DataFields);
SendStream(rowDescription);
}
{ // dataFields
for (const auto& row : ev->Get()->DataRows) {
TPGStreamOutput<TPGDataRow> dataRow;
- dataRow << uint16_t(row.size()); // number of fields
- for (const auto& item : row) {
- dataRow << uint32_t(item.size()) << item;
- }
+ FillDataRow(dataRow, row);
SendStream(dataRow);
}
}
@@ -560,18 +573,7 @@ protected:
if (ev->Get()->DataFields.size() > 0) {
// rowDescription
TPGStreamOutput<TPGRowDescription> rowDescription;
- rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields
- for (const auto& field : ev->Get()->DataFields) {
- rowDescription
- << TStringBuf(field.Name) << '\0'
- << uint32_t(field.TableId)
- << uint16_t(field.ColumnId)
- << uint32_t(field.DataType)
- << uint16_t(field.DataTypeSize)
- << uint32_t(0xffffffff) // type modifier
- << uint16_t(0) // format text
- ;
- }
+ FillDataRowDescription(rowDescription, ev->Get()->DataFields);
SendStream(rowDescription);
} else {
SendMessage(TPGNoData());
@@ -605,10 +607,7 @@ protected:
{ // dataFields
for (const auto& row : ev->Get()->DataRows) {
TPGStreamOutput<TPGDataRow> dataRow;
- dataRow << uint16_t(row.size()); // number of fields
- for (const auto& item : row) {
- dataRow << uint32_t(item.size()) << item;
- }
+ FillDataRow(dataRow, row);
SendStream(dataRow);
}
}
diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h
index acb31d0d0db..f373264a746 100644
--- a/ydb/core/pgproxy/pg_proxy_events.h
+++ b/ydb/core/pgproxy/pg_proxy_events.h
@@ -37,12 +37,16 @@ struct TEvPGEvents {
uint32_t TableId = 0;
uint16_t ColumnId = 0;
uint32_t DataType;
- uint16_t DataTypeSize;
- //uint32_t DataTypeModifier;
+ int16_t DataTypeSize;
+ int32_t DataTypeModifier;
//uint16_t Format;
};
- using TDataRow = std::vector<TString>;
+ struct TRowValueField {
+ std::optional<TString> Value;
+ };
+
+ using TDataRow = std::vector<TRowValueField>;
struct TEvConnectionOpened : NActors::TEventLocal<TEvConnectionOpened, EvConnectionOpened> {
std::shared_ptr<TPGInitial> Message;
diff --git a/ydb/public/sdk/cpp/client/ydb_value/value.h b/ydb/public/sdk/cpp/client/ydb_value/value.h
index 4e5af92254d..ae14a4562ec 100644
--- a/ydb/public/sdk/cpp/client/ydb_value/value.h
+++ b/ydb/public/sdk/cpp/client/ydb_value/value.h
@@ -74,7 +74,7 @@ struct TPgType {
TString TypeModifier;
ui32 Oid = 0;
- i32 Typlen = 0;
+ i16 Typlen = 0;
i32 Typmod = 0;
TPgType(const TString& typeName, const TString& typeModifier = {})