aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-08-04 16:47:03 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-08-04 16:47:03 +0300
commit6de758f15f8df28b4f991c4084937d71cdefa20d (patch)
treeaadbb814cfbfbfe668edd60433cdf0886d18aba0
parentbf961bc388920c0f3ccdeb8e34b1b60254ce445b (diff)
downloadydb-6de758f15f8df28b4f991c4084937d71cdefa20d.tar.gz
Add cancel hook for streaming download.
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp5
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp30
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h3
3 files changed, 33 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
index 75edbe3669e..5dd2b295d8e 100644
--- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
@@ -69,12 +69,13 @@ public:
}
}
- virtual void Download(
+ TCancelHook Download(
TString ,
THeaders ,
std::size_t ,
TOnNewDataPart ,
- TOnDownloadFinish ) {
+ TOnDownloadFinish ) final {
+ return {};
}
void AddDefaultResponse(TDataDefaultResponse response) {
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 4933e2fcf30..1b244f983b8 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -209,12 +209,20 @@ public:
}
enum class EAction : i8 {
+ Drop = -2,
Stop = -1,
None = 0,
- Work = 1
+ Work = 1,
+ StopAndDrop = Stop + Drop
};
EAction GetAction(size_t buffersSize) {
+ if (Cancelled) {
+ const auto ret = Working ? EAction::StopAndDrop : EAction::Drop;
+ Working = false;
+ return ret;
+ }
+
if (Working != Counter->load() < buffersSize) {
if (Working = !Working)
SkipTo(Position);
@@ -223,6 +231,11 @@ public:
return EAction::None;
}
+
+ void Cancel(TIssue issue) {
+ Cancelled = true;
+ OnFinish(TIssues{issue});
+ }
private:
void Fail(const TIssue& error) final {
Working = false;
@@ -251,6 +264,7 @@ private:
const std::shared_ptr<std::atomic_size_t> Counter;
bool Working = false;
size_t Position = 0ULL;
+ bool Cancelled = false;
};
using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
@@ -395,12 +409,19 @@ private:
for (auto it = Streams.cbegin(); Streams.cend() != it;) {
if (const auto& stream = it->lock()) {
switch (stream->GetAction(BuffersSizePerStream)) {
+ case TEasyCurlStream::EAction::Drop:
+ Allocated.erase(stream->GetHandle());
+ break;
case TEasyCurlStream::EAction::Work:
curl_multi_add_handle(Handle, stream->GetHandle());
break;
case TEasyCurlStream::EAction::Stop:
curl_multi_remove_handle(Handle, stream->GetHandle());
break;
+ case TEasyCurlStream::EAction::StopAndDrop:
+ curl_multi_remove_handle(Handle, stream->GetHandle());
+ Allocated.erase(stream->GetHandle());
+ break;
case TEasyCurlStream::EAction::None:
break;
}
@@ -515,7 +536,7 @@ private:
Wakeup(expectedSize);
}
- void Download(
+ TCancelHook Download(
TString url,
THeaders headers,
size_t offset,
@@ -525,9 +546,14 @@ private:
auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));
const std::unique_lock lock(Sync);
const auto handle = stream->GetHandle();
+ TEasyCurlStream::TWeakPtr weak = stream;
Streams.emplace_back(stream);
Allocated.emplace(handle, std::move(stream));
Wakeup(0ULL);
+ return [weak](TIssue issue) {
+ if (const auto& stream = weak.lock())
+ stream->Cancel(issue);
+ };
}
void OnRetry(TEasyCurlBuffer::TPtr easy) {
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index 7f0fbb3c00b..2f80cb7d801 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -96,8 +96,9 @@ public:
using TOnNewDataPart = std::function<void(TCountedContent&&)>;
using TOnDownloadFinish = std::function<void(std::variant<long, TIssues>)>; // http code or issues.
+ using TCancelHook = std::function<void(TIssue)>;
- virtual void Download(
+ virtual TCancelHook Download(
TString url,
THeaders headers,
std::size_t offset,