#include "www.h"
#include "concat_strings.h"
#include "html_output.h"
#include <library/cpp/messagebus/remote_connection_status.h>
#include <library/cpp/monlib/deprecated/json/writer.h>
#include <library/cpp/http/fetch/httpfsm.h>
#include <library/cpp/http/fetch/httpheader.h>
#include <library/cpp/http/server/http.h>
#include <library/cpp/json/writer/json.h>
#include <library/cpp/resource/resource.h>
#include <library/cpp/uri/http_url.h>
#include <util/string/cast.h>
#include <util/string/printf.h>
#include <util/system/mutex.h>
#include <utility>
using namespace NBus;
using namespace NBus::NPrivate;
using namespace NActor;
using namespace NActor::NPrivate;
// Canned HTTP/1.1 response preambles, one per content type served by this file.
// Each consists of the "200 Ok" status line, a Content-Type header, "Connection: Close"
// and the empty line that terminates the header section; the body is written right after.
static const char HTTP_OK_JS[] = "HTTP/1.1 200 Ok\r\nContent-Type: text/javascript\r\nConnection: Close\r\n\r\n";
static const char HTTP_OK_JSON[] = "HTTP/1.1 200 Ok\r\nContent-Type: application/json; charset=utf-8\r\nConnection: Close\r\n\r\n";
static const char HTTP_OK_PNG[] = "HTTP/1.1 200 Ok\r\nContent-Type: image/png\r\nConnection: Close\r\n\r\n";
static const char HTTP_OK_BIN[] = "HTTP/1.1 200 Ok\r\nContent-Type: application/octet-stream\r\nConnection: Close\r\n\r\n";
static const char HTTP_OK_HTML[] = "HTTP/1.1 200 Ok\r\nContent-Type: text/html; charset=utf-8\r\nConnection: Close\r\n\r\n";
namespace {
typedef TIntrusivePtr<TBusModuleInternal> TBusModuleInternalPtr;
template <typename TValuePtr>
struct TNamedValues {
TVector<std::pair<TString, TValuePtr>> Entries;
TValuePtr FindByName(TStringBuf name) {
Y_ABORT_UNLESS(!!name);
for (unsigned i = 0; i < Entries.size(); ++i) {
if (Entries[i].first == name) {
return Entries[i].second;
}
}
return TValuePtr();
}
TString FindNameByPtr(TValuePtr value) {
Y_ABORT_UNLESS(!!value);
for (unsigned i = 0; i < Entries.size(); ++i) {
if (Entries[i].second.Get() == value.Get()) {
return Entries[i].first;
}
}
Y_ABORT("unregistered");
}
void Add(TValuePtr p) {
Y_ABORT_UNLESS(!!p);
// Do not add twice
for (unsigned i = 0; i < Entries.size(); ++i) {
if (Entries[i].second.Get() == p.Get()) {
return;
}
}
if (!!p->GetNameInternal()) {
TValuePtr current = FindByName(p->GetNameInternal());
if (!current) {
Entries.emplace_back(p->GetNameInternal(), p);
return;
}
}
for (unsigned i = 1;; ++i) {
TString prefix = p->GetNameInternal();
if (!prefix) {
prefix = "unnamed";
}
TString name = ConcatStrings(prefix, "-", i);
TValuePtr current = FindByName(name);
if (!current) {
Entries.emplace_back(name, p);
return;
}
}
}
size_t size() const {
return Entries.size();
}
bool operator!() const {
return size() == 0;
}
};
template <typename TSessionPtr>
struct TSessionValues: public TNamedValues<TSessionPtr> {
typedef TNamedValues<TSessionPtr> TBase;
TVector<TString> GetNamesForQueue(TBusMessageQueue* queue) {
TVector<TString> r;
for (unsigned i = 0; i < TBase::size(); ++i) {
if (TBase::Entries[i].second->GetQueue() == queue) {
r.push_back(TBase::Entries[i].first);
}
}
return r;
}
};
}
namespace {
    // Builds the relative link "?<param>=<name>" used by all per-object pages.
    TString HrefWithParam(TStringBuf param, TStringBuf name) {
        return ConcatStrings("?", param, "=", name);
    }

    // Link to the overview page.
    TString RootHref() {
        return ConcatStrings("?");
    }

    // Link to the page of the queue registered as `name`.
    TString QueueHref(TStringBuf name) {
        return HrefWithParam("q", name);
    }

    // Link to the page of the server session registered as `name`.
    TString ServerSessionHref(TStringBuf name) {
        return HrefWithParam("ss", name);
    }

    // Link to the page of the client session registered as `name`.
    TString ClientSessionHref(TStringBuf name) {
        return HrefWithParam("cs", name);
    }

    // Link to the page of the module registered as `name`.
    TString OldModuleHref(TStringBuf name) {
        return HrefWithParam("om", name);
    }

    /*
    static void RootLink() {
        A(RootHref(), "root");
    }
    */

    // The *Link helpers emit an anchor whose text is the object's name.
    void QueueLink(TStringBuf name) {
        const TString href = QueueHref(name);
        A(href, name);
    }

    void ServerSessionLink(TStringBuf name) {
        const TString href = ServerSessionHref(name);
        A(href, name);
    }

    void ClientSessionLink(TStringBuf name) {
        const TString href = ClientSessionHref(name);
        A(href, name);
    }

    void OldModuleLink(TStringBuf name) {
        const TString href = OldModuleHref(name);
        A(href, name);
    }
}
struct TBusWww::TImpl {
// TODO: use weak pointers
TNamedValues<TBusMessageQueuePtr> Queues;
TSessionValues<TIntrusivePtr<TBusClientSession>> ClientSessions;
TSessionValues<TIntrusivePtr<TBusServerSession>> ServerSessions;
TSessionValues<TBusModuleInternalPtr> Modules;
TMutex Mutex;
// Registers a client session together with the queue it reports via GetQueue().
// Aborts on a null session. Runs under Mutex.
void RegisterClientSession(TBusClientSessionPtr s) {
    Y_ABORT_UNLESS(!!s);

    TGuard<TMutex> lock(Mutex);

    ClientSessions.Add(s.Get());
    Queues.Add(s->GetQueue());
}
// Registers a server session together with the queue it reports via GetQueue().
// Aborts on a null session. Runs under Mutex.
void RegisterServerSession(TBusServerSessionPtr s) {
    Y_ABORT_UNLESS(!!s);

    TGuard<TMutex> lock(Mutex);

    ServerSessions.Add(s.Get());
    Queues.Add(s->GetQueue());
}
// Registers a queue on its own (without any session). Aborts on a null queue.
// Runs under Mutex.
void RegisterQueue(TBusMessageQueuePtr q) {
    Y_ABORT_UNLESS(!!q);

    TGuard<TMutex> lock(Mutex);

    Queues.Add(q);
}
// Registers a module: all of its client and server sessions, its queue, and finally
// the module's internal object itself. Aborts on a null module.
// NOTE(review): Mutex is held here while RegisterClientSession/RegisterServerSession
// lock it again, so this relies on TMutex being re-entrant — confirm.
void RegisterModule(TBusModule* module) {
    Y_ABORT_UNLESS(!!module);

    TGuard<TMutex> lock(Mutex);

    {
        TVector<TBusClientSessionPtr> clients = module->GetInternal()->GetClientSessionsInternal();
        for (auto& session : clients) {
            RegisterClientSession(session);
        }
    }

    {
        TVector<TBusServerSessionPtr> servers = module->GetInternal()->GetServerSessionsInternal();
        for (auto& session : servers) {
            RegisterServerSession(session);
        }
    }

    Queues.Add(module->GetInternal()->GetQueue());
    Modules.Add(module->GetInternal());
}
// Returns the registered name of the queue that owns the session called `sessionName`.
// `client` selects the client-session registry, otherwise the server-session one.
// Aborts when the session is not registered (or `sessionName` is empty).
TString FindQueueNameBySessionName(TStringBuf sessionName, bool client) {
    // The owning pointers keep the session alive while its queue is looked up.
    TIntrusivePtr<TBusClientSession> clientHolder;
    TIntrusivePtr<TBusServerSession> serverHolder;
    TBusSession* found = nullptr;

    if (!client) {
        serverHolder = ServerSessions.FindByName(sessionName);
        found = serverHolder.Get();
    } else {
        clientHolder = ClientSessions.FindByName(sessionName);
        found = clientHolder.Get();
    }

    Y_ABORT_UNLESS(!!found);
    return Queues.FindNameByPtr(found->GetQueue());
}
struct TRequest {
TImpl* const Outer;
IOutputStream& Os;
const TCgiParameters& CgiParams;
const TOptionalParams& Params;
// Binds one HTTP request to its context: the owning TImpl, the output stream the
// response is written to, the parsed query parameters and the caller-supplied options.
// The stream, parameters and options are stored by reference, not copied.
TRequest(TImpl* outer, IOutputStream& os, const TCgiParameters& cgiParams, const TOptionalParams& params)
: Outer(outer)
, Os(os)
, CgiParams(cgiParams)
, Params(params)
{
}
// Emits one <li> with an anchor for every caller-supplied parent link, in order.
void CrumbsParentLinks() {
    for (const auto& parent : Params.ParentLinks) {
        TTagGuard item("li");
        A(parent.Href, parent.Title);
    }
}
// Emits a single breadcrumb item. Without `href` the item is rendered as the
// "active" (current) crumb; with `href` it is rendered as a link.
void Crumb(TStringBuf name, TStringBuf href = "") {
    if (!href) {
        LiWithClass("active", name);
        return;
    }

    TTagGuard item("li");
    A(href, name);
}
// Breadcrumb trail for the overview page: parent links, then "MessageBus" as the
// active crumb.
void BreadcrumbRoot() {
    TTagGuard trail("ol", "breadcrumb");

    CrumbsParentLinks();
    Crumb("MessageBus");
}
// Breadcrumb trail for a queue page: parent links, a link back to the overview,
// then "queue <name>" as the active crumb.
void BreadcrumbQueue(TStringBuf queueName) {
    TTagGuard trail("ol", "breadcrumb");

    CrumbsParentLinks();
    Crumb("MessageBus", RootHref());

    const TString current = ConcatStrings("queue ", queueName);
    Crumb(current);
}
// Breadcrumb trail for a session page: parent links, overview link, link to the
// owning queue, then "<client|server> session <name>" as the active crumb.
// The owning queue is resolved first; that lookup aborts for an unknown session.
void BreadcrumbSession(TStringBuf sessionName, bool client) {
    TString queueName = Outer->FindQueueNameBySessionName(sessionName, client);

    TStringBuf whatSession = "server session";
    if (client) {
        whatSession = "client session";
    }

    TTagGuard trail("ol", "breadcrumb");

    CrumbsParentLinks();
    Crumb("MessageBus", RootHref());

    const TString queueTitle = ConcatStrings("queue ", queueName);
    const TString queueHref = QueueHref(queueName);
    Crumb(queueTitle, queueHref);

    const TString current = ConcatStrings(whatSession, " ", sessionName);
    Crumb(current);
}
// Renders a three-column table (What / Name / Status) with one row per client
// session, server session and module attached to `queue`. When `includeQueue` is
// set, a row for the queue itself comes first.
void ServeSessionsOfQueue(TBusMessageQueuePtr queue, bool includeQueue) {
    TVector<TString> clientNames = Outer->ClientSessions.GetNamesForQueue(queue.Get());
    TVector<TString> serverNames = Outer->ServerSessions.GetNamesForQueue(queue.Get());
    TVector<TString> moduleNames = Outer->Modules.GetNamesForQueue(queue.Get());

    TTagGuard table("table", "table table-condensed table-bordered");

    {
        TTagGuard colgroup("colgroup");
        TagWithClass("col", "col-md-2");
        TagWithClass("col", "col-md-2");
        TagWithClass("col", "col-md-8");
    }

    {
        TTagGuard headerRow("tr");
        Th("What", "span2");
        Th("Name", "span2");
        Th("Status", "span6");
    }

    if (includeQueue) {
        TTagGuard row("tr");
        Td("queue");

        {
            TTagGuard nameCell("td");
            QueueLink(Outer->Queues.FindNameByPtr(queue));
        }

        {
            TTagGuard statusCell("td");
            Pre(queue->GetStatusSingleLine());
        }
    }

    for (auto& clientName : clientNames) {
        TTagGuard row("tr");
        Td("client session");

        {
            TTagGuard nameCell("td");
            ClientSessionLink(clientName);
        }

        {
            TTagGuard statusCell("td");
            Pre(Outer->ClientSessions.FindByName(clientName)->GetStatusSingleLine());
        }
    }

    for (auto& serverName : serverNames) {
        TTagGuard row("tr");
        Td("server session");

        {
            TTagGuard nameCell("td");
            ServerSessionLink(serverName);
        }

        {
            TTagGuard statusCell("td");
            Pre(Outer->ServerSessions.FindByName(serverName)->GetStatusSingleLine());
        }
    }

    for (auto& moduleName : moduleNames) {
        TTagGuard row("tr");
        Td("module");

        {
            TTagGuard nameCell("td");
            // Module pages are not served yet, so the link branch is disabled and the
            // name is printed as plain text.
            if (false) {
                OldModuleLink(moduleName);
            } else {
                // TODO
                Text(moduleName);
            }
        }

        {
            TTagGuard statusCell("td");
            Pre(Outer->Modules.FindByName(moduleName)->GetStatusSingleLine());
        }
    }
}
void ServeQueue(const TString& name) {
TBusMessageQueuePtr queue = Outer->Queues.FindByName(name);
if (!queue) {
BootstrapError(ConcatStrings("queue not found by name: ", name));
return;
}
BreadcrumbQueue(name);
TDivGuard container("container");
H1(ConcatStrings("MessageBus queue ", '"', name, '"'));
TBusMessageQueueStatus status = queue->GetStatusRecordInternal();
Pre(status.PrintToString());
ServeSessionsOfQueue(queue, false);
HnWithSmall(3, "Peak queue size", "(stored for an hour)");
{
TDivGuard div;
TDivGuard div2(TAttr("id", "queue-size-graph"), TAttr("style", "height: 300px"));
}
{
TScriptFunctionGuard script;
NJsonWriter::TBuf data(NJsonWriter::HEM_ESCAPE_HTML);
NJsonWriter::TBuf ticks(NJsonWriter::HEM_ESCAPE_HTML);
const TExecutorHistory& history = status.ExecutorStatus.History;
data.BeginList();
ticks.BeginList();
for (unsigned i = 0; i < history.HistoryRecords.size(); ++i) {
ui64 secondOfMinute = (history.FirstHistoryRecordSecond() + i) % 60;
ui64 minuteOfHour = (history.FirstHistoryRecordSecond() + i) / 60 % 60;
unsigned printEach;
if (history.HistoryRecords.size() <= 500) {
printEach = 1;
} else if (history.HistoryRecords.size() <= 1000) {
printEach = 2;
} else if (history.HistoryRecords.size() <= 3000) {
printEach = 6;
} else {
printEach = 12;
}
if (secondOfMinute % printEach != 0) {
continue;
}
ui32 max = 0;
for (unsigned j = 0; j < printEach; ++j) {
if (i < j) {
continue;
}
max = Max<ui32>(max, history.HistoryRecords[i - j].MaxQueueSize);
}
data.BeginList();
data.WriteString(ToString(i));
data.WriteInt(max);
data.EndList();
// TODO: can be done with flot time plugin
if (history.HistoryRecords.size() <= 20) {
ticks.BeginList();
ticks.WriteInt(i);
ticks.WriteString(ToString(secondOfMinute));
ticks.EndList();
} else if (history.HistoryRecords.size() <= 60) {
if (secondOfMinute % 5 == 0) {
ticks.BeginList();
ticks.WriteInt(i);
ticks.WriteString(ToString(secondOfMinute));
ticks.EndList();
}
} else {
bool needTick;
if (history.HistoryRecords.size() <= 3 * 60) {
needTick = secondOfMinute % 15 == 0;
} else if (history.HistoryRecords.size() <= 7 * 60) {
needTick = secondOfMinute % 30 == 0;
} else if (history.HistoryRecords.size() <= 20 * 60) {
needTick = secondOfMinute == 0;
} else {
needTick = secondOfMinute == 0 && minuteOfHour % 5 == 0;
}
if (needTick) {
ticks.BeginList();
ticks.WriteInt(i);
ticks.WriteString(Sprintf(":%02u:%02u", (unsigned)minuteOfHour, (unsigned)secondOfMinute));
ticks.EndList();
}
}
}
ticks.EndList();
data.EndList();
HtmlOutputStream() << " var data = " << data.Str() << ";\n";
HtmlOutputStream() << " var ticks = " << ticks.Str() << ";\n";
HtmlOutputStream() << " plotQueueSize('#queue-size-graph', data, ticks);\n";
}
}
// Renders the page of the client (`client == true`) or server session registered as
// `name`: breadcrumbs, the owning queue's status, the session's status sections, its
// config and, for server sessions, a message process time histogram.
//
// An unknown or empty name produces a "... not found" error box; a session that has
// been shut down produces a "Session shut down" error box after the queue section.
void ServeSession(TStringBuf name, bool client) {
    const TStringBuf whatSession = client ? "client session" : "server session";

    // The owning pointers keep the session alive for the duration of the request.
    TIntrusivePtr<TBusClientSession> clientSession;
    TIntrusivePtr<TBusServerSession> serverSession;
    TBusSession* session = nullptr;

    // `name` comes straight from the query string. FindByName() aborts the process on
    // an empty name, so an empty "?cs=" / "?ss=" must be rejected before the lookup.
    if (!!name) {
        if (client) {
            clientSession = Outer->ClientSessions.FindByName(name);
            session = clientSession.Get();
        } else {
            serverSession = Outer->ServerSessions.FindByName(name);
            session = serverSession.Get();
        }
    }

    if (!session) {
        BootstrapError(ConcatStrings(whatSession, " not found by name: ", name));
        return;
    }

    TSessionDumpStatus dumpStatus = session->GetStatusRecordInternal();

    TBusMessageQueuePtr queue = session->GetQueue();
    TString queueName = Outer->Queues.FindNameByPtr(session->GetQueue());

    BreadcrumbSession(name, client);

    TDivGuard container("container");

    H1(ConcatStrings("MessageBus ", whatSession, " ", '"', name, '"'));

    TBusMessageQueueStatus queueStatus = queue->GetStatusRecordInternal();

    {
        H3(ConcatStrings("queue ", queueName));
        Pre(queueStatus.PrintToString());
    }

    TSessionDumpStatus status = session->GetStatusRecordInternal();

    if (status.Shutdown) {
        BootstrapError("Session shut down");
        return;
    }

    H3("Basic");
    Pre(status.Head);

    if (status.ConnectionStatusSummary.Server) {
        H3("Acceptors");
        Pre(status.Acceptors);
    }

    H3("Connections");
    Pre(status.ConnectionsSummary);

    {
        // Button that toggles the collapsible per-connection details below.
        TDivGuard div;
        TTagGuard button("button",
                         TAttr("type", "button"),
                         TAttr("class", "btn"),
                         TAttr("data-toggle", "collapse"),
                         TAttr("data-target", "#connections"));
        Text("Show connection details");
    }
    {
        TDivGuard div(TAttr("id", "connections"), TAttr("class", "collapse"));
        Pre(status.Connections);
    }

    H3("TBusSessionConfig");

    Pre(status.Config.PrintToString());

    if (!client) {
        H3("Message process time histogram");

        // The histogram is read from the first status snapshot taken above.
        const TDurationHistogram& h =
            dumpStatus.ConnectionStatusSummary.WriterStatus.Incremental.ProcessDurationHistogram;

        {
            TDivGuard div;
            TDivGuard div2(TAttr("id", "h"), TAttr("style", "height: 300px"));
        }

        {
            TScriptFunctionGuard script;

            NJsonWriter::TBuf buf(NJsonWriter::HEM_ESCAPE_HTML);
            buf.BeginList();
            for (unsigned i = 0; i < h.Times.size(); ++i) {
                TString label = TDurationHistogram::LabelBefore(i);
                buf.BeginList();
                buf.WriteString(label);
                buf.WriteLongLong(h.Times[i]);
                buf.EndList();
            }
            buf.EndList();

            HtmlOutputStream() << "  var hist = " << buf.Str() << ";\n";
            HtmlOutputStream() << "  plotHist('#h', hist);\n";
        }
    }
}
// Renders the overview page: for every registered queue a heading followed by the
// table of the queue and its sessions. With no queues registered only an error box
// saying "no queues" is produced.
void ServeDefault() {
    if (!Outer->Queues) {
        BootstrapError("no queues");
        return;
    }

    BreadcrumbRoot();

    TDivGuard container("container");

    H1("MessageBus queues");

    for (auto& entry : Outer->Queues.Entries) {
        HnWithSmall(3, entry.first, "(queue)");
        ServeSessionsOfQueue(entry.second, true);
    }
}
// Writes the sensors of one queue: a single "WorkQueueSize" metric labelled with
// mb_queue=<queueName>, taken from a fresh status record of `queue`.
void WriteQueueSensors(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf queueName, TBusMessageQueue* queue) {
    auto queueStatus = queue->GetStatusRecordInternal();

    sj.OpenMetric();
    sj.WriteLabels("mb_queue", queueName, "sensor", "WorkQueueSize");
    sj.WriteValue(queueStatus.ExecutorStatus.WorkQueueSize);
    sj.CloseMetric();
}
// Writes the two derivative-mode metrics of a message counter, "MessageBytes" and
// "MessageCount". Both are labelled with <labelName>=<sessionName> and
// mb_dir=read|write, selected by `read`.
void WriteMessageCounterSensors(NMonitoring::TDeprecatedJsonWriter& sj,
                                TStringBuf labelName, TStringBuf sessionName, bool read, const TMessageCounter& counter) {
    TStringBuf direction = "write";
    if (read) {
        direction = "read";
    }

    // Bytes transferred.
    sj.OpenMetric();
    sj.WriteLabels(labelName, sessionName, "mb_dir", direction, "sensor", "MessageBytes");
    sj.WriteValue(counter.BytesData);
    sj.WriteModeDeriv();
    sj.CloseMetric();

    // Messages transferred.
    sj.OpenMetric();
    sj.WriteLabels(labelName, sessionName, "mb_dir", direction, "sensor", "MessageCount");
    sj.WriteValue(counter.Count);
    sj.WriteModeDeriv();
    sj.CloseMetric();
}
// Writes the sensors of one session from a fresh status record: InFlightCount,
// InFlightSize, SendQueueSize, AckMessagesSize (client sessions only) and the
// write/read message counters. The label name is mb_client_session or
// mb_server_session depending on `client`.
void WriteSessionStatus(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf sessionName, bool client,
                        TBusSession* session) {
    TStringBuf labelName = "mb_server_session";
    if (client) {
        labelName = "mb_client_session";
    }

    auto sessionStatus = session->GetStatusRecordInternal();

    sj.OpenMetric();
    sj.WriteLabels(labelName, sessionName, "sensor", "InFlightCount");
    sj.WriteValue(sessionStatus.Status.InFlightCount);
    sj.CloseMetric();

    sj.OpenMetric();
    sj.WriteLabels(labelName, sessionName, "sensor", "InFlightSize");
    sj.WriteValue(sessionStatus.Status.InFlightSize);
    sj.CloseMetric();

    sj.OpenMetric();
    sj.WriteLabels(labelName, sessionName, "sensor", "SendQueueSize");
    sj.WriteValue(sessionStatus.ConnectionStatusSummary.WriterStatus.SendQueueSize);
    sj.CloseMetric();

    if (client) {
        sj.OpenMetric();
        sj.WriteLabels(labelName, sessionName, "sensor", "AckMessagesSize");
        sj.WriteValue(sessionStatus.ConnectionStatusSummary.WriterStatus.AckMessagesSize);
        sj.CloseMetric();
    }

    // Writer counters are reported as "write", reader counters as "read".
    WriteMessageCounterSensors(sj, labelName, sessionName, false,
                               sessionStatus.ConnectionStatusSummary.WriterStatus.Incremental.MessageCounter);
    WriteMessageCounterSensors(sj, labelName, sessionName, true,
                               sessionStatus.ConnectionStatusSummary.ReaderStatus.Incremental.MessageCounter);
}
// Writes the sensors document (JSON) to Os.
//
// q / cs / ss are optional filters holding a queue, client-session or server-session
// name. When all three are empty every registered queue and session is reported;
// otherwise only the objects whose registered name equals the matching filter are.
void ServeSolomonJson(const TString& q, const TString& cs, const TString& ss) {
    const bool all = q == "" && cs == "" && ss == "";

    NMonitoring::TDeprecatedJsonWriter sj(&Os);

    sj.OpenDocument();
    sj.OpenMetrics();

    for (auto& entry : Outer->Queues.Entries) {
        const TString& queueName = entry.first;
        TBusMessageQueuePtr queue = entry.second;

        if (all || q == queueName) {
            WriteQueueSensors(sj, queueName, queue.Get());
        }

        // Sessions are enumerated per queue. Modules expose no sensors here, so they
        // are not looked up.
        TVector<TString> clientNames = Outer->ClientSessions.GetNamesForQueue(queue.Get());
        TVector<TString> serverNames = Outer->ServerSessions.GetNamesForQueue(queue.Get());

        for (auto& sessionName : clientNames) {
            if (all || cs == sessionName) {
                auto session = Outer->ClientSessions.FindByName(sessionName);
                WriteSessionStatus(sj, sessionName, true, &*session);
            }
        }

        for (auto& sessionName : serverNames) {
            if (all || ss == sessionName) {
                auto session = Outer->ServerSessions.FindByName(sessionName);
                WriteSessionStatus(sj, sessionName, false, &*session);
            }
        }
    }

    sj.CloseMetrics();
    sj.CloseDocument();
}
// Serves the embedded resource "/<path>" to `os`, preceded by a 200 preamble whose
// Content-Type is chosen by the file extension (.js, .png, anything else is binary).
void ServeStatic(IOutputStream& os, TStringBuf path) {
    // Resolve the resource before anything is written: `path` comes from the query
    // string, and a failed lookup must not leave a "200 Ok" header already sent.
    // NOTE(review): NResource::Find is presumed to throw for an unknown key — confirm.
    auto blob = NResource::Find(TString("/") + TString(path));

    if (path.EndsWith(".js")) {
        os << HTTP_OK_JS;
    } else if (path.EndsWith(".png")) {
        os << HTTP_OK_PNG;
    } else {
        os << HTTP_OK_BIN;
    }

    os.Write(blob.Data(), blob.Size());
}
// Emits the stylesheet, favicon and script references for the page <head>.
// Third-party assets are referenced by protocol-relative URLs; the favicon and
// messagebus.js are served by this handler through "?file=".
void HeaderJsCss() {
    LinkStylesheet("//yandex.st/bootstrap/3.0.2/css/bootstrap.css");
    LinkFavicon("?file=bus-ico.png");

    // Scripts, in the order they must be emitted.
    static const char* const scripts[] = {
        "//yandex.st/jquery/2.0.3/jquery.js",
        "//yandex.st/bootstrap/3.0.2/js/bootstrap.js",
        "//cdnjs.cloudflare.com/ajax/libs/flot/0.8.1/jquery.flot.min.js",
        "//cdnjs.cloudflare.com/ajax/libs/flot/0.8.1/jquery.flot.categories.min.js",
        "?file=messagebus.js",
    };
    for (const char* src : scripts) {
        ScriptHref(src);
    }
}
// Entry point of one request; dispatches on the query parameters:
//   file=<name>        — static resource, served as-is;
//   fmt=solomon-json   — sensors as JSON, optionally filtered by q / cs / ss;
//   cs=<name>          — client session page;
//   ss=<name>          — server session page;
//   q=<name>           — queue page;
//   none of the above  — overview of all queues.
// Os is installed as the current HTML output stream for the duration of the call.
void Serve() {
    THtmlOutputStreamPushPop pp(&Os);

    const auto file = CgiParams.Find("file");
    if (file != CgiParams.end()) {
        ServeStatic(Os, file->second);
        return;
    }

    const auto fmt = CgiParams.Find("fmt");
    const bool solomonJson = fmt != CgiParams.end() && fmt->second == "solomon-json";

    const auto cs = CgiParams.Find("cs");
    const auto ss = CgiParams.Find("ss");
    const auto q = CgiParams.Find("q");

    if (solomonJson) {
        Os << HTTP_OK_JSON;

        // The filters are the parameter VALUES (->second). The key (->first) is just
        // the literal "q" / "cs" / "ss" and would never match a registered name.
        TString qp = q != CgiParams.end() ? q->second : "";
        TString csp = cs != CgiParams.end() ? cs->second : "";
        TString ssp = ss != CgiParams.end() ? ss->second : "";
        ServeSolomonJson(qp, csp, ssp);
    } else {
        Os << HTTP_OK_HTML;

        Doctype();

        TTagGuard html("html");
        {
            TTagGuard head("head");

            HeaderJsCss();
            // ✉ 🚌
            Title(TChars("MessageBus", false));
        }

        TTagGuard body("body");

        if (cs != CgiParams.end()) {
            ServeSession(cs->second, true);
        } else if (ss != CgiParams.end()) {
            ServeSession(ss->second, false);
        } else if (q != CgiParams.end()) {
            ServeQueue(q->second);
        } else {
            ServeDefault();
        }
    }
}
};
// Serves one request while holding Mutex, so page rendering does not interleave
// with the Register* calls, which take the same lock.
void ServeHttp(IOutputStream& os, const TCgiParameters& queryArgs, const TBusWww::TOptionalParams& params) {
    TGuard<TMutex> lock(Mutex);

    TRequest(this, os, queryArgs, params).Serve();
}
};
// Creates an empty registry; all state lives in a TImpl allocated here.
NBus::TBusWww::TBusWww()
: Impl(new TImpl)
{
}
// Defined out of line, in the file where TImpl is a complete type.
NBus::TBusWww::~TBusWww() {
}
// Makes the client session (and its queue) visible in the web UI; forwards to TImpl.
void NBus::TBusWww::RegisterClientSession(TBusClientSessionPtr s) {
Impl->RegisterClientSession(s);
}
// Makes the server session (and its queue) visible in the web UI; forwards to TImpl.
void TBusWww::RegisterServerSession(TBusServerSessionPtr s) {
Impl->RegisterServerSession(s);
}
// Makes the queue visible in the web UI; forwards to TImpl.
void TBusWww::RegisterQueue(TBusMessageQueuePtr q) {
Impl->RegisterQueue(q);
}
// Makes the module, its sessions and its queue visible in the web UI; forwards to TImpl.
void TBusWww::RegisterModule(TBusModule* module) {
Impl->RegisterModule(module);
}
// Writes a complete HTTP response (status line, headers and body) for the given
// query parameters to httpOutputStream; forwards to TImpl.
void TBusWww::ServeHttp(IOutputStream& httpOutputStream,
const TCgiParameters& queryArgs,
const TBusWww::TOptionalParams& params) {
Impl->ServeHttp(httpOutputStream, queryArgs, params);
}
struct TBusWwwHttpServer::TImpl: public THttpServer::ICallBack {
TIntrusivePtr<TBusWww> Www;
THttpServer HttpServer;
// Builds default HTTP server options with only the port overridden.
// Aborts when `port` is zero.
static THttpServer::TOptions MakeHttpServerOptions(unsigned port) {
    Y_ABORT_UNLESS(port > 0);

    THttpServer::TOptions options;
    options.Port = port;
    return options;
}
// Stores the registry, constructs the HTTP server with this object as its callback
// and starts the server immediately.
// NOTE(review): any result of HttpServer.Start() is not inspected here — confirm
// whether a failed start (e.g. busy port) should be reported to the caller.
TImpl(TIntrusivePtr<TBusWww> www, unsigned port)
: Www(www)
, HttpServer(this, MakeHttpServerOptions(port))
{
HttpServer.Start();
}
// Request object created by CreateClient() for the HTTP server; it only routes the
// request back to the owning TImpl.
struct TClientRequestImpl: public TClientRequest {
// Owning server implementation; not owned by the request.
TBusWwwHttpServer::TImpl* const Outer;
TClientRequestImpl(TBusWwwHttpServer::TImpl* outer)
: Outer(outer)
{
}
// Hands the request's input and output streams to the owner and always returns true.
bool Reply(void*) override {
Outer->ServeRequest(Input(), Output());
return true;
}
};
// Builds a minimal HTTP/1.1 response: status line "<code> <text>", a
// "Connection: Close" header and `content` as the body. With an empty `content`
// the body is "<code> <text>".
TString MakeSimpleResponse(unsigned code, TString text, TString content = "") {
    TStringStream response;
    response << "HTTP/1.1 " << code << " " << text << "\r\nConnection: Close\r\n\r\n";

    if (!content) {
        response << code << " " << text;
    } else {
        response << content;
    }

    return response.Str();
}
// Handles one HTTP request: parses the request line and URL, extracts the query
// parameters and lets the registry render the response into `output`.
//
// A malformed request line or URL is answered with 400; any exception thrown while
// serving is answered with 500 and the exception message.
void ServeRequest(THttpInput& input, THttpOutput& output) {
    TCgiParameters cgiParams;
    try {
        THttpRequestHeader header;
        THttpHeaderParser parser;
        parser.Init(&header);

        // Error replies are written to `output` directly. The HTML output stream is
        // only installed later, inside the page renderer, so it must not be used here.
        // NOTE(review): HtmlOutputStream() is presumed to be unset at this point — confirm.
        if (parser.Execute(input.FirstLine()) < 0) {
            output << MakeSimpleResponse(400, "Bad request");
            return;
        }

        THttpURL url;
        if (url.Parse(header.GetUrl()) != THttpURL::ParsedOK) {
            output << MakeSimpleResponse(400, "Invalid url");
            return;
        }

        cgiParams.Scan(url.Get(THttpURL::FieldQuery));

        TBusWww::TOptionalParams params;
        //params.ParentLinks.emplace_back();
        //params.ParentLinks.back().Title = "temp";
        //params.ParentLinks.back().Href = "http://wiki.yandex-team.ru/";

        Www->ServeHttp(output, cgiParams, params);
    } catch (...) {
        output << MakeSimpleResponse(500, "Exception",
                                     TString() + "Exception: " + CurrentExceptionMessage());
    }
}
// THttpServer::ICallBack hook: returns a newly allocated request object bound to this
// server implementation.
TClientRequest* CreateClient() override {
return new TClientRequestImpl(this);
}
// Stops the HTTP server explicitly before the members are destroyed.
~TImpl() override {
HttpServer.Stop();
}
};
// Creates the server implementation for `www` on `port`; constructing TImpl starts
// the HTTP server.
NBus::TBusWwwHttpServer::TBusWwwHttpServer(TIntrusivePtr<TBusWww> www, unsigned port)
: Impl(new TImpl(www, port))
{
}
// Defined out of line, in the file where TImpl is a complete type.
NBus::TBusWwwHttpServer::~TBusWwwHttpServer() {
}