aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2024-12-19 21:42:25 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2024-12-19 22:01:45 +0300
commit9ae7e7cc07f96dea61e2875b0978feb5ca354caf (patch)
tree14224d0dd7a2b0b03e04031dddbf812ff2613cac
parentcd46bd9fd4cf19e780612d8321e2fdec1b214999 (diff)
downloadydb-9ae7e7cc07f96dea61e2875b0978feb5ca354caf.tar.gz
[yt/cpp/mapreduce] YT-23616: Move SkyShareTable implementation to THttpRawClient
commit_hash:92ea122e55a25d9ae51128f652c4e43c1e72c05d
-rw-r--r--yt/cpp/mapreduce/client/client.cpp26
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h12
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp26
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h6
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp44
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h9
6 files changed, 65 insertions, 58 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 9e3976b144b..69f0147a66a 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -1352,11 +1352,27 @@ TNode::TListType TClient::SkyShareTable(
const TSkyShareTableOptions& options)
{
CheckShutdown();
- return NRawClient::SkyShareTable(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- Context_,
- tablePaths,
- options);
+
+ // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu
+ // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK).
+ TResponseInfo response;
+ while (response.HttpCode != 200) {
+ response = RequestWithRetry<TResponseInfo>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &tablePaths, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->SkyShareTable(tablePaths, options);
+ });
+ TWaitProxy::Get()->Sleep(TDuration::Seconds(5));
+ }
+
+ if (options.KeyColumns_) {
+ return NodeFromJsonString(response.Response)["torrents"].AsList();
+ } else {
+ TNode torrent;
+ torrent["key"] = TNode::CreateList();
+ torrent["rbtorrent"] = response.Response;
+ return TNode::TListType{torrent};
+ }
}
TCheckPermissionResponse TClient::CheckPermission(
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index 32055e3d004..06d4fb9b07a 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -8,6 +8,12 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+namespace NDetail {
+ struct TResponseInfo;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
class IRawClient
: public virtual TThrRefBase
{
@@ -196,6 +202,12 @@ public:
const TOperationId& operationId,
const TGetJobTraceOptions& options = {}) = 0;
+ // SkyShare
+
+ virtual NDetail::TResponseInfo SkyShareTable(
+ const std::vector<TYPath>& tablePaths,
+ const TSkyShareTableOptions& options = {}) = 0;
+
// File cache
virtual TMaybe<TYPath> GetFileFromCache(
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 71d8d5fba90..49ea8ce975f 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -586,6 +586,32 @@ std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace(
return result;
}
+TResponseInfo THttpRawClient::SkyShareTable(
+ const std::vector<TYPath>& tablePaths,
+ const TSkyShareTableOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "api/v1/share", /*IsApi*/ false);
+
+ auto proxyName = Context_.ServerName.substr(0, Context_.ServerName.find('.'));
+
+ auto host = Context_.Config->SkynetApiHost;
+ if (host == "") {
+ host = "skynet." + proxyName + ".yt.yandex.net";
+ }
+
+ TSkyShareTableOptions patchedOptions = options;
+
+ if (Context_.Config->Pool && !patchedOptions.Pool_) {
+ patchedOptions.Pool(Context_.Config->Pool);
+ }
+
+ header.MergeParameters(NRawClient::SerializeParamsForSkyShareTable(proxyName, Context_.Config->Prefix, tablePaths, patchedOptions));
+ TClientContext skyApiHost({.ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient()});
+
+ return RequestWithoutRetry(skyApiHost, mutationId, header);
+}
+
TMaybe<TYPath> THttpRawClient::GetFileFromCache(
const TTransactionId& transactionId,
const TString& md5Signature,
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index 08015f024f8..7a69dc79988 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -202,6 +202,12 @@ public:
const TOperationId& operationId,
const TGetJobTraceOptions& options = {}) override;
+ // SkyShare
+
+ TResponseInfo SkyShareTable(
+ const std::vector<TYPath>& tablePaths,
+ const TSkyShareTableOptions& options = {}) override;
+
// File cache
TMaybe<TYPath> GetFileFromCache(
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index a3f10e6c414..a3f01da6fc9 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -301,50 +301,6 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node)
return result;
}
-TNode::TListType SkyShareTable(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const std::vector<TYPath>& tablePaths,
- const TSkyShareTableOptions& options)
-{
- THttpHeader header("POST", "api/v1/share", /*IsApi*/ false);
-
- auto proxyName = context.ServerName.substr(0, context.ServerName.find('.'));
-
- auto host = context.Config->SkynetApiHost;
- if (host == "") {
- host = "skynet." + proxyName + ".yt.yandex.net";
- }
-
- TSkyShareTableOptions patchedOptions = options;
-
- if (context.Config->Pool && !patchedOptions.Pool_) {
- patchedOptions.Pool(context.Config->Pool);
- }
-
- header.MergeParameters(NRawClient::SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, patchedOptions));
- TClientContext skyApiHost({ .ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient() });
- TResponseInfo response = {};
-
- // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu
- // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK).
- while (response.HttpCode != 200) {
- response = RetryRequestWithPolicy(retryPolicy, skyApiHost, header, "");
- TWaitProxy::Get()->Sleep(TDuration::Seconds(5));
- }
-
- if (options.KeyColumns_) {
- return NodeFromJsonString(response.Response)["torrents"].AsList();
- } else {
- TNode torrent;
-
- torrent["key"] = TNode::CreateList();
- torrent["rbtorrent"] = response.Response;
-
- return TNode::TListType{ torrent };
- }
-}
-
TRichYPath CanonizeYPath(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index c60536c86d0..bcc9a4bfd7f 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -29,7 +29,6 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node);
////////////////////////////////////////////////////////////////////////////////
-//
// marks `batchRequest' as executed
void ExecuteBatch(
IRequestRetryPolicyPtr retryPolicy,
@@ -37,14 +36,6 @@ void ExecuteBatch(
TRawBatchRequest& batchRequest,
const TExecuteBatchOptions& options = {});
-// SkyShare
-
-TNode::TListType SkyShareTable(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const std::vector<TYPath>& tablePaths,
- const TSkyShareTableOptions& options = {});
-
// Misc
TRichYPath CanonizeYPath(