diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2025-02-08 15:42:05 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2025-02-08 16:00:27 +0300 |
commit | 2309a9980fd82ba7df5a21876c790e7e4d776ded (patch) | |
tree | 1c17d715ab385e3444ccb610835f14fcb443553e | |
parent | f432cacb199120d2ca3b2c6c4224362cef823a44 (diff) | |
download | ydb-2309a9980fd82ba7df5a21876c790e7e4d776ded.tar.gz |
YT-23616: Switch TYtPoller to IRawClient
commit_hash:30039ea85dc5a55f14b4964ce3aec0a0508d6836
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/yt_poller.cpp | 15 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/yt_poller.h | 8 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http_client/raw_client.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http_client/raw_client.h | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 2 |
6 files changed, 25 insertions, 9 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 92a255696f..6d8ce22ce5 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -1433,7 +1433,7 @@ TYtPoller& TClient::GetYtPoller() // We don't use current client and create new client because YtPoller_ might use // this client during current client shutdown. // That might lead to incrementing of current client refcount and double delete of current client object. - YtPoller_ = std::make_unique<TYtPoller>(Context_, ClientRetryPolicy_); + YtPoller_ = std::make_unique<TYtPoller>(RawClient_->Clone(), Context_.Config, ClientRetryPolicy_); } return *YtPoller_; } diff --git a/yt/cpp/mapreduce/client/yt_poller.cpp b/yt/cpp/mapreduce/client/yt_poller.cpp index 1b5b156aa5..12f4af06d2 100644 --- a/yt/cpp/mapreduce/client/yt_poller.cpp +++ b/yt/cpp/mapreduce/client/yt_poller.cpp @@ -9,6 +9,7 @@ #include <yt/cpp/mapreduce/http/retry_request.h> #include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> @@ -20,9 +21,11 @@ using namespace NRawClient; //////////////////////////////////////////////////////////////////////////////// TYtPoller::TYtPoller( - TClientContext context, + IRawClientPtr rawClient, + const TConfigPtr& config, const IClientRetryPolicyPtr& retryPolicy) - : Context_(std::move(context)) + : RawClient_(std::move(rawClient)) + , Config_(config) , ClientRetryPolicy_(retryPolicy) , WaiterThread_(&TYtPoller::WatchLoopProc, this) { @@ -83,7 +86,7 @@ void TYtPoller::WatchLoop() { auto ug = Unguard(Lock_); // allow adding new items into Pending_ TWaitProxy::Get()->SleepUntil(nextRequest); - nextRequest = TInstant::Now() + Context_.Config->WaitLockPollInterval; + nextRequest = TInstant::Now() + Config_->WaitLockPollInterval; } if (!Pending_.empty()) { InProgress_.splice(InProgress_.end(), Pending_); @@ -91,14 +94,14 @@ void TYtPoller::WatchLoop() Y_ABORT_UNLESS(!InProgress_.empty()); } - THttpRawBatchRequest rawBatchRequest(Context_, ClientRetryPolicy_->CreatePolicyForGenericRequest()); + auto rawBatchRequest = RawClient_->CreateRawBatchRequest(); for (auto& item : InProgress_) { - item->PrepareRequest(&rawBatchRequest); + item->PrepareRequest(rawBatchRequest.Get()); } try { - rawBatchRequest.ExecuteBatch(); + rawBatchRequest->ExecuteBatch(); } catch (const std::exception& ex) { YT_LOG_ERROR("Exception while executing batch request: %v", ex.what()); } diff --git a/yt/cpp/mapreduce/client/yt_poller.h b/yt/cpp/mapreduce/client/yt_poller.h index c2e6809324..700541827c 100644 --- a/yt/cpp/mapreduce/client/yt_poller.h +++ b/yt/cpp/mapreduce/client/yt_poller.h @@ -46,7 +46,10 @@ class TYtPoller : public TThrRefBase { public: - TYtPoller(TClientContext context, const IClientRetryPolicyPtr& retryPolicy); + TYtPoller( + IRawClientPtr rawClient, + const TConfigPtr& config, + const IClientRetryPolicyPtr& retryPolicy); ~TYtPoller(); void Watch(IYtPollerItemPtr item); @@ -62,7 +65,8 @@ private: private: struct TItem; - const TClientContext Context_; + const IRawClientPtr RawClient_; + const TConfigPtr Config_; const IClientRetryPolicyPtr ClientRetryPolicy_; diff --git a/yt/cpp/mapreduce/http_client/raw_client.cpp b/yt/cpp/mapreduce/http_client/raw_client.cpp index 378fbdfde6..eb45b1315b 100644 --- a/yt/cpp/mapreduce/http_client/raw_client.cpp +++ b/yt/cpp/mapreduce/http_client/raw_client.cpp @@ -933,6 +933,11 @@ IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest() return MakeIntrusive<NRawClient::THttpRawBatchRequest>(Context_, /*retryPolicy*/ nullptr); } +IRawClientPtr THttpRawClient::Clone() +{ + return ::MakeIntrusive<THttpRawClient>(Context_); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/http_client/raw_client.h b/yt/cpp/mapreduce/http_client/raw_client.h index f0e8378db3..6aa670f524 100644 --- a/yt/cpp/mapreduce/http_client/raw_client.h +++ b/yt/cpp/mapreduce/http_client/raw_client.h @@ -337,6 +337,8 @@ public: IRawBatchRequestPtr CreateRawBatchRequest() override; + IRawClientPtr Clone() override; + private: const TClientContext Context_; }; diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 441e738287..b97c06ce37 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -331,6 +331,8 @@ public: // Batch virtual IRawBatchRequestPtr CreateRawBatchRequest() = 0; + + virtual IRawClientPtr Clone() = 0; }; //////////////////////////////////////////////////////////////////////////////// |