diff options
author | elviandante <elviandante@yandex-team.ru> | 2022-02-10 16:49:47 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:47 +0300 |
commit | 621a17b75565a8d70df465a0ac5c93a7c6d2e61f (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/monlib | |
parent | 643ddee8bd6125a18c4b1506c35bee857f64f4d2 (diff) | |
download | ydb-621a17b75565a8d70df465a0ac5c93a7c6d2e61f.tar.gz |
Restoring authorship annotation for <elviandante@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/monlib')
-rw-r--r-- | library/cpp/monlib/counters/counters.h | 54 | ||||
-rw-r--r-- | library/cpp/monlib/service/service.cpp | 268 | ||||
-rw-r--r-- | library/cpp/monlib/service/service.h | 110 | ||||
-rw-r--r-- | library/cpp/monlib/ya.make | 2 |
4 files changed, 217 insertions, 217 deletions
diff --git a/library/cpp/monlib/counters/counters.h b/library/cpp/monlib/counters/counters.h index b2e2d2794b..038b55f0c8 100644 --- a/library/cpp/monlib/counters/counters.h +++ b/library/cpp/monlib/counters/counters.h @@ -1,5 +1,5 @@ #pragma once - + #include <util/datetime/base.h> #include <util/generic/algorithm.h> #include <util/generic/list.h> @@ -8,27 +8,27 @@ #include <util/generic/singleton.h> #include <util/generic/vector.h> #include <util/str_stl.h> -#include <util/stream/output.h> +#include <util/stream/output.h> #include <util/string/util.h> -#include <util/system/atomic.h> +#include <util/system/atomic.h> #include <util/system/defaults.h> #include <util/system/guard.h> #include <util/system/sem.h> #include <util/system/spinlock.h> - + #include <array> -namespace NMonitoring { +namespace NMonitoring { #define BEGIN_OUTPUT_COUNTERS \ void OutputImpl(IOutputStream& out) { \ char prettyBuf[32]; #define END_OUTPUT_COUNTERS \ out.Flush(); \ } - + #define OUTPUT_NAMED_COUNTER(var, name) out << name << ": \t" << var << NMonitoring::PrettyNum(var, prettyBuf, 32) << '\n' #define OUTPUT_COUNTER(var) OUTPUT_NAMED_COUNTER(var, #var); - + char* PrettyNumShort(i64 val, char* buf, size_t size); char* PrettyNum(i64 val, char* buf, size_t size); @@ -39,13 +39,13 @@ namespace NMonitoring { public: using TValue = TAtomic; using TValueBase = TAtomicBase; - + TDeprecatedCounter() : Value() , Derivative(false) { } - + TDeprecatedCounter(TValue value, bool derivative = false) : Value(value) , Derivative(derivative) @@ -62,7 +62,7 @@ namespace NMonitoring { TValueBase Val() const { return AtomicGet(Value); } - + void Set(TValue val) { AtomicSet(Value, val); } @@ -73,7 +73,7 @@ namespace NMonitoring { TValueBase Dec() { return AtomicDecrement(Value); } - + TValueBase Add(const TValue val) { return AtomicAdd(Value, val); } @@ -88,21 +88,21 @@ namespace NMonitoring { void operator++(int) { Inc(); } - + void operator--() { Dec(); } void operator--(int) { Dec(); } - + void operator+=(TValue rhs) { Add(rhs); } void operator-=(TValue rhs) { Sub(rhs); } - + TValueBase operator=(TValue rhs) { AtomicSwap(&Value, rhs); return rhs; @@ -111,7 +111,7 @@ namespace NMonitoring { bool operator!() const { return AtomicGet(Value) == 0; } - + TAtomic& GetAtomic() { return Value; } @@ -120,14 +120,14 @@ namespace NMonitoring { TAtomic Value; bool Derivative; }; - + template <typename T> struct TDeprecatedCountersBase { virtual ~TDeprecatedCountersBase() { } - + virtual void OutputImpl(IOutputStream&) = 0; - + static T& Instance() { return *Singleton<T>(); } @@ -337,14 +337,14 @@ namespace NMonitoring { } static inline IOutputStream& operator<<(IOutputStream& o, const NMonitoring::TDeprecatedCounter& rhs) { - return o << rhs.Val(); -} - -template <size_t N> + return o << rhs.Val(); +} + +template <size_t N> static inline IOutputStream& operator<<(IOutputStream& o, const std::array<NMonitoring::TDeprecatedCounter, N>& rhs) { for (typename std::array<NMonitoring::TDeprecatedCounter, N>::const_iterator it = rhs.begin(); it != rhs.end(); ++it) { - if (!!*it) - o << *it << Endl; - } - return o; -} + if (!!*it) + o << *it << Endl; + } + return o; +} diff --git a/library/cpp/monlib/service/service.cpp b/library/cpp/monlib/service/service.cpp index 559aba661e..929efbf816 100644 --- a/library/cpp/monlib/service/service.cpp +++ b/library/cpp/monlib/service/service.cpp @@ -5,64 +5,64 @@ #include <library/cpp/http/fetch/httpheader.h> #include <library/cpp/http/fetch/httpfsm.h> #include <library/cpp/uri/http_url.h> - + #include <util/generic/buffer.h> #include <util/stream/str.h> #include <util/stream/buffer.h> #include <util/stream/zerocopy.h> #include <util/string/vector.h> -namespace NMonitoring { +namespace NMonitoring { class THttpClient: public IHttpRequest { - public: + public: void ServeRequest(THttpInput& in, IOutputStream& out, const NAddr::IRemoteAddr* remoteAddr, const THandler& Handler) { - try { - try { + try { + try { RemoteAddr = remoteAddr; - THttpHeaderParser parser; - parser.Init(&Header); + THttpHeaderParser parser; + parser.Init(&Header); if (parser.Execute(in.FirstLine().data(), in.FirstLine().size()) < 0) { - out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n"; - return; - } + out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n"; + return; + } if (Url.Parse(Header.GetUrl().data()) != THttpURL::ParsedOK) { - out << "HTTP/1.1 400 Invalid url\r\nConnection: Close\r\n\r\n"; - return; - } + out << "HTTP/1.1 400 Invalid url\r\nConnection: Close\r\n\r\n"; + return; + } TString path = GetPath(); if (!path.StartsWith('/')) { out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n"; return; } Headers = &in.Headers(); - CgiParams.Scan(Url.Get(THttpURL::FieldQuery)); + CgiParams.Scan(Url.Get(THttpURL::FieldQuery)); } catch (...) { - out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n"; + out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n"; YSYSLOG(TLOG_ERR, "THttpClient: internal error while serving monitoring request: %s", CurrentExceptionMessage().data()); - } - + } + if (Header.http_method == HTTP_METHOD_POST) TransferData(&in, &PostContent); - Handler(out, *this); - out.Finish(); + Handler(out, *this); + out.Finish(); } catch (...) { auto msg = CurrentExceptionMessage(); out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n" << msg; out.Finish(); YSYSLOG(TLOG_ERR, "THttpClient: error while serving monitoring request: %s", msg.data()); - } - } - + } + } + const char* GetURI() const override { return Header.request_uri.c_str(); - } + } const char* GetPath() const override { - return Url.Get(THttpURL::FieldPath); - } + return Url.Get(THttpURL::FieldPath); + } const TCgiParameters& GetParams() const override { - return CgiParams; - } + return CgiParams; + } const TCgiParameters& GetPostParams() const override { if (PostParams.empty() && !PostContent.Buffer().Empty()) const_cast<THttpClient*>(this)->ScanPostParams(); @@ -90,125 +90,125 @@ namespace NMonitoring { return RemoteAddr ? NAddr::PrintHostAndPort(*RemoteAddr) : TString(); } - private: - THttpRequestHeader Header; + private: + THttpRequestHeader Header; const THttpHeaders* Headers = nullptr; - THttpURL Url; + THttpURL Url; TCgiParameters CgiParams; TCgiParameters PostParams; TBufferOutput PostContent; const NAddr::IRemoteAddr* RemoteAddr = nullptr; - }; - - /* TCoHttpServer */ - + }; + + /* TCoHttpServer */ + class TCoHttpServer::TConnection: public THttpClient { - public: + public: TConnection(const TCoHttpServer::TAcceptFull& acc, const TCoHttpServer& parent) : Socket(acc.S->Release()) , RemoteAddr(acc.Remote) - , Parent(parent) - { - } - + , Parent(parent) + { + } + void operator()(TCont* c) { - try { - THolder<TConnection> me(this); - TContIO io(Socket, c); - THttpInput in(&io); - THttpOutput out(&io, &in); - // buffer reply so there will be ne context switching - TStringStream s; + try { + THolder<TConnection> me(this); + TContIO io(Socket, c); + THttpInput in(&io); + THttpOutput out(&io, &in); + // buffer reply so there will be ne context switching + TStringStream s; ServeRequest(in, s, RemoteAddr, Parent.Handler); - out << s.Str(); - out.Finish(); + out << s.Str(); + out.Finish(); } catch (...) { YSYSLOG(TLOG_WARNING, "TCoHttpServer::TConnection: error: %s\n", CurrentExceptionMessage().data()); - } - } + } + } - private: - TSocketHolder Socket; + private: + TSocketHolder Socket; const NAddr::IRemoteAddr* RemoteAddr; - const TCoHttpServer& Parent; - }; - + const TCoHttpServer& Parent; + }; + TCoHttpServer::TCoHttpServer(TContExecutor& executor, const TString& bindAddr, TIpPort port, THandler handler) - : Executor(executor) - , Listener(this, &executor) + : Executor(executor) + , Listener(this, &executor) , Handler(std::move(handler)) - , BindAddr(bindAddr) - , Port(port) - { + , BindAddr(bindAddr) + , Port(port) + { try { Listener.Bind(TIpAddress(bindAddr, port)); } catch (yexception e) { Y_FAIL("TCoHttpServer::TCoHttpServer: couldn't bind to %s:%d\n", bindAddr.data(), port); } - } - - void TCoHttpServer::Start() { - Listener.Listen(); - } - - void TCoHttpServer::Stop() { - Listener.Stop(); - } - + } + + void TCoHttpServer::Start() { + Listener.Listen(); + } + + void TCoHttpServer::Stop() { + Listener.Stop(); + } + void TCoHttpServer::OnAcceptFull(const TAcceptFull& acc) { THolder<TConnection> conn(new TConnection(acc, *this)); - Executor.Create(*conn, "client"); + Executor.Create(*conn, "client"); Y_UNUSED(conn.Release()); - } - - void TCoHttpServer::OnError() { - throw; // just rethrow - } - + } + + void TCoHttpServer::OnError() { + throw; // just rethrow + } + void TCoHttpServer::ProcessRequest(IOutputStream& out, const IHttpRequest& request) { - try { - TNetworkAddress addr(BindAddr, Port); - TSocket sock(addr); - TSocketOutput sock_out(sock); - TSocketInput sock_in(sock); + try { + TNetworkAddress addr(BindAddr, Port); + TSocket sock(addr); + TSocketOutput sock_out(sock); + TSocketInput sock_in(sock); sock_out << "GET " << request.GetURI() << " HTTP/1.0\r\n\r\n"; - THttpInput http_in(&sock_in); - try { - out << "HTTP/1.1 200 Ok\nConnection: Close\n\n"; - TransferData(&http_in, &out); + THttpInput http_in(&sock_in); + try { + out << "HTTP/1.1 200 Ok\nConnection: Close\n\n"; + TransferData(&http_in, &out); } catch (...) { YSYSLOG(TLOG_DEBUG, "TCoHttpServer: while getting data from backend: %s", CurrentExceptionMessage().data()); - } + } } catch (const yexception& /*e*/) { - out << "HTTP/1.1 500 Internal server error\nConnection: Close\n\n"; + out << "HTTP/1.1 500 Internal server error\nConnection: Close\n\n"; YSYSLOG(TLOG_DEBUG, "TCoHttpServer: while getting data from backend: %s", CurrentExceptionMessage().data()); - } - } - - /* TMtHttpServer */ - + } + } + + /* TMtHttpServer */ + class TMtHttpServer::TConnection: public TClientRequest, public THttpClient { - public: - TConnection(const TMtHttpServer& parent) - : Parent(parent) - { - } - + public: + TConnection(const TMtHttpServer& parent) + : Parent(parent) + { + } + bool Reply(void*) override { ServeRequest(Input(), Output(), NAddr::GetPeerAddr(Socket()).Get(), Parent.Handler); - return true; - } + return true; + } + + private: + const TMtHttpServer& Parent; + }; - private: - const TMtHttpServer& Parent; - }; - TMtHttpServer::TMtHttpServer(const TOptions& options, THandler handler, IThreadFactory* pool) - : THttpServer(this, options, pool) + : THttpServer(this, options, pool) , Handler(std::move(handler)) - { - } - + { + } + TMtHttpServer::TMtHttpServer(const TOptions& options, THandler handler, TSimpleSharedPtr<IThreadPool> pool) : THttpServer(this, /* mainWorkers = */pool, /* failWorkers = */pool, options) , Handler(std::move(handler)) @@ -233,36 +233,36 @@ namespace NMonitoring { THttpServer::Stop(); } - TClientRequest* TMtHttpServer::CreateClient() { - return new TConnection(*this); - } - - /* TService */ - + TClientRequest* TMtHttpServer::CreateClient() { + return new TConnection(*this); + } + + /* TService */ + TMonService::TMonService(TContExecutor& executor, TIpPort internalPort, TIpPort externalPort, THandler coHandler, THandler mtHandler) : CoServer(executor, "127.0.0.1", internalPort, std::move(coHandler)) , MtServer(THttpServerOptions(externalPort), std::bind(&TMonService::DispatchRequest, this, std::placeholders::_1, std::placeholders::_2)) , MtHandler(std::move(mtHandler)) - { - } - - void TMonService::Start() { - MtServer.Start(); - CoServer.Start(); - } - - void TMonService::Stop() { - MtServer.Stop(); - CoServer.Stop(); - } - + { + } + + void TMonService::Start() { + MtServer.Start(); + CoServer.Start(); + } + + void TMonService::Stop() { + MtServer.Stop(); + CoServer.Stop(); + } + void TMonService::DispatchRequest(IOutputStream& out, const IHttpRequest& request) { - if (strcmp(request.GetPath(), "/") == 0) { - out << "HTTP/1.1 200 Ok\nConnection: Close\n\n"; - MtHandler(out, request); + if (strcmp(request.GetPath(), "/") == 0) { + out << "HTTP/1.1 200 Ok\nConnection: Close\n\n"; + MtHandler(out, request); } else - CoServer.ProcessRequest(out, request); - } - + CoServer.ProcessRequest(out, request); + } + } diff --git a/library/cpp/monlib/service/service.h b/library/cpp/monlib/service/service.h index 2ea5f0f4a2..2f66dddaf8 100644 --- a/library/cpp/monlib/service/service.h +++ b/library/cpp/monlib/service/service.h @@ -1,65 +1,65 @@ #pragma once - + #include <library/cpp/coroutine/engine/impl.h> #include <library/cpp/coroutine/listener/listen.h> #include <library/cpp/http/fetch/httpheader.h> #include <library/cpp/http/server/http.h> #include <library/cpp/logger/all.h> -#include <util/network/ip.h> +#include <util/network/ip.h> #include <library/cpp/cgiparam/cgiparam.h> - + #include <functional> - + struct TMonitor; - -namespace NMonitoring { - struct IHttpRequest { + +namespace NMonitoring { + struct IHttpRequest { virtual ~IHttpRequest() { } - virtual const char* GetURI() const = 0; - virtual const char* GetPath() const = 0; + virtual const char* GetURI() const = 0; + virtual const char* GetPath() const = 0; virtual const TCgiParameters& GetParams() const = 0; virtual const TCgiParameters& GetPostParams() const = 0; virtual TStringBuf GetPostContent() const = 0; virtual HTTP_METHOD GetMethod() const = 0; virtual const THttpHeaders& GetHeaders() const = 0; virtual TString GetRemoteAddr() const = 0; - }; - // first param - output stream to write result to - // second param - URL of request + }; + // first param - output stream to write result to + // second param - URL of request typedef std::function<void(IOutputStream&, const IHttpRequest&)> THandler; - + class TCoHttpServer: private TContListener::ICallBack { - public: - // initialize and schedule coroutines for execution + public: + // initialize and schedule coroutines for execution TCoHttpServer(TContExecutor& executor, const TString& bindAddr, TIpPort port, THandler handler); - void Start(); - void Stop(); - - // this function implements THandler interface - // by forwarding it to the httpserver - // @note this call may be blocking; don't use inside coroutines - // @throws may throw in case of connection error, etc + void Start(); + void Stop(); + + // this function implements THandler interface + // by forwarding it to the httpserver + // @note this call may be blocking; don't use inside coroutines + // @throws may throw in case of connection error, etc void ProcessRequest(IOutputStream&, const IHttpRequest&); - private: - class TConnection; - - // ICallBack implementation + private: + class TConnection; + + // ICallBack implementation void OnAcceptFull(const TAcceptFull& a) override; void OnError() override; - private: - TContExecutor& Executor; - TContListener Listener; - THandler Handler; + private: + TContExecutor& Executor; + TContListener Listener; + THandler Handler; TString BindAddr; - TIpPort Port; - }; - + TIpPort Port; + }; + class TMtHttpServer: public THttpServer, private THttpServer::ICallBack { - public: + public: TMtHttpServer(const TOptions& options, THandler handler, IThreadFactory* pool = nullptr); TMtHttpServer(const TOptions& options, THandler handler, TSimpleSharedPtr<IThreadPool> pool); @@ -82,31 +82,31 @@ namespace NMonitoring { */ void Stop(); - private: - class TConnection; + private: + class TConnection; TClientRequest* CreateClient() override; - - THandler Handler; - }; - - // this class implements hybrid coroutine and threaded approach - // requests for main page which holds counters and simple tables are served in a thread - // requests for other pages which include access with inter-thread synchonization - // will be served in a coroutine context - class TMonService { - public: + + THandler Handler; + }; + + // this class implements hybrid coroutine and threaded approach + // requests for main page which holds counters and simple tables are served in a thread + // requests for other pages which include access with inter-thread synchonization + // will be served in a coroutine context + class TMonService { + public: TMonService(TContExecutor& executor, TIpPort internalPort, TIpPort externalPort, THandler coHandler, THandler mtHandler); - void Start(); - void Stop(); + void Start(); + void Stop(); - protected: + protected: void DispatchRequest(IOutputStream& out, const IHttpRequest&); - private: - TCoHttpServer CoServer; - TMtHttpServer MtServer; - THandler MtHandler; - }; - + private: + TCoHttpServer CoServer; + TMtHttpServer MtServer; + THandler MtHandler; + }; + } diff --git a/library/cpp/monlib/ya.make b/library/cpp/monlib/ya.make index 9736635f59..9bd236d6fd 100644 --- a/library/cpp/monlib/ya.make +++ b/library/cpp/monlib/ya.make @@ -42,4 +42,4 @@ RECURSE( service/auth/tvm service/pages service/pages/tablesorter -) +) |