diff options
author | xenoxeno <xeno@ydb.tech> | 2023-05-12 09:37:46 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-05-12 09:37:46 +0300 |
commit | 77f974279efe0b78709ed6933547e04a5e636dbb (patch) | |
tree | 70e5b891425e8c2490a67f7e927590d8d2e3596d | |
parent | b291c820d0a871e7cee0c620ceabc8ae73205608 (diff) | |
download | ydb-77f974279efe0b78709ed6933547e04a5e636dbb.tar.gz |
add support of sync command
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_connection.cpp | 3 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_connection.cpp | 76 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_proxy_types.h | 10 |
3 files changed, 40 insertions, 49 deletions
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp index 8fd6229fe2..3b43f753bd 100644 --- a/ydb/core/local_pgwire/local_pgwire_connection.cpp +++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp @@ -280,8 +280,7 @@ public: } switch (paramType) { case INT2OID: - paramsBuilder.AddParam(TStringBuilder() << "$_" << idxParam + 1).Int16(atoi(paramValue.data())).Build(); - injectedQuery << "DECLARE $_" << idxParam + 1 << " AS Int16;" << Endl; + paramsBuilder.AddParam(TStringBuilder() << ":_" << idxParam + 1).Int16(atoi(paramValue.data())).Build(); break; } diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp index fd914990f6..3c9a1adaec 100644 --- a/ydb/core/pgproxy/pg_connection.cpp +++ b/ydb/core/pgproxy/pg_connection.cpp @@ -20,6 +20,7 @@ public: PasswordMessage = 'p', Parse = 'P', ParameterStatus = 'S', + Sync = 'S', Bind = 'B', Describe = 'D', Execute = 'E', @@ -50,6 +51,7 @@ public: std::shared_ptr<TPGInitial> InitialMessage; ui64 IncomingSequenceNumber = 1; ui64 OutgoingSequenceNumber = 1; + ui64 SyncSequenceNumber = 1; std::deque<TAutoPtr<IEventHandle>> PostponedEvents; TPGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy) @@ -139,7 +141,6 @@ protected: {'C', "CommandComplete"}, {'X', "Terminate"}, {'T', "RowDescription"}, - {'S', "ParameterStatus"}, {'I', "EmptyQueryResponse"}, {'p', "PasswordMessage"}, {'P', "Parse"}, @@ -154,10 +155,12 @@ protected: static const std::unordered_map<char, TStringBuf> incomingMessageName = { {'E', "Execute"}, {'D', "Describe"}, + {'S', "Sync"}, }; static const std::unordered_map<char, TStringBuf> outgoingMessageName = { {'E', "ErrorResponse"}, {'D', "DataRow"}, + {'S', "ParameterStatus"}, }; switch (direction) { case EDirection::Incoming: @@ -179,8 +182,14 @@ protected: return ((const TPGInitial&)message).Dump(); case 'Q': return ((const TPGQuery&)message).Dump(); - case 'S': - return ((const TPGParameterStatus&)message).Dump(); + case 'S': { + switch (direction) { + case EDirection::Incoming: + return ((const TPGSync&)message).Dump(); + case EDirection::Outgoing: + return ((const TPGParameterStatus&)message).Dump(); + } + } case 'Z': return ((const TPGReadyForQuery&)message).Dump(); case 'C': @@ -299,8 +308,17 @@ protected: } void BecomeReadyForQuery() { - SendReadyForQuery(); ++OutgoingSequenceNumber; + SendReadyForQuery(); + ReplayPostponedEvents(); + FlushAndPoll(); + } + + void BecomeReadyForQueryOnSync() { + if (++OutgoingSequenceNumber == SyncSequenceNumber) { + ++OutgoingSequenceNumber; + SendReadyForQuery(); + } ReplayPostponedEvents(); FlushAndPoll(); } @@ -391,53 +409,17 @@ protected: } void HandleMessage(const TPGParse* message) { - if (IsQueryEmpty(message->GetQueryData().Query)) { - SendMessage(TPGEmptyQueryResponse()); - BecomeReadyForQuery(); - } else { - Send(DatabaseProxy, new TEvPGEvents::TEvParse(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++); - } + Send(DatabaseProxy, new TEvPGEvents::TEvParse(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++); } - void HandleMessage(const TPGParameterStatus* message) { - Y_UNUSED(message); + void HandleMessage(const TPGSync*) { + SyncSequenceNumber = IncomingSequenceNumber++; } void HandleMessage(const TPGBind* message) { Send(DatabaseProxy, new TEvPGEvents::TEvBind(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++); } - // void HandleMessage(const TPGDescribe* message) { - // Y_UNUSED(message); - // // describe current statement - // auto ev = std::make_unique<TEvPGEvents::TEvRowDescription>(); - // ev->Fields.push_back({ - // .Name = "column1", - // }); - // Send(SelfId(), ev.release()); - // } - - // void HandleMessage(const TPGExecute* message) { - // Y_UNUSED(message); - // // execute current statement - // auto ev = std::make_unique<NPG::TEvPGEvents::TEvDataRows>(); - // { - // ev->Rows.emplace_back(); - // auto& row = ev->Rows.back(); - // row.resize(1); - // row[0] = "345"; - // } - // { - // ev->Rows.emplace_back(); - // auto& row = ev->Rows.back(); - // row.resize(1); - // row[0] = "456"; - // } - // Send(SelfId(), ev.release()); - // // - // Send(SelfId(), new NPG::TEvPGEvents::TEvCommandComplete("OK")); - // } - void HandleMessage(const TPGDescribe* message) { Send(DatabaseProxy, new TEvPGEvents::TEvDescribe(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++); } @@ -611,7 +593,7 @@ protected: errorResponse << '\0'; SendStream(errorResponse); } - BecomeReadyForQuery(); + BecomeReadyForQueryOnSync(); } else { PostponeEvent(ev); } @@ -621,7 +603,7 @@ protected: if (IsEventExpected(ev)) { TPGStreamOutput<TPGParseComplete> parseComplete; SendStream(parseComplete); - BecomeReadyForQuery(); + BecomeReadyForQueryOnSync(); } else { PostponeEvent(ev); } @@ -689,8 +671,8 @@ protected: case EMessageCode::Parse: HandleMessage(static_cast<const TPGParse*>(message)); break; - case EMessageCode::ParameterStatus: - HandleMessage(static_cast<const TPGParameterStatus*>(message)); + case EMessageCode::Sync: + HandleMessage(static_cast<const TPGSync*>(message)); break; case EMessageCode::Bind: HandleMessage(static_cast<const TPGBind*>(message)); diff --git a/ydb/core/pgproxy/pg_proxy_types.h b/ydb/core/pgproxy/pg_proxy_types.h index be4f876f43..3350f4ab0b 100644 --- a/ydb/core/pgproxy/pg_proxy_types.h +++ b/ydb/core/pgproxy/pg_proxy_types.h @@ -40,6 +40,10 @@ struct TPGMessage { bool Empty() const { return GetDataSize() == 0; } + + TString Dump() const { + return {}; + } }; struct TPGInitial : TPGMessage { // it's not true, because we don't receive message code from a network, but imply it on the start @@ -122,6 +126,12 @@ struct TPGParameterStatus : TPGMessage { } }; +struct TPGSync : TPGMessage { + TPGSync() { + Message = 'S'; + } +}; + struct TPGReadyForQuery : TPGMessage { TPGReadyForQuery() { Message = 'Z'; |