aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-05-12 09:37:46 +0300
committerxenoxeno <xeno@ydb.tech>2023-05-12 09:37:46 +0300
commit77f974279efe0b78709ed6933547e04a5e636dbb (patch)
tree70e5b891425e8c2490a67f7e927590d8d2e3596d
parentb291c820d0a871e7cee0c620ceabc8ae73205608 (diff)
downloadydb-77f974279efe0b78709ed6933547e04a5e636dbb.tar.gz
add support of sync command
-rw-r--r--ydb/core/local_pgwire/local_pgwire_connection.cpp3
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp76
-rw-r--r--ydb/core/pgproxy/pg_proxy_types.h10
3 files changed, 40 insertions, 49 deletions
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp
index 8fd6229fe2..3b43f753bd 100644
--- a/ydb/core/local_pgwire/local_pgwire_connection.cpp
+++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp
@@ -280,8 +280,7 @@ public:
}
switch (paramType) {
case INT2OID:
- paramsBuilder.AddParam(TStringBuilder() << "$_" << idxParam + 1).Int16(atoi(paramValue.data())).Build();
- injectedQuery << "DECLARE $_" << idxParam + 1 << " AS Int16;" << Endl;
+ paramsBuilder.AddParam(TStringBuilder() << ":_" << idxParam + 1).Int16(atoi(paramValue.data())).Build();
break;
}
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index fd914990f6..3c9a1adaec 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -20,6 +20,7 @@ public:
PasswordMessage = 'p',
Parse = 'P',
ParameterStatus = 'S',
+ Sync = 'S',
Bind = 'B',
Describe = 'D',
Execute = 'E',
@@ -50,6 +51,7 @@ public:
std::shared_ptr<TPGInitial> InitialMessage;
ui64 IncomingSequenceNumber = 1;
ui64 OutgoingSequenceNumber = 1;
+ ui64 SyncSequenceNumber = 1;
std::deque<TAutoPtr<IEventHandle>> PostponedEvents;
TPGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy)
@@ -139,7 +141,6 @@ protected:
{'C', "CommandComplete"},
{'X', "Terminate"},
{'T', "RowDescription"},
- {'S', "ParameterStatus"},
{'I', "EmptyQueryResponse"},
{'p', "PasswordMessage"},
{'P', "Parse"},
@@ -154,10 +155,12 @@ protected:
static const std::unordered_map<char, TStringBuf> incomingMessageName = {
{'E', "Execute"},
{'D', "Describe"},
+ {'S', "Sync"},
};
static const std::unordered_map<char, TStringBuf> outgoingMessageName = {
{'E', "ErrorResponse"},
{'D', "DataRow"},
+ {'S', "ParameterStatus"},
};
switch (direction) {
case EDirection::Incoming:
@@ -179,8 +182,14 @@ protected:
return ((const TPGInitial&)message).Dump();
case 'Q':
return ((const TPGQuery&)message).Dump();
- case 'S':
- return ((const TPGParameterStatus&)message).Dump();
+ case 'S': {
+ switch (direction) {
+ case EDirection::Incoming:
+ return ((const TPGSync&)message).Dump();
+ case EDirection::Outgoing:
+ return ((const TPGParameterStatus&)message).Dump();
+ }
+ }
case 'Z':
return ((const TPGReadyForQuery&)message).Dump();
case 'C':
@@ -299,8 +308,17 @@ protected:
}
void BecomeReadyForQuery() {
- SendReadyForQuery();
++OutgoingSequenceNumber;
+ SendReadyForQuery();
+ ReplayPostponedEvents();
+ FlushAndPoll();
+ }
+
+ void BecomeReadyForQueryOnSync() {
+ if (++OutgoingSequenceNumber == SyncSequenceNumber) {
+ ++OutgoingSequenceNumber;
+ SendReadyForQuery();
+ }
ReplayPostponedEvents();
FlushAndPoll();
}
@@ -391,53 +409,17 @@ protected:
}
void HandleMessage(const TPGParse* message) {
- if (IsQueryEmpty(message->GetQueryData().Query)) {
- SendMessage(TPGEmptyQueryResponse());
- BecomeReadyForQuery();
- } else {
- Send(DatabaseProxy, new TEvPGEvents::TEvParse(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++);
- }
+ Send(DatabaseProxy, new TEvPGEvents::TEvParse(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++);
}
- void HandleMessage(const TPGParameterStatus* message) {
- Y_UNUSED(message);
+ void HandleMessage(const TPGSync*) {
+ SyncSequenceNumber = IncomingSequenceNumber++;
}
void HandleMessage(const TPGBind* message) {
Send(DatabaseProxy, new TEvPGEvents::TEvBind(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++);
}
- // void HandleMessage(const TPGDescribe* message) {
- // Y_UNUSED(message);
- // // describe current statement
- // auto ev = std::make_unique<TEvPGEvents::TEvRowDescription>();
- // ev->Fields.push_back({
- // .Name = "column1",
- // });
- // Send(SelfId(), ev.release());
- // }
-
- // void HandleMessage(const TPGExecute* message) {
- // Y_UNUSED(message);
- // // execute current statement
- // auto ev = std::make_unique<NPG::TEvPGEvents::TEvDataRows>();
- // {
- // ev->Rows.emplace_back();
- // auto& row = ev->Rows.back();
- // row.resize(1);
- // row[0] = "345";
- // }
- // {
- // ev->Rows.emplace_back();
- // auto& row = ev->Rows.back();
- // row.resize(1);
- // row[0] = "456";
- // }
- // Send(SelfId(), ev.release());
- // //
- // Send(SelfId(), new NPG::TEvPGEvents::TEvCommandComplete("OK"));
- // }
-
void HandleMessage(const TPGDescribe* message) {
Send(DatabaseProxy, new TEvPGEvents::TEvDescribe(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++);
}
@@ -611,7 +593,7 @@ protected:
errorResponse << '\0';
SendStream(errorResponse);
}
- BecomeReadyForQuery();
+ BecomeReadyForQueryOnSync();
} else {
PostponeEvent(ev);
}
@@ -621,7 +603,7 @@ protected:
if (IsEventExpected(ev)) {
TPGStreamOutput<TPGParseComplete> parseComplete;
SendStream(parseComplete);
- BecomeReadyForQuery();
+ BecomeReadyForQueryOnSync();
} else {
PostponeEvent(ev);
}
@@ -689,8 +671,8 @@ protected:
case EMessageCode::Parse:
HandleMessage(static_cast<const TPGParse*>(message));
break;
- case EMessageCode::ParameterStatus:
- HandleMessage(static_cast<const TPGParameterStatus*>(message));
+ case EMessageCode::Sync:
+ HandleMessage(static_cast<const TPGSync*>(message));
break;
case EMessageCode::Bind:
HandleMessage(static_cast<const TPGBind*>(message));
diff --git a/ydb/core/pgproxy/pg_proxy_types.h b/ydb/core/pgproxy/pg_proxy_types.h
index be4f876f43..3350f4ab0b 100644
--- a/ydb/core/pgproxy/pg_proxy_types.h
+++ b/ydb/core/pgproxy/pg_proxy_types.h
@@ -40,6 +40,10 @@ struct TPGMessage {
bool Empty() const {
return GetDataSize() == 0;
}
+
+ TString Dump() const {
+ return {};
+ }
};
struct TPGInitial : TPGMessage { // it's not true, because we don't receive message code from a network, but imply it on the start
@@ -122,6 +126,12 @@ struct TPGParameterStatus : TPGMessage {
}
};
+struct TPGSync : TPGMessage {
+ TPGSync() {
+ Message = 'S';
+ }
+};
+
struct TPGReadyForQuery : TPGMessage {
TPGReadyForQuery() {
Message = 'Z';