aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2025-02-08 15:42:05 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2025-02-08 16:00:27 +0300
commit2309a9980fd82ba7df5a21876c790e7e4d776ded (patch)
tree1c17d715ab385e3444ccb610835f14fcb443553e
parentf432cacb199120d2ca3b2c6c4224362cef823a44 (diff)
downloadydb-2309a9980fd82ba7df5a21876c790e7e4d776ded.tar.gz
YT-23616: Switch TYtPoller to IRawClient
commit_hash:30039ea85dc5a55f14b4964ce3aec0a0508d6836
-rw-r--r--yt/cpp/mapreduce/client/client.cpp2
-rw-r--r--yt/cpp/mapreduce/client/yt_poller.cpp15
-rw-r--r--yt/cpp/mapreduce/client/yt_poller.h8
-rw-r--r--yt/cpp/mapreduce/http_client/raw_client.cpp5
-rw-r--r--yt/cpp/mapreduce/http_client/raw_client.h2
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h2
6 files changed, 25 insertions, 9 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 92a255696f..6d8ce22ce5 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -1433,7 +1433,7 @@ TYtPoller& TClient::GetYtPoller()
// We don't use current client and create new client because YtPoller_ might use
// this client during current client shutdown.
// That might lead to incrementing of current client refcount and double delete of current client object.
- YtPoller_ = std::make_unique<TYtPoller>(Context_, ClientRetryPolicy_);
+ YtPoller_ = std::make_unique<TYtPoller>(RawClient_->Clone(), Context_.Config, ClientRetryPolicy_);
}
return *YtPoller_;
}
diff --git a/yt/cpp/mapreduce/client/yt_poller.cpp b/yt/cpp/mapreduce/client/yt_poller.cpp
index 1b5b156aa5..12f4af06d2 100644
--- a/yt/cpp/mapreduce/client/yt_poller.cpp
+++ b/yt/cpp/mapreduce/client/yt_poller.cpp
@@ -9,6 +9,7 @@
#include <yt/cpp/mapreduce/http/retry_request.h>
#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
@@ -20,9 +21,11 @@ using namespace NRawClient;
////////////////////////////////////////////////////////////////////////////////
TYtPoller::TYtPoller(
- TClientContext context,
+ IRawClientPtr rawClient,
+ const TConfigPtr& config,
const IClientRetryPolicyPtr& retryPolicy)
- : Context_(std::move(context))
+ : RawClient_(std::move(rawClient))
+ , Config_(config)
, ClientRetryPolicy_(retryPolicy)
, WaiterThread_(&TYtPoller::WatchLoopProc, this)
{
@@ -83,7 +86,7 @@ void TYtPoller::WatchLoop()
{
auto ug = Unguard(Lock_); // allow adding new items into Pending_
TWaitProxy::Get()->SleepUntil(nextRequest);
- nextRequest = TInstant::Now() + Context_.Config->WaitLockPollInterval;
+ nextRequest = TInstant::Now() + Config_->WaitLockPollInterval;
}
if (!Pending_.empty()) {
InProgress_.splice(InProgress_.end(), Pending_);
@@ -91,14 +94,14 @@ void TYtPoller::WatchLoop()
Y_ABORT_UNLESS(!InProgress_.empty());
}
- THttpRawBatchRequest rawBatchRequest(Context_, ClientRetryPolicy_->CreatePolicyForGenericRequest());
+ auto rawBatchRequest = RawClient_->CreateRawBatchRequest();
for (auto& item : InProgress_) {
- item->PrepareRequest(&rawBatchRequest);
+ item->PrepareRequest(rawBatchRequest.Get());
}
try {
- rawBatchRequest.ExecuteBatch();
+ rawBatchRequest->ExecuteBatch();
} catch (const std::exception& ex) {
YT_LOG_ERROR("Exception while executing batch request: %v", ex.what());
}
diff --git a/yt/cpp/mapreduce/client/yt_poller.h b/yt/cpp/mapreduce/client/yt_poller.h
index c2e6809324..700541827c 100644
--- a/yt/cpp/mapreduce/client/yt_poller.h
+++ b/yt/cpp/mapreduce/client/yt_poller.h
@@ -46,7 +46,10 @@ class TYtPoller
: public TThrRefBase
{
public:
- TYtPoller(TClientContext context, const IClientRetryPolicyPtr& retryPolicy);
+ TYtPoller(
+ IRawClientPtr rawClient,
+ const TConfigPtr& config,
+ const IClientRetryPolicyPtr& retryPolicy);
~TYtPoller();
void Watch(IYtPollerItemPtr item);
@@ -62,7 +65,8 @@ private:
private:
struct TItem;
- const TClientContext Context_;
+ const IRawClientPtr RawClient_;
+ const TConfigPtr Config_;
const IClientRetryPolicyPtr ClientRetryPolicy_;
diff --git a/yt/cpp/mapreduce/http_client/raw_client.cpp b/yt/cpp/mapreduce/http_client/raw_client.cpp
index 378fbdfde6..eb45b1315b 100644
--- a/yt/cpp/mapreduce/http_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/http_client/raw_client.cpp
@@ -933,6 +933,11 @@ IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest()
return MakeIntrusive<NRawClient::THttpRawBatchRequest>(Context_, /*retryPolicy*/ nullptr);
}
+IRawClientPtr THttpRawClient::Clone()
+{
+ return ::MakeIntrusive<THttpRawClient>(Context_);
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/http_client/raw_client.h b/yt/cpp/mapreduce/http_client/raw_client.h
index f0e8378db3..6aa670f524 100644
--- a/yt/cpp/mapreduce/http_client/raw_client.h
+++ b/yt/cpp/mapreduce/http_client/raw_client.h
@@ -337,6 +337,8 @@ public:
IRawBatchRequestPtr CreateRawBatchRequest() override;
+ IRawClientPtr Clone() override;
+
private:
const TClientContext Context_;
};
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index 441e738287..b97c06ce37 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -331,6 +331,8 @@ public:
// Batch
virtual IRawBatchRequestPtr CreateRawBatchRequest() = 0;
+
+ virtual IRawClientPtr Clone() = 0;
};
////////////////////////////////////////////////////////////////////////////////