diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-19 21:42:25 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-19 22:01:45 +0300 |
commit | 9ae7e7cc07f96dea61e2875b0978feb5ca354caf (patch) | |
tree | 14224d0dd7a2b0b03e04031dddbf812ff2613cac | |
parent | cd46bd9fd4cf19e780612d8321e2fdec1b214999 (diff) | |
download | ydb-9ae7e7cc07f96dea61e2875b0978feb5ca354caf.tar.gz |
[yt/cpp/mapreduce] YT-23616: Move SkyShareTable implementation to THttpRawClient
commit_hash:92ea122e55a25d9ae51128f652c4e43c1e72c05d
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 26 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 12 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.cpp | 26 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.h | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 44 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 9 |
6 files changed, 65 insertions, 58 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 9e3976b144b..69f0147a66a 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -1352,11 +1352,27 @@ TNode::TListType TClient::SkyShareTable( const TSkyShareTableOptions& options) { CheckShutdown(); - return NRawClient::SkyShareTable( - ClientRetryPolicy_->CreatePolicyForGenericRequest(), - Context_, - tablePaths, - options); + + // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu + // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK). + TResponseInfo response; + while (response.HttpCode != 200) { + response = RequestWithRetry<TResponseInfo>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &tablePaths, &options] (TMutationId /*mutationId*/) { + return RawClient_->SkyShareTable(tablePaths, options); + }); + TWaitProxy::Get()->Sleep(TDuration::Seconds(5)); + } + + if (options.KeyColumns_) { + return NodeFromJsonString(response.Response)["torrents"].AsList(); + } else { + TNode torrent; + torrent["key"] = TNode::CreateList(); + torrent["rbtorrent"] = response.Response; + return TNode::TListType{torrent}; + } } TCheckPermissionResponse TClient::CheckPermission( diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 32055e3d004..06d4fb9b07a 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -8,6 +8,12 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// +namespace NDetail { + struct TResponseInfo; +} + +//////////////////////////////////////////////////////////////////////////////// + class IRawClient : public virtual TThrRefBase { @@ -196,6 +202,12 @@ public: const TOperationId& operationId, const TGetJobTraceOptions& options = {}) = 0; + // SkyShare + + virtual NDetail::TResponseInfo SkyShareTable( + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options = {}) = 0; + // File cache virtual TMaybe<TYPath> GetFileFromCache( diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 71d8d5fba90..49ea8ce975f 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -586,6 +586,32 @@ std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace( return result; } +TResponseInfo THttpRawClient::SkyShareTable( + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options) +{ + TMutationId mutationId; + THttpHeader header("POST", "api/v1/share", /*IsApi*/ false); + + auto proxyName = Context_.ServerName.substr(0, Context_.ServerName.find('.')); + + auto host = Context_.Config->SkynetApiHost; + if (host == "") { + host = "skynet." + proxyName + ".yt.yandex.net"; + } + + TSkyShareTableOptions patchedOptions = options; + + if (Context_.Config->Pool && !patchedOptions.Pool_) { + patchedOptions.Pool(Context_.Config->Pool); + } + + header.MergeParameters(NRawClient::SerializeParamsForSkyShareTable(proxyName, Context_.Config->Prefix, tablePaths, patchedOptions)); + TClientContext skyApiHost({.ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient()}); + + return RequestWithoutRetry(skyApiHost, mutationId, header); +} + TMaybe<TYPath> THttpRawClient::GetFileFromCache( const TTransactionId& transactionId, const TString& md5Signature, diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index 08015f024f8..7a69dc79988 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -202,6 +202,12 @@ public: const TOperationId& operationId, const TGetJobTraceOptions& options = {}) override; + // SkyShare + + TResponseInfo SkyShareTable( + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options = {}) override; + // File cache TMaybe<TYPath> GetFileFromCache( diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index a3f10e6c414..a3f01da6fc9 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -301,50 +301,6 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node) return result; } -TNode::TListType SkyShareTable( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const std::vector<TYPath>& tablePaths, - const TSkyShareTableOptions& options) -{ - THttpHeader header("POST", "api/v1/share", /*IsApi*/ false); - - auto proxyName = context.ServerName.substr(0, context.ServerName.find('.')); - - auto host = context.Config->SkynetApiHost; - if (host == "") { - host = "skynet." + proxyName + ".yt.yandex.net"; - } - - TSkyShareTableOptions patchedOptions = options; - - if (context.Config->Pool && !patchedOptions.Pool_) { - patchedOptions.Pool(context.Config->Pool); - } - - header.MergeParameters(NRawClient::SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, patchedOptions)); - TClientContext skyApiHost({ .ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient() }); - TResponseInfo response = {}; - - // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu - // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK). - while (response.HttpCode != 200) { - response = RetryRequestWithPolicy(retryPolicy, skyApiHost, header, ""); - TWaitProxy::Get()->Sleep(TDuration::Seconds(5)); - } - - if (options.KeyColumns_) { - return NodeFromJsonString(response.Response)["torrents"].AsList(); - } else { - TNode torrent; - - torrent["key"] = TNode::CreateList(); - torrent["rbtorrent"] = response.Response; - - return TNode::TListType{ torrent }; - } -} - TRichYPath CanonizeYPath( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index c60536c86d0..bcc9a4bfd7f 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -29,7 +29,6 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node); //////////////////////////////////////////////////////////////////////////////// -// // marks `batchRequest' as executed void ExecuteBatch( IRequestRetryPolicyPtr retryPolicy, @@ -37,14 +36,6 @@ void ExecuteBatch( TRawBatchRequest& batchRequest, const TExecuteBatchOptions& options = {}); -// SkyShare - -TNode::TListType SkyShareTable( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const std::vector<TYPath>& tablePaths, - const TSkyShareTableOptions& options = {}); - // Misc TRichYPath CanonizeYPath( |