diff options
author | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-18 15:50:28 +0300 |
---|---|---|
committer | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-18 15:50:28 +0300 |
commit | 9bc8d9cc172750f71ee4218bb3910b01492eca26 (patch) | |
tree | fd004d3de2a9e0544ab0b065408d0fde1780b063 | |
parent | b4cb34dfb2619f594d82e512fd9ff7fc97400133 (diff) | |
download | ydb-9bc8d9cc172750f71ee4218bb3910b01492eca26.tar.gz |
YQ-560 Retry mechanism in HttpMultiGateway
Added Retry mechanism into HttpMultiGateway
ref:0bb1363d5a97bb64e9fdf4cf638dddbd9856d576
5 files changed, 78 insertions, 11 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 08bd79ce23..b99930af47 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -836,6 +836,8 @@ add_subdirectory(ydb/library/yql/providers/clickhouse/expr_nodes) add_subdirectory(ydb/library/yql/providers/clickhouse/proto) add_subdirectory(ydb/library/yql/providers/common/dq) add_subdirectory(ydb/library/yql/providers/common/http_gateway) +add_subdirectory(library/cpp/retry) +add_subdirectory(library/cpp/retry/protos) add_subdirectory(ydb/library/yql/providers/common/comp_nodes) add_subdirectory(ydb/library/yql/providers/dq/provider/exec) add_subdirectory(ydb/library/yql/providers/pq/cm_client/interface) @@ -854,8 +856,6 @@ add_subdirectory(ydb/library/folder_service) add_subdirectory(ydb/core/persqueue) add_subdirectory(ydb/core/persqueue/codecs) add_subdirectory(ydb/public/sdk/cpp/client/ydb_persqueue_core) -add_subdirectory(library/cpp/retry) -add_subdirectory(library/cpp/retry/protos) add_subdirectory(ydb/public/sdk/cpp/client/ydb_persqueue_core/impl) add_subdirectory(library/cpp/containers/disjoint_interval_tree) add_subdirectory(ydb/library/persqueue/obfuscate) diff --git a/ydb/library/yql/providers/common/http_gateway/CMakeLists.txt b/ydb/library/yql/providers/common/http_gateway/CMakeLists.txt index 20962f656d..3aa7a46029 100644 --- a/ydb/library/yql/providers/common/http_gateway/CMakeLists.txt +++ b/ydb/library/yql/providers/common/http_gateway/CMakeLists.txt @@ -7,6 +7,8 @@ target_link_libraries(providers-common-http_gateway PUBLIC yutil contrib-libs-curl cpp-monlib-dynamic_counters + library-cpp-retry + cpp-threading-task_scheduler providers-common-proto yql-public-issue ) diff --git a/ydb/library/yql/providers/common/http_gateway/ya.make b/ydb/library/yql/providers/common/http_gateway/ya.make index f53489973e..4093364336 100644 --- a/ydb/library/yql/providers/common/http_gateway/ya.make +++ b/ydb/library/yql/providers/common/http_gateway/ya.make @@ -12,6 +12,8 @@ SRCS( PEERDIR( contrib/libs/curl library/cpp/monlib/dynamic_counters + library/cpp/retry + library/cpp/threading/task_scheduler ydb/library/yql/providers/common/proto ydb/library/yql/public/issue ) diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 5c2a1451b4..eccab2fa27 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -110,19 +110,22 @@ private: std::stack<IHTTPGateway::TOnResult> Callbacks; }; -using TKeyType = std::tuple<TString, IHTTPGateway::THeaders, TString>; +using TKeyType = std::tuple<TString, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; class TKeyHash { public: - TKeyHash() : Hash() {} + TKeyHash() : Hash(), HashPtr() {} size_t operator()(const TKeyType& key) const { const auto& headers = std::get<1U>(key); - return std::accumulate(headers.cbegin(), headers.cend(), CombineHashes(Hash(std::get<0U>(key)), Hash(std::get<2U>(key))), + auto initHash = CombineHashes(Hash(std::get<0U>(key)), Hash(std::get<2U>(key))); + initHash = CombineHashes(HashPtr(std::get<3U>(key)), initHash); + return std::accumulate(headers.cbegin(), headers.cend(), initHash, [this](size_t hash, const TString& item) { return CombineHashes(hash, Hash(item)); }); } private: const std::hash<TString> Hash; + const std::hash<IRetryPolicy<long>::TPtr> HashPtr; }; class THTTPMultiGateway : public IHTTPGateway { @@ -148,6 +151,8 @@ public: if (httpGatewaysCfg->HasMaxSimulatenousDownloadsSize()) { MaxSimulatenousDownloadsSize = httpGatewaysCfg->GetMaxSimulatenousDownloadsSize(); } + + TaskScheduler.Start(); } ~THTTPMultiGateway() { @@ -227,7 +232,17 @@ private: { const std::unique_lock lock(Sync); if (const auto it = Allocated.find(handle); Allocated.cend() != it) { + long httpResponseCode = 0; easy = std::move(it->second); + curl_easy_getinfo(easy->GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); + + if (const auto stateIt = Easy2RetryState.find(easy); stateIt != Easy2RetryState.end()) { + if (const auto& nextRetryDelay = stateIt->second->GetNextRetryDelay(httpResponseCode)) { + Y_VERIFY(TaskScheduler.Add(new THttpGatewayTask(easy, Singleton), *nextRetryDelay)); + } else { + Easy2RetryState.erase(stateIt); + } + } AllocatedSize -= easy->GetExpectedSize(); Allocated.erase(it); } @@ -265,30 +280,60 @@ private: } } - void Download(TString url, THeaders headers, std::size_t expectedSize, TOnResult callback, TString data) final { + void Download( + TString url, + THeaders headers, + std::size_t expectedSize, + TOnResult callback, + TString data, + IRetryPolicy<long>::TPtr retryPolicy) final + { Rps->Inc(); InFlight->Inc(); const std::unique_lock lock(Sync); - auto& entry = Requests[TKeyType(url, headers, data)]; + auto& entry = Requests[TKeyType(url, headers, data, retryPolicy)]; if (const auto& easy = entry.lock()) return easy->AddCallback(std::move(callback)); auto easy = TEasyCurl::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback)); entry = easy; + Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState())); Await.emplace(std::move(easy)); if (Allocated.size() < MaxHandlers && AllocatedSize + expectedSize + OutputSize.load() <= MaxSimulatenousDownloadsSize) { curl_multi_wakeup(Handle); } } + class THttpGatewayTask: public TTaskScheduler::IRepeatedTask { + public: + THttpGatewayTask( + TEasyCurl::TPtr easy, + THTTPMultiGateway::TWeakPtr gateway) + : Easy(easy) + , Gateway(gateway) + {} + + bool Process() override { + if (const auto g = Gateway.lock()) { + Y_VERIFY(Easy); + g->Await.emplace(std::move(Easy)); + } + return false; + } + private: + TEasyCurl::TPtr Easy; + THTTPMultiGateway::TWeakPtr Gateway; + }; + +private: CURLM* Handle = nullptr; std::queue<TEasyCurl::TPtr> Await; std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated; - std::unordered_map<TKeyType, TEasyCurl::TWeakPtr, TKeyHash> Requests; + std::unordered_map<TEasyCurl::TPtr, IRetryPolicy<long>::IRetryState::TPtr> Easy2RetryState; std::mutex Sync; std::thread Thread; @@ -303,6 +348,8 @@ private: const NMonitoring::TDynamicCounters::TCounterPtr Rps; const NMonitoring::TDynamicCounters::TCounterPtr InFlight; const NMonitoring::TDynamicCounters::TCounterPtr AllocatedMemory; + + TTaskScheduler TaskScheduler; }; std::atomic_size_t THTTPMultiGateway::OutputSize = 0ULL; @@ -334,9 +381,16 @@ private: easy->Done(result); } - void Download(TString url, THeaders headers, std::size_t expectedSize, TOnResult callback, TString data) final { + void Download( + TString url, + THeaders headers, + std::size_t expectedSize, + TOnResult callback, + TString data, + IRetryPolicy<long>::TPtr retryPolicy) final + { const std::unique_lock lock(Sync); - auto& entry = Requests[TKeyType(url, headers, data)]; + auto& entry = Requests[TKeyType(url, headers, data, std::move(retryPolicy))]; if (const auto& easy = entry.first.lock()) return easy->AddCallback(std::move(callback)); auto easy = TEasyCurl::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback)); diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 4e15aa9c78..506b7d774e 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -5,6 +5,8 @@ #include <ydb/library/yql/public/issue/yql_issue.h> #include <library/cpp/containers/stack_vector/stack_vec.h> #include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/retry/retry_policy.h> +#include <library/cpp/threading/task_scheduler/task_scheduler.h> #include <variant> #include <functional> @@ -48,7 +50,14 @@ public: using TOnResult = std::function<void(TResult&&)>; using THeaders = TSmallVec<TString>; - virtual void Download(TString url, THeaders headers, std::size_t expectedSize, TOnResult callback, TString data = {}) = 0; + virtual void Download( + TString url, + THeaders headers, + std::size_t expectedSize, + TOnResult callback, + TString data = {}, + IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy() + ) = 0; }; } |