diff options
| author | Andrei Rykov <[email protected]> | 2024-02-13 18:56:20 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-02-13 18:56:20 +0100 |
| commit | f672e573da7fb133ad2bd577eebcc55900fe067c (patch) | |
| tree | d85eaaa2f0b111f4056717afb47351fb38c30405 | |
| parent | 5fd07a879b1b0aee88b4586600df2661af2a507c (diff) | |
YDB-1065 added proxying for render handler (#1872)
| -rw-r--r-- | ydb/core/viewer/json_render.h | 277 | ||||
| -rw-r--r-- | ydb/core/viewer/json_tenantinfo.h | 12 | ||||
| -rw-r--r-- | ydb/core/viewer/protos/viewer.proto | 11 | ||||
| -rw-r--r-- | ydb/core/viewer/protos/ya.make | 1 | ||||
| -rw-r--r-- | ydb/core/viewer/viewer_request.cpp | 5 |
5 files changed, 223 insertions, 83 deletions
diff --git a/ydb/core/viewer/json_render.h b/ydb/core/viewer/json_render.h index e8e0451153a..86fdd26ada9 100644 --- a/ydb/core/viewer/json_render.h +++ b/ydb/core/viewer/json_render.h @@ -4,6 +4,8 @@ #include <ydb/core/graph/api/service.h> #include <ydb/core/graph/api/events.h> #include <library/cpp/json/json_writer.h> +#include "json_pipe_req.h" +#include "viewer_request.h" #include "viewer.h" #include "log.h" @@ -11,12 +13,23 @@ namespace NKikimr { namespace NViewer { using namespace NActors; +using namespace NMonitoring; -class TJsonRender : public TActorBootstrapped<TJsonRender> { +class TJsonRender : public TViewerPipeClient<TJsonRender> { + using TThis = TJsonRender; + using TBase = TViewerPipeClient<TJsonRender>; IViewer* Viewer; NMon::TEvHttpInfo::TPtr Event; + TEvViewer::TEvViewerRequest::TPtr ViewerRequest; + ui32 Timeout = 0; std::vector<TString> Metrics; + TString Database; + TCgiParameters Params; + std::optional<TNodeId> SubscribedNodeId; + std::vector<TNodeId> TenantDynamicNodes; + bool Direct = false; + bool MadeProxyRequest = false; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::VIEWER_HANDLER; @@ -25,116 +38,242 @@ public: TJsonRender(IViewer* viewer, NMon::TEvHttpInfo::TPtr &ev) : Viewer(viewer) , Event(ev) - {} + { + const auto& params(Event->Get()->Request.GetParams()); + + InitConfig(params); + Database = params.Get("database"); + Direct = FromStringWithDefault<bool>(params.Get("direct"), Direct); + Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 30000); + } + + TJsonRender(TEvViewer::TEvViewerRequest::TPtr& ev) + : ViewerRequest(ev) + { + auto& request = ViewerRequest->Get()->Record.GetRenderRequest(); + + TCgiParameters params(request.GetUri()); + InitConfig(params); + Direct = true; + Timeout = ViewerRequest->Get()->Record.GetTimeout(); + } void Bootstrap() { - auto postData = Event->Get()->Request.GetPostContent(); + auto postData = Event + ? Event->Get()->Request.GetPostContent() + : ViewerRequest->Get()->Record.GetRenderRequest().GetContent(); BLOG_D("PostData=" << postData); NKikimrGraph::TEvGetMetrics getRequest; if (postData) { - TCgiParameters params(postData); - if (params.Has("target")) { + Params = TCgiParameters(postData); + if (Params.Has("target")) { TString metric; size_t num = 0; for (;;) { - metric = params.Get("target", num); + metric = Params.Get("target", num); if (metric.empty()) { break; } Metrics.push_back(metric); ++num; } - //StringSplitter(params.Get("target")).Split(',').SkipEmpty().Collect(&Metrics); - for (const auto& metric : Metrics) { - getRequest.AddMetrics(metric); - } - } else { - static const TString png1x1 = "\x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52\x00\x00\x00\x01\x00\x00\x00\x01\x01" - "\x03\x00\x00\x00\x25\xdb\x56\xca\x00\x00\x00\x03\x50\x4c\x54\x45\x00\x00\x00\xa7\x7a\x3d\xda\x00\x00" - "\x00\x01\x74\x52\x4e\x53\x00\x40\xe6\xd8\x66\x00\x00\x00\x0a\x49\x44\x41\x54\x08\xd7\x63\x60\x00\x00" - "\x00\x02\x00\x01\xe2\x21\xbc\x33\x00\x00\x00\x00\x49\x45\x4e\x44\xae\x42\x60\x82"; - Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOK(Event->Get(), "image/png", png1x1), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); - return PassAway(); } - if (params.Has("from")) { - getRequest.SetTimeFrom(FromStringWithDefault<ui32>(params.Get("from"))); - } - if (params.Has("until")) { - getRequest.SetTimeTo(FromStringWithDefault<ui32>(params.Get("until"))); + //StringSplitter(Params.Get("target")).Split(',').SkipEmpty().Collect(&Metrics); + + if (Database && !Direct) { + RequestStateStorageEndpointsLookup(Database); // to find some dynamic node and redirect there } - if (params.Has("maxDataPoints")) { - getRequest.SetMaxPoints(FromStringWithDefault<ui32>(params.Get("maxDataPoints"), 1000)); + if (Requests == 0) { + SendGraphRequest(); } } else { - Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPBADREQUEST(Event->Get(), {}, "Bad Request"), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); - return PassAway(); + ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), {}, "Bad Request")); + return; } - Send(NGraph::MakeGraphServiceId(), new NGraph::TEvGraph::TEvGetMetrics(std::move(getRequest))); - Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup()); - Become(&TThis::StateWork); + + Become(&TThis::StateWork, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup()); + } + + void PassAway() override { + if (SubscribedNodeId.has_value()) { + Send(TActivationContext::InterconnectProxy(SubscribedNodeId.value()), new TEvents::TEvUnsubscribe()); + } + TBase::PassAway(); + BLOG_TRACE("PassAway()"); } STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { + hFunc(TEvStateStorage::TEvBoardInfo, Handle); + hFunc(TEvents::TEvUndelivered, Undelivered); + hFunc(TEvInterconnect::TEvNodeConnected, Connected); + hFunc(TEvInterconnect::TEvNodeDisconnected, Disconnected); + hFunc(TEvViewer::TEvViewerResponse, Handle); hFunc(NGraph::TEvGraph::TEvMetricsResult, Handle); - cFunc(TEvents::TSystem::Wakeup, Timeout); + + cFunc(TEvents::TSystem::Wakeup, HandleTimeout); } } - void Handle(NGraph::TEvGraph::TEvMetricsResult::TPtr& ev) { - const auto& response(ev->Get()->Record); - NJson::TJsonValue json; + void Connected(TEvInterconnect::TEvNodeConnected::TPtr &) {} - if (response.GetError()) { - json["status"] = "error"; - json["error"] = response.GetError(); - Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); - return PassAway(); + void Undelivered(TEvents::TEvUndelivered::TPtr &ev) { + if (ev->Get()->SourceType == NViewer::TEvViewer::EvViewerRequest) { + SendGraphRequest(); + } + } + + void Disconnected(TEvInterconnect::TEvNodeDisconnected::TPtr &) { + SendGraphRequest(); + } + + void SendDynamicNodeRenderRequest() { + ui64 hash = std::hash<TString>()(Event->Get()->Request.GetRemoteAddr()); + + auto itPos = std::next(TenantDynamicNodes.begin(), hash % TenantDynamicNodes.size()); + std::nth_element(TenantDynamicNodes.begin(), itPos, TenantDynamicNodes.end()); + + TNodeId nodeId = *itPos; + SubscribedNodeId = nodeId; + TActorId viewerServiceId = MakeViewerID(nodeId); + + THolder<TEvViewer::TEvViewerRequest> request = MakeHolder<TEvViewer::TEvViewerRequest>(); + request->Record.SetTimeout(Timeout); + auto renderRequest = request->Record.MutableRenderRequest(); + renderRequest->SetUri(TString(Event->Get()->Request.GetUri())); + + TStringBuf content = Event->Get()->Request.GetPostContent(); + renderRequest->SetContent(TString(content)); + + ViewerWhiteboardCookie cookie(NKikimrViewer::TEvViewerRequest::kRenderRequest, nodeId); + SendRequest(viewerServiceId, request.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie.ToUi64()); + } + + void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) { + BLOG_TRACE("Received TEvBoardInfo"); + if (ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) { + for (const auto& [actorId, infoEntry] : ev->Get()->InfoEntries) { + TenantDynamicNodes.emplace_back(actorId.NodeId()); + } + } + if (TenantDynamicNodes.empty()) { + SendGraphRequest(); + } else { + SendDynamicNodeRenderRequest(); + } + } + + void SendGraphRequest() { + if (MadeProxyRequest) { + return; } - if (response.DataSize() != Metrics.size()) { - json["status"] = "error"; - json["error"] = "Invalid data size received"; - Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + MadeProxyRequest = true; + NKikimrGraph::TEvGetMetrics getRequest; + if (Metrics.size() > 0) { + for (const auto& metric : Metrics) { + getRequest.AddMetrics(metric); + } + } else { + static const TString png1x1 = "\x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52\x00\x00\x00\x01\x00\x00\x00\x01\x01" + "\x03\x00\x00\x00\x25\xdb\x56\xca\x00\x00\x00\x03\x50\x4c\x54\x45\x00\x00\x00\xa7\x7a\x3d\xda\x00\x00" + "\x00\x01\x74\x52\x4e\x53\x00\x40\xe6\xd8\x66\x00\x00\x00\x0a\x49\x44\x41\x54\x08\xd7\x63\x60\x00\x00" + "\x00\x02\x00\x01\xe2\x21\xbc\x33\x00\x00\x00\x00\x49\x45\x4e\x44\xae\x42\x60\x82"; + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOK(Event->Get(), "image/png", png1x1), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); return PassAway(); } - for (size_t nMetric = 0; nMetric < response.DataSize(); ++nMetric) { - const auto& protoMetric(response.GetData(nMetric)); - if (response.TimeSize() != protoMetric.ValuesSize()) { + if (Params.Has("from")) { + getRequest.SetTimeFrom(FromStringWithDefault<ui32>(Params.Get("from"))); + } + if (Params.Has("until")) { + getRequest.SetTimeTo(FromStringWithDefault<ui32>(Params.Get("until"))); + } + if (Params.Has("maxDataPoints")) { + getRequest.SetMaxPoints(FromStringWithDefault<ui32>(Params.Get("maxDataPoints"), 1000)); + } + Send(NGraph::MakeGraphServiceId(), new NGraph::TEvGraph::TEvGetMetrics(std::move(getRequest))); + } + + void HandleRenderResponse(NKikimrGraph::TEvMetricsResult& response) { + if (Event) { + NJson::TJsonValue json; + + if (response.GetError()) { json["status"] = "error"; - json["error"] = "Invalid value size received"; - Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); - return PassAway(); + json["error"] = response.GetError(); + ReplyAndPassAway(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false)); + return; + } + if (response.DataSize() != Metrics.size()) { + json["status"] = "error"; + json["error"] = "Invalid data size received"; + ReplyAndPassAway(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false)); + return; } - } - { // graphite - json.SetType(NJson::JSON_ARRAY); for (size_t nMetric = 0; nMetric < response.DataSize(); ++nMetric) { const auto& protoMetric(response.GetData(nMetric)); - NJson::TJsonValue& jsonMetric(json.AppendValue({})); - jsonMetric["target"] = Metrics[nMetric]; - jsonMetric["title"] = Metrics[nMetric]; - jsonMetric["tags"]["name"] = Metrics[nMetric]; - NJson::TJsonValue& jsonDataPoints(jsonMetric["datapoints"]); - jsonDataPoints.SetType(NJson::JSON_ARRAY); - for (size_t nTime = 0; nTime < response.TimeSize(); ++nTime) { - NJson::TJsonValue& jsonDataPoint(jsonDataPoints.AppendValue({})); - double value = protoMetric.GetValues(nTime); - if (isnan(value)) { - jsonDataPoint.AppendValue(NJson::TJsonValue(NJson::JSON_NULL)); - } else { - jsonDataPoint.AppendValue(value); + if (response.TimeSize() != protoMetric.ValuesSize()) { + json["status"] = "error"; + json["error"] = "Invalid value size received"; + ReplyAndPassAway(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false)); + return; + } + } + { // graphite + json.SetType(NJson::JSON_ARRAY); + for (size_t nMetric = 0; nMetric < response.DataSize(); ++nMetric) { + const auto& protoMetric(response.GetData(nMetric)); + NJson::TJsonValue& jsonMetric(json.AppendValue({})); + jsonMetric["target"] = Metrics[nMetric]; + jsonMetric["title"] = Metrics[nMetric]; + jsonMetric["tags"]["name"] = Metrics[nMetric]; + NJson::TJsonValue& jsonDataPoints(jsonMetric["datapoints"]); + jsonDataPoints.SetType(NJson::JSON_ARRAY); + for (size_t nTime = 0; nTime < response.TimeSize(); ++nTime) { + NJson::TJsonValue& jsonDataPoint(jsonDataPoints.AppendValue({})); + double value = protoMetric.GetValues(nTime); + if (isnan(value)) { + jsonDataPoint.AppendValue(NJson::TJsonValue(NJson::JSON_NULL)); + } else { + jsonDataPoint.AppendValue(value); + } + jsonDataPoint.AppendValue(response.GetTime(nTime)); } - jsonDataPoint.AppendValue(response.GetTime(nTime)); } } + + ReplyAndPassAway(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false)); + } else { + TEvViewer::TEvViewerResponse* viewerResponse = new TEvViewer::TEvViewerResponse(); + viewerResponse->Record.MutableRenderResponse()->CopyFrom(response); + ReplyAndPassAway(viewerResponse); } + } + + void Handle(NGraph::TEvGraph::TEvMetricsResult::TPtr& ev) { + HandleRenderResponse(ev->Get()->Record); + } + + void Handle(TEvViewer::TEvViewerResponse::TPtr& ev) { + HandleRenderResponse(*(ev.Get()->Get()->Record.MutableRenderResponse())); + } + + void HandleTimeout() { + if (Event) { + ReplyAndPassAway(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get())); + } else { + auto* response = new TEvViewer::TEvViewerResponse(); + response->Record.MutableRenderResponse()->SetError("Request timed out"); + ReplyAndPassAway(response); + } + } - Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + void ReplyAndPassAway(TEvViewer::TEvViewerResponse* response) { + Send(ViewerRequest->Sender, response); PassAway(); } - void Timeout() { - Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get()), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + void ReplyAndPassAway(TString data) { + Send(Event->Sender, new NMon::TEvHttpInfoRes(std::move(data), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); PassAway(); } }; @@ -144,6 +283,8 @@ struct TJsonRequestParameters<TJsonRender> { static TString GetParameters() { return R"___([{"name":"target","in":"query","description":"metrics comma delimited","required":true,"type":"string"}, {"name":"from","in":"query","description":"time in seconds","required":false,"type":"integer"}, + {"name":"database","in":"query","description":"database name","required":false,"type":"string"}, + {"name":"direct","in":"query","description":"force processing query on current node","required":false,"type":"boolean"}, {"name":"until","in":"query","description":"time in seconds","required":false,"type":"integer"}, {"name":"maxDataPoints","in":"query","description":"maximum number of data points","required":false,"type":"integer"}, {"name":"format","in":"query","description":"response format","required":false,"type":"string"}])___"; diff --git a/ydb/core/viewer/json_tenantinfo.h b/ydb/core/viewer/json_tenantinfo.h index d507ff2a8b2..626d3ad30f8 100644 --- a/ydb/core/viewer/json_tenantinfo.h +++ b/ydb/core/viewer/json_tenantinfo.h @@ -388,11 +388,7 @@ public: TenantNodesSystemInfo[tenantId] = std::move(ev->Get()->Record); RequestDone(); break; - case NKikimrViewer::TEvViewerResponse::kQueryResponse: - case NKikimrViewer::TEvViewerResponse::kReserved14: - case NKikimrViewer::TEvViewerResponse::kReserved15: - case NKikimrViewer::TEvViewerResponse::kReserved16: - case NKikimrViewer::TEvViewerResponse::RESPONSE_NOT_SET: + default: break; } } @@ -424,11 +420,7 @@ public: case NKikimrViewer::TEvViewerRequest::kSystemRequest: SendViewerSystemRequest(tenantId); break; - case NKikimrViewer::TEvViewerRequest::kQueryRequest: - case NKikimrViewer::TEvViewerRequest::kReserved14: - case NKikimrViewer::TEvViewerRequest::kReserved15: - case NKikimrViewer::TEvViewerRequest::kReserved16: - case NKikimrViewer::TEvViewerRequest::REQUEST_NOT_SET: + default: break; } RequestDone(); diff --git a/ydb/core/viewer/protos/viewer.proto b/ydb/core/viewer/protos/viewer.proto index 62218dde8ac..e5c8fa93a60 100644 --- a/ydb/core/viewer/protos/viewer.proto +++ b/ydb/core/viewer/protos/viewer.proto @@ -1,5 +1,6 @@ syntax = "proto3"; +import "ydb/core/graph/protos/graph.proto"; import "ydb/core/protos/tablet_counters.proto"; import "ydb/core/protos/node_whiteboard.proto"; import "ydb/core/protos/flat_scheme_op.proto"; @@ -519,7 +520,7 @@ message TNodeLocation { repeated uint32 NodeId = 1; } -message TQueryRequest { +message THttpProxyRequest { string Uri = 1; bytes Content = 2; bytes UserToken = 3; @@ -531,10 +532,11 @@ message TEvViewerRequest { oneof Request { NKikimrWhiteboard.TEvTabletStateRequest TabletRequest = 11; NKikimrWhiteboard.TEvSystemStateRequest SystemRequest = 12; - TQueryRequest QueryRequest = 13; - bytes Reserved14 = 14; + THttpProxyRequest QueryRequest = 13; + THttpProxyRequest RenderRequest = 14; bytes Reserved15 = 15; bytes Reserved16 = 16; + bytes Reserved17 = 17; } } @@ -544,9 +546,10 @@ message TEvViewerResponse { NKikimrWhiteboard.TEvTabletStateResponse TabletResponse = 11; NKikimrWhiteboard.TEvSystemStateResponse SystemResponse = 12; NKikimrKqp.TEvQueryResponse QueryResponse = 13; - bytes Reserved14 = 14; + NKikimrGraph.TEvMetricsResult RenderResponse = 14; bytes Reserved15 = 15; bytes Reserved16 = 16; + bytes Reserved17 = 17; } } diff --git a/ydb/core/viewer/protos/ya.make b/ydb/core/viewer/protos/ya.make index 0bb08d57334..11e935009d8 100644 --- a/ydb/core/viewer/protos/ya.make +++ b/ydb/core/viewer/protos/ya.make @@ -6,6 +6,7 @@ SRCS( PEERDIR( ydb/core/protos + ydb/core/graph/protos ) EXCLUDE_TAGS(GO_PROTO) diff --git a/ydb/core/viewer/viewer_request.cpp b/ydb/core/viewer/viewer_request.cpp index ef8892d7877..38801d59d33 100644 --- a/ydb/core/viewer/viewer_request.cpp +++ b/ydb/core/viewer/viewer_request.cpp @@ -6,6 +6,7 @@ #include "json_tabletinfo.h" #include "json_sysinfo.h" #include "json_query.h" +#include "json_render.h" namespace NKikimr { namespace NViewer { @@ -74,9 +75,11 @@ IActor* CreateViewerRequestHandler(TEvViewer::TEvViewerRequest::TPtr& request) { return new TViewerWhiteboardRequest<TEvWhiteboard::TEvSystemStateRequest, TEvWhiteboard::TEvSystemStateResponse>(request); case NKikimrViewer::TEvViewerRequest::kQueryRequest: return new TJsonQuery(request); - case NKikimrViewer::TEvViewerRequest::kReserved14: + case NKikimrViewer::TEvViewerRequest::kRenderRequest: + return new TJsonRender(request); case NKikimrViewer::TEvViewerRequest::kReserved15: case NKikimrViewer::TEvViewerRequest::kReserved16: + case NKikimrViewer::TEvViewerRequest::kReserved17: case NKikimrViewer::TEvViewerRequest::REQUEST_NOT_SET: return nullptr; } |
