diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/www/www.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/www/www.cpp')
-rw-r--r-- | library/cpp/messagebus/www/www.cpp | 930 |
1 files changed, 930 insertions, 0 deletions
diff --git a/library/cpp/messagebus/www/www.cpp b/library/cpp/messagebus/www/www.cpp new file mode 100644 index 0000000000..62ec241d85 --- /dev/null +++ b/library/cpp/messagebus/www/www.cpp @@ -0,0 +1,930 @@ +#include "www.h" + +#include "concat_strings.h" +#include "html_output.h" + +#include <library/cpp/messagebus/remote_connection_status.h> +#include <library/cpp/monlib/deprecated/json/writer.h> + +#include <library/cpp/archive/yarchive.h> +#include <library/cpp/http/fetch/httpfsm.h> +#include <library/cpp/http/fetch/httpheader.h> +#include <library/cpp/http/server/http.h> +#include <library/cpp/json/writer/json.h> +#include <library/cpp/uri/http_url.h> + +#include <util/string/cast.h> +#include <util/string/printf.h> +#include <util/system/mutex.h> + +#include <utility> + +using namespace NBus; +using namespace NBus::NPrivate; +using namespace NActor; +using namespace NActor::NPrivate; + +static const char HTTP_OK_JS[] = "HTTP/1.1 200 Ok\r\nContent-Type: text/javascript\r\nConnection: Close\r\n\r\n"; +static const char HTTP_OK_JSON[] = "HTTP/1.1 200 Ok\r\nContent-Type: application/json; charset=utf-8\r\nConnection: Close\r\n\r\n"; +static const char HTTP_OK_PNG[] = "HTTP/1.1 200 Ok\r\nContent-Type: image/png\r\nConnection: Close\r\n\r\n"; +static const char HTTP_OK_BIN[] = "HTTP/1.1 200 Ok\r\nContent-Type: application/octet-stream\r\nConnection: Close\r\n\r\n"; +static const char HTTP_OK_HTML[] = "HTTP/1.1 200 Ok\r\nContent-Type: text/html; charset=utf-8\r\nConnection: Close\r\n\r\n"; + +namespace { + typedef TIntrusivePtr<TBusModuleInternal> TBusModuleInternalPtr; + + template <typename TValuePtr> + struct TNamedValues { + TVector<std::pair<TString, TValuePtr>> Entries; + + TValuePtr FindByName(TStringBuf name) { + Y_VERIFY(!!name); + + for (unsigned i = 0; i < Entries.size(); ++i) { + if (Entries[i].first == name) { + return Entries[i].second; + } + } + return TValuePtr(); + } + + TString FindNameByPtr(TValuePtr value) { + Y_VERIFY(!!value); + + for (unsigned i = 0; i < Entries.size(); ++i) { + if (Entries[i].second.Get() == value.Get()) { + return Entries[i].first; + } + } + + Y_FAIL("unregistered"); + } + + void Add(TValuePtr p) { + Y_VERIFY(!!p); + + // Do not add twice + for (unsigned i = 0; i < Entries.size(); ++i) { + if (Entries[i].second.Get() == p.Get()) { + return; + } + } + + if (!!p->GetNameInternal()) { + TValuePtr current = FindByName(p->GetNameInternal()); + + if (!current) { + Entries.emplace_back(p->GetNameInternal(), p); + return; + } + } + + for (unsigned i = 1;; ++i) { + TString prefix = p->GetNameInternal(); + if (!prefix) { + prefix = "unnamed"; + } + TString name = ConcatStrings(prefix, "-", i); + + TValuePtr current = FindByName(name); + + if (!current) { + Entries.emplace_back(name, p); + return; + } + } + } + + size_t size() const { + return Entries.size(); + } + + bool operator!() const { + return size() == 0; + } + }; + + template <typename TSessionPtr> + struct TSessionValues: public TNamedValues<TSessionPtr> { + typedef TNamedValues<TSessionPtr> TBase; + + TVector<TString> GetNamesForQueue(TBusMessageQueue* queue) { + TVector<TString> r; + for (unsigned i = 0; i < TBase::size(); ++i) { + if (TBase::Entries[i].second->GetQueue() == queue) { + r.push_back(TBase::Entries[i].first); + } + } + return r; + } + }; +} + +namespace { + TString RootHref() { + return ConcatStrings("?"); + } + + TString QueueHref(TStringBuf name) { + return ConcatStrings("?q=", name); + } + + TString ServerSessionHref(TStringBuf name) { + return ConcatStrings("?ss=", name); + } + + TString ClientSessionHref(TStringBuf name) { + return ConcatStrings("?cs=", name); + } + + TString OldModuleHref(TStringBuf name) { + return ConcatStrings("?om=", name); + } + + /* + static void RootLink() { + A(RootHref(), "root"); + } + */ + + void QueueLink(TStringBuf name) { + A(QueueHref(name), name); + } + + void ServerSessionLink(TStringBuf name) { + A(ServerSessionHref(name), name); + } + + void ClientSessionLink(TStringBuf name) { + A(ClientSessionHref(name), name); + } + + void OldModuleLink(TStringBuf name) { + A(OldModuleHref(name), name); + } + +} + +const unsigned char WWW_STATIC_DATA[] = { +#include "www_static.inc" +}; + +class TWwwStaticLoader: public TArchiveReader { +public: + TWwwStaticLoader() + : TArchiveReader(TBlob::NoCopy(WWW_STATIC_DATA, sizeof(WWW_STATIC_DATA))) + { + } +}; + +struct TBusWww::TImpl { + // TODO: use weak pointers + TNamedValues<TBusMessageQueuePtr> Queues; + TSessionValues<TIntrusivePtr<TBusClientSession>> ClientSessions; + TSessionValues<TIntrusivePtr<TBusServerSession>> ServerSessions; + TSessionValues<TBusModuleInternalPtr> Modules; + + TMutex Mutex; + + void RegisterClientSession(TBusClientSessionPtr s) { + Y_VERIFY(!!s); + TGuard<TMutex> g(Mutex); + ClientSessions.Add(s.Get()); + Queues.Add(s->GetQueue()); + } + + void RegisterServerSession(TBusServerSessionPtr s) { + Y_VERIFY(!!s); + TGuard<TMutex> g(Mutex); + ServerSessions.Add(s.Get()); + Queues.Add(s->GetQueue()); + } + + void RegisterQueue(TBusMessageQueuePtr q) { + Y_VERIFY(!!q); + TGuard<TMutex> g(Mutex); + Queues.Add(q); + } + + void RegisterModule(TBusModule* module) { + Y_VERIFY(!!module); + TGuard<TMutex> g(Mutex); + + { + TVector<TBusClientSessionPtr> clientSessions = module->GetInternal()->GetClientSessionsInternal(); + for (unsigned i = 0; i < clientSessions.size(); ++i) { + RegisterClientSession(clientSessions[i]); + } + } + + { + TVector<TBusServerSessionPtr> serverSessions = module->GetInternal()->GetServerSessionsInternal(); + for (unsigned i = 0; i < serverSessions.size(); ++i) { + RegisterServerSession(serverSessions[i]); + } + } + + Queues.Add(module->GetInternal()->GetQueue()); + Modules.Add(module->GetInternal()); + } + + TString FindQueueNameBySessionName(TStringBuf sessionName, bool client) { + TIntrusivePtr<TBusClientSession> clientSession; + TIntrusivePtr<TBusServerSession> serverSession; + TBusSession* session; + if (client) { + clientSession = ClientSessions.FindByName(sessionName); + session = clientSession.Get(); + } else { + serverSession = ServerSessions.FindByName(sessionName); + session = serverSession.Get(); + } + Y_VERIFY(!!session); + return Queues.FindNameByPtr(session->GetQueue()); + } + + struct TRequest { + TImpl* const Outer; + IOutputStream& Os; + const TCgiParameters& CgiParams; + const TOptionalParams& Params; + + TRequest(TImpl* outer, IOutputStream& os, const TCgiParameters& cgiParams, const TOptionalParams& params) + : Outer(outer) + , Os(os) + , CgiParams(cgiParams) + , Params(params) + { + } + + void CrumbsParentLinks() { + for (unsigned i = 0; i < Params.ParentLinks.size(); ++i) { + const TLink& link = Params.ParentLinks[i]; + TTagGuard li("li"); + A(link.Href, link.Title); + } + } + + void Crumb(TStringBuf name, TStringBuf href = "") { + if (!!href) { + TTagGuard li("li"); + A(href, name); + } else { + LiWithClass("active", name); + } + } + + void BreadcrumbRoot() { + TTagGuard ol("ol", "breadcrumb"); + CrumbsParentLinks(); + Crumb("MessageBus"); + } + + void BreadcrumbQueue(TStringBuf queueName) { + TTagGuard ol("ol", "breadcrumb"); + CrumbsParentLinks(); + Crumb("MessageBus", RootHref()); + Crumb(ConcatStrings("queue ", queueName)); + } + + void BreadcrumbSession(TStringBuf sessionName, bool client) { + TString queueName = Outer->FindQueueNameBySessionName(sessionName, client); + TStringBuf whatSession = client ? "client session" : "server session"; + + TTagGuard ol("ol", "breadcrumb"); + CrumbsParentLinks(); + Crumb("MessageBus", RootHref()); + Crumb(ConcatStrings("queue ", queueName), QueueHref(queueName)); + Crumb(ConcatStrings(whatSession, " ", sessionName)); + } + + void ServeSessionsOfQueue(TBusMessageQueuePtr queue, bool includeQueue) { + TVector<TString> clientNames = Outer->ClientSessions.GetNamesForQueue(queue.Get()); + TVector<TString> serverNames = Outer->ServerSessions.GetNamesForQueue(queue.Get()); + TVector<TString> moduleNames = Outer->Modules.GetNamesForQueue(queue.Get()); + + TTagGuard table("table", "table table-condensed table-bordered"); + + { + TTagGuard colgroup("colgroup"); + TagWithClass("col", "col-md-2"); + TagWithClass("col", "col-md-2"); + TagWithClass("col", "col-md-8"); + } + + { + TTagGuard tr("tr"); + Th("What", "span2"); + Th("Name", "span2"); + Th("Status", "span6"); + } + + if (includeQueue) { + TTagGuard tr1("tr"); + Td("queue"); + + { + TTagGuard td("td"); + QueueLink(Outer->Queues.FindNameByPtr(queue)); + } + + { + TTagGuard tr2("td"); + Pre(queue->GetStatusSingleLine()); + } + } + + for (unsigned j = 0; j < clientNames.size(); ++j) { + TTagGuard tr("tr"); + Td("client session"); + + { + TTagGuard td("td"); + ClientSessionLink(clientNames[j]); + } + + { + TTagGuard td("td"); + Pre(Outer->ClientSessions.FindByName(clientNames[j])->GetStatusSingleLine()); + } + } + + for (unsigned j = 0; j < serverNames.size(); ++j) { + TTagGuard tr("tr"); + Td("server session"); + + { + TTagGuard td("td"); + ServerSessionLink(serverNames[j]); + } + + { + TTagGuard td("td"); + Pre(Outer->ServerSessions.FindByName(serverNames[j])->GetStatusSingleLine()); + } + } + + for (unsigned j = 0; j < moduleNames.size(); ++j) { + TTagGuard tr("tr"); + Td("module"); + + { + TTagGuard td("td"); + if (false) { + OldModuleLink(moduleNames[j]); + } else { + // TODO + Text(moduleNames[j]); + } + } + + { + TTagGuard td("td"); + Pre(Outer->Modules.FindByName(moduleNames[j])->GetStatusSingleLine()); + } + } + } + + void ServeQueue(const TString& name) { + TBusMessageQueuePtr queue = Outer->Queues.FindByName(name); + + if (!queue) { + BootstrapError(ConcatStrings("queue not found by name: ", name)); + return; + } + + BreadcrumbQueue(name); + + TDivGuard container("container"); + + H1(ConcatStrings("MessageBus queue ", '"', name, '"')); + + TBusMessageQueueStatus status = queue->GetStatusRecordInternal(); + + Pre(status.PrintToString()); + + ServeSessionsOfQueue(queue, false); + + HnWithSmall(3, "Peak queue size", "(stored for an hour)"); + + { + TDivGuard div; + TDivGuard div2(TAttr("id", "queue-size-graph"), TAttr("style", "height: 300px")); + } + + { + TScriptFunctionGuard script; + + NJsonWriter::TBuf data(NJsonWriter::HEM_ESCAPE_HTML); + NJsonWriter::TBuf ticks(NJsonWriter::HEM_ESCAPE_HTML); + + const TExecutorHistory& history = status.ExecutorStatus.History; + + data.BeginList(); + ticks.BeginList(); + for (unsigned i = 0; i < history.HistoryRecords.size(); ++i) { + ui64 secondOfMinute = (history.FirstHistoryRecordSecond() + i) % 60; + ui64 minuteOfHour = (history.FirstHistoryRecordSecond() + i) / 60 % 60; + + unsigned printEach; + + if (history.HistoryRecords.size() <= 500) { + printEach = 1; + } else if (history.HistoryRecords.size() <= 1000) { + printEach = 2; + } else if (history.HistoryRecords.size() <= 3000) { + printEach = 6; + } else { + printEach = 12; + } + + if (secondOfMinute % printEach != 0) { + continue; + } + + ui32 max = 0; + for (unsigned j = 0; j < printEach; ++j) { + if (i < j) { + continue; + } + max = Max<ui32>(max, history.HistoryRecords[i - j].MaxQueueSize); + } + + data.BeginList(); + data.WriteString(ToString(i)); + data.WriteInt(max); + data.EndList(); + + // TODO: can be done with flot time plugin + if (history.HistoryRecords.size() <= 20) { + ticks.BeginList(); + ticks.WriteInt(i); + ticks.WriteString(ToString(secondOfMinute)); + ticks.EndList(); + } else if (history.HistoryRecords.size() <= 60) { + if (secondOfMinute % 5 == 0) { + ticks.BeginList(); + ticks.WriteInt(i); + ticks.WriteString(ToString(secondOfMinute)); + ticks.EndList(); + } + } else { + bool needTick; + if (history.HistoryRecords.size() <= 3 * 60) { + needTick = secondOfMinute % 15 == 0; + } else if (history.HistoryRecords.size() <= 7 * 60) { + needTick = secondOfMinute % 30 == 0; + } else if (history.HistoryRecords.size() <= 20 * 60) { + needTick = secondOfMinute == 0; + } else { + needTick = secondOfMinute == 0 && minuteOfHour % 5 == 0; + } + if (needTick) { + ticks.BeginList(); + ticks.WriteInt(i); + ticks.WriteString(Sprintf(":%02u:%02u", (unsigned)minuteOfHour, (unsigned)secondOfMinute)); + ticks.EndList(); + } + } + } + ticks.EndList(); + data.EndList(); + + HtmlOutputStream() << " var data = " << data.Str() << ";\n"; + HtmlOutputStream() << " var ticks = " << ticks.Str() << ";\n"; + HtmlOutputStream() << " plotQueueSize('#queue-size-graph', data, ticks);\n"; + } + } + + void ServeSession(TStringBuf name, bool client) { + TIntrusivePtr<TBusClientSession> clientSession; + TIntrusivePtr<TBusServerSession> serverSession; + TBusSession* session; + TStringBuf whatSession; + if (client) { + whatSession = "client session"; + clientSession = Outer->ClientSessions.FindByName(name); + session = clientSession.Get(); + } else { + whatSession = "server session"; + serverSession = Outer->ServerSessions.FindByName(name); + session = serverSession.Get(); + } + if (!session) { + BootstrapError(ConcatStrings(whatSession, " not found by name: ", name)); + return; + } + + TSessionDumpStatus dumpStatus = session->GetStatusRecordInternal(); + + TBusMessageQueuePtr queue = session->GetQueue(); + TString queueName = Outer->Queues.FindNameByPtr(session->GetQueue()); + + BreadcrumbSession(name, client); + + TDivGuard container("container"); + + H1(ConcatStrings("MessageBus ", whatSession, " ", '"', name, '"')); + + TBusMessageQueueStatus queueStatus = queue->GetStatusRecordInternal(); + + { + H3(ConcatStrings("queue ", queueName)); + Pre(queueStatus.PrintToString()); + } + + TSessionDumpStatus status = session->GetStatusRecordInternal(); + + if (status.Shutdown) { + BootstrapError("Session shut down"); + return; + } + + H3("Basic"); + Pre(status.Head); + + if (status.ConnectionStatusSummary.Server) { + H3("Acceptors"); + Pre(status.Acceptors); + } + + H3("Connections"); + Pre(status.ConnectionsSummary); + + { + TDivGuard div; + TTagGuard button("button", + TAttr("type", "button"), + TAttr("class", "btn"), + TAttr("data-toggle", "collapse"), + TAttr("data-target", "#connections")); + Text("Show connection details"); + } + { + TDivGuard div(TAttr("id", "connections"), TAttr("class", "collapse")); + Pre(status.Connections); + } + + H3("TBusSessionConfig"); + Pre(status.Config.PrintToString()); + + if (!client) { + H3("Message process time histogram"); + + const TDurationHistogram& h = + dumpStatus.ConnectionStatusSummary.WriterStatus.Incremental.ProcessDurationHistogram; + + { + TDivGuard div; + TDivGuard div2(TAttr("id", "h"), TAttr("style", "height: 300px")); + } + + { + TScriptFunctionGuard script; + + NJsonWriter::TBuf buf(NJsonWriter::HEM_ESCAPE_HTML); + buf.BeginList(); + for (unsigned i = 0; i < h.Times.size(); ++i) { + TString label = TDurationHistogram::LabelBefore(i); + buf.BeginList(); + buf.WriteString(label); + buf.WriteLongLong(h.Times[i]); + buf.EndList(); + } + buf.EndList(); + + HtmlOutputStream() << " var hist = " << buf.Str() << ";\n"; + HtmlOutputStream() << " plotHist('#h', hist);\n"; + } + } + } + + void ServeDefault() { + if (!Outer->Queues) { + BootstrapError("no queues"); + return; + } + + BreadcrumbRoot(); + + TDivGuard container("container"); + + H1("MessageBus queues"); + + for (unsigned i = 0; i < Outer->Queues.size(); ++i) { + TString queueName = Outer->Queues.Entries[i].first; + TBusMessageQueuePtr queue = Outer->Queues.Entries[i].second; + + HnWithSmall(3, queueName, "(queue)"); + + ServeSessionsOfQueue(queue, true); + } + } + + void WriteQueueSensors(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf queueName, TBusMessageQueue* queue) { + auto status = queue->GetStatusRecordInternal(); + sj.OpenMetric(); + sj.WriteLabels("mb_queue", queueName, "sensor", "WorkQueueSize"); + sj.WriteValue(status.ExecutorStatus.WorkQueueSize); + sj.CloseMetric(); + } + + void WriteMessageCounterSensors(NMonitoring::TDeprecatedJsonWriter& sj, + TStringBuf labelName, TStringBuf sessionName, bool read, const TMessageCounter& counter) { + TStringBuf readOrWrite = read ? "read" : "write"; + + sj.OpenMetric(); + sj.WriteLabels(labelName, sessionName, "mb_dir", readOrWrite, "sensor", "MessageBytes"); + sj.WriteValue(counter.BytesData); + sj.WriteModeDeriv(); + sj.CloseMetric(); + + sj.OpenMetric(); + sj.WriteLabels(labelName, sessionName, "mb_dir", readOrWrite, "sensor", "MessageCount"); + sj.WriteValue(counter.Count); + sj.WriteModeDeriv(); + sj.CloseMetric(); + } + + void WriteSessionStatus(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf sessionName, bool client, + TBusSession* session) { + TStringBuf labelName = client ? "mb_client_session" : "mb_server_session"; + + auto status = session->GetStatusRecordInternal(); + + sj.OpenMetric(); + sj.WriteLabels(labelName, sessionName, "sensor", "InFlightCount"); + sj.WriteValue(status.Status.InFlightCount); + sj.CloseMetric(); + + sj.OpenMetric(); + sj.WriteLabels(labelName, sessionName, "sensor", "InFlightSize"); + sj.WriteValue(status.Status.InFlightSize); + sj.CloseMetric(); + + sj.OpenMetric(); + sj.WriteLabels(labelName, sessionName, "sensor", "SendQueueSize"); + sj.WriteValue(status.ConnectionStatusSummary.WriterStatus.SendQueueSize); + sj.CloseMetric(); + + if (client) { + sj.OpenMetric(); + sj.WriteLabels(labelName, sessionName, "sensor", "AckMessagesSize"); + sj.WriteValue(status.ConnectionStatusSummary.WriterStatus.AckMessagesSize); + sj.CloseMetric(); + } + + WriteMessageCounterSensors(sj, labelName, sessionName, false, + status.ConnectionStatusSummary.WriterStatus.Incremental.MessageCounter); + WriteMessageCounterSensors(sj, labelName, sessionName, true, + status.ConnectionStatusSummary.ReaderStatus.Incremental.MessageCounter); + } + + void ServeSolomonJson(const TString& q, const TString& cs, const TString& ss) { + Y_UNUSED(q); + Y_UNUSED(cs); + Y_UNUSED(ss); + bool all = q == "" && cs == "" && ss == ""; + + NMonitoring::TDeprecatedJsonWriter sj(&Os); + + sj.OpenDocument(); + sj.OpenMetrics(); + + for (unsigned i = 0; i < Outer->Queues.size(); ++i) { + TString queueName = Outer->Queues.Entries[i].first; + TBusMessageQueuePtr queue = Outer->Queues.Entries[i].second; + if (all || q == queueName) { + WriteQueueSensors(sj, queueName, &*queue); + } + + TVector<TString> clientNames = Outer->ClientSessions.GetNamesForQueue(queue.Get()); + TVector<TString> serverNames = Outer->ServerSessions.GetNamesForQueue(queue.Get()); + TVector<TString> moduleNames = Outer->Modules.GetNamesForQueue(queue.Get()); + for (auto& sessionName : clientNames) { + if (all || cs == sessionName) { + auto session = Outer->ClientSessions.FindByName(sessionName); + WriteSessionStatus(sj, sessionName, true, &*session); + } + } + + for (auto& sessionName : serverNames) { + if (all || ss == sessionName) { + auto session = Outer->ServerSessions.FindByName(sessionName); + WriteSessionStatus(sj, sessionName, false, &*session); + } + } + } + + sj.CloseMetrics(); + sj.CloseDocument(); + } + + void ServeStatic(IOutputStream& os, TStringBuf path) { + if (path.EndsWith(".js")) { + os << HTTP_OK_JS; + } else if (path.EndsWith(".png")) { + os << HTTP_OK_PNG; + } else { + os << HTTP_OK_BIN; + } + TBlob blob = Singleton<TWwwStaticLoader>()->ObjectBlobByKey(TString("/") + TString(path)); + os.Write(blob.Data(), blob.Size()); + } + + void HeaderJsCss() { + LinkStylesheet("//yandex.st/bootstrap/3.0.2/css/bootstrap.css"); + LinkFavicon("?file=bus-ico.png"); + ScriptHref("//yandex.st/jquery/2.0.3/jquery.js"); + ScriptHref("//yandex.st/bootstrap/3.0.2/js/bootstrap.js"); + ScriptHref("//cdnjs.cloudflare.com/ajax/libs/flot/0.8.1/jquery.flot.min.js"); + ScriptHref("//cdnjs.cloudflare.com/ajax/libs/flot/0.8.1/jquery.flot.categories.min.js"); + ScriptHref("?file=messagebus.js"); + } + + void Serve() { + THtmlOutputStreamPushPop pp(&Os); + + TCgiParameters::const_iterator file = CgiParams.Find("file"); + if (file != CgiParams.end()) { + ServeStatic(Os, file->second); + return; + } + + bool solomonJson = false; + TCgiParameters::const_iterator fmt = CgiParams.Find("fmt"); + if (fmt != CgiParams.end()) { + if (fmt->second == "solomon-json") { + solomonJson = true; + } + } + + TCgiParameters::const_iterator cs = CgiParams.Find("cs"); + TCgiParameters::const_iterator ss = CgiParams.Find("ss"); + TCgiParameters::const_iterator q = CgiParams.Find("q"); + + if (solomonJson) { + Os << HTTP_OK_JSON; + + TString qp = q != CgiParams.end() ? q->first : ""; + TString csp = cs != CgiParams.end() ? cs->first : ""; + TString ssp = ss != CgiParams.end() ? ss->first : ""; + ServeSolomonJson(qp, csp, ssp); + } else { + Os << HTTP_OK_HTML; + + Doctype(); + + TTagGuard html("html"); + { + TTagGuard head("head"); + + HeaderJsCss(); + // ✉ 🚌 + Title(TChars("MessageBus", false)); + } + + TTagGuard body("body"); + + if (cs != CgiParams.end()) { + ServeSession(cs->second, true); + } else if (ss != CgiParams.end()) { + ServeSession(ss->second, false); + } else if (q != CgiParams.end()) { + ServeQueue(q->second); + } else { + ServeDefault(); + } + } + } + }; + + void ServeHttp(IOutputStream& os, const TCgiParameters& queryArgs, const TBusWww::TOptionalParams& params) { + TGuard<TMutex> g(Mutex); + + TRequest request(this, os, queryArgs, params); + + request.Serve(); + } +}; + +NBus::TBusWww::TBusWww() + : Impl(new TImpl) +{ +} + +NBus::TBusWww::~TBusWww() { +} + +void NBus::TBusWww::RegisterClientSession(TBusClientSessionPtr s) { + Impl->RegisterClientSession(s); +} + +void TBusWww::RegisterServerSession(TBusServerSessionPtr s) { + Impl->RegisterServerSession(s); +} + +void TBusWww::RegisterQueue(TBusMessageQueuePtr q) { + Impl->RegisterQueue(q); +} + +void TBusWww::RegisterModule(TBusModule* module) { + Impl->RegisterModule(module); +} + +void TBusWww::ServeHttp(IOutputStream& httpOutputStream, + const TCgiParameters& queryArgs, + const TBusWww::TOptionalParams& params) { + Impl->ServeHttp(httpOutputStream, queryArgs, params); +} + +struct TBusWwwHttpServer::TImpl: public THttpServer::ICallBack { + TIntrusivePtr<TBusWww> Www; + THttpServer HttpServer; + + static THttpServer::TOptions MakeHttpServerOptions(unsigned port) { + Y_VERIFY(port > 0); + THttpServer::TOptions r; + r.Port = port; + return r; + } + + TImpl(TIntrusivePtr<TBusWww> www, unsigned port) + : Www(www) + , HttpServer(this, MakeHttpServerOptions(port)) + { + HttpServer.Start(); + } + + struct TClientRequestImpl: public TClientRequest { + TBusWwwHttpServer::TImpl* const Outer; + + TClientRequestImpl(TBusWwwHttpServer::TImpl* outer) + : Outer(outer) + { + } + + bool Reply(void*) override { + Outer->ServeRequest(Input(), Output()); + return true; + } + }; + + TString MakeSimpleResponse(unsigned code, TString text, TString content = "") { + if (!content) { + TStringStream contentSs; + contentSs << code << " " << text; + content = contentSs.Str(); + } + TStringStream ss; + ss << "HTTP/1.1 " + << code << " " << text << "\r\nConnection: Close\r\n\r\n" + << content; + return ss.Str(); + } + + void ServeRequest(THttpInput& input, THttpOutput& output) { + TCgiParameters cgiParams; + try { + THttpRequestHeader header; + THttpHeaderParser parser; + parser.Init(&header); + if (parser.Execute(input.FirstLine()) < 0) { + HtmlOutputStream() << MakeSimpleResponse(400, "Bad request"); + return; + } + THttpURL url; + if (url.Parse(header.GetUrl()) != THttpURL::ParsedOK) { + HtmlOutputStream() << MakeSimpleResponse(400, "Invalid url"); + return; + } + cgiParams.Scan(url.Get(THttpURL::FieldQuery)); + + TBusWww::TOptionalParams params; + //params.ParentLinks.emplace_back(); + //params.ParentLinks.back().Title = "temp"; + //params.ParentLinks.back().Href = "http://wiki.yandex-team.ru/"; + + Www->ServeHttp(output, cgiParams, params); + } catch (...) { + output << MakeSimpleResponse(500, "Exception", + TString() + "Exception: " + CurrentExceptionMessage()); + } + } + + TClientRequest* CreateClient() override { + return new TClientRequestImpl(this); + } + + ~TImpl() override { + HttpServer.Stop(); + } +}; + +NBus::TBusWwwHttpServer::TBusWwwHttpServer(TIntrusivePtr<TBusWww> www, unsigned port) + : Impl(new TImpl(www, port)) +{ +} + +NBus::TBusWwwHttpServer::~TBusWwwHttpServer() { +} |