diff options
author | andrew-rykov <arykov@ydb.tech> | 2023-08-03 13:09:13 +0300 |
---|---|---|
committer | andrew-rykov <arykov@ydb.tech> | 2023-08-03 13:09:13 +0300 |
commit | a31dbe76e19c44ca466f2fb326fbfe4f3f4a95b3 (patch) | |
tree | ed1d7b9d2131faee7022d89b64c5764f3789da0e | |
parent | 17008b8af29774dd4a121912c7e54998f0ea4f23 (diff) | |
download | ydb-a31dbe76e19c44ca466f2fb326fbfe4f3f4a95b3.tar.gz |
KIKIMR-18328 query handler add discovery and proxy
fixed TEvHttpInfoRes handler
KIKIMR-18328 quer handler add discovery
-rw-r--r-- | ydb/core/viewer/json_query.h | 281 | ||||
-rw-r--r-- | ydb/core/viewer/json_tenantinfo.h | 2 | ||||
-rw-r--r-- | ydb/core/viewer/protos/viewer.proto | 9 | ||||
-rw-r--r-- | ydb/core/viewer/viewer_request.cpp | 3 |
4 files changed, 231 insertions, 64 deletions
diff --git a/ydb/core/viewer/json_query.h b/ydb/core/viewer/json_query.h index d703976a93..ef51e3479d 100644 --- a/ydb/core/viewer/json_query.h +++ b/ydb/core/viewer/json_query.h @@ -15,95 +15,151 @@ //#include <ydb/public/lib/deprecated/kicli/kicli.h> #include <ydb/public/lib/json_value/ydb_json_value.h> #include <ydb/public/sdk/cpp/client/ydb_result/result.h> +#include "json_pipe_req.h" +#include "viewer_request.h" namespace NKikimr { namespace NViewer { using namespace NActors; +using namespace NMonitoring; using ::google::protobuf::FieldDescriptor; -class TJsonQuery : public TActorBootstrapped<TJsonQuery> { +class TJsonQuery : public TViewerPipeClient<TJsonQuery> { using TThis = TJsonQuery; - using TBase = TActorBootstrapped<TJsonQuery>; + using TBase = TViewerPipeClient<TJsonQuery>; IViewer* Viewer; TJsonSettings JsonSettings; - TActorId Initiator; NMon::TEvHttpInfo::TPtr Event; + TEvViewer::TEvViewerRequest::TPtr ViewerRequest; ui32 Timeout = 0; TVector<Ydb::ResultSet> ResultSets; + TString Query; + TString Database; TString Action; TString Stats; TString Schema = "classic"; TString Syntax; + TString UserToken; + + std::optional<TNodeId> SubscribedNodeId; + std::vector<TNodeId> TenantDynamicNodes; + bool Direct = false; + bool MadeKqpProxyRequest = false; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::VIEWER_HANDLER; } - TJsonQuery(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) - : Viewer(viewer) - , Initiator(ev->Sender) - , Event(ev) - {} - - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(NKqp::TEvKqp::TEvQueryResponse, HandleReply); - hFunc(NKqp::TEvKqp::TEvProcessResponse, HandleReply); - hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleReply); - hFunc(NKqp::TEvKqpExecuter::TEvStreamData, HandleReply); - hFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, HandleReply); - - cFunc(TEvents::TSystem::Wakeup, HandleTimeout); - - default: { - Cerr << "Unexpected event received in TJsonQuery::StateWork: " << ev->GetTypeRewrite() << Endl; - } - } - } - - void Bootstrap() { - const auto& params(Event->Get()->Request.GetParams()); - auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + void ParseCgiParameters(const TCgiParameters& params) { JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), false); JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), false); Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 60000); - TString query = params.Get("query"); - TString database = params.Get("database"); + Query = params.Get("query"); + Database = params.Get("database"); Stats = params.Get("stats"); Action = params.Get("action"); Schema = params.Get("schema"); - Syntax = params.Get("syntax"); if (Schema.empty()) { Schema = "classic"; } - if (query.empty() && Event->Get()->Request.GetMethod() == HTTP_METHOD_POST) { - TStringBuf content = Event->Get()->Request.GetPostContent(); + Syntax = params.Get("syntax"); + Direct = FromStringWithDefault<bool>(params.Get("direct"), Direct); + } + + void ParsePostContent(const TStringBuf& content) { + static NJson::TJsonReaderConfig JsonConfig; + NJson::TJsonValue requestData; + bool success = NJson::ReadJsonTree(content, &JsonConfig, &requestData); + if (success) { + Query = Query.empty() ? requestData["query"].GetStringSafe({}) : Query; + Database = Database.empty() ? requestData["database"].GetStringSafe({}) : Database; + Stats = Stats.empty() ? requestData["stats"].GetStringSafe({}) : Stats; + Action = Action.empty() ? requestData["action"].GetStringSafe({}) : Action; + Syntax = Syntax.empty() ? requestData["syntax"].GetStringSafe({}) : Syntax; + } + } + + bool IsPostContent() { + if (Event->Get()->Request.GetMethod() == HTTP_METHOD_POST) { const THttpHeaders& headers = Event->Get()->Request.GetHeaders(); auto itContentType = FindIf(headers, [](const auto& header) { return header.Name() == "Content-Type"; }); if (itContentType != headers.end()) { TStringBuf contentTypeHeader = itContentType->Value(); TStringBuf contentType = contentTypeHeader.NextTok(';'); - if (contentType == "application/json") { - static NJson::TJsonReaderConfig JsonConfig; - NJson::TJsonValue requestData; - bool success = NJson::ReadJsonTree(content, &JsonConfig, &requestData); - if (success) { - query = requestData["query"].GetStringSafe({}); - database = requestData["database"].GetStringSafe({}); - Stats = requestData["stats"].GetStringSafe({}); - Action = requestData["action"].GetStringSafe({}); - } - } + return contentType == "application/json"; } } - if (query.empty()) { - ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), {}, "Bad Request")); + return false; + } + + TJsonQuery(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) + : Viewer(viewer) + , Event(ev) + { + const auto& params(Event->Get()->Request.GetParams()); + InitConfig(params); + ParseCgiParameters(params); + if (IsPostContent()) { + TStringBuf content = Event->Get()->Request.GetPostContent(); + ParsePostContent(content); + } + UserToken = Event->Get()->UserToken; + } + + TJsonQuery(TEvViewer::TEvViewerRequest::TPtr& ev) + : ViewerRequest(ev) + { + auto& request = ViewerRequest->Get()->Record.GetQueryRequest(); + + TCgiParameters params(request.GetUri()); + InitConfig(params); + ParseCgiParameters(params); + + TStringBuf content = request.GetContent(); + if (content) { + ParsePostContent(content); + } + + Timeout = ViewerRequest->Get()->Record.GetTimeout(); + UserToken = request.GetUserToken(); + Direct = true; + } + + void PassAway() override { + if (SubscribedNodeId.has_value()) { + Send(TActivationContext::InterconnectProxy(SubscribedNodeId.value()), new TEvents::TEvUnsubscribe()); + } + TBase::PassAway(); + BLOG_TRACE("PassAway()"); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvStateStorage::TEvBoardInfo, HandleReply); + hFunc(TEvents::TEvUndelivered, Undelivered); + hFunc(TEvInterconnect::TEvNodeConnected, Connected); + hFunc(TEvInterconnect::TEvNodeDisconnected, Disconnected); + hFunc(TEvViewer::TEvViewerResponse, HandleReply); + hFunc(NKqp::TEvKqp::TEvQueryResponse, HandleReply); + hFunc(NKqp::TEvKqp::TEvProcessResponse, HandleReply); + hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleReply); + hFunc(NKqp::TEvKqpExecuter::TEvStreamData, HandleReply); + hFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, HandleReply); + + cFunc(TEvents::TSystem::Wakeup, HandleTimeout); + } + } + + void SendKpqProxyRequest() { + if (MadeKqpProxyRequest) { return; } + MadeKqpProxyRequest = true; + auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); - request.SetQuery(query); + request.SetQuery(Query); if (Action.empty() || Action == "execute-script" || Action == "execute") { request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); request.SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT); @@ -140,11 +196,11 @@ public: request.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_PROFILE); request.SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE); } - if (database) { - request.SetDatabase(database); + if (Database) { + request.SetDatabase(Database); } - if (!Event->Get()->UserToken.empty()) { - event->Record.SetUserToken(Event->Get()->UserToken); + if (UserToken) { + event->Record.SetUserToken(UserToken); } if (Syntax == "yql_v1") { request.SetSyntax(Ydb::Query::Syntax::SYNTAX_YQL_V1); @@ -153,7 +209,27 @@ public: } ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); + } + void Bootstrap() { + if (Query.empty()) { + if (Event) { + ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), {}, "Bad Request")); + } else { + auto* response = new TEvViewer::TEvViewerResponse(); + response->Record.MutableQueryResponse()->SetYdbStatus(Ydb::StatusIds::BAD_REQUEST); + ReplyAndPassAway(response); + } + return; + } + + if (Database && !Direct) { + RequestStateStorageEndpointsLookup(Database); // to find some dynamic node and redirect query there + } + + if (Requests == 0) { + SendKpqProxyRequest(); + } Become(&TThis::StateWork, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup()); } @@ -244,23 +320,88 @@ private: } } - void HandleReply(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { - TStringBuilder out; - NJson::TJsonValue jsonResponse; - NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef(); - if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { - MakeOkReply(out, jsonResponse, record); + void Connected(TEvInterconnect::TEvNodeConnected::TPtr &) {} + + void Undelivered(TEvents::TEvUndelivered::TPtr &ev) { + if (ev->Get()->SourceType == NViewer::TEvViewer::EvViewerRequest) { + SendKpqProxyRequest(); + } + } + + void Disconnected(TEvInterconnect::TEvNodeDisconnected::TPtr &) { + SendKpqProxyRequest(); + } + + void SendDynamicNodeQueryRequest() { + ui64 hash = std::hash<TString>()(Event->Get()->Request.GetRemoteAddr()); + + auto itPos = std::next(TenantDynamicNodes.begin(), hash % TenantDynamicNodes.size()); + std::nth_element(TenantDynamicNodes.begin(), itPos, TenantDynamicNodes.end()); + + TNodeId nodeId = *itPos; + SubscribedNodeId = nodeId; + TActorId viewerServiceId = MakeViewerID(nodeId); + + THolder<TEvViewer::TEvViewerRequest> request = MakeHolder<TEvViewer::TEvViewerRequest>(); + request->Record.SetTimeout(Timeout); + auto queryRequest = request->Record.MutableQueryRequest(); + queryRequest->SetUri(TString(Event->Get()->Request.GetUri())); + if (IsPostContent()) { + TStringBuf content = Event->Get()->Request.GetPostContent(); + queryRequest->SetContent(TString(content)); + } + if (UserToken) { + queryRequest->SetUserToken(UserToken); + } + + ViewerWhiteboardCookie cookie(NKikimrViewer::TEvViewerRequest::kQueryRequest, nodeId); + SendRequest(viewerServiceId, request.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie.ToUi64()); + } + + void HandleReply(TEvStateStorage::TEvBoardInfo::TPtr& ev) { + BLOG_TRACE("Received TEvBoardInfo"); + if (ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) { + for (const auto& [actorId, infoEntry] : ev->Get()->InfoEntries) { + TenantDynamicNodes.emplace_back(actorId.NodeId()); + } + } + if (TenantDynamicNodes.empty()) { + SendKpqProxyRequest(); } else { - MakeErrorReply(out, jsonResponse, record); + SendDynamicNodeQueryRequest(); } + } + + void Handle(NKikimrKqp::TEvQueryResponse& record) { + if (Event) { + TStringBuilder out; + NJson::TJsonValue jsonResponse; + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + MakeOkReply(out, jsonResponse, record); + } else { + MakeErrorReply(out, jsonResponse, record); + } + + if (Schema == "classic" && Stats.empty() && (Action.empty() || Action == "execute")) { + jsonResponse = std::move(jsonResponse["result"]); + } + + out << NJson::WriteJson(jsonResponse, false); - if (Schema == "classic" && Stats.empty() && (Action.empty() || Action == "execute")) { - jsonResponse = std::move(jsonResponse["result"]); + ReplyAndPassAway(out); + } else { + TEvViewer::TEvViewerResponse* response = new TEvViewer::TEvViewerResponse(); + response->Record.MutableQueryResponse()->CopyFrom(record); + ReplyAndPassAway(response); } + } - out << NJson::WriteJson(jsonResponse, false); + void HandleReply(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { + Handle(ev->Get()->Record.GetRef()); + } - ReplyAndPassAway(out); + void HandleReply(TEvViewer::TEvViewerResponse::TPtr& ev) { + Handle(*(ev.Get()->Get()->Record.MutableQueryResponse())); } void HandleReply(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev) { @@ -287,11 +428,22 @@ private: } void HandleTimeout() { - ReplyAndPassAway(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get())); + if (Event) { + ReplyAndPassAway(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get())); + } else { + auto* response = new TEvViewer::TEvViewerResponse(); + response->Record.MutableQueryResponse()->SetYdbStatus(Ydb::StatusIds::TIMEOUT); + ReplyAndPassAway(response); + } + } + + void ReplyAndPassAway(TEvViewer::TEvViewerResponse* response) { + Send(ViewerRequest->Sender, response); + PassAway(); } void ReplyAndPassAway(TString data) { - Send(Initiator, new NMon::TEvHttpInfoRes(std::move(data), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + Send(Event->Sender, new NMon::TEvHttpInfoRes(std::move(data), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); PassAway(); } @@ -421,6 +573,7 @@ struct TJsonRequestParameters<TJsonQuery> { static TString GetParameters() { return R"___([{"name":"ui64","in":"query","description":"return ui64 as number","required":false,"type":"boolean"}, {"name":"query","in":"query","description":"query text","required":true,"type":"string"}, + {"name":"direct","in":"query","description":"force processing query on current node","required":false,"type":"boolean"}, {"name":"syntax","in":"query","description":"query syntax (yql_v1, pg)","required":false,"type":"string"}, {"name":"database","in":"query","description":"database name","required":false,"type":"string"}, {"name":"schema","in":"query","description":"result format schema (classic, modern, ydb)","required":false,"type":"string"}, diff --git a/ydb/core/viewer/json_tenantinfo.h b/ydb/core/viewer/json_tenantinfo.h index 846f1947d0..0ab7e32811 100644 --- a/ydb/core/viewer/json_tenantinfo.h +++ b/ydb/core/viewer/json_tenantinfo.h @@ -386,6 +386,7 @@ public: TenantNodesSystemInfo[tenantId] = std::move(ev->Get()->Record); RequestDone(); break; + case NKikimrViewer::TEvViewerResponse::kQueryResponse: case NKikimrViewer::TEvViewerResponse::RESPONSE_NOT_SET: break; } @@ -418,6 +419,7 @@ public: case NKikimrViewer::TEvViewerRequest::kSystemRequest: SendViewerSystemRequest(tenantId); break; + case NKikimrViewer::TEvViewerRequest::kQueryRequest: case NKikimrViewer::TEvViewerRequest::REQUEST_NOT_SET: break; } diff --git a/ydb/core/viewer/protos/viewer.proto b/ydb/core/viewer/protos/viewer.proto index 90dc90f1df..e5e0a2f427 100644 --- a/ydb/core/viewer/protos/viewer.proto +++ b/ydb/core/viewer/protos/viewer.proto @@ -5,6 +5,7 @@ import "ydb/core/protos/node_whiteboard.proto"; import "ydb/core/protos/flat_scheme_op.proto"; import "ydb/core/protos/tablet.proto"; import "ydb/core/protos/hive.proto"; +import "ydb/core/protos/kqp.proto"; import "ydb/public/api/protos/ydb_cms.proto"; package NKikimrViewer; @@ -498,12 +499,19 @@ message TNodeLocation { repeated uint32 NodeId = 1; } +message TQueryRequest { + string Uri = 1; + bytes Content = 2; + string UserToken = 3; +} + message TEvViewerRequest { TNodeLocation Location = 1; uint32 Timeout = 2; // ms oneof Request { NKikimrWhiteboard.TEvTabletStateRequest TabletRequest = 11; NKikimrWhiteboard.TEvSystemStateRequest SystemRequest = 12; + TQueryRequest QueryRequest = 13; } } @@ -512,6 +520,7 @@ message TEvViewerResponse { oneof Response { NKikimrWhiteboard.TEvTabletStateResponse TabletResponse = 11; NKikimrWhiteboard.TEvSystemStateResponse SystemResponse = 12; + NKikimrKqp.TEvQueryResponse QueryResponse = 13; } } diff --git a/ydb/core/viewer/viewer_request.cpp b/ydb/core/viewer/viewer_request.cpp index 2721c5b42c..867a5cb034 100644 --- a/ydb/core/viewer/viewer_request.cpp +++ b/ydb/core/viewer/viewer_request.cpp @@ -5,6 +5,7 @@ #include "json_tabletinfo.h" #include "json_sysinfo.h" +#include "json_query.h" namespace NKikimr { namespace NViewer { @@ -71,6 +72,8 @@ IActor* CreateViewerRequestHandler(TEvViewer::TEvViewerRequest::TPtr request) { return new TViewerWhiteboardRequest<TEvWhiteboard::TEvTabletStateRequest, TEvWhiteboard::TEvTabletStateResponse>(request); case NKikimrViewer::TEvViewerRequest::kSystemRequest: return new TViewerWhiteboardRequest<TEvWhiteboard::TEvSystemStateRequest, TEvWhiteboard::TEvSystemStateResponse>(request); + case NKikimrViewer::TEvViewerRequest::kQueryRequest: + return new TJsonQuery(request); case NKikimrViewer::TEvViewerRequest::REQUEST_NOT_SET: return nullptr; } |