#include "http.h"
#include "http_proxy.h"
#include "http_cache.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/executor_pool_basic.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/scheduler_basic.h>
#include <library/cpp/actors/http/http.h>
#include <library/cpp/digest/md5/md5.h>
#include <util/digest/multi.h>
#include <util/generic/queue.h>
#include <util/string/cast.h>
namespace NHttp {
class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoingCacheActor>, THttpConfig {
public:
using TBase = NActors::TActorBootstrapped<THttpOutgoingCacheActor>;
NActors::TActorId HttpProxyId;
TGetCachePolicy GetCachePolicy;
static constexpr TDuration RefreshTimeout = TDuration::Seconds(1);
struct TCacheKey {
TString Host;
TString URL;
TString Headers;
operator size_t() const {
return MultiHash(Host, URL, Headers);
}
TString GetId() const {
return MD5::Calc(Host + ':' + URL + ':' + Headers);
}
};
struct TCacheRecord {
TInstant RefreshTime;
TInstant DeathTime;
TCachePolicy CachePolicy;
NHttp::THttpOutgoingRequestPtr Request;
NHttp::THttpOutgoingRequestPtr OutgoingRequest;
TDuration Timeout;
NHttp::THttpIncomingResponsePtr Response;
TString Error;
TVector<NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr> Waiters;
TCacheRecord(const TCachePolicy cachePolicy)
: CachePolicy(cachePolicy)
{}
bool IsValid() const {
return Response != nullptr || !Error.empty();
}
void UpdateResponse(NHttp::THttpIncomingResponsePtr response, const TString& error, TInstant now) {
if (error.empty() || Response == nullptr || !CachePolicy.KeepOnError) {
Response = response;
Error = error;
}
RefreshTime = now + CachePolicy.TimeToRefresh;
if (CachePolicy.PaceToRefresh) {
RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds());
}
}
TString GetName() const {
return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL;
}
};
struct TRefreshRecord {
TCacheKey Key;
TInstant RefreshTime;
bool operator <(const TRefreshRecord& b) const {
return RefreshTime > b.RefreshTime;
}
};
THashMap<TCacheKey, TCacheRecord> Cache;
TPriorityQueue<TRefreshRecord> RefreshQueue;
THashMap<THttpOutgoingRequest*, TCacheKey> OutgoingRequests;
THttpOutgoingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy)
: HttpProxyId(httpProxyId)
, GetCachePolicy(std::move(getCachePolicy))
{}
void Bootstrap(const NActors::TActorContext&) {
//
Become(&THttpOutgoingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup());
}
static TString GetCacheHeadersKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) {
TStringBuilder key;
if (!policy.HeadersToCacheKey.empty()) {
NHttp::THeaders headers(request->Headers);
for (const TString& header : policy.HeadersToCacheKey) {
key << headers[header];
}
}
return key;
}
static TCacheKey GetCacheKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) {
return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) };
}
void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
ctx.Send(event->Forward(HttpProxyId));
}
void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
ctx.Send(event->Forward(HttpProxyId));
}
void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
ctx.Send(event->Forward(HttpProxyId));
}
void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
ctx.Send(event->Forward(HttpProxyId));
}
void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
NHttp::THttpOutgoingRequestPtr request(event->Get()->Request);
NHttp::THttpIncomingResponsePtr response(event->Get()->Response);
auto itRequests = OutgoingRequests.find(request.Get());
if (itRequests == OutgoingRequests.end()) {
LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL);
return;
}
auto key = itRequests->second;
OutgoingRequests.erase(itRequests);
auto it = Cache.find(key);
if (it == Cache.end()) {
LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL);
return;
}
TCacheRecord& cacheRecord = it->second;
cacheRecord.OutgoingRequest.Reset();
for (auto& waiter : cacheRecord.Waiters) {
NHttp::THttpIncomingResponsePtr response2;
TString error2;
if (response != nullptr) {
response2 = response->Duplicate(waiter->Get()->Request);
}
if (!event->Get()->Error.empty()) {
error2 = event->Get()->Error;
}
ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(waiter->Get()->Request, response2, error2));
}
cacheRecord.Waiters.clear();
TString error;
if (event->Get()->Error.empty()) {
if (event->Get()->Response != nullptr && event->Get()->Response->Status != "200") {
error = event->Get()->Response->Message;
}
} else {
error = event->Get()->Error;
}
if (!error.empty()) {
LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error);
}
LOG_DEBUG_S(ctx, HttpLog, "OutgoingUpdate " << cacheRecord.GetName());
cacheRecord.UpdateResponse(response, event->Get()->Error, ctx.Now());
RefreshQueue.push({it->first, it->second.RefreshTime});
LOG_DEBUG_S(ctx, HttpLog, "OutgoingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
}
void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
const NHttp::THttpOutgoingRequest* request = event->Get()->Request.Get();
auto policy = GetCachePolicy(request);
if (policy.TimeToExpire == TDuration()) {
ctx.Send(event->Forward(HttpProxyId));
return;
}
auto key = GetCacheKey(request, policy);
auto it = Cache.find(key);
if (it != Cache.end()) {
if (it->second.IsValid()) {
LOG_DEBUG_S(ctx, HttpLog, "OutgoingRespond "
<< it->second.GetName()
<< " ("
<< ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error"))
<< ")");
NHttp::THttpIncomingResponsePtr response = it->second.Response;
if (response != nullptr) {
response = response->Duplicate(event->Get()->Request);
}
ctx.Send(event->Sender,
new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(event->Get()->Request,
response,
it->second.Error));
it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire; // prolong active cache items
return;
}
} else {
it = Cache.emplace(key, policy).first;
it->second.Request = event->Get()->Request;
it->second.Timeout = event->Get()->Timeout;
it->second.OutgoingRequest = it->second.Request->Duplicate();
OutgoingRequests[it->second.OutgoingRequest.Get()] = key;
LOG_DEBUG_S(ctx, HttpLog, "OutgoingInitiate " << it->second.GetName());
ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout));
}
it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire;
it->second.Waiters.emplace_back(std::move(event));
}
void HandleRefresh(const NActors::TActorContext& ctx) {
while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) {
TRefreshRecord rrec = RefreshQueue.top();
RefreshQueue.pop();
auto it = Cache.find(rrec.Key);
if (it != Cache.end()) {
if (it->second.DeathTime > ctx.Now()) {
LOG_DEBUG_S(ctx, HttpLog, "OutgoingRefresh " << it->second.GetName());
it->second.OutgoingRequest = it->second.Request->Duplicate();
OutgoingRequests[it->second.OutgoingRequest.Get()] = it->first;
ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout));
} else {
LOG_DEBUG_S(ctx, HttpLog, "OutgoingForget " << it->second.GetName());
if (it->second.OutgoingRequest) {
OutgoingRequests.erase(it->second.OutgoingRequest.Get());
}
Cache.erase(it);
}
}
}
ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup());
}
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle);
HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle);
HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle);
HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh);
}
}
};
const TDuration THttpOutgoingCacheActor::RefreshTimeout;
class THttpIncomingCacheActor : public NActors::TActorBootstrapped<THttpIncomingCacheActor>, THttpConfig {
public:
using TBase = NActors::TActorBootstrapped<THttpIncomingCacheActor>;
NActors::TActorId HttpProxyId;
TGetCachePolicy GetCachePolicy;
static constexpr TDuration RefreshTimeout = TDuration::Seconds(1);
THashMap<TString, TActorId> Handlers;
struct TCacheKey {
TString Host;
TString URL;
TString Headers;
operator size_t() const {
return MultiHash(Host, URL, Headers);
}
TString GetId() const {
return MD5::Calc(Host + ':' + URL + ':' + Headers);
}
};
struct TCacheRecord {
TInstant RefreshTime;
TInstant DeathTime;
TCachePolicy CachePolicy;
TString CacheId;
NHttp::THttpIncomingRequestPtr Request;
TDuration Timeout;
NHttp::THttpOutgoingResponsePtr Response;
TVector<NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr> Waiters;
ui32 Retries = 0;
bool Enqueued = false;
TCacheRecord(const TCachePolicy cachePolicy)
: CachePolicy(cachePolicy)
{}
bool IsValid() const {
return Response != nullptr;
}
void InitRequest(NHttp::THttpIncomingRequestPtr request) {
Request = request;
if (CachePolicy.TimeToExpire) {
DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire;
}
}
void UpdateResponse(NHttp::THttpOutgoingResponsePtr response, const TString& error, TInstant now) {
if (error.empty() || !CachePolicy.KeepOnError) {
Response = response;
}
Retries = 0;
if (CachePolicy.TimeToRefresh) {
RefreshTime = now + CachePolicy.TimeToRefresh;
if (CachePolicy.PaceToRefresh) {
RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds());
}
}
}
void UpdateExpireTime() {
if (CachePolicy.TimeToExpire) {
DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire;
}
}
TString GetName() const {
return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL
<< " (" << CacheId << ")";
}
};
struct TRefreshRecord {
TCacheKey Key;
TInstant RefreshTime;
bool operator <(const TRefreshRecord& b) const {
return RefreshTime > b.RefreshTime;
}
};
THashMap<TCacheKey, TCacheRecord> Cache;
TPriorityQueue<TRefreshRecord> RefreshQueue;
THashMap<THttpIncomingRequest*, TCacheKey> IncomingRequests;
THttpIncomingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy)
: HttpProxyId(httpProxyId)
, GetCachePolicy(std::move(getCachePolicy))
{}
void Bootstrap(const NActors::TActorContext&) {
//
Become(&THttpIncomingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup());
}
static TString GetCacheHeadersKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) {
TStringBuilder key;
if (!policy.HeadersToCacheKey.empty()) {
NHttp::THeaders headers(request->Headers);
for (const TString& header : policy.HeadersToCacheKey) {
key << headers[header];
}
}
return key;
}
static TCacheKey GetCacheKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) {
return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) };
}
TActorId GetRequestHandler(NHttp::THttpIncomingRequestPtr request) {
TStringBuf url = request->URL.Before('?');
THashMap<TString, TActorId>::iterator it;
while (!url.empty()) {
it = Handlers.find(url);
if (it != Handlers.end()) {
return it->second;
} else {
if (url.EndsWith('/')) {
url.Trunc(url.size() - 1);
}
size_t pos = url.rfind('/');
if (pos == TStringBuf::npos) {
break;
} else {
url = url.substr(0, pos + 1);
}
}
}
return {};
}
void SendCacheRequest(const TCacheKey& cacheKey, TCacheRecord& cacheRecord, const NActors::TActorContext& ctx) {
cacheRecord.Request = cacheRecord.Request->Duplicate();
IncomingRequests[cacheRecord.Request.Get()] = cacheKey;
TActorId handler = GetRequestHandler(cacheRecord.Request);
if (handler) {
Send(handler, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(cacheRecord.Request));
} else {
LOG_ERROR_S(ctx, HttpLog, "Can't find cache handler for " << cacheRecord.GetName());
}
}
void DropCacheRecord(THashMap<TCacheKey, TCacheRecord>::iterator it) {
if (it->second.Request) {
IncomingRequests.erase(it->second.Request.Get());
}
for (auto& waiter : it->second.Waiters) {
NHttp::THttpOutgoingResponsePtr response;
response = waiter->Get()->Request->CreateResponseGatewayTimeout("Timeout", "text/plain");
Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
}
Cache.erase(it);
}
void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
ctx.Send(event->Forward(HttpProxyId));
}
void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
ctx.Send(event->Forward(HttpProxyId));
}
void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
ctx.Send(event->Forward(HttpProxyId));
}
void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
Handlers[event->Get()->Path] = event->Get()->Handler;
ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvRegisterHandler(event->Get()->Path, ctx.SelfID));
}
void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
NHttp::THttpIncomingRequestPtr request(event->Get()->Response->GetRequest());
NHttp::THttpOutgoingResponsePtr response(event->Get()->Response);
auto itRequests = IncomingRequests.find(request.Get());
if (itRequests == IncomingRequests.end()) {
LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL);
return;
}
TCacheKey key = itRequests->second;
auto it = Cache.find(key);
if (it == Cache.end()) {
LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL);
return;
}
IncomingRequests.erase(itRequests);
TCacheRecord& cacheRecord = it->second;
TStringBuf status;
TString error;
if (event->Get()->Response != nullptr) {
status = event->Get()->Response->Status;
if (!status.StartsWith("2")) {
error = event->Get()->Response->Message;
}
}
if (cacheRecord.CachePolicy.RetriesCount > 0) {
auto itStatusToRetry = std::find(cacheRecord.CachePolicy.StatusesToRetry.begin(), cacheRecord.CachePolicy.StatusesToRetry.end(), status);
if (itStatusToRetry != cacheRecord.CachePolicy.StatusesToRetry.end()) {
if (cacheRecord.Retries < cacheRecord.CachePolicy.RetriesCount) {
++cacheRecord.Retries;
LOG_WARN_S(ctx, HttpLog, "IncomingRetry " << cacheRecord.GetName() << ": " << status << " " << error);
SendCacheRequest(key, cacheRecord, ctx);
return;
}
}
}
for (auto& waiter : cacheRecord.Waiters) {
NHttp::THttpOutgoingResponsePtr response2;
response2 = response->Duplicate(waiter->Get()->Request);
ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response2));
}
cacheRecord.Waiters.clear();
if (!error.empty()) {
LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error);
if (!cacheRecord.Response) {
LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscard " << cacheRecord.GetName());
DropCacheRecord(it);
return;
}
}
if (cacheRecord.CachePolicy.TimeToRefresh) {
LOG_DEBUG_S(ctx, HttpLog, "IncomingUpdate " << cacheRecord.GetName());
cacheRecord.UpdateResponse(response, error, ctx.Now());
if (!cacheRecord.Enqueued) {
RefreshQueue.push({it->first, it->second.RefreshTime});
cacheRecord.Enqueued = true;
}
LOG_DEBUG_S(ctx, HttpLog, "IncomingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
} else {
LOG_DEBUG_S(ctx, HttpLog, "IncomingDrop " << cacheRecord.GetName());
DropCacheRecord(it);
}
}
void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
const NHttp::THttpIncomingRequest* request = event->Get()->Request.Get();
TCachePolicy policy = GetCachePolicy(request);
if (policy.TimeToExpire == TDuration() && policy.RetriesCount == 0) {
TActorId handler = GetRequestHandler(event->Get()->Request);
if (handler) {
ctx.Send(event->Forward(handler));
}
return;
}
auto key = GetCacheKey(request, policy);
auto it = Cache.find(key);
if (it != Cache.end() && !policy.DiscardCache) {
it->second.UpdateExpireTime();
if (it->second.IsValid()) {
LOG_DEBUG_S(ctx, HttpLog, "IncomingRespond "
<< it->second.GetName()
<< " ("
<< ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error"))
<< ")");
NHttp::THttpOutgoingResponsePtr response = it->second.Response;
if (response != nullptr) {
response = response->Duplicate(event->Get()->Request);
}
ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
return;
}
} else {
it = Cache.emplace(key, policy).first;
it->second.CacheId = key.GetId(); // for debugging
it->second.InitRequest(event->Get()->Request);
if (policy.DiscardCache) {
LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscardCache " << it->second.GetName());
}
LOG_DEBUG_S(ctx, HttpLog, "IncomingInitiate " << it->second.GetName());
SendCacheRequest(key, it->second, ctx);
}
it->second.Waiters.emplace_back(std::move(event));
}
void HandleRefresh(const NActors::TActorContext& ctx) {
while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) {
TRefreshRecord rrec = RefreshQueue.top();
RefreshQueue.pop();
auto it = Cache.find(rrec.Key);
if (it != Cache.end()) {
it->second.Enqueued = false;
if (it->second.DeathTime > ctx.Now()) {
LOG_DEBUG_S(ctx, HttpLog, "IncomingRefresh " << it->second.GetName());
SendCacheRequest(it->first, it->second, ctx);
} else {
LOG_DEBUG_S(ctx, HttpLog, "IncomingForget " << it->second.GetName());
DropCacheRecord(it);
}
}
}
ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup());
}
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle);
HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle);
HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle);
HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh);
}
}
};
TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& defaultPolicy) {
TCachePolicy policy = defaultPolicy;
THeaders headers(request->Headers);
TStringBuf cacheControl(headers["Cache-Control"]);
while (TStringBuf cacheItem = cacheControl.NextTok(',')) {
Trim(cacheItem, ' ');
if (cacheItem == "no-store" || cacheItem == "no-cache") {
policy.DiscardCache = true;
}
TStringBuf itemName = cacheItem.NextTok('=');
TrimEnd(itemName, ' ');
TrimBegin(cacheItem, ' ');
if (itemName == "max-age") {
policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem));
}
if (itemName == "min-fresh") {
policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem));
}
if (itemName == "stale-if-error") {
policy.KeepOnError = true;
}
}
return policy;
}
NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy));
}
NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy));
}
NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
return new THttpIncomingCacheActor(httpProxyId, std::move(cachePolicy));
}
}