diff options
author | Alexey Efimov <xeno@prnwatch.com> | 2022-05-04 10:13:58 +0300 |
---|---|---|
committer | Alexey Efimov <xeno@prnwatch.com> | 2022-05-04 10:13:58 +0300 |
commit | 6faf680f58ba8341a694dcbadf572d37197ae888 (patch) | |
tree | e5fd053607e59e6db7bdec1ea100f46e149a6f97 /library | |
parent | b4c4f96bf4475170a9ac5e4568cc7b05789986c8 (diff) | |
download | ydb-6faf680f58ba8341a694dcbadf572d37197ae888.tar.gz |
add async monitoring http KIKIMR-14742
ref:c51d608f0ae78f08597b88f837491da33a953ef6
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/actors/http/http.cpp | 61 | ||||
-rw-r--r-- | library/cpp/actors/http/http.h | 55 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy.cpp | 22 | ||||
-rw-r--r-- | library/cpp/actors/http/http_ut.cpp | 17 | ||||
-rw-r--r-- | library/cpp/lwtrace/mon/mon_lwtrace.cpp | 6 | ||||
-rw-r--r-- | library/cpp/lwtrace/mon/mon_lwtrace.h | 2 | ||||
-rw-r--r-- | library/cpp/monlib/service/monservice.h | 4 |
7 files changed, 141 insertions, 26 deletions
diff --git a/library/cpp/actors/http/http.cpp b/library/cpp/actors/http/http.cpp index 7125f9d8b0..3d07c870ce 100644 --- a/library/cpp/actors/http/http.cpp +++ b/library/cpp/actors/http/http.cpp @@ -63,6 +63,14 @@ void THttpRequest::Clear() { } template <> +bool THttpParser<THttpRequest, TSocketBuffer>::HaveBody() const { + if (!Body.empty()) { + return true; + } + return (!ContentType.empty() || !ContentLength.empty() || !TransferEncoding.empty()); +} + +template <> void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) { TStringBuf data(Pos(), len); while (!data.empty()) { @@ -98,7 +106,6 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) { case EParseStage::Header: { if (ProcessData(Header, data, "\r\n", MaxHeaderSize)) { if (Header.empty()) { - Headers = TStringBuf(Headers.data(), data.begin() - Headers.begin()); if (HaveBody()) { Stage = EParseStage::Body; } else { @@ -107,6 +114,7 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) { } else { ProcessHeader(Header); } + Headers = TStringBuf(Headers.data(), data.data() - Headers.data()); } break; } @@ -116,8 +124,13 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) { Body = Content; Stage = EParseStage::Done; } - } else if (TransferEncoding == "chunked") { + } else if (TEqNoCase()(TransferEncoding, "chunked")) { Stage = EParseStage::ChunkLength; + } else if (TotalSize.has_value()) { + if (ProcessData(Content, data, GetBodySizeFromTotalSize())) { + Body = Content; + Stage = EParseStage::Done; + } } else { // Invalid body encoding Stage = EParseStage::Error; @@ -189,6 +202,15 @@ THttpParser<THttpRequest, TSocketBuffer>::EParseStage THttpParser<THttpRequest, } template <> +bool THttpParser<THttpResponse, TSocketBuffer>::HaveBody() const { + if (!Body.empty()) { + return true; + } + return (!Status.starts_with("1") && Status != "204" && Status != "304") + && (!ContentType.empty() || !ContentLength.empty() || !TransferEncoding.empty()); +} + +template <> THttpParser<THttpResponse, TSocketBuffer>::EParseStage THttpParser<THttpResponse, TSocketBuffer>::GetInitialStage() { return EParseStage::Protocol; } @@ -237,6 +259,8 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) { if (Header.empty()) { if (HaveBody() && (ContentLength.empty() || ContentLength != "0")) { Stage = EParseStage::Body; + } else if (TotalSize.has_value() && !data.empty()) { + Stage = EParseStage::Body; } else { Stage = EParseStage::Done; } @@ -252,8 +276,13 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) { if (ProcessData(Body, data, FromString(ContentLength))) { Stage = EParseStage::Done; } - } else if (TransferEncoding == "chunked") { + } else if (TEqNoCase()(TransferEncoding, "chunked")) { Stage = EParseStage::ChunkLength; + } else if (TotalSize.has_value()) { + if (ProcessData(Content, data, GetBodySizeFromTotalSize())) { + Body = Content; + Stage = EParseStage::Done; + } } else { // Invalid body encoding Stage = EParseStage::Error; @@ -333,9 +362,19 @@ void THttpParser<THttpResponse, TSocketBuffer>::ConnectionClosed() { } THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) { + THttpParser<THttpResponse, TSocketBuffer> parser(data); + THeadersBuilder headers(parser.Headers); + if (!WorkerName.empty()) { + headers.Set("X-Worker-Name", WorkerName); + } THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this); - response->Append(data); - response->Reparse(); + response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message); + response->Set(headers); + if (parser.HaveBody()) { + response->SetBody(parser.Body); + } else { + response->Set<&THttpResponse::ContentLength>("0"); + } return response; } @@ -601,11 +640,17 @@ void TCookiesBuilder::Set(TStringBuf name, TStringBuf data) { } THeaders::THeaders(TStringBuf headers) { + Parse(headers); +} + +size_t THeaders::Parse(TStringBuf headers) { + auto start = headers.begin(); for (TStringBuf param = headers.NextTok("\r\n"); !param.empty(); param = headers.NextTok("\r\n")) { TStringBuf name = param.NextTok(":"); param.SkipPrefix(" "); Headers[name] = param; } + return headers.begin() - start; } TStringBuf THeaders::operator [](TStringBuf name) const { @@ -636,7 +681,11 @@ TString THeaders::Render() const { } THeadersBuilder::THeadersBuilder() - :THeaders(TStringBuf()) + : THeaders(TStringBuf()) +{} + +THeadersBuilder::THeadersBuilder(TStringBuf headers) + : THeaders(headers) {} THeadersBuilder::THeadersBuilder(const THeadersBuilder& builder) { diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h index 96c5c1ec48..3e5e39e719 100644 --- a/library/cpp/actors/http/http.h +++ b/library/cpp/actors/http/http.h @@ -40,6 +40,17 @@ struct TLessNoCase { } }; +struct TEqNoCase { + bool operator()(TStringBuf l, TStringBuf r) const { + auto ll = l.length(); + auto rl = r.length(); + if (ll != rl) { + return false; + } + return strnicmp(l.data(), r.data(), ll) == 0; + } +}; + struct TUrlParameters { THashMap<TStringBuf, TStringBuf> Parameters; @@ -77,6 +88,7 @@ struct THeaders { TStringBuf operator [](TStringBuf name) const; bool Has(TStringBuf name) const; TStringBuf Get(TStringBuf name) const; // raw + size_t Parse(TStringBuf headers); TString Render() const; }; @@ -84,6 +96,7 @@ struct THeadersBuilder : THeaders { TDeque<std::pair<TString, TString>> Data; THeadersBuilder(); + THeadersBuilder(TStringBuf headers); THeadersBuilder(const THeadersBuilder& builder); void Set(TStringBuf name, TStringBuf data); }; @@ -188,6 +201,7 @@ public: size_t ChunkLength = 0; size_t ContentSize = 0; TString Content; + std::optional<size_t> TotalSize; THttpParser(const THttpParser& src) : HeaderType(src) @@ -285,6 +299,10 @@ public: void Advance(size_t len); void ConnectionClosed(); + size_t GetBodySizeFromTotalSize() const { + return TotalSize.value() - (HeaderType::Headers.end() - BufferType::Data()); + } + void Clear() { BufferType::Clear(); HeaderType::Clear(); @@ -333,9 +351,7 @@ public: return IsReady() || IsError(); } - bool HaveBody() const { - return !HeaderType::ContentType.empty() || !HeaderType::ContentLength.empty() || !HeaderType::TransferEncoding.empty(); - } + bool HaveBody() const; bool EnsureEnoughSpaceAvailable(size_t need = BufferType::BUFFER_MIN_STEP) { bool result = BufferType::EnsureEnoughSpaceAvailable(need); @@ -395,6 +411,16 @@ public: : Stage(GetInitialStage()) , LastSuccessStage(Stage) {} + + THttpParser(TStringBuf data) + : Stage(GetInitialStage()) + , LastSuccessStage(Stage) + { + BufferType::Assign(data.data(), data.size()); + BufferType::Clear(); // reset position to 0 + TotalSize = data.size(); + Advance(data.size()); + } }; template <typename HeaderType, typename BufferType> @@ -440,14 +466,21 @@ public: Y_VERIFY_DEBUG(Stage == ERenderStage::Header); Append(name); Append(": "); + auto data = BufferType::Pos(); Append(value); + auto cit = HeaderType::HeadersLocation.find(name); + if (cit != HeaderType::HeadersLocation.end()) { + (this->*cit->second) = TStringBuf(data, BufferType::Pos()); + } Append("\r\n"); HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data()); } void Set(const THeaders& headers) { Y_VERIFY_DEBUG(Stage == ERenderStage::Header); - Append(headers.Render()); + for (const auto& [name, value] : headers.Headers) { + Set(name, value); + } HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data()); } @@ -497,6 +530,10 @@ public: Stage = ERenderStage::Done; } + void FinishBody() { + Stage = ERenderStage::Done; + } + bool IsDone() const { return Stage == ERenderStage::Done; } @@ -505,6 +542,10 @@ public: switch (Stage) { case ERenderStage::Header: FinishHeader(); + FinishBody(); + break; + case ERenderStage::Body: + FinishBody(); break; default: break; @@ -599,7 +640,7 @@ public: if (Connection.empty()) { return Version == "1.0"; } else { - return Connection == "close"; + return TEqNoCase()(Connection, "close"); } } @@ -679,14 +720,14 @@ public: bool IsConnectionClose() const { if (!Connection.empty()) { - return Connection == "close"; + return TEqNoCase()(Connection, "close"); } else { return Request->IsConnectionClose(); } } bool IsNeedBody() const { - return Status != "204"; + return GetRequest()->Method != "HEAD" && Status != "204"; } THttpIncomingRequestPtr GetRequest() const { diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp index e0204f6ed0..e347a3e1c2 100644 --- a/library/cpp/actors/http/http_proxy.cpp +++ b/library/cpp/actors/http/http_proxy.cpp @@ -68,13 +68,14 @@ protected: return; } else { if (url.EndsWith('/')) { - url.Trunc(url.size() - 1); - } - size_t pos = url.rfind('/'); - if (pos == TStringBuf::npos) { - break; + url.Chop(1); } else { - url = url.substr(0, pos + 1); + size_t pos = url.rfind('/'); + if (pos == TStringBuf::npos) { + break; + } else { + url = url.substr(0, pos + 1); + } } } } @@ -117,7 +118,8 @@ protected: Connections.erase(event->Get()->ConnectionID); } - void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext&) { + void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) { + LOG_DEBUG_S(ctx, HttpLog, "Register handler " << event->Get()->Path << " to " << event->Get()->Handler); Handlers[event->Get()->Path] = event->Get()->Handler; } @@ -207,6 +209,12 @@ protected: } void Handle(NActors::TEvents::TEvPoison::TPtr, const NActors::TActorContext&) { + for (const TActorId& acceptor : Acceptors) { + Send(acceptor, new NActors::TEvents::TEvPoisonPill()); + } + for (const TActorId& connection : Connections) { + Send(connection, new NActors::TEvents::TEvPoisonPill()); + } PassAway(); } diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp index caa5b3e183..cdb9025ff1 100644 --- a/library/cpp/actors/http/http_ut.cpp +++ b/library/cpp/actors/http/http_ut.cpp @@ -50,7 +50,20 @@ Y_UNIT_TEST_SUITE(HttpProxy) { UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n"); } - Y_UNIT_TEST(BasicParsingChunkedBody) { + Y_UNIT_TEST(BasicParsingChunkedBodyRequest) { + NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(); + EatWholeString(request, "POST /Url HTTP/1.1\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n"); + UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done); + UNIT_ASSERT_EQUAL(request->Method, "POST"); + UNIT_ASSERT_EQUAL(request->URL, "/Url"); + UNIT_ASSERT_EQUAL(request->Connection, "close"); + UNIT_ASSERT_EQUAL(request->Protocol, "HTTP"); + UNIT_ASSERT_EQUAL(request->Version, "1.1"); + UNIT_ASSERT_EQUAL(request->TransferEncoding, "chunked"); + UNIT_ASSERT_EQUAL(request->Body, "this is test."); + } + + Y_UNIT_TEST(BasicParsingChunkedBodyResponse) { NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest(); NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request); EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n"); @@ -83,7 +96,7 @@ Y_UNIT_TEST_SUITE(HttpProxy) { UNIT_ASSERT_EQUAL(response->Body, "this\r\n is test."); } - Y_UNIT_TEST(CreateRepsonseWithCompressedBody) { + Y_UNIT_TEST(CreateResponseWithCompressedBody) { NHttp::THttpIncomingRequestPtr request = nullptr; NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", "200", "OK"); response->Set<&NHttp::THttpResponse::ContentEncoding>("gzip"); diff --git a/library/cpp/lwtrace/mon/mon_lwtrace.cpp b/library/cpp/lwtrace/mon/mon_lwtrace.cpp index a61ee9ce22..5e79716834 100644 --- a/library/cpp/lwtrace/mon/mon_lwtrace.cpp +++ b/library/cpp/lwtrace/mon/mon_lwtrace.cpp @@ -4671,12 +4671,12 @@ private: } }; -void RegisterPages(NMonitoring::TMonService2* mon, bool allowUnsafe) { +void RegisterPages(NMonitoring::TIndexMonPage* index, bool allowUnsafe) { THolder<NLwTraceMonPage::TLWTraceMonPage> p = MakeHolder<NLwTraceMonPage::TLWTraceMonPage>(allowUnsafe); - mon->Register(p.Release()); + index->Register(p.Release()); #define WWW_STATIC_FILE(file, type) \ - mon->Register(new TResourceMonPage(file, file, NMonitoring::TResourceMonPage::type)); + index->Register(new TResourceMonPage(file, file, NMonitoring::TResourceMonPage::type)); WWW_STATIC_FILE("lwtrace/mon/static/common.css", CSS); WWW_STATIC_FILE("lwtrace/mon/static/common.js", JAVASCRIPT); WWW_STATIC_FILE("lwtrace/mon/static/css/bootstrap.min.css", CSS); diff --git a/library/cpp/lwtrace/mon/mon_lwtrace.h b/library/cpp/lwtrace/mon/mon_lwtrace.h index 8030f6ea61..f196adeabd 100644 --- a/library/cpp/lwtrace/mon/mon_lwtrace.h +++ b/library/cpp/lwtrace/mon/mon_lwtrace.h @@ -19,7 +19,7 @@ public: void Output(TStringStream& ss); }; -void RegisterPages(NMonitoring::TMonService2* mon, bool allowUnsafe = false); +void RegisterPages(NMonitoring::TIndexMonPage* index, bool allowUnsafe = false); NLWTrace::TProbeRegistry& ProbeRegistry(); // This is not safe to use this function before main() NLWTrace::TManager& TraceManager(bool allowUnsafe = false); TDashboardRegistry& DashboardRegistry(); diff --git a/library/cpp/monlib/service/monservice.h b/library/cpp/monlib/service/monservice.h index 8f5e52fcdb..3116a76765 100644 --- a/library/cpp/monlib/service/monservice.h +++ b/library/cpp/monlib/service/monservice.h @@ -68,6 +68,10 @@ namespace NMonitoring { IMonPage* FindPage(const TString& relativePath); TIndexMonPage* FindIndexPage(const TString& relativePath); void SortPages(); + + TIndexMonPage* GetRoot() { + return IndexMonPage.Get(); + } }; } |