aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http/abortable_http_response.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 11:13:34 +0300
committermax42 <max42@yandex-team.com>2023-06-30 11:13:34 +0300
commit3e1899838408bbad47622007aa382bc8a2b01f87 (patch)
tree0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/http/abortable_http_response.cpp
parent5463eb3f5e72a86f858a3d27c886470a724ede34 (diff)
downloadydb-3e1899838408bbad47622007aa382bc8a2b01f87.tar.gz
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/http/abortable_http_response.cpp')
-rw-r--r--yt/cpp/mapreduce/http/abortable_http_response.cpp223
1 files changed, 0 insertions, 223 deletions
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp
deleted file mode 100644
index 9da9241d33..0000000000
--- a/yt/cpp/mapreduce/http/abortable_http_response.cpp
+++ /dev/null
@@ -1,223 +0,0 @@
-#include "abortable_http_response.h"
-
-#include <util/system/mutex.h>
-#include <util/generic/singleton.h>
-#include <util/generic/hash_set.h>
-
-namespace NYT {
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TAbortableHttpResponseRegistry {
-public:
- TOutageId StartOutage(TString urlPattern, const TOutageOptions& options)
- {
- auto g = Guard(Lock_);
- auto id = NextId_++;
- IdToOutage.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_});
- return id;
- }
-
- void StopOutage(TOutageId id)
- {
- auto g = Guard(Lock_);
- IdToOutage.erase(id);
- }
-
- void Add(IAbortableHttpResponse* response)
- {
- auto g = Guard(Lock_);
- for (auto& [id, entry] : IdToOutage) {
- if (entry.Counter > 0 && response->GetUrl().find(entry.Pattern) != TString::npos) {
- response->SetLengthLimit(entry.LengthLimit);
- entry.Counter -= 1;
- }
- }
- ResponseList_.PushBack(response);
- }
-
- void Remove(IAbortableHttpResponse* response)
- {
- auto g = Guard(Lock_);
- response->Unlink();
- }
-
- static TAbortableHttpResponseRegistry& Get()
- {
- return *Singleton<TAbortableHttpResponseRegistry>();
- }
-
- int AbortAll(const TString& urlPattern)
- {
- int result = 0;
- for (auto& response : ResponseList_) {
- if (!response.IsAborted() && response.GetUrl().find(urlPattern) != TString::npos) {
- response.Abort();
- ++result;
- }
- }
- return result;
- }
-
-private:
- struct TOutageEntry
- {
- TString Pattern;
- size_t Counter;
- size_t LengthLimit;
- };
-
-private:
- TOutageId NextId_ = 0;
- TIntrusiveList<IAbortableHttpResponse> ResponseList_;
- THashMap<TOutageId, TOutageEntry> IdToOutage;
- TMutex Lock_;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-TAbortableHttpResponse::TOutage::TOutage(
- TString urlPattern,
- TAbortableHttpResponseRegistry& registry,
- const TOutageOptions& options)
- : UrlPattern_(std::move(urlPattern))
- , Registry_(registry)
- , Id_(registry.StartOutage(UrlPattern_, options))
-{ }
-
-TAbortableHttpResponse::TOutage::~TOutage()
-{
- Stop();
-}
-
-void TAbortableHttpResponse::TOutage::Stop()
-{
- if (!Stopped_) {
- Registry_.StopOutage(Id_);
- Stopped_ = true;
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TAbortableHttpResponseBase::TAbortableHttpResponseBase(const TString& url)
- : Url_(url)
-{
- TAbortableHttpResponseRegistry::Get().Add(this);
-}
-
-TAbortableHttpResponseBase::~TAbortableHttpResponseBase()
-{
- TAbortableHttpResponseRegistry::Get().Remove(this);
-}
-
-void TAbortableHttpResponseBase::Abort()
-{
- Aborted_ = true;
-}
-
-void TAbortableHttpResponseBase::SetLengthLimit(size_t limit)
-{
- LengthLimit_ = limit;
- if (LengthLimit_ == 0) {
- Abort();
- }
-}
-
-const TString& TAbortableHttpResponseBase::GetUrl() const
-{
- return Url_;
-}
-
-bool TAbortableHttpResponseBase::IsAborted() const
-{
- return Aborted_;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TAbortableHttpResponse::TAbortableHttpResponse(
- IInputStream* socketStream,
- const TString& requestId,
- const TString& hostName,
- const TString& url)
- : THttpResponse(socketStream, requestId, hostName)
- , TAbortableHttpResponseBase(url)
-{
-}
-
-size_t TAbortableHttpResponse::DoRead(void* buf, size_t len)
-{
- if (Aborted_) {
- ythrow TAbortedForTestPurpose() << "response was aborted";
- }
- len = std::min(len, LengthLimit_);
- auto read = THttpResponse::DoRead(buf, len);
- LengthLimit_ -= read;
- if (LengthLimit_ == 0) {
- Abort();
- }
- return read;
-}
-
-size_t TAbortableHttpResponse::DoSkip(size_t len)
-{
- if (Aborted_) {
- ythrow TAbortedForTestPurpose() << "response was aborted";
- }
- return THttpResponse::DoSkip(len);
-}
-
-int TAbortableHttpResponse::AbortAll(const TString& urlPattern)
-{
- return TAbortableHttpResponseRegistry::Get().AbortAll(urlPattern);
-}
-
-TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage(
- const TString& urlPattern,
- const TOutageOptions& options)
-{
- return TOutage(urlPattern, TAbortableHttpResponseRegistry::Get(), options);
-}
-
-TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage(
- const TString& urlPattern,
- size_t responseCount)
-{
- return StartOutage(urlPattern, TOutageOptions().ResponseCount(responseCount));
-}
-
-TAbortableCoreHttpResponse::TAbortableCoreHttpResponse(
- std::unique_ptr<IInputStream> stream,
- const TString& url)
- : TAbortableHttpResponseBase(url)
- , Stream_(std::move(stream))
-{
-}
-
-size_t TAbortableCoreHttpResponse::DoRead(void* buf, size_t len)
-{
- if (Aborted_) {
- ythrow TAbortedForTestPurpose() << "response was aborted";
- }
- len = std::min(len, LengthLimit_);
- auto read = Stream_->Read(buf, len);
- LengthLimit_ -= read;
- if (LengthLimit_ == 0) {
- Abort();
- }
-
- return read;
-}
-
-size_t TAbortableCoreHttpResponse::DoSkip(size_t len)
-{
- if (Aborted_) {
- ythrow TAbortedForTestPurpose() << "response was aborted";
- }
- return Stream_->Skip(len);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT