summary | refs | log | tree | commit | diff | stats
path: root/yt/cpp/mapreduce/client/file_reader.cpp
diff options
context:
space:
mode:
author hiddenpath <[email protected]> 2024-12-23 15:18:22 +0300
committer hiddenpath <[email protected]> 2024-12-23 16:12:22 +0300
commit fac5ce91c704b4a1d3aa2f059842df3ba3a955bd (patch)
tree 00fa7a628e7aa63df387c095cf8a2c7cbac3e3e9 /yt/cpp/mapreduce/client/file_reader.cpp
parent fbc08848ae794f01d5107e6b556045e62fd36afe (diff)
YT-23616: Move read_blob_table and read_file to THttpRawClient
commit_hash:6c9209d019fa324c9ae4f182b18e7d089a32937d
Diffstat (limited to 'yt/cpp/mapreduce/client/file_reader.cpp')
-rw-r--r-- yt/cpp/mapreduce/client/file_reader.cpp 172
1 files changed, 31 insertions, 141 deletions
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp
index 06463d0af24..f88b40e38b6 100644
--- a/yt/cpp/mapreduce/client/file_reader.cpp
+++ b/yt/cpp/mapreduce/client/file_reader.cpp
@@ -31,7 +31,7 @@ using ::ToString;
static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) {
if (options.Length_) {
- return options.Offset_.GetOrElse(0) + *options.Length_;
+ return options.Offset_ + *options.Length_;
} else {
return Nothing();
}
@@ -46,7 +46,6 @@ TStreamReaderBase::TStreamReaderBase(
const TClientContext& context,
const TTransactionId& transactionId)
: RawClient_(rawClient)
- , Context_(context)
, ClientRetryPolicy_(std::move(clientRetryPolicy))
, ReadTransaction_(MakeHolder<TPingableTransaction>(
RawClient_,
@@ -64,59 +63,26 @@ TYPath TStreamReaderBase::Snapshot(const TYPath& path)
return NYT::Snapshot(RawClient_, ClientRetryPolicy_, ReadTransaction_->GetId(), path);
}
-TString TStreamReaderBase::GetActiveRequestId() const
-{
- if (Response_) {
- return Response_->GetRequestId();;
- } else {
- return "<no-active-request>";
- }
-}
-
size_t TStreamReaderBase::DoRead(void* buf, size_t len)
{
- const int retryCount = Context_.Config->ReadRetryCount;
- for (int attempt = 1; attempt <= retryCount; ++attempt) {
- try {
- if (!Input_) {
- Response_ = Request(Context_, ReadTransaction_->GetId(), CurrentOffset_);
- Input_ = Response_->GetResponseStream();
- }
- if (len == 0) {
- return 0;
- }
- const size_t read = Input_->Read(buf, len);
- CurrentOffset_ += read;
- return read;
- } catch (TErrorResponse& e) {
- YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)",
- GetActiveRequestId(),
- e.what(),
- attempt,
- retryCount);
-
- if (!IsRetriable(e) || attempt == retryCount) {
- throw;
- }
- TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
- } catch (std::exception& e) {
- YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)",
- GetActiveRequestId(),
- e.what(),
- attempt,
- retryCount);
-
- // Invalidate connection.
- Response_.reset();
-
- if (attempt == retryCount) {
+ if (len == 0) {
+ return 0;
+ }
+ return RequestWithRetry<size_t>(
+ ClientRetryPolicy_->CreatePolicyForReaderRequest(),
+ [this, &buf, len] (TMutationId /*mutationId*/) {
+ try {
+ if (!Input_) {
+ Input_ = Request(ReadTransaction_->GetId(), CurrentOffset_);
+ }
+ const size_t read = Input_->Read(buf, len);
+ CurrentOffset_ += read;
+ return read;
+ } catch (...) {
+ Input_ = nullptr;
throw;
}
- TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
- }
- Input_ = nullptr;
- }
- Y_UNREACHABLE(); // we should either return or throw from loop above
+ });
}
////////////////////////////////////////////////////////////////////////////////
@@ -130,57 +96,25 @@ TFileReader::TFileReader(
const TTransactionId& transactionId,
const TFileReaderOptions& options)
: TStreamReaderBase(rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId)
- , FileReaderOptions_(options)
+ , StartOffset_(options.Offset_)
+ , EndOffset_(GetEndOffset(options))
+ , Options_(options)
, Path_(path)
- , StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0))
- , EndOffset_(GetEndOffset(FileReaderOptions_))
{
Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_);
}
-NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes)
+std::unique_ptr<IInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes)
{
const ui64 currentOffset = StartOffset_ + readBytes;
- TString hostName = GetProxyForHeavyRequest(context);
-
- THttpHeader header("GET", GetReadFileCommand(context.Config->ApiVersion));
- if (context.ServiceTicketAuth) {
- header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
- } else {
- header.SetToken(context.Token);
- }
-
- if (context.ImpersonationUser) {
- header.SetImpersonationUser(*context.ImpersonationUser);
- }
-
- UpdateHeaderForProxyIfNeed(hostName, context, header);
-
- header.AddTransactionId(transactionId);
- header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
if (EndOffset_) {
Y_ABORT_UNLESS(*EndOffset_ >= currentOffset);
- FileReaderOptions_.Length(*EndOffset_ - currentOffset);
- }
- FileReaderOptions_.Offset(currentOffset);
- header.MergeParameters(FormIORequestParameters(Path_, FileReaderOptions_));
-
- header.SetResponseCompression(ToString(context.Config->AcceptEncoding));
-
- auto requestId = CreateGuidAsString();
- NHttpClient::IHttpResponsePtr response;
- try {
- response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
- } catch (const std::exception& ex) {
- LogRequestError(requestId, header, ex.what(), "");
- throw;
+ Options_.Length(*EndOffset_ - currentOffset);
}
- YT_LOG_DEBUG("RSP %v - file stream",
- requestId);
-
- return response;
+ Options_.Offset(currentOffset);
+ return RawClient_->ReadFile(transactionId, Path_, Options_);
}
////////////////////////////////////////////////////////////////////////////////
@@ -195,66 +129,22 @@ TBlobTableReader::TBlobTableReader(
const TTransactionId& transactionId,
const TBlobTableReaderOptions& options)
: TStreamReaderBase(rawClient, std::move(retryPolicy), std::move(transactionPinger), context, transactionId)
+ , StartOffset_(options.Offset_)
, Key_(key)
, Options_(options)
{
Path_ = TStreamReaderBase::Snapshot(path);
}
-NHttpClient::IHttpResponsePtr TBlobTableReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes)
+std::unique_ptr<IInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes)
{
- TString hostName = GetProxyForHeavyRequest(context);
-
- THttpHeader header("GET", "read_blob_table");
- if (context.ServiceTicketAuth) {
- header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
- } else {
- header.SetToken(context.Token);
- }
-
- if (context.ImpersonationUser) {
- header.SetImpersonationUser(*context.ImpersonationUser);
- }
-
- UpdateHeaderForProxyIfNeed(hostName, context, header);
-
- header.AddTransactionId(transactionId);
- header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
-
- const ui64 currentOffset = Options_.Offset_ + readBytes;
+ const i64 currentOffset = StartOffset_ + readBytes;
const i64 startPartIndex = currentOffset / Options_.PartSize_;
- const ui64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex;
- auto lowerLimitKey = Key_;
- lowerLimitKey.Parts_.push_back(startPartIndex);
- auto upperLimitKey = Key_;
- upperLimitKey.Parts_.push_back(std::numeric_limits<i64>::max());
- TNode params = PathToParamNode(TRichYPath(Path_).AddRange(TReadRange()
- .LowerLimit(TReadLimit().Key(lowerLimitKey))
- .UpperLimit(TReadLimit().Key(upperLimitKey))));
- params["start_part_index"] = TNode(startPartIndex);
- params["offset"] = skipBytes;
- if (Options_.PartIndexColumnName_) {
- params["part_index_column_name"] = *Options_.PartIndexColumnName_;
- }
- if (Options_.DataColumnName_) {
- params["data_column_name"] = *Options_.DataColumnName_;
- }
- params["part_size"] = Options_.PartSize_;
- header.MergeParameters(params);
- header.SetResponseCompression(ToString(context.Config->AcceptEncoding));
-
- auto requestId = CreateGuidAsString();
- NHttpClient::IHttpResponsePtr response;
- try {
- response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
- } catch (const std::exception& ex) {
- LogRequestError(requestId, header, ex.what(), "");
- throw;
- }
+ const i64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex;
- YT_LOG_DEBUG("RSP %v - blob table stream",
- requestId);
- return response;
+ Options_.Offset(skipBytes);
+ Options_.StartPartIndex(startPartIndex);
+ return RawClient_->ReadBlobTable(transactionId, Path_, Key_, Options_);
}
////////////////////////////////////////////////////////////////////////////////