summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrei Rykov <[email protected]>2024-02-13 18:56:20 +0100
committerGitHub <[email protected]>2024-02-13 18:56:20 +0100
commitf672e573da7fb133ad2bd577eebcc55900fe067c (patch)
treed85eaaa2f0b111f4056717afb47351fb38c30405
parent5fd07a879b1b0aee88b4586600df2661af2a507c (diff)
YDB-1065 added proxying for render handler (#1872)
-rw-r--r--ydb/core/viewer/json_render.h277
-rw-r--r--ydb/core/viewer/json_tenantinfo.h12
-rw-r--r--ydb/core/viewer/protos/viewer.proto11
-rw-r--r--ydb/core/viewer/protos/ya.make1
-rw-r--r--ydb/core/viewer/viewer_request.cpp5
5 files changed, 223 insertions, 83 deletions
diff --git a/ydb/core/viewer/json_render.h b/ydb/core/viewer/json_render.h
index e8e0451153a..86fdd26ada9 100644
--- a/ydb/core/viewer/json_render.h
+++ b/ydb/core/viewer/json_render.h
@@ -4,6 +4,8 @@
#include <ydb/core/graph/api/service.h>
#include <ydb/core/graph/api/events.h>
#include <library/cpp/json/json_writer.h>
+#include "json_pipe_req.h"
+#include "viewer_request.h"
#include "viewer.h"
#include "log.h"
@@ -11,12 +13,23 @@ namespace NKikimr {
namespace NViewer {
using namespace NActors;
+using namespace NMonitoring;
-class TJsonRender : public TActorBootstrapped<TJsonRender> {
+class TJsonRender : public TViewerPipeClient<TJsonRender> {
+ using TThis = TJsonRender;
+ using TBase = TViewerPipeClient<TJsonRender>;
IViewer* Viewer;
NMon::TEvHttpInfo::TPtr Event;
+ TEvViewer::TEvViewerRequest::TPtr ViewerRequest;
+ ui32 Timeout = 0;
std::vector<TString> Metrics;
+ TString Database;
+ TCgiParameters Params;
+ std::optional<TNodeId> SubscribedNodeId;
+ std::vector<TNodeId> TenantDynamicNodes;
+ bool Direct = false;
+ bool MadeProxyRequest = false;
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::VIEWER_HANDLER;
@@ -25,116 +38,242 @@ public:
TJsonRender(IViewer* viewer, NMon::TEvHttpInfo::TPtr &ev)
: Viewer(viewer)
, Event(ev)
- {}
+ {
+ const auto& params(Event->Get()->Request.GetParams());
+
+ InitConfig(params);
+ Database = params.Get("database");
+ Direct = FromStringWithDefault<bool>(params.Get("direct"), Direct);
+ Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 30000);
+ }
+
+ TJsonRender(TEvViewer::TEvViewerRequest::TPtr& ev)
+ : ViewerRequest(ev)
+ {
+ auto& request = ViewerRequest->Get()->Record.GetRenderRequest();
+
+ TCgiParameters params(request.GetUri());
+ InitConfig(params);
+ Direct = true;
+ Timeout = ViewerRequest->Get()->Record.GetTimeout();
+ }
void Bootstrap() {
- auto postData = Event->Get()->Request.GetPostContent();
+ auto postData = Event
+ ? Event->Get()->Request.GetPostContent()
+ : ViewerRequest->Get()->Record.GetRenderRequest().GetContent();
BLOG_D("PostData=" << postData);
NKikimrGraph::TEvGetMetrics getRequest;
if (postData) {
- TCgiParameters params(postData);
- if (params.Has("target")) {
+ Params = TCgiParameters(postData);
+ if (Params.Has("target")) {
TString metric;
size_t num = 0;
for (;;) {
- metric = params.Get("target", num);
+ metric = Params.Get("target", num);
if (metric.empty()) {
break;
}
Metrics.push_back(metric);
++num;
}
- //StringSplitter(params.Get("target")).Split(',').SkipEmpty().Collect(&Metrics);
- for (const auto& metric : Metrics) {
- getRequest.AddMetrics(metric);
- }
- } else {
- static const TString png1x1 = "\x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52\x00\x00\x00\x01\x00\x00\x00\x01\x01"
- "\x03\x00\x00\x00\x25\xdb\x56\xca\x00\x00\x00\x03\x50\x4c\x54\x45\x00\x00\x00\xa7\x7a\x3d\xda\x00\x00"
- "\x00\x01\x74\x52\x4e\x53\x00\x40\xe6\xd8\x66\x00\x00\x00\x0a\x49\x44\x41\x54\x08\xd7\x63\x60\x00\x00"
- "\x00\x02\x00\x01\xe2\x21\xbc\x33\x00\x00\x00\x00\x49\x45\x4e\x44\xae\x42\x60\x82";
- Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOK(Event->Get(), "image/png", png1x1), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
- return PassAway();
}
- if (params.Has("from")) {
- getRequest.SetTimeFrom(FromStringWithDefault<ui32>(params.Get("from")));
- }
- if (params.Has("until")) {
- getRequest.SetTimeTo(FromStringWithDefault<ui32>(params.Get("until")));
+ //StringSplitter(Params.Get("target")).Split(',').SkipEmpty().Collect(&Metrics);
+
+ if (Database && !Direct) {
+ RequestStateStorageEndpointsLookup(Database); // to find some dynamic node and redirect there
}
- if (params.Has("maxDataPoints")) {
- getRequest.SetMaxPoints(FromStringWithDefault<ui32>(params.Get("maxDataPoints"), 1000));
+ if (Requests == 0) {
+ SendGraphRequest();
}
} else {
- Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPBADREQUEST(Event->Get(), {}, "Bad Request"), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
- return PassAway();
+ ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), {}, "Bad Request"));
+ return;
}
- Send(NGraph::MakeGraphServiceId(), new NGraph::TEvGraph::TEvGetMetrics(std::move(getRequest)));
- Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup());
- Become(&TThis::StateWork);
+
+ Become(&TThis::StateWork, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup());
+ }
+
+ void PassAway() override {
+ if (SubscribedNodeId.has_value()) {
+ Send(TActivationContext::InterconnectProxy(SubscribedNodeId.value()), new TEvents::TEvUnsubscribe());
+ }
+ TBase::PassAway();
+ BLOG_TRACE("PassAway()");
}
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
+ hFunc(TEvStateStorage::TEvBoardInfo, Handle);
+ hFunc(TEvents::TEvUndelivered, Undelivered);
+ hFunc(TEvInterconnect::TEvNodeConnected, Connected);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Disconnected);
+ hFunc(TEvViewer::TEvViewerResponse, Handle);
hFunc(NGraph::TEvGraph::TEvMetricsResult, Handle);
- cFunc(TEvents::TSystem::Wakeup, Timeout);
+
+ cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
}
}
- void Handle(NGraph::TEvGraph::TEvMetricsResult::TPtr& ev) {
- const auto& response(ev->Get()->Record);
- NJson::TJsonValue json;
+ void Connected(TEvInterconnect::TEvNodeConnected::TPtr &) {}
- if (response.GetError()) {
- json["status"] = "error";
- json["error"] = response.GetError();
- Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
- return PassAway();
+ void Undelivered(TEvents::TEvUndelivered::TPtr &ev) {
+ if (ev->Get()->SourceType == NViewer::TEvViewer::EvViewerRequest) {
+ SendGraphRequest();
+ }
+ }
+
+ void Disconnected(TEvInterconnect::TEvNodeDisconnected::TPtr &) {
+ SendGraphRequest();
+ }
+
+ void SendDynamicNodeRenderRequest() {
+ ui64 hash = std::hash<TString>()(Event->Get()->Request.GetRemoteAddr());
+
+ auto itPos = std::next(TenantDynamicNodes.begin(), hash % TenantDynamicNodes.size());
+ std::nth_element(TenantDynamicNodes.begin(), itPos, TenantDynamicNodes.end());
+
+ TNodeId nodeId = *itPos;
+ SubscribedNodeId = nodeId;
+ TActorId viewerServiceId = MakeViewerID(nodeId);
+
+ THolder<TEvViewer::TEvViewerRequest> request = MakeHolder<TEvViewer::TEvViewerRequest>();
+ request->Record.SetTimeout(Timeout);
+ auto renderRequest = request->Record.MutableRenderRequest();
+ renderRequest->SetUri(TString(Event->Get()->Request.GetUri()));
+
+ TStringBuf content = Event->Get()->Request.GetPostContent();
+ renderRequest->SetContent(TString(content));
+
+ ViewerWhiteboardCookie cookie(NKikimrViewer::TEvViewerRequest::kRenderRequest, nodeId);
+ SendRequest(viewerServiceId, request.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie.ToUi64());
+ }
+
+ void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
+ BLOG_TRACE("Received TEvBoardInfo");
+ if (ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
+ for (const auto& [actorId, infoEntry] : ev->Get()->InfoEntries) {
+ TenantDynamicNodes.emplace_back(actorId.NodeId());
+ }
+ }
+ if (TenantDynamicNodes.empty()) {
+ SendGraphRequest();
+ } else {
+ SendDynamicNodeRenderRequest();
+ }
+ }
+
+ void SendGraphRequest() {
+ if (MadeProxyRequest) {
+ return;
}
- if (response.DataSize() != Metrics.size()) {
- json["status"] = "error";
- json["error"] = "Invalid data size received";
- Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
+ MadeProxyRequest = true;
+ NKikimrGraph::TEvGetMetrics getRequest;
+ if (Metrics.size() > 0) {
+ for (const auto& metric : Metrics) {
+ getRequest.AddMetrics(metric);
+ }
+ } else {
+ static const TString png1x1 = "\x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52\x00\x00\x00\x01\x00\x00\x00\x01\x01"
+ "\x03\x00\x00\x00\x25\xdb\x56\xca\x00\x00\x00\x03\x50\x4c\x54\x45\x00\x00\x00\xa7\x7a\x3d\xda\x00\x00"
+ "\x00\x01\x74\x52\x4e\x53\x00\x40\xe6\xd8\x66\x00\x00\x00\x0a\x49\x44\x41\x54\x08\xd7\x63\x60\x00\x00"
+ "\x00\x02\x00\x01\xe2\x21\xbc\x33\x00\x00\x00\x00\x49\x45\x4e\x44\xae\x42\x60\x82";
+ Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOK(Event->Get(), "image/png", png1x1), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
return PassAway();
}
- for (size_t nMetric = 0; nMetric < response.DataSize(); ++nMetric) {
- const auto& protoMetric(response.GetData(nMetric));
- if (response.TimeSize() != protoMetric.ValuesSize()) {
+ if (Params.Has("from")) {
+ getRequest.SetTimeFrom(FromStringWithDefault<ui32>(Params.Get("from")));
+ }
+ if (Params.Has("until")) {
+ getRequest.SetTimeTo(FromStringWithDefault<ui32>(Params.Get("until")));
+ }
+ if (Params.Has("maxDataPoints")) {
+ getRequest.SetMaxPoints(FromStringWithDefault<ui32>(Params.Get("maxDataPoints"), 1000));
+ }
+ Send(NGraph::MakeGraphServiceId(), new NGraph::TEvGraph::TEvGetMetrics(std::move(getRequest)));
+ }
+
+ void HandleRenderResponse(NKikimrGraph::TEvMetricsResult& response) {
+ if (Event) {
+ NJson::TJsonValue json;
+
+ if (response.GetError()) {
json["status"] = "error";
- json["error"] = "Invalid value size received";
- Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
- return PassAway();
+ json["error"] = response.GetError();
+ ReplyAndPassAway(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false));
+ return;
+ }
+ if (response.DataSize() != Metrics.size()) {
+ json["status"] = "error";
+ json["error"] = "Invalid data size received";
+ ReplyAndPassAway(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false));
+ return;
}
- }
- { // graphite
- json.SetType(NJson::JSON_ARRAY);
for (size_t nMetric = 0; nMetric < response.DataSize(); ++nMetric) {
const auto& protoMetric(response.GetData(nMetric));
- NJson::TJsonValue& jsonMetric(json.AppendValue({}));
- jsonMetric["target"] = Metrics[nMetric];
- jsonMetric["title"] = Metrics[nMetric];
- jsonMetric["tags"]["name"] = Metrics[nMetric];
- NJson::TJsonValue& jsonDataPoints(jsonMetric["datapoints"]);
- jsonDataPoints.SetType(NJson::JSON_ARRAY);
- for (size_t nTime = 0; nTime < response.TimeSize(); ++nTime) {
- NJson::TJsonValue& jsonDataPoint(jsonDataPoints.AppendValue({}));
- double value = protoMetric.GetValues(nTime);
- if (isnan(value)) {
- jsonDataPoint.AppendValue(NJson::TJsonValue(NJson::JSON_NULL));
- } else {
- jsonDataPoint.AppendValue(value);
+ if (response.TimeSize() != protoMetric.ValuesSize()) {
+ json["status"] = "error";
+ json["error"] = "Invalid value size received";
+ ReplyAndPassAway(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false));
+ return;
+ }
+ }
+ { // graphite
+ json.SetType(NJson::JSON_ARRAY);
+ for (size_t nMetric = 0; nMetric < response.DataSize(); ++nMetric) {
+ const auto& protoMetric(response.GetData(nMetric));
+ NJson::TJsonValue& jsonMetric(json.AppendValue({}));
+ jsonMetric["target"] = Metrics[nMetric];
+ jsonMetric["title"] = Metrics[nMetric];
+ jsonMetric["tags"]["name"] = Metrics[nMetric];
+ NJson::TJsonValue& jsonDataPoints(jsonMetric["datapoints"]);
+ jsonDataPoints.SetType(NJson::JSON_ARRAY);
+ for (size_t nTime = 0; nTime < response.TimeSize(); ++nTime) {
+ NJson::TJsonValue& jsonDataPoint(jsonDataPoints.AppendValue({}));
+ double value = protoMetric.GetValues(nTime);
+ if (isnan(value)) {
+ jsonDataPoint.AppendValue(NJson::TJsonValue(NJson::JSON_NULL));
+ } else {
+ jsonDataPoint.AppendValue(value);
+ }
+ jsonDataPoint.AppendValue(response.GetTime(nTime));
}
- jsonDataPoint.AppendValue(response.GetTime(nTime));
}
}
+
+ ReplyAndPassAway(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false));
+ } else {
+ TEvViewer::TEvViewerResponse* viewerResponse = new TEvViewer::TEvViewerResponse();
+ viewerResponse->Record.MutableRenderResponse()->CopyFrom(response);
+ ReplyAndPassAway(viewerResponse);
}
+ }
+
+ void Handle(NGraph::TEvGraph::TEvMetricsResult::TPtr& ev) {
+ HandleRenderResponse(ev->Get()->Record);
+ }
+
+ void Handle(TEvViewer::TEvViewerResponse::TPtr& ev) {
+ HandleRenderResponse(*(ev.Get()->Get()->Record.MutableRenderResponse()));
+ }
+
+ void HandleTimeout() {
+ if (Event) {
+ ReplyAndPassAway(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get()));
+ } else {
+ auto* response = new TEvViewer::TEvViewerResponse();
+ response->Record.MutableRenderResponse()->SetError("Request timed out");
+ ReplyAndPassAway(response);
+ }
+ }
- Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
+ void ReplyAndPassAway(TEvViewer::TEvViewerResponse* response) {
+ Send(ViewerRequest->Sender, response);
PassAway();
}
- void Timeout() {
- Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get()), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
+ void ReplyAndPassAway(TString data) {
+ Send(Event->Sender, new NMon::TEvHttpInfoRes(std::move(data), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
PassAway();
}
};
@@ -144,6 +283,8 @@ struct TJsonRequestParameters<TJsonRender> {
static TString GetParameters() {
return R"___([{"name":"target","in":"query","description":"metrics comma delimited","required":true,"type":"string"},
{"name":"from","in":"query","description":"time in seconds","required":false,"type":"integer"},
+ {"name":"database","in":"query","description":"database name","required":false,"type":"string"},
+ {"name":"direct","in":"query","description":"force processing query on current node","required":false,"type":"boolean"},
{"name":"until","in":"query","description":"time in seconds","required":false,"type":"integer"},
{"name":"maxDataPoints","in":"query","description":"maximum number of data points","required":false,"type":"integer"},
{"name":"format","in":"query","description":"response format","required":false,"type":"string"}])___";
diff --git a/ydb/core/viewer/json_tenantinfo.h b/ydb/core/viewer/json_tenantinfo.h
index d507ff2a8b2..626d3ad30f8 100644
--- a/ydb/core/viewer/json_tenantinfo.h
+++ b/ydb/core/viewer/json_tenantinfo.h
@@ -388,11 +388,7 @@ public:
TenantNodesSystemInfo[tenantId] = std::move(ev->Get()->Record);
RequestDone();
break;
- case NKikimrViewer::TEvViewerResponse::kQueryResponse:
- case NKikimrViewer::TEvViewerResponse::kReserved14:
- case NKikimrViewer::TEvViewerResponse::kReserved15:
- case NKikimrViewer::TEvViewerResponse::kReserved16:
- case NKikimrViewer::TEvViewerResponse::RESPONSE_NOT_SET:
+ default:
break;
}
}
@@ -424,11 +420,7 @@ public:
case NKikimrViewer::TEvViewerRequest::kSystemRequest:
SendViewerSystemRequest(tenantId);
break;
- case NKikimrViewer::TEvViewerRequest::kQueryRequest:
- case NKikimrViewer::TEvViewerRequest::kReserved14:
- case NKikimrViewer::TEvViewerRequest::kReserved15:
- case NKikimrViewer::TEvViewerRequest::kReserved16:
- case NKikimrViewer::TEvViewerRequest::REQUEST_NOT_SET:
+ default:
break;
}
RequestDone();
diff --git a/ydb/core/viewer/protos/viewer.proto b/ydb/core/viewer/protos/viewer.proto
index 62218dde8ac..e5c8fa93a60 100644
--- a/ydb/core/viewer/protos/viewer.proto
+++ b/ydb/core/viewer/protos/viewer.proto
@@ -1,5 +1,6 @@
syntax = "proto3";
+import "ydb/core/graph/protos/graph.proto";
import "ydb/core/protos/tablet_counters.proto";
import "ydb/core/protos/node_whiteboard.proto";
import "ydb/core/protos/flat_scheme_op.proto";
@@ -519,7 +520,7 @@ message TNodeLocation {
repeated uint32 NodeId = 1;
}
-message TQueryRequest {
+message THttpProxyRequest {
string Uri = 1;
bytes Content = 2;
bytes UserToken = 3;
@@ -531,10 +532,11 @@ message TEvViewerRequest {
oneof Request {
NKikimrWhiteboard.TEvTabletStateRequest TabletRequest = 11;
NKikimrWhiteboard.TEvSystemStateRequest SystemRequest = 12;
- TQueryRequest QueryRequest = 13;
- bytes Reserved14 = 14;
+ THttpProxyRequest QueryRequest = 13;
+ THttpProxyRequest RenderRequest = 14;
bytes Reserved15 = 15;
bytes Reserved16 = 16;
+ bytes Reserved17 = 17;
}
}
@@ -544,9 +546,10 @@ message TEvViewerResponse {
NKikimrWhiteboard.TEvTabletStateResponse TabletResponse = 11;
NKikimrWhiteboard.TEvSystemStateResponse SystemResponse = 12;
NKikimrKqp.TEvQueryResponse QueryResponse = 13;
- bytes Reserved14 = 14;
+ NKikimrGraph.TEvMetricsResult RenderResponse = 14;
bytes Reserved15 = 15;
bytes Reserved16 = 16;
+ bytes Reserved17 = 17;
}
}
diff --git a/ydb/core/viewer/protos/ya.make b/ydb/core/viewer/protos/ya.make
index 0bb08d57334..11e935009d8 100644
--- a/ydb/core/viewer/protos/ya.make
+++ b/ydb/core/viewer/protos/ya.make
@@ -6,6 +6,7 @@ SRCS(
PEERDIR(
ydb/core/protos
+ ydb/core/graph/protos
)
EXCLUDE_TAGS(GO_PROTO)
diff --git a/ydb/core/viewer/viewer_request.cpp b/ydb/core/viewer/viewer_request.cpp
index ef8892d7877..38801d59d33 100644
--- a/ydb/core/viewer/viewer_request.cpp
+++ b/ydb/core/viewer/viewer_request.cpp
@@ -6,6 +6,7 @@
#include "json_tabletinfo.h"
#include "json_sysinfo.h"
#include "json_query.h"
+#include "json_render.h"
namespace NKikimr {
namespace NViewer {
@@ -74,9 +75,11 @@ IActor* CreateViewerRequestHandler(TEvViewer::TEvViewerRequest::TPtr& request) {
return new TViewerWhiteboardRequest<TEvWhiteboard::TEvSystemStateRequest, TEvWhiteboard::TEvSystemStateResponse>(request);
case NKikimrViewer::TEvViewerRequest::kQueryRequest:
return new TJsonQuery(request);
- case NKikimrViewer::TEvViewerRequest::kReserved14:
+ case NKikimrViewer::TEvViewerRequest::kRenderRequest:
+ return new TJsonRender(request);
case NKikimrViewer::TEvViewerRequest::kReserved15:
case NKikimrViewer::TEvViewerRequest::kReserved16:
+ case NKikimrViewer::TEvViewerRequest::kReserved17:
case NKikimrViewer::TEvViewerRequest::REQUEST_NOT_SET:
return nullptr;
}