diff options
author | Alexey Efimov <xeno@prnwatch.com> | 2022-05-17 11:34:57 +0300 |
---|---|---|
committer | Alexey Efimov <xeno@prnwatch.com> | 2022-05-17 11:34:57 +0300 |
commit | 742718e4953ed9e9f897f5194871fc7cec65caa3 (patch) | |
tree | 6501c24969779138b433d19606a22791742e38b9 | |
parent | 1e1dfc1b3545147948cc2190bb15161f630507e9 (diff) | |
download | ydb-742718e4953ed9e9f897f5194871fc7cec65caa3.tar.gz |
add http-mon via ic proxy KIKIMR-14742
ref:b3a758f33f4578f7f8ce2e822783faf41a5ab6cd
-rw-r--r-- | library/cpp/actors/http/http.h | 11 | ||||
-rw-r--r-- | library/cpp/actors/http/http_ut.cpp | 3 | ||||
-rw-r--r-- | library/cpp/monlib/service/pages/index_mon_page.cpp | 33 | ||||
-rw-r--r-- | library/cpp/monlib/service/pages/index_mon_page.h | 5 | ||||
-rw-r--r-- | library/cpp/monlib/service/pages/mon_page.h | 4 | ||||
-rw-r--r-- | ydb/core/mon/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/mon/async_http_mon.cpp | 225 | ||||
-rw-r--r-- | ydb/core/mon/async_http_mon.h | 1 | ||||
-rw-r--r-- | ydb/core/mon/mon_impl.h | 17 | ||||
-rw-r--r-- | ydb/core/protos/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/protos/mon.proto | 28 |
11 files changed, 313 insertions, 16 deletions
diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h index 54ed5d6d8f..38e616cafa 100644 --- a/library/cpp/actors/http/http.h +++ b/library/cpp/actors/http/http.h @@ -116,6 +116,11 @@ public: } return true; } + + // non-destructive variant of AsString + TString AsString() const { + return TString(Data(), Size()); + } }; class THttpRequest { @@ -674,6 +679,12 @@ public: , Address(address) {} + THttpIncomingRequest(TStringBuf content, std::shared_ptr<THttpEndpointInfo> endpoint, const THttpConfig::SocketAddressType& address) + : THttpParser(content) + , Endpoint(std::move(endpoint)) + , Address(address) + {} + bool IsConnectionClose() const { if (Connection.empty()) { return Version == "1.0"; diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp index 4ec7761da9..e87de6b6e1 100644 --- a/library/cpp/actors/http/http_ut.cpp +++ b/library/cpp/actors/http/http_ut.cpp @@ -182,8 +182,7 @@ Y_UNIT_TEST_SUITE(HttpProxy) { cookies.Set("cookie2", "45678"); headers.Set("Cookie", cookies.Render()); request->Set(headers); - TString requestData; - request->AsString(requestData); + TString requestData = request->AsString(); UNIT_ASSERT_VALUES_EQUAL(requestData, "GET /data/url HTTP/1.1\r\nHost: www.yandex.ru\r\nAccept: */*\r\nCookie: cookie1=123456; cookie2=45678;\r\n"); } diff --git a/library/cpp/monlib/service/pages/index_mon_page.cpp b/library/cpp/monlib/service/pages/index_mon_page.cpp index 1d3d6dac37..de2a5a058c 100644 --- a/library/cpp/monlib/service/pages/index_mon_page.cpp +++ b/library/cpp/monlib/service/pages/index_mon_page.cpp @@ -106,6 +106,39 @@ IMonPage* TIndexMonPage::FindPage(const TString& relativePath) { } } +IMonPage* TIndexMonPage::FindPageByAbsolutePath(const TString& absolutePath) { + TIndexMonPage* page = this; + TString path = absolutePath; + if (path.StartsWith("/")) { + path.erase(0, 1); + } + while (!path.empty()) { + TString tryPath = path; + while (!tryPath.empty()) { + IMonPage* found = page->FindPage(tryPath); + if (found == nullptr) { + size_t slash = tryPath.find_last_of('/'); + if (slash == TString::npos) { + return nullptr; + } + tryPath.resize(slash); + } else { + if (tryPath.size() == path.size()) { + return found; + } + if (found->IsIndex()) { + path = path.substr(tryPath.size() + 1); + page = static_cast<TIndexMonPage*>(found); + break; + } else { + return found; + } + } + } + } + return nullptr; +} + TIndexMonPage* TIndexMonPage::FindIndexPage(const TString& relativePath) { return VerifyDynamicCast<TIndexMonPage*>(FindPage(relativePath)); } diff --git a/library/cpp/monlib/service/pages/index_mon_page.h b/library/cpp/monlib/service/pages/index_mon_page.h index bf514a3105..af96bcd2b9 100644 --- a/library/cpp/monlib/service/pages/index_mon_page.h +++ b/library/cpp/monlib/service/pages/index_mon_page.h @@ -18,6 +18,10 @@ namespace NMonitoring { ~TIndexMonPage() override { } + bool IsIndex() const override { + return true; + } + void Output(IMonHttpRequest& request) override; void OutputIndexPage(IMonHttpRequest& request); virtual void OutputIndex(IOutputStream& out, bool pathEndsWithSlash); @@ -30,6 +34,7 @@ namespace NMonitoring { IMonPage* FindPage(const TString& relativePath); TIndexMonPage* FindIndexPage(const TString& relativePath); + IMonPage* FindPageByAbsolutePath(const TString& absolutePath); void SortPages(); void ClearPages(); diff --git a/library/cpp/monlib/service/pages/mon_page.h b/library/cpp/monlib/service/pages/mon_page.h index 136647e5be..215b58f0ae 100644 --- a/library/cpp/monlib/service/pages/mon_page.h +++ b/library/cpp/monlib/service/pages/mon_page.h @@ -61,6 +61,10 @@ namespace NMonitoring { return !Title.empty(); } + virtual bool IsIndex() const { + return false; + } + virtual void Output(IMonHttpRequest& request) = 0; }; diff --git a/ydb/core/mon/CMakeLists.txt b/ydb/core/mon/CMakeLists.txt index 6ad51ae7b1..1620c5d252 100644 --- a/ydb/core/mon/CMakeLists.txt +++ b/ydb/core/mon/CMakeLists.txt @@ -15,6 +15,7 @@ target_link_libraries(ydb-core-mon PUBLIC cpp-lwtrace-mon cpp-string_utils-url ydb-core-base + ydb-core-protos ydb-library-aclib ) target_sources(ydb-core-mon PRIVATE diff --git a/ydb/core/mon/async_http_mon.cpp b/ydb/core/mon/async_http_mon.cpp index f6e10a7aee..7785d456b4 100644 --- a/ydb/core/mon/async_http_mon.cpp +++ b/ydb/core/mon/async_http_mon.cpp @@ -16,10 +16,31 @@ #include <util/system/hostname.h> +#include <ydb/core/protos/mon.pb.h> + #include "mon_impl.h" namespace NActors { +struct TEvMon { + enum { + EvMonitoringRequest = NActors::NMon::HttpInfo + 10, + EvMonitoringResponse, + End + }; + + static_assert(EvMonitoringRequest > NMon::End, "expect EvMonitoringRequest > NMon::End"); + static_assert(End < EventSpaceEnd(NActors::TEvents::ES_MON), "expect End < EventSpaceEnd(NActors::TEvents::ES_MON)"); + + struct TEvMonitoringRequest : TEventPB<TEvMonitoringRequest, NKikimrMonProto::TEvMonitoringRequest, EvMonitoringRequest> { + TEvMonitoringRequest() = default; + }; + + struct TEvMonitoringResponse : TEventPB<TEvMonitoringResponse, NKikimrMonProto::TEvMonitoringResponse, EvMonitoringResponse> { + TEvMonitoringResponse() = default; + }; +}; + // compatibility layer class THttpMonRequest : public NMonitoring::IMonHttpRequest { public: @@ -39,11 +60,11 @@ public: { } - static TString GetPathFromUrl(TStringBuf url) { - return TString(url.Before('?')); + static TStringBuf GetPathFromUrl(TStringBuf url) { + return url.Before('?'); } - static TString GetPathInfoFromUrl(NMonitoring::IMonPage* page, TStringBuf url) { + static TStringBuf GetPathInfoFromUrl(NMonitoring::IMonPage* page, TStringBuf url) { TString path = GetPageFullPath(page); url.SkipPrefix(path); return GetPathFromUrl(url); @@ -145,7 +166,7 @@ public: class THttpMonRequestContainer : public TStringStream, public THttpMonRequest { public: THttpMonRequestContainer(NHttp::THttpIncomingRequestPtr request, NMonitoring::IMonPage* index) - : THttpMonRequest(request, *this, index, GetPathInfoFromUrl(index, request->URL)) + : THttpMonRequest(request, *this, index, TString(GetPathInfoFromUrl(index, request->URL))) { } }; @@ -371,26 +392,42 @@ public: }; // receives everyhing not related to actor communcation, converts them to request-actors -class THttpMonServiceLegacyIndex : public TActorBootstrapped<THttpMonServiceLegacyIndex> { +class THttpMonServiceLegacyIndex : public TActor<THttpMonServiceLegacyIndex> { public: THttpMonServiceLegacyIndex(TIntrusivePtr<NMonitoring::TIndexMonPage> indexMonPage, const TString& redirectRoot = {}) - : IndexMonPage(std::move(indexMonPage)) + : TActor(&THttpMonServiceLegacyIndex::StateWork) + , IndexMonPage(std::move(indexMonPage)) , RedirectRoot(redirectRoot) { } - void Bootstrap() { - Become(&THttpMonServiceLegacyIndex::StateWork); - } - void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) { if (ev->Get()->Request->URL == "/" && RedirectRoot) { TStringBuilder response; response << "HTTP/1.1 302 Found\r\nLocation: " << RedirectRoot << "\r\n\r\n"; Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseString(response))); - } else { - Register(new THttpMonLegacyIndexRequest(std::move(ev), IndexMonPage.Get())); + return; + } else if (!ev->Get()->Request->URL.ends_with("/") && ev->Get()->Request->URL.find('?') == TStringBuf::npos) { + TString url(ev->Get()->Request->URL); + bool index = false; + auto itPage = IndexPages.find(url); + if (itPage == IndexPages.end()) { + auto page = IndexMonPage->FindPageByAbsolutePath(url); + if (page) { + index = page->IsIndex(); + IndexPages[url] = index; + } + } else { + index = itPage->second; + } + if (index) { + TStringBuilder response; + response << "HTTP/1.1 302 Found\r\nLocation: " << url + "/" << "\r\n\r\n"; + Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseString(response))); + return; + } } + Register(new THttpMonLegacyIndexRequest(std::move(ev), IndexMonPage.Get())); } STATEFN(StateWork) { @@ -401,6 +438,159 @@ public: TIntrusivePtr<NMonitoring::TIndexMonPage> IndexMonPage; TString RedirectRoot; + std::unordered_map<TString, bool> IndexPages; +}; + +inline TActorId MakeNodeProxyId(ui32 node) { + char x[12] = "nodeproxy"; + return TActorId(node, TStringBuf(x, 12)); +} + +class THttpMonServiceNodeRequest : public TActorBootstrapped<THttpMonServiceNodeRequest> { +public: + std::shared_ptr<NHttp::THttpEndpointInfo> Endpoint; + TEvMon::TEvMonitoringRequest::TPtr Event; + TActorId HttpProxyActorId; + + THttpMonServiceNodeRequest(std::shared_ptr<NHttp::THttpEndpointInfo> endpoint, TEvMon::TEvMonitoringRequest::TPtr event, TActorId httpProxyActorId) + : Endpoint(std::move(endpoint)) + , Event(std::move(event)) + , HttpProxyActorId(httpProxyActorId) + {} + + static void FromProto(NHttp::THttpConfig::SocketAddressType& address, const NKikimrMonProto::TSockAddr& proto) { + switch (proto.GetFamily()) { + case AF_INET: + //address = TSockAddrInet(proto.GetSockAddr4().GetAddress(), proto.GetSockAddr4().GetPort()); + break; + case AF_INET6: + address = TSockAddrInet6(proto.GetSockAddr6().GetAddress().data(), proto.GetSockAddr6().GetPort()); + break; + } + } + + void Bootstrap() { + NHttp::THttpConfig::SocketAddressType address; + FromProto(address, Event->Get()->Record.GetAddress()); + NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(Event->Get()->Record.GetHttpRequest(), Endpoint, address); + TStringBuilder prefix; + prefix << "/node/" << TActivationContext::ActorSystem()->NodeId; + if (request->URL.SkipPrefix(prefix)) { + Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(std::move(request))); + Become(&THttpMonServiceNodeRequest::StateWork); + } else { + auto response = std::make_unique<TEvMon::TEvMonitoringResponse>(); + auto httpResponse = request->CreateResponseBadRequest(); + response->Record.SetHttpResponse(httpResponse->AsString()); + Send(Event->Sender, response.release(), 0, Event->Cookie); + PassAway(); + } + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr& ev) { + auto response = std::make_unique<TEvMon::TEvMonitoringResponse>(); + response->Record.SetHttpResponse(ev->Get()->Response->AsString()); + Send(Event->Sender, response.release(), 0, Event->Cookie); + PassAway(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle); + } + } +}; + +class THttpMonServiceMonRequest : public TActorBootstrapped<THttpMonServiceMonRequest> { +public: + NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event; + ui32 NodeId; + + THttpMonServiceMonRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, ui32 nodeId) + : Event(std::move(event)) + , NodeId(nodeId) + {} + + static void ToProto(NKikimrMonProto::TSockAddr& proto, const NHttp::THttpConfig::SocketAddressType& address) { + proto.SetFamily(AF_INET6); + proto.MutableSockAddr6()->SetAddress(address.GetIp()); + proto.MutableSockAddr6()->SetPort(address.GetPort()); + } + + void Bootstrap() { + TActorId monServiceNodeProxy = MakeNodeProxyId(NodeId); + auto request = std::make_unique<TEvMon::TEvMonitoringRequest>(); + request->Record.SetHttpRequest(Event->Get()->Request->AsString()); + // TODO address + Send(monServiceNodeProxy, request.release(), IEventHandle::FlagTrackDelivery); + Become(&THttpMonServiceMonRequest::StateWork); + } + + void Handle(TEvents::TEvUndelivered::TPtr& ev) { + TString reason; + switch (ev->Get()->Reason) { + case TEvents::TEvUndelivered::ReasonUnknown: + reason = "ReasonUnknown"; + break; + case TEvents::TEvUndelivered::ReasonActorUnknown: + reason = "ReasonActorUnknown"; + break; + case TEvents::TEvUndelivered::Disconnected: + reason = "Disconnected"; + break; + } + Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(Event->Get()->Request->CreateResponseServiceUnavailable(reason)), 0, Event->Cookie); + PassAway(); + } + + void Handle(TEvMon::TEvMonitoringResponse::TPtr& ev) { + Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(Event->Get()->Request->CreateResponseString(ev->Get()->Record.GetHttpResponse())), 0, Event->Cookie); + PassAway(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvMon::TEvMonitoringResponse, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + } + } +}; + +// receives requests to another nodes +class THttpMonServiceNodeProxy : public TActor<THttpMonServiceNodeProxy> { +public: + THttpMonServiceNodeProxy(TActorId httpProxyActorId) + : TActor(&THttpMonServiceNodeProxy::StateWork) + , HttpProxyActorId(httpProxyActorId) + , Endpoint(std::make_shared<NHttp::THttpEndpointInfo>()) + { + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) { + TStringBuf url = ev->Get()->Request->URL; + TStringBuf node; + ui32 nodeId; + if (url.SkipPrefix("/node/") && url.NextTok('/', node) && TryFromStringWithDefault<ui32>(node, nodeId)) { + Register(new THttpMonServiceMonRequest(std::move(ev), nodeId)); + return; + } + Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseBadRequest("bad request")), 0, ev->Cookie); + } + + void Handle(TEvMon::TEvMonitoringRequest::TPtr& ev) { + Register(new THttpMonServiceNodeRequest(Endpoint, ev, HttpProxyActorId)); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + hFunc(TEvMon::TEvMonitoringRequest, Handle); + } + } + +protected: + TActorId HttpProxyActorId; + std::shared_ptr<NHttp::THttpEndpointInfo> Endpoint; }; TAsyncHttpMon::TAsyncHttpMon(TConfig config) @@ -427,6 +617,13 @@ void TAsyncHttpMon::Start(TActorSystem* actorSystem) { new THttpMonServiceLegacyIndex(IndexMonPage, Config.RedirectMainPageTo), TMailboxType::ReadAsFilled, ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId); + auto nodeProxyActorId = ActorSystem->Register( + new THttpMonServiceNodeProxy(HttpProxyActorId), + TMailboxType::ReadAsFilled, + ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId); + NodeProxyServiceActorId = MakeNodeProxyId(ActorSystem->NodeId); + ActorSystem->RegisterLocalService(NodeProxyServiceActorId, nodeProxyActorId); + TStringBuilder workerName; workerName << FQDNHostName() << ":" << Config.Port; auto addPort = std::make_unique<NHttp::TEvHttpProxy::TEvAddListeningPort>(); @@ -437,11 +634,12 @@ void TAsyncHttpMon::Start(TActorSystem* actorSystem) { "text/plain", "text/html", "text/css", - "application/javascript", + "text/javascript", "application/json", }; ActorSystem->Send(HttpProxyActorId, addPort.release()); ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/", HttpMonServiceActorId)); + ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/node", NodeProxyServiceActorId)); for (NMonitoring::IMonPage* page : ActorMonPages) { RegisterActorMonPage(page); } @@ -455,6 +653,7 @@ void TAsyncHttpMon::Stop() { for (const TActorId& actorId : ActorServices) { ActorSystem->Send(actorId, new TEvents::TEvPoisonPill); } + ActorSystem->Send(NodeProxyServiceActorId, new TEvents::TEvPoisonPill); ActorSystem->Send(HttpMonServiceActorId, new TEvents::TEvPoisonPill); ActorSystem->Send(HttpProxyActorId, new TEvents::TEvPoisonPill); } diff --git a/ydb/core/mon/async_http_mon.h b/ydb/core/mon/async_http_mon.h index c312bc1030..e7948e14ff 100644 --- a/ydb/core/mon/async_http_mon.h +++ b/ydb/core/mon/async_http_mon.h @@ -32,6 +32,7 @@ protected: TActorSystem* ActorSystem = {}; TActorId HttpProxyActorId; TActorId HttpMonServiceActorId; + TActorId NodeProxyServiceActorId; std::vector<NMonitoring::IMonPage*> ActorMonPages; std::vector<TActorId> ActorServices; diff --git a/ydb/core/mon/mon_impl.h b/ydb/core/mon/mon_impl.h index 495fc47a9b..1ef978750e 100644 --- a/ydb/core/mon/mon_impl.h +++ b/ydb/core/mon/mon_impl.h @@ -319,7 +319,22 @@ public: } void Output(IMonHttpRequest& request) override { - IndexMonPage->Output(request); + auto& out = request.Output(); + out << HTTPOKHTML; + out << "<html>\n"; + IndexMonPage->OutputHead(out); + + out << "<body>\n"; + + // part of common navbar + IndexMonPage->OutputNavBar(out); + + out << "<div class='container'>\n" + << "<h2>" << IndexMonPage->Title << "</h2>\n"; + IndexMonPage->OutputIndex(out, true); + out << "</div>\n" + << "</body>\n"; + out << "</html>\n"; } TIntrusivePtr<TIndexMonPage> IndexMonPage; diff --git a/ydb/core/protos/CMakeLists.txt b/ydb/core/protos/CMakeLists.txt index ff5d89bbe2..9fdd736a52 100644 --- a/ydb/core/protos/CMakeLists.txt +++ b/ydb/core/protos/CMakeLists.txt @@ -89,6 +89,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/long_tx_service.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/metrics.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/minikql_engine.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/mon.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/msgbus.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/msgbus_health.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/msgbus_kv.proto diff --git a/ydb/core/protos/mon.proto b/ydb/core/protos/mon.proto new file mode 100644 index 0000000000..a0ad78fbda --- /dev/null +++ b/ydb/core/protos/mon.proto @@ -0,0 +1,28 @@ +package NKikimrMonProto; + +message TSockAddr4 { + optional uint32 Address = 1; + optional uint32 Port = 2; +} + +message TSockAddr6 { + optional string Address = 1; + optional uint32 Port = 2; +} + +message TSockAddr { + optional uint32 Family = 1; + oneof Data { + TSockAddr4 SockAddr4 = 2; + TSockAddr6 SockAddr6 = 3; + } +} + +message TEvMonitoringRequest { + optional string HttpRequest = 1; + optional TSockAddr Address = 2; +} + +message TEvMonitoringResponse { + optional string HttpResponse = 1; +} |