aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandrew-rykov <arykov@ydb.tech>2023-08-03 13:09:13 +0300
committerandrew-rykov <arykov@ydb.tech>2023-08-03 13:09:13 +0300
commita31dbe76e19c44ca466f2fb326fbfe4f3f4a95b3 (patch)
treeed1d7b9d2131faee7022d89b64c5764f3789da0e
parent17008b8af29774dd4a121912c7e54998f0ea4f23 (diff)
downloadydb-a31dbe76e19c44ca466f2fb326fbfe4f3f4a95b3.tar.gz
KIKIMR-18328 query handler add discovery and proxy
fixed TEvHttpInfoRes handler KIKIMR-18328 quer handler add discovery
-rw-r--r--ydb/core/viewer/json_query.h281
-rw-r--r--ydb/core/viewer/json_tenantinfo.h2
-rw-r--r--ydb/core/viewer/protos/viewer.proto9
-rw-r--r--ydb/core/viewer/viewer_request.cpp3
4 files changed, 231 insertions, 64 deletions
diff --git a/ydb/core/viewer/json_query.h b/ydb/core/viewer/json_query.h
index d703976a93..ef51e3479d 100644
--- a/ydb/core/viewer/json_query.h
+++ b/ydb/core/viewer/json_query.h
@@ -15,95 +15,151 @@
//#include <ydb/public/lib/deprecated/kicli/kicli.h>
#include <ydb/public/lib/json_value/ydb_json_value.h>
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
+#include "json_pipe_req.h"
+#include "viewer_request.h"
namespace NKikimr {
namespace NViewer {
using namespace NActors;
+using namespace NMonitoring;
using ::google::protobuf::FieldDescriptor;
-class TJsonQuery : public TActorBootstrapped<TJsonQuery> {
+class TJsonQuery : public TViewerPipeClient<TJsonQuery> {
using TThis = TJsonQuery;
- using TBase = TActorBootstrapped<TJsonQuery>;
+ using TBase = TViewerPipeClient<TJsonQuery>;
IViewer* Viewer;
TJsonSettings JsonSettings;
- TActorId Initiator;
NMon::TEvHttpInfo::TPtr Event;
+ TEvViewer::TEvViewerRequest::TPtr ViewerRequest;
ui32 Timeout = 0;
TVector<Ydb::ResultSet> ResultSets;
+ TString Query;
+ TString Database;
TString Action;
TString Stats;
TString Schema = "classic";
TString Syntax;
+ TString UserToken;
+
+ std::optional<TNodeId> SubscribedNodeId;
+ std::vector<TNodeId> TenantDynamicNodes;
+ bool Direct = false;
+ bool MadeKqpProxyRequest = false;
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::VIEWER_HANDLER;
}
- TJsonQuery(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
- : Viewer(viewer)
- , Initiator(ev->Sender)
- , Event(ev)
- {}
-
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(NKqp::TEvKqp::TEvQueryResponse, HandleReply);
- hFunc(NKqp::TEvKqp::TEvProcessResponse, HandleReply);
- hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleReply);
- hFunc(NKqp::TEvKqpExecuter::TEvStreamData, HandleReply);
- hFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, HandleReply);
-
- cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
-
- default: {
- Cerr << "Unexpected event received in TJsonQuery::StateWork: " << ev->GetTypeRewrite() << Endl;
- }
- }
- }
-
- void Bootstrap() {
- const auto& params(Event->Get()->Request.GetParams());
- auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ void ParseCgiParameters(const TCgiParameters& params) {
JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), false);
JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), false);
Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 60000);
- TString query = params.Get("query");
- TString database = params.Get("database");
+ Query = params.Get("query");
+ Database = params.Get("database");
Stats = params.Get("stats");
Action = params.Get("action");
Schema = params.Get("schema");
- Syntax = params.Get("syntax");
if (Schema.empty()) {
Schema = "classic";
}
- if (query.empty() && Event->Get()->Request.GetMethod() == HTTP_METHOD_POST) {
- TStringBuf content = Event->Get()->Request.GetPostContent();
+ Syntax = params.Get("syntax");
+ Direct = FromStringWithDefault<bool>(params.Get("direct"), Direct);
+ }
+
+ void ParsePostContent(const TStringBuf& content) {
+ static NJson::TJsonReaderConfig JsonConfig;
+ NJson::TJsonValue requestData;
+ bool success = NJson::ReadJsonTree(content, &JsonConfig, &requestData);
+ if (success) {
+ Query = Query.empty() ? requestData["query"].GetStringSafe({}) : Query;
+ Database = Database.empty() ? requestData["database"].GetStringSafe({}) : Database;
+ Stats = Stats.empty() ? requestData["stats"].GetStringSafe({}) : Stats;
+ Action = Action.empty() ? requestData["action"].GetStringSafe({}) : Action;
+ Syntax = Syntax.empty() ? requestData["syntax"].GetStringSafe({}) : Syntax;
+ }
+ }
+
+ bool IsPostContent() {
+ if (Event->Get()->Request.GetMethod() == HTTP_METHOD_POST) {
const THttpHeaders& headers = Event->Get()->Request.GetHeaders();
auto itContentType = FindIf(headers, [](const auto& header) { return header.Name() == "Content-Type"; });
if (itContentType != headers.end()) {
TStringBuf contentTypeHeader = itContentType->Value();
TStringBuf contentType = contentTypeHeader.NextTok(';');
- if (contentType == "application/json") {
- static NJson::TJsonReaderConfig JsonConfig;
- NJson::TJsonValue requestData;
- bool success = NJson::ReadJsonTree(content, &JsonConfig, &requestData);
- if (success) {
- query = requestData["query"].GetStringSafe({});
- database = requestData["database"].GetStringSafe({});
- Stats = requestData["stats"].GetStringSafe({});
- Action = requestData["action"].GetStringSafe({});
- }
- }
+ return contentType == "application/json";
}
}
- if (query.empty()) {
- ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), {}, "Bad Request"));
+ return false;
+ }
+
+ TJsonQuery(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
+ : Viewer(viewer)
+ , Event(ev)
+ {
+ const auto& params(Event->Get()->Request.GetParams());
+ InitConfig(params);
+ ParseCgiParameters(params);
+ if (IsPostContent()) {
+ TStringBuf content = Event->Get()->Request.GetPostContent();
+ ParsePostContent(content);
+ }
+ UserToken = Event->Get()->UserToken;
+ }
+
+ TJsonQuery(TEvViewer::TEvViewerRequest::TPtr& ev)
+ : ViewerRequest(ev)
+ {
+ auto& request = ViewerRequest->Get()->Record.GetQueryRequest();
+
+ TCgiParameters params(request.GetUri());
+ InitConfig(params);
+ ParseCgiParameters(params);
+
+ TStringBuf content = request.GetContent();
+ if (content) {
+ ParsePostContent(content);
+ }
+
+ Timeout = ViewerRequest->Get()->Record.GetTimeout();
+ UserToken = request.GetUserToken();
+ Direct = true;
+ }
+
+ void PassAway() override {
+ if (SubscribedNodeId.has_value()) {
+ Send(TActivationContext::InterconnectProxy(SubscribedNodeId.value()), new TEvents::TEvUnsubscribe());
+ }
+ TBase::PassAway();
+ BLOG_TRACE("PassAway()");
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvStateStorage::TEvBoardInfo, HandleReply);
+ hFunc(TEvents::TEvUndelivered, Undelivered);
+ hFunc(TEvInterconnect::TEvNodeConnected, Connected);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Disconnected);
+ hFunc(TEvViewer::TEvViewerResponse, HandleReply);
+ hFunc(NKqp::TEvKqp::TEvQueryResponse, HandleReply);
+ hFunc(NKqp::TEvKqp::TEvProcessResponse, HandleReply);
+ hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleReply);
+ hFunc(NKqp::TEvKqpExecuter::TEvStreamData, HandleReply);
+ hFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, HandleReply);
+
+ cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
+ }
+ }
+
+ void SendKpqProxyRequest() {
+ if (MadeKqpProxyRequest) {
return;
}
+ MadeKqpProxyRequest = true;
+ auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
- request.SetQuery(query);
+ request.SetQuery(Query);
if (Action.empty() || Action == "execute-script" || Action == "execute") {
request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
request.SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT);
@@ -140,11 +196,11 @@ public:
request.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_PROFILE);
request.SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE);
}
- if (database) {
- request.SetDatabase(database);
+ if (Database) {
+ request.SetDatabase(Database);
}
- if (!Event->Get()->UserToken.empty()) {
- event->Record.SetUserToken(Event->Get()->UserToken);
+ if (UserToken) {
+ event->Record.SetUserToken(UserToken);
}
if (Syntax == "yql_v1") {
request.SetSyntax(Ydb::Query::Syntax::SYNTAX_YQL_V1);
@@ -153,7 +209,27 @@ public:
}
ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
+ }
+ void Bootstrap() {
+ if (Query.empty()) {
+ if (Event) {
+ ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), {}, "Bad Request"));
+ } else {
+ auto* response = new TEvViewer::TEvViewerResponse();
+ response->Record.MutableQueryResponse()->SetYdbStatus(Ydb::StatusIds::BAD_REQUEST);
+ ReplyAndPassAway(response);
+ }
+ return;
+ }
+
+ if (Database && !Direct) {
+ RequestStateStorageEndpointsLookup(Database); // to find some dynamic node and redirect query there
+ }
+
+ if (Requests == 0) {
+ SendKpqProxyRequest();
+ }
Become(&TThis::StateWork, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup());
}
@@ -244,23 +320,88 @@ private:
}
}
- void HandleReply(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
- TStringBuilder out;
- NJson::TJsonValue jsonResponse;
- NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef();
- if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
- MakeOkReply(out, jsonResponse, record);
+ void Connected(TEvInterconnect::TEvNodeConnected::TPtr &) {}
+
+ void Undelivered(TEvents::TEvUndelivered::TPtr &ev) {
+ if (ev->Get()->SourceType == NViewer::TEvViewer::EvViewerRequest) {
+ SendKpqProxyRequest();
+ }
+ }
+
+ void Disconnected(TEvInterconnect::TEvNodeDisconnected::TPtr &) {
+ SendKpqProxyRequest();
+ }
+
+ void SendDynamicNodeQueryRequest() {
+ ui64 hash = std::hash<TString>()(Event->Get()->Request.GetRemoteAddr());
+
+ auto itPos = std::next(TenantDynamicNodes.begin(), hash % TenantDynamicNodes.size());
+ std::nth_element(TenantDynamicNodes.begin(), itPos, TenantDynamicNodes.end());
+
+ TNodeId nodeId = *itPos;
+ SubscribedNodeId = nodeId;
+ TActorId viewerServiceId = MakeViewerID(nodeId);
+
+ THolder<TEvViewer::TEvViewerRequest> request = MakeHolder<TEvViewer::TEvViewerRequest>();
+ request->Record.SetTimeout(Timeout);
+ auto queryRequest = request->Record.MutableQueryRequest();
+ queryRequest->SetUri(TString(Event->Get()->Request.GetUri()));
+ if (IsPostContent()) {
+ TStringBuf content = Event->Get()->Request.GetPostContent();
+ queryRequest->SetContent(TString(content));
+ }
+ if (UserToken) {
+ queryRequest->SetUserToken(UserToken);
+ }
+
+ ViewerWhiteboardCookie cookie(NKikimrViewer::TEvViewerRequest::kQueryRequest, nodeId);
+ SendRequest(viewerServiceId, request.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie.ToUi64());
+ }
+
+ void HandleReply(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
+ BLOG_TRACE("Received TEvBoardInfo");
+ if (ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
+ for (const auto& [actorId, infoEntry] : ev->Get()->InfoEntries) {
+ TenantDynamicNodes.emplace_back(actorId.NodeId());
+ }
+ }
+ if (TenantDynamicNodes.empty()) {
+ SendKpqProxyRequest();
} else {
- MakeErrorReply(out, jsonResponse, record);
+ SendDynamicNodeQueryRequest();
}
+ }
+
+ void Handle(NKikimrKqp::TEvQueryResponse& record) {
+ if (Event) {
+ TStringBuilder out;
+ NJson::TJsonValue jsonResponse;
+ if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ MakeOkReply(out, jsonResponse, record);
+ } else {
+ MakeErrorReply(out, jsonResponse, record);
+ }
+
+ if (Schema == "classic" && Stats.empty() && (Action.empty() || Action == "execute")) {
+ jsonResponse = std::move(jsonResponse["result"]);
+ }
+
+ out << NJson::WriteJson(jsonResponse, false);
- if (Schema == "classic" && Stats.empty() && (Action.empty() || Action == "execute")) {
- jsonResponse = std::move(jsonResponse["result"]);
+ ReplyAndPassAway(out);
+ } else {
+ TEvViewer::TEvViewerResponse* response = new TEvViewer::TEvViewerResponse();
+ response->Record.MutableQueryResponse()->CopyFrom(record);
+ ReplyAndPassAway(response);
}
+ }
- out << NJson::WriteJson(jsonResponse, false);
+ void HandleReply(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
+ Handle(ev->Get()->Record.GetRef());
+ }
- ReplyAndPassAway(out);
+ void HandleReply(TEvViewer::TEvViewerResponse::TPtr& ev) {
+ Handle(*(ev.Get()->Get()->Record.MutableQueryResponse()));
}
void HandleReply(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev) {
@@ -287,11 +428,22 @@ private:
}
void HandleTimeout() {
- ReplyAndPassAway(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get()));
+ if (Event) {
+ ReplyAndPassAway(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get()));
+ } else {
+ auto* response = new TEvViewer::TEvViewerResponse();
+ response->Record.MutableQueryResponse()->SetYdbStatus(Ydb::StatusIds::TIMEOUT);
+ ReplyAndPassAway(response);
+ }
+ }
+
+ void ReplyAndPassAway(TEvViewer::TEvViewerResponse* response) {
+ Send(ViewerRequest->Sender, response);
+ PassAway();
}
void ReplyAndPassAway(TString data) {
- Send(Initiator, new NMon::TEvHttpInfoRes(std::move(data), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
+ Send(Event->Sender, new NMon::TEvHttpInfoRes(std::move(data), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
PassAway();
}
@@ -421,6 +573,7 @@ struct TJsonRequestParameters<TJsonQuery> {
static TString GetParameters() {
return R"___([{"name":"ui64","in":"query","description":"return ui64 as number","required":false,"type":"boolean"},
{"name":"query","in":"query","description":"query text","required":true,"type":"string"},
+ {"name":"direct","in":"query","description":"force processing query on current node","required":false,"type":"boolean"},
{"name":"syntax","in":"query","description":"query syntax (yql_v1, pg)","required":false,"type":"string"},
{"name":"database","in":"query","description":"database name","required":false,"type":"string"},
{"name":"schema","in":"query","description":"result format schema (classic, modern, ydb)","required":false,"type":"string"},
diff --git a/ydb/core/viewer/json_tenantinfo.h b/ydb/core/viewer/json_tenantinfo.h
index 846f1947d0..0ab7e32811 100644
--- a/ydb/core/viewer/json_tenantinfo.h
+++ b/ydb/core/viewer/json_tenantinfo.h
@@ -386,6 +386,7 @@ public:
TenantNodesSystemInfo[tenantId] = std::move(ev->Get()->Record);
RequestDone();
break;
+ case NKikimrViewer::TEvViewerResponse::kQueryResponse:
case NKikimrViewer::TEvViewerResponse::RESPONSE_NOT_SET:
break;
}
@@ -418,6 +419,7 @@ public:
case NKikimrViewer::TEvViewerRequest::kSystemRequest:
SendViewerSystemRequest(tenantId);
break;
+ case NKikimrViewer::TEvViewerRequest::kQueryRequest:
case NKikimrViewer::TEvViewerRequest::REQUEST_NOT_SET:
break;
}
diff --git a/ydb/core/viewer/protos/viewer.proto b/ydb/core/viewer/protos/viewer.proto
index 90dc90f1df..e5e0a2f427 100644
--- a/ydb/core/viewer/protos/viewer.proto
+++ b/ydb/core/viewer/protos/viewer.proto
@@ -5,6 +5,7 @@ import "ydb/core/protos/node_whiteboard.proto";
import "ydb/core/protos/flat_scheme_op.proto";
import "ydb/core/protos/tablet.proto";
import "ydb/core/protos/hive.proto";
+import "ydb/core/protos/kqp.proto";
import "ydb/public/api/protos/ydb_cms.proto";
package NKikimrViewer;
@@ -498,12 +499,19 @@ message TNodeLocation {
repeated uint32 NodeId = 1;
}
+message TQueryRequest {
+ string Uri = 1;
+ bytes Content = 2;
+ string UserToken = 3;
+}
+
message TEvViewerRequest {
TNodeLocation Location = 1;
uint32 Timeout = 2; // ms
oneof Request {
NKikimrWhiteboard.TEvTabletStateRequest TabletRequest = 11;
NKikimrWhiteboard.TEvSystemStateRequest SystemRequest = 12;
+ TQueryRequest QueryRequest = 13;
}
}
@@ -512,6 +520,7 @@ message TEvViewerResponse {
oneof Response {
NKikimrWhiteboard.TEvTabletStateResponse TabletResponse = 11;
NKikimrWhiteboard.TEvSystemStateResponse SystemResponse = 12;
+ NKikimrKqp.TEvQueryResponse QueryResponse = 13;
}
}
diff --git a/ydb/core/viewer/viewer_request.cpp b/ydb/core/viewer/viewer_request.cpp
index 2721c5b42c..867a5cb034 100644
--- a/ydb/core/viewer/viewer_request.cpp
+++ b/ydb/core/viewer/viewer_request.cpp
@@ -5,6 +5,7 @@
#include "json_tabletinfo.h"
#include "json_sysinfo.h"
+#include "json_query.h"
namespace NKikimr {
namespace NViewer {
@@ -71,6 +72,8 @@ IActor* CreateViewerRequestHandler(TEvViewer::TEvViewerRequest::TPtr request) {
return new TViewerWhiteboardRequest<TEvWhiteboard::TEvTabletStateRequest, TEvWhiteboard::TEvTabletStateResponse>(request);
case NKikimrViewer::TEvViewerRequest::kSystemRequest:
return new TViewerWhiteboardRequest<TEvWhiteboard::TEvSystemStateRequest, TEvWhiteboard::TEvSystemStateResponse>(request);
+ case NKikimrViewer::TEvViewerRequest::kQueryRequest:
+ return new TJsonQuery(request);
case NKikimrViewer::TEvViewerRequest::REQUEST_NOT_SET:
return nullptr;
}