aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordinmukhammed <dinmukhammed@yandex-team.ru>2022-02-18 15:50:28 +0300
committerdinmukhammed <dinmukhammed@yandex-team.ru>2022-02-18 15:50:28 +0300
commit9bc8d9cc172750f71ee4218bb3910b01492eca26 (patch)
treefd004d3de2a9e0544ab0b065408d0fde1780b063
parentb4cb34dfb2619f594d82e512fd9ff7fc97400133 (diff)
downloadydb-9bc8d9cc172750f71ee4218bb3910b01492eca26.tar.gz
YQ-560 Retry mechanism in HttpMultiGateway
Added Retry mechanism into HttpMultiGateway ref:0bb1363d5a97bb64e9fdf4cf638dddbd9856d576
-rw-r--r--CMakeLists.txt4
-rw-r--r--ydb/library/yql/providers/common/http_gateway/CMakeLists.txt2
-rw-r--r--ydb/library/yql/providers/common/http_gateway/ya.make2
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp70
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h11
5 files changed, 78 insertions, 11 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 08bd79ce23..b99930af47 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -836,6 +836,8 @@ add_subdirectory(ydb/library/yql/providers/clickhouse/expr_nodes)
add_subdirectory(ydb/library/yql/providers/clickhouse/proto)
add_subdirectory(ydb/library/yql/providers/common/dq)
add_subdirectory(ydb/library/yql/providers/common/http_gateway)
+add_subdirectory(library/cpp/retry)
+add_subdirectory(library/cpp/retry/protos)
add_subdirectory(ydb/library/yql/providers/common/comp_nodes)
add_subdirectory(ydb/library/yql/providers/dq/provider/exec)
add_subdirectory(ydb/library/yql/providers/pq/cm_client/interface)
@@ -854,8 +856,6 @@ add_subdirectory(ydb/library/folder_service)
add_subdirectory(ydb/core/persqueue)
add_subdirectory(ydb/core/persqueue/codecs)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_persqueue_core)
-add_subdirectory(library/cpp/retry)
-add_subdirectory(library/cpp/retry/protos)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_persqueue_core/impl)
add_subdirectory(library/cpp/containers/disjoint_interval_tree)
add_subdirectory(ydb/library/persqueue/obfuscate)
diff --git a/ydb/library/yql/providers/common/http_gateway/CMakeLists.txt b/ydb/library/yql/providers/common/http_gateway/CMakeLists.txt
index 20962f656d..3aa7a46029 100644
--- a/ydb/library/yql/providers/common/http_gateway/CMakeLists.txt
+++ b/ydb/library/yql/providers/common/http_gateway/CMakeLists.txt
@@ -7,6 +7,8 @@ target_link_libraries(providers-common-http_gateway PUBLIC
yutil
contrib-libs-curl
cpp-monlib-dynamic_counters
+ library-cpp-retry
+ cpp-threading-task_scheduler
providers-common-proto
yql-public-issue
)
diff --git a/ydb/library/yql/providers/common/http_gateway/ya.make b/ydb/library/yql/providers/common/http_gateway/ya.make
index f53489973e..4093364336 100644
--- a/ydb/library/yql/providers/common/http_gateway/ya.make
+++ b/ydb/library/yql/providers/common/http_gateway/ya.make
@@ -12,6 +12,8 @@ SRCS(
PEERDIR(
contrib/libs/curl
library/cpp/monlib/dynamic_counters
+ library/cpp/retry
+ library/cpp/threading/task_scheduler
ydb/library/yql/providers/common/proto
ydb/library/yql/public/issue
)
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 5c2a1451b4..eccab2fa27 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -110,19 +110,22 @@ private:
std::stack<IHTTPGateway::TOnResult> Callbacks;
};
-using TKeyType = std::tuple<TString, IHTTPGateway::THeaders, TString>;
+using TKeyType = std::tuple<TString, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
class TKeyHash {
public:
- TKeyHash() : Hash() {}
+ TKeyHash() : Hash(), HashPtr() {}
size_t operator()(const TKeyType& key) const {
const auto& headers = std::get<1U>(key);
- return std::accumulate(headers.cbegin(), headers.cend(), CombineHashes(Hash(std::get<0U>(key)), Hash(std::get<2U>(key))),
+ auto initHash = CombineHashes(Hash(std::get<0U>(key)), Hash(std::get<2U>(key)));
+ initHash = CombineHashes(HashPtr(std::get<3U>(key)), initHash);
+ return std::accumulate(headers.cbegin(), headers.cend(), initHash,
[this](size_t hash, const TString& item) { return CombineHashes(hash, Hash(item)); });
}
private:
const std::hash<TString> Hash;
+ const std::hash<IRetryPolicy<long>::TPtr> HashPtr;
};
class THTTPMultiGateway : public IHTTPGateway {
@@ -148,6 +151,8 @@ public:
if (httpGatewaysCfg->HasMaxSimulatenousDownloadsSize()) {
MaxSimulatenousDownloadsSize = httpGatewaysCfg->GetMaxSimulatenousDownloadsSize();
}
+
+ TaskScheduler.Start();
}
~THTTPMultiGateway() {
@@ -227,7 +232,17 @@ private:
{
const std::unique_lock lock(Sync);
if (const auto it = Allocated.find(handle); Allocated.cend() != it) {
+ long httpResponseCode = 0;
easy = std::move(it->second);
+ curl_easy_getinfo(easy->GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
+
+ if (const auto stateIt = Easy2RetryState.find(easy); stateIt != Easy2RetryState.end()) {
+ if (const auto& nextRetryDelay = stateIt->second->GetNextRetryDelay(httpResponseCode)) {
+ Y_VERIFY(TaskScheduler.Add(new THttpGatewayTask(easy, Singleton), *nextRetryDelay));
+ } else {
+ Easy2RetryState.erase(stateIt);
+ }
+ }
AllocatedSize -= easy->GetExpectedSize();
Allocated.erase(it);
}
@@ -265,30 +280,60 @@ private:
}
}
- void Download(TString url, THeaders headers, std::size_t expectedSize, TOnResult callback, TString data) final {
+ void Download(
+ TString url,
+ THeaders headers,
+ std::size_t expectedSize,
+ TOnResult callback,
+ TString data,
+ IRetryPolicy<long>::TPtr retryPolicy) final
+ {
Rps->Inc();
InFlight->Inc();
const std::unique_lock lock(Sync);
- auto& entry = Requests[TKeyType(url, headers, data)];
+ auto& entry = Requests[TKeyType(url, headers, data, retryPolicy)];
if (const auto& easy = entry.lock())
return easy->AddCallback(std::move(callback));
auto easy = TEasyCurl::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
entry = easy;
+ Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState()));
Await.emplace(std::move(easy));
if (Allocated.size() < MaxHandlers && AllocatedSize + expectedSize + OutputSize.load() <= MaxSimulatenousDownloadsSize) {
curl_multi_wakeup(Handle);
}
}
+ class THttpGatewayTask: public TTaskScheduler::IRepeatedTask {
+ public:
+ THttpGatewayTask(
+ TEasyCurl::TPtr easy,
+ THTTPMultiGateway::TWeakPtr gateway)
+ : Easy(easy)
+ , Gateway(gateway)
+ {}
+
+ bool Process() override {
+ if (const auto g = Gateway.lock()) {
+ Y_VERIFY(Easy);
+ g->Await.emplace(std::move(Easy));
+ }
+ return false;
+ }
+ private:
+ TEasyCurl::TPtr Easy;
+ THTTPMultiGateway::TWeakPtr Gateway;
+ };
+
+private:
CURLM* Handle = nullptr;
std::queue<TEasyCurl::TPtr> Await;
std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated;
-
std::unordered_map<TKeyType, TEasyCurl::TWeakPtr, TKeyHash> Requests;
+ std::unordered_map<TEasyCurl::TPtr, IRetryPolicy<long>::IRetryState::TPtr> Easy2RetryState;
std::mutex Sync;
std::thread Thread;
@@ -303,6 +348,8 @@ private:
const NMonitoring::TDynamicCounters::TCounterPtr Rps;
const NMonitoring::TDynamicCounters::TCounterPtr InFlight;
const NMonitoring::TDynamicCounters::TCounterPtr AllocatedMemory;
+
+ TTaskScheduler TaskScheduler;
};
std::atomic_size_t THTTPMultiGateway::OutputSize = 0ULL;
@@ -334,9 +381,16 @@ private:
easy->Done(result);
}
- void Download(TString url, THeaders headers, std::size_t expectedSize, TOnResult callback, TString data) final {
+ void Download(
+ TString url,
+ THeaders headers,
+ std::size_t expectedSize,
+ TOnResult callback,
+ TString data,
+ IRetryPolicy<long>::TPtr retryPolicy) final
+ {
const std::unique_lock lock(Sync);
- auto& entry = Requests[TKeyType(url, headers, data)];
+ auto& entry = Requests[TKeyType(url, headers, data, std::move(retryPolicy))];
if (const auto& easy = entry.first.lock())
return easy->AddCallback(std::move(callback));
auto easy = TEasyCurl::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index 4e15aa9c78..506b7d774e 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -5,6 +5,8 @@
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/retry/retry_policy.h>
+#include <library/cpp/threading/task_scheduler/task_scheduler.h>
#include <variant>
#include <functional>
@@ -48,7 +50,14 @@ public:
using TOnResult = std::function<void(TResult&&)>;
using THeaders = TSmallVec<TString>;
- virtual void Download(TString url, THeaders headers, std::size_t expectedSize, TOnResult callback, TString data = {}) = 0;
+ virtual void Download(
+ TString url,
+ THeaders headers,
+ std::size_t expectedSize,
+ TOnResult callback,
+ TString data = {},
+ IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()
+ ) = 0;
};
}