diff options
author | Vitalii Gridnev <[email protected]> | 2022-03-21 18:18:26 +0300 |
---|---|---|
committer | Vitalii Gridnev <[email protected]> | 2022-03-21 18:18:26 +0300 |
commit | d5ddb08d0f28ad1933afb0bbc65cb1353d67dcb5 (patch) | |
tree | 9d5d518fbb553c496c30a89ee9142e343fe3b341 | |
parent | cc6a2b1d8898f6908b30857eff17a9f860adb835 (diff) |
support remote session creation KIKIMR-11464
ref:ddd5b39af7acd165c9ed94226e246a9d0ca26475
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 26 | ||||
-rw-r--r-- | ydb/core/grpc_services/local_rpc/local_rpc.h | 4 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_create_session.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.cpp | 48 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 5 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/resources/ydb_resources.cpp | 2 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/resources/ydb_resources.h | 1 |
7 files changed, 90 insertions, 1 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 2b9344e8fd4..ec9bd0a0626 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -239,6 +239,8 @@ public: class IRequestCtxBase { public: virtual ~IRequestCtxBase() = default; + // Returns true if client has the specified capability + virtual bool HasClientCapability(const TString& capability) const = 0; // Returns client provided database name virtual const TMaybe<TString> GetDatabaseName() const = 0; // Returns "internal" token (result of ticket parser authentication) @@ -436,6 +438,10 @@ public: InternalToken_ = token; } + bool HasClientCapability(const TString&) const override { + return false; + } + const TMaybe<TString> GetDatabaseName() const override { return Database_; } @@ -572,6 +578,16 @@ public: return TString{res[0]}; } + bool HasClientCapability(const TString& capability) const override { + const auto& values = Ctx_->GetPeerMetaValues(NYdb::YDB_CLIENT_CAPABILITIES); + for(auto& value: values) { + if (value == capability) + return true; + } + + return false; + } + const TMaybe<TString> GetDatabaseName() const override { const auto& res = Ctx_->GetPeerMetaValues(NYdb::YDB_DATABASE_HEADER); if (res.empty()) { @@ -818,6 +834,16 @@ public: return TString{res[0]}; } + bool HasClientCapability(const TString& capability) const override { + const auto& values = Ctx_->GetPeerMetaValues(NYdb::YDB_CLIENT_CAPABILITIES); + for(auto& value: values) { + if (capability == value) + return true; + } + + return false; + } + const TMaybe<TString> GetDatabaseName() const override { const auto& res = Ctx_->GetPeerMetaValues(NYdb::YDB_DATABASE_HEADER); if (res.empty()) { diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index 4f95c495409..da29f6ae9ec 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -37,6 +37,10 @@ public: , InternalToken(token) {} + bool HasClientCapability(const TString&) const override { + return false; + } + const TMaybe<TString> GetDatabaseName() const override { if (DatabaseName.empty()) return Nothing(); diff --git a/ydb/core/grpc_services/rpc_create_session.cpp b/ydb/core/grpc_services/rpc_create_session.cpp index c4e6c6f9ff5..eeaf8c1eaef 100644 --- a/ydb/core/grpc_services/rpc_create_session.cpp +++ b/ydb/core/grpc_services/rpc_create_session.cpp @@ -9,6 +9,7 @@ #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h> namespace NKikimr { namespace NGRpcService { @@ -63,6 +64,10 @@ private: ev->Record.SetTraceId(traceId.GetRef()); } + if (Request().HasClientCapability(NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER)) { + ev->Record.SetCanCreateRemoteSession(true); + } + SetDatabase(ev, Request()); Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release()); diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp index b323cb2f84d..dea435aacd5 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp @@ -53,6 +53,7 @@ static constexpr TDuration DEFAULT_KEEP_ALIVE_TIMEOUT = TDuration::MilliSeconds( static constexpr TDuration DEFAULT_EXTRA_TIMEOUT_WAIT = TDuration::MilliSeconds(10); static constexpr TDuration DEFAULT_PUBLISH_BATCHING_INTERVAL = TDuration::MilliSeconds(1000); static constexpr TDuration DEFAUL_BOARD_LOOKUP_INTERVAL = TDuration::MilliSeconds(5000); +static constexpr TDuration DEFAULT_CREATE_SESSION_TIMEOUT = TDuration::MilliSeconds(5000); std::optional<ui32> GetDefaultStateStorageGroupId(const TString& database) { @@ -285,6 +286,7 @@ public: Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters, &TlsActivationContext->AsActorContext()); ModuleResolverState = MakeIntrusive<TModuleResolverState>(); + RandomProvider = AppData()->RandomProvider; if (!GetYqlDefaultModuleResolver(ModuleResolverState->ExprCtx, ModuleResolverState->ModuleResolver)) { TStringStream errorStream; ModuleResolverState->ExprCtx.IssueManager.GetIssues().PrintTo(errorStream); @@ -503,6 +505,12 @@ public: KQP_PROXY_LOG_D("Failed to get system details"); break; + case TKqpEvents::EvCreateSessionRequest: { + KQP_PROXY_LOG_D("Remote create session request failed"); + ReplyProcessError(Ydb::StatusIds::UNAVAILABLE, "Session not found.", ev->Cookie); + break; + } + case TKqpEvents::EvQueryRequest: case TKqpEvents::EvPingSessionRequest: { KQP_PROXY_LOG_D("Session not found, targetId: " << ev->Sender << " requestId: " << ev->Cookie); @@ -531,10 +539,37 @@ public: } } + bool CreateRemoteSession(TEvKqp::TEvCreateSessionRequest::TPtr& ev) { + auto& event = ev->Get()->Record; + if (!event.GetCanCreateRemoteSession() || LocalDatacenterProxies.empty()) { + return false; + } + + ui64 randomNumber = RandomProvider->GenRand(); + ui32 nodeId = LocalDatacenterProxies[randomNumber % LocalDatacenterProxies.size()]; + if (nodeId == SelfId().NodeId()){ + return false; + } + + std::unique_ptr<TEvKqp::TEvCreateSessionRequest> remoteRequest = std::make_unique<TEvKqp::TEvCreateSessionRequest>(); + remoteRequest->Record.SetDeadlineUs(event.GetDeadlineUs()); + remoteRequest->Record.SetTraceId(event.GetTraceId()); + remoteRequest->Record.MutableRequest()->SetDatabase(event.GetRequest().GetDatabase()); + + ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, event.GetTraceId(), TKqpEvents::EvCreateSessionRequest); + Send(MakeKqpProxyID(nodeId), remoteRequest.release(), IEventHandle::FlagTrackDelivery, requestId); + TDuration timeout = DEFAULT_CREATE_SESSION_TIMEOUT; + StartQueryTimeout(requestId, timeout); + return true; + } + void Handle(TEvKqp::TEvCreateSessionRequest::TPtr& ev) { auto& event = ev->Get()->Record; auto& request = event.GetRequest(); TKqpRequestInfo requestInfo(event.GetTraceId()); + if (CreateRemoteSession(ev)) { + return; + } auto responseEv = MakeHolder<TEvKqp::TEvCreateSessionResponse>(); @@ -757,14 +792,22 @@ public: return; } + Y_VERIFY(SelfDataCenterId); PeerProxyNodeResources.resize(boardInfo->InfoEntries.size()); size_t idx = 0; + auto getDataCenterId = [](const auto& entry) { + return entry.HasDataCenterId() ? entry.GetDataCenterId() : DataCenterToString(entry.GetDataCenterNumId()); + }; + + LocalDatacenterProxies.clear(); for(auto& [ownerId, entry] : boardInfo->InfoEntries) { Y_PROTOBUF_SUPPRESS_NODISCARD PeerProxyNodeResources[idx].ParseFromString(entry.Payload); + if (getDataCenterId(PeerProxyNodeResources[idx]) == *SelfDataCenterId) { + LocalDatacenterProxies.emplace_back(PeerProxyNodeResources[idx].GetNodeId()); + } ++idx; } - Y_VERIFY(SelfDataCenterId, "Unexpected case: empty info about DC!"); PeerStats = CalcPeerStats(PeerProxyNodeResources, *SelfDataCenterId); TryKickSession(); } @@ -1016,6 +1059,7 @@ public: hFunc(TEvKqp::TEvInitiateShutdownRequest, Handle); hFunc(TEvPrivate::TEvOnRequestTimeout, Handle); hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle); + hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent); default: Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?"); @@ -1299,6 +1343,8 @@ private: bool ServerWorkerBalancerComplete = false; std::optional<TString> SelfDataCenterId; + TIntrusivePtr<IRandomProvider> RandomProvider; + std::vector<ui64> LocalDatacenterProxies; TVector<NKikimrKqp::TKqpProxyNodeResources> PeerProxyNodeResources; bool ResourcesPublishScheduled = false; TString PublishBoardPath; diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 9b46b720e11..81ee704aede 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -307,6 +307,11 @@ message TEvCreateSessionRequest { optional TCreateSessionRequest Request = 1; optional string TraceId = 2; optional uint64 DeadlineUs = 3; + // the flag that indicates that session can be created on the different + // node in the cluster. + // If flag is true, kqp proxy will create session on the different node, + // if flag is false, in this case proxy will create session locally. + optional bool CanCreateRemoteSession = 4 [default = false]; } message TCreateSessionResponse { diff --git a/ydb/public/sdk/cpp/client/resources/ydb_resources.cpp b/ydb/public/sdk/cpp/client/resources/ydb_resources.cpp index 079e8a59a23..542bd0f1d47 100644 --- a/ydb/public/sdk/cpp/client/resources/ydb_resources.cpp +++ b/ydb/public/sdk/cpp/client/resources/ydb_resources.cpp @@ -22,6 +22,8 @@ const char* YDB_SESSION_CLOSE = "session-close"; // The client should send a feature capability-header in order to enable a feature on the server side. // Send ("x-ydb-client-capabilities", "session-balancer") pair in metadata with a СreateSession request to enable server side session balancing feature. const char* YDB_CLIENT_CAPABILITIES = "x-ydb-client-capabilities"; +const char* YDB_CLIENT_CAPABILITY_SESSION_BALANCER = "session-balancer"; + TString GetSdkSemver() { return NResource::Find("ydb_sdk_version.txt"); diff --git a/ydb/public/sdk/cpp/client/resources/ydb_resources.h b/ydb/public/sdk/cpp/client/resources/ydb_resources.h index 61dee0d2f4c..27443f976c1 100644 --- a/ydb/public/sdk/cpp/client/resources/ydb_resources.h +++ b/ydb/public/sdk/cpp/client/resources/ydb_resources.h @@ -13,6 +13,7 @@ extern const char* YDB_CONSUMED_UNITS_HEADER; extern const char* YDB_SERVER_HINTS; extern const char* YDB_CLIENT_CAPABILITIES; extern const char* YDB_SESSION_CLOSE; +extern const char* YDB_CLIENT_CAPABILITY_SESSION_BALANCER; TString GetSdkSemver(); |