aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2024-12-22 16:37:15 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2024-12-22 16:58:29 +0300
commitf9e36e78609de5b801f829466f06ccc9a87dc00b (patch)
treef0d954fa30bf5352c4ce825c3293032d72a51189
parent75a274b6f3b196529306be849688c15b6537d1d2 (diff)
downloadydb-f9e36e78609de5b801f829466f06ccc9a87dc00b.tar.gz
YT-23616: Move ReadTable to THttpRawClient
commit_hash:c145049aef2f4ccaff537670be830d81ebc6f8d6
-rw-r--r--yt/cpp/mapreduce/client/client_reader.cpp105
-rw-r--r--yt/cpp/mapreduce/client/client_reader.h5
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h6
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp21
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h6
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp19
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h6
7 files changed, 75 insertions, 93 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp
index b312716877..e7538a22da 100644
--- a/yt/cpp/mapreduce/client/client_reader.cpp
+++ b/yt/cpp/mapreduce/client/client_reader.cpp
@@ -166,99 +166,28 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u
CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest();
}
- bool areRangesUpdated = false;
+ auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_);
- while (true) {
- CurrentRequestRetryPolicy_->NotifyNewAttempt();
-
- THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
- if (Context_.ServiceTicketAuth) {
- header.SetServiceTicket(Context_.ServiceTicketAuth->Ptr->IssueServiceTicket());
+ if (rowIndex.Defined()) {
+ auto& ranges = Path_.MutableRanges();
+ if (ranges.Empty()) {
+ ranges.ConstructInPlace(TVector{TReadRange()});
} else {
- header.SetToken(Context_.Token);
- }
-
- if (Context_.ImpersonationUser) {
- header.SetImpersonationUser(*Context_.ImpersonationUser);
- }
-
- auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_);
- header.AddTransactionId(transactionId);
-
- const auto& controlAttributes = Options_.ControlAttributes_;
- header.AddParameter("control_attributes", TNode()
- ("enable_row_index", controlAttributes.EnableRowIndex_)
- ("enable_range_index", controlAttributes.EnableRangeIndex_));
- header.SetOutputFormat(Format_);
-
- header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
-
- if (rowIndex.Defined() && !areRangesUpdated) {
- auto& ranges = Path_.MutableRanges();
- if (ranges.Empty()) {
- ranges.ConstructInPlace(TVector{TReadRange()});
- } else {
- if (rangeIndex.GetOrElse(0) >= ranges->size()) {
- ythrow yexception()
- << "range index " << rangeIndex.GetOrElse(0)
- << " is out of range, input range count is " << ranges->size();
- }
- ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0));
+ if (rangeIndex.GetOrElse(0) >= ranges->size()) {
+ ythrow yexception()
+ << "range index " << rangeIndex.GetOrElse(0)
+ << " is out of range, input range count is " << ranges->size();
}
- ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
- areRangesUpdated = true;
- }
-
- header.MergeParameters(FormIORequestParameters(Path_, Options_));
-
- auto requestId = CreateGuidAsString();
-
- try {
- const auto proxyName = GetProxyForHeavyRequest(Context_);
- UpdateHeaderForProxyIfNeed(proxyName, Context_, header);
- Response_ = Context_.HttpClient->Request(GetFullUrlForProxy(proxyName, Context_, header), requestId, header);
-
- Input_ = Response_->GetResponseStream();
-
- YT_LOG_DEBUG(
- "RSP %v - table stream (RangeIndex: %v, RowIndex: %v)",
- requestId,
- rangeIndex,
- rowIndex);
-
- return;
- } catch (const TErrorResponse& e) {
- LogRequestError(
- requestId,
- header,
- e.what(),
- CurrentRequestRetryPolicy_->GetAttemptDescription());
-
- if (!IsRetriable(e)) {
- throw;
- }
- auto backoff = CurrentRequestRetryPolicy_->OnRetriableError(e);
- if (!backoff) {
- throw;
- }
- NDetail::TWaitProxy::Get()->Sleep(*backoff);
- } catch (const std::exception& e) {
- LogRequestError(
- requestId,
- header,
- e.what(),
- CurrentRequestRetryPolicy_->GetAttemptDescription());
-
- Response_.reset();
- Input_ = nullptr;
-
- auto backoff = CurrentRequestRetryPolicy_->OnGenericError(e);
- if (!backoff) {
- throw;
- }
- NDetail::TWaitProxy::Get()->Sleep(*backoff);
+ ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0));
}
+ ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
}
+
+ Input_ = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>(
+ CurrentRequestRetryPolicy_,
+ [this, &transactionId] (TMutationId /*mutationId*/) {
+ return RawClient_->ReadTable(transactionId, Path_, Format_, Options_);
+ });
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h
index 61bc698340..3f73080046 100644
--- a/yt/cpp/mapreduce/client/client_reader.h
+++ b/yt/cpp/mapreduce/client/client_reader.h
@@ -2,8 +2,6 @@
#include <yt/cpp/mapreduce/common/fwd.h>
-#include <yt/cpp/mapreduce/interface/io.h>
-
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/http.h>
@@ -55,8 +53,7 @@ private:
THolder<TPingableTransaction> ReadTransaction_;
- NHttpClient::IHttpResponsePtr Response_;
- IInputStream* Input_;
+ std::unique_ptr<IInputStream> Input_;
IRequestRetryPolicyPtr CurrentRequestRetryPolicy_;
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index b3ba487d2a..0377e0d064 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -279,6 +279,12 @@ public:
const TYPath& path,
const TAlterTableOptions& options = {}) = 0;
+ virtual std::unique_ptr<IInputStream> ReadTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TMaybe<TFormat>& format,
+ const TTableReaderOptions& options = {}) = 0;
+
virtual void AlterTableReplica(
TMutationId& mutationId,
const TReplicaId& replicaId,
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 582c66f682..469ad1d4ea 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -14,6 +14,8 @@
#include <yt/cpp/mapreduce/interface/operation.h>
#include <yt/cpp/mapreduce/interface/tvm.h>
+#include <yt/cpp/mapreduce/io/helpers.h>
+
#include <library/cpp/yson/node/node_io.h>
namespace NYT::NDetail {
@@ -798,6 +800,25 @@ TNode::TListType THttpRawClient::SelectRows(
return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
}
+std::unique_ptr<IInputStream> THttpRawClient::ReadTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TMaybe<TFormat>& format,
+ const TTableReaderOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
+ header.SetOutputFormat(format);
+ header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
+ header.MergeParameters(NRawClient::SerializeParamsForReadTable(transactionId, Context_.Config->Prefix, path, options));
+ header.MergeParameters(FormIORequestParameters(path, options));
+
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
+}
+
void THttpRawClient::AlterTable(
TMutationId& mutationId,
const TTransactionId& transactionId,
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index d8e9e42434..2fe0dd771c 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -272,6 +272,12 @@ public:
const TString& query,
const TSelectRowsOptions& options = {}) override;
+ std::unique_ptr<IInputStream> ReadTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TMaybe<TFormat>& format,
+ const TTableReaderOptions& options = {}) override;
+
void AlterTable(
TMutationId& mutationId,
const TTransactionId& transactionId,
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
index 8474bd0edc..4d6edede16 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
@@ -4,6 +4,7 @@
#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/client_method_options.h>
+#include <yt/cpp/mapreduce/interface/fluent.h>
#include <yt/cpp/mapreduce/interface/operation.h>
#include <yt/cpp/mapreduce/interface/serialize.h>
@@ -639,13 +640,29 @@ TNode SerializeParametersForDeleteRows(
TNode SerializeParametersForTrimRows(
const TString& pathPrefix,
const TYPath& path,
- const TTrimRowsOptions& /* options*/)
+ const TTrimRowsOptions& /*options*/)
{
TNode result;
SetPathParam(&result, pathPrefix, path);
return result;
}
+TNode SerializeParamsForReadTable(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TRichYPath& path,
+ const TTableReaderOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ result["control_attributes"] = BuildYsonNodeFluently()
+ .BeginMap()
+ .Item("enable_row_index").Value(options.ControlAttributes_.EnableRowIndex_)
+ .Item("enable_range_index").Value(options.ControlAttributes_.EnableRangeIndex_)
+ .EndMap();
+ return result;
+}
+
TNode SerializeParamsForParseYPath(const TRichYPath& path)
{
TNode result;
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
index 655198248c..07879b0dc8 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
@@ -146,6 +146,12 @@ TNode SerializeParametersForTrimRows(
const TYPath& path,
const TTrimRowsOptions& options);
+TNode SerializeParamsForReadTable(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TRichYPath& path,
+ const TTableReaderOptions& options);
+
TNode SerializeParamsForParseYPath(
const TRichYPath& path);