diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-22 16:37:15 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-22 16:58:29 +0300 |
commit | f9e36e78609de5b801f829466f06ccc9a87dc00b (patch) | |
tree | f0d954fa30bf5352c4ce825c3293032d72a51189 | |
parent | 75a274b6f3b196529306be849688c15b6537d1d2 (diff) | |
download | ydb-f9e36e78609de5b801f829466f06ccc9a87dc00b.tar.gz |
YT-23616: Move ReadTable to THttpRawClient
commit_hash:c145049aef2f4ccaff537670be830d81ebc6f8d6
-rw-r--r-- | yt/cpp/mapreduce/client/client_reader.cpp | 105 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client_reader.h | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.cpp | 21 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.h | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp | 19 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h | 6 |
7 files changed, 75 insertions, 93 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index b312716877..e7538a22da 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -166,99 +166,28 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest(); } - bool areRangesUpdated = false; + auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_); - while (true) { - CurrentRequestRetryPolicy_->NotifyNewAttempt(); - - THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion)); - if (Context_.ServiceTicketAuth) { - header.SetServiceTicket(Context_.ServiceTicketAuth->Ptr->IssueServiceTicket()); + if (rowIndex.Defined()) { + auto& ranges = Path_.MutableRanges(); + if (ranges.Empty()) { + ranges.ConstructInPlace(TVector{TReadRange()}); } else { - header.SetToken(Context_.Token); - } - - if (Context_.ImpersonationUser) { - header.SetImpersonationUser(*Context_.ImpersonationUser); - } - - auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_); - header.AddTransactionId(transactionId); - - const auto& controlAttributes = Options_.ControlAttributes_; - header.AddParameter("control_attributes", TNode() - ("enable_row_index", controlAttributes.EnableRowIndex_) - ("enable_range_index", controlAttributes.EnableRangeIndex_)); - header.SetOutputFormat(Format_); - - header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); - - if (rowIndex.Defined() && !areRangesUpdated) { - auto& ranges = Path_.MutableRanges(); - if (ranges.Empty()) { - ranges.ConstructInPlace(TVector{TReadRange()}); - } else { - if (rangeIndex.GetOrElse(0) >= ranges->size()) { - ythrow yexception() - << "range index " << rangeIndex.GetOrElse(0) - << " is out of range, input range count is " << ranges->size(); - } - ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0)); + if (rangeIndex.GetOrElse(0) >= ranges->size()) { + ythrow yexception() + << "range index " << rangeIndex.GetOrElse(0) + << " is out of range, input range count is " << ranges->size(); } - ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); - areRangesUpdated = true; - } - - header.MergeParameters(FormIORequestParameters(Path_, Options_)); - - auto requestId = CreateGuidAsString(); - - try { - const auto proxyName = GetProxyForHeavyRequest(Context_); - UpdateHeaderForProxyIfNeed(proxyName, Context_, header); - Response_ = Context_.HttpClient->Request(GetFullUrlForProxy(proxyName, Context_, header), requestId, header); - - Input_ = Response_->GetResponseStream(); - - YT_LOG_DEBUG( - "RSP %v - table stream (RangeIndex: %v, RowIndex: %v)", - requestId, - rangeIndex, - rowIndex); - - return; - } catch (const TErrorResponse& e) { - LogRequestError( - requestId, - header, - e.what(), - CurrentRequestRetryPolicy_->GetAttemptDescription()); - - if (!IsRetriable(e)) { - throw; - } - auto backoff = CurrentRequestRetryPolicy_->OnRetriableError(e); - if (!backoff) { - throw; - } - NDetail::TWaitProxy::Get()->Sleep(*backoff); - } catch (const std::exception& e) { - LogRequestError( - requestId, - header, - e.what(), - CurrentRequestRetryPolicy_->GetAttemptDescription()); - - Response_.reset(); - Input_ = nullptr; - - auto backoff = CurrentRequestRetryPolicy_->OnGenericError(e); - if (!backoff) { - throw; - } - NDetail::TWaitProxy::Get()->Sleep(*backoff); + ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0)); } + ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); } + + Input_ = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>( + CurrentRequestRetryPolicy_, + [this, &transactionId] (TMutationId /*mutationId*/) { + return RawClient_->ReadTable(transactionId, Path_, Format_, Options_); + }); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h index 61bc698340..3f73080046 100644 --- a/yt/cpp/mapreduce/client/client_reader.h +++ b/yt/cpp/mapreduce/client/client_reader.h @@ -2,8 +2,6 @@ #include <yt/cpp/mapreduce/common/fwd.h> -#include <yt/cpp/mapreduce/interface/io.h> - #include <yt/cpp/mapreduce/http/context.h> #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/http.h> @@ -55,8 +53,7 @@ private: THolder<TPingableTransaction> ReadTransaction_; - NHttpClient::IHttpResponsePtr Response_; - IInputStream* Input_; + std::unique_ptr<IInputStream> Input_; IRequestRetryPolicyPtr CurrentRequestRetryPolicy_; diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index b3ba487d2a..0377e0d064 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -279,6 +279,12 @@ public: const TYPath& path, const TAlterTableOptions& options = {}) = 0; + virtual std::unique_ptr<IInputStream> ReadTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TMaybe<TFormat>& format, + const TTableReaderOptions& options = {}) = 0; + virtual void AlterTableReplica( TMutationId& mutationId, const TReplicaId& replicaId, diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 582c66f682..469ad1d4ea 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -14,6 +14,8 @@ #include <yt/cpp/mapreduce/interface/operation.h> #include <yt/cpp/mapreduce/interface/tvm.h> +#include <yt/cpp/mapreduce/io/helpers.h> + #include <library/cpp/yson/node/node_io.h> namespace NYT::NDetail { @@ -798,6 +800,25 @@ TNode::TListType THttpRawClient::SelectRows( return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList(); } +std::unique_ptr<IInputStream> THttpRawClient::ReadTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TMaybe<TFormat>& format, + const TTableReaderOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion)); + header.SetOutputFormat(format); + header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); + header.MergeParameters(NRawClient::SerializeParamsForReadTable(transactionId, Context_.Config->Prefix, path, options)); + header.MergeParameters(FormIORequestParameters(path, options)); + + TRequestConfig config; + config.IsHeavy = true; + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); + return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo)); +} + void THttpRawClient::AlterTable( TMutationId& mutationId, const TTransactionId& transactionId, diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index d8e9e42434..2fe0dd771c 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -272,6 +272,12 @@ public: const TString& query, const TSelectRowsOptions& options = {}) override; + std::unique_ptr<IInputStream> ReadTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TMaybe<TFormat>& format, + const TTableReaderOptions& options = {}) override; + void AlterTable( TMutationId& mutationId, const TTransactionId& transactionId, diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp index 8474bd0edc..4d6edede16 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -4,6 +4,7 @@ #include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/client_method_options.h> +#include <yt/cpp/mapreduce/interface/fluent.h> #include <yt/cpp/mapreduce/interface/operation.h> #include <yt/cpp/mapreduce/interface/serialize.h> @@ -639,13 +640,29 @@ TNode SerializeParametersForDeleteRows( TNode SerializeParametersForTrimRows( const TString& pathPrefix, const TYPath& path, - const TTrimRowsOptions& /* options*/) + const TTrimRowsOptions& /*options*/) { TNode result; SetPathParam(&result, pathPrefix, path); return result; } +TNode SerializeParamsForReadTable( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TRichYPath& path, + const TTableReaderOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + result["control_attributes"] = BuildYsonNodeFluently() + .BeginMap() + .Item("enable_row_index").Value(options.ControlAttributes_.EnableRowIndex_) + .Item("enable_range_index").Value(options.ControlAttributes_.EnableRangeIndex_) + .EndMap(); + return result; +} + TNode SerializeParamsForParseYPath(const TRichYPath& path) { TNode result; diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h index 655198248c..07879b0dc8 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h @@ -146,6 +146,12 @@ TNode SerializeParametersForTrimRows( const TYPath& path, const TTrimRowsOptions& options); +TNode SerializeParamsForReadTable( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TRichYPath& path, + const TTableReaderOptions& options); + TNode SerializeParamsForParseYPath( const TRichYPath& path); |