aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2023-07-05 17:12:25 +0000
committerdcherednik <dcherednik@ydb.tech>2023-07-05 20:12:25 +0300
commit701022a4119724b624e7dad57637935a93c04b23 (patch)
tree69123331582a81a688666ef0d1c4c762916e311d
parent4f33ac20e49231f7a57fc9f1c8fb43ce7ee2ada4 (diff)
downloadydb-701022a4119724b624e7dad57637935a93c04b23.tar.gz
Do not forward ping session response via proxy in case of local session. KIKIMR-18250
Pull Request resolved: #293
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp26
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp33
2 files changed, 49 insertions, 10 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 12d9f3577c6..7ee094c6c39 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -661,17 +661,18 @@ public:
auto& event = ev->Get()->Record;
auto& request = event.GetRequest();
- auto traceId = event.GetTraceId();
+ const auto traceId = event.GetTraceId();
TKqpRequestInfo requestInfo(traceId);
- auto sessionId = request.GetSessionId();
- ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvPingSessionRequest);
+ const auto sessionId = request.GetSessionId();
const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId);
auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr;
- KQP_PROXY_LOG_D("Received ping session request, request_id: " << requestId << ", trace_id: " << traceId);
Counters->ReportPingSession(dbCounters, request.ByteSize());
TActorId targetId;
+ // Local session
if (sessionInfo) {
+ KQP_PROXY_LOG_D("Received ping session request, has local session, trace_id: " << traceId);
+
targetId = sessionInfo->WorkerId;
const bool isIdle = LocalSessions->IsSessionIdle(sessionInfo);
if (isIdle) {
@@ -686,13 +687,18 @@ public:
? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY
: Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY;
record.MutableResponse()->SetSessionStatus(sessionStatus);
- Send(SelfId(), result.release(), IEventHandle::FlagTrackDelivery, requestId);
+ Send(ev->Sender, result.release(), 0, ev->Cookie);
+ return;
+ }
+
+ // Forward request to another proxy
+ ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvPingSessionRequest);
+
+ KQP_PROXY_LOG_D("Received ping session request, request_id: " << requestId << ", trace_id: " << traceId);
+
+ targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId);
+ if (!targetId) {
return;
- } else {
- targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId);
- if (!targetId) {
- return;
- }
}
TDuration timeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp
index e4592c1ae22..15a8961b8ec 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp
@@ -17,6 +17,8 @@
#include <library/cpp/actors/interconnect/interconnect_impl.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
+
#include <util/generic/vector.h>
#include <memory>
@@ -522,5 +524,36 @@ Y_UNIT_TEST_SUITE(KqpProxy) {
UNIT_ASSERT_VALUES_EQUAL(listResult.GetChildren().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(listResult.GetChildren()[0].Name, ".sys");
}
+
+ Y_UNIT_TEST(PingNotExistedSession) {
+ NKikimrConfig::TAppConfig appConfig;
+ NYdb::TKikimrWithGrpcAndRootSchema server(appConfig);
+
+ ui16 grpc = server.GetPort();
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::KQP_PROXY, NActors::NLog::PRI_DEBUG);
+
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto clientConfig = NGRpcProxy::TGRpcClientConfig(location);
+ bool allDoneOk = false;
+
+ {
+ NGrpc::TGRpcClientLow clientLow;
+ auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Table::V1::TableService>(clientConfig);
+
+ Ydb::Table::KeepAliveRequest request;
+ request.set_session_id("ydb://session/3?node_id=2&id=YDB0NDRhNjItYWQwZmIzMTktMWUyOTE4ZWYtYzE0NzJjNg==");
+
+ NGrpc::TResponseCallback<Ydb::Table::KeepAliveResponse> responseCb =
+ [&allDoneOk](NGrpc::TGrpcStatus&& grpcStatus, Ydb::Table::KeepAliveResponse&& response) -> void {
+ UNIT_ASSERT(grpcStatus.GRpcStatusCode == 0);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_SESSION);
+ allDoneOk = true;
+ };
+
+ connection->DoRequest(request, std::move(responseCb), &Ydb::Table::V1::TableService::Stub::AsyncKeepAlive);
+ }
+
+ UNIT_ASSERT(allDoneOk);
+ }
} // namspace NKqp
} // namespace NKikimr