summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Efimov <[email protected]>2024-12-17 16:02:53 +0100
committerGitHub <[email protected]>2024-12-17 22:02:53 +0700
commite1f89b09d963220259d8c0ecc76b1db6d9797bec (patch)
treeae034203f9d4e1354d27311cdaaa1d0346973964
parent426ac15d371f5a5310ff7e10821b24a509feb098 (diff)
get rid of sync http mon (#12637)
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_run.cpp10
-rw-r--r--ydb/core/blobstorage/ut_vdisk/lib/astest.h18
-rw-r--r--ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp6
-rw-r--r--ydb/core/driver_lib/run/run.cpp13
-rw-r--r--ydb/core/http_proxy/ut/datastreams_fixture.h6
-rw-r--r--ydb/core/mon/async_http_mon.cpp941
-rw-r--r--ydb/core/mon/async_http_mon.h54
-rw-r--r--ydb/core/mon/mon.cpp979
-rw-r--r--ydb/core/mon/mon.h53
-rw-r--r--ydb/core/mon/sync_http_mon.cpp120
-rw-r--r--ydb/core/mon/sync_http_mon.h37
-rw-r--r--ydb/core/mon/ya.make4
-rw-r--r--ydb/core/persqueue/ut/counters_ut.cpp2
-rw-r--r--ydb/core/protos/feature_flags.proto2
-rw-r--r--ydb/core/testlib/actors/test_runtime.cpp21
-rw-r--r--ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp10
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp6
17 files changed, 1046 insertions, 1236 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_run.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_run.cpp
index 9dab031de00..a63f61128a2 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_run.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_run.cpp
@@ -4,7 +4,7 @@
#include "blobstorage_pdisk_ut_base_test.h"
#include <ydb/core/base/appdata.h>
-#include <ydb/core/mon/sync_http_mon.h>
+#include <ydb/core/mon/mon.h>
#include <ydb/core/blobstorage/crypto/default.h>
#include <ydb/library/pdisk_io/aio.h>
#include <ydb/core/util/random.h>
@@ -142,17 +142,21 @@ void Run(TVector<IActor*> tests, TTestRunConfig runCfg) {
if (IsMonitoringEnabled) {
// Monitoring startup
- monitoring.Reset(new NActors::TSyncHttpMon({
+ monitoring.Reset(new NActors::TMon({
.Port = pm.GetPort(8081),
.Title = "TestYard monitoring"
}));
appData.Mon = monitoring.Get();
monitoring->RegisterCountersPage("counters", "Counters", mainCounters);
- monitoring->Start();
}
actorSystem1->Start();
+
+ if (IsMonitoringEnabled) {
+ monitoring->Start(actorSystem1.Get());
+ }
+
Sleep(TDuration::MilliSeconds(runCfg.BeforeTestSleepMs));
VERBOSE_COUT("Sending TEvBoot to test");
diff --git a/ydb/core/blobstorage/ut_vdisk/lib/astest.h b/ydb/core/blobstorage/ut_vdisk/lib/astest.h
index fe8c6b0e124..8d74c586a99 100644
--- a/ydb/core/blobstorage/ut_vdisk/lib/astest.h
+++ b/ydb/core/blobstorage/ut_vdisk/lib/astest.h
@@ -6,7 +6,7 @@
#include <ydb/library/actors/core/executor_pool_basic.h>
#include <ydb/library/actors/core/executor_pool_io.h>
#include <ydb/library/actors/core/scheduler_basic.h>
-#include <ydb/core/mon/sync_http_mon.h>
+#include <ydb/core/mon/mon.h>
#include <ydb/library/actors/interconnect/interconnect.h>
#include <ydb/library/actors/protos/services_common.pb.h>
#include <ydb/core/base/appdata.h>
@@ -82,6 +82,7 @@ inline void TTestWithActorSystem::Run(NActors::IActor *testActor) {
NKikimrServices::EServiceKikimr_Name
);
TString explanation;
+ logSettings->SetLevel(NLog::PRI_TRACE, NActorsServices::EServiceCommon::HTTP, explanation);
//logSettings->SetLevel(NLog::PRI_INFO, NKikimrServices::BS_SKELETON, explanation);
//logSettings->SetLevel(NLog::PRI_INFO, NKikimrServices::BS_HULLCOMP, explan
NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(logSettings,
@@ -92,12 +93,6 @@ inline void TTestWithActorSystem::Run(NActors::IActor *testActor) {
setup1->LocalServices.push_back(std::move(loggerActorPair));
//////////////////////////////////////////////////////////////////////////////
- ///////////////////////// SETUP TEST ACTOR ///////////////////////////////////
- NActors::TActorId testActorId = NActors::TActorId(1, "test123");
- TActorSetupCmd testActorSetup(testActor, TMailboxType::Simple, 0);
- setup1->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(testActorId, std::move(testActorSetup)));
- //////////////////////////////////////////////////////////////////////////////
-
///////////////////////// TYPE REGISTRY //////////////////////////////////////
TIntrusivePtr<NKikimr::NScheme::TTypeRegistry> typeRegistry(new NKikimr::NScheme::TKikimrTypeRegistry());
//////////////////////////////////////////////////////////////////////////////
@@ -106,14 +101,13 @@ inline void TTestWithActorSystem::Run(NActors::IActor *testActor) {
if (!MonPort) {
MonPort = pm.GetPort(MonPort);
}
- Monitoring.reset(new NActors::TSyncHttpMon({
+ Monitoring.reset(new NActors::TMon({
.Port = MonPort,
.Title = "at"
}));
NMonitoring::TIndexMonPage *actorsMonPage = Monitoring->RegisterIndexPage("actors", "Actors");
Y_UNUSED(actorsMonPage);
Monitoring->RegisterCountersPage("counters", "Counters", Counters);
- Monitoring->Start();
loggerActor->Log(Now(), NKikimr::NLog::PRI_NOTICE, NActorsServices::TEST, "Monitoring settings set up");
//////////////////////////////////////////////////////////////////////////////
@@ -126,11 +120,15 @@ inline void TTestWithActorSystem::Run(NActors::IActor *testActor) {
ActorSystem1.reset(new TActorSystem(setup1, AppData.get(), logSettings));
loggerActor->Log(Now(), NKikimr::NLog::PRI_NOTICE, NActorsServices::TEST, "Actor system created");
-
ActorSystem1->Start();
LOG_NOTICE(*ActorSystem1, NActorsServices::TEST, "Actor system started");
+ Monitoring->Start(ActorSystem1.get()).wait();
+ ///////////////////////// SETUP TEST ACTOR ///////////////////////////////////
+ NActors::TActorId testActorId = NActors::TActorId(1, "test123");
+ ActorSystem1->RegisterLocalService(testActorId, ActorSystem1->Register(testActor, TMailboxType::Simple, 0));
+ //////////////////////////////////////////////////////////////////////////////
DoneEvent.Wait();
ActorSystem1->Stop();
diff --git a/ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp b/ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp
index cbc459a4cc8..c692fb26663 100644
--- a/ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp
+++ b/ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp
@@ -9,7 +9,7 @@
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h>
-#include <ydb/core/mon/sync_http_mon.h>
+#include <ydb/core/mon/mon.h>
#include <ydb/core/scheme/scheme_type_registry.h>
#include <ydb/library/actors/core/executor_pool_basic.h>
@@ -368,13 +368,12 @@ void TConfiguration::Prepare(IVDiskSetup *vdiskSetup, bool newPDisks, bool runRe
//////////////////////////////////////////////////////////////////////////////
///////////////////////// MONITORING SETTINGS /////////////////////////////////
- Monitoring.reset(new NActors::TSyncHttpMon({
+ Monitoring.reset(new NActors::TMon({
.Port = 8088,
.Title = "at"
}));
NMonitoring::TIndexMonPage *actorsMonPage = Monitoring->RegisterIndexPage("actors", "Actors");
Monitoring->RegisterCountersPage("counters", "Counters", Counters);
- Monitoring->Start();
loggerActor->Log(Now(), NKikimr::NLog::PRI_NOTICE, NActorsServices::TEST, "Monitoring settings set up");
//////////////////////////////////////////////////////////////////////////////
@@ -391,6 +390,7 @@ void TConfiguration::Prepare(IVDiskSetup *vdiskSetup, bool newPDisks, bool runRe
loggerActor->Log(Now(), NKikimr::NLog::PRI_NOTICE, NActorsServices::TEST, "Actor system created");
ActorSystem1->Start();
+ Monitoring->Start(ActorSystem1.get());
LOG_NOTICE(*ActorSystem1, NActorsServices::TEST, "Actor system started");
}
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 788e82d98cc..9eea1050c65 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -35,8 +35,7 @@
#include <ydb/core/grpc_services/grpc_request_proxy.h>
#include <ydb/core/grpc_services/grpc_mon.h>
#include <ydb/core/log_backend/log_backend.h>
-#include <ydb/core/mon/sync_http_mon.h>
-#include <ydb/core/mon/async_http_mon.h>
+#include <ydb/core/mon/mon.h>
#include <ydb/core/mon/crossref.h>
#include <ydb/core/mon_alloc/profiler.h>
@@ -473,11 +472,7 @@ void TKikimrRunner::InitializeMonitoring(const TKikimrRunConfig& runConfig, bool
if (ModuleFactories && ModuleFactories->MonitoringFactory) {
Monitoring = ModuleFactories->MonitoringFactory(std::move(monConfig), appConfig);
} else {
- if (appConfig.GetFeatureFlags().GetEnableAsyncHttpMon()) {
- Monitoring = new NActors::TAsyncHttpMon(std::move(monConfig));
- } else {
- Monitoring = new NActors::TSyncHttpMon(std::move(monConfig));
- }
+ Monitoring = new NActors::TMon(std::move(monConfig));
}
if (Monitoring) {
Monitoring->RegisterCountersPage("counters", "Counters", Counters);
@@ -1783,10 +1778,10 @@ void TKikimrRunner::KikimrStop(bool graceful) {
if (enableReleaseNodeNameOnGracefulShutdown) {
using namespace NKikimr::NNodeBroker;
using TEvent = TEvNodeBroker::TEvGracefulShutdownRequest;
-
+
const ui32 nodeId = ActorSystem->NodeId;
bool isDynamicNode = AppData->DynamicNameserviceConfig->MinDynamicNodeId <= nodeId;
-
+
if (isDynamicNode) {
NTabletPipe::TClientConfig pipeConfig;
pipeConfig.RetryPolicy = {.RetryLimitCount = 10};
diff --git a/ydb/core/http_proxy/ut/datastreams_fixture.h b/ydb/core/http_proxy/ut/datastreams_fixture.h
index ed8ab1c05d2..af00bc66209 100644
--- a/ydb/core/http_proxy/ut/datastreams_fixture.h
+++ b/ydb/core/http_proxy/ut/datastreams_fixture.h
@@ -17,7 +17,7 @@
#include <ydb/core/http_proxy/http_req.h>
#include <ydb/core/http_proxy/http_service.h>
#include <ydb/core/http_proxy/metrics_actor.h>
-#include <ydb/core/mon/sync_http_mon.h>
+#include <ydb/core/mon/mon.h>
#include <ydb/core/ymq/actor/auth_multi_factory.h>
#include <ydb/library/aclib/aclib.h>
@@ -839,7 +839,7 @@ private:
MonPort = TPortManager().GetPort();
Counters = new NMonitoring::TDynamicCounters();
- Monitoring.Reset(new NActors::TSyncHttpMon({
+ Monitoring.Reset(new NActors::TMon({
.Port = MonPort,
.Address = "127.0.0.1",
.Threads = 3,
@@ -847,7 +847,7 @@ private:
.Host = "127.0.0.1",
}));
Monitoring->RegisterCountersPage("counters", "Counters", Counters);
- Monitoring->Start();
+ Monitoring->Start(ActorRuntime->GetAnyNodeActorSystem());
Sleep(TDuration::Seconds(1));
diff --git a/ydb/core/mon/async_http_mon.cpp b/ydb/core/mon/async_http_mon.cpp
deleted file mode 100644
index b7a92e6dfac..00000000000
--- a/ydb/core/mon/async_http_mon.cpp
+++ /dev/null
@@ -1,941 +0,0 @@
-#include "async_http_mon.h"
-#include <ydb/library/actors/core/actor_bootstrapped.h>
-#include <ydb/library/actors/http/http_proxy.h>
-#include <ydb/core/base/appdata.h>
-#include <ydb/core/grpc_services/base/base.h>
-#include <ydb/core/base/ticket_parser.h>
-
-#include <library/cpp/json/json_writer.h>
-#include <library/cpp/lwtrace/all.h>
-#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
-#include <ydb/library/actors/core/probes.h>
-#include <ydb/core/base/monitoring_provider.h>
-
-#include <library/cpp/monlib/service/pages/version_mon_page.h>
-#include <library/cpp/monlib/service/pages/mon_page.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <library/cpp/monlib/dynamic_counters/page.h>
-
-#include <util/system/hostname.h>
-
-#include <ydb/core/base/counters.h>
-#include <ydb/core/protos/mon.pb.h>
-
-#include "mon_impl.h"
-
-namespace NActors {
-
-struct TEvMon {
- enum {
- EvMonitoringRequest = NActors::NMon::HttpInfo + 10,
- EvMonitoringResponse,
- End
- };
-
- static_assert(EvMonitoringRequest > NMon::End, "expect EvMonitoringRequest > NMon::End");
- static_assert(End < EventSpaceEnd(NActors::TEvents::ES_MON), "expect End < EventSpaceEnd(NActors::TEvents::ES_MON)");
-
- struct TEvMonitoringRequest : TEventPB<TEvMonitoringRequest, NKikimrMonProto::TEvMonitoringRequest, EvMonitoringRequest> {
- TEvMonitoringRequest() = default;
- };
-
- struct TEvMonitoringResponse : TEventPB<TEvMonitoringResponse, NKikimrMonProto::TEvMonitoringResponse, EvMonitoringResponse> {
- TEvMonitoringResponse() = default;
- };
-};
-
-// compatibility layer
-class THttpMonRequest : public NMonitoring::IMonHttpRequest {
-public:
- NHttp::THttpIncomingRequestPtr Request;
- TStringStream& Response;
- NMonitoring::IMonPage* Page;
- TString PathInfo;
- mutable std::unique_ptr<THttpHeaders> Headers;
- mutable std::unique_ptr<TCgiParameters> Params;
- mutable std::unique_ptr<TCgiParameters> PostParams;
-
- THttpMonRequest(NHttp::THttpIncomingRequestPtr request, TStringStream& response, NMonitoring::IMonPage* page, const TString& pathInfo)
- : Request(request)
- , Response(response)
- , Page(page)
- , PathInfo(pathInfo)
- {
- }
-
- static TStringBuf GetPathFromUrl(TStringBuf url) {
- return url.Before('?');
- }
-
- static TStringBuf GetPathInfoFromUrl(NMonitoring::IMonPage* page, TStringBuf url) {
- TString path = GetPageFullPath(page);
- url.SkipPrefix(path);
- return GetPathFromUrl(url);
- }
-
- virtual IOutputStream& Output() override {
- return Response;
- }
-
- virtual HTTP_METHOD GetMethod() const override {
- if (Request->Method == "GET") {
- return HTTP_METHOD_GET;
- }
- if (Request->Method == "OPTIONS") {
- return HTTP_METHOD_OPTIONS;
- }
- if (Request->Method == "POST") {
- return HTTP_METHOD_POST;
- }
- if (Request->Method == "HEAD") {
- return HTTP_METHOD_HEAD;
- }
- if (Request->Method == "PUT") {
- return HTTP_METHOD_PUT;
- }
- if (Request->Method == "DELETE") {
- return HTTP_METHOD_DELETE;
- }
- return HTTP_METHOD_UNDEFINED;
- }
-
- virtual TStringBuf GetPath() const override {
- return GetPathFromUrl(Request->URL);
- }
-
- virtual TStringBuf GetPathInfo() const override {
- return PathInfo;
- }
-
- virtual TStringBuf GetUri() const override {
- return Request->URL;
- }
-
- virtual const TCgiParameters& GetParams() const override {
- if (!Params) {
- Params = std::make_unique<TCgiParameters>(Request->URL.After('?'));
- }
- return *Params;
- }
-
- virtual const TCgiParameters& GetPostParams() const override {
- if (!PostParams) {
- PostParams = std::make_unique<TCgiParameters>(Request->Body);
- }
- return *PostParams;
- }
-
- virtual TStringBuf GetPostContent() const override {
- return Request->Body;
- }
-
- virtual const THttpHeaders& GetHeaders() const override {
- if (!Headers) {
- TString strHeaders(Request->Headers);
- TStringInput headers(strHeaders);
- Headers = std::make_unique<THttpHeaders>(&headers);
- }
- return *Headers;
- }
-
- virtual TStringBuf GetHeader(TStringBuf name) const override {
- auto header = GetHeaders().FindHeader(name);
- if (header) {
- return header->Value();
- }
- return {};
- }
-
- bool AcceptsJsonResponse() {
- TStringBuf acceptHeader = GetHeader("Accept");
- return acceptHeader.find(TStringBuf("application/json")) != TStringBuf::npos;
- }
-
- virtual TStringBuf GetCookie(TStringBuf name) const override {
- NHttp::TCookies cookies(GetHeader("Cookie"));
- return cookies.Get(name);
- }
-
- virtual TString GetRemoteAddr() const override {
- if (Request->Address) {
- return Request->Address->ToString();
- }
- return {};
- }
-
- virtual TString GetServiceTitle() const override {
- return {};
- }
-
- virtual NMonitoring::IMonPage* GetPage() const override {
- return Page;
- }
-
- virtual IMonHttpRequest* MakeChild(NMonitoring::IMonPage* page, const TString& pathInfo) const override {
- return new THttpMonRequest(Request, Response, page, pathInfo);
- }
-};
-
-// container for legacy requests
-class THttpMonRequestContainer : public TStringStream, public THttpMonRequest {
-public:
- THttpMonRequestContainer(NHttp::THttpIncomingRequestPtr request, NMonitoring::IMonPage* index)
- : THttpMonRequest(request, *this, index, TString(GetPathInfoFromUrl(index, request->URL)))
- {
- }
-};
-
-// handles actor communication
-class THttpMonLegacyActorRequest : public TActorBootstrapped<THttpMonLegacyActorRequest> {
-public:
- NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event;
- THttpMonRequestContainer Container;
- TIntrusivePtr<TActorMonPage> ActorMonPage;
-
- THttpMonLegacyActorRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, TIntrusivePtr<TActorMonPage> actorMonPage)
- : Event(std::move(event))
- , Container(Event->Get()->Request, actorMonPage.Get())
- , ActorMonPage(actorMonPage)
- {}
-
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::HTTP_MON_LEGACY_ACTOR_REQUEST;
- }
-
- void Bootstrap() {
- if (Event->Get()->Request->Method == "OPTIONS") {
- return ReplyOptionsAndPassAway();
- }
- Become(&THttpMonLegacyActorRequest::StateFunc);
- if (ActorMonPage->Authorizer) {
- NActors::IEventHandle* handle = ActorMonPage->Authorizer(SelfId(), Container);
- if (handle) {
- TActivationContext::Send(handle);
- return;
- }
- }
- SendRequest();
- }
- void ReplyWith(NHttp::THttpOutgoingResponsePtr response) {
- if (response->Status.StartsWith("2")) {
- TString url(Event->Get()->Request->URL.Before('?'));
- TString status(response->Status);
- NMonitoring::THistogramPtr ResponseTimeHgram = NKikimr::GetServiceCounters(NKikimr::AppData()->Counters,
- ActorMonPage->MonServiceName)
- ->GetSubgroup("subsystem", "mon")
- ->GetSubgroup("url", url)
- ->GetSubgroup("status", status)
- ->GetHistogram("ResponseTimeMs", NMonitoring::ExponentialHistogram(20, 2, 1));
- ResponseTimeHgram->Collect(Event->Get()->Request->Timer.Passed() * 1000);
- }
- Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
- }
-
- void ReplyOptionsAndPassAway() {
- NHttp::THttpIncomingRequestPtr request = Event->Get()->Request;
- TString url(request->URL.Before('?'));
- TString type = mimetypeByExt(url.data());
- if (type.empty()) {
- type = "application/json";
- }
- NHttp::THeaders headers(request->Headers);
- TString origin = TString(headers["Origin"]);
- if (origin.empty()) {
- origin = "*";
- }
- TStringBuilder response;
- response << "HTTP/1.1 204 No Content\r\n"
- "Access-Control-Allow-Origin: " << origin << "\r\n"
- "Access-Control-Allow-Credentials: true\r\n"
- "Access-Control-Allow-Headers: Content-Type,Authorization,Origin,Accept,X-Trace-Verbosity,X-Want-Trace,traceparent\r\n"
- "Access-Control-Expose-Headers: traceresponse,X-Worker-Name\r\n"
- "Access-Control-Allow-Methods: OPTIONS,GET,POST,PUT,DELETE\r\n"
- "Content-Type: " << type << "\r\n"
- "Connection: keep-alive\r\n\r\n";
- ReplyWith(request->CreateResponseString(response));
- PassAway();
- }
-
- bool CredentialsProvided() {
- return Container.GetCookie("ydb_session_id") || Container.GetHeader("Authorization");
- }
-
- TString YdbToHttpError(Ydb::StatusIds::StatusCode status) {
- switch (status) {
- case Ydb::StatusIds::UNAUTHORIZED:
- // YDB status UNAUTHORIZED is used for both access denied case and if no credentials were provided.
- return CredentialsProvided() ? "403 Forbidden" : "401 Unauthorized";
- case Ydb::StatusIds::INTERNAL_ERROR:
- return "500 Internal Server Error";
- case Ydb::StatusIds::UNAVAILABLE:
- return "503 Service Unavailable";
- case Ydb::StatusIds::OVERLOADED:
- return "429 Too Many Requests";
- case Ydb::StatusIds::TIMEOUT:
- return "408 Request Timeout";
- case Ydb::StatusIds::PRECONDITION_FAILED:
- return "412 Precondition Failed";
- default:
- return "400 Bad Request";
- }
- }
-
- void ReplyErrorAndPassAway(const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult& result) {
- ReplyErrorAndPassAway(result.Status, result.Issues, true);
- }
-
- void ReplyErrorAndPassAway(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues, bool addAccessControlHeaders) {
- NHttp::THttpIncomingRequestPtr request = Event->Get()->Request;
- TStringBuilder response;
- TStringBuilder body;
- TStringBuf contentType;
- const TString httpError = YdbToHttpError(status);
-
- if (Container.AcceptsJsonResponse()) {
- contentType = "application/json";
- NJson::TJsonValue json;
- TString message;
- MakeJsonErrorReply(json, message, issues, NYdb::EStatus(status));
- NJson::WriteJson(&body.Out, &json);
- } else {
- contentType = "text/html";
- body << "<html><body><h1>" << httpError << "</h1>";
- if (issues) {
- body << "<p>" << issues.ToString() << "</p>";
- }
- body << "</body></html>";
- }
-
- response << "HTTP/1.1 " << httpError << "\r\n";
- if (addAccessControlHeaders) {
- NHttp::THeaders headers(request->Headers);
- TString origin = TString(headers["Origin"]);
- if (origin.empty()) {
- origin = "*";
- }
- response << "Access-Control-Allow-Origin: " << origin << "\r\n";
- response << "Access-Control-Allow-Credentials: true\r\n";
- response << "Access-Control-Allow-Headers: Content-Type,Authorization,Origin,Accept\r\n";
- response << "Access-Control-Allow-Methods: OPTIONS, GET, POST, PUT, DELETE\r\n";
- }
-
- response << "Content-Type: " << contentType << "\r\n";
- response << "Content-Length: " << body.size() << "\r\n";
- response << "\r\n";
- response << body;
- ReplyWith(request->CreateResponseString(response));
- PassAway();
- }
-
- void ReplyForbiddenAndPassAway(const TString& error = {}) {
- NYql::TIssues issues;
- issues.AddIssue(error);
- ReplyErrorAndPassAway(Ydb::StatusIds::UNAUTHORIZED, issues, false);
- }
-
- void SendRequest(const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult* result = nullptr) {
- NHttp::THttpIncomingRequestPtr request = Event->Get()->Request;
- if (ActorMonPage->Authorizer) {
- TString user = (result && result->UserToken) ? result->UserToken->GetUserSID() : "anonymous";
- LOG_NOTICE_S(*TlsActivationContext, NActorsServices::HTTP,
- (request->Address ? request->Address->ToString() : "")
- << " " << user
- << " " << request->Method
- << " " << request->URL);
- }
- TString serializedToken;
- if (result && result->UserToken) {
- serializedToken = result->UserToken->GetSerializedToken();
- }
- Send(ActorMonPage->TargetActorId, new NMon::TEvHttpInfo(
- Container, serializedToken), IEventHandle::FlagTrackDelivery);
- }
-
- void HandleUndelivered(TEvents::TEvUndelivered::TPtr&) {
- NHttp::THttpIncomingRequestPtr request = Event->Get()->Request;
- ReplyWith(request->CreateResponseServiceUnavailable(
- TStringBuilder() << "Actor " << ActorMonPage->TargetActorId << " is not available"));
- PassAway();
- }
-
- void HandleResponse(NMon::IEvHttpInfoRes::TPtr& ev) {
- if (ev->Get()->GetContentType() == NMon::IEvHttpInfoRes::Html) {
- THtmlResultMonPage resultPage(ActorMonPage->Path, ActorMonPage->Title, ActorMonPage->Host, ActorMonPage->PreTag, *(ev->Get()));
- resultPage.Parent = ActorMonPage->Parent;
- resultPage.Output(Container);
- } else {
- ev->Get()->Output(Container);
- }
- ReplyWith(Event->Get()->Request->CreateResponseString(Container.Str()));
- PassAway();
- }
-
- void Handle(NKikimr::NGRpcService::TEvRequestAuthAndCheckResult::TPtr& ev) {
- const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult& result(*ev->Get());
- if (result.Status != Ydb::StatusIds::SUCCESS) {
- return ReplyErrorAndPassAway(result);
- }
- bool found = false;
- if (result.UserToken) {
- for (const TString& sid : ActorMonPage->AllowedSIDs) {
- if (result.UserToken->IsExist(sid)) {
- found = true;
- break;
- }
- }
- }
- if (found || ActorMonPage->AllowedSIDs.empty() || !result.UserToken) {
- SendRequest(&result);
- } else {
- return ReplyForbiddenAndPassAway("SID is not allowed");
- }
- }
-
- STATEFN(StateFunc) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvents::TEvUndelivered, HandleUndelivered);
- hFunc(NMon::IEvHttpInfoRes, HandleResponse);
- hFunc(NKikimr::NGRpcService::TEvRequestAuthAndCheckResult, Handle);
- }
- }
-};
-
-// handles all indexes and static data in synchronous way
-class THttpMonLegacyIndexRequest : public TActorBootstrapped<THttpMonLegacyIndexRequest> {
-public:
- NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event;
- THttpMonRequestContainer Container;
-
- THttpMonLegacyIndexRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, NMonitoring::IMonPage* index)
- : Event(std::move(event))
- , Container(Event->Get()->Request, index)
- {}
-
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::HTTP_MON_LEGACY_INDEX_REQUEST;
- }
-
- void Bootstrap() {
- ProcessRequest();
- }
-
- void ProcessRequest() {
- Container.Page->Output(Container);
- NHttp::THttpOutgoingResponsePtr response = Event->Get()->Request->CreateResponseString(Container.Str());
- Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
- PassAway();
- }
-};
-
-// receives all requests for one actor page and converts them to request-actors
-class THttpMonServiceLegacyActor : public TActorBootstrapped<THttpMonServiceLegacyActor> {
-public:
- THttpMonServiceLegacyActor(TIntrusivePtr<TActorMonPage> actorMonPage)
- : ActorMonPage(std::move(actorMonPage))
- {
- }
-
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::HTTP_MON_LEGACY_ACTOR_SERVICE;
- }
-
- void Bootstrap() {
- Become(&THttpMonServiceLegacyActor::StateWork);
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) {
- Register(new THttpMonLegacyActorRequest(std::move(ev), ActorMonPage));
- }
-
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
- cFunc(TEvents::TSystem::Poison, PassAway);
- }
- }
-
- TIntrusivePtr<TActorMonPage> ActorMonPage;
-};
-
-// receives everyhing not related to actor communcation, converts them to request-actors
-class THttpMonServiceLegacyIndex : public TActor<THttpMonServiceLegacyIndex> {
-public:
- THttpMonServiceLegacyIndex(TIntrusivePtr<NMonitoring::TIndexMonPage> indexMonPage, const TString& redirectRoot = {})
- : TActor(&THttpMonServiceLegacyIndex::StateWork)
- , IndexMonPage(std::move(indexMonPage))
- , RedirectRoot(redirectRoot)
- {
- }
-
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::HTTP_MON_LEGACY_INDEX_SERVICE;
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) {
- bool redirect = false;
- if (RedirectRoot && ev->Get()->Request->URL == "/") {
- NHttp::THeaders headers(ev->Get()->Request->Headers);
- if (!headers.Has("Referer")) {
- redirect = true;
- }
- }
- if (redirect) {
- TStringBuilder response;
- response << "HTTP/1.1 302 Found\r\nLocation: " << RedirectRoot << "\r\n\r\n";
- Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseString(response)));
- return;
- } else if (!ev->Get()->Request->URL.ends_with("/") && ev->Get()->Request->URL.find('?') == TStringBuf::npos) {
- TString url(ev->Get()->Request->URL);
- bool index = false;
- auto itPage = IndexPages.find(url);
- if (itPage == IndexPages.end()) {
- auto page = IndexMonPage->FindPageByAbsolutePath(url);
- if (page) {
- index = page->IsIndex();
- IndexPages[url] = index;
- }
- } else {
- index = itPage->second;
- }
- if (index) {
- TStringBuilder response;
- auto p = url.rfind('/');
- if (p != TString::npos) {
- url = url.substr(p + 1);
- }
- url += '/';
- response << "HTTP/1.1 302 Found\r\nLocation: " << url << "\r\n\r\n";
- Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseString(response)));
- return;
- }
- }
- Register(new THttpMonLegacyIndexRequest(std::move(ev), IndexMonPage.Get()));
- }
-
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
- cFunc(TEvents::TSystem::Poison, PassAway);
- }
- }
-
- TIntrusivePtr<NMonitoring::TIndexMonPage> IndexMonPage;
- TString RedirectRoot;
- std::unordered_map<TString, bool> IndexPages;
-};
-
-inline TActorId MakeNodeProxyId(ui32 node) {
- char x[12] = "nodeproxy";
- return TActorId(node, TStringBuf(x, 12));
-}
-
-class THttpMonServiceNodeRequest : public TActorBootstrapped<THttpMonServiceNodeRequest> {
-public:
- std::shared_ptr<NHttp::THttpEndpointInfo> Endpoint;
- TEvMon::TEvMonitoringRequest::TPtr Event;
- TActorId HttpProxyActorId;
-
- THttpMonServiceNodeRequest(std::shared_ptr<NHttp::THttpEndpointInfo> endpoint, TEvMon::TEvMonitoringRequest::TPtr event, TActorId httpProxyActorId)
- : Endpoint(std::move(endpoint))
- , Event(std::move(event))
- , HttpProxyActorId(httpProxyActorId)
- {}
-
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::HTTP_MON_SERVICE_NODE_REQUEST;
- }
-
- static void FromProto(NHttp::THttpConfig::SocketAddressType& address, const NKikimrMonProto::TSockAddr& proto) {
- switch (proto.GetFamily()) {
- case AF_INET:
- address = std::make_shared<TSockAddrInet>(proto.GetAddress().data(), proto.GetPort());
- break;
- case AF_INET6:
- address = std::make_shared<TSockAddrInet6>(proto.GetAddress().data(), proto.GetPort());
- break;
- }
- }
-
- TString RewriteWithForwardedFromNode(const TString& response) {
- NHttp::THttpParser<NHttp::THttpRequest, NHttp::TSocketBuffer> parser(response);
-
- NHttp::THeadersBuilder headers(parser.Headers);
- headers.Set("X-Forwarded-From-Node", TStringBuilder() << Event->Sender.NodeId());
-
- NHttp::THttpRenderer<NHttp::THttpRequest, NHttp::TSocketBuffer> renderer;
- renderer.InitRequest(parser.Method, parser.URL, parser.Protocol, parser.Version);
- renderer.Set(headers);
- if (parser.HaveBody()) {
- renderer.SetBody(parser.Body); // it shouldn't be here, 30x with a body is a bad idea
- }
- renderer.Finish();
- return renderer.AsString();
- }
-
- void Bootstrap() {
- NHttp::THttpConfig::SocketAddressType address;
- FromProto(address, Event->Get()->Record.GetAddress());
- NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(RewriteWithForwardedFromNode(Event->Get()->Record.GetHttpRequest()), Endpoint, address);
- TStringBuilder prefix;
- prefix << "/node/" << TActivationContext::ActorSystem()->NodeId;
- if (request->URL.SkipPrefix(prefix)) {
- Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(std::move(request)));
- Become(&THttpMonServiceNodeRequest::StateWork);
- } else {
- auto response = std::make_unique<TEvMon::TEvMonitoringResponse>();
- auto httpResponse = request->CreateResponseBadRequest();
- response->Record.SetHttpResponse(httpResponse->AsString());
- Send(Event->Sender, response.release(), 0, Event->Cookie);
- PassAway();
- }
- }
-
- TString RewriteLocationWithNode(const TString& response) {
- NHttp::THttpParser<NHttp::THttpResponse, NHttp::TSocketBuffer> parser(response);
-
- NHttp::THeadersBuilder headers(parser.Headers);
- headers.Set("Location", TStringBuilder() << "/node/" << TActivationContext::ActorSystem()->NodeId << headers["Location"]);
-
- NHttp::THttpRenderer<NHttp::THttpResponse, NHttp::TSocketBuffer> renderer;
- renderer.InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message);
- renderer.Set(headers);
- if (parser.HaveBody()) {
- renderer.SetBody(parser.Body); // it shouldn't be here, 30x with a body is a bad idea
- }
- renderer.Finish();
- return renderer.AsString();
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr& ev) {
- TString httpResponse = ev->Get()->Response->AsString();
- switch (FromStringWithDefault<int>(ev->Get()->Response->Status)) {
- case 301:
- case 303:
- case 307:
- case 308:
- if (!NHttp::THeaders(ev->Get()->Response->Headers).Get("Location").starts_with("/node/")) {
- httpResponse = RewriteLocationWithNode(httpResponse);
- }
- break;
- }
- auto response = std::make_unique<TEvMon::TEvMonitoringResponse>();
- response->Record.SetHttpResponse(httpResponse);
- Send(Event->Sender, response.release(), 0, Event->Cookie);
- PassAway();
- }
-
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
- }
- }
-};
-
-class THttpMonServiceMonRequest : public TActorBootstrapped<THttpMonServiceMonRequest> {
-public:
- NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event;
- ui32 NodeId;
-
- THttpMonServiceMonRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, ui32 nodeId)
- : Event(std::move(event))
- , NodeId(nodeId)
- {}
-
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::HTTP_MON_SERVICE_MON_REQUEST;
- }
-
- static void ToProto(NKikimrMonProto::TSockAddr& proto, const NHttp::THttpConfig::SocketAddressType& address) {
- if (address) {
- switch (address->SockAddr()->sa_family) {
- case AF_INET: {
- proto.SetFamily(AF_INET);
- sockaddr_in* addr = (sockaddr_in*)address->SockAddr();
- char ip[INET_ADDRSTRLEN];
- inet_ntop(AF_INET, (void*)&addr->sin_addr, ip, INET_ADDRSTRLEN);
- proto.SetAddress(ip);
- proto.SetPort(htons(addr->sin_port));
- }
- break;
- case AF_INET6: {
- proto.SetFamily(AF_INET6);
- sockaddr_in6* addr = (sockaddr_in6*)address->SockAddr();
- char ip6[INET6_ADDRSTRLEN];
- inet_ntop(AF_INET6, (void*)&addr->sin6_addr, ip6, INET6_ADDRSTRLEN);
- proto.SetAddress(ip6);
- proto.SetPort(htons(addr->sin6_port));
- }
- break;
- }
- }
- }
-
- void Bootstrap() {
- TActorId monServiceNodeProxy = MakeNodeProxyId(NodeId);
- auto request = std::make_unique<TEvMon::TEvMonitoringRequest>();
- request->Record.SetHttpRequest(Event->Get()->Request->AsString());
- ToProto(*request->Record.MutableAddress(), Event->Get()->Request->Address);
- Send(monServiceNodeProxy, request.release(), IEventHandle::FlagTrackDelivery);
- Become(&THttpMonServiceMonRequest::StateWork);
- }
-
- void Handle(TEvents::TEvUndelivered::TPtr& ev) {
- TString reason;
- switch (ev->Get()->Reason) {
- case TEvents::TEvUndelivered::ReasonUnknown:
- reason = "ReasonUnknown";
- break;
- case TEvents::TEvUndelivered::ReasonActorUnknown:
- reason = "ReasonActorUnknown";
- break;
- case TEvents::TEvUndelivered::Disconnected:
- reason = "Disconnected";
- break;
- }
- Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(Event->Get()->Request->CreateResponseServiceUnavailable(reason)), 0, Event->Cookie);
- PassAway();
- }
-
- void Handle(TEvMon::TEvMonitoringResponse::TPtr& ev) {
- TString responseTxt = ev->Get()->Record.GetHttpResponse();
- NHttp::THttpOutgoingResponsePtr responseObj = Event->Get()->Request->CreateResponseString(responseTxt);
- if (responseObj->Status == "301" || responseObj->Status == "302") {
- NHttp::THttpParser<NHttp::THttpResponse, NHttp::TSocketBuffer> parser(responseTxt);
- NHttp::THeadersBuilder headers(parser.Headers);
- if (headers["Location"].starts_with('/')) {
- NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(Event->Get()->Request);
- response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message);
-
- headers.Set("Location", TStringBuilder() << "/node/" << NodeId << headers["Location"]);
-
- response->Set(headers);
- if (parser.HaveBody()) {
- response->SetBody(parser.Body);
- }
- responseObj = response;
- }
- }
-
- Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(responseObj.Release()), 0, Event->Cookie);
- PassAway();
- }
-
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvMon::TEvMonitoringResponse, Handle);
- hFunc(TEvents::TEvUndelivered, Handle);
- }
- }
-};
-
-// receives requests to another nodes
-class THttpMonServiceNodeProxy : public TActor<THttpMonServiceNodeProxy> {
-public:
- THttpMonServiceNodeProxy(TActorId httpProxyActorId)
- : TActor(&THttpMonServiceNodeProxy::StateWork)
- , HttpProxyActorId(httpProxyActorId)
- , Endpoint(std::make_shared<NHttp::THttpEndpointInfo>())
- {
- }
-
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::HTTP_MON_SERVICE_NODE_PROXY;
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) {
- TStringBuf url = ev->Get()->Request->URL;
- TStringBuf node;
- ui32 nodeId;
- if (url.SkipPrefix("/node/") && url.NextTok('/', node) && TryFromStringWithDefault<ui32>(node, nodeId)) {
- Register(new THttpMonServiceMonRequest(std::move(ev), nodeId));
- return;
- }
- Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseBadRequest("bad request")), 0, ev->Cookie);
- }
-
- void Handle(TEvMon::TEvMonitoringRequest::TPtr& ev) {
- Register(new THttpMonServiceNodeRequest(Endpoint, ev, HttpProxyActorId));
- }
-
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
- hFunc(TEvMon::TEvMonitoringRequest, Handle);
- cFunc(TEvents::TSystem::Poison, PassAway);
- }
- }
-
-protected:
- TActorId HttpProxyActorId;
- std::shared_ptr<NHttp::THttpEndpointInfo> Endpoint;
-};
-
-TAsyncHttpMon::TAsyncHttpMon(TConfig config)
- : Config(std::move(config))
- , IndexMonPage(new NMonitoring::TIndexMonPage("", Config.Title))
-{
-}
-
-void TAsyncHttpMon::Start(TActorSystem* actorSystem) {
- if (actorSystem) {
- TGuard<TMutex> g(Mutex);
- ActorSystem = actorSystem;
- Register(new TIndexRedirectMonPage(IndexMonPage));
- Register(new NMonitoring::TVersionMonPage);
- Register(new NMonitoring::TBootstrapCssMonPage);
- Register(new NMonitoring::TTablesorterCssMonPage);
- Register(new NMonitoring::TBootstrapJsMonPage);
- Register(new NMonitoring::TJQueryJsMonPage);
- Register(new NMonitoring::TTablesorterJsMonPage);
- Register(new NMonitoring::TBootstrapFontsEotMonPage);
- Register(new NMonitoring::TBootstrapFontsSvgMonPage);
- Register(new NMonitoring::TBootstrapFontsTtfMonPage);
- Register(new NMonitoring::TBootstrapFontsWoffMonPage);
- NLwTraceMonPage::RegisterPages(IndexMonPage.Get());
- NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(ACTORLIB_PROVIDER));
- NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(MONITORING_PROVIDER));
- HttpProxyActorId = ActorSystem->Register(
- NHttp::CreateHttpProxy(),
- TMailboxType::ReadAsFilled,
- ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId);
- HttpMonServiceActorId = ActorSystem->Register(
- new THttpMonServiceLegacyIndex(IndexMonPage, Config.RedirectMainPageTo),
- TMailboxType::ReadAsFilled,
- ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId);
- auto nodeProxyActorId = ActorSystem->Register(
- new THttpMonServiceNodeProxy(HttpProxyActorId),
- TMailboxType::ReadAsFilled,
- ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId);
- NodeProxyServiceActorId = MakeNodeProxyId(ActorSystem->NodeId);
- ActorSystem->RegisterLocalService(NodeProxyServiceActorId, nodeProxyActorId);
-
- TStringBuilder workerName;
- workerName << FQDNHostName() << ":" << Config.Port;
- auto addPort = std::make_unique<NHttp::TEvHttpProxy::TEvAddListeningPort>();
- addPort->Port = Config.Port;
- addPort->WorkerName = workerName;
- addPort->Address = Config.Address;
- addPort->CompressContentTypes = {
- "text/plain",
- "text/html",
- "text/css",
- "text/javascript",
- "application/javascript",
- "application/json",
- "application/yaml",
- };
- addPort->SslCertificatePem = Config.Certificate;
- addPort->Secure = !Config.Certificate.empty();
- addPort->MaxRequestsPerSecond = Config.MaxRequestsPerSecond;
- ActorSystem->Send(HttpProxyActorId, addPort.release());
- ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/", HttpMonServiceActorId));
- ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/node", NodeProxyServiceActorId));
- for (auto& pageInfo : ActorMonPages) {
- if (pageInfo.Page) {
- RegisterActorMonPage(pageInfo);
- } else if (pageInfo.Handler) {
- ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(pageInfo.Path, pageInfo.Handler));
- }
- }
- ActorMonPages.clear();
- }
-}
-
-void TAsyncHttpMon::Stop() {
- IndexMonPage->ClearPages(); // it's required to avoid loop-reference
- if (ActorSystem) {
- TGuard<TMutex> g(Mutex);
- for (const auto& [path, actorId] : ActorServices) {
- ActorSystem->Send(actorId, new TEvents::TEvPoisonPill);
- }
- ActorSystem->Send(NodeProxyServiceActorId, new TEvents::TEvPoisonPill);
- ActorSystem->Send(HttpMonServiceActorId, new TEvents::TEvPoisonPill);
- ActorSystem->Send(HttpProxyActorId, new TEvents::TEvPoisonPill);
- ActorSystem = nullptr;
- }
-}
-
-void TAsyncHttpMon::Register(NMonitoring::IMonPage* page) {
- IndexMonPage->Register(page);
-}
-
-NMonitoring::TIndexMonPage* TAsyncHttpMon::RegisterIndexPage(const TString& path, const TString& title) {
- auto page = IndexMonPage->RegisterIndexPage(path, title);
- IndexMonPage->SortPages();
- return page;
-}
-
-void TAsyncHttpMon::RegisterActorMonPage(const TActorMonPageInfo& pageInfo) {
- if (ActorSystem) {
- TActorMonPage* actorMonPage = static_cast<TActorMonPage*>(pageInfo.Page.Get());
- auto& actorId = ActorServices[pageInfo.Path];
- if (actorId) {
- ActorSystem->Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0));
- }
- actorId = ActorSystem->Register(
- new THttpMonServiceLegacyActor(actorMonPage),
- TMailboxType::ReadAsFilled,
- ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId);
- ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(pageInfo.Path, actorId));
- }
-}
-
-NMonitoring::IMonPage* TAsyncHttpMon::RegisterActorPage(TRegisterActorPageFields fields) {
- TGuard<TMutex> g(Mutex);
- NMonitoring::TMonPagePtr page = new TActorMonPage(
- fields.RelPath,
- fields.Title,
- Config.Host,
- fields.PreTag,
- fields.ActorSystem,
- fields.ActorId,
- fields.AllowedSIDs ? fields.AllowedSIDs : Config.AllowedSIDs,
- fields.UseAuth ? Config.Authorizer : TRequestAuthorizer(),
- fields.MonServiceName);
- if (fields.Index) {
- fields.Index->Register(page);
- if (fields.SortPages) {
- fields.Index->SortPages();
- }
- } else {
- Register(page.Get());
- }
-
- TActorMonPageInfo pageInfo = {
- .Page = page,
- .Path = GetPageFullPath(page.Get()),
- };
-
- if (ActorSystem && HttpProxyActorId) {
- RegisterActorMonPage(pageInfo);
- } else {
- ActorMonPages.emplace_back(pageInfo);
- }
-
- return page.Get();
-}
-
-NMonitoring::IMonPage* TAsyncHttpMon::RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) {
- TDynamicCountersPage* page = new TDynamicCountersPage(path, title, counters);
- page->SetUnknownGroupPolicy(EUnknownGroupPolicy::Ignore);
- Register(page);
- return page;
-}
-
-void TAsyncHttpMon::RegisterHandler(const TString& path, const TActorId& handler) {
- if (ActorSystem) {
- ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(path, handler));
- } else {
- TGuard<TMutex> g(Mutex);
- ActorMonPages.emplace_back(TActorMonPageInfo{
- .Handler = handler,
- .Path = path,
- });
- }
-}
-
-NMonitoring::IMonPage* TAsyncHttpMon::FindPage(const TString& relPath) {
- return IndexMonPage->FindPage(relPath);
-}
-
-}
diff --git a/ydb/core/mon/async_http_mon.h b/ydb/core/mon/async_http_mon.h
deleted file mode 100644
index 0f0abc1ec78..00000000000
--- a/ydb/core/mon/async_http_mon.h
+++ /dev/null
@@ -1,54 +0,0 @@
-#pragma once
-
-#include <library/cpp/monlib/service/monservice.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <library/cpp/monlib/service/pages/index_mon_page.h>
-#include <library/cpp/monlib/service/pages/resources/css_mon_page.h>
-#include <library/cpp/monlib/service/pages/resources/fonts_mon_page.h>
-#include <library/cpp/monlib/service/pages/resources/js_mon_page.h>
-#include <library/cpp/monlib/service/pages/tablesorter/css_mon_page.h>
-#include <library/cpp/monlib/service/pages/tablesorter/js_mon_page.h>
-
-#include <ydb/library/actors/core/mon.h>
-#include <ydb/library/actors/http/http.h>
-
-#include "mon.h"
-
-namespace NActors {
-
-class TAsyncHttpMon : public TMon {
-public:
- TAsyncHttpMon(TConfig config);
-
- void Start(TActorSystem* actorSystem) override;
- void Stop() override;
-
- void Register(NMonitoring::IMonPage* page) override;
- NMonitoring::TIndexMonPage* RegisterIndexPage(const TString& path, const TString& title) override;
- NMonitoring::IMonPage* RegisterActorPage(TRegisterActorPageFields fields) override;
- NMonitoring::IMonPage* RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) override;
- NMonitoring::IMonPage* FindPage(const TString& relPath) override;
- void RegisterHandler(const TString& path, const TActorId& handler) override;
-
-protected:
- TConfig Config;
- TIntrusivePtr<NMonitoring::TIndexMonPage> IndexMonPage;
- TActorSystem* ActorSystem = {};
- TActorId HttpProxyActorId;
- TActorId HttpMonServiceActorId;
- TActorId NodeProxyServiceActorId;
-
- struct TActorMonPageInfo {
- NMonitoring::TMonPagePtr Page;
- TActorId Handler;
- TString Path;
- };
-
- TMutex Mutex;
- std::vector<TActorMonPageInfo> ActorMonPages;
- THashMap<TString, TActorId> ActorServices;
-
- void RegisterActorMonPage(const TActorMonPageInfo& pageInfo);
-};
-
-} // NActors
diff --git a/ydb/core/mon/mon.cpp b/ydb/core/mon/mon.cpp
index 311132a04fd..e6926d9782a 100644
--- a/ydb/core/mon/mon.cpp
+++ b/ydb/core/mon/mon.cpp
@@ -1,24 +1,55 @@
#include "mon.h"
-
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/http/http_proxy.h>
#include <ydb/core/base/appdata.h>
-#include <ydb/core/base/ticket_parser.h>
#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/base/ticket_parser.h>
-#include <ydb/core/protos/auth.pb.h>
-
-#include <library/cpp/json/json_value.h>
#include <library/cpp/json/json_reader.h>
+#include <library/cpp/json/json_writer.h>
#include <library/cpp/protobuf/json/proto2json.h>
+#include <library/cpp/lwtrace/all.h>
+#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
+#include <ydb/library/actors/core/probes.h>
+#include <ydb/core/base/monitoring_provider.h>
-#include <util/string/ascii.h>
+#include <library/cpp/monlib/service/pages/version_mon_page.h>
+#include <library/cpp/monlib/service/pages/mon_page.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/page.h>
+
+#include <util/system/hostname.h>
+
+#include <ydb/core/base/counters.h>
+#include <ydb/core/protos/mon.pb.h>
+
+#include "mon_impl.h"
namespace NActors {
-using namespace NMonitoring;
-using namespace NKikimr;
+struct TEvMon {
+ enum {
+ EvMonitoringRequest = NActors::NMon::HttpInfo + 10,
+ EvMonitoringResponse,
+ End
+ };
+
+ static_assert(EvMonitoringRequest > NMon::End, "expect EvMonitoringRequest > NMon::End");
+ static_assert(End < EventSpaceEnd(NActors::TEvents::ES_MON), "expect End < EventSpaceEnd(NActors::TEvents::ES_MON)");
+
+ struct TEvMonitoringRequest : TEventPB<TEvMonitoringRequest, NKikimrMonProto::TEvMonitoringRequest, EvMonitoringRequest> {
+ TEvMonitoringRequest() = default;
+ };
+
+ struct TEvMonitoringResponse : TEventPB<TEvMonitoringResponse, NKikimrMonProto::TEvMonitoringResponse, EvMonitoringResponse> {
+ TEvMonitoringResponse() = default;
+ };
+};
namespace {
+using namespace NKikimr;
+
bool HasJsonContent(NMonitoring::IMonHttpRequest& request) {
const TStringBuf header = request.GetHeader("Content-Type");
return header.empty() || AsciiEqualsIgnoreCase(header, "application/json"); // by default we will try to parse json, no error will be generated if parsing fails
@@ -29,9 +60,8 @@ TString GetDatabase(NMonitoring::IMonHttpRequest& request) {
return dbIt->second;
}
if (request.GetMethod() == HTTP_METHOD_POST && HasJsonContent(request)) {
- static NJson::TJsonReaderConfig JsonConfig;
NJson::TJsonValue requestData;
- if (NJson::ReadJsonTree(request.GetPostContent(), &JsonConfig, &requestData)) {
+ if (NJson::ReadJsonTree(request.GetPostContent(), &requestData)) {
return requestData["database"].GetString(); // empty if not string or no such key
}
}
@@ -153,4 +183,933 @@ NActors::IEventHandle* TMon::DefaultAuthorizer(const NActors::TActorId& owner, N
return GetAuthorizeTicketResult(owner);
}
+// compatibility layer
+class THttpMonRequest : public NMonitoring::IMonHttpRequest {
+public:
+ NHttp::THttpIncomingRequestPtr Request;
+ TStringStream& Response;
+ NMonitoring::IMonPage* Page;
+ TString PathInfo;
+ mutable std::unique_ptr<THttpHeaders> Headers;
+ mutable std::unique_ptr<TCgiParameters> Params;
+ mutable std::unique_ptr<TCgiParameters> PostParams;
+
+ THttpMonRequest(NHttp::THttpIncomingRequestPtr request, TStringStream& response, NMonitoring::IMonPage* page, const TString& pathInfo)
+ : Request(request)
+ , Response(response)
+ , Page(page)
+ , PathInfo(pathInfo)
+ {
+ }
+
+ static TStringBuf GetPathFromUrl(TStringBuf url) {
+ return url.Before('?');
+ }
+
+ static TStringBuf GetPathInfoFromUrl(NMonitoring::IMonPage* page, TStringBuf url) {
+ TString path = GetPageFullPath(page);
+ url.SkipPrefix(path);
+ return GetPathFromUrl(url);
+ }
+
+ virtual IOutputStream& Output() override {
+ return Response;
+ }
+
+ virtual HTTP_METHOD GetMethod() const override {
+ if (Request->Method == "GET") {
+ return HTTP_METHOD_GET;
+ }
+ if (Request->Method == "OPTIONS") {
+ return HTTP_METHOD_OPTIONS;
+ }
+ if (Request->Method == "POST") {
+ return HTTP_METHOD_POST;
+ }
+ if (Request->Method == "HEAD") {
+ return HTTP_METHOD_HEAD;
+ }
+ if (Request->Method == "PUT") {
+ return HTTP_METHOD_PUT;
+ }
+ if (Request->Method == "DELETE") {
+ return HTTP_METHOD_DELETE;
+ }
+ return HTTP_METHOD_UNDEFINED;
+ }
+
+ virtual TStringBuf GetPath() const override {
+ return GetPathFromUrl(Request->URL);
+ }
+
+ virtual TStringBuf GetPathInfo() const override {
+ return PathInfo;
+ }
+
+ virtual TStringBuf GetUri() const override {
+ return Request->URL;
+ }
+
+ virtual const TCgiParameters& GetParams() const override {
+ if (!Params) {
+ Params = std::make_unique<TCgiParameters>(Request->URL.After('?'));
+ }
+ return *Params;
+ }
+
+ virtual const TCgiParameters& GetPostParams() const override {
+ if (!PostParams) {
+ PostParams = std::make_unique<TCgiParameters>(Request->Body);
+ }
+ return *PostParams;
+ }
+
+ virtual TStringBuf GetPostContent() const override {
+ return Request->Body;
+ }
+
+ virtual const THttpHeaders& GetHeaders() const override {
+ if (!Headers) {
+ TString strHeaders(Request->Headers);
+ TStringInput headers(strHeaders);
+ Headers = std::make_unique<THttpHeaders>(&headers);
+ }
+ return *Headers;
+ }
+
+ virtual TStringBuf GetHeader(TStringBuf name) const override {
+ auto header = GetHeaders().FindHeader(name);
+ if (header) {
+ return header->Value();
+ }
+ return {};
+ }
+
+ bool AcceptsJsonResponse() {
+ TStringBuf acceptHeader = GetHeader("Accept");
+ return acceptHeader.find(TStringBuf("application/json")) != TStringBuf::npos;
+ }
+
+ virtual TStringBuf GetCookie(TStringBuf name) const override {
+ NHttp::TCookies cookies(GetHeader("Cookie"));
+ return cookies.Get(name);
+ }
+
+ virtual TString GetRemoteAddr() const override {
+ if (Request->Address) {
+ return Request->Address->ToString();
+ }
+ return {};
+ }
+
+ virtual TString GetServiceTitle() const override {
+ return {};
+ }
+
+ virtual NMonitoring::IMonPage* GetPage() const override {
+ return Page;
+ }
+
+ virtual IMonHttpRequest* MakeChild(NMonitoring::IMonPage* page, const TString& pathInfo) const override {
+ return new THttpMonRequest(Request, Response, page, pathInfo);
+ }
+};
+
+// container for legacy requests
+class THttpMonRequestContainer : public TStringStream, public THttpMonRequest {
+public:
+ THttpMonRequestContainer(NHttp::THttpIncomingRequestPtr request, NMonitoring::IMonPage* index)
+ : THttpMonRequest(request, *this, index, TString(GetPathInfoFromUrl(index, request->URL)))
+ {
+ }
+};
+
+// handles actor communication
+class THttpMonLegacyActorRequest : public TActorBootstrapped<THttpMonLegacyActorRequest> {
+public:
+ NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event;
+ THttpMonRequestContainer Container;
+ TIntrusivePtr<TActorMonPage> ActorMonPage;
+
+ THttpMonLegacyActorRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, TIntrusivePtr<TActorMonPage> actorMonPage)
+ : Event(std::move(event))
+ , Container(Event->Get()->Request, actorMonPage.Get())
+ , ActorMonPage(actorMonPage)
+ {}
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::HTTP_MON_LEGACY_ACTOR_REQUEST;
+ }
+
+ void Bootstrap() {
+ if (Event->Get()->Request->Method == "OPTIONS") {
+ return ReplyOptionsAndPassAway();
+ }
+ Become(&THttpMonLegacyActorRequest::StateFunc);
+ if (ActorMonPage->Authorizer) {
+ NActors::IEventHandle* handle = ActorMonPage->Authorizer(SelfId(), Container);
+ if (handle) {
+ TActivationContext::Send(handle);
+ return;
+ }
+ }
+ SendRequest();
+ }
+ void ReplyWith(NHttp::THttpOutgoingResponsePtr response) {
+ if (response->Status.StartsWith("2")) {
+ TString url(Event->Get()->Request->URL.Before('?'));
+ TString status(response->Status);
+ NMonitoring::THistogramPtr ResponseTimeHgram = NKikimr::GetServiceCounters(NKikimr::AppData()->Counters,
+ ActorMonPage->MonServiceName)
+ ->GetSubgroup("subsystem", "mon")
+ ->GetSubgroup("url", url)
+ ->GetSubgroup("status", status)
+ ->GetHistogram("ResponseTimeMs", NMonitoring::ExponentialHistogram(20, 2, 1));
+ ResponseTimeHgram->Collect(Event->Get()->Request->Timer.Passed() * 1000);
+ }
+ Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
+ }
+
+ void ReplyOptionsAndPassAway() {
+ NHttp::THttpIncomingRequestPtr request = Event->Get()->Request;
+ TString url(request->URL.Before('?'));
+ TString type = mimetypeByExt(url.data());
+ if (type.empty()) {
+ type = "application/json";
+ }
+ NHttp::THeaders headers(request->Headers);
+ TString origin = TString(headers["Origin"]);
+ if (origin.empty()) {
+ origin = "*";
+ }
+ TStringBuilder response;
+ response << "HTTP/1.1 204 No Content\r\n"
+ "Access-Control-Allow-Origin: " << origin << "\r\n"
+ "Access-Control-Allow-Credentials: true\r\n"
+ "Access-Control-Allow-Headers: Content-Type,Authorization,Origin,Accept,X-Trace-Verbosity,X-Want-Trace,traceparent\r\n"
+ "Access-Control-Expose-Headers: traceresponse,X-Worker-Name\r\n"
+ "Access-Control-Allow-Methods: OPTIONS,GET,POST,PUT,DELETE\r\n"
+ "Content-Type: " << type << "\r\n"
+ "Connection: keep-alive\r\n\r\n";
+ ReplyWith(request->CreateResponseString(response));
+ PassAway();
+ }
+
+ bool CredentialsProvided() {
+ return Container.GetCookie("ydb_session_id") || Container.GetHeader("Authorization");
+ }
+
+ TString YdbToHttpError(Ydb::StatusIds::StatusCode status) {
+ switch (status) {
+ case Ydb::StatusIds::UNAUTHORIZED:
+ // YDB status UNAUTHORIZED is used for both access denied case and if no credentials were provided.
+ return CredentialsProvided() ? "403 Forbidden" : "401 Unauthorized";
+ case Ydb::StatusIds::INTERNAL_ERROR:
+ return "500 Internal Server Error";
+ case Ydb::StatusIds::UNAVAILABLE:
+ return "503 Service Unavailable";
+ case Ydb::StatusIds::OVERLOADED:
+ return "429 Too Many Requests";
+ case Ydb::StatusIds::TIMEOUT:
+ return "408 Request Timeout";
+ case Ydb::StatusIds::PRECONDITION_FAILED:
+ return "412 Precondition Failed";
+ default:
+ return "400 Bad Request";
+ }
+ }
+
+ void ReplyErrorAndPassAway(const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult& result) {
+ ReplyErrorAndPassAway(result.Status, result.Issues, true);
+ }
+
+ void ReplyErrorAndPassAway(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues, bool addAccessControlHeaders) {
+ NHttp::THttpIncomingRequestPtr request = Event->Get()->Request;
+ TStringBuilder response;
+ TStringBuilder body;
+ TStringBuf contentType;
+ const TString httpError = YdbToHttpError(status);
+
+ if (Container.AcceptsJsonResponse()) {
+ contentType = "application/json";
+ NJson::TJsonValue json;
+ TString message;
+ MakeJsonErrorReply(json, message, issues, NYdb::EStatus(status));
+ NJson::WriteJson(&body.Out, &json);
+ } else {
+ contentType = "text/html";
+ body << "<html><body><h1>" << httpError << "</h1>";
+ if (issues) {
+ body << "<p>" << issues.ToString() << "</p>";
+ }
+ body << "</body></html>";
+ }
+
+ response << "HTTP/1.1 " << httpError << "\r\n";
+ if (addAccessControlHeaders) {
+ NHttp::THeaders headers(request->Headers);
+ TString origin = TString(headers["Origin"]);
+ if (origin.empty()) {
+ origin = "*";
+ }
+ response << "Access-Control-Allow-Origin: " << origin << "\r\n";
+ response << "Access-Control-Allow-Credentials: true\r\n";
+ response << "Access-Control-Allow-Headers: Content-Type,Authorization,Origin,Accept\r\n";
+ response << "Access-Control-Allow-Methods: OPTIONS, GET, POST, PUT, DELETE\r\n";
+ }
+
+ response << "Content-Type: " << contentType << "\r\n";
+ response << "Content-Length: " << body.size() << "\r\n";
+ response << "\r\n";
+ response << body;
+ ReplyWith(request->CreateResponseString(response));
+ PassAway();
+ }
+
+ void ReplyForbiddenAndPassAway(const TString& error = {}) {
+ NYql::TIssues issues;
+ issues.AddIssue(error);
+ ReplyErrorAndPassAway(Ydb::StatusIds::UNAUTHORIZED, issues, false);
+ }
+
+ void SendRequest(const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult* result = nullptr) {
+ NHttp::THttpIncomingRequestPtr request = Event->Get()->Request;
+ if (ActorMonPage->Authorizer) {
+ TString user = (result && result->UserToken) ? result->UserToken->GetUserSID() : "anonymous";
+ LOG_NOTICE_S(*TlsActivationContext, NActorsServices::HTTP,
+ (request->Address ? request->Address->ToString() : "")
+ << " " << user
+ << " " << request->Method
+ << " " << request->URL);
+ }
+ TString serializedToken;
+ if (result && result->UserToken) {
+ serializedToken = result->UserToken->GetSerializedToken();
+ }
+ Send(ActorMonPage->TargetActorId, new NMon::TEvHttpInfo(
+ Container, serializedToken), IEventHandle::FlagTrackDelivery);
+ }
+
+ void HandleUndelivered(TEvents::TEvUndelivered::TPtr&) {
+ NHttp::THttpIncomingRequestPtr request = Event->Get()->Request;
+ ReplyWith(request->CreateResponseServiceUnavailable(
+ TStringBuilder() << "Actor " << ActorMonPage->TargetActorId << " is not available"));
+ PassAway();
+ }
+
+ void HandleResponse(NMon::IEvHttpInfoRes::TPtr& ev) {
+ if (ev->Get()->GetContentType() == NMon::IEvHttpInfoRes::Html) {
+ THtmlResultMonPage resultPage(ActorMonPage->Path, ActorMonPage->Title, ActorMonPage->Host, ActorMonPage->PreTag, *(ev->Get()));
+ resultPage.Parent = ActorMonPage->Parent;
+ resultPage.Output(Container);
+ } else {
+ ev->Get()->Output(Container);
+ }
+ ReplyWith(Event->Get()->Request->CreateResponseString(Container.Str()));
+ PassAway();
+ }
+
+ void Handle(NKikimr::NGRpcService::TEvRequestAuthAndCheckResult::TPtr& ev) {
+ const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult& result(*ev->Get());
+ if (result.Status != Ydb::StatusIds::SUCCESS) {
+ return ReplyErrorAndPassAway(result);
+ }
+ bool found = false;
+ if (result.UserToken) {
+ for (const TString& sid : ActorMonPage->AllowedSIDs) {
+ if (result.UserToken->IsExist(sid)) {
+ found = true;
+ break;
+ }
+ }
+ }
+ if (found || ActorMonPage->AllowedSIDs.empty() || !result.UserToken) {
+ SendRequest(&result);
+ } else {
+ return ReplyForbiddenAndPassAway("SID is not allowed");
+ }
+ }
+
+ STATEFN(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvents::TEvUndelivered, HandleUndelivered);
+ hFunc(NMon::IEvHttpInfoRes, HandleResponse);
+ hFunc(NKikimr::NGRpcService::TEvRequestAuthAndCheckResult, Handle);
+ }
+ }
+};
+
+// handles all indexes and static data in synchronous way
+class THttpMonLegacyIndexRequest : public TActorBootstrapped<THttpMonLegacyIndexRequest> {
+public:
+ NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event;
+ THttpMonRequestContainer Container;
+
+ THttpMonLegacyIndexRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, NMonitoring::IMonPage* index)
+ : Event(std::move(event))
+ , Container(Event->Get()->Request, index)
+ {}
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::HTTP_MON_LEGACY_INDEX_REQUEST;
+ }
+
+ void Bootstrap() {
+ ProcessRequest();
+ }
+
+ void ProcessRequest() {
+ Container.Page->Output(Container);
+ NHttp::THttpOutgoingResponsePtr response = Event->Get()->Request->CreateResponseString(Container.Str());
+ Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
+ PassAway();
+ }
+};
+
+// receives all requests for one actor page and converts them to request-actors
+class THttpMonServiceLegacyActor : public TActorBootstrapped<THttpMonServiceLegacyActor> {
+public:
+ THttpMonServiceLegacyActor(TIntrusivePtr<TActorMonPage> actorMonPage)
+ : ActorMonPage(std::move(actorMonPage))
+ {
+ }
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::HTTP_MON_LEGACY_ACTOR_SERVICE;
+ }
+
+ void Bootstrap() {
+ Become(&THttpMonServiceLegacyActor::StateWork);
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) {
+ Register(new THttpMonLegacyActorRequest(std::move(ev), ActorMonPage));
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ }
+ }
+
+ TIntrusivePtr<TActorMonPage> ActorMonPage;
+};
+
+// receives everyhing not related to actor communcation, converts them to request-actors
+class THttpMonServiceLegacyIndex : public TActor<THttpMonServiceLegacyIndex> {
+public:
+ THttpMonServiceLegacyIndex(TIntrusivePtr<NMonitoring::TIndexMonPage> indexMonPage, const TString& redirectRoot = {})
+ : TActor(&THttpMonServiceLegacyIndex::StateWork)
+ , IndexMonPage(std::move(indexMonPage))
+ , RedirectRoot(redirectRoot)
+ {
+ }
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::HTTP_MON_LEGACY_INDEX_SERVICE;
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) {
+ bool redirect = false;
+ if (RedirectRoot && ev->Get()->Request->URL == "/") {
+ NHttp::THeaders headers(ev->Get()->Request->Headers);
+ if (!headers.Has("Referer")) {
+ redirect = true;
+ }
+ }
+ if (redirect) {
+ TStringBuilder response;
+ response << "HTTP/1.1 302 Found\r\nLocation: " << RedirectRoot << "\r\n\r\n";
+ Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseString(response)));
+ return;
+ } else if (!ev->Get()->Request->URL.ends_with("/") && ev->Get()->Request->URL.find('?') == TStringBuf::npos) {
+ TString url(ev->Get()->Request->URL);
+ bool index = false;
+ auto itPage = IndexPages.find(url);
+ if (itPage == IndexPages.end()) {
+ auto page = IndexMonPage->FindPageByAbsolutePath(url);
+ if (page) {
+ index = page->IsIndex();
+ IndexPages[url] = index;
+ }
+ } else {
+ index = itPage->second;
+ }
+ if (index) {
+ TStringBuilder response;
+ auto p = url.rfind('/');
+ if (p != TString::npos) {
+ url = url.substr(p + 1);
+ }
+ url += '/';
+ response << "HTTP/1.1 302 Found\r\nLocation: " << url << "\r\n\r\n";
+ Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseString(response)));
+ return;
+ }
+ }
+ Register(new THttpMonLegacyIndexRequest(std::move(ev), IndexMonPage.Get()));
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ }
+ }
+
+ TIntrusivePtr<NMonitoring::TIndexMonPage> IndexMonPage;
+ TString RedirectRoot;
+ std::unordered_map<TString, bool> IndexPages;
+};
+
+inline TActorId MakeNodeProxyId(ui32 node) {
+ char x[12] = "nodeproxy";
+ return TActorId(node, TStringBuf(x, 12));
+}
+
+class THttpMonServiceNodeRequest : public TActorBootstrapped<THttpMonServiceNodeRequest> {
+public:
+ std::shared_ptr<NHttp::THttpEndpointInfo> Endpoint;
+ TEvMon::TEvMonitoringRequest::TPtr Event;
+ TActorId HttpProxyActorId;
+
+ THttpMonServiceNodeRequest(std::shared_ptr<NHttp::THttpEndpointInfo> endpoint, TEvMon::TEvMonitoringRequest::TPtr event, TActorId httpProxyActorId)
+ : Endpoint(std::move(endpoint))
+ , Event(std::move(event))
+ , HttpProxyActorId(httpProxyActorId)
+ {}
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::HTTP_MON_SERVICE_NODE_REQUEST;
+ }
+
+ static void FromProto(NHttp::THttpConfig::SocketAddressType& address, const NKikimrMonProto::TSockAddr& proto) {
+ switch (proto.GetFamily()) {
+ case AF_INET:
+ address = std::make_shared<TSockAddrInet>(proto.GetAddress().data(), proto.GetPort());
+ break;
+ case AF_INET6:
+ address = std::make_shared<TSockAddrInet6>(proto.GetAddress().data(), proto.GetPort());
+ break;
+ }
+ }
+
+ TString RewriteWithForwardedFromNode(const TString& response) {
+ NHttp::THttpParser<NHttp::THttpRequest, NHttp::TSocketBuffer> parser(response);
+
+ NHttp::THeadersBuilder headers(parser.Headers);
+ headers.Set("X-Forwarded-From-Node", TStringBuilder() << Event->Sender.NodeId());
+
+ NHttp::THttpRenderer<NHttp::THttpRequest, NHttp::TSocketBuffer> renderer;
+ renderer.InitRequest(parser.Method, parser.URL, parser.Protocol, parser.Version);
+ renderer.Set(headers);
+ if (parser.HaveBody()) {
+ renderer.SetBody(parser.Body); // it shouldn't be here, 30x with a body is a bad idea
+ }
+ renderer.Finish();
+ return renderer.AsString();
+ }
+
+ void Bootstrap() {
+ NHttp::THttpConfig::SocketAddressType address;
+ FromProto(address, Event->Get()->Record.GetAddress());
+ NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(RewriteWithForwardedFromNode(Event->Get()->Record.GetHttpRequest()), Endpoint, address);
+ TStringBuilder prefix;
+ prefix << "/node/" << TActivationContext::ActorSystem()->NodeId;
+ if (request->URL.SkipPrefix(prefix)) {
+ Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(std::move(request)));
+ Become(&THttpMonServiceNodeRequest::StateWork);
+ } else {
+ auto response = std::make_unique<TEvMon::TEvMonitoringResponse>();
+ auto httpResponse = request->CreateResponseBadRequest();
+ response->Record.SetHttpResponse(httpResponse->AsString());
+ Send(Event->Sender, response.release(), 0, Event->Cookie);
+ PassAway();
+ }
+ }
+
+ TString RewriteLocationWithNode(const TString& response) {
+ NHttp::THttpParser<NHttp::THttpResponse, NHttp::TSocketBuffer> parser(response);
+
+ NHttp::THeadersBuilder headers(parser.Headers);
+ headers.Set("Location", TStringBuilder() << "/node/" << TActivationContext::ActorSystem()->NodeId << headers["Location"]);
+
+ NHttp::THttpRenderer<NHttp::THttpResponse, NHttp::TSocketBuffer> renderer;
+ renderer.InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message);
+ renderer.Set(headers);
+ if (parser.HaveBody()) {
+ renderer.SetBody(parser.Body); // it shouldn't be here, 30x with a body is a bad idea
+ }
+ renderer.Finish();
+ return renderer.AsString();
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr& ev) {
+ TString httpResponse = ev->Get()->Response->AsString();
+ switch (FromStringWithDefault<int>(ev->Get()->Response->Status)) {
+ case 301:
+ case 303:
+ case 307:
+ case 308:
+ if (!NHttp::THeaders(ev->Get()->Response->Headers).Get("Location").starts_with("/node/")) {
+ httpResponse = RewriteLocationWithNode(httpResponse);
+ }
+ break;
+ }
+ auto response = std::make_unique<TEvMon::TEvMonitoringResponse>();
+ response->Record.SetHttpResponse(httpResponse);
+ Send(Event->Sender, response.release(), 0, Event->Cookie);
+ PassAway();
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
+ }
+ }
+};
+
+class THttpMonServiceMonRequest : public TActorBootstrapped<THttpMonServiceMonRequest> {
+public:
+ NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event;
+ ui32 NodeId;
+
+ THttpMonServiceMonRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, ui32 nodeId)
+ : Event(std::move(event))
+ , NodeId(nodeId)
+ {}
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::HTTP_MON_SERVICE_MON_REQUEST;
+ }
+
+ static void ToProto(NKikimrMonProto::TSockAddr& proto, const NHttp::THttpConfig::SocketAddressType& address) {
+ if (address) {
+ switch (address->SockAddr()->sa_family) {
+ case AF_INET: {
+ proto.SetFamily(AF_INET);
+ sockaddr_in* addr = (sockaddr_in*)address->SockAddr();
+ char ip[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, (void*)&addr->sin_addr, ip, INET_ADDRSTRLEN);
+ proto.SetAddress(ip);
+ proto.SetPort(htons(addr->sin_port));
+ }
+ break;
+ case AF_INET6: {
+ proto.SetFamily(AF_INET6);
+ sockaddr_in6* addr = (sockaddr_in6*)address->SockAddr();
+ char ip6[INET6_ADDRSTRLEN];
+ inet_ntop(AF_INET6, (void*)&addr->sin6_addr, ip6, INET6_ADDRSTRLEN);
+ proto.SetAddress(ip6);
+ proto.SetPort(htons(addr->sin6_port));
+ }
+ break;
+ }
+ }
+ }
+
+ void Bootstrap() {
+ TActorId monServiceNodeProxy = MakeNodeProxyId(NodeId);
+ auto request = std::make_unique<TEvMon::TEvMonitoringRequest>();
+ request->Record.SetHttpRequest(Event->Get()->Request->AsString());
+ ToProto(*request->Record.MutableAddress(), Event->Get()->Request->Address);
+ Send(monServiceNodeProxy, request.release(), IEventHandle::FlagTrackDelivery);
+ Become(&THttpMonServiceMonRequest::StateWork);
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr& ev) {
+ TString reason;
+ switch (ev->Get()->Reason) {
+ case TEvents::TEvUndelivered::ReasonUnknown:
+ reason = "ReasonUnknown";
+ break;
+ case TEvents::TEvUndelivered::ReasonActorUnknown:
+ reason = "ReasonActorUnknown";
+ break;
+ case TEvents::TEvUndelivered::Disconnected:
+ reason = "Disconnected";
+ break;
+ }
+ Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(Event->Get()->Request->CreateResponseServiceUnavailable(reason)), 0, Event->Cookie);
+ PassAway();
+ }
+
+ void Handle(TEvMon::TEvMonitoringResponse::TPtr& ev) {
+ TString responseTxt = ev->Get()->Record.GetHttpResponse();
+ NHttp::THttpOutgoingResponsePtr responseObj = Event->Get()->Request->CreateResponseString(responseTxt);
+ if (responseObj->Status == "301" || responseObj->Status == "302") {
+ NHttp::THttpParser<NHttp::THttpResponse, NHttp::TSocketBuffer> parser(responseTxt);
+ NHttp::THeadersBuilder headers(parser.Headers);
+ if (headers["Location"].starts_with('/')) {
+ NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(Event->Get()->Request);
+ response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message);
+
+ headers.Set("Location", TStringBuilder() << "/node/" << NodeId << headers["Location"]);
+
+ response->Set(headers);
+ if (parser.HaveBody()) {
+ response->SetBody(parser.Body);
+ }
+ responseObj = response;
+ }
+ }
+
+ Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(responseObj.Release()), 0, Event->Cookie);
+ PassAway();
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvMon::TEvMonitoringResponse, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ }
+ }
+};
+
+// receives requests to another nodes
+class THttpMonServiceNodeProxy : public TActor<THttpMonServiceNodeProxy> {
+public:
+ THttpMonServiceNodeProxy(TActorId httpProxyActorId)
+ : TActor(&THttpMonServiceNodeProxy::StateWork)
+ , HttpProxyActorId(httpProxyActorId)
+ , Endpoint(std::make_shared<NHttp::THttpEndpointInfo>())
+ {
+ }
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::HTTP_MON_SERVICE_NODE_PROXY;
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) {
+ TStringBuf url = ev->Get()->Request->URL;
+ TStringBuf node;
+ ui32 nodeId;
+ if (url.SkipPrefix("/node/") && url.NextTok('/', node) && TryFromStringWithDefault<ui32>(node, nodeId)) {
+ Register(new THttpMonServiceMonRequest(std::move(ev), nodeId));
+ return;
+ }
+ Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseBadRequest("bad request")), 0, ev->Cookie);
+ }
+
+ void Handle(TEvMon::TEvMonitoringRequest::TPtr& ev) {
+ Register(new THttpMonServiceNodeRequest(Endpoint, ev, HttpProxyActorId));
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ hFunc(TEvMon::TEvMonitoringRequest, Handle);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ }
+ }
+
+protected:
+ TActorId HttpProxyActorId;
+ std::shared_ptr<NHttp::THttpEndpointInfo> Endpoint;
+};
+
+// initializes http and waits for the result
+class THttpMonInitializator : public TActorBootstrapped<THttpMonInitializator> {
+public:
+ THttpMonInitializator(TActorId httpProxyActorId, std::unique_ptr<NHttp::TEvHttpProxy::TEvAddListeningPort> config, std::promise<void> promise)
+ : HttpProxyActorId(httpProxyActorId)
+ , Config(std::move(config))
+ , Promise(std::move(promise))
+ {
+ }
+
+ void Bootstrap() {
+ Send(HttpProxyActorId, Config.release());
+ Become(&THttpMonInitializator::StateWork);
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvConfirmListen::TPtr& ev) {
+ Promise.set_value();
+ PassAway();
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NHttp::TEvHttpProxy::TEvConfirmListen, Handle);
+ }
+ }
+
+protected:
+ TActorId HttpProxyActorId;
+ std::unique_ptr<NHttp::TEvHttpProxy::TEvAddListeningPort> Config;
+ std::promise<void> Promise;
+};
+
+TMon::TMon(TConfig config)
+ : Config(std::move(config))
+ , IndexMonPage(new NMonitoring::TIndexMonPage("", Config.Title))
+{
+}
+
+std::future<void> TMon::Start(TActorSystem* actorSystem) {
+ Y_ABORT_UNLESS(actorSystem);
+ TGuard<TMutex> g(Mutex);
+ ActorSystem = actorSystem;
+ Register(new TIndexRedirectMonPage(IndexMonPage));
+ Register(new NMonitoring::TVersionMonPage);
+ Register(new NMonitoring::TBootstrapCssMonPage);
+ Register(new NMonitoring::TTablesorterCssMonPage);
+ Register(new NMonitoring::TBootstrapJsMonPage);
+ Register(new NMonitoring::TJQueryJsMonPage);
+ Register(new NMonitoring::TTablesorterJsMonPage);
+ Register(new NMonitoring::TBootstrapFontsEotMonPage);
+ Register(new NMonitoring::TBootstrapFontsSvgMonPage);
+ Register(new NMonitoring::TBootstrapFontsTtfMonPage);
+ Register(new NMonitoring::TBootstrapFontsWoffMonPage);
+ NLwTraceMonPage::RegisterPages(IndexMonPage.Get());
+ NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(ACTORLIB_PROVIDER));
+ NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(MONITORING_PROVIDER));
+ HttpProxyActorId = ActorSystem->Register(
+ NHttp::CreateHttpProxy(),
+ TMailboxType::ReadAsFilled,
+ ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId);
+ HttpMonServiceActorId = ActorSystem->Register(
+ new THttpMonServiceLegacyIndex(IndexMonPage, Config.RedirectMainPageTo),
+ TMailboxType::ReadAsFilled,
+ ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId);
+ auto nodeProxyActorId = ActorSystem->Register(
+ new THttpMonServiceNodeProxy(HttpProxyActorId),
+ TMailboxType::ReadAsFilled,
+ ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId);
+ NodeProxyServiceActorId = MakeNodeProxyId(ActorSystem->NodeId);
+ ActorSystem->RegisterLocalService(NodeProxyServiceActorId, nodeProxyActorId);
+
+ TStringBuilder workerName;
+ workerName << FQDNHostName() << ":" << Config.Port;
+ auto addPort = std::make_unique<NHttp::TEvHttpProxy::TEvAddListeningPort>();
+ addPort->Port = Config.Port;
+ addPort->WorkerName = workerName;
+ addPort->Address = Config.Address;
+ addPort->CompressContentTypes = {
+ "text/plain",
+ "text/html",
+ "text/css",
+ "text/javascript",
+ "application/javascript",
+ "application/json",
+ "application/yaml",
+ };
+ addPort->SslCertificatePem = Config.Certificate;
+ addPort->Secure = !Config.Certificate.empty();
+ addPort->MaxRequestsPerSecond = Config.MaxRequestsPerSecond;
+
+ std::promise<void> promise;
+ std::future<void> future = promise.get_future();
+ ActorSystem->Register(new THttpMonInitializator(HttpProxyActorId, std::move(addPort), std::move(promise)));
+ ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/", HttpMonServiceActorId));
+ ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/node", NodeProxyServiceActorId));
+ for (auto& pageInfo : ActorMonPages) {
+ if (pageInfo.Page) {
+ RegisterActorMonPage(pageInfo);
+ } else if (pageInfo.Handler) {
+ ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(pageInfo.Path, pageInfo.Handler));
+ }
+ }
+ ActorMonPages.clear();
+ return future;
+}
+
+void TMon::Stop() {
+ IndexMonPage->ClearPages(); // it's required to avoid loop-reference
+ if (ActorSystem) {
+ TGuard<TMutex> g(Mutex);
+ for (const auto& [path, actorId] : ActorServices) {
+ ActorSystem->Send(actorId, new TEvents::TEvPoisonPill);
+ }
+ ActorSystem->Send(NodeProxyServiceActorId, new TEvents::TEvPoisonPill);
+ ActorSystem->Send(HttpMonServiceActorId, new TEvents::TEvPoisonPill);
+ ActorSystem->Send(HttpProxyActorId, new TEvents::TEvPoisonPill);
+ ActorSystem = nullptr;
+ }
+}
+
+void TMon::Register(NMonitoring::IMonPage* page) {
+ IndexMonPage->Register(page);
+}
+
+NMonitoring::TIndexMonPage* TMon::RegisterIndexPage(const TString& path, const TString& title) {
+ auto page = IndexMonPage->RegisterIndexPage(path, title);
+ IndexMonPage->SortPages();
+ return page;
+}
+
+void TMon::RegisterActorMonPage(const TActorMonPageInfo& pageInfo) {
+ if (ActorSystem) {
+ TActorMonPage* actorMonPage = static_cast<TActorMonPage*>(pageInfo.Page.Get());
+ auto& actorId = ActorServices[pageInfo.Path];
+ if (actorId) {
+ ActorSystem->Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0));
+ }
+ actorId = ActorSystem->Register(
+ new THttpMonServiceLegacyActor(actorMonPage),
+ TMailboxType::ReadAsFilled,
+ ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId);
+ ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(pageInfo.Path, actorId));
+ }
+}
+
+NMonitoring::IMonPage* TMon::RegisterActorPage(TRegisterActorPageFields fields) {
+ TGuard<TMutex> g(Mutex);
+ NMonitoring::TMonPagePtr page = new TActorMonPage(
+ fields.RelPath,
+ fields.Title,
+ Config.Host,
+ fields.PreTag,
+ fields.ActorSystem,
+ fields.ActorId,
+ fields.AllowedSIDs ? fields.AllowedSIDs : Config.AllowedSIDs,
+ fields.UseAuth ? Config.Authorizer : TRequestAuthorizer(),
+ fields.MonServiceName);
+ if (fields.Index) {
+ fields.Index->Register(page);
+ if (fields.SortPages) {
+ fields.Index->SortPages();
+ }
+ } else {
+ Register(page.Get());
+ }
+
+ TActorMonPageInfo pageInfo = {
+ .Page = page,
+ .Path = GetPageFullPath(page.Get()),
+ };
+
+ if (ActorSystem && HttpProxyActorId) {
+ RegisterActorMonPage(pageInfo);
+ } else {
+ ActorMonPages.emplace_back(pageInfo);
+ }
+
+ return page.Get();
+}
+
+NMonitoring::IMonPage* TMon::RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) {
+ TDynamicCountersPage* page = new TDynamicCountersPage(path, title, counters);
+ page->SetUnknownGroupPolicy(EUnknownGroupPolicy::Ignore);
+ Register(page);
+ return page;
+}
+
+void TMon::RegisterHandler(const TString& path, const TActorId& handler) {
+ if (ActorSystem) {
+ ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(path, handler));
+ } else {
+ TGuard<TMutex> g(Mutex);
+ ActorMonPages.emplace_back(TActorMonPageInfo{
+ .Handler = handler,
+ .Path = path,
+ });
+ }
+}
+
+NMonitoring::IMonPage* TMon::FindPage(const TString& relPath) {
+ return IndexMonPage->FindPage(relPath);
+}
+
}
diff --git a/ydb/core/mon/mon.h b/ydb/core/mon/mon.h
index a7781cd7c77..0fcbecf004b 100644
--- a/ydb/core/mon/mon.h
+++ b/ydb/core/mon/mon.h
@@ -1,29 +1,26 @@
#pragma once
-#include <library/cpp/json/writer/json_value.h>
+#include <future>
#include <library/cpp/monlib/service/monservice.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/service/pages/index_mon_page.h>
#include <library/cpp/monlib/service/pages/resources/css_mon_page.h>
#include <library/cpp/monlib/service/pages/resources/fonts_mon_page.h>
#include <library/cpp/monlib/service/pages/resources/js_mon_page.h>
#include <library/cpp/monlib/service/pages/tablesorter/css_mon_page.h>
#include <library/cpp/monlib/service/pages/tablesorter/js_mon_page.h>
-#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/mon.h>
+#include <ydb/library/actors/http/http.h>
#include <yql/essentials/public/issue/yql_issue.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
-namespace NActors {
+#include "mon.h"
-IEventHandle* SelectAuthorizationScheme(const NActors::TActorId& owner, NMonitoring::IMonHttpRequest& request);
-IEventHandle* GetAuthorizeTicketResult(const NActors::TActorId& owner);
+namespace NActors {
-void MakeJsonErrorReply(NJson::TJsonValue& jsonResponse, TString& message, const NYql::TIssues& issues, NYdb::EStatus status);
void MakeJsonErrorReply(NJson::TJsonValue& jsonResponse, TString& message, const NYdb::TStatus& status);
-
-class TActorSystem;
-struct TActorId;
+void MakeJsonErrorReply(NJson::TJsonValue& jsonResponse, TString& message, const NYql::TIssues& issues, NYdb::EStatus status);
class TMon {
public:
@@ -45,12 +42,14 @@ public:
TDuration InactivityTimeout = TDuration::Minutes(2);
};
+ TMon(TConfig config);
virtual ~TMon() = default;
- virtual void Start(TActorSystem* actorSystem = {}) = 0;
- virtual void Stop() = 0;
- virtual void Register(NMonitoring::IMonPage* page) = 0;
- virtual NMonitoring::TIndexMonPage* RegisterIndexPage(const TString& path, const TString& title) = 0;
+ std::future<void> Start(TActorSystem* actorSystem); // signals when monitoring is ready
+ void Stop();
+
+ void Register(NMonitoring::IMonPage* page);
+ NMonitoring::TIndexMonPage* RegisterIndexPage(const TString& path, const TString& title);
struct TRegisterActorPageFields {
TString Title;
@@ -65,12 +64,32 @@ public:
TString MonServiceName = "utils";
};
- virtual NMonitoring::IMonPage* RegisterActorPage(TRegisterActorPageFields fields) = 0;
+ NMonitoring::IMonPage* RegisterActorPage(TRegisterActorPageFields fields);
NMonitoring::IMonPage* RegisterActorPage(NMonitoring::TIndexMonPage* index, const TString& relPath,
const TString& title, bool preTag, TActorSystem* actorSystem, const TActorId& actorId, bool useAuth = true, bool sortPages = true);
- virtual NMonitoring::IMonPage* RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) = 0;
- virtual NMonitoring::IMonPage* FindPage(const TString& relPath) = 0;
- virtual void RegisterHandler(const TString& path, const TActorId& handler) = 0;
+ NMonitoring::IMonPage* RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters);
+ NMonitoring::IMonPage* FindPage(const TString& relPath);
+ void RegisterHandler(const TString& path, const TActorId& handler);
+
+protected:
+ TConfig Config;
+ TIntrusivePtr<NMonitoring::TIndexMonPage> IndexMonPage;
+ TActorSystem* ActorSystem = {};
+ TActorId HttpProxyActorId;
+ TActorId HttpMonServiceActorId;
+ TActorId NodeProxyServiceActorId;
+
+ struct TActorMonPageInfo {
+ NMonitoring::TMonPagePtr Page;
+ TActorId Handler;
+ TString Path;
+ };
+
+ TMutex Mutex;
+ std::vector<TActorMonPageInfo> ActorMonPages;
+ THashMap<TString, TActorId> ActorServices;
+
+ void RegisterActorMonPage(const TActorMonPageInfo& pageInfo);
};
} // NActors
diff --git a/ydb/core/mon/sync_http_mon.cpp b/ydb/core/mon/sync_http_mon.cpp
deleted file mode 100644
index f7ae9159df2..00000000000
--- a/ydb/core/mon/sync_http_mon.cpp
+++ /dev/null
@@ -1,120 +0,0 @@
-#include "sync_http_mon.h"
-
-#include <ydb/library/actors/core/actorsystem.h>
-#include <ydb/library/actors/core/hfunc.h>
-#include <ydb/library/actors/core/mon.h>
-#include <ydb/library/actors/core/probes.h>
-#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
-#include <library/cpp/mime/types/mime.h>
-#include <library/cpp/monlib/service/pages/version_mon_page.h>
-#include <library/cpp/monlib/service/pages/mon_page.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <library/cpp/monlib/dynamic_counters/page.h>
-#include <library/cpp/threading/future/future.h>
-#include <library/cpp/string_utils/url/url.h>
-#include <util/system/event.h>
-#include <ydb/core/base/appdata.h>
-#include <ydb/core/base/monitoring_provider.h>
-#include <ydb/core/base/ticket_parser.h>
-
-#include "mon_impl.h"
-
-namespace NActors {
-
- ////////////////////////////////////////////////////////////////////////////////
- // TMON CLASS
- ////////////////////////////////////////////////////////////////////////////////
- TSyncHttpMon::TSyncHttpMon(TSyncHttpMon::TConfig config)
- : TBase(config.Port, config.Address, config.Threads, config.Title)
- , Config(std::move(config))
- {
- }
-
- TSyncHttpMon::~TSyncHttpMon() {
- Stop();
- }
-
- void TSyncHttpMon::Start(TActorSystem*) {
- TBase::Register(new TIndexRedirectMonPage(IndexMonPage));
- TBase::Register(new NMonitoring::TVersionMonPage);
- TBase::Register(new NMonitoring::TBootstrapCssMonPage);
- TBase::Register(new NMonitoring::TTablesorterCssMonPage);
- TBase::Register(new NMonitoring::TBootstrapJsMonPage);
- TBase::Register(new NMonitoring::TJQueryJsMonPage);
- TBase::Register(new NMonitoring::TTablesorterJsMonPage);
- TBase::Register(new NMonitoring::TBootstrapFontsEotMonPage);
- TBase::Register(new NMonitoring::TBootstrapFontsSvgMonPage);
- TBase::Register(new NMonitoring::TBootstrapFontsTtfMonPage);
- TBase::Register(new NMonitoring::TBootstrapFontsWoffMonPage);
-
- NLwTraceMonPage::RegisterPages(IndexMonPage.Get());
- NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(ACTORLIB_PROVIDER));
- NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(MONITORING_PROVIDER));
- TBase::Start();
- }
-
- void TSyncHttpMon::Stop() {
- IndexMonPage->ClearPages(); // it's required to avoid loop-reference
- TBase::Stop();
- }
-
- void TSyncHttpMon::Register(NMonitoring::IMonPage* page) {
- TBase::Register(page);
- TBase::SortPages();
- }
-
- TIndexMonPage* TSyncHttpMon::RegisterIndexPage(const TString& path, const TString& title) {
- auto page = TBase::RegisterIndexPage(path, title);
- TBase::SortPages();
- return page;
- }
-
- IMonPage* TSyncHttpMon::RegisterActorPage(TMon::TRegisterActorPageFields fields) {
- IMonPage* page = new TActorMonPage(
- fields.RelPath,
- fields.Title,
- Config.Host,
- fields.PreTag,
- fields.ActorSystem,
- fields.ActorId,
- fields.AllowedSIDs ? fields.AllowedSIDs : Config.AllowedSIDs,
- fields.UseAuth ? Config.Authorizer : TRequestAuthorizer(),
- fields.MonServiceName);
- if (fields.Index) {
- fields.Index->Register(page);
- if (fields.SortPages) {
- fields.Index->SortPages();
- }
- } else {
- Register(page);
- }
-
- return page;
- }
-
- IMonPage* TSyncHttpMon::RegisterCountersPage(const TString &path, const TString &title, TIntrusivePtr<TDynamicCounters> counters) {
- TDynamicCountersPage* page = new TDynamicCountersPage(path, title, counters);
- page->SetUnknownGroupPolicy(EUnknownGroupPolicy::Ignore);
- Register(page);
- return page;
- }
-
- void TSyncHttpMon::OutputIndexPage(IOutputStream& out) {
- if (Config.RedirectMainPageTo) {
- // XXX manual http response construction
- out << "HTTP/1.1 302 Found\r\n"
- << "Location: " << Config.RedirectMainPageTo << "\r\n"
- << "Connection: Close\r\n\r\n";
- } else {
- NMonitoring::TMonService2::OutputIndexPage(out);
- }
- }
-
- IMonPage* TSyncHttpMon::FindPage(const TString& relPath) {
- return TBase::FindPage(relPath);
- }
-
- void TSyncHttpMon::RegisterHandler(const TString& path, const TActorId& handler) {
- ALOG_ERROR(NActorsServices::HTTP, "Cannot register actor handler " << handler << " in sync mon for " << path);
- }
-} // NActors
diff --git a/ydb/core/mon/sync_http_mon.h b/ydb/core/mon/sync_http_mon.h
deleted file mode 100644
index 5634ca58226..00000000000
--- a/ydb/core/mon/sync_http_mon.h
+++ /dev/null
@@ -1,37 +0,0 @@
-#pragma once
-
-#include <library/cpp/monlib/service/monservice.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <library/cpp/monlib/service/pages/resources/css_mon_page.h>
-#include <library/cpp/monlib/service/pages/resources/fonts_mon_page.h>
-#include <library/cpp/monlib/service/pages/resources/js_mon_page.h>
-#include <library/cpp/monlib/service/pages/tablesorter/css_mon_page.h>
-#include <library/cpp/monlib/service/pages/tablesorter/js_mon_page.h>
-
-#include <ydb/library/actors/core/mon.h>
-
-#include "mon.h"
-
-namespace NActors {
-
-class TSyncHttpMon : public TMon, public NMonitoring::TMonService2 {
-public:
- TSyncHttpMon(TConfig config);
- virtual ~TSyncHttpMon();
- void Start(TActorSystem* actorSystem = {}) override;
- void Stop() override;
-
- void Register(NMonitoring::IMonPage *page) override;
- NMonitoring::TIndexMonPage* RegisterIndexPage(const TString& path, const TString& title) override;
- NMonitoring::IMonPage* RegisterActorPage(TRegisterActorPageFields fields) override;
- NMonitoring::IMonPage* RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) override;
- void OutputIndexPage(IOutputStream& out) override;
- NMonitoring::IMonPage* FindPage(const TString& relPath) override;
- void RegisterHandler(const TString& path, const TActorId& handler) override;
-
-protected:
- typedef NMonitoring::TMonService2 TBase;
- TConfig Config;
-};
-
-} // NActors
diff --git a/ydb/core/mon/ya.make b/ydb/core/mon/ya.make
index 8d9d3e58a3c..9edde6bd845 100644
--- a/ydb/core/mon/ya.make
+++ b/ydb/core/mon/ya.make
@@ -1,12 +1,8 @@
LIBRARY()
SRCS(
- async_http_mon.cpp
- async_http_mon.h
mon.cpp
mon.h
- sync_http_mon.cpp
- sync_http_mon.h
crossref.cpp
crossref.h
)
diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp
index a2d38248a42..a8f20cb2056 100644
--- a/ydb/core/persqueue/ut/counters_ut.cpp
+++ b/ydb/core/persqueue/ut/counters_ut.cpp
@@ -2,7 +2,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <util/system/env.h>
#include <ydb/core/base/tablet_pipecache.h>
-#include <ydb/core/mon/sync_http_mon.h>
+#include <ydb/core/mon/mon.h>
#include <ydb/core/persqueue/ut/common/pq_ut_common.h>
#include <ydb/core/persqueue/percentile_counter.h>
#include <ydb/core/persqueue/partition.h>
diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto
index d035ca163ac..a97d0d80ee3 100644
--- a/ydb/core/protos/feature_flags.proto
+++ b/ydb/core/protos/feature_flags.proto
@@ -83,7 +83,7 @@ message TFeatureFlags {
optional bool EnableImplicitScanQueryInScripts = 61 [default = true];
reserved 62; // EnablePredicateExtractForScanQueries
optional bool AllowVDiskDefrag = 63 [default = true];
- optional bool EnableAsyncHttpMon = 64 [default = true];
+ optional bool EnableAsyncHttpMon = 64 [default = true]; // deprecated: always true
optional bool EnableChangefeeds = 65 [default = true];
reserved 66; // EnableKqpScanQueryStreamLookup
optional bool EnableKqpScanQueryMultipleOlapShardsReads = 67 [default = false];
diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp
index fa9c522ca3b..92a4666a748 100644
--- a/ydb/core/testlib/actors/test_runtime.cpp
+++ b/ydb/core/testlib/actors/test_runtime.cpp
@@ -4,8 +4,7 @@
#include <ydb/core/base/blobstorage.h>
#include <ydb/core/base/counters.h>
#include <ydb/core/base/pool_stats_collector.h>
-#include <ydb/core/mon/sync_http_mon.h>
-#include <ydb/core/mon/async_http_mon.h>
+#include <ydb/core/mon/mon.h>
#include <ydb/core/mon_alloc/profiler.h>
#include <ydb/core/grpc_services/grpc_helper.h>
#include <ydb/core/tablet/tablet_impl.h>
@@ -193,19 +192,11 @@ namespace NActors {
if (NeedMonitoring && !SingleSysEnv) {
ui16 port = MonitoringPortOffset ? MonitoringPortOffset + nodeIndex : GetPortManager().GetPort();
- if (MonitoringTypeAsync) {
- node->Mon.Reset(new NActors::TAsyncHttpMon({
- .Port = port,
- .Threads = 10,
- .Title = "KIKIMR monitoring"
- }));
- } else {
- node->Mon.Reset(new NActors::TSyncHttpMon({
- .Port = port,
- .Threads = 10,
- .Title = "KIKIMR monitoring"
- }));
- }
+ node->Mon.Reset(new NActors::TMon({
+ .Port = port,
+ .Threads = 10,
+ .Title = "KIKIMR monitoring"
+ }));
nodeAppData->Mon = node->Mon.Get();
node->Mon->RegisterCountersPage("counters", "Counters", node->DynamicCounters);
auto actorsMonPage = node->Mon->RegisterIndexPage("actors", "Actors");
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
index 1c2d05b5b9e..5705af7ba05 100644
--- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
@@ -6,7 +6,7 @@
#include <ydb/core/testlib/test_pq_client.h>
#include <ydb/core/persqueue/cluster_tracker.h>
-#include <ydb/core/mon/sync_http_mon.h>
+#include <ydb/core/mon/mon.h>
#include <ydb/core/tablet/tablet_counters_aggregator.h>
#include <ydb/library/persqueue/obfuscate/obfuscate.h>
@@ -370,7 +370,7 @@ namespace NKikimr::NPersQueueTests {
const auto monPort = TPortManager().GetPort();
auto Counters = server.CleverServer->GetGRpcServerRootCounters();
- NActors::TSyncHttpMon Monitoring({
+ NActors::TMon Monitoring({
.Port = monPort,
.Address = "localhost",
.Threads = 3,
@@ -378,7 +378,7 @@ namespace NKikimr::NPersQueueTests {
.Host = "localhost",
});
Monitoring.RegisterCountersPage("counters", "Counters", Counters);
- Monitoring.Start();
+ Monitoring.Start(server.CleverServer->GetRuntime()->GetAnyNodeActorSystem());
auto ydbDriver = MakeHolder<NYdb::TDriver>(driverCfg);
auto persQueueClient = MakeHolder<NYdb::NPersQueue::TPersQueueClient>(*ydbDriver);
@@ -562,7 +562,7 @@ namespace NKikimr::NPersQueueTests {
const auto monPort = TPortManager().GetPort();
auto Counters = server.CleverServer->GetGRpcServerRootCounters();
- NActors::TSyncHttpMon Monitoring({
+ NActors::TMon Monitoring({
.Port = monPort,
.Address = "localhost",
.Threads = 3,
@@ -570,7 +570,7 @@ namespace NKikimr::NPersQueueTests {
.Host = "localhost",
});
Monitoring.RegisterCountersPage("counters", "Counters", Counters);
- Monitoring.Start();
+ Monitoring.Start(server.CleverServer->GetRuntime()->GetAnyNodeActorSystem());
auto driverCfg = NYdb::TDriverConfig()
.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort)
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 7b1c98fd41c..3c24074969c 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -9,7 +9,7 @@
#include <ydb/services/persqueue_v1/ut/functions_executor_wrapper.h>
#include <ydb/core/base/appdata.h>
-#include <ydb/core/mon/sync_http_mon.h>
+#include <ydb/core/mon/mon.h>
#include <ydb/core/testlib/test_pq_client.h>
#include <ydb/core/protos/grpc_pq_old.pb.h>
#include <ydb/core/persqueue/cluster_tracker.h>
@@ -4012,7 +4012,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
const auto monPort = TPortManager().GetPort();
auto Counters = server.CleverServer->GetGRpcServerRootCounters();
- NActors::TSyncHttpMon Monitoring({
+ NActors::TMon Monitoring({
.Port = monPort,
.Address = "localhost",
.Threads = 3,
@@ -4020,7 +4020,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
.Host = "localhost",
});
Monitoring.RegisterCountersPage("counters", "Counters", Counters);
- Monitoring.Start();
+ Monitoring.Start(server.CleverServer->GetRuntime()->SingleSys());
server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::NET_CLASSIFIER });
server.EnableLogs({ NKikimrServices::PERSQUEUE }, NActors::NLog::PRI_ERROR);