diff options
author | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-06-01 02:28:37 +0300 |
---|---|---|
committer | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-06-01 02:28:37 +0300 |
commit | 1f8fed0d87db2306d61b41c0dfb7b33ce7417341 (patch) | |
tree | 1e23712360d41fc6dfebbc099933473b2212c69c /library/cpp | |
parent | 25a2cfb08f14d8e2368a8e329c90062165ead0da (diff) | |
download | ydb-1f8fed0d87db2306d61b41c0dfb7b33ce7417341.tar.gz |
intermediate changes
ref:c234234dab27a004d8ea6b9fa34ee745cc4736a1
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/http/http.cpp | 41 | ||||
-rw-r--r-- | library/cpp/actors/http/http.h | 21 | ||||
-rw-r--r-- | library/cpp/actors/http/http_cache.cpp | 1 | ||||
-rw-r--r-- | library/cpp/actors/http/http_compress.cpp | 36 | ||||
-rw-r--r-- | library/cpp/actors/http/http_ut.cpp | 13 |
5 files changed, 102 insertions, 10 deletions
diff --git a/library/cpp/actors/http/http.cpp b/library/cpp/actors/http/http.cpp index 4f8df434ca3..ef25a17a8a6 100644 --- a/library/cpp/actors/http/http.cpp +++ b/library/cpp/actors/http/http.cpp @@ -277,6 +277,10 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) { if (!ContentLength.empty()) { if (ProcessData(Body, data, FromString(ContentLength))) { Stage = EParseStage::Done; + if (Body && ContentEncoding == "deflate") { + Content = DecompressDeflate(Body); + Body = Content; + } } } else if (TEqNoCase()(TransferEncoding, "chunked")) { Stage = EParseStage::ChunkLength; @@ -284,6 +288,10 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) { if (ProcessData(Content, data, GetBodySizeFromTotalSize())) { Body = Content; Stage = EParseStage::Done; + if (Body && ContentEncoding == "deflate") { + Content = DecompressDeflate(Body); + Body = Content; + } } } else { // Invalid body encoding @@ -323,6 +331,10 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) { if (ChunkLength == 0) { Body = Content; Stage = EParseStage::Done; + if (Body && ContentEncoding == "deflate") { + Content = DecompressDeflate(Body); + Body = Content; + } } else { Stage = EParseStage::ChunkLength; } @@ -447,7 +459,7 @@ THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponse(TStringBuf status, if (Method == "HEAD") { response->Set<&THttpResponse::ContentLength>(ToString(body.size())); } else { - response->Set<&THttpResponse::Body>(body); + response->SetBody(body); } } return response; @@ -468,9 +480,30 @@ THttpIncomingResponsePtr THttpIncomingResponse::Duplicate(THttpOutgoingRequestPt } THttpOutgoingResponsePtr THttpOutgoingResponse::Duplicate(THttpIncomingRequestPtr request) { - THttpOutgoingResponsePtr response = new THttpOutgoingResponse(*this); - response->Reparse(); - response->Request = request; + THeadersBuilder headers(Headers); + if (!request->Endpoint->WorkerName.empty()) { + headers.Set("X-Worker-Name", request->Endpoint->WorkerName); + } + THttpOutgoingResponsePtr response = new THttpOutgoingResponse(request); + response->InitResponse(Protocol, Version, Status, Message); + if (Body) { + if (ContentType && !request->Endpoint->CompressContentTypes.empty()) { + TStringBuf contentType = ContentType.Before(';'); + Trim(contentType, ' '); + if (Count(request->Endpoint->CompressContentTypes, contentType) != 0) { + if (response->EnableCompression()) { + headers.Erase("Content-Length"); // we will need new length after compression + } + } + } + response->Set(headers); + response->SetBody(Body); + } else { + response->Set(headers); + if (!response->ContentLength) { + response->Set<&THttpResponse::ContentLength>("0"); + } + } return response; } diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h index 2f11bfe04d4..318cc5173e3 100644 --- a/library/cpp/actors/http/http.h +++ b/library/cpp/actors/http/http.h @@ -29,6 +29,8 @@ void TrimBegin(TStringBuf& target, char delim); void TrimEnd(TStringBuf& target, char delim); void Trim(TStringBuf& target, char delim); void TrimEnd(TString& target, char delim); +TString CompressDeflate(TStringBuf source); +TString DecompressDeflate(TStringBuf source); struct TLessNoCase { bool operator()(TStringBuf l, TStringBuf r) const { @@ -443,6 +445,7 @@ public: }; ERenderStage Stage = ERenderStage::Init; + TString Content; // body storage //THttpRenderer(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version); // request void InitRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version) { @@ -592,8 +595,13 @@ public: parser.Clear(); parser.Advance(size); // move buffer and result back + bool needReassignBody = (parser.Body.data() == parser.Content.data()); static_cast<HeaderType&>(*this) = std::move(static_cast<HeaderType&>(parser)); static_cast<BufferType&>(*this) = std::move(static_cast<BufferType&>(parser)); + if (needReassignBody) { + Content = std::move(parser.Content); + HeaderType::Body = Content; + } switch (parser.Stage) { case THttpParser<HeaderType, BufferType>::EParseStage::Method: case THttpParser<HeaderType, BufferType>::EParseStage::URL: @@ -798,12 +806,21 @@ public: return false; } - static TString CompressDeflate(TStringBuf source); - void SetBody(TStringBuf body) { if (ContentEncoding == "deflate") { TString compressedBody = CompressDeflate(body); THttpRenderer<THttpResponse, TSocketBuffer>::SetBody(compressedBody); + Body = Content = body; + } else { + THttpRenderer<THttpResponse, TSocketBuffer>::SetBody(body); + } + } + + void SetBody(const TString& body) { + if (ContentEncoding == "deflate") { + TString compressedBody = CompressDeflate(body); + THttpRenderer<THttpResponse, TSocketBuffer>::SetBody(compressedBody); + Body = Content = body; } else { THttpRenderer<THttpResponse, TSocketBuffer>::SetBody(body); } diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp index c9dad66355a..73f6834a3de 100644 --- a/library/cpp/actors/http/http_cache.cpp +++ b/library/cpp/actors/http/http_cache.cpp @@ -383,6 +383,7 @@ public: void SendCacheRequest(const TCacheKey& cacheKey, TCacheRecord& cacheRecord, const NActors::TActorContext& ctx) { cacheRecord.Request = cacheRecord.Request->Duplicate(); + cacheRecord.Request->AcceptEncoding.Clear(); // disable compression IncomingRequests[cacheRecord.Request.Get()] = cacheKey; TActorId handler = GetRequestHandler(cacheRecord.Request); if (handler) { diff --git a/library/cpp/actors/http/http_compress.cpp b/library/cpp/actors/http/http_compress.cpp index 0c5a2cd73c9..b6593fe99d0 100644 --- a/library/cpp/actors/http/http_compress.cpp +++ b/library/cpp/actors/http/http_compress.cpp @@ -4,7 +4,7 @@ namespace NHttp { -TString THttpOutgoingResponse::CompressDeflate(TStringBuf source) { +TString CompressDeflate(TStringBuf source) { int compressionlevel = Z_BEST_COMPRESSION; z_stream zs = {}; @@ -39,4 +39,38 @@ TString THttpOutgoingResponse::CompressDeflate(TStringBuf source) { return result; } +TString DecompressDeflate(TStringBuf source) { + z_stream zs = {}; + + if (inflateInit(&zs) != Z_OK) { + throw yexception() << "inflateInit failed while decompressing"; + } + + zs.next_in = (Bytef*)source.data(); + zs.avail_in = source.size(); + + int ret; + char outbuffer[32768]; + TString result; + + // retrieve the decompressed bytes blockwise + do { + zs.next_out = reinterpret_cast<Bytef*>(outbuffer); + zs.avail_out = sizeof(outbuffer); + + ret = inflate(&zs, Z_NO_FLUSH); + + if (result.size() < zs.total_out) { + result.append(outbuffer, zs.total_out - result.size()); + } + } while (ret == Z_OK); + + inflateEnd(&zs); + + if (ret != Z_STREAM_END) { + throw yexception() << "Exception during zlib decompression: (" << ret << ") " << zs.msg; + } + return result; +} + } diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp index ad3b49e869f..7a569a08c36 100644 --- a/library/cpp/actors/http/http_ut.cpp +++ b/library/cpp/actors/http/http_ut.cpp @@ -102,9 +102,16 @@ Y_UNIT_TEST_SUITE(HttpProxy) { NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", "200", "OK"); TString compressedBody = "something very long to compress with deflate algorithm. something very long to compress with deflate algorithm."; response->EnableCompression(); + size_t size1 = response->Size(); response->SetBody(compressedBody); + size_t size2 = response->Size(); + size_t compressedBodySize = size2 - size1; UNIT_ASSERT_VALUES_EQUAL("deflate", response->ContentEncoding); - UNIT_ASSERT_VALUES_UNEQUAL(compressedBody, response->Body); + UNIT_ASSERT(compressedBodySize < compressedBody.size()); + NHttp::THttpOutgoingResponsePtr response2 = response->Duplicate(request); + UNIT_ASSERT_VALUES_EQUAL(response->Body, response2->Body); + UNIT_ASSERT_VALUES_EQUAL(response->ContentLength, response2->ContentLength); + UNIT_ASSERT_VALUES_EQUAL(response->Size(), response2->Size()); } Y_UNIT_TEST(BasicPartialParsing) { @@ -192,7 +199,7 @@ Y_UNIT_TEST_SUITE(HttpProxy) { TIpPort port = portManager.GetTcpPort(); TAutoPtr<NActors::IEventHandle> handle; actorSystem.Initialize(); - actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_DEBUG); + //actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_DEBUG); NActors::IActor* proxy = NHttp::CreateHttpProxy(); NActors::TActorId proxyId = actorSystem.Register(proxy); @@ -225,7 +232,7 @@ Y_UNIT_TEST_SUITE(HttpProxy) { TIpPort port = portManager.GetTcpPort(); TAutoPtr<NActors::IEventHandle> handle; actorSystem.Initialize(); - actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_DEBUG); + //actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_DEBUG); NActors::IActor* proxy = NHttp::CreateHttpProxy(); NActors::TActorId proxyId = actorSystem.Register(proxy); |