diff options
| author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
|---|---|---|
| committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
| commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
| tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/interconnect/interconnect_mon.cpp | |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_mon.cpp')
| -rw-r--r-- | library/cpp/actors/interconnect/interconnect_mon.cpp | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_mon.cpp b/library/cpp/actors/interconnect/interconnect_mon.cpp new file mode 100644 index 00000000000..cf924ccbf9d --- /dev/null +++ b/library/cpp/actors/interconnect/interconnect_mon.cpp @@ -0,0 +1,276 @@ +#include "interconnect_mon.h" +#include "interconnect_tcp_proxy.h" + +#include <library/cpp/json/json_value.h> +#include <library/cpp/json/json_writer.h> +#include <library/cpp/monlib/service/pages/templates.h> + +#include <openssl/ssl.h> +#include <openssl/pem.h> + +namespace NInterconnect { + + using namespace NActors; + + class TInterconnectMonActor : public TActor<TInterconnectMonActor> { + class TQueryProcessor : public TActorBootstrapped<TQueryProcessor> { + const TActorId Sender; + const bool Json; + TMap<ui32, TInterconnectProxyTCP::TProxyStats> Stats; + ui32 PendingReplies = 0; + + public: + static constexpr IActor::EActorActivity ActorActivityType() { + return INTERCONNECT_MONACTOR; + } + + TQueryProcessor(const TActorId& sender, bool json) + : Sender(sender) + , Json(json) + {} + + void Bootstrap(const TActorContext& ctx) { + Become(&TThis::StateFunc, ctx, TDuration::Seconds(5), new TEvents::TEvWakeup); + Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes); + } + + void Handle(TEvInterconnect::TEvNodesInfo::TPtr ev, const TActorContext& ctx) { + TActorSystem* const as = ctx.ExecutorThread.ActorSystem; + for (const auto& node : ev->Get()->Nodes) { + Send(as->InterconnectProxy(node.NodeId), new TInterconnectProxyTCP::TEvQueryStats, IEventHandle::FlagTrackDelivery); + ++PendingReplies; + } + GenerateResultWhenReady(ctx); + } + + STRICT_STFUNC(StateFunc, + HFunc(TEvInterconnect::TEvNodesInfo, Handle) + HFunc(TInterconnectProxyTCP::TEvStats, Handle) + CFunc(TEvents::TSystem::Undelivered, HandleUndelivered) + CFunc(TEvents::TSystem::Wakeup, HandleWakeup) + ) + + void Handle(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx) { + auto *msg = ev->Get(); + Stats.emplace(msg->PeerNodeId, std::move(msg->ProxyStats)); + --PendingReplies; + GenerateResultWhenReady(ctx); + } + + void HandleUndelivered(const TActorContext& ctx) { + --PendingReplies; + GenerateResultWhenReady(ctx); + } + + void HandleWakeup(const TActorContext& ctx) { + PendingReplies = 0; + GenerateResultWhenReady(ctx); + } + + void GenerateResultWhenReady(const TActorContext& ctx) { + if (!PendingReplies) { + if (Json) { + ctx.Send(Sender, new NMon::TEvHttpInfoRes(GenerateJson(), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + } else { + ctx.Send(Sender, new NMon::TEvHttpInfoRes(GenerateHtml())); + } + Die(ctx); + } + } + + TString GenerateHtml() { + TStringStream str; + HTML(str) { + TABLE_CLASS("table-sortable table") { + TABLEHEAD() { + TABLER() { + TABLEH() { str << "Peer node id"; } + TABLEH() { str << "State"; } + TABLEH() { str << "Ping"; } + TABLEH() { str << "Clock skew"; } + TABLEH() { str << "Scope id"; } + TABLEH() { str << "Encryption"; } + TABLEH() { str << "LastSessionDieTime"; } + TABLEH() { str << "TotalOutputQueueSize"; } + TABLEH() { str << "Connected"; } + TABLEH() { str << "Host"; } + TABLEH() { str << "Port"; } + TABLEH() { str << "LastErrorTimestamp"; } + TABLEH() { str << "LastErrorKind"; } + TABLEH() { str << "LastErrorExplanation"; } + } + } + TABLEBODY() { + for (const auto& kv : Stats) { + TABLER() { + TABLED() { str << "<a href='" << kv.second.Path << "'>" << kv.first << "</a>"; } + TABLED() { str << kv.second.State; } + TABLED() { + if (kv.second.Ping != TDuration::Zero()) { + str << kv.second.Ping; + } + } + TABLED() { + if (kv.second.ClockSkew < 0) { + str << "-" << TDuration::MicroSeconds(-kv.second.ClockSkew); + } else { + str << "+" << TDuration::MicroSeconds(kv.second.ClockSkew); + } + } + TABLED() { str << ScopeIdToString(kv.second.PeerScopeId); } + TABLED() { + const char *color = kv.second.Encryption != "none" ? "green" : "red"; + str << "<font color='" << color << "'>" << kv.second.Encryption << "</font>"; + } + TABLED() { + if (kv.second.LastSessionDieTime != TInstant::Zero()) { + str << kv.second.LastSessionDieTime; + } + } + TABLED() { str << kv.second.TotalOutputQueueSize; } + TABLED() { str << (kv.second.Connected ? "yes" : "<strong>no</strong>"); } + TABLED() { str << kv.second.Host; } + TABLED() { str << kv.second.Port; } + TABLED() { + str << "<strong>"; + if (kv.second.LastErrorTimestamp != TInstant::Zero()) { + str << kv.second.LastErrorTimestamp; + } + str << "</strong>"; + } + TABLED() { str << "<strong>" << kv.second.LastErrorKind << "</strong>"; } + TABLED() { str << "<strong>" << kv.second.LastErrorExplanation << "</strong>"; } + } + } + } + } + } + return str.Str(); + } + + TString GenerateJson() { + NJson::TJsonValue json; + for (const auto& [nodeId, info] : Stats) { + NJson::TJsonValue item; + item["NodeId"] = nodeId; + + auto id = [](const auto& x) { return x; }; + auto toString = [](const auto& x) { return x.ToString(); }; + +#define JSON(NAME, FUN) item[#NAME] = FUN(info.NAME); + JSON(Path, id) + JSON(State, id) + JSON(PeerScopeId, ScopeIdToString) + JSON(LastSessionDieTime, toString) + JSON(TotalOutputQueueSize, id) + JSON(Connected, id) + JSON(Host, id) + JSON(Port, id) + JSON(LastErrorTimestamp, toString) + JSON(LastErrorKind, id) + JSON(LastErrorExplanation, id) + JSON(Ping, toString) + JSON(ClockSkew, id) + JSON(Encryption, id) +#undef JSON + + json[ToString(nodeId)] = item; + } + TStringStream str(NMonitoring::HTTPOKJSON); + NJson::WriteJson(&str, &json); + return str.Str(); + } + }; + + private: + TIntrusivePtr<TInterconnectProxyCommon> Common; + + public: + static constexpr IActor::EActorActivity ActorActivityType() { + return INTERCONNECT_MONACTOR; + } + + TInterconnectMonActor(TIntrusivePtr<TInterconnectProxyCommon> common) + : TActor(&TThis::StateFunc) + , Common(std::move(common)) + {} + + STRICT_STFUNC(StateFunc, + HFunc(NMon::TEvHttpInfo, Handle) + ) + + void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) { + const auto& params = ev->Get()->Request.GetParams(); + int certinfo = 0; + if (TryFromString(params.Get("certinfo"), certinfo) && certinfo) { + ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(GetCertInfoJson(), ev->Get()->SubRequestId, + NMon::TEvHttpInfoRes::Custom)); + } else { + const bool json = params.Has("fmt") && params.Get("fmt") == "json"; + ctx.Register(new TQueryProcessor(ev->Sender, json)); + } + } + + TString GetCertInfoJson() const { + NJson::TJsonValue json(NJson::JSON_MAP); + if (const TString cert = Common ? Common->Settings.Certificate : TString()) { + struct TEx : yexception {}; + try { + const auto& cert = Common->Settings.Certificate; + std::unique_ptr<BIO, void(*)(BIO*)> bio(BIO_new_mem_buf(cert.data(), cert.size()), &BIO_vfree); + if (!bio) { + throw TEx() << "BIO_new_mem_buf failed"; + } + std::unique_ptr<X509, void(*)(X509*)> x509(PEM_read_bio_X509(bio.get(), nullptr, nullptr, nullptr), + &X509_free); + if (!x509) { + throw TEx() << "PEM_read_bio_X509 failed"; + } + X509_NAME *name = X509_get_subject_name(x509.get()); + if (!name) { + throw TEx() << "X509_get_subject_name failed"; + } + char buffer[4096]; + if (char *p = X509_NAME_oneline(name, buffer, sizeof(buffer))) { + json["Subject"] = p; + } + if (int loc = X509_NAME_get_index_by_NID(name, NID_commonName, -1); loc >= 0) { + if (X509_NAME_ENTRY *entry = X509_NAME_get_entry(name, loc)) { + if (ASN1_STRING *data = X509_NAME_ENTRY_get_data(entry)) { + unsigned char *cn; + if (const int len = ASN1_STRING_to_UTF8(&cn, data); len >= 0) { + json["CommonName"] = TString(reinterpret_cast<char*>(cn), len); + OPENSSL_free(cn); + } + } + } + } + auto time = [](const ASN1_TIME *t, const char *name) -> TString { + if (t) { + struct tm tm; + if (ASN1_TIME_to_tm(t, &tm)) { + return Strftime("%Y-%m-%dT%H:%M:%S%z", &tm); + } else { + throw TEx() << "ASN1_TIME_to_tm failed"; + } + } else { + throw TEx() << name << " failed"; + } + }; + json["NotBefore"] = time(X509_get0_notBefore(x509.get()), "X509_get0_notBefore"); + json["NotAfter"] = time(X509_get0_notAfter(x509.get()), "X509_get0_notAfter"); + } catch (const TEx& ex) { + json["Error"] = ex.what(); + } + } + TStringStream str(NMonitoring::HTTPOKJSON); + NJson::WriteJson(&str, &json); + return str.Str(); + } + }; + + IActor *CreateInterconnectMonActor(TIntrusivePtr<TInterconnectProxyCommon> common) { + return new TInterconnectMonActor(std::move(common)); + } + +} // NInterconnect |
