diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/http/http_cache.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/http/http_cache.cpp')
-rw-r--r-- | library/cpp/actors/http/http_cache.cpp | 599 |
1 files changed, 599 insertions, 0 deletions
diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp new file mode 100644 index 0000000000..27c4eeb6f3 --- /dev/null +++ b/library/cpp/actors/http/http_cache.cpp @@ -0,0 +1,599 @@ +#include "http.h" +#include "http_proxy.h" +#include "http_cache.h" +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/core/scheduler_basic.h> +#include <library/cpp/actors/http/http.h> +#include <library/cpp/digest/md5/md5.h> +#include <util/digest/multi.h> +#include <util/generic/queue.h> +#include <util/string/cast.h> + +namespace NHttp { + +class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoingCacheActor>, THttpConfig { +public: + using TBase = NActors::TActorBootstrapped<THttpOutgoingCacheActor>; + NActors::TActorId HttpProxyId; + TGetCachePolicy GetCachePolicy; + static constexpr TDuration RefreshTimeout = TDuration::Seconds(1); + + struct TCacheKey { + TString Host; + TString URL; + TString Headers; + + operator size_t() const { + return MultiHash(Host, URL, Headers); + } + + TString GetId() const { + return MD5::Calc(Host + ':' + URL + ':' + Headers); + } + }; + + struct TCacheRecord { + TInstant RefreshTime; + TInstant DeathTime; + TCachePolicy CachePolicy; + NHttp::THttpOutgoingRequestPtr Request; + NHttp::THttpOutgoingRequestPtr OutgoingRequest; + TDuration Timeout; + NHttp::THttpIncomingResponsePtr Response; + TString Error; + TVector<NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr> Waiters; + + TCacheRecord(const TCachePolicy cachePolicy) + : CachePolicy(cachePolicy) + {} + + bool IsValid() const { + return Response != nullptr || !Error.empty(); + } + + void UpdateResponse(NHttp::THttpIncomingResponsePtr response, const TString& error, TInstant now) { + if (error.empty() || Response == nullptr || !CachePolicy.KeepOnError) { + Response = response; + Error = error; + } + RefreshTime = now + CachePolicy.TimeToRefresh; + if (CachePolicy.PaceToRefresh) { + RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds()); + } + } + + TString GetName() const { + return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL; + } + }; + + struct TRefreshRecord { + TCacheKey Key; + TInstant RefreshTime; + + bool operator <(const TRefreshRecord& b) const { + return RefreshTime > b.RefreshTime; + } + }; + + THashMap<TCacheKey, TCacheRecord> Cache; + TPriorityQueue<TRefreshRecord> RefreshQueue; + THashMap<THttpOutgoingRequest*, TCacheKey> OutgoingRequests; + + THttpOutgoingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy) + : HttpProxyId(httpProxyId) + , GetCachePolicy(std::move(getCachePolicy)) + {} + + void Bootstrap(const NActors::TActorContext&) { + // + Become(&THttpOutgoingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup()); + } + + static TString GetCacheHeadersKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) { + TStringBuilder key; + if (!policy.HeadersToCacheKey.empty()) { + NHttp::THeaders headers(request->Headers); + for (const TString& header : policy.HeadersToCacheKey) { + key << headers[header]; + } + } + return key; + } + + static TCacheKey GetCacheKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) { + return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) }; + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) { + NHttp::THttpOutgoingRequestPtr request(event->Get()->Request); + NHttp::THttpIncomingResponsePtr response(event->Get()->Response); + auto itRequests = OutgoingRequests.find(request.Get()); + if (itRequests == OutgoingRequests.end()) { + LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL); + return; + } + auto key = itRequests->second; + OutgoingRequests.erase(itRequests); + auto it = Cache.find(key); + if (it == Cache.end()) { + LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL); + return; + } + TCacheRecord& cacheRecord = it->second; + cacheRecord.OutgoingRequest.Reset(); + for (auto& waiter : cacheRecord.Waiters) { + NHttp::THttpIncomingResponsePtr response2; + TString error2; + if (response != nullptr) { + response2 = response->Duplicate(waiter->Get()->Request); + } + if (!event->Get()->Error.empty()) { + error2 = event->Get()->Error; + } + ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(waiter->Get()->Request, response2, error2)); + } + cacheRecord.Waiters.clear(); + TString error; + if (event->Get()->Error.empty()) { + if (event->Get()->Response != nullptr && event->Get()->Response->Status != "200") { + error = event->Get()->Response->Message; + } + } else { + error = event->Get()->Error; + } + if (!error.empty()) { + LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error); + } + LOG_DEBUG_S(ctx, HttpLog, "OutgoingUpdate " << cacheRecord.GetName()); + cacheRecord.UpdateResponse(response, event->Get()->Error, ctx.Now()); + RefreshQueue.push({it->first, it->second.RefreshTime}); + LOG_DEBUG_S(ctx, HttpLog, "OutgoingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + const NHttp::THttpOutgoingRequest* request = event->Get()->Request.Get(); + auto policy = GetCachePolicy(request); + if (policy.TimeToExpire == TDuration()) { + ctx.Send(event->Forward(HttpProxyId)); + return; + } + auto key = GetCacheKey(request, policy); + auto it = Cache.find(key); + if (it != Cache.end()) { + if (it->second.IsValid()) { + LOG_DEBUG_S(ctx, HttpLog, "OutgoingRespond " + << it->second.GetName() + << " (" + << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error")) + << ")"); + NHttp::THttpIncomingResponsePtr response = it->second.Response; + if (response != nullptr) { + response = response->Duplicate(event->Get()->Request); + } + ctx.Send(event->Sender, + new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(event->Get()->Request, + response, + it->second.Error)); + it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire; // prolong active cache items + return; + } + } else { + it = Cache.emplace(key, policy).first; + it->second.Request = event->Get()->Request; + it->second.Timeout = event->Get()->Timeout; + it->second.OutgoingRequest = it->second.Request->Duplicate(); + OutgoingRequests[it->second.OutgoingRequest.Get()] = key; + LOG_DEBUG_S(ctx, HttpLog, "OutgoingInitiate " << it->second.GetName()); + ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout)); + } + it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire; + it->second.Waiters.emplace_back(std::move(event)); + } + + void HandleRefresh(const NActors::TActorContext& ctx) { + while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) { + TRefreshRecord rrec = RefreshQueue.top(); + RefreshQueue.pop(); + auto it = Cache.find(rrec.Key); + if (it != Cache.end()) { + if (it->second.DeathTime > ctx.Now()) { + LOG_DEBUG_S(ctx, HttpLog, "OutgoingRefresh " << it->second.GetName()); + it->second.OutgoingRequest = it->second.Request->Duplicate(); + OutgoingRequests[it->second.OutgoingRequest.Get()] = it->first; + ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout)); + } else { + LOG_DEBUG_S(ctx, HttpLog, "OutgoingForget " << it->second.GetName()); + if (it->second.OutgoingRequest) { + OutgoingRequests.erase(it->second.OutgoingRequest.Get()); + } + Cache.erase(it); + } + } + } + ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup()); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle); + HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle); + HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle); + CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh); + } + } +}; + +const TDuration THttpOutgoingCacheActor::RefreshTimeout; + +class THttpIncomingCacheActor : public NActors::TActorBootstrapped<THttpIncomingCacheActor>, THttpConfig { +public: + using TBase = NActors::TActorBootstrapped<THttpIncomingCacheActor>; + NActors::TActorId HttpProxyId; + TGetCachePolicy GetCachePolicy; + static constexpr TDuration RefreshTimeout = TDuration::Seconds(1); + THashMap<TString, TActorId> Handlers; + + struct TCacheKey { + TString Host; + TString URL; + TString Headers; + + operator size_t() const { + return MultiHash(Host, URL, Headers); + } + + TString GetId() const { + return MD5::Calc(Host + ':' + URL + ':' + Headers); + } + }; + + struct TCacheRecord { + TInstant RefreshTime; + TInstant DeathTime; + TCachePolicy CachePolicy; + TString CacheId; + NHttp::THttpIncomingRequestPtr Request; + TDuration Timeout; + NHttp::THttpOutgoingResponsePtr Response; + TVector<NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr> Waiters; + ui32 Retries = 0; + bool Enqueued = false; + + TCacheRecord(const TCachePolicy cachePolicy) + : CachePolicy(cachePolicy) + {} + + bool IsValid() const { + return Response != nullptr; + } + + void InitRequest(NHttp::THttpIncomingRequestPtr request) { + Request = request; + if (CachePolicy.TimeToExpire) { + DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire; + } + } + + void UpdateResponse(NHttp::THttpOutgoingResponsePtr response, const TString& error, TInstant now) { + if (error.empty() || !CachePolicy.KeepOnError) { + Response = response; + } + Retries = 0; + if (CachePolicy.TimeToRefresh) { + RefreshTime = now + CachePolicy.TimeToRefresh; + if (CachePolicy.PaceToRefresh) { + RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds()); + } + } + } + + void UpdateExpireTime() { + if (CachePolicy.TimeToExpire) { + DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire; + } + } + + TString GetName() const { + return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL + << " (" << CacheId << ")"; + } + }; + + struct TRefreshRecord { + TCacheKey Key; + TInstant RefreshTime; + + bool operator <(const TRefreshRecord& b) const { + return RefreshTime > b.RefreshTime; + } + }; + + THashMap<TCacheKey, TCacheRecord> Cache; + TPriorityQueue<TRefreshRecord> RefreshQueue; + THashMap<THttpIncomingRequest*, TCacheKey> IncomingRequests; + + THttpIncomingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy) + : HttpProxyId(httpProxyId) + , GetCachePolicy(std::move(getCachePolicy)) + {} + + void Bootstrap(const NActors::TActorContext&) { + // + Become(&THttpIncomingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup()); + } + + static TString GetCacheHeadersKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) { + TStringBuilder key; + if (!policy.HeadersToCacheKey.empty()) { + NHttp::THeaders headers(request->Headers); + for (const TString& header : policy.HeadersToCacheKey) { + key << headers[header]; + } + } + return key; + } + + static TCacheKey GetCacheKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) { + return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) }; + } + + TActorId GetRequestHandler(NHttp::THttpIncomingRequestPtr request) { + TStringBuf url = request->URL.Before('?'); + THashMap<TString, TActorId>::iterator it; + while (!url.empty()) { + it = Handlers.find(url); + if (it != Handlers.end()) { + return it->second; + } else { + if (url.EndsWith('/')) { + url.Trunc(url.size() - 1); + } + size_t pos = url.rfind('/'); + if (pos == TStringBuf::npos) { + break; + } else { + url = url.substr(0, pos + 1); + } + } + } + return {}; + } + + void SendCacheRequest(const TCacheKey& cacheKey, TCacheRecord& cacheRecord, const NActors::TActorContext& ctx) { + cacheRecord.Request = cacheRecord.Request->Duplicate(); + IncomingRequests[cacheRecord.Request.Get()] = cacheKey; + TActorId handler = GetRequestHandler(cacheRecord.Request); + if (handler) { + Send(handler, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(cacheRecord.Request)); + } else { + LOG_ERROR_S(ctx, HttpLog, "Can't find cache handler for " << cacheRecord.GetName()); + } + } + + void DropCacheRecord(THashMap<TCacheKey, TCacheRecord>::iterator it) { + if (it->second.Request) { + IncomingRequests.erase(it->second.Request.Get()); + } + for (auto& waiter : it->second.Waiters) { + NHttp::THttpOutgoingResponsePtr response; + response = waiter->Get()->Request->CreateResponseGatewayTimeout("Timeout", "text/plain"); + Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + } + Cache.erase(it); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) { + Handlers[event->Get()->Path] = event->Get()->Handler; + ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvRegisterHandler(event->Get()->Path, ctx.SelfID)); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) { + NHttp::THttpIncomingRequestPtr request(event->Get()->Response->GetRequest()); + NHttp::THttpOutgoingResponsePtr response(event->Get()->Response); + auto itRequests = IncomingRequests.find(request.Get()); + if (itRequests == IncomingRequests.end()) { + LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL); + return; + } + + TCacheKey key = itRequests->second; + auto it = Cache.find(key); + if (it == Cache.end()) { + LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL); + return; + } + + IncomingRequests.erase(itRequests); + TCacheRecord& cacheRecord = it->second; + TStringBuf status; + TString error; + + if (event->Get()->Response != nullptr) { + status = event->Get()->Response->Status; + if (!status.StartsWith("2")) { + error = event->Get()->Response->Message; + } + } + if (cacheRecord.CachePolicy.RetriesCount > 0) { + auto itStatusToRetry = std::find(cacheRecord.CachePolicy.StatusesToRetry.begin(), cacheRecord.CachePolicy.StatusesToRetry.end(), status); + if (itStatusToRetry != cacheRecord.CachePolicy.StatusesToRetry.end()) { + if (cacheRecord.Retries < cacheRecord.CachePolicy.RetriesCount) { + ++cacheRecord.Retries; + LOG_WARN_S(ctx, HttpLog, "IncomingRetry " << cacheRecord.GetName() << ": " << status << " " << error); + SendCacheRequest(key, cacheRecord, ctx); + return; + } + } + } + for (auto& waiter : cacheRecord.Waiters) { + NHttp::THttpOutgoingResponsePtr response2; + response2 = response->Duplicate(waiter->Get()->Request); + ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response2)); + } + cacheRecord.Waiters.clear(); + if (!error.empty()) { + LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error); + if (!cacheRecord.Response) { + LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscard " << cacheRecord.GetName()); + DropCacheRecord(it); + return; + } + } + if (cacheRecord.CachePolicy.TimeToRefresh) { + LOG_DEBUG_S(ctx, HttpLog, "IncomingUpdate " << cacheRecord.GetName()); + cacheRecord.UpdateResponse(response, error, ctx.Now()); + if (!cacheRecord.Enqueued) { + RefreshQueue.push({it->first, it->second.RefreshTime}); + cacheRecord.Enqueued = true; + } + LOG_DEBUG_S(ctx, HttpLog, "IncomingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime); + } else { + LOG_DEBUG_S(ctx, HttpLog, "IncomingDrop " << cacheRecord.GetName()); + DropCacheRecord(it); + } + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { + const NHttp::THttpIncomingRequest* request = event->Get()->Request.Get(); + TCachePolicy policy = GetCachePolicy(request); + if (policy.TimeToExpire == TDuration() && policy.RetriesCount == 0) { + TActorId handler = GetRequestHandler(event->Get()->Request); + if (handler) { + ctx.Send(event->Forward(handler)); + } + return; + } + auto key = GetCacheKey(request, policy); + auto it = Cache.find(key); + if (it != Cache.end() && !policy.DiscardCache) { + it->second.UpdateExpireTime(); + if (it->second.IsValid()) { + LOG_DEBUG_S(ctx, HttpLog, "IncomingRespond " + << it->second.GetName() + << " (" + << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error")) + << ")"); + NHttp::THttpOutgoingResponsePtr response = it->second.Response; + if (response != nullptr) { + response = response->Duplicate(event->Get()->Request); + } + ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + return; + } + } else { + it = Cache.emplace(key, policy).first; + it->second.CacheId = key.GetId(); // for debugging + it->second.InitRequest(event->Get()->Request); + if (policy.DiscardCache) { + LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscardCache " << it->second.GetName()); + } + LOG_DEBUG_S(ctx, HttpLog, "IncomingInitiate " << it->second.GetName()); + SendCacheRequest(key, it->second, ctx); + } + it->second.Waiters.emplace_back(std::move(event)); + } + + void HandleRefresh(const NActors::TActorContext& ctx) { + while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) { + TRefreshRecord rrec = RefreshQueue.top(); + RefreshQueue.pop(); + auto it = Cache.find(rrec.Key); + if (it != Cache.end()) { + it->second.Enqueued = false; + if (it->second.DeathTime > ctx.Now()) { + LOG_DEBUG_S(ctx, HttpLog, "IncomingRefresh " << it->second.GetName()); + SendCacheRequest(it->first, it->second, ctx); + } else { + LOG_DEBUG_S(ctx, HttpLog, "IncomingForget " << it->second.GetName()); + DropCacheRecord(it); + } + } + } + ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup()); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle); + HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle); + HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle); + CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh); + } + } +}; + +TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& defaultPolicy) { + TCachePolicy policy = defaultPolicy; + THeaders headers(request->Headers); + TStringBuf cacheControl(headers["Cache-Control"]); + while (TStringBuf cacheItem = cacheControl.NextTok(',')) { + Trim(cacheItem, ' '); + if (cacheItem == "no-store" || cacheItem == "no-cache") { + policy.DiscardCache = true; + } + TStringBuf itemName = cacheItem.NextTok('='); + TrimEnd(itemName, ' '); + TrimBegin(cacheItem, ' '); + if (itemName == "max-age") { + policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem)); + } + if (itemName == "min-fresh") { + policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem)); + } + if (itemName == "stale-if-error") { + policy.KeepOnError = true; + } + } + return policy; +} + +NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) { + return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy)); +} + +NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) { + return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy)); +} + +NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) { + return new THttpIncomingCacheActor(httpProxyId, std::move(cachePolicy)); +} + +} |