diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-08-04 16:47:03 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-08-04 16:47:03 +0300 |
commit | 6de758f15f8df28b4f991c4084937d71cdefa20d (patch) | |
tree | aadbb814cfbfbfe668edd60433cdf0886d18aba0 | |
parent | bf961bc388920c0f3ccdeb8e34b1b60254ce445b (diff) | |
download | ydb-6de758f15f8df28b4f991c4084937d71cdefa20d.tar.gz |
Add cancel hook for streaming download.
3 files changed, 33 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp index 75edbe3669e..5dd2b295d8e 100644 --- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp @@ -69,12 +69,13 @@ public: } } - virtual void Download( + TCancelHook Download( TString , THeaders , std::size_t , TOnNewDataPart , - TOnDownloadFinish ) { + TOnDownloadFinish ) final { + return {}; } void AddDefaultResponse(TDataDefaultResponse response) { diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 4933e2fcf30..1b244f983b8 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -209,12 +209,20 @@ public: } enum class EAction : i8 { + Drop = -2, Stop = -1, None = 0, - Work = 1 + Work = 1, + StopAndDrop = Stop + Drop }; EAction GetAction(size_t buffersSize) { + if (Cancelled) { + const auto ret = Working ? EAction::StopAndDrop : EAction::Drop; + Working = false; + return ret; + } + if (Working != Counter->load() < buffersSize) { if (Working = !Working) SkipTo(Position); @@ -223,6 +231,11 @@ public: return EAction::None; } + + void Cancel(TIssue issue) { + Cancelled = true; + OnFinish(TIssues{issue}); + } private: void Fail(const TIssue& error) final { Working = false; @@ -251,6 +264,7 @@ private: const std::shared_ptr<std::atomic_size_t> Counter; bool Working = false; size_t Position = 0ULL; + bool Cancelled = false; }; using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; @@ -395,12 +409,19 @@ private: for (auto it = Streams.cbegin(); Streams.cend() != it;) { if (const auto& stream = it->lock()) { switch (stream->GetAction(BuffersSizePerStream)) { + case TEasyCurlStream::EAction::Drop: + Allocated.erase(stream->GetHandle()); + break; case TEasyCurlStream::EAction::Work: curl_multi_add_handle(Handle, stream->GetHandle()); break; case TEasyCurlStream::EAction::Stop: curl_multi_remove_handle(Handle, stream->GetHandle()); break; + case TEasyCurlStream::EAction::StopAndDrop: + curl_multi_remove_handle(Handle, stream->GetHandle()); + Allocated.erase(stream->GetHandle()); + break; case TEasyCurlStream::EAction::None: break; } @@ -515,7 +536,7 @@ private: Wakeup(expectedSize); } - void Download( + TCancelHook Download( TString url, THeaders headers, size_t offset, @@ -525,9 +546,14 @@ private: auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish)); const std::unique_lock lock(Sync); const auto handle = stream->GetHandle(); + TEasyCurlStream::TWeakPtr weak = stream; Streams.emplace_back(stream); Allocated.emplace(handle, std::move(stream)); Wakeup(0ULL); + return [weak](TIssue issue) { + if (const auto& stream = weak.lock()) + stream->Cancel(issue); + }; } void OnRetry(TEasyCurlBuffer::TPtr easy) { diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 7f0fbb3c00b..2f80cb7d801 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -96,8 +96,9 @@ public: using TOnNewDataPart = std::function<void(TCountedContent&&)>; using TOnDownloadFinish = std::function<void(std::variant<long, TIssues>)>; // http code or issues. + using TCancelHook = std::function<void(TIssue)>; - virtual void Download( + virtual TCancelHook Download( TString url, THeaders headers, std::size_t offset, |