aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/monlib/service/service.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/service/service.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/service/service.cpp')
-rw-r--r--library/cpp/monlib/service/service.cpp268
1 files changed, 268 insertions, 0 deletions
diff --git a/library/cpp/monlib/service/service.cpp b/library/cpp/monlib/service/service.cpp
new file mode 100644
index 0000000000..929efbf816
--- /dev/null
+++ b/library/cpp/monlib/service/service.cpp
@@ -0,0 +1,268 @@
+#include "service.h"
+
+#include <library/cpp/coroutine/engine/sockpool.h>
+#include <library/cpp/http/io/stream.h>
+#include <library/cpp/http/fetch/httpheader.h>
+#include <library/cpp/http/fetch/httpfsm.h>
+#include <library/cpp/uri/http_url.h>
+
+#include <util/generic/buffer.h>
+#include <util/stream/str.h>
+#include <util/stream/buffer.h>
+#include <util/stream/zerocopy.h>
+#include <util/string/vector.h>
+
+namespace NMonitoring {
+ class THttpClient: public IHttpRequest {
+ public:
+ void ServeRequest(THttpInput& in, IOutputStream& out, const NAddr::IRemoteAddr* remoteAddr, const THandler& Handler) {
+ try {
+ try {
+ RemoteAddr = remoteAddr;
+ THttpHeaderParser parser;
+ parser.Init(&Header);
+ if (parser.Execute(in.FirstLine().data(), in.FirstLine().size()) < 0) {
+ out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n";
+ return;
+ }
+ if (Url.Parse(Header.GetUrl().data()) != THttpURL::ParsedOK) {
+ out << "HTTP/1.1 400 Invalid url\r\nConnection: Close\r\n\r\n";
+ return;
+ }
+ TString path = GetPath();
+ if (!path.StartsWith('/')) {
+ out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n";
+ return;
+ }
+ Headers = &in.Headers();
+ CgiParams.Scan(Url.Get(THttpURL::FieldQuery));
+ } catch (...) {
+ out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n";
+ YSYSLOG(TLOG_ERR, "THttpClient: internal error while serving monitoring request: %s", CurrentExceptionMessage().data());
+ }
+
+ if (Header.http_method == HTTP_METHOD_POST)
+ TransferData(&in, &PostContent);
+
+ Handler(out, *this);
+ out.Finish();
+ } catch (...) {
+ auto msg = CurrentExceptionMessage();
+ out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n" << msg;
+ out.Finish();
+ YSYSLOG(TLOG_ERR, "THttpClient: error while serving monitoring request: %s", msg.data());
+ }
+ }
+
+ const char* GetURI() const override {
+ return Header.request_uri.c_str();
+ }
+ const char* GetPath() const override {
+ return Url.Get(THttpURL::FieldPath);
+ }
+ const TCgiParameters& GetParams() const override {
+ return CgiParams;
+ }
+ const TCgiParameters& GetPostParams() const override {
+ if (PostParams.empty() && !PostContent.Buffer().Empty())
+ const_cast<THttpClient*>(this)->ScanPostParams();
+ return PostParams;
+ }
+ TStringBuf GetPostContent() const override {
+ return TStringBuf(PostContent.Buffer().Data(), PostContent.Buffer().Size());
+ }
+ HTTP_METHOD GetMethod() const override {
+ return (HTTP_METHOD)Header.http_method;
+ }
+ void ScanPostParams() {
+ PostParams.Scan(TStringBuf(PostContent.Buffer().data(), PostContent.Buffer().size()));
+ }
+
+ const THttpHeaders& GetHeaders() const override {
+ if (Headers != nullptr) {
+ return *Headers;
+ }
+ static THttpHeaders defaultHeaders;
+ return defaultHeaders;
+ }
+
+ TString GetRemoteAddr() const override {
+ return RemoteAddr ? NAddr::PrintHostAndPort(*RemoteAddr) : TString();
+ }
+
+ private:
+ THttpRequestHeader Header;
+ const THttpHeaders* Headers = nullptr;
+ THttpURL Url;
+ TCgiParameters CgiParams;
+ TCgiParameters PostParams;
+ TBufferOutput PostContent;
+ const NAddr::IRemoteAddr* RemoteAddr = nullptr;
+ };
+
+ /* TCoHttpServer */
+
+ class TCoHttpServer::TConnection: public THttpClient {
+ public:
+ TConnection(const TCoHttpServer::TAcceptFull& acc, const TCoHttpServer& parent)
+ : Socket(acc.S->Release())
+ , RemoteAddr(acc.Remote)
+ , Parent(parent)
+ {
+ }
+
+ void operator()(TCont* c) {
+ try {
+ THolder<TConnection> me(this);
+ TContIO io(Socket, c);
+ THttpInput in(&io);
+ THttpOutput out(&io, &in);
+ // buffer reply so there will be ne context switching
+ TStringStream s;
+ ServeRequest(in, s, RemoteAddr, Parent.Handler);
+ out << s.Str();
+ out.Finish();
+ } catch (...) {
+ YSYSLOG(TLOG_WARNING, "TCoHttpServer::TConnection: error: %s\n", CurrentExceptionMessage().data());
+ }
+ }
+
+ private:
+ TSocketHolder Socket;
+ const NAddr::IRemoteAddr* RemoteAddr;
+ const TCoHttpServer& Parent;
+ };
+
+ TCoHttpServer::TCoHttpServer(TContExecutor& executor, const TString& bindAddr, TIpPort port, THandler handler)
+ : Executor(executor)
+ , Listener(this, &executor)
+ , Handler(std::move(handler))
+ , BindAddr(bindAddr)
+ , Port(port)
+ {
+ try {
+ Listener.Bind(TIpAddress(bindAddr, port));
+ } catch (yexception e) {
+ Y_FAIL("TCoHttpServer::TCoHttpServer: couldn't bind to %s:%d\n", bindAddr.data(), port);
+ }
+ }
+
+ void TCoHttpServer::Start() {
+ Listener.Listen();
+ }
+
+ void TCoHttpServer::Stop() {
+ Listener.Stop();
+ }
+
+ void TCoHttpServer::OnAcceptFull(const TAcceptFull& acc) {
+ THolder<TConnection> conn(new TConnection(acc, *this));
+ Executor.Create(*conn, "client");
+ Y_UNUSED(conn.Release());
+ }
+
+ void TCoHttpServer::OnError() {
+ throw; // just rethrow
+ }
+
+ void TCoHttpServer::ProcessRequest(IOutputStream& out, const IHttpRequest& request) {
+ try {
+ TNetworkAddress addr(BindAddr, Port);
+ TSocket sock(addr);
+ TSocketOutput sock_out(sock);
+ TSocketInput sock_in(sock);
+ sock_out << "GET " << request.GetURI() << " HTTP/1.0\r\n\r\n";
+ THttpInput http_in(&sock_in);
+ try {
+ out << "HTTP/1.1 200 Ok\nConnection: Close\n\n";
+ TransferData(&http_in, &out);
+ } catch (...) {
+ YSYSLOG(TLOG_DEBUG, "TCoHttpServer: while getting data from backend: %s", CurrentExceptionMessage().data());
+ }
+ } catch (const yexception& /*e*/) {
+ out << "HTTP/1.1 500 Internal server error\nConnection: Close\n\n";
+ YSYSLOG(TLOG_DEBUG, "TCoHttpServer: while getting data from backend: %s", CurrentExceptionMessage().data());
+ }
+ }
+
+ /* TMtHttpServer */
+
+ class TMtHttpServer::TConnection: public TClientRequest, public THttpClient {
+ public:
+ TConnection(const TMtHttpServer& parent)
+ : Parent(parent)
+ {
+ }
+
+ bool Reply(void*) override {
+ ServeRequest(Input(), Output(), NAddr::GetPeerAddr(Socket()).Get(), Parent.Handler);
+ return true;
+ }
+
+ private:
+ const TMtHttpServer& Parent;
+ };
+
+ TMtHttpServer::TMtHttpServer(const TOptions& options, THandler handler, IThreadFactory* pool)
+ : THttpServer(this, options, pool)
+ , Handler(std::move(handler))
+ {
+ }
+
+ TMtHttpServer::TMtHttpServer(const TOptions& options, THandler handler, TSimpleSharedPtr<IThreadPool> pool)
+ : THttpServer(this, /* mainWorkers = */pool, /* failWorkers = */pool, options)
+ , Handler(std::move(handler))
+ {
+ }
+
+ bool TMtHttpServer::Start() {
+ return THttpServer::Start();
+ }
+
+ void TMtHttpServer::StartOrThrow() {
+ if (!Start()) {
+ const auto& opts = THttpServer::Options();
+ TNetworkAddress addr = opts.Host
+ ? TNetworkAddress(opts.Host, opts.Port)
+ : TNetworkAddress(opts.Port);
+ ythrow TSystemError(GetErrorCode()) << addr;
+ }
+ }
+
+ void TMtHttpServer::Stop() {
+ THttpServer::Stop();
+ }
+
+ TClientRequest* TMtHttpServer::CreateClient() {
+ return new TConnection(*this);
+ }
+
+ /* TService */
+
+ TMonService::TMonService(TContExecutor& executor, TIpPort internalPort, TIpPort externalPort,
+ THandler coHandler, THandler mtHandler)
+ : CoServer(executor, "127.0.0.1", internalPort, std::move(coHandler))
+ , MtServer(THttpServerOptions(externalPort), std::bind(&TMonService::DispatchRequest, this, std::placeholders::_1, std::placeholders::_2))
+ , MtHandler(std::move(mtHandler))
+ {
+ }
+
+ void TMonService::Start() {
+ MtServer.Start();
+ CoServer.Start();
+ }
+
+ void TMonService::Stop() {
+ MtServer.Stop();
+ CoServer.Stop();
+ }
+
+ void TMonService::DispatchRequest(IOutputStream& out, const IHttpRequest& request) {
+ if (strcmp(request.GetPath(), "/") == 0) {
+ out << "HTTP/1.1 200 Ok\nConnection: Close\n\n";
+ MtHandler(out, request);
+ } else
+ CoServer.ProcessRequest(out, request);
+ }
+
+}