diff options
author | xenoxeno <xeno@ydb.tech> | 2023-06-30 12:43:07 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-06-30 12:43:07 +0300 |
commit | 1962b16a839f1b905bba4a0606c1f352cfb12d59 (patch) | |
tree | fb29ee8d48e9838187b84e9e4c3f669f88cae9ab | |
parent | cf5fa593b5636b86e70e99e0ef5aedd487f10c17 (diff) | |
download | ydb-1962b16a839f1b905bba4a0606c1f352cfb12d59.tar.gz |
support NULL values correctly
%%
xenoxeno@mr-nvme-testing-01:~/psycopg2$ python3 -m unittest tests.test_ipaddress.NetworkingTestCase.test_cidr_cast
.
----------------------------------------------------------------------
Ran 1 test in 0.465s
OK
xenoxeno@mr-nvme-testing-01:~/psycopg2$
%%
-rw-r--r-- | ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/apps/pgwire/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/apps/pgwire/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/apps/pgwire/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/apps/pgwire/pg_ydb_connection.cpp | 130 | ||||
-rw-r--r-- | ydb/apps/pgwire/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_util.h | 50 | ||||
-rw-r--r-- | ydb/core/local_pgwire/pgwire_kqp_proxy.cpp | 23 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_connection.cpp | 63 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_proxy_events.h | 10 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_value/value.h | 2 |
11 files changed, 138 insertions, 145 deletions
diff --git a/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt b/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt index 3bf3f3c1829..2f1cd4cd704 100644 --- a/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt +++ b/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt @@ -14,6 +14,7 @@ target_link_libraries(pgwire PUBLIC library-cpp-cpuid_check ydb-core-base ydb-core-pgproxy + ydb-core-local_pgwire ydb-core-protos api-grpc cpp-client-ydb_driver diff --git a/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt b/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt index e0ba1cb02d7..7fef81e5825 100644 --- a/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt +++ b/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt @@ -14,6 +14,7 @@ target_link_libraries(pgwire PUBLIC yutil ydb-core-base ydb-core-pgproxy + ydb-core-local_pgwire ydb-core-protos api-grpc cpp-client-ydb_driver diff --git a/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt b/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt index 05207659030..66d411ce3a8 100644 --- a/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt +++ b/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt @@ -15,6 +15,7 @@ target_link_libraries(pgwire PUBLIC library-cpp-cpuid_check ydb-core-base ydb-core-pgproxy + ydb-core-local_pgwire ydb-core-protos api-grpc cpp-client-ydb_driver diff --git a/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt b/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt index 8c2c707343c..30ace1f060a 100644 --- a/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt +++ b/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt @@ -14,6 +14,7 @@ target_link_libraries(pgwire PUBLIC library-cpp-cpuid_check ydb-core-base ydb-core-pgproxy + ydb-core-local_pgwire ydb-core-protos api-grpc cpp-client-ydb_driver diff --git a/ydb/apps/pgwire/pg_ydb_connection.cpp b/ydb/apps/pgwire/pg_ydb_connection.cpp index 96786dfcf94..ce8ccf53108 100644 --- a/ydb/apps/pgwire/pg_ydb_connection.cpp +++ b/ydb/apps/pgwire/pg_ydb_connection.cpp @@ -2,6 +2,7 @@ #include "pg_ydb_connection.h" #include "log_impl.h" #include <ydb/core/pgproxy/pg_proxy_events.h> +#include <ydb/core/local_pgwire/local_pgwire_util.h> #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h> @@ -70,97 +71,6 @@ public: Become(&TPgYdbConnection::StateWork); } - static TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser) { - switch (valueParser.GetPrimitiveType()) { - case NYdb::EPrimitiveType::Bool: - return TStringBuilder() << valueParser.GetBool(); - case NYdb::EPrimitiveType::Int8: - return TStringBuilder() << valueParser.GetInt8(); - case NYdb::EPrimitiveType::Uint8: - return TStringBuilder() << valueParser.GetUint8(); - case NYdb::EPrimitiveType::Int16: - return TStringBuilder() << valueParser.GetInt16(); - case NYdb::EPrimitiveType::Uint16: - return TStringBuilder() << valueParser.GetUint16(); - case NYdb::EPrimitiveType::Int32: - return TStringBuilder() << valueParser.GetInt32(); - case NYdb::EPrimitiveType::Uint32: - return TStringBuilder() << valueParser.GetUint32(); - case NYdb::EPrimitiveType::Int64: - return TStringBuilder() << valueParser.GetInt64(); - case NYdb::EPrimitiveType::Uint64: - return TStringBuilder() << valueParser.GetUint64(); - case NYdb::EPrimitiveType::Float: - return TStringBuilder() << valueParser.GetFloat(); - case NYdb::EPrimitiveType::Double: - return TStringBuilder() << valueParser.GetDouble(); - case NYdb::EPrimitiveType::Utf8: - return TStringBuilder() << valueParser.GetUtf8(); - case NYdb::EPrimitiveType::Date: - return valueParser.GetDate().ToString(); - case NYdb::EPrimitiveType::Datetime: - return valueParser.GetDatetime().ToString(); - case NYdb::EPrimitiveType::Timestamp: - return valueParser.GetTimestamp().ToString(); - case NYdb::EPrimitiveType::Interval: - return TStringBuilder() << valueParser.GetInterval(); - case NYdb::EPrimitiveType::TzDate: - return valueParser.GetTzDate(); - case NYdb::EPrimitiveType::TzDatetime: - return valueParser.GetTzDatetime(); - case NYdb::EPrimitiveType::TzTimestamp: - return valueParser.GetTzTimestamp(); - case NYdb::EPrimitiveType::String: - return Base64Encode(valueParser.GetString()); - case NYdb::EPrimitiveType::Yson: - return valueParser.GetYson(); - case NYdb::EPrimitiveType::Json: - return valueParser.GetJson(); - case NYdb::EPrimitiveType::JsonDocument: - return valueParser.GetJsonDocument(); - case NYdb::EPrimitiveType::DyNumber: - return valueParser.GetDyNumber(); - case NYdb::EPrimitiveType::Uuid: - return {}; - } - return {}; - } - - static TString ColumnValueToString(NYdb::TValueParser& valueParser) { - switch (valueParser.GetKind()) { - case NYdb::TTypeParser::ETypeKind::Primitive: - return ColumnPrimitiveValueToString(valueParser); - case NYdb::TTypeParser::ETypeKind::Optional: { - TString value; - valueParser.OpenOptional(); - if (valueParser.IsNull()) { - value = "NULL"; - } else { - value = ColumnValueToString(valueParser); - } - valueParser.CloseOptional(); - return value; - } - case NYdb::TTypeParser::ETypeKind::Tuple: { - TString value; - valueParser.OpenTuple(); - while (valueParser.TryNextElement()) { - if (!value.empty()) { - value += ','; - } - value += ColumnValueToString(valueParser); - } - valueParser.CloseTuple(); - return value; - } - case NYdb::TTypeParser::ETypeKind::Pg: { - return valueParser.GetPg().Content_; - } - default: - return {}; - } - } - static TString ToPgSyntax(TStringBuf query) { return TStringBuilder() << "--!syntax_pg\n" << query; } @@ -200,10 +110,19 @@ public: { for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) { - // TODO: fill data types and sizes - response->DataFields.push_back({ - .Name = column.Name, - }); + std::optional<NYdb::TPgType> pgType = NLocalPgWire::GetPgTypeFromYdbType(column.Type); + if (pgType.has_value()) { + response->DataFields.push_back({ + .Name = column.Name, + .DataType = pgType->Oid, + .DataTypeSize = pgType->Typlen, + .DataTypeModifier = pgType->Typmod, + }); + } else { + response->DataFields.push_back({ + .Name = column.Name + }); + } } } @@ -214,7 +133,7 @@ public: auto& row = response->DataRows.back(); row.resize(parser.ColumnsCount()); for (size_t index = 0; index < parser.ColumnsCount(); ++index) { - row[index] = ColumnValueToString(parser.ColumnParser(index)); + row[index] = NLocalPgWire::ColumnValueToRowValueField(parser.ColumnParser(index)); } } } @@ -369,10 +288,19 @@ public: { for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) { - // TODO: fill data types and sizes - response->DataFields.push_back({ - .Name = column.Name, - }); + std::optional<NYdb::TPgType> pgType = NLocalPgWire::GetPgTypeFromYdbType(column.Type); + if (pgType.has_value()) { + response->DataFields.push_back({ + .Name = column.Name, + .DataType = pgType->Oid, + .DataTypeSize = pgType->Typlen, + .DataTypeModifier = pgType->Typmod, + }); + } else { + response->DataFields.push_back({ + .Name = column.Name + }); + } } } } @@ -465,7 +393,7 @@ public: auto& row = response->DataRows.back(); row.resize(parser.ColumnsCount()); for (size_t index = 0; index < parser.ColumnsCount(); ++index) { - row[index] = ColumnValueToString(parser.ColumnParser(index)); + row[index] = NLocalPgWire::ColumnValueToRowValueField(parser.ColumnParser(index)); } if (maxRows != 0) { if (--maxRows == 0) { diff --git a/ydb/apps/pgwire/ya.make b/ydb/apps/pgwire/ya.make index 8d0229c1d8b..85245fd539e 100644 --- a/ydb/apps/pgwire/ya.make +++ b/ydb/apps/pgwire/ya.make @@ -16,6 +16,7 @@ SRCS( PEERDIR( ydb/core/base ydb/core/pgproxy + ydb/core/local_pgwire ydb/core/protos ydb/public/api/grpc ydb/public/sdk/cpp/client/ydb_driver diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h index cfe4613ae55..1891500b41c 100644 --- a/ydb/core/local_pgwire/local_pgwire_util.h +++ b/ydb/core/local_pgwire/local_pgwire_util.h @@ -2,6 +2,7 @@ #include <library/cpp/string_utils/base64/base64.h> #include <ydb/core/pgproxy/pg_proxy_types.h> +#include <ydb/core/pgproxy/pg_proxy_events.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> @@ -156,6 +157,44 @@ inline TString ColumnValueToString(NYdb::TValueParser& valueParser) { } } +inline NPG::TEvPGEvents::TRowValueField ColumnValueToRowValueField(NYdb::TValueParser& valueParser) { + switch (valueParser.GetKind()) { + case NYdb::TTypeParser::ETypeKind::Primitive: + return {.Value = ColumnPrimitiveValueToString(valueParser)}; + case NYdb::TTypeParser::ETypeKind::Optional: { + NPG::TEvPGEvents::TRowValueField value; + valueParser.OpenOptional(); + if (!valueParser.IsNull()) { + value = ColumnValueToRowValueField(valueParser); + } + valueParser.CloseOptional(); + return value; + } + case NYdb::TTypeParser::ETypeKind::Tuple: { + TString value; + valueParser.OpenTuple(); + while (valueParser.TryNextElement()) { + if (!value.empty()) { + value += ','; + } + value += ColumnValueToString(valueParser); + } + valueParser.CloseTuple(); + return {.Value = value}; + } + case NYdb::TTypeParser::ETypeKind::Pg: { + auto pg = valueParser.GetPg(); + if (!pg.IsNull()) { + return {.Value = valueParser.GetPg().Content_}; + } else { + return {}; + } + } + default: + return {}; + } +} + inline uint32_t GetPgOidFromYdbType(NYdb::TType type) { NYdb::TTypeParser parser(type); switch (parser.GetKind()) { @@ -167,6 +206,17 @@ inline uint32_t GetPgOidFromYdbType(NYdb::TType type) { } } +inline std::optional<NYdb::TPgType> GetPgTypeFromYdbType(NYdb::TType type) { + NYdb::TTypeParser parser(type); + switch (parser.GetKind()) { + case NYdb::TTypeParser::ETypeKind::Pg: { + return parser.GetPg(); + default: + return {}; + } + } +} + inline TString ToPgSyntax(TStringBuf query, const std::unordered_map<TString, TString>& connectionParams) { auto itOptions = connectionParams.find("options"); if (itOptions == connectionParams.end()) { diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp index d1e78e02844..3fd3f6b2462 100644 --- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp @@ -169,12 +169,19 @@ public: void FillMeta(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvQueryResponse* response) { for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) { - // TODO: fill data sizes - response->DataFields.push_back({ - .Name = column.Name, - .DataType = GetPgOidFromYdbType(column.Type), - // .DataTypeSize = column.Type.GetProto().Getpg_type().Gettyplen() - }); + std::optional<NYdb::TPgType> pgType = GetPgTypeFromYdbType(column.Type); + if (pgType.has_value()) { + response->DataFields.push_back({ + .Name = column.Name, + .DataType = pgType->Oid, + .DataTypeSize = pgType->Typlen, + .DataTypeModifier = pgType->Typmod, + }); + } else { + response->DataFields.push_back({ + .Name = column.Name + }); + } } } @@ -185,7 +192,7 @@ public: auto& row = response->DataRows.back(); row.resize(parser.ColumnsCount()); for (size_t index = 0; index < parser.ColumnsCount(); ++index) { - row[index] = ColumnValueToString(parser.ColumnParser(index)); + row[index] = ColumnValueToRowValueField(parser.ColumnParser(index)); } } } @@ -373,7 +380,7 @@ public: auto& row = response->DataRows.back(); row.resize(parser.ColumnsCount()); for (size_t index = 0; index < parser.ColumnsCount(); ++index) { - row[index] = ColumnValueToString(parser.ColumnParser(index)); + row[index] = ColumnValueToRowValueField(parser.ColumnParser(index)); } } } diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp index e9529d2389a..bf781d86e5a 100644 --- a/ydb/core/pgproxy/pg_connection.cpp +++ b/ydb/core/pgproxy/pg_connection.cpp @@ -430,6 +430,33 @@ protected: void HandleMessage(const TPGFlush*) { } + static void FillDataRow(TPGStreamOutput<TPGDataRow>& dataOut, const TEvPGEvents::TDataRow& dataIn) { + dataOut << uint16_t(dataIn.size()); // number of fields + for (const auto& item : dataIn) { + if (item.Value) { + const auto& value(item.Value.value()); + dataOut << uint32_t(value.size()) << value; + } else { + dataOut << uint32_t(-1); + } + } + } + + static void FillDataRowDescription(TPGStreamOutput<TPGRowDescription>& dataOut, const std::vector<TEvPGEvents::TRowDescriptionField>& dataIn) { + dataOut << uint16_t(dataIn.size()); // number of fields + for (const auto& field : dataIn) { + dataOut + << TStringBuf(field.Name) << '\0' + << uint32_t(field.TableId) + << uint16_t(field.ColumnId) + << uint32_t(field.DataType) + << uint16_t(field.DataTypeSize) + << uint32_t(field.DataTypeModifier) + << uint16_t(0) // format text + ; + } + } + bool FlushAndPoll() { if (FlushOutput()) { RequestPoller(); @@ -497,27 +524,13 @@ protected: } else { if (!ev->Get()->DataFields.empty()) { // rowDescription TPGStreamOutput<TPGRowDescription> rowDescription; - rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields - for (const auto& field : ev->Get()->DataFields) { - rowDescription - << TStringBuf(field.Name) << '\0' - << uint32_t(field.TableId) - << uint16_t(field.ColumnId) - << uint32_t(field.DataType) - << uint16_t(field.DataTypeSize) - << uint32_t(0xffffffff) // type modifier - << uint16_t(0) // format text - ; - } + FillDataRowDescription(rowDescription, ev->Get()->DataFields); SendStream(rowDescription); } { // dataFields for (const auto& row : ev->Get()->DataRows) { TPGStreamOutput<TPGDataRow> dataRow; - dataRow << uint16_t(row.size()); // number of fields - for (const auto& item : row) { - dataRow << uint32_t(item.size()) << item; - } + FillDataRow(dataRow, row); SendStream(dataRow); } } @@ -560,18 +573,7 @@ protected: if (ev->Get()->DataFields.size() > 0) { // rowDescription TPGStreamOutput<TPGRowDescription> rowDescription; - rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields - for (const auto& field : ev->Get()->DataFields) { - rowDescription - << TStringBuf(field.Name) << '\0' - << uint32_t(field.TableId) - << uint16_t(field.ColumnId) - << uint32_t(field.DataType) - << uint16_t(field.DataTypeSize) - << uint32_t(0xffffffff) // type modifier - << uint16_t(0) // format text - ; - } + FillDataRowDescription(rowDescription, ev->Get()->DataFields); SendStream(rowDescription); } else { SendMessage(TPGNoData()); @@ -605,10 +607,7 @@ protected: { // dataFields for (const auto& row : ev->Get()->DataRows) { TPGStreamOutput<TPGDataRow> dataRow; - dataRow << uint16_t(row.size()); // number of fields - for (const auto& item : row) { - dataRow << uint32_t(item.size()) << item; - } + FillDataRow(dataRow, row); SendStream(dataRow); } } diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h index acb31d0d0db..f373264a746 100644 --- a/ydb/core/pgproxy/pg_proxy_events.h +++ b/ydb/core/pgproxy/pg_proxy_events.h @@ -37,12 +37,16 @@ struct TEvPGEvents { uint32_t TableId = 0; uint16_t ColumnId = 0; uint32_t DataType; - uint16_t DataTypeSize; - //uint32_t DataTypeModifier; + int16_t DataTypeSize; + int32_t DataTypeModifier; //uint16_t Format; }; - using TDataRow = std::vector<TString>; + struct TRowValueField { + std::optional<TString> Value; + }; + + using TDataRow = std::vector<TRowValueField>; struct TEvConnectionOpened : NActors::TEventLocal<TEvConnectionOpened, EvConnectionOpened> { std::shared_ptr<TPGInitial> Message; diff --git a/ydb/public/sdk/cpp/client/ydb_value/value.h b/ydb/public/sdk/cpp/client/ydb_value/value.h index 4e5af92254d..ae14a4562ec 100644 --- a/ydb/public/sdk/cpp/client/ydb_value/value.h +++ b/ydb/public/sdk/cpp/client/ydb_value/value.h @@ -74,7 +74,7 @@ struct TPgType { TString TypeModifier; ui32 Oid = 0; - i32 Typlen = 0; + i16 Typlen = 0; i32 Typmod = 0; TPgType(const TString& typeName, const TString& typeModifier = {}) |