aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-06-01 02:28:37 +0300
committerarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-06-01 02:28:37 +0300
commit1f8fed0d87db2306d61b41c0dfb7b33ce7417341 (patch)
tree1e23712360d41fc6dfebbc099933473b2212c69c /library/cpp
parent25a2cfb08f14d8e2368a8e329c90062165ead0da (diff)
downloadydb-1f8fed0d87db2306d61b41c0dfb7b33ce7417341.tar.gz
intermediate changes
ref:c234234dab27a004d8ea6b9fa34ee745cc4736a1
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/http/http.cpp41
-rw-r--r--library/cpp/actors/http/http.h21
-rw-r--r--library/cpp/actors/http/http_cache.cpp1
-rw-r--r--library/cpp/actors/http/http_compress.cpp36
-rw-r--r--library/cpp/actors/http/http_ut.cpp13
5 files changed, 102 insertions, 10 deletions
diff --git a/library/cpp/actors/http/http.cpp b/library/cpp/actors/http/http.cpp
index 4f8df434ca3..ef25a17a8a6 100644
--- a/library/cpp/actors/http/http.cpp
+++ b/library/cpp/actors/http/http.cpp
@@ -277,6 +277,10 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) {
if (!ContentLength.empty()) {
if (ProcessData(Body, data, FromString(ContentLength))) {
Stage = EParseStage::Done;
+ if (Body && ContentEncoding == "deflate") {
+ Content = DecompressDeflate(Body);
+ Body = Content;
+ }
}
} else if (TEqNoCase()(TransferEncoding, "chunked")) {
Stage = EParseStage::ChunkLength;
@@ -284,6 +288,10 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) {
if (ProcessData(Content, data, GetBodySizeFromTotalSize())) {
Body = Content;
Stage = EParseStage::Done;
+ if (Body && ContentEncoding == "deflate") {
+ Content = DecompressDeflate(Body);
+ Body = Content;
+ }
}
} else {
// Invalid body encoding
@@ -323,6 +331,10 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) {
if (ChunkLength == 0) {
Body = Content;
Stage = EParseStage::Done;
+ if (Body && ContentEncoding == "deflate") {
+ Content = DecompressDeflate(Body);
+ Body = Content;
+ }
} else {
Stage = EParseStage::ChunkLength;
}
@@ -447,7 +459,7 @@ THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponse(TStringBuf status,
if (Method == "HEAD") {
response->Set<&THttpResponse::ContentLength>(ToString(body.size()));
} else {
- response->Set<&THttpResponse::Body>(body);
+ response->SetBody(body);
}
}
return response;
@@ -468,9 +480,30 @@ THttpIncomingResponsePtr THttpIncomingResponse::Duplicate(THttpOutgoingRequestPt
}
THttpOutgoingResponsePtr THttpOutgoingResponse::Duplicate(THttpIncomingRequestPtr request) {
- THttpOutgoingResponsePtr response = new THttpOutgoingResponse(*this);
- response->Reparse();
- response->Request = request;
+ THeadersBuilder headers(Headers);
+ if (!request->Endpoint->WorkerName.empty()) {
+ headers.Set("X-Worker-Name", request->Endpoint->WorkerName);
+ }
+ THttpOutgoingResponsePtr response = new THttpOutgoingResponse(request);
+ response->InitResponse(Protocol, Version, Status, Message);
+ if (Body) {
+ if (ContentType && !request->Endpoint->CompressContentTypes.empty()) {
+ TStringBuf contentType = ContentType.Before(';');
+ Trim(contentType, ' ');
+ if (Count(request->Endpoint->CompressContentTypes, contentType) != 0) {
+ if (response->EnableCompression()) {
+ headers.Erase("Content-Length"); // we will need new length after compression
+ }
+ }
+ }
+ response->Set(headers);
+ response->SetBody(Body);
+ } else {
+ response->Set(headers);
+ if (!response->ContentLength) {
+ response->Set<&THttpResponse::ContentLength>("0");
+ }
+ }
return response;
}
diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h
index 2f11bfe04d4..318cc5173e3 100644
--- a/library/cpp/actors/http/http.h
+++ b/library/cpp/actors/http/http.h
@@ -29,6 +29,8 @@ void TrimBegin(TStringBuf& target, char delim);
void TrimEnd(TStringBuf& target, char delim);
void Trim(TStringBuf& target, char delim);
void TrimEnd(TString& target, char delim);
+TString CompressDeflate(TStringBuf source);
+TString DecompressDeflate(TStringBuf source);
struct TLessNoCase {
bool operator()(TStringBuf l, TStringBuf r) const {
@@ -443,6 +445,7 @@ public:
};
ERenderStage Stage = ERenderStage::Init;
+ TString Content; // body storage
//THttpRenderer(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version); // request
void InitRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version) {
@@ -592,8 +595,13 @@ public:
parser.Clear();
parser.Advance(size);
// move buffer and result back
+ bool needReassignBody = (parser.Body.data() == parser.Content.data());
static_cast<HeaderType&>(*this) = std::move(static_cast<HeaderType&>(parser));
static_cast<BufferType&>(*this) = std::move(static_cast<BufferType&>(parser));
+ if (needReassignBody) {
+ Content = std::move(parser.Content);
+ HeaderType::Body = Content;
+ }
switch (parser.Stage) {
case THttpParser<HeaderType, BufferType>::EParseStage::Method:
case THttpParser<HeaderType, BufferType>::EParseStage::URL:
@@ -798,12 +806,21 @@ public:
return false;
}
- static TString CompressDeflate(TStringBuf source);
-
void SetBody(TStringBuf body) {
if (ContentEncoding == "deflate") {
TString compressedBody = CompressDeflate(body);
THttpRenderer<THttpResponse, TSocketBuffer>::SetBody(compressedBody);
+ Body = Content = body;
+ } else {
+ THttpRenderer<THttpResponse, TSocketBuffer>::SetBody(body);
+ }
+ }
+
+ void SetBody(const TString& body) {
+ if (ContentEncoding == "deflate") {
+ TString compressedBody = CompressDeflate(body);
+ THttpRenderer<THttpResponse, TSocketBuffer>::SetBody(compressedBody);
+ Body = Content = body;
} else {
THttpRenderer<THttpResponse, TSocketBuffer>::SetBody(body);
}
diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp
index c9dad66355a..73f6834a3de 100644
--- a/library/cpp/actors/http/http_cache.cpp
+++ b/library/cpp/actors/http/http_cache.cpp
@@ -383,6 +383,7 @@ public:
void SendCacheRequest(const TCacheKey& cacheKey, TCacheRecord& cacheRecord, const NActors::TActorContext& ctx) {
cacheRecord.Request = cacheRecord.Request->Duplicate();
+ cacheRecord.Request->AcceptEncoding.Clear(); // disable compression
IncomingRequests[cacheRecord.Request.Get()] = cacheKey;
TActorId handler = GetRequestHandler(cacheRecord.Request);
if (handler) {
diff --git a/library/cpp/actors/http/http_compress.cpp b/library/cpp/actors/http/http_compress.cpp
index 0c5a2cd73c9..b6593fe99d0 100644
--- a/library/cpp/actors/http/http_compress.cpp
+++ b/library/cpp/actors/http/http_compress.cpp
@@ -4,7 +4,7 @@
namespace NHttp {
-TString THttpOutgoingResponse::CompressDeflate(TStringBuf source) {
+TString CompressDeflate(TStringBuf source) {
int compressionlevel = Z_BEST_COMPRESSION;
z_stream zs = {};
@@ -39,4 +39,38 @@ TString THttpOutgoingResponse::CompressDeflate(TStringBuf source) {
return result;
}
+TString DecompressDeflate(TStringBuf source) {
+ z_stream zs = {};
+
+ if (inflateInit(&zs) != Z_OK) {
+ throw yexception() << "inflateInit failed while decompressing";
+ }
+
+ zs.next_in = (Bytef*)source.data();
+ zs.avail_in = source.size();
+
+ int ret;
+ char outbuffer[32768];
+ TString result;
+
+ // retrieve the decompressed bytes blockwise
+ do {
+ zs.next_out = reinterpret_cast<Bytef*>(outbuffer);
+ zs.avail_out = sizeof(outbuffer);
+
+ ret = inflate(&zs, Z_NO_FLUSH);
+
+ if (result.size() < zs.total_out) {
+ result.append(outbuffer, zs.total_out - result.size());
+ }
+ } while (ret == Z_OK);
+
+ inflateEnd(&zs);
+
+ if (ret != Z_STREAM_END) {
+ throw yexception() << "Exception during zlib decompression: (" << ret << ") " << zs.msg;
+ }
+ return result;
+}
+
}
diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp
index ad3b49e869f..7a569a08c36 100644
--- a/library/cpp/actors/http/http_ut.cpp
+++ b/library/cpp/actors/http/http_ut.cpp
@@ -102,9 +102,16 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", "200", "OK");
TString compressedBody = "something very long to compress with deflate algorithm. something very long to compress with deflate algorithm.";
response->EnableCompression();
+ size_t size1 = response->Size();
response->SetBody(compressedBody);
+ size_t size2 = response->Size();
+ size_t compressedBodySize = size2 - size1;
UNIT_ASSERT_VALUES_EQUAL("deflate", response->ContentEncoding);
- UNIT_ASSERT_VALUES_UNEQUAL(compressedBody, response->Body);
+ UNIT_ASSERT(compressedBodySize < compressedBody.size());
+ NHttp::THttpOutgoingResponsePtr response2 = response->Duplicate(request);
+ UNIT_ASSERT_VALUES_EQUAL(response->Body, response2->Body);
+ UNIT_ASSERT_VALUES_EQUAL(response->ContentLength, response2->ContentLength);
+ UNIT_ASSERT_VALUES_EQUAL(response->Size(), response2->Size());
}
Y_UNIT_TEST(BasicPartialParsing) {
@@ -192,7 +199,7 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
TIpPort port = portManager.GetTcpPort();
TAutoPtr<NActors::IEventHandle> handle;
actorSystem.Initialize();
- actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_DEBUG);
+ //actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_DEBUG);
NActors::IActor* proxy = NHttp::CreateHttpProxy();
NActors::TActorId proxyId = actorSystem.Register(proxy);
@@ -225,7 +232,7 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
TIpPort port = portManager.GetTcpPort();
TAutoPtr<NActors::IEventHandle> handle;
actorSystem.Initialize();
- actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_DEBUG);
+ //actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_DEBUG);
NActors::IActor* proxy = NHttp::CreateHttpProxy();
NActors::TActorId proxyId = actorSystem.Register(proxy);