aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/monlib
diff options
context:
space:
mode:
authorelviandante <elviandante@yandex-team.ru>2022-02-10 16:49:47 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:47 +0300
commit621a17b75565a8d70df465a0ac5c93a7c6d2e61f (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/monlib
parent643ddee8bd6125a18c4b1506c35bee857f64f4d2 (diff)
downloadydb-621a17b75565a8d70df465a0ac5c93a7c6d2e61f.tar.gz
Restoring authorship annotation for <elviandante@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/monlib')
-rw-r--r--library/cpp/monlib/counters/counters.h54
-rw-r--r--library/cpp/monlib/service/service.cpp268
-rw-r--r--library/cpp/monlib/service/service.h110
-rw-r--r--library/cpp/monlib/ya.make2
4 files changed, 217 insertions, 217 deletions
diff --git a/library/cpp/monlib/counters/counters.h b/library/cpp/monlib/counters/counters.h
index b2e2d2794b..038b55f0c8 100644
--- a/library/cpp/monlib/counters/counters.h
+++ b/library/cpp/monlib/counters/counters.h
@@ -1,5 +1,5 @@
#pragma once
-
+
#include <util/datetime/base.h>
#include <util/generic/algorithm.h>
#include <util/generic/list.h>
@@ -8,27 +8,27 @@
#include <util/generic/singleton.h>
#include <util/generic/vector.h>
#include <util/str_stl.h>
-#include <util/stream/output.h>
+#include <util/stream/output.h>
#include <util/string/util.h>
-#include <util/system/atomic.h>
+#include <util/system/atomic.h>
#include <util/system/defaults.h>
#include <util/system/guard.h>
#include <util/system/sem.h>
#include <util/system/spinlock.h>
-
+
#include <array>
-namespace NMonitoring {
+namespace NMonitoring {
#define BEGIN_OUTPUT_COUNTERS \
void OutputImpl(IOutputStream& out) { \
char prettyBuf[32];
#define END_OUTPUT_COUNTERS \
out.Flush(); \
}
-
+
#define OUTPUT_NAMED_COUNTER(var, name) out << name << ": \t" << var << NMonitoring::PrettyNum(var, prettyBuf, 32) << '\n'
#define OUTPUT_COUNTER(var) OUTPUT_NAMED_COUNTER(var, #var);
-
+
char* PrettyNumShort(i64 val, char* buf, size_t size);
char* PrettyNum(i64 val, char* buf, size_t size);
@@ -39,13 +39,13 @@ namespace NMonitoring {
public:
using TValue = TAtomic;
using TValueBase = TAtomicBase;
-
+
TDeprecatedCounter()
: Value()
, Derivative(false)
{
}
-
+
TDeprecatedCounter(TValue value, bool derivative = false)
: Value(value)
, Derivative(derivative)
@@ -62,7 +62,7 @@ namespace NMonitoring {
TValueBase Val() const {
return AtomicGet(Value);
}
-
+
void Set(TValue val) {
AtomicSet(Value, val);
}
@@ -73,7 +73,7 @@ namespace NMonitoring {
TValueBase Dec() {
return AtomicDecrement(Value);
}
-
+
TValueBase Add(const TValue val) {
return AtomicAdd(Value, val);
}
@@ -88,21 +88,21 @@ namespace NMonitoring {
void operator++(int) {
Inc();
}
-
+
void operator--() {
Dec();
}
void operator--(int) {
Dec();
}
-
+
void operator+=(TValue rhs) {
Add(rhs);
}
void operator-=(TValue rhs) {
Sub(rhs);
}
-
+
TValueBase operator=(TValue rhs) {
AtomicSwap(&Value, rhs);
return rhs;
@@ -111,7 +111,7 @@ namespace NMonitoring {
bool operator!() const {
return AtomicGet(Value) == 0;
}
-
+
TAtomic& GetAtomic() {
return Value;
}
@@ -120,14 +120,14 @@ namespace NMonitoring {
TAtomic Value;
bool Derivative;
};
-
+
template <typename T>
struct TDeprecatedCountersBase {
virtual ~TDeprecatedCountersBase() {
}
-
+
virtual void OutputImpl(IOutputStream&) = 0;
-
+
static T& Instance() {
return *Singleton<T>();
}
@@ -337,14 +337,14 @@ namespace NMonitoring {
}
static inline IOutputStream& operator<<(IOutputStream& o, const NMonitoring::TDeprecatedCounter& rhs) {
- return o << rhs.Val();
-}
-
-template <size_t N>
+ return o << rhs.Val();
+}
+
+template <size_t N>
static inline IOutputStream& operator<<(IOutputStream& o, const std::array<NMonitoring::TDeprecatedCounter, N>& rhs) {
for (typename std::array<NMonitoring::TDeprecatedCounter, N>::const_iterator it = rhs.begin(); it != rhs.end(); ++it) {
- if (!!*it)
- o << *it << Endl;
- }
- return o;
-}
+ if (!!*it)
+ o << *it << Endl;
+ }
+ return o;
+}
diff --git a/library/cpp/monlib/service/service.cpp b/library/cpp/monlib/service/service.cpp
index 559aba661e..929efbf816 100644
--- a/library/cpp/monlib/service/service.cpp
+++ b/library/cpp/monlib/service/service.cpp
@@ -5,64 +5,64 @@
#include <library/cpp/http/fetch/httpheader.h>
#include <library/cpp/http/fetch/httpfsm.h>
#include <library/cpp/uri/http_url.h>
-
+
#include <util/generic/buffer.h>
#include <util/stream/str.h>
#include <util/stream/buffer.h>
#include <util/stream/zerocopy.h>
#include <util/string/vector.h>
-namespace NMonitoring {
+namespace NMonitoring {
class THttpClient: public IHttpRequest {
- public:
+ public:
void ServeRequest(THttpInput& in, IOutputStream& out, const NAddr::IRemoteAddr* remoteAddr, const THandler& Handler) {
- try {
- try {
+ try {
+ try {
RemoteAddr = remoteAddr;
- THttpHeaderParser parser;
- parser.Init(&Header);
+ THttpHeaderParser parser;
+ parser.Init(&Header);
if (parser.Execute(in.FirstLine().data(), in.FirstLine().size()) < 0) {
- out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n";
- return;
- }
+ out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n";
+ return;
+ }
if (Url.Parse(Header.GetUrl().data()) != THttpURL::ParsedOK) {
- out << "HTTP/1.1 400 Invalid url\r\nConnection: Close\r\n\r\n";
- return;
- }
+ out << "HTTP/1.1 400 Invalid url\r\nConnection: Close\r\n\r\n";
+ return;
+ }
TString path = GetPath();
if (!path.StartsWith('/')) {
out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n";
return;
}
Headers = &in.Headers();
- CgiParams.Scan(Url.Get(THttpURL::FieldQuery));
+ CgiParams.Scan(Url.Get(THttpURL::FieldQuery));
} catch (...) {
- out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n";
+ out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n";
YSYSLOG(TLOG_ERR, "THttpClient: internal error while serving monitoring request: %s", CurrentExceptionMessage().data());
- }
-
+ }
+
if (Header.http_method == HTTP_METHOD_POST)
TransferData(&in, &PostContent);
- Handler(out, *this);
- out.Finish();
+ Handler(out, *this);
+ out.Finish();
} catch (...) {
auto msg = CurrentExceptionMessage();
out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n" << msg;
out.Finish();
YSYSLOG(TLOG_ERR, "THttpClient: error while serving monitoring request: %s", msg.data());
- }
- }
-
+ }
+ }
+
const char* GetURI() const override {
return Header.request_uri.c_str();
- }
+ }
const char* GetPath() const override {
- return Url.Get(THttpURL::FieldPath);
- }
+ return Url.Get(THttpURL::FieldPath);
+ }
const TCgiParameters& GetParams() const override {
- return CgiParams;
- }
+ return CgiParams;
+ }
const TCgiParameters& GetPostParams() const override {
if (PostParams.empty() && !PostContent.Buffer().Empty())
const_cast<THttpClient*>(this)->ScanPostParams();
@@ -90,125 +90,125 @@ namespace NMonitoring {
return RemoteAddr ? NAddr::PrintHostAndPort(*RemoteAddr) : TString();
}
- private:
- THttpRequestHeader Header;
+ private:
+ THttpRequestHeader Header;
const THttpHeaders* Headers = nullptr;
- THttpURL Url;
+ THttpURL Url;
TCgiParameters CgiParams;
TCgiParameters PostParams;
TBufferOutput PostContent;
const NAddr::IRemoteAddr* RemoteAddr = nullptr;
- };
-
- /* TCoHttpServer */
-
+ };
+
+ /* TCoHttpServer */
+
class TCoHttpServer::TConnection: public THttpClient {
- public:
+ public:
TConnection(const TCoHttpServer::TAcceptFull& acc, const TCoHttpServer& parent)
: Socket(acc.S->Release())
, RemoteAddr(acc.Remote)
- , Parent(parent)
- {
- }
-
+ , Parent(parent)
+ {
+ }
+
void operator()(TCont* c) {
- try {
- THolder<TConnection> me(this);
- TContIO io(Socket, c);
- THttpInput in(&io);
- THttpOutput out(&io, &in);
- // buffer reply so there will be ne context switching
- TStringStream s;
+ try {
+ THolder<TConnection> me(this);
+ TContIO io(Socket, c);
+ THttpInput in(&io);
+ THttpOutput out(&io, &in);
+ // buffer reply so there will be ne context switching
+ TStringStream s;
ServeRequest(in, s, RemoteAddr, Parent.Handler);
- out << s.Str();
- out.Finish();
+ out << s.Str();
+ out.Finish();
} catch (...) {
YSYSLOG(TLOG_WARNING, "TCoHttpServer::TConnection: error: %s\n", CurrentExceptionMessage().data());
- }
- }
+ }
+ }
- private:
- TSocketHolder Socket;
+ private:
+ TSocketHolder Socket;
const NAddr::IRemoteAddr* RemoteAddr;
- const TCoHttpServer& Parent;
- };
-
+ const TCoHttpServer& Parent;
+ };
+
TCoHttpServer::TCoHttpServer(TContExecutor& executor, const TString& bindAddr, TIpPort port, THandler handler)
- : Executor(executor)
- , Listener(this, &executor)
+ : Executor(executor)
+ , Listener(this, &executor)
, Handler(std::move(handler))
- , BindAddr(bindAddr)
- , Port(port)
- {
+ , BindAddr(bindAddr)
+ , Port(port)
+ {
try {
Listener.Bind(TIpAddress(bindAddr, port));
} catch (yexception e) {
Y_FAIL("TCoHttpServer::TCoHttpServer: couldn't bind to %s:%d\n", bindAddr.data(), port);
}
- }
-
- void TCoHttpServer::Start() {
- Listener.Listen();
- }
-
- void TCoHttpServer::Stop() {
- Listener.Stop();
- }
-
+ }
+
+ void TCoHttpServer::Start() {
+ Listener.Listen();
+ }
+
+ void TCoHttpServer::Stop() {
+ Listener.Stop();
+ }
+
void TCoHttpServer::OnAcceptFull(const TAcceptFull& acc) {
THolder<TConnection> conn(new TConnection(acc, *this));
- Executor.Create(*conn, "client");
+ Executor.Create(*conn, "client");
Y_UNUSED(conn.Release());
- }
-
- void TCoHttpServer::OnError() {
- throw; // just rethrow
- }
-
+ }
+
+ void TCoHttpServer::OnError() {
+ throw; // just rethrow
+ }
+
void TCoHttpServer::ProcessRequest(IOutputStream& out, const IHttpRequest& request) {
- try {
- TNetworkAddress addr(BindAddr, Port);
- TSocket sock(addr);
- TSocketOutput sock_out(sock);
- TSocketInput sock_in(sock);
+ try {
+ TNetworkAddress addr(BindAddr, Port);
+ TSocket sock(addr);
+ TSocketOutput sock_out(sock);
+ TSocketInput sock_in(sock);
sock_out << "GET " << request.GetURI() << " HTTP/1.0\r\n\r\n";
- THttpInput http_in(&sock_in);
- try {
- out << "HTTP/1.1 200 Ok\nConnection: Close\n\n";
- TransferData(&http_in, &out);
+ THttpInput http_in(&sock_in);
+ try {
+ out << "HTTP/1.1 200 Ok\nConnection: Close\n\n";
+ TransferData(&http_in, &out);
} catch (...) {
YSYSLOG(TLOG_DEBUG, "TCoHttpServer: while getting data from backend: %s", CurrentExceptionMessage().data());
- }
+ }
} catch (const yexception& /*e*/) {
- out << "HTTP/1.1 500 Internal server error\nConnection: Close\n\n";
+ out << "HTTP/1.1 500 Internal server error\nConnection: Close\n\n";
YSYSLOG(TLOG_DEBUG, "TCoHttpServer: while getting data from backend: %s", CurrentExceptionMessage().data());
- }
- }
-
- /* TMtHttpServer */
-
+ }
+ }
+
+ /* TMtHttpServer */
+
class TMtHttpServer::TConnection: public TClientRequest, public THttpClient {
- public:
- TConnection(const TMtHttpServer& parent)
- : Parent(parent)
- {
- }
-
+ public:
+ TConnection(const TMtHttpServer& parent)
+ : Parent(parent)
+ {
+ }
+
bool Reply(void*) override {
ServeRequest(Input(), Output(), NAddr::GetPeerAddr(Socket()).Get(), Parent.Handler);
- return true;
- }
+ return true;
+ }
+
+ private:
+ const TMtHttpServer& Parent;
+ };
- private:
- const TMtHttpServer& Parent;
- };
-
TMtHttpServer::TMtHttpServer(const TOptions& options, THandler handler, IThreadFactory* pool)
- : THttpServer(this, options, pool)
+ : THttpServer(this, options, pool)
, Handler(std::move(handler))
- {
- }
-
+ {
+ }
+
TMtHttpServer::TMtHttpServer(const TOptions& options, THandler handler, TSimpleSharedPtr<IThreadPool> pool)
: THttpServer(this, /* mainWorkers = */pool, /* failWorkers = */pool, options)
, Handler(std::move(handler))
@@ -233,36 +233,36 @@ namespace NMonitoring {
THttpServer::Stop();
}
- TClientRequest* TMtHttpServer::CreateClient() {
- return new TConnection(*this);
- }
-
- /* TService */
-
+ TClientRequest* TMtHttpServer::CreateClient() {
+ return new TConnection(*this);
+ }
+
+ /* TService */
+
TMonService::TMonService(TContExecutor& executor, TIpPort internalPort, TIpPort externalPort,
THandler coHandler, THandler mtHandler)
: CoServer(executor, "127.0.0.1", internalPort, std::move(coHandler))
, MtServer(THttpServerOptions(externalPort), std::bind(&TMonService::DispatchRequest, this, std::placeholders::_1, std::placeholders::_2))
, MtHandler(std::move(mtHandler))
- {
- }
-
- void TMonService::Start() {
- MtServer.Start();
- CoServer.Start();
- }
-
- void TMonService::Stop() {
- MtServer.Stop();
- CoServer.Stop();
- }
-
+ {
+ }
+
+ void TMonService::Start() {
+ MtServer.Start();
+ CoServer.Start();
+ }
+
+ void TMonService::Stop() {
+ MtServer.Stop();
+ CoServer.Stop();
+ }
+
void TMonService::DispatchRequest(IOutputStream& out, const IHttpRequest& request) {
- if (strcmp(request.GetPath(), "/") == 0) {
- out << "HTTP/1.1 200 Ok\nConnection: Close\n\n";
- MtHandler(out, request);
+ if (strcmp(request.GetPath(), "/") == 0) {
+ out << "HTTP/1.1 200 Ok\nConnection: Close\n\n";
+ MtHandler(out, request);
} else
- CoServer.ProcessRequest(out, request);
- }
-
+ CoServer.ProcessRequest(out, request);
+ }
+
}
diff --git a/library/cpp/monlib/service/service.h b/library/cpp/monlib/service/service.h
index 2ea5f0f4a2..2f66dddaf8 100644
--- a/library/cpp/monlib/service/service.h
+++ b/library/cpp/monlib/service/service.h
@@ -1,65 +1,65 @@
#pragma once
-
+
#include <library/cpp/coroutine/engine/impl.h>
#include <library/cpp/coroutine/listener/listen.h>
#include <library/cpp/http/fetch/httpheader.h>
#include <library/cpp/http/server/http.h>
#include <library/cpp/logger/all.h>
-#include <util/network/ip.h>
+#include <util/network/ip.h>
#include <library/cpp/cgiparam/cgiparam.h>
-
+
#include <functional>
-
+
struct TMonitor;
-
-namespace NMonitoring {
- struct IHttpRequest {
+
+namespace NMonitoring {
+ struct IHttpRequest {
virtual ~IHttpRequest() {
}
- virtual const char* GetURI() const = 0;
- virtual const char* GetPath() const = 0;
+ virtual const char* GetURI() const = 0;
+ virtual const char* GetPath() const = 0;
virtual const TCgiParameters& GetParams() const = 0;
virtual const TCgiParameters& GetPostParams() const = 0;
virtual TStringBuf GetPostContent() const = 0;
virtual HTTP_METHOD GetMethod() const = 0;
virtual const THttpHeaders& GetHeaders() const = 0;
virtual TString GetRemoteAddr() const = 0;
- };
- // first param - output stream to write result to
- // second param - URL of request
+ };
+ // first param - output stream to write result to
+ // second param - URL of request
typedef std::function<void(IOutputStream&, const IHttpRequest&)> THandler;
-
+
class TCoHttpServer: private TContListener::ICallBack {
- public:
- // initialize and schedule coroutines for execution
+ public:
+ // initialize and schedule coroutines for execution
TCoHttpServer(TContExecutor& executor, const TString& bindAddr, TIpPort port, THandler handler);
- void Start();
- void Stop();
-
- // this function implements THandler interface
- // by forwarding it to the httpserver
- // @note this call may be blocking; don't use inside coroutines
- // @throws may throw in case of connection error, etc
+ void Start();
+ void Stop();
+
+ // this function implements THandler interface
+ // by forwarding it to the httpserver
+ // @note this call may be blocking; don't use inside coroutines
+ // @throws may throw in case of connection error, etc
void ProcessRequest(IOutputStream&, const IHttpRequest&);
- private:
- class TConnection;
-
- // ICallBack implementation
+ private:
+ class TConnection;
+
+ // ICallBack implementation
void OnAcceptFull(const TAcceptFull& a) override;
void OnError() override;
- private:
- TContExecutor& Executor;
- TContListener Listener;
- THandler Handler;
+ private:
+ TContExecutor& Executor;
+ TContListener Listener;
+ THandler Handler;
TString BindAddr;
- TIpPort Port;
- };
-
+ TIpPort Port;
+ };
+
class TMtHttpServer: public THttpServer, private THttpServer::ICallBack {
- public:
+ public:
TMtHttpServer(const TOptions& options, THandler handler, IThreadFactory* pool = nullptr);
TMtHttpServer(const TOptions& options, THandler handler, TSimpleSharedPtr<IThreadPool> pool);
@@ -82,31 +82,31 @@ namespace NMonitoring {
*/
void Stop();
- private:
- class TConnection;
+ private:
+ class TConnection;
TClientRequest* CreateClient() override;
-
- THandler Handler;
- };
-
- // this class implements hybrid coroutine and threaded approach
- // requests for main page which holds counters and simple tables are served in a thread
- // requests for other pages which include access with inter-thread synchonization
- // will be served in a coroutine context
- class TMonService {
- public:
+
+ THandler Handler;
+ };
+
+ // this class implements hybrid coroutine and threaded approach
+ // requests for main page which holds counters and simple tables are served in a thread
+ // requests for other pages which include access with inter-thread synchonization
+ // will be served in a coroutine context
+ class TMonService {
+ public:
TMonService(TContExecutor& executor, TIpPort internalPort, TIpPort externalPort,
THandler coHandler, THandler mtHandler);
- void Start();
- void Stop();
+ void Start();
+ void Stop();
- protected:
+ protected:
void DispatchRequest(IOutputStream& out, const IHttpRequest&);
- private:
- TCoHttpServer CoServer;
- TMtHttpServer MtServer;
- THandler MtHandler;
- };
-
+ private:
+ TCoHttpServer CoServer;
+ TMtHttpServer MtServer;
+ THandler MtHandler;
+ };
+
}
diff --git a/library/cpp/monlib/ya.make b/library/cpp/monlib/ya.make
index 9736635f59..9bd236d6fd 100644
--- a/library/cpp/monlib/ya.make
+++ b/library/cpp/monlib/ya.make
@@ -42,4 +42,4 @@ RECURSE(
service/auth/tvm
service/pages
service/pages/tablesorter
-)
+)