aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/http/http_cache.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/http/http_cache.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/http/http_cache.cpp')
-rw-r--r--library/cpp/actors/http/http_cache.cpp599
1 files changed, 599 insertions, 0 deletions
diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp
new file mode 100644
index 0000000000..27c4eeb6f3
--- /dev/null
+++ b/library/cpp/actors/http/http_cache.cpp
@@ -0,0 +1,599 @@
+#include "http.h"
+#include "http_proxy.h"
+#include "http_cache.h"
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/core/scheduler_basic.h>
+#include <library/cpp/actors/http/http.h>
+#include <library/cpp/digest/md5/md5.h>
+#include <util/digest/multi.h>
+#include <util/generic/queue.h>
+#include <util/string/cast.h>
+
+namespace NHttp {
+
+class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoingCacheActor>, THttpConfig {
+public:
+ using TBase = NActors::TActorBootstrapped<THttpOutgoingCacheActor>;
+ NActors::TActorId HttpProxyId;
+ TGetCachePolicy GetCachePolicy;
+ static constexpr TDuration RefreshTimeout = TDuration::Seconds(1);
+
+ struct TCacheKey {
+ TString Host;
+ TString URL;
+ TString Headers;
+
+ operator size_t() const {
+ return MultiHash(Host, URL, Headers);
+ }
+
+ TString GetId() const {
+ return MD5::Calc(Host + ':' + URL + ':' + Headers);
+ }
+ };
+
+ struct TCacheRecord {
+ TInstant RefreshTime;
+ TInstant DeathTime;
+ TCachePolicy CachePolicy;
+ NHttp::THttpOutgoingRequestPtr Request;
+ NHttp::THttpOutgoingRequestPtr OutgoingRequest;
+ TDuration Timeout;
+ NHttp::THttpIncomingResponsePtr Response;
+ TString Error;
+ TVector<NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr> Waiters;
+
+ TCacheRecord(const TCachePolicy cachePolicy)
+ : CachePolicy(cachePolicy)
+ {}
+
+ bool IsValid() const {
+ return Response != nullptr || !Error.empty();
+ }
+
+ void UpdateResponse(NHttp::THttpIncomingResponsePtr response, const TString& error, TInstant now) {
+ if (error.empty() || Response == nullptr || !CachePolicy.KeepOnError) {
+ Response = response;
+ Error = error;
+ }
+ RefreshTime = now + CachePolicy.TimeToRefresh;
+ if (CachePolicy.PaceToRefresh) {
+ RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds());
+ }
+ }
+
+ TString GetName() const {
+ return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL;
+ }
+ };
+
+ struct TRefreshRecord {
+ TCacheKey Key;
+ TInstant RefreshTime;
+
+ bool operator <(const TRefreshRecord& b) const {
+ return RefreshTime > b.RefreshTime;
+ }
+ };
+
+ THashMap<TCacheKey, TCacheRecord> Cache;
+ TPriorityQueue<TRefreshRecord> RefreshQueue;
+ THashMap<THttpOutgoingRequest*, TCacheKey> OutgoingRequests;
+
+ THttpOutgoingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy)
+ : HttpProxyId(httpProxyId)
+ , GetCachePolicy(std::move(getCachePolicy))
+ {}
+
+ void Bootstrap(const NActors::TActorContext&) {
+ //
+ Become(&THttpOutgoingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup());
+ }
+
+ static TString GetCacheHeadersKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) {
+ TStringBuilder key;
+ if (!policy.HeadersToCacheKey.empty()) {
+ NHttp::THeaders headers(request->Headers);
+ for (const TString& header : policy.HeadersToCacheKey) {
+ key << headers[header];
+ }
+ }
+ return key;
+ }
+
+ static TCacheKey GetCacheKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) {
+ return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) };
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
+ NHttp::THttpOutgoingRequestPtr request(event->Get()->Request);
+ NHttp::THttpIncomingResponsePtr response(event->Get()->Response);
+ auto itRequests = OutgoingRequests.find(request.Get());
+ if (itRequests == OutgoingRequests.end()) {
+ LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL);
+ return;
+ }
+ auto key = itRequests->second;
+ OutgoingRequests.erase(itRequests);
+ auto it = Cache.find(key);
+ if (it == Cache.end()) {
+ LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL);
+ return;
+ }
+ TCacheRecord& cacheRecord = it->second;
+ cacheRecord.OutgoingRequest.Reset();
+ for (auto& waiter : cacheRecord.Waiters) {
+ NHttp::THttpIncomingResponsePtr response2;
+ TString error2;
+ if (response != nullptr) {
+ response2 = response->Duplicate(waiter->Get()->Request);
+ }
+ if (!event->Get()->Error.empty()) {
+ error2 = event->Get()->Error;
+ }
+ ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(waiter->Get()->Request, response2, error2));
+ }
+ cacheRecord.Waiters.clear();
+ TString error;
+ if (event->Get()->Error.empty()) {
+ if (event->Get()->Response != nullptr && event->Get()->Response->Status != "200") {
+ error = event->Get()->Response->Message;
+ }
+ } else {
+ error = event->Get()->Error;
+ }
+ if (!error.empty()) {
+ LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error);
+ }
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingUpdate " << cacheRecord.GetName());
+ cacheRecord.UpdateResponse(response, event->Get()->Error, ctx.Now());
+ RefreshQueue.push({it->first, it->second.RefreshTime});
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ const NHttp::THttpOutgoingRequest* request = event->Get()->Request.Get();
+ auto policy = GetCachePolicy(request);
+ if (policy.TimeToExpire == TDuration()) {
+ ctx.Send(event->Forward(HttpProxyId));
+ return;
+ }
+ auto key = GetCacheKey(request, policy);
+ auto it = Cache.find(key);
+ if (it != Cache.end()) {
+ if (it->second.IsValid()) {
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingRespond "
+ << it->second.GetName()
+ << " ("
+ << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error"))
+ << ")");
+ NHttp::THttpIncomingResponsePtr response = it->second.Response;
+ if (response != nullptr) {
+ response = response->Duplicate(event->Get()->Request);
+ }
+ ctx.Send(event->Sender,
+ new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(event->Get()->Request,
+ response,
+ it->second.Error));
+ it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire; // prolong active cache items
+ return;
+ }
+ } else {
+ it = Cache.emplace(key, policy).first;
+ it->second.Request = event->Get()->Request;
+ it->second.Timeout = event->Get()->Timeout;
+ it->second.OutgoingRequest = it->second.Request->Duplicate();
+ OutgoingRequests[it->second.OutgoingRequest.Get()] = key;
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingInitiate " << it->second.GetName());
+ ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout));
+ }
+ it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire;
+ it->second.Waiters.emplace_back(std::move(event));
+ }
+
+ void HandleRefresh(const NActors::TActorContext& ctx) {
+ while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) {
+ TRefreshRecord rrec = RefreshQueue.top();
+ RefreshQueue.pop();
+ auto it = Cache.find(rrec.Key);
+ if (it != Cache.end()) {
+ if (it->second.DeathTime > ctx.Now()) {
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingRefresh " << it->second.GetName());
+ it->second.OutgoingRequest = it->second.Request->Duplicate();
+ OutgoingRequests[it->second.OutgoingRequest.Get()] = it->first;
+ ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout));
+ } else {
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingForget " << it->second.GetName());
+ if (it->second.OutgoingRequest) {
+ OutgoingRequests.erase(it->second.OutgoingRequest.Get());
+ }
+ Cache.erase(it);
+ }
+ }
+ }
+ ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup());
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
+ CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh);
+ }
+ }
+};
+
+const TDuration THttpOutgoingCacheActor::RefreshTimeout;
+
+class THttpIncomingCacheActor : public NActors::TActorBootstrapped<THttpIncomingCacheActor>, THttpConfig {
+public:
+ using TBase = NActors::TActorBootstrapped<THttpIncomingCacheActor>;
+ NActors::TActorId HttpProxyId;
+ TGetCachePolicy GetCachePolicy;
+ static constexpr TDuration RefreshTimeout = TDuration::Seconds(1);
+ THashMap<TString, TActorId> Handlers;
+
+ struct TCacheKey {
+ TString Host;
+ TString URL;
+ TString Headers;
+
+ operator size_t() const {
+ return MultiHash(Host, URL, Headers);
+ }
+
+ TString GetId() const {
+ return MD5::Calc(Host + ':' + URL + ':' + Headers);
+ }
+ };
+
+ struct TCacheRecord {
+ TInstant RefreshTime;
+ TInstant DeathTime;
+ TCachePolicy CachePolicy;
+ TString CacheId;
+ NHttp::THttpIncomingRequestPtr Request;
+ TDuration Timeout;
+ NHttp::THttpOutgoingResponsePtr Response;
+ TVector<NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr> Waiters;
+ ui32 Retries = 0;
+ bool Enqueued = false;
+
+ TCacheRecord(const TCachePolicy cachePolicy)
+ : CachePolicy(cachePolicy)
+ {}
+
+ bool IsValid() const {
+ return Response != nullptr;
+ }
+
+ void InitRequest(NHttp::THttpIncomingRequestPtr request) {
+ Request = request;
+ if (CachePolicy.TimeToExpire) {
+ DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire;
+ }
+ }
+
+ void UpdateResponse(NHttp::THttpOutgoingResponsePtr response, const TString& error, TInstant now) {
+ if (error.empty() || !CachePolicy.KeepOnError) {
+ Response = response;
+ }
+ Retries = 0;
+ if (CachePolicy.TimeToRefresh) {
+ RefreshTime = now + CachePolicy.TimeToRefresh;
+ if (CachePolicy.PaceToRefresh) {
+ RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds());
+ }
+ }
+ }
+
+ void UpdateExpireTime() {
+ if (CachePolicy.TimeToExpire) {
+ DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire;
+ }
+ }
+
+ TString GetName() const {
+ return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL
+ << " (" << CacheId << ")";
+ }
+ };
+
+ struct TRefreshRecord {
+ TCacheKey Key;
+ TInstant RefreshTime;
+
+ bool operator <(const TRefreshRecord& b) const {
+ return RefreshTime > b.RefreshTime;
+ }
+ };
+
+ THashMap<TCacheKey, TCacheRecord> Cache;
+ TPriorityQueue<TRefreshRecord> RefreshQueue;
+ THashMap<THttpIncomingRequest*, TCacheKey> IncomingRequests;
+
+ THttpIncomingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy)
+ : HttpProxyId(httpProxyId)
+ , GetCachePolicy(std::move(getCachePolicy))
+ {}
+
+ void Bootstrap(const NActors::TActorContext&) {
+ //
+ Become(&THttpIncomingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup());
+ }
+
+ static TString GetCacheHeadersKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) {
+ TStringBuilder key;
+ if (!policy.HeadersToCacheKey.empty()) {
+ NHttp::THeaders headers(request->Headers);
+ for (const TString& header : policy.HeadersToCacheKey) {
+ key << headers[header];
+ }
+ }
+ return key;
+ }
+
+ static TCacheKey GetCacheKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) {
+ return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) };
+ }
+
+ TActorId GetRequestHandler(NHttp::THttpIncomingRequestPtr request) {
+ TStringBuf url = request->URL.Before('?');
+ THashMap<TString, TActorId>::iterator it;
+ while (!url.empty()) {
+ it = Handlers.find(url);
+ if (it != Handlers.end()) {
+ return it->second;
+ } else {
+ if (url.EndsWith('/')) {
+ url.Trunc(url.size() - 1);
+ }
+ size_t pos = url.rfind('/');
+ if (pos == TStringBuf::npos) {
+ break;
+ } else {
+ url = url.substr(0, pos + 1);
+ }
+ }
+ }
+ return {};
+ }
+
+ void SendCacheRequest(const TCacheKey& cacheKey, TCacheRecord& cacheRecord, const NActors::TActorContext& ctx) {
+ cacheRecord.Request = cacheRecord.Request->Duplicate();
+ IncomingRequests[cacheRecord.Request.Get()] = cacheKey;
+ TActorId handler = GetRequestHandler(cacheRecord.Request);
+ if (handler) {
+ Send(handler, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(cacheRecord.Request));
+ } else {
+ LOG_ERROR_S(ctx, HttpLog, "Can't find cache handler for " << cacheRecord.GetName());
+ }
+ }
+
+ void DropCacheRecord(THashMap<TCacheKey, TCacheRecord>::iterator it) {
+ if (it->second.Request) {
+ IncomingRequests.erase(it->second.Request.Get());
+ }
+ for (auto& waiter : it->second.Waiters) {
+ NHttp::THttpOutgoingResponsePtr response;
+ response = waiter->Get()->Request->CreateResponseGatewayTimeout("Timeout", "text/plain");
+ Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
+ }
+ Cache.erase(it);
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
+ Handlers[event->Get()->Path] = event->Get()->Handler;
+ ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvRegisterHandler(event->Get()->Path, ctx.SelfID));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
+ NHttp::THttpIncomingRequestPtr request(event->Get()->Response->GetRequest());
+ NHttp::THttpOutgoingResponsePtr response(event->Get()->Response);
+ auto itRequests = IncomingRequests.find(request.Get());
+ if (itRequests == IncomingRequests.end()) {
+ LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL);
+ return;
+ }
+
+ TCacheKey key = itRequests->second;
+ auto it = Cache.find(key);
+ if (it == Cache.end()) {
+ LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL);
+ return;
+ }
+
+ IncomingRequests.erase(itRequests);
+ TCacheRecord& cacheRecord = it->second;
+ TStringBuf status;
+ TString error;
+
+ if (event->Get()->Response != nullptr) {
+ status = event->Get()->Response->Status;
+ if (!status.StartsWith("2")) {
+ error = event->Get()->Response->Message;
+ }
+ }
+ if (cacheRecord.CachePolicy.RetriesCount > 0) {
+ auto itStatusToRetry = std::find(cacheRecord.CachePolicy.StatusesToRetry.begin(), cacheRecord.CachePolicy.StatusesToRetry.end(), status);
+ if (itStatusToRetry != cacheRecord.CachePolicy.StatusesToRetry.end()) {
+ if (cacheRecord.Retries < cacheRecord.CachePolicy.RetriesCount) {
+ ++cacheRecord.Retries;
+ LOG_WARN_S(ctx, HttpLog, "IncomingRetry " << cacheRecord.GetName() << ": " << status << " " << error);
+ SendCacheRequest(key, cacheRecord, ctx);
+ return;
+ }
+ }
+ }
+ for (auto& waiter : cacheRecord.Waiters) {
+ NHttp::THttpOutgoingResponsePtr response2;
+ response2 = response->Duplicate(waiter->Get()->Request);
+ ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response2));
+ }
+ cacheRecord.Waiters.clear();
+ if (!error.empty()) {
+ LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error);
+ if (!cacheRecord.Response) {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscard " << cacheRecord.GetName());
+ DropCacheRecord(it);
+ return;
+ }
+ }
+ if (cacheRecord.CachePolicy.TimeToRefresh) {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingUpdate " << cacheRecord.GetName());
+ cacheRecord.UpdateResponse(response, error, ctx.Now());
+ if (!cacheRecord.Enqueued) {
+ RefreshQueue.push({it->first, it->second.RefreshTime});
+ cacheRecord.Enqueued = true;
+ }
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
+ } else {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingDrop " << cacheRecord.GetName());
+ DropCacheRecord(it);
+ }
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ const NHttp::THttpIncomingRequest* request = event->Get()->Request.Get();
+ TCachePolicy policy = GetCachePolicy(request);
+ if (policy.TimeToExpire == TDuration() && policy.RetriesCount == 0) {
+ TActorId handler = GetRequestHandler(event->Get()->Request);
+ if (handler) {
+ ctx.Send(event->Forward(handler));
+ }
+ return;
+ }
+ auto key = GetCacheKey(request, policy);
+ auto it = Cache.find(key);
+ if (it != Cache.end() && !policy.DiscardCache) {
+ it->second.UpdateExpireTime();
+ if (it->second.IsValid()) {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingRespond "
+ << it->second.GetName()
+ << " ("
+ << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error"))
+ << ")");
+ NHttp::THttpOutgoingResponsePtr response = it->second.Response;
+ if (response != nullptr) {
+ response = response->Duplicate(event->Get()->Request);
+ }
+ ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
+ return;
+ }
+ } else {
+ it = Cache.emplace(key, policy).first;
+ it->second.CacheId = key.GetId(); // for debugging
+ it->second.InitRequest(event->Get()->Request);
+ if (policy.DiscardCache) {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscardCache " << it->second.GetName());
+ }
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingInitiate " << it->second.GetName());
+ SendCacheRequest(key, it->second, ctx);
+ }
+ it->second.Waiters.emplace_back(std::move(event));
+ }
+
+ void HandleRefresh(const NActors::TActorContext& ctx) {
+ while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) {
+ TRefreshRecord rrec = RefreshQueue.top();
+ RefreshQueue.pop();
+ auto it = Cache.find(rrec.Key);
+ if (it != Cache.end()) {
+ it->second.Enqueued = false;
+ if (it->second.DeathTime > ctx.Now()) {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingRefresh " << it->second.GetName());
+ SendCacheRequest(it->first, it->second, ctx);
+ } else {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingForget " << it->second.GetName());
+ DropCacheRecord(it);
+ }
+ }
+ }
+ ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup());
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
+ CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh);
+ }
+ }
+};
+
+TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& defaultPolicy) {
+ TCachePolicy policy = defaultPolicy;
+ THeaders headers(request->Headers);
+ TStringBuf cacheControl(headers["Cache-Control"]);
+ while (TStringBuf cacheItem = cacheControl.NextTok(',')) {
+ Trim(cacheItem, ' ');
+ if (cacheItem == "no-store" || cacheItem == "no-cache") {
+ policy.DiscardCache = true;
+ }
+ TStringBuf itemName = cacheItem.NextTok('=');
+ TrimEnd(itemName, ' ');
+ TrimBegin(cacheItem, ' ');
+ if (itemName == "max-age") {
+ policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem));
+ }
+ if (itemName == "min-fresh") {
+ policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem));
+ }
+ if (itemName == "stale-if-error") {
+ policy.KeepOnError = true;
+ }
+ }
+ return policy;
+}
+
+NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
+ return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy));
+}
+
+NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
+ return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy));
+}
+
+NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
+ return new THttpIncomingCacheActor(httpProxyId, std::move(cachePolicy));
+}
+
+}