summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <[email protected]>2022-03-21 18:18:26 +0300
committerVitalii Gridnev <[email protected]>2022-03-21 18:18:26 +0300
commitd5ddb08d0f28ad1933afb0bbc65cb1353d67dcb5 (patch)
tree9d5d518fbb553c496c30a89ee9142e343fe3b341
parentcc6a2b1d8898f6908b30857eff17a9f860adb835 (diff)
support remote session creation KIKIMR-11464
ref:ddd5b39af7acd165c9ed94226e246a9d0ca26475
-rw-r--r--ydb/core/grpc_services/base/base.h26
-rw-r--r--ydb/core/grpc_services/local_rpc/local_rpc.h4
-rw-r--r--ydb/core/grpc_services/rpc_create_session.cpp5
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.cpp48
-rw-r--r--ydb/core/protos/kqp.proto5
-rw-r--r--ydb/public/sdk/cpp/client/resources/ydb_resources.cpp2
-rw-r--r--ydb/public/sdk/cpp/client/resources/ydb_resources.h1
7 files changed, 90 insertions, 1 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 2b9344e8fd4..ec9bd0a0626 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -239,6 +239,8 @@ public:
class IRequestCtxBase {
public:
virtual ~IRequestCtxBase() = default;
+ // Returns true if client has the specified capability
+ virtual bool HasClientCapability(const TString& capability) const = 0;
// Returns client provided database name
virtual const TMaybe<TString> GetDatabaseName() const = 0;
// Returns "internal" token (result of ticket parser authentication)
@@ -436,6 +438,10 @@ public:
InternalToken_ = token;
}
+ bool HasClientCapability(const TString&) const override {
+ return false;
+ }
+
const TMaybe<TString> GetDatabaseName() const override {
return Database_;
}
@@ -572,6 +578,16 @@ public:
return TString{res[0]};
}
+ bool HasClientCapability(const TString& capability) const override {
+ const auto& values = Ctx_->GetPeerMetaValues(NYdb::YDB_CLIENT_CAPABILITIES);
+ for(auto& value: values) {
+ if (value == capability)
+ return true;
+ }
+
+ return false;
+ }
+
const TMaybe<TString> GetDatabaseName() const override {
const auto& res = Ctx_->GetPeerMetaValues(NYdb::YDB_DATABASE_HEADER);
if (res.empty()) {
@@ -818,6 +834,16 @@ public:
return TString{res[0]};
}
+ bool HasClientCapability(const TString& capability) const override {
+ const auto& values = Ctx_->GetPeerMetaValues(NYdb::YDB_CLIENT_CAPABILITIES);
+ for(auto& value: values) {
+ if (capability == value)
+ return true;
+ }
+
+ return false;
+ }
+
const TMaybe<TString> GetDatabaseName() const override {
const auto& res = Ctx_->GetPeerMetaValues(NYdb::YDB_DATABASE_HEADER);
if (res.empty()) {
diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h
index 4f95c495409..da29f6ae9ec 100644
--- a/ydb/core/grpc_services/local_rpc/local_rpc.h
+++ b/ydb/core/grpc_services/local_rpc/local_rpc.h
@@ -37,6 +37,10 @@ public:
, InternalToken(token)
{}
+ bool HasClientCapability(const TString&) const override {
+ return false;
+ }
+
const TMaybe<TString> GetDatabaseName() const override {
if (DatabaseName.empty())
return Nothing();
diff --git a/ydb/core/grpc_services/rpc_create_session.cpp b/ydb/core/grpc_services/rpc_create_session.cpp
index c4e6c6f9ff5..eeaf8c1eaef 100644
--- a/ydb/core/grpc_services/rpc_create_session.cpp
+++ b/ydb/core/grpc_services/rpc_create_session.cpp
@@ -9,6 +9,7 @@
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
namespace NKikimr {
namespace NGRpcService {
@@ -63,6 +64,10 @@ private:
ev->Record.SetTraceId(traceId.GetRef());
}
+ if (Request().HasClientCapability(NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER)) {
+ ev->Record.SetCanCreateRemoteSession(true);
+ }
+
SetDatabase(ev, Request());
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release());
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
index b323cb2f84d..dea435aacd5 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
@@ -53,6 +53,7 @@ static constexpr TDuration DEFAULT_KEEP_ALIVE_TIMEOUT = TDuration::MilliSeconds(
static constexpr TDuration DEFAULT_EXTRA_TIMEOUT_WAIT = TDuration::MilliSeconds(10);
static constexpr TDuration DEFAULT_PUBLISH_BATCHING_INTERVAL = TDuration::MilliSeconds(1000);
static constexpr TDuration DEFAUL_BOARD_LOOKUP_INTERVAL = TDuration::MilliSeconds(5000);
+static constexpr TDuration DEFAULT_CREATE_SESSION_TIMEOUT = TDuration::MilliSeconds(5000);
std::optional<ui32> GetDefaultStateStorageGroupId(const TString& database) {
@@ -285,6 +286,7 @@ public:
Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters, &TlsActivationContext->AsActorContext());
ModuleResolverState = MakeIntrusive<TModuleResolverState>();
+ RandomProvider = AppData()->RandomProvider;
if (!GetYqlDefaultModuleResolver(ModuleResolverState->ExprCtx, ModuleResolverState->ModuleResolver)) {
TStringStream errorStream;
ModuleResolverState->ExprCtx.IssueManager.GetIssues().PrintTo(errorStream);
@@ -503,6 +505,12 @@ public:
KQP_PROXY_LOG_D("Failed to get system details");
break;
+ case TKqpEvents::EvCreateSessionRequest: {
+ KQP_PROXY_LOG_D("Remote create session request failed");
+ ReplyProcessError(Ydb::StatusIds::UNAVAILABLE, "Session not found.", ev->Cookie);
+ break;
+ }
+
case TKqpEvents::EvQueryRequest:
case TKqpEvents::EvPingSessionRequest: {
KQP_PROXY_LOG_D("Session not found, targetId: " << ev->Sender << " requestId: " << ev->Cookie);
@@ -531,10 +539,37 @@ public:
}
}
+ bool CreateRemoteSession(TEvKqp::TEvCreateSessionRequest::TPtr& ev) {
+ auto& event = ev->Get()->Record;
+ if (!event.GetCanCreateRemoteSession() || LocalDatacenterProxies.empty()) {
+ return false;
+ }
+
+ ui64 randomNumber = RandomProvider->GenRand();
+ ui32 nodeId = LocalDatacenterProxies[randomNumber % LocalDatacenterProxies.size()];
+ if (nodeId == SelfId().NodeId()){
+ return false;
+ }
+
+ std::unique_ptr<TEvKqp::TEvCreateSessionRequest> remoteRequest = std::make_unique<TEvKqp::TEvCreateSessionRequest>();
+ remoteRequest->Record.SetDeadlineUs(event.GetDeadlineUs());
+ remoteRequest->Record.SetTraceId(event.GetTraceId());
+ remoteRequest->Record.MutableRequest()->SetDatabase(event.GetRequest().GetDatabase());
+
+ ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, event.GetTraceId(), TKqpEvents::EvCreateSessionRequest);
+ Send(MakeKqpProxyID(nodeId), remoteRequest.release(), IEventHandle::FlagTrackDelivery, requestId);
+ TDuration timeout = DEFAULT_CREATE_SESSION_TIMEOUT;
+ StartQueryTimeout(requestId, timeout);
+ return true;
+ }
+
void Handle(TEvKqp::TEvCreateSessionRequest::TPtr& ev) {
auto& event = ev->Get()->Record;
auto& request = event.GetRequest();
TKqpRequestInfo requestInfo(event.GetTraceId());
+ if (CreateRemoteSession(ev)) {
+ return;
+ }
auto responseEv = MakeHolder<TEvKqp::TEvCreateSessionResponse>();
@@ -757,14 +792,22 @@ public:
return;
}
+ Y_VERIFY(SelfDataCenterId);
PeerProxyNodeResources.resize(boardInfo->InfoEntries.size());
size_t idx = 0;
+ auto getDataCenterId = [](const auto& entry) {
+ return entry.HasDataCenterId() ? entry.GetDataCenterId() : DataCenterToString(entry.GetDataCenterNumId());
+ };
+
+ LocalDatacenterProxies.clear();
for(auto& [ownerId, entry] : boardInfo->InfoEntries) {
Y_PROTOBUF_SUPPRESS_NODISCARD PeerProxyNodeResources[idx].ParseFromString(entry.Payload);
+ if (getDataCenterId(PeerProxyNodeResources[idx]) == *SelfDataCenterId) {
+ LocalDatacenterProxies.emplace_back(PeerProxyNodeResources[idx].GetNodeId());
+ }
++idx;
}
- Y_VERIFY(SelfDataCenterId, "Unexpected case: empty info about DC!");
PeerStats = CalcPeerStats(PeerProxyNodeResources, *SelfDataCenterId);
TryKickSession();
}
@@ -1016,6 +1059,7 @@ public:
hFunc(TEvKqp::TEvInitiateShutdownRequest, Handle);
hFunc(TEvPrivate::TEvOnRequestTimeout, Handle);
hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle);
+ hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent);
default:
Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(), ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?");
@@ -1299,6 +1343,8 @@ private:
bool ServerWorkerBalancerComplete = false;
std::optional<TString> SelfDataCenterId;
+ TIntrusivePtr<IRandomProvider> RandomProvider;
+ std::vector<ui64> LocalDatacenterProxies;
TVector<NKikimrKqp::TKqpProxyNodeResources> PeerProxyNodeResources;
bool ResourcesPublishScheduled = false;
TString PublishBoardPath;
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 9b46b720e11..81ee704aede 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -307,6 +307,11 @@ message TEvCreateSessionRequest {
optional TCreateSessionRequest Request = 1;
optional string TraceId = 2;
optional uint64 DeadlineUs = 3;
+ // the flag that indicates that session can be created on the different
+ // node in the cluster.
+ // If flag is true, kqp proxy will create session on the different node,
+ // if flag is false, in this case proxy will create session locally.
+ optional bool CanCreateRemoteSession = 4 [default = false];
}
message TCreateSessionResponse {
diff --git a/ydb/public/sdk/cpp/client/resources/ydb_resources.cpp b/ydb/public/sdk/cpp/client/resources/ydb_resources.cpp
index 079e8a59a23..542bd0f1d47 100644
--- a/ydb/public/sdk/cpp/client/resources/ydb_resources.cpp
+++ b/ydb/public/sdk/cpp/client/resources/ydb_resources.cpp
@@ -22,6 +22,8 @@ const char* YDB_SESSION_CLOSE = "session-close";
// The client should send a feature capability-header in order to enable a feature on the server side.
// Send ("x-ydb-client-capabilities", "session-balancer") pair in metadata with a СreateSession request to enable server side session balancing feature.
const char* YDB_CLIENT_CAPABILITIES = "x-ydb-client-capabilities";
+const char* YDB_CLIENT_CAPABILITY_SESSION_BALANCER = "session-balancer";
+
TString GetSdkSemver() {
return NResource::Find("ydb_sdk_version.txt");
diff --git a/ydb/public/sdk/cpp/client/resources/ydb_resources.h b/ydb/public/sdk/cpp/client/resources/ydb_resources.h
index 61dee0d2f4c..27443f976c1 100644
--- a/ydb/public/sdk/cpp/client/resources/ydb_resources.h
+++ b/ydb/public/sdk/cpp/client/resources/ydb_resources.h
@@ -13,6 +13,7 @@ extern const char* YDB_CONSUMED_UNITS_HEADER;
extern const char* YDB_SERVER_HINTS;
extern const char* YDB_CLIENT_CAPABILITIES;
extern const char* YDB_SESSION_CLOSE;
+extern const char* YDB_CLIENT_CAPABILITY_SESSION_BALANCER;
TString GetSdkSemver();