diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-23 15:18:22 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-23 16:12:22 +0300 |
commit | fac5ce91c704b4a1d3aa2f059842df3ba3a955bd (patch) | |
tree | 00fa7a628e7aa63df387c095cf8a2c7cbac3e3e9 | |
parent | fbc08848ae794f01d5107e6b556045e62fd36afe (diff) | |
download | ydb-fac5ce91c704b4a1d3aa2f059842df3ba3a955bd.tar.gz |
YT-23616: Move read_blob_table and read_file to THttpRawClient
commit_hash:6c9209d019fa324c9ae4f182b18e7d089a32937d
-rw-r--r-- | yt/cpp/mapreduce/client/file_reader.cpp | 172 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/file_reader.h | 26 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/retry_lib.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/common/retry_lib.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/client_method_options.h | 9 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 12 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/helpers.h | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.cpp | 36 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.h | 12 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp | 31 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h | 6 |
11 files changed, 153 insertions, 161 deletions
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp index 06463d0af2..f88b40e38b 100644 --- a/yt/cpp/mapreduce/client/file_reader.cpp +++ b/yt/cpp/mapreduce/client/file_reader.cpp @@ -31,7 +31,7 @@ using ::ToString; static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) { if (options.Length_) { - return options.Offset_.GetOrElse(0) + *options.Length_; + return options.Offset_ + *options.Length_; } else { return Nothing(); } @@ -46,7 +46,6 @@ TStreamReaderBase::TStreamReaderBase( const TClientContext& context, const TTransactionId& transactionId) : RawClient_(rawClient) - , Context_(context) , ClientRetryPolicy_(std::move(clientRetryPolicy)) , ReadTransaction_(MakeHolder<TPingableTransaction>( RawClient_, @@ -64,59 +63,26 @@ TYPath TStreamReaderBase::Snapshot(const TYPath& path) return NYT::Snapshot(RawClient_, ClientRetryPolicy_, ReadTransaction_->GetId(), path); } -TString TStreamReaderBase::GetActiveRequestId() const -{ - if (Response_) { - return Response_->GetRequestId();; - } else { - return "<no-active-request>"; - } -} - size_t TStreamReaderBase::DoRead(void* buf, size_t len) { - const int retryCount = Context_.Config->ReadRetryCount; - for (int attempt = 1; attempt <= retryCount; ++attempt) { - try { - if (!Input_) { - Response_ = Request(Context_, ReadTransaction_->GetId(), CurrentOffset_); - Input_ = Response_->GetResponseStream(); - } - if (len == 0) { - return 0; - } - const size_t read = Input_->Read(buf, len); - CurrentOffset_ += read; - return read; - } catch (TErrorResponse& e) { - YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)", - GetActiveRequestId(), - e.what(), - attempt, - retryCount); - - if (!IsRetriable(e) || attempt == retryCount) { - throw; - } - TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); - } catch (std::exception& e) { - YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)", - GetActiveRequestId(), - e.what(), - attempt, - retryCount); - - // Invalidate connection. - Response_.reset(); - - if (attempt == retryCount) { + if (len == 0) { + return 0; + } + return RequestWithRetry<size_t>( + ClientRetryPolicy_->CreatePolicyForReaderRequest(), + [this, &buf, len] (TMutationId /*mutationId*/) { + try { + if (!Input_) { + Input_ = Request(ReadTransaction_->GetId(), CurrentOffset_); + } + const size_t read = Input_->Read(buf, len); + CurrentOffset_ += read; + return read; + } catch (...) { + Input_ = nullptr; throw; } - TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); - } - Input_ = nullptr; - } - Y_UNREACHABLE(); // we should either return or throw from loop above + }); } //////////////////////////////////////////////////////////////////////////////// @@ -130,57 +96,25 @@ TFileReader::TFileReader( const TTransactionId& transactionId, const TFileReaderOptions& options) : TStreamReaderBase(rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId) - , FileReaderOptions_(options) + , StartOffset_(options.Offset_) + , EndOffset_(GetEndOffset(options)) + , Options_(options) , Path_(path) - , StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0)) - , EndOffset_(GetEndOffset(FileReaderOptions_)) { Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_); } -NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) +std::unique_ptr<IInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes) { const ui64 currentOffset = StartOffset_ + readBytes; - TString hostName = GetProxyForHeavyRequest(context); - - THttpHeader header("GET", GetReadFileCommand(context.Config->ApiVersion)); - if (context.ServiceTicketAuth) { - header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); - } else { - header.SetToken(context.Token); - } - - if (context.ImpersonationUser) { - header.SetImpersonationUser(*context.ImpersonationUser); - } - - UpdateHeaderForProxyIfNeed(hostName, context, header); - - header.AddTransactionId(transactionId); - header.SetOutputFormat(TMaybe<TFormat>()); // Binary format if (EndOffset_) { Y_ABORT_UNLESS(*EndOffset_ >= currentOffset); - FileReaderOptions_.Length(*EndOffset_ - currentOffset); - } - FileReaderOptions_.Offset(currentOffset); - header.MergeParameters(FormIORequestParameters(Path_, FileReaderOptions_)); - - header.SetResponseCompression(ToString(context.Config->AcceptEncoding)); - - auto requestId = CreateGuidAsString(); - NHttpClient::IHttpResponsePtr response; - try { - response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); - } catch (const std::exception& ex) { - LogRequestError(requestId, header, ex.what(), ""); - throw; + Options_.Length(*EndOffset_ - currentOffset); } - YT_LOG_DEBUG("RSP %v - file stream", - requestId); - - return response; + Options_.Offset(currentOffset); + return RawClient_->ReadFile(transactionId, Path_, Options_); } //////////////////////////////////////////////////////////////////////////////// @@ -195,66 +129,22 @@ TBlobTableReader::TBlobTableReader( const TTransactionId& transactionId, const TBlobTableReaderOptions& options) : TStreamReaderBase(rawClient, std::move(retryPolicy), std::move(transactionPinger), context, transactionId) + , StartOffset_(options.Offset_) , Key_(key) , Options_(options) { Path_ = TStreamReaderBase::Snapshot(path); } -NHttpClient::IHttpResponsePtr TBlobTableReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) +std::unique_ptr<IInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes) { - TString hostName = GetProxyForHeavyRequest(context); - - THttpHeader header("GET", "read_blob_table"); - if (context.ServiceTicketAuth) { - header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); - } else { - header.SetToken(context.Token); - } - - if (context.ImpersonationUser) { - header.SetImpersonationUser(*context.ImpersonationUser); - } - - UpdateHeaderForProxyIfNeed(hostName, context, header); - - header.AddTransactionId(transactionId); - header.SetOutputFormat(TMaybe<TFormat>()); // Binary format - - const ui64 currentOffset = Options_.Offset_ + readBytes; + const i64 currentOffset = StartOffset_ + readBytes; const i64 startPartIndex = currentOffset / Options_.PartSize_; - const ui64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex; - auto lowerLimitKey = Key_; - lowerLimitKey.Parts_.push_back(startPartIndex); - auto upperLimitKey = Key_; - upperLimitKey.Parts_.push_back(std::numeric_limits<i64>::max()); - TNode params = PathToParamNode(TRichYPath(Path_).AddRange(TReadRange() - .LowerLimit(TReadLimit().Key(lowerLimitKey)) - .UpperLimit(TReadLimit().Key(upperLimitKey)))); - params["start_part_index"] = TNode(startPartIndex); - params["offset"] = skipBytes; - if (Options_.PartIndexColumnName_) { - params["part_index_column_name"] = *Options_.PartIndexColumnName_; - } - if (Options_.DataColumnName_) { - params["data_column_name"] = *Options_.DataColumnName_; - } - params["part_size"] = Options_.PartSize_; - header.MergeParameters(params); - header.SetResponseCompression(ToString(context.Config->AcceptEncoding)); - - auto requestId = CreateGuidAsString(); - NHttpClient::IHttpResponsePtr response; - try { - response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); - } catch (const std::exception& ex) { - LogRequestError(requestId, header, ex.what(), ""); - throw; - } + const i64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex; - YT_LOG_DEBUG("RSP %v - blob table stream", - requestId); - return response; + Options_.Offset(skipBytes); + Options_.StartPartIndex(startPartIndex); + return RawClient_->ReadBlobTable(transactionId, Path_, Key_, Options_); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h index 48248696d3..8aafdc860d 100644 --- a/yt/cpp/mapreduce/client/file_reader.h +++ b/yt/cpp/mapreduce/client/file_reader.h @@ -11,7 +11,6 @@ class IInputStream; namespace NYT { -class THttpRequest; class TPingableTransaction; namespace NDetail { @@ -35,19 +34,16 @@ protected: protected: const IRawClientPtr RawClient_; - const TClientContext Context_; private: size_t DoRead(void* buf, size_t len) override; - virtual NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) = 0; - TString GetActiveRequestId() const; + virtual std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) = 0; private: const IClientRetryPolicyPtr ClientRetryPolicy_; TFileReaderOptions FileReaderOptions_; - NHttpClient::IHttpResponsePtr Response_; - IInputStream* Input_ = nullptr; + std::unique_ptr<IInputStream> Input_; THolder<TPingableTransaction> ReadTransaction_; @@ -67,17 +63,17 @@ public: ITransactionPingerPtr transactionPinger, const TClientContext& context, const TTransactionId& transactionId, - const TFileReaderOptions& options = TFileReaderOptions()); + const TFileReaderOptions& options = {}); private: - NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) override; + std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override; private: - TFileReaderOptions FileReaderOptions_; - - TRichYPath Path_; const ui64 StartOffset_; const TMaybe<ui64> EndOffset_; + + TFileReaderOptions Options_; + TRichYPath Path_; }; //////////////////////////////////////////////////////////////////////////////// @@ -94,14 +90,16 @@ public: ITransactionPingerPtr transactionPinger, const TClientContext& context, const TTransactionId& transactionId, - const TBlobTableReaderOptions& options); + const TBlobTableReaderOptions& options = {}); private: - NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) override; + std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override; private: + const ui64 StartOffset_; const TKey Key_; - const TBlobTableReaderOptions Options_; + + TBlobTableReaderOptions Options_; TYPath Path_; }; diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp index 772a2ab0cd..8146eb8b46 100644 --- a/yt/cpp/mapreduce/common/retry_lib.cpp +++ b/yt/cpp/mapreduce/common/retry_lib.cpp @@ -118,6 +118,11 @@ public: return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->StartOperationRetryCount), Config_)); } + IRequestRetryPolicyPtr CreatePolicyForReaderRequest() override + { + return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->ReadRetryCount), Config_)); + } + IRequestRetryPolicyPtr Wrap(IRequestRetryPolicyPtr basePolicy) { auto config = RetryConfigProvider_->CreateRetryConfig(); diff --git a/yt/cpp/mapreduce/common/retry_lib.h b/yt/cpp/mapreduce/common/retry_lib.h index c6c061f614..5b406b075f 100644 --- a/yt/cpp/mapreduce/common/retry_lib.h +++ b/yt/cpp/mapreduce/common/retry_lib.h @@ -48,6 +48,7 @@ class IClientRetryPolicy public: virtual IRequestRetryPolicyPtr CreatePolicyForGenericRequest() = 0; virtual IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() = 0; + virtual IRequestRetryPolicyPtr CreatePolicyForReaderRequest() = 0; }; diff --git a/yt/cpp/mapreduce/interface/client_method_options.h b/yt/cpp/mapreduce/interface/client_method_options.h index 9bfb79753d..d457bf5f43 100644 --- a/yt/cpp/mapreduce/interface/client_method_options.h +++ b/yt/cpp/mapreduce/interface/client_method_options.h @@ -287,9 +287,12 @@ struct TBlobTableReaderOptions /// /// All blob parts except the last part of the blob must be of this size /// otherwise blob table reader emits error. - FLUENT_FIELD_DEFAULT(ui64, PartSize, 4 * 1024 * 1024); + FLUENT_FIELD_DEFAULT(i64, PartSize, 4 * 1024 * 1024); - /// @brief Offset from which to start reading + /// @brief Part index from which to start reading. + FLUENT_FIELD_DEFAULT(i64, StartPartIndex, 0); + + /// @brief Offset from which to start reading. FLUENT_FIELD_DEFAULT(i64, Offset, 0); }; @@ -468,7 +471,7 @@ struct TFileReaderOptions /// @brief Offset to start reading from. /// /// By default reading is started from the beginning of the file. - FLUENT_FIELD_OPTION(i64, Offset); + FLUENT_FIELD_DEFAULT(i64, Offset, 0); /// /// @brief Maximum length to read. diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 0377e0d064..4994826863 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -209,6 +209,12 @@ public: const std::vector<TYPath>& tablePaths, const TSkyShareTableOptions& options = {}) = 0; + // Files + virtual std::unique_ptr<IInputStream> ReadFile( + const TTransactionId& transactionId, + const TRichYPath& path, + const TFileReaderOptions& options = {}) = 0; + // File cache virtual TMaybe<TYPath> GetFileFromCache( @@ -285,6 +291,12 @@ public: const TMaybe<TFormat>& format, const TTableReaderOptions& options = {}) = 0; + virtual std::unique_ptr<IInputStream> ReadBlobTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TKey& key, + const TBlobTableReaderOptions& options = {}) = 0; + virtual void AlterTableReplica( TMutationId& mutationId, const TReplicaId& replicaId, diff --git a/yt/cpp/mapreduce/io/helpers.h b/yt/cpp/mapreduce/io/helpers.h index 0733ff417c..0d3ec40ab6 100644 --- a/yt/cpp/mapreduce/io/helpers.h +++ b/yt/cpp/mapreduce/io/helpers.h @@ -63,9 +63,7 @@ inline TNode FormIORequestParameters( if (options.Config_) { params[TIOOptionsTraits<TTableReaderOptions>::ConfigName] = *options.Config_; } - if (options.Offset_) { - params["offset"] = *options.Offset_; - } + params["offset"] = options.Offset_; if (options.Length_) { params["length"] = *options.Length_; } diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 469ad1d4ea..65bfa01cea 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -614,6 +614,24 @@ NHttpClient::IHttpResponsePtr THttpRawClient::SkyShareTable( return RequestWithoutRetry(skyApiHost, mutationId, header, ""); } +std::unique_ptr<IInputStream> THttpRawClient::ReadFile( + const TTransactionId& transactionId, + const TRichYPath& path, + const TFileReaderOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", GetReadFileCommand(Context_.Config->ApiVersion)); + header.AddTransactionId(transactionId); + header.SetOutputFormat(TMaybe<TFormat>()); // Binary format + header.MergeParameters(FormIORequestParameters(path, options)); + header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); + + TRequestConfig config; + config.IsHeavy = true; + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); + return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo)); +} + TMaybe<TYPath> THttpRawClient::GetFileFromCache( const TTransactionId& transactionId, const TString& md5Signature, @@ -819,6 +837,24 @@ std::unique_ptr<IInputStream> THttpRawClient::ReadTable( return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo)); } +std::unique_ptr<IInputStream> THttpRawClient::ReadBlobTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TKey& key, + const TBlobTableReaderOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "read_blob_table"); + header.SetOutputFormat(TMaybe<TFormat>()); // Binary format + header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); + header.MergeParameters(NRawClient::SerializeParamsForReadBlobTable(transactionId, path, key, options)); + + TRequestConfig config; + config.IsHeavy = true; + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); + return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo)); +} + void THttpRawClient::AlterTable( TMutationId& mutationId, const TTransactionId& transactionId, diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index 2fe0dd771c..e540d1b331 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -208,6 +208,12 @@ public: const std::vector<TYPath>& tablePaths, const TSkyShareTableOptions& options = {}) override; + // Files + std::unique_ptr<IInputStream> ReadFile( + const TTransactionId& transactionId, + const TRichYPath& path, + const TFileReaderOptions& options = {}) override; + // File cache TMaybe<TYPath> GetFileFromCache( @@ -278,6 +284,12 @@ public: const TMaybe<TFormat>& format, const TTableReaderOptions& options = {}) override; + std::unique_ptr<IInputStream> ReadBlobTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TKey& key, + const TBlobTableReaderOptions& options = {}) override; + void AlterTable( TMutationId& mutationId, const TTransactionId& transactionId, diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp index 4d6edede16..2869ddcc0f 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -663,6 +663,37 @@ TNode SerializeParamsForReadTable( return result; } +TNode SerializeParamsForReadBlobTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TKey& key, + const TBlobTableReaderOptions& options) +{ + auto lowerLimitKey = key; + lowerLimitKey.Parts_.push_back(options.StartPartIndex_); + auto upperLimitKey = key; + upperLimitKey.Parts_.push_back(std::numeric_limits<i64>::max()); + + TNode result = PathToParamNode( + TRichYPath(path). + AddRange(TReadRange() + .LowerLimit(TReadLimit().Key(lowerLimitKey)) + .UpperLimit(TReadLimit().Key(upperLimitKey)))); + + SetTransactionIdParam(&result, transactionId); + + result["start_part_index"] = options.StartPartIndex_; + result["offset"] = options.Offset_; + if (options.PartIndexColumnName_) { + result["part_index_column_name"] = *options.PartIndexColumnName_; + } + if (options.DataColumnName_) { + result["data_column_name"] = *options.DataColumnName_; + } + result["part_size"] = options.PartSize_; + return result; +} + TNode SerializeParamsForParseYPath(const TRichYPath& path) { TNode result; diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h index 07879b0dc8..acbf003b5c 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h @@ -152,6 +152,12 @@ TNode SerializeParamsForReadTable( const TRichYPath& path, const TTableReaderOptions& options); +TNode SerializeParamsForReadBlobTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TKey& key, + const TBlobTableReaderOptions& options); + TNode SerializeParamsForParseYPath( const TRichYPath& path); |