diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2023-07-05 17:12:25 +0000 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-07-05 20:12:25 +0300 |
commit | 701022a4119724b624e7dad57637935a93c04b23 (patch) | |
tree | 69123331582a81a688666ef0d1c4c762916e311d | |
parent | 4f33ac20e49231f7a57fc9f1c8fb43ce7ee2ada4 (diff) | |
download | ydb-701022a4119724b624e7dad57637935a93c04b23.tar.gz |
Do not forward ping session response via proxy in case of local session. KIKIMR-18250
Pull Request resolved: #293
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 26 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp | 33 |
2 files changed, 49 insertions, 10 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 12d9f3577c6..7ee094c6c39 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -661,17 +661,18 @@ public: auto& event = ev->Get()->Record; auto& request = event.GetRequest(); - auto traceId = event.GetTraceId(); + const auto traceId = event.GetTraceId(); TKqpRequestInfo requestInfo(traceId); - auto sessionId = request.GetSessionId(); - ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvPingSessionRequest); + const auto sessionId = request.GetSessionId(); const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId); auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr; - KQP_PROXY_LOG_D("Received ping session request, request_id: " << requestId << ", trace_id: " << traceId); Counters->ReportPingSession(dbCounters, request.ByteSize()); TActorId targetId; + // Local session if (sessionInfo) { + KQP_PROXY_LOG_D("Received ping session request, has local session, trace_id: " << traceId); + targetId = sessionInfo->WorkerId; const bool isIdle = LocalSessions->IsSessionIdle(sessionInfo); if (isIdle) { @@ -686,13 +687,18 @@ public: ? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY : Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY; record.MutableResponse()->SetSessionStatus(sessionStatus); - Send(SelfId(), result.release(), IEventHandle::FlagTrackDelivery, requestId); + Send(ev->Sender, result.release(), 0, ev->Cookie); + return; + } + + // Forward request to another proxy + ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvPingSessionRequest); + + KQP_PROXY_LOG_D("Received ping session request, request_id: " << requestId << ", trace_id: " << traceId); + + targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId); + if (!targetId) { return; - } else { - targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId); - if (!targetId) { - return; - } } TDuration timeout = DEFAULT_KEEP_ALIVE_TIMEOUT; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp index e4592c1ae22..15a8961b8ec 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp @@ -17,6 +17,8 @@ #include <library/cpp/actors/interconnect/interconnect_impl.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> + #include <util/generic/vector.h> #include <memory> @@ -522,5 +524,36 @@ Y_UNIT_TEST_SUITE(KqpProxy) { UNIT_ASSERT_VALUES_EQUAL(listResult.GetChildren().size(), 1); UNIT_ASSERT_VALUES_EQUAL(listResult.GetChildren()[0].Name, ".sys"); } + + Y_UNIT_TEST(PingNotExistedSession) { + NKikimrConfig::TAppConfig appConfig; + NYdb::TKikimrWithGrpcAndRootSchema server(appConfig); + + ui16 grpc = server.GetPort(); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::KQP_PROXY, NActors::NLog::PRI_DEBUG); + + TString location = TStringBuilder() << "localhost:" << grpc; + auto clientConfig = NGRpcProxy::TGRpcClientConfig(location); + bool allDoneOk = false; + + { + NGrpc::TGRpcClientLow clientLow; + auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Table::V1::TableService>(clientConfig); + + Ydb::Table::KeepAliveRequest request; + request.set_session_id("ydb://session/3?node_id=2&id=YDB0NDRhNjItYWQwZmIzMTktMWUyOTE4ZWYtYzE0NzJjNg=="); + + NGrpc::TResponseCallback<Ydb::Table::KeepAliveResponse> responseCb = + [&allDoneOk](NGrpc::TGrpcStatus&& grpcStatus, Ydb::Table::KeepAliveResponse&& response) -> void { + UNIT_ASSERT(grpcStatus.GRpcStatusCode == 0); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_SESSION); + allDoneOk = true; + }; + + connection->DoRequest(request, std::move(responseCb), &Ydb::Table::V1::TableService::Stub::AsyncKeepAlive); + } + + UNIT_ASSERT(allDoneOk); + } } // namspace NKqp } // namespace NKikimr |