aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorAlexey Efimov <xeno@prnwatch.com>2022-05-04 10:13:58 +0300
committerAlexey Efimov <xeno@prnwatch.com>2022-05-04 10:13:58 +0300
commit6faf680f58ba8341a694dcbadf572d37197ae888 (patch)
treee5fd053607e59e6db7bdec1ea100f46e149a6f97 /library
parentb4c4f96bf4475170a9ac5e4568cc7b05789986c8 (diff)
downloadydb-6faf680f58ba8341a694dcbadf572d37197ae888.tar.gz
add async monitoring http KIKIMR-14742
ref:c51d608f0ae78f08597b88f837491da33a953ef6
Diffstat (limited to 'library')
-rw-r--r--library/cpp/actors/http/http.cpp61
-rw-r--r--library/cpp/actors/http/http.h55
-rw-r--r--library/cpp/actors/http/http_proxy.cpp22
-rw-r--r--library/cpp/actors/http/http_ut.cpp17
-rw-r--r--library/cpp/lwtrace/mon/mon_lwtrace.cpp6
-rw-r--r--library/cpp/lwtrace/mon/mon_lwtrace.h2
-rw-r--r--library/cpp/monlib/service/monservice.h4
7 files changed, 141 insertions, 26 deletions
diff --git a/library/cpp/actors/http/http.cpp b/library/cpp/actors/http/http.cpp
index 7125f9d8b0..3d07c870ce 100644
--- a/library/cpp/actors/http/http.cpp
+++ b/library/cpp/actors/http/http.cpp
@@ -63,6 +63,14 @@ void THttpRequest::Clear() {
}
template <>
+bool THttpParser<THttpRequest, TSocketBuffer>::HaveBody() const {
+ if (!Body.empty()) {
+ return true;
+ }
+ return (!ContentType.empty() || !ContentLength.empty() || !TransferEncoding.empty());
+}
+
+template <>
void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
TStringBuf data(Pos(), len);
while (!data.empty()) {
@@ -98,7 +106,6 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
case EParseStage::Header: {
if (ProcessData(Header, data, "\r\n", MaxHeaderSize)) {
if (Header.empty()) {
- Headers = TStringBuf(Headers.data(), data.begin() - Headers.begin());
if (HaveBody()) {
Stage = EParseStage::Body;
} else {
@@ -107,6 +114,7 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
} else {
ProcessHeader(Header);
}
+ Headers = TStringBuf(Headers.data(), data.data() - Headers.data());
}
break;
}
@@ -116,8 +124,13 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
Body = Content;
Stage = EParseStage::Done;
}
- } else if (TransferEncoding == "chunked") {
+ } else if (TEqNoCase()(TransferEncoding, "chunked")) {
Stage = EParseStage::ChunkLength;
+ } else if (TotalSize.has_value()) {
+ if (ProcessData(Content, data, GetBodySizeFromTotalSize())) {
+ Body = Content;
+ Stage = EParseStage::Done;
+ }
} else {
// Invalid body encoding
Stage = EParseStage::Error;
@@ -189,6 +202,15 @@ THttpParser<THttpRequest, TSocketBuffer>::EParseStage THttpParser<THttpRequest,
}
template <>
+bool THttpParser<THttpResponse, TSocketBuffer>::HaveBody() const {
+ if (!Body.empty()) {
+ return true;
+ }
+ return (!Status.starts_with("1") && Status != "204" && Status != "304")
+ && (!ContentType.empty() || !ContentLength.empty() || !TransferEncoding.empty());
+}
+
+template <>
THttpParser<THttpResponse, TSocketBuffer>::EParseStage THttpParser<THttpResponse, TSocketBuffer>::GetInitialStage() {
return EParseStage::Protocol;
}
@@ -237,6 +259,8 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) {
if (Header.empty()) {
if (HaveBody() && (ContentLength.empty() || ContentLength != "0")) {
Stage = EParseStage::Body;
+ } else if (TotalSize.has_value() && !data.empty()) {
+ Stage = EParseStage::Body;
} else {
Stage = EParseStage::Done;
}
@@ -252,8 +276,13 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) {
if (ProcessData(Body, data, FromString(ContentLength))) {
Stage = EParseStage::Done;
}
- } else if (TransferEncoding == "chunked") {
+ } else if (TEqNoCase()(TransferEncoding, "chunked")) {
Stage = EParseStage::ChunkLength;
+ } else if (TotalSize.has_value()) {
+ if (ProcessData(Content, data, GetBodySizeFromTotalSize())) {
+ Body = Content;
+ Stage = EParseStage::Done;
+ }
} else {
// Invalid body encoding
Stage = EParseStage::Error;
@@ -333,9 +362,19 @@ void THttpParser<THttpResponse, TSocketBuffer>::ConnectionClosed() {
}
THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) {
+ THttpParser<THttpResponse, TSocketBuffer> parser(data);
+ THeadersBuilder headers(parser.Headers);
+ if (!WorkerName.empty()) {
+ headers.Set("X-Worker-Name", WorkerName);
+ }
THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this);
- response->Append(data);
- response->Reparse();
+ response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message);
+ response->Set(headers);
+ if (parser.HaveBody()) {
+ response->SetBody(parser.Body);
+ } else {
+ response->Set<&THttpResponse::ContentLength>("0");
+ }
return response;
}
@@ -601,11 +640,17 @@ void TCookiesBuilder::Set(TStringBuf name, TStringBuf data) {
}
THeaders::THeaders(TStringBuf headers) {
+ Parse(headers);
+}
+
+size_t THeaders::Parse(TStringBuf headers) {
+ auto start = headers.begin();
for (TStringBuf param = headers.NextTok("\r\n"); !param.empty(); param = headers.NextTok("\r\n")) {
TStringBuf name = param.NextTok(":");
param.SkipPrefix(" ");
Headers[name] = param;
}
+ return headers.begin() - start;
}
TStringBuf THeaders::operator [](TStringBuf name) const {
@@ -636,7 +681,11 @@ TString THeaders::Render() const {
}
THeadersBuilder::THeadersBuilder()
- :THeaders(TStringBuf())
+ : THeaders(TStringBuf())
+{}
+
+THeadersBuilder::THeadersBuilder(TStringBuf headers)
+ : THeaders(headers)
{}
THeadersBuilder::THeadersBuilder(const THeadersBuilder& builder) {
diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h
index 96c5c1ec48..3e5e39e719 100644
--- a/library/cpp/actors/http/http.h
+++ b/library/cpp/actors/http/http.h
@@ -40,6 +40,17 @@ struct TLessNoCase {
}
};
+struct TEqNoCase {
+ bool operator()(TStringBuf l, TStringBuf r) const {
+ auto ll = l.length();
+ auto rl = r.length();
+ if (ll != rl) {
+ return false;
+ }
+ return strnicmp(l.data(), r.data(), ll) == 0;
+ }
+};
+
struct TUrlParameters {
THashMap<TStringBuf, TStringBuf> Parameters;
@@ -77,6 +88,7 @@ struct THeaders {
TStringBuf operator [](TStringBuf name) const;
bool Has(TStringBuf name) const;
TStringBuf Get(TStringBuf name) const; // raw
+ size_t Parse(TStringBuf headers);
TString Render() const;
};
@@ -84,6 +96,7 @@ struct THeadersBuilder : THeaders {
TDeque<std::pair<TString, TString>> Data;
THeadersBuilder();
+ THeadersBuilder(TStringBuf headers);
THeadersBuilder(const THeadersBuilder& builder);
void Set(TStringBuf name, TStringBuf data);
};
@@ -188,6 +201,7 @@ public:
size_t ChunkLength = 0;
size_t ContentSize = 0;
TString Content;
+ std::optional<size_t> TotalSize;
THttpParser(const THttpParser& src)
: HeaderType(src)
@@ -285,6 +299,10 @@ public:
void Advance(size_t len);
void ConnectionClosed();
+ size_t GetBodySizeFromTotalSize() const {
+ return TotalSize.value() - (HeaderType::Headers.end() - BufferType::Data());
+ }
+
void Clear() {
BufferType::Clear();
HeaderType::Clear();
@@ -333,9 +351,7 @@ public:
return IsReady() || IsError();
}
- bool HaveBody() const {
- return !HeaderType::ContentType.empty() || !HeaderType::ContentLength.empty() || !HeaderType::TransferEncoding.empty();
- }
+ bool HaveBody() const;
bool EnsureEnoughSpaceAvailable(size_t need = BufferType::BUFFER_MIN_STEP) {
bool result = BufferType::EnsureEnoughSpaceAvailable(need);
@@ -395,6 +411,16 @@ public:
: Stage(GetInitialStage())
, LastSuccessStage(Stage)
{}
+
+ THttpParser(TStringBuf data)
+ : Stage(GetInitialStage())
+ , LastSuccessStage(Stage)
+ {
+ BufferType::Assign(data.data(), data.size());
+ BufferType::Clear(); // reset position to 0
+ TotalSize = data.size();
+ Advance(data.size());
+ }
};
template <typename HeaderType, typename BufferType>
@@ -440,14 +466,21 @@ public:
Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
Append(name);
Append(": ");
+ auto data = BufferType::Pos();
Append(value);
+ auto cit = HeaderType::HeadersLocation.find(name);
+ if (cit != HeaderType::HeadersLocation.end()) {
+ (this->*cit->second) = TStringBuf(data, BufferType::Pos());
+ }
Append("\r\n");
HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
}
void Set(const THeaders& headers) {
Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
- Append(headers.Render());
+ for (const auto& [name, value] : headers.Headers) {
+ Set(name, value);
+ }
HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
}
@@ -497,6 +530,10 @@ public:
Stage = ERenderStage::Done;
}
+ void FinishBody() {
+ Stage = ERenderStage::Done;
+ }
+
bool IsDone() const {
return Stage == ERenderStage::Done;
}
@@ -505,6 +542,10 @@ public:
switch (Stage) {
case ERenderStage::Header:
FinishHeader();
+ FinishBody();
+ break;
+ case ERenderStage::Body:
+ FinishBody();
break;
default:
break;
@@ -599,7 +640,7 @@ public:
if (Connection.empty()) {
return Version == "1.0";
} else {
- return Connection == "close";
+ return TEqNoCase()(Connection, "close");
}
}
@@ -679,14 +720,14 @@ public:
bool IsConnectionClose() const {
if (!Connection.empty()) {
- return Connection == "close";
+ return TEqNoCase()(Connection, "close");
} else {
return Request->IsConnectionClose();
}
}
bool IsNeedBody() const {
- return Status != "204";
+ return GetRequest()->Method != "HEAD" && Status != "204";
}
THttpIncomingRequestPtr GetRequest() const {
diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp
index e0204f6ed0..e347a3e1c2 100644
--- a/library/cpp/actors/http/http_proxy.cpp
+++ b/library/cpp/actors/http/http_proxy.cpp
@@ -68,13 +68,14 @@ protected:
return;
} else {
if (url.EndsWith('/')) {
- url.Trunc(url.size() - 1);
- }
- size_t pos = url.rfind('/');
- if (pos == TStringBuf::npos) {
- break;
+ url.Chop(1);
} else {
- url = url.substr(0, pos + 1);
+ size_t pos = url.rfind('/');
+ if (pos == TStringBuf::npos) {
+ break;
+ } else {
+ url = url.substr(0, pos + 1);
+ }
}
}
}
@@ -117,7 +118,8 @@ protected:
Connections.erase(event->Get()->ConnectionID);
}
- void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext&) {
+ void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, HttpLog, "Register handler " << event->Get()->Path << " to " << event->Get()->Handler);
Handlers[event->Get()->Path] = event->Get()->Handler;
}
@@ -207,6 +209,12 @@ protected:
}
void Handle(NActors::TEvents::TEvPoison::TPtr, const NActors::TActorContext&) {
+ for (const TActorId& acceptor : Acceptors) {
+ Send(acceptor, new NActors::TEvents::TEvPoisonPill());
+ }
+ for (const TActorId& connection : Connections) {
+ Send(connection, new NActors::TEvents::TEvPoisonPill());
+ }
PassAway();
}
diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp
index caa5b3e183..cdb9025ff1 100644
--- a/library/cpp/actors/http/http_ut.cpp
+++ b/library/cpp/actors/http/http_ut.cpp
@@ -50,7 +50,20 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n");
}
- Y_UNIT_TEST(BasicParsingChunkedBody) {
+ Y_UNIT_TEST(BasicParsingChunkedBodyRequest) {
+ NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest();
+ EatWholeString(request, "POST /Url HTTP/1.1\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
+ UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done);
+ UNIT_ASSERT_EQUAL(request->Method, "POST");
+ UNIT_ASSERT_EQUAL(request->URL, "/Url");
+ UNIT_ASSERT_EQUAL(request->Connection, "close");
+ UNIT_ASSERT_EQUAL(request->Protocol, "HTTP");
+ UNIT_ASSERT_EQUAL(request->Version, "1.1");
+ UNIT_ASSERT_EQUAL(request->TransferEncoding, "chunked");
+ UNIT_ASSERT_EQUAL(request->Body, "this is test.");
+ }
+
+ Y_UNIT_TEST(BasicParsingChunkedBodyResponse) {
NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest();
NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request);
EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
@@ -83,7 +96,7 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
UNIT_ASSERT_EQUAL(response->Body, "this\r\n is test.");
}
- Y_UNIT_TEST(CreateRepsonseWithCompressedBody) {
+ Y_UNIT_TEST(CreateResponseWithCompressedBody) {
NHttp::THttpIncomingRequestPtr request = nullptr;
NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", "200", "OK");
response->Set<&NHttp::THttpResponse::ContentEncoding>("gzip");
diff --git a/library/cpp/lwtrace/mon/mon_lwtrace.cpp b/library/cpp/lwtrace/mon/mon_lwtrace.cpp
index a61ee9ce22..5e79716834 100644
--- a/library/cpp/lwtrace/mon/mon_lwtrace.cpp
+++ b/library/cpp/lwtrace/mon/mon_lwtrace.cpp
@@ -4671,12 +4671,12 @@ private:
}
};
-void RegisterPages(NMonitoring::TMonService2* mon, bool allowUnsafe) {
+void RegisterPages(NMonitoring::TIndexMonPage* index, bool allowUnsafe) {
THolder<NLwTraceMonPage::TLWTraceMonPage> p = MakeHolder<NLwTraceMonPage::TLWTraceMonPage>(allowUnsafe);
- mon->Register(p.Release());
+ index->Register(p.Release());
#define WWW_STATIC_FILE(file, type) \
- mon->Register(new TResourceMonPage(file, file, NMonitoring::TResourceMonPage::type));
+ index->Register(new TResourceMonPage(file, file, NMonitoring::TResourceMonPage::type));
WWW_STATIC_FILE("lwtrace/mon/static/common.css", CSS);
WWW_STATIC_FILE("lwtrace/mon/static/common.js", JAVASCRIPT);
WWW_STATIC_FILE("lwtrace/mon/static/css/bootstrap.min.css", CSS);
diff --git a/library/cpp/lwtrace/mon/mon_lwtrace.h b/library/cpp/lwtrace/mon/mon_lwtrace.h
index 8030f6ea61..f196adeabd 100644
--- a/library/cpp/lwtrace/mon/mon_lwtrace.h
+++ b/library/cpp/lwtrace/mon/mon_lwtrace.h
@@ -19,7 +19,7 @@ public:
void Output(TStringStream& ss);
};
-void RegisterPages(NMonitoring::TMonService2* mon, bool allowUnsafe = false);
+void RegisterPages(NMonitoring::TIndexMonPage* index, bool allowUnsafe = false);
NLWTrace::TProbeRegistry& ProbeRegistry(); // This is not safe to use this function before main()
NLWTrace::TManager& TraceManager(bool allowUnsafe = false);
TDashboardRegistry& DashboardRegistry();
diff --git a/library/cpp/monlib/service/monservice.h b/library/cpp/monlib/service/monservice.h
index 8f5e52fcdb..3116a76765 100644
--- a/library/cpp/monlib/service/monservice.h
+++ b/library/cpp/monlib/service/monservice.h
@@ -68,6 +68,10 @@ namespace NMonitoring {
IMonPage* FindPage(const TString& relativePath);
TIndexMonPage* FindIndexPage(const TString& relativePath);
void SortPages();
+
+ TIndexMonPage* GetRoot() {
+ return IndexMonPage.Get();
+ }
};
}