diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-12-24 22:01:20 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-12-24 22:01:20 +0000 |
commit | bd0e2de0b1035962a4d5b9e847eaa6508fad7fcf (patch) | |
tree | 79878ca309f9f7fada064f9b78b4223af4635f28 /yt/cpp | |
parent | be43a4691ebdd4dbe260a8d77df4cd8423b14c05 (diff) | |
parent | e6bd80ded127cd064560f7ea471974b602770cb1 (diff) | |
download | ydb-bd0e2de0b1035962a4d5b9e847eaa6508fad7fcf.tar.gz |
Merge branch 'PR'
Diffstat (limited to 'yt/cpp')
19 files changed, 384 insertions, 376 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 9e3976b144..9fcb82f5b7 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -1352,11 +1352,27 @@ TNode::TListType TClient::SkyShareTable( const TSkyShareTableOptions& options) { CheckShutdown(); - return NRawClient::SkyShareTable( - ClientRetryPolicy_->CreatePolicyForGenericRequest(), - Context_, - tablePaths, - options); + + // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu + // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK). + NHttpClient::IHttpResponsePtr response; + do { + response = RequestWithRetry<NHttpClient::IHttpResponsePtr>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &tablePaths, &options] (TMutationId /*mutationId*/) { + return RawClient_->SkyShareTable(tablePaths, options); + }); + TWaitProxy::Get()->Sleep(TDuration::Seconds(5)); + } while (response->GetStatusCode() != 200); + + if (options.KeyColumns_) { + return NodeFromJsonString(response->GetResponse())["torrents"].AsList(); + } else { + TNode torrent; + torrent["key"] = TNode::CreateList(); + torrent["rbtorrent"] = response->GetResponse(); + return TNode::TListType{torrent}; + } } TCheckPermissionResponse TClient::CheckPermission( diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index b312716877..e7538a22da 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -166,99 +166,28 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest(); } - bool areRangesUpdated = false; + auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_); - while (true) { - CurrentRequestRetryPolicy_->NotifyNewAttempt(); - - THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion)); - if (Context_.ServiceTicketAuth) { - header.SetServiceTicket(Context_.ServiceTicketAuth->Ptr->IssueServiceTicket()); + if (rowIndex.Defined()) { + auto& ranges = Path_.MutableRanges(); + if (ranges.Empty()) { + ranges.ConstructInPlace(TVector{TReadRange()}); } else { - header.SetToken(Context_.Token); - } - - if (Context_.ImpersonationUser) { - header.SetImpersonationUser(*Context_.ImpersonationUser); - } - - auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_); - header.AddTransactionId(transactionId); - - const auto& controlAttributes = Options_.ControlAttributes_; - header.AddParameter("control_attributes", TNode() - ("enable_row_index", controlAttributes.EnableRowIndex_) - ("enable_range_index", controlAttributes.EnableRangeIndex_)); - header.SetOutputFormat(Format_); - - header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); - - if (rowIndex.Defined() && !areRangesUpdated) { - auto& ranges = Path_.MutableRanges(); - if (ranges.Empty()) { - ranges.ConstructInPlace(TVector{TReadRange()}); - } else { - if (rangeIndex.GetOrElse(0) >= ranges->size()) { - ythrow yexception() - << "range index " << rangeIndex.GetOrElse(0) - << " is out of range, input range count is " << ranges->size(); - } - ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0)); + if (rangeIndex.GetOrElse(0) >= ranges->size()) { + ythrow yexception() + << "range index " << rangeIndex.GetOrElse(0) + << " is out of range, input range count is " << ranges->size(); } - ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); - areRangesUpdated = true; - } - - header.MergeParameters(FormIORequestParameters(Path_, Options_)); - - auto requestId = CreateGuidAsString(); - - try { - const auto proxyName = GetProxyForHeavyRequest(Context_); - UpdateHeaderForProxyIfNeed(proxyName, Context_, header); - Response_ = Context_.HttpClient->Request(GetFullUrlForProxy(proxyName, Context_, header), requestId, header); - - Input_ = Response_->GetResponseStream(); - - YT_LOG_DEBUG( - "RSP %v - table stream (RangeIndex: %v, RowIndex: %v)", - requestId, - rangeIndex, - rowIndex); - - return; - } catch (const TErrorResponse& e) { - LogRequestError( - requestId, - header, - e.what(), - CurrentRequestRetryPolicy_->GetAttemptDescription()); - - if (!IsRetriable(e)) { - throw; - } - auto backoff = CurrentRequestRetryPolicy_->OnRetriableError(e); - if (!backoff) { - throw; - } - NDetail::TWaitProxy::Get()->Sleep(*backoff); - } catch (const std::exception& e) { - LogRequestError( - requestId, - header, - e.what(), - CurrentRequestRetryPolicy_->GetAttemptDescription()); - - Response_.reset(); - Input_ = nullptr; - - auto backoff = CurrentRequestRetryPolicy_->OnGenericError(e); - if (!backoff) { - throw; - } - NDetail::TWaitProxy::Get()->Sleep(*backoff); + ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0)); } + ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); } + + Input_ = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>( + CurrentRequestRetryPolicy_, + [this, &transactionId] (TMutationId /*mutationId*/) { + return RawClient_->ReadTable(transactionId, Path_, Format_, Options_); + }); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h index 61bc698340..3f73080046 100644 --- a/yt/cpp/mapreduce/client/client_reader.h +++ b/yt/cpp/mapreduce/client/client_reader.h @@ -2,8 +2,6 @@ #include <yt/cpp/mapreduce/common/fwd.h> -#include <yt/cpp/mapreduce/interface/io.h> - #include <yt/cpp/mapreduce/http/context.h> #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/http.h> @@ -55,8 +53,7 @@ private: THolder<TPingableTransaction> ReadTransaction_; - NHttpClient::IHttpResponsePtr Response_; - IInputStream* Input_; + std::unique_ptr<IInputStream> Input_; IRequestRetryPolicyPtr CurrentRequestRetryPolicy_; diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp index 06463d0af2..f88b40e38b 100644 --- a/yt/cpp/mapreduce/client/file_reader.cpp +++ b/yt/cpp/mapreduce/client/file_reader.cpp @@ -31,7 +31,7 @@ using ::ToString; static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) { if (options.Length_) { - return options.Offset_.GetOrElse(0) + *options.Length_; + return options.Offset_ + *options.Length_; } else { return Nothing(); } @@ -46,7 +46,6 @@ TStreamReaderBase::TStreamReaderBase( const TClientContext& context, const TTransactionId& transactionId) : RawClient_(rawClient) - , Context_(context) , ClientRetryPolicy_(std::move(clientRetryPolicy)) , ReadTransaction_(MakeHolder<TPingableTransaction>( RawClient_, @@ -64,59 +63,26 @@ TYPath TStreamReaderBase::Snapshot(const TYPath& path) return NYT::Snapshot(RawClient_, ClientRetryPolicy_, ReadTransaction_->GetId(), path); } -TString TStreamReaderBase::GetActiveRequestId() const -{ - if (Response_) { - return Response_->GetRequestId();; - } else { - return "<no-active-request>"; - } -} - size_t TStreamReaderBase::DoRead(void* buf, size_t len) { - const int retryCount = Context_.Config->ReadRetryCount; - for (int attempt = 1; attempt <= retryCount; ++attempt) { - try { - if (!Input_) { - Response_ = Request(Context_, ReadTransaction_->GetId(), CurrentOffset_); - Input_ = Response_->GetResponseStream(); - } - if (len == 0) { - return 0; - } - const size_t read = Input_->Read(buf, len); - CurrentOffset_ += read; - return read; - } catch (TErrorResponse& e) { - YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)", - GetActiveRequestId(), - e.what(), - attempt, - retryCount); - - if (!IsRetriable(e) || attempt == retryCount) { - throw; - } - TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); - } catch (std::exception& e) { - YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)", - GetActiveRequestId(), - e.what(), - attempt, - retryCount); - - // Invalidate connection. - Response_.reset(); - - if (attempt == retryCount) { + if (len == 0) { + return 0; + } + return RequestWithRetry<size_t>( + ClientRetryPolicy_->CreatePolicyForReaderRequest(), + [this, &buf, len] (TMutationId /*mutationId*/) { + try { + if (!Input_) { + Input_ = Request(ReadTransaction_->GetId(), CurrentOffset_); + } + const size_t read = Input_->Read(buf, len); + CurrentOffset_ += read; + return read; + } catch (...) { + Input_ = nullptr; throw; } - TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); - } - Input_ = nullptr; - } - Y_UNREACHABLE(); // we should either return or throw from loop above + }); } //////////////////////////////////////////////////////////////////////////////// @@ -130,57 +96,25 @@ TFileReader::TFileReader( const TTransactionId& transactionId, const TFileReaderOptions& options) : TStreamReaderBase(rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId) - , FileReaderOptions_(options) + , StartOffset_(options.Offset_) + , EndOffset_(GetEndOffset(options)) + , Options_(options) , Path_(path) - , StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0)) - , EndOffset_(GetEndOffset(FileReaderOptions_)) { Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_); } -NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) +std::unique_ptr<IInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes) { const ui64 currentOffset = StartOffset_ + readBytes; - TString hostName = GetProxyForHeavyRequest(context); - - THttpHeader header("GET", GetReadFileCommand(context.Config->ApiVersion)); - if (context.ServiceTicketAuth) { - header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); - } else { - header.SetToken(context.Token); - } - - if (context.ImpersonationUser) { - header.SetImpersonationUser(*context.ImpersonationUser); - } - - UpdateHeaderForProxyIfNeed(hostName, context, header); - - header.AddTransactionId(transactionId); - header.SetOutputFormat(TMaybe<TFormat>()); // Binary format if (EndOffset_) { Y_ABORT_UNLESS(*EndOffset_ >= currentOffset); - FileReaderOptions_.Length(*EndOffset_ - currentOffset); - } - FileReaderOptions_.Offset(currentOffset); - header.MergeParameters(FormIORequestParameters(Path_, FileReaderOptions_)); - - header.SetResponseCompression(ToString(context.Config->AcceptEncoding)); - - auto requestId = CreateGuidAsString(); - NHttpClient::IHttpResponsePtr response; - try { - response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); - } catch (const std::exception& ex) { - LogRequestError(requestId, header, ex.what(), ""); - throw; + Options_.Length(*EndOffset_ - currentOffset); } - YT_LOG_DEBUG("RSP %v - file stream", - requestId); - - return response; + Options_.Offset(currentOffset); + return RawClient_->ReadFile(transactionId, Path_, Options_); } //////////////////////////////////////////////////////////////////////////////// @@ -195,66 +129,22 @@ TBlobTableReader::TBlobTableReader( const TTransactionId& transactionId, const TBlobTableReaderOptions& options) : TStreamReaderBase(rawClient, std::move(retryPolicy), std::move(transactionPinger), context, transactionId) + , StartOffset_(options.Offset_) , Key_(key) , Options_(options) { Path_ = TStreamReaderBase::Snapshot(path); } -NHttpClient::IHttpResponsePtr TBlobTableReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) +std::unique_ptr<IInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes) { - TString hostName = GetProxyForHeavyRequest(context); - - THttpHeader header("GET", "read_blob_table"); - if (context.ServiceTicketAuth) { - header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); - } else { - header.SetToken(context.Token); - } - - if (context.ImpersonationUser) { - header.SetImpersonationUser(*context.ImpersonationUser); - } - - UpdateHeaderForProxyIfNeed(hostName, context, header); - - header.AddTransactionId(transactionId); - header.SetOutputFormat(TMaybe<TFormat>()); // Binary format - - const ui64 currentOffset = Options_.Offset_ + readBytes; + const i64 currentOffset = StartOffset_ + readBytes; const i64 startPartIndex = currentOffset / Options_.PartSize_; - const ui64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex; - auto lowerLimitKey = Key_; - lowerLimitKey.Parts_.push_back(startPartIndex); - auto upperLimitKey = Key_; - upperLimitKey.Parts_.push_back(std::numeric_limits<i64>::max()); - TNode params = PathToParamNode(TRichYPath(Path_).AddRange(TReadRange() - .LowerLimit(TReadLimit().Key(lowerLimitKey)) - .UpperLimit(TReadLimit().Key(upperLimitKey)))); - params["start_part_index"] = TNode(startPartIndex); - params["offset"] = skipBytes; - if (Options_.PartIndexColumnName_) { - params["part_index_column_name"] = *Options_.PartIndexColumnName_; - } - if (Options_.DataColumnName_) { - params["data_column_name"] = *Options_.DataColumnName_; - } - params["part_size"] = Options_.PartSize_; - header.MergeParameters(params); - header.SetResponseCompression(ToString(context.Config->AcceptEncoding)); - - auto requestId = CreateGuidAsString(); - NHttpClient::IHttpResponsePtr response; - try { - response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); - } catch (const std::exception& ex) { - LogRequestError(requestId, header, ex.what(), ""); - throw; - } + const i64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex; - YT_LOG_DEBUG("RSP %v - blob table stream", - requestId); - return response; + Options_.Offset(skipBytes); + Options_.StartPartIndex(startPartIndex); + return RawClient_->ReadBlobTable(transactionId, Path_, Key_, Options_); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h index 48248696d3..8aafdc860d 100644 --- a/yt/cpp/mapreduce/client/file_reader.h +++ b/yt/cpp/mapreduce/client/file_reader.h @@ -11,7 +11,6 @@ class IInputStream; namespace NYT { -class THttpRequest; class TPingableTransaction; namespace NDetail { @@ -35,19 +34,16 @@ protected: protected: const IRawClientPtr RawClient_; - const TClientContext Context_; private: size_t DoRead(void* buf, size_t len) override; - virtual NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) = 0; - TString GetActiveRequestId() const; + virtual std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) = 0; private: const IClientRetryPolicyPtr ClientRetryPolicy_; TFileReaderOptions FileReaderOptions_; - NHttpClient::IHttpResponsePtr Response_; - IInputStream* Input_ = nullptr; + std::unique_ptr<IInputStream> Input_; THolder<TPingableTransaction> ReadTransaction_; @@ -67,17 +63,17 @@ public: ITransactionPingerPtr transactionPinger, const TClientContext& context, const TTransactionId& transactionId, - const TFileReaderOptions& options = TFileReaderOptions()); + const TFileReaderOptions& options = {}); private: - NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) override; + std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override; private: - TFileReaderOptions FileReaderOptions_; - - TRichYPath Path_; const ui64 StartOffset_; const TMaybe<ui64> EndOffset_; + + TFileReaderOptions Options_; + TRichYPath Path_; }; //////////////////////////////////////////////////////////////////////////////// @@ -94,14 +90,16 @@ public: ITransactionPingerPtr transactionPinger, const TClientContext& context, const TTransactionId& transactionId, - const TBlobTableReaderOptions& options); + const TBlobTableReaderOptions& options = {}); private: - NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) override; + std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override; private: + const ui64 StartOffset_; const TKey Key_; - const TBlobTableReaderOptions Options_; + + TBlobTableReaderOptions Options_; TYPath Path_; }; diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp index 772a2ab0cd..8146eb8b46 100644 --- a/yt/cpp/mapreduce/common/retry_lib.cpp +++ b/yt/cpp/mapreduce/common/retry_lib.cpp @@ -118,6 +118,11 @@ public: return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->StartOperationRetryCount), Config_)); } + IRequestRetryPolicyPtr CreatePolicyForReaderRequest() override + { + return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->ReadRetryCount), Config_)); + } + IRequestRetryPolicyPtr Wrap(IRequestRetryPolicyPtr basePolicy) { auto config = RetryConfigProvider_->CreateRetryConfig(); diff --git a/yt/cpp/mapreduce/common/retry_lib.h b/yt/cpp/mapreduce/common/retry_lib.h index c6c061f614..5b406b075f 100644 --- a/yt/cpp/mapreduce/common/retry_lib.h +++ b/yt/cpp/mapreduce/common/retry_lib.h @@ -48,6 +48,7 @@ class IClientRetryPolicy public: virtual IRequestRetryPolicyPtr CreatePolicyForGenericRequest() = 0; virtual IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() = 0; + virtual IRequestRetryPolicyPtr CreatePolicyForReaderRequest() = 0; }; diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h index 6087eca098..a01b619fab 100644 --- a/yt/cpp/mapreduce/http/http_client.h +++ b/yt/cpp/mapreduce/http/http_client.h @@ -42,7 +42,6 @@ public: virtual IHttpResponsePtr Finish() = 0; }; - class IHttpClient { public: @@ -65,6 +64,34 @@ public: //////////////////////////////////////////////////////////////////////////////// +class THttpResponseStream + : public IInputStream +{ +public: + THttpResponseStream(IHttpResponsePtr response) + : Response_(std::move(response)) + { + Underlying_ = Response_->GetResponseStream(); + } + +private: + size_t DoRead(void *buf, size_t len) override + { + return Underlying_->Read(buf, len); + } + + size_t DoSkip(size_t len) override + { + return Underlying_->Skip(len); + } + +private: + IHttpResponsePtr Response_; + IInputStream* Underlying_; +}; + +//////////////////////////////////////////////////////////////////////////////// + IHttpClientPtr CreateDefaultHttpClient(); IHttpClientPtr CreateCoreHttpClient(bool useTLS, const TConfigPtr& config); diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp index 1d9267009f..a47b2952b1 100644 --- a/yt/cpp/mapreduce/http/retry_request.cpp +++ b/yt/cpp/mapreduce/http/retry_request.cpp @@ -20,7 +20,7 @@ namespace NDetail { //////////////////////////////////////////////////////////////////////////////// -static TResponseInfo Request( +static NHttpClient::IHttpResponsePtr Request( const TClientContext& context, THttpHeader& header, TMaybe<TStringBuf> body, @@ -38,16 +38,10 @@ static TResponseInfo Request( auto url = GetFullUrlForProxy(hostName, context, header); - auto response = context.HttpClient->Request(url, requestId, config.HttpConfig, header, body); - - TResponseInfo result; - result.RequestId = requestId; - result.Response = response->GetResponse(); - result.HttpCode = response->GetStatusCode(); - return result; + return context.HttpClient->Request(url, requestId, config.HttpConfig, header, body); } -TResponseInfo RequestWithoutRetry( +NHttpClient::IHttpResponsePtr RequestWithoutRetry( const TClientContext& context, TMutationId& mutationId, THttpHeader& header, @@ -118,7 +112,12 @@ TResponseInfo RetryRequestWithPolicy( } } - return Request(context, header, body, requestId, config); + auto response = Request(context, header, body, requestId, config); + return TResponseInfo{ + .RequestId = response->GetRequestId(), + .Response = response->GetResponse(), + .HttpCode = response->GetStatusCode(), + }; } catch (const TErrorResponse& e) { LogRequestError(requestId, header, e.what(), retryPolicy->GetAttemptDescription()); retryWithSameMutationId = e.IsTransportError(); diff --git a/yt/cpp/mapreduce/http/retry_request.h b/yt/cpp/mapreduce/http/retry_request.h index 9750d0b541..444ecbbafc 100644 --- a/yt/cpp/mapreduce/http/retry_request.h +++ b/yt/cpp/mapreduce/http/retry_request.h @@ -105,7 +105,7 @@ TResponseInfo RetryRequestWithPolicy( TMaybe<TStringBuf> body = {}, const TRequestConfig& config = TRequestConfig()); -TResponseInfo RequestWithoutRetry( +NHttpClient::IHttpResponsePtr RequestWithoutRetry( const TClientContext& context, TMutationId& mutationId, THttpHeader& header, diff --git a/yt/cpp/mapreduce/interface/client_method_options.h b/yt/cpp/mapreduce/interface/client_method_options.h index 9bfb79753d..d457bf5f43 100644 --- a/yt/cpp/mapreduce/interface/client_method_options.h +++ b/yt/cpp/mapreduce/interface/client_method_options.h @@ -287,9 +287,12 @@ struct TBlobTableReaderOptions /// /// All blob parts except the last part of the blob must be of this size /// otherwise blob table reader emits error. - FLUENT_FIELD_DEFAULT(ui64, PartSize, 4 * 1024 * 1024); + FLUENT_FIELD_DEFAULT(i64, PartSize, 4 * 1024 * 1024); - /// @brief Offset from which to start reading + /// @brief Part index from which to start reading. + FLUENT_FIELD_DEFAULT(i64, StartPartIndex, 0); + + /// @brief Offset from which to start reading. FLUENT_FIELD_DEFAULT(i64, Offset, 0); }; @@ -468,7 +471,7 @@ struct TFileReaderOptions /// @brief Offset to start reading from. /// /// By default reading is started from the beginning of the file. - FLUENT_FIELD_OPTION(i64, Offset); + FLUENT_FIELD_DEFAULT(i64, Offset, 0); /// /// @brief Maximum length to read. diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 32055e3d00..4994826863 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -8,6 +8,13 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// +namespace NHttpClient { + class IHttpResponse; + using IHttpResponsePtr = std::unique_ptr<IHttpResponse>; +} + +//////////////////////////////////////////////////////////////////////////////// + class IRawClient : public virtual TThrRefBase { @@ -196,6 +203,18 @@ public: const TOperationId& operationId, const TGetJobTraceOptions& options = {}) = 0; + // SkyShare + + virtual NHttpClient::IHttpResponsePtr SkyShareTable( + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options = {}) = 0; + + // Files + virtual std::unique_ptr<IInputStream> ReadFile( + const TTransactionId& transactionId, + const TRichYPath& path, + const TFileReaderOptions& options = {}) = 0; + // File cache virtual TMaybe<TYPath> GetFileFromCache( @@ -266,6 +285,18 @@ public: const TYPath& path, const TAlterTableOptions& options = {}) = 0; + virtual std::unique_ptr<IInputStream> ReadTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TMaybe<TFormat>& format, + const TTableReaderOptions& options = {}) = 0; + + virtual std::unique_ptr<IInputStream> ReadBlobTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TKey& key, + const TBlobTableReaderOptions& options = {}) = 0; + virtual void AlterTableReplica( TMutationId& mutationId, const TReplicaId& replicaId, diff --git a/yt/cpp/mapreduce/io/helpers.h b/yt/cpp/mapreduce/io/helpers.h index 0733ff417c..0d3ec40ab6 100644 --- a/yt/cpp/mapreduce/io/helpers.h +++ b/yt/cpp/mapreduce/io/helpers.h @@ -63,9 +63,7 @@ inline TNode FormIORequestParameters( if (options.Config_) { params[TIOOptionsTraits<TTableReaderOptions>::ConfigName] = *options.Config_; } - if (options.Offset_) { - params["offset"] = *options.Offset_; - } + params["offset"] = options.Offset_; if (options.Length_) { params["length"] = *options.Length_; } diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 71d8d5fba9..65bfa01cea 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -14,6 +14,8 @@ #include <yt/cpp/mapreduce/interface/operation.h> #include <yt/cpp/mapreduce/interface/tvm.h> +#include <yt/cpp/mapreduce/io/helpers.h> + #include <library/cpp/yson/node/node_io.h> namespace NYT::NDetail { @@ -32,7 +34,7 @@ TNode THttpRawClient::Get( TMutationId mutationId; THttpHeader header("GET", "get"); header.MergeParameters(NRawClient::SerializeParamsForGet(transactionId, Context_.Config->Prefix, path, options)); - return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header).Response); + return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); } TNode THttpRawClient::TryGet( @@ -61,7 +63,7 @@ void THttpRawClient::Set( header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForSet(transactionId, Context_.Config->Prefix, path, options)); auto body = NodeToYsonString(value); - RequestWithoutRetry(Context_, mutationId, header, body); + RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse(); } bool THttpRawClient::Exists( @@ -72,7 +74,7 @@ bool THttpRawClient::Exists( TMutationId mutationId; THttpHeader header("GET", "exists"); header.MergeParameters(NRawClient::SerializeParamsForExists(transactionId, Context_.Config->Prefix, path, options)); - return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); + return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); } void THttpRawClient::MultisetAttributes( @@ -86,7 +88,7 @@ void THttpRawClient::MultisetAttributes( header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForMultisetAttributes(transactionId, Context_.Config->Prefix, path, options)); auto body = NodeToYsonString(value); - RequestWithoutRetry(Context_, mutationId, header, body); + RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse(); } TNodeId THttpRawClient::Create( @@ -99,7 +101,7 @@ TNodeId THttpRawClient::Create( THttpHeader header("POST", "create"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForCreate(transactionId, Context_.Config->Prefix, path, type, options)); - return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); } TNodeId THttpRawClient::CopyWithoutRetries( @@ -112,7 +114,7 @@ TNodeId THttpRawClient::CopyWithoutRetries( THttpHeader header("POST", "copy"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options)); - return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); } TNodeId THttpRawClient::CopyInsideMasterCell( @@ -129,7 +131,7 @@ TNodeId THttpRawClient::CopyInsideMasterCell( // Make cross cell copying disable. params["enable_cross_cell_copying"] = false; header.MergeParameters(params); - return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); } TNodeId THttpRawClient::MoveWithoutRetries( @@ -142,7 +144,7 @@ TNodeId THttpRawClient::MoveWithoutRetries( THttpHeader header("POST", "move"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options)); - return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); } TNodeId THttpRawClient::MoveInsideMasterCell( @@ -159,7 +161,7 @@ TNodeId THttpRawClient::MoveInsideMasterCell( // Make cross cell copying disable. params["enable_cross_cell_copying"] = false; header.MergeParameters(params); - return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); } void THttpRawClient::Remove( @@ -171,7 +173,7 @@ void THttpRawClient::Remove( THttpHeader header("POST", "remove"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForRemove(transactionId, Context_.Config->Prefix, path, options)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } TNode::TListType THttpRawClient::List( @@ -190,7 +192,7 @@ TNode::TListType THttpRawClient::List( } header.MergeParameters(NRawClient::SerializeParamsForList(transactionId, Context_.Config->Prefix, updatedPath, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); - return NodeFromYsonString(responseInfo.Response).AsList(); + return NodeFromYsonString(responseInfo->GetResponse()).AsList(); } TNodeId THttpRawClient::Link( @@ -203,7 +205,7 @@ TNodeId THttpRawClient::Link( THttpHeader header("POST", "link"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForLink(transactionId, Context_.Config->Prefix, targetPath, linkPath, options)); - return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); } TLockId THttpRawClient::Lock( @@ -216,7 +218,7 @@ TLockId THttpRawClient::Lock( THttpHeader header("POST", "lock"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForLock(transactionId, Context_.Config->Prefix, path, mode, options)); - return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); } void THttpRawClient::Unlock( @@ -228,7 +230,7 @@ void THttpRawClient::Unlock( THttpHeader header("POST", "unlock"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForUnlock(transactionId, Context_.Config->Prefix, path, options)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::Concatenate( @@ -241,7 +243,7 @@ void THttpRawClient::Concatenate( THttpHeader header("POST", "concatenate"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForConcatenate(transactionId, Context_.Config->Prefix, sourcePaths, destinationPath, options)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } TTransactionId THttpRawClient::StartTransaction( @@ -252,7 +254,7 @@ TTransactionId THttpRawClient::StartTransaction( THttpHeader header("POST", "start_tx"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, Context_.Config->TxTimeout, options)); - return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); } void THttpRawClient::PingTransaction(const TTransactionId& transactionId) @@ -264,7 +266,7 @@ void THttpRawClient::PingTransaction(const TTransactionId& transactionId) requestConfig.HttpConfig = NHttpClient::THttpConfig{ .SocketTimeout = Context_.Config->PingTimeout }; - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::AbortTransaction( @@ -274,7 +276,7 @@ void THttpRawClient::AbortTransaction( THttpHeader header("POST", "abort_tx"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::CommitTransaction( @@ -284,7 +286,7 @@ void THttpRawClient::CommitTransaction( THttpHeader header("POST", "commit_tx"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } TOperationAttributes THttpRawClient::GetOperation( @@ -295,7 +297,7 @@ TOperationAttributes THttpRawClient::GetOperation( THttpHeader header("GET", "get_operation"); header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); - return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo.Response)); + return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse())); } TOperationAttributes THttpRawClient::GetOperation( @@ -306,7 +308,7 @@ TOperationAttributes THttpRawClient::GetOperation( THttpHeader header("GET", "get_operation"); header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); - return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo.Response)); + return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse())); } void THttpRawClient::AbortOperation( @@ -316,7 +318,7 @@ void THttpRawClient::AbortOperation( THttpHeader header("POST", "abort_op"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::CompleteOperation( @@ -326,7 +328,7 @@ void THttpRawClient::CompleteOperation( THttpHeader header("POST", "complete_op"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::SuspendOperation( @@ -337,7 +339,7 @@ void THttpRawClient::SuspendOperation( THttpHeader header("POST", "suspend_op"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::ResumeOperation( @@ -348,7 +350,7 @@ void THttpRawClient::ResumeOperation( THttpHeader header("POST", "resume_op"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } template <typename TKey> @@ -367,7 +369,7 @@ TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOption THttpHeader header("GET", "list_operations"); header.MergeParameters(NRawClient::SerializeParamsForListOperations(options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); - auto resultNode = NodeFromYsonString(responseInfo.Response); + auto resultNode = NodeFromYsonString(responseInfo->GetResponse()); const auto& operationNodesList = resultNode["operations"].AsList(); @@ -417,7 +419,7 @@ NYson::TYsonString THttpRawClient::GetJob( THttpHeader header("GET", "get_job"); header.MergeParameters(NRawClient::SerializeParamsForGetJob(operationId, jobId, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); - return NYson::TYsonString(responseInfo.Response); + return NYson::TYsonString(responseInfo->GetResponse()); } TListJobsResult THttpRawClient::ListJobs( @@ -428,7 +430,7 @@ TListJobsResult THttpRawClient::ListJobs( THttpHeader header("GET", "list_jobs"); header.MergeParameters(NRawClient::SerializeParamsForListJobs(operationId, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); - auto resultNode = NodeFromYsonString(responseInfo.Response); + auto resultNode = NodeFromYsonString(responseInfo->GetResponse()); const auto& jobNodesList = resultNode["jobs"].AsList(); @@ -524,7 +526,7 @@ TString THttpRawClient::GetJobStderrWithRetries( TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, {}, config); - return responseInfo.Response; + return responseInfo->GetResponse(); } IFileReaderPtr THttpRawClient::GetJobStderr( @@ -573,7 +575,7 @@ std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace( THttpHeader header("GET", "get_job_trace"); header.MergeParameters(NRawClient::SerializeParamsForGetJobTrace(operationId, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); - auto resultNode = NodeFromYsonString(responseInfo.Response); + auto resultNode = NodeFromYsonString(responseInfo->GetResponse()); const auto& traceEventNodesList = resultNode.AsList(); @@ -586,6 +588,50 @@ std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace( return result; } +NHttpClient::IHttpResponsePtr THttpRawClient::SkyShareTable( + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options) +{ + TMutationId mutationId; + THttpHeader header("POST", "api/v1/share", /*IsApi*/ false); + + auto proxyName = Context_.ServerName.substr(0, Context_.ServerName.find('.')); + + auto host = Context_.Config->SkynetApiHost; + if (host == "") { + host = "skynet." + proxyName + ".yt.yandex.net"; + } + + TSkyShareTableOptions patchedOptions = options; + + if (Context_.Config->Pool && !patchedOptions.Pool_) { + patchedOptions.Pool(Context_.Config->Pool); + } + + header.MergeParameters(NRawClient::SerializeParamsForSkyShareTable(proxyName, Context_.Config->Prefix, tablePaths, patchedOptions)); + TClientContext skyApiHost({.ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient()}); + + return RequestWithoutRetry(skyApiHost, mutationId, header, ""); +} + +std::unique_ptr<IInputStream> THttpRawClient::ReadFile( + const TTransactionId& transactionId, + const TRichYPath& path, + const TFileReaderOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", GetReadFileCommand(Context_.Config->ApiVersion)); + header.AddTransactionId(transactionId); + header.SetOutputFormat(TMaybe<TFormat>()); // Binary format + header.MergeParameters(FormIORequestParameters(path, options)); + header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); + + TRequestConfig config; + config.IsHeavy = true; + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); + return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo)); +} + TMaybe<TYPath> THttpRawClient::GetFileFromCache( const TTransactionId& transactionId, const TString& md5Signature, @@ -596,7 +642,7 @@ TMaybe<TYPath> THttpRawClient::GetFileFromCache( THttpHeader header("GET", "get_file_from_cache"); header.MergeParameters(NRawClient::SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); - auto resultNode = NodeFromYsonString(responseInfo.Response).AsString(); + auto resultNode = NodeFromYsonString(responseInfo->GetResponse()).AsString(); return resultNode.empty() ? Nothing() : TMaybe<TYPath>(resultNode); } @@ -611,7 +657,7 @@ TYPath THttpRawClient::PutFileToCache( THttpHeader header("POST", "put_file_to_cache"); header.MergeParameters(NRawClient::SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); - return NodeFromYsonString(responseInfo.Response).AsString(); + return NodeFromYsonString(responseInfo->GetResponse()).AsString(); } void THttpRawClient::MountTable( @@ -626,7 +672,7 @@ void THttpRawClient::MountTable( header.AddParameter("cell_id", GetGuidAsString(*options.CellId_)); } header.AddParameter("freeze", options.Freeze_); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::UnmountTable( @@ -638,7 +684,7 @@ void THttpRawClient::UnmountTable( header.AddMutationId(); header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); header.AddParameter("force", options.Force_); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::RemountTable( @@ -649,7 +695,7 @@ void THttpRawClient::RemountTable( THttpHeader header("POST", "remount_table"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::ReshardTableByPivotKeys( @@ -662,7 +708,7 @@ void THttpRawClient::ReshardTableByPivotKeys( header.AddMutationId(); header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::ReshardTableByTabletCount( @@ -675,7 +721,7 @@ void THttpRawClient::ReshardTableByTabletCount( header.AddMutationId(); header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); header.AddParameter("tablet_count", tabletCount); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::InsertRows( @@ -690,7 +736,7 @@ void THttpRawClient::InsertRows( auto body = NodeListToYsonString(rows); TRequestConfig config; config.IsHeavy = true; - RequestWithoutRetry(Context_, mutationId, header, body, config); + RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse(); } void THttpRawClient::TrimRows( @@ -706,7 +752,7 @@ void THttpRawClient::TrimRows( header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options)); TRequestConfig config; config.IsHeavy = true; - RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); + RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse(); } TNode::TListType THttpRawClient::LookupRows( @@ -737,7 +783,7 @@ TNode::TListType THttpRawClient::LookupRows( TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, body, config); - return NodeFromYsonString(responseInfo.Response, ::NYson::EYsonType::ListFragment).AsList(); + return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList(); } TNode::TListType THttpRawClient::SelectRows( @@ -769,7 +815,44 @@ TNode::TListType THttpRawClient::SelectRows( TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); - return NodeFromYsonString(responseInfo.Response, ::NYson::EYsonType::ListFragment).AsList(); + return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList(); +} + +std::unique_ptr<IInputStream> THttpRawClient::ReadTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TMaybe<TFormat>& format, + const TTableReaderOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion)); + header.SetOutputFormat(format); + header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); + header.MergeParameters(NRawClient::SerializeParamsForReadTable(transactionId, Context_.Config->Prefix, path, options)); + header.MergeParameters(FormIORequestParameters(path, options)); + + TRequestConfig config; + config.IsHeavy = true; + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); + return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo)); +} + +std::unique_ptr<IInputStream> THttpRawClient::ReadBlobTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TKey& key, + const TBlobTableReaderOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "read_blob_table"); + header.SetOutputFormat(TMaybe<TFormat>()); // Binary format + header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); + header.MergeParameters(NRawClient::SerializeParamsForReadBlobTable(transactionId, path, key, options)); + + TRequestConfig config; + config.IsHeavy = true; + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); + return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo)); } void THttpRawClient::AlterTable( @@ -781,7 +864,7 @@ void THttpRawClient::AlterTable( THttpHeader header("POST", "alter_table"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForAlterTable(transactionId, Context_.Config->Prefix, path, options)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::AlterTableReplica( @@ -792,7 +875,7 @@ void THttpRawClient::AlterTableReplica( THttpHeader header("POST", "alter_table_replica"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::DeleteRows( @@ -808,7 +891,7 @@ void THttpRawClient::DeleteRows( auto body = NodeListToYsonString(keys); TRequestConfig config; config.IsHeavy = true; - RequestWithoutRetry(Context_, mutationId, header, body, config); + RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse(); } void THttpRawClient::FreezeTable( @@ -818,7 +901,7 @@ void THttpRawClient::FreezeTable( TMutationId mutationId; THttpHeader header("POST", "freeze_table"); header.MergeParameters(NRawClient::SerializeParamsForFreezeTable(Context_.Config->Prefix, path, options)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::UnfreezeTable( @@ -828,7 +911,7 @@ void THttpRawClient::UnfreezeTable( TMutationId mutationId; THttpHeader header("POST", "unfreeze_table"); header.MergeParameters(NRawClient::SerializeParamsForUnfreezeTable(Context_.Config->Prefix, path, options)); - RequestWithoutRetry(Context_, mutationId, header); + RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } TCheckPermissionResponse THttpRawClient::CheckPermission( @@ -841,7 +924,7 @@ TCheckPermissionResponse THttpRawClient::CheckPermission( THttpHeader header("GET", "check_permission"); header.MergeParameters(NRawClient::SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); - return NRawClient::ParseCheckPermissionResponse(NodeFromYsonString(responseInfo.Response)); + return NRawClient::ParseCheckPermissionResponse(NodeFromYsonString(responseInfo->GetResponse())); } TVector<TTabletInfo> THttpRawClient::GetTabletInfos( @@ -854,7 +937,7 @@ TVector<TTabletInfo> THttpRawClient::GetTabletInfos( header.MergeParameters(NRawClient::SerializeParamsForGetTabletInfos(Context_.Config->Prefix, path, tabletIndexes, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); TVector<TTabletInfo> result; - Deserialize(result, *NodeFromYsonString(responseInfo.Response).AsMap().FindPtr("tablets")); + Deserialize(result, *NodeFromYsonString(responseInfo->GetResponse()).AsMap().FindPtr("tablets")); return result; } @@ -870,7 +953,7 @@ TVector<TTableColumnarStatistics> THttpRawClient::GetTableColumnarStatistics( config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); TVector<TTableColumnarStatistics> result; - Deserialize(result, NodeFromYsonString(responseInfo.Response)); + Deserialize(result, NodeFromYsonString(responseInfo->GetResponse())); return result; } @@ -886,7 +969,7 @@ TMultiTablePartitions THttpRawClient::GetTablePartitions( config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); TMultiTablePartitions result; - Deserialize(result, NodeFromYsonString(responseInfo.Response)); + Deserialize(result, NodeFromYsonString(responseInfo->GetResponse())); return result; } @@ -897,7 +980,7 @@ ui64 THttpRawClient::GenerateTimestamp() TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); - return NodeFromYsonString(responseInfo.Response).AsUint64(); + return NodeFromYsonString(responseInfo->GetResponse()).AsUint64(); } TAuthorizationInfo THttpRawClient::WhoAmI() @@ -908,7 +991,7 @@ TAuthorizationInfo THttpRawClient::WhoAmI() TAuthorizationInfo result; NJson::TJsonValue jsonValue; - bool ok = NJson::ReadJsonTree(requestResult.Response, &jsonValue, /*throwOnError*/ true); + bool ok = NJson::ReadJsonTree(requestResult->GetResponse(), &jsonValue, /*throwOnError*/ true); Y_ABORT_UNLESS(ok); result.Login = jsonValue["login"].GetString(); result.Realm = jsonValue["realm"].GetString(); diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index 08015f024f..e540d1b331 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -202,6 +202,18 @@ public: const TOperationId& operationId, const TGetJobTraceOptions& options = {}) override; + // SkyShare + + NHttpClient::IHttpResponsePtr SkyShareTable( + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options = {}) override; + + // Files + std::unique_ptr<IInputStream> ReadFile( + const TTransactionId& transactionId, + const TRichYPath& path, + const TFileReaderOptions& options = {}) override; + // File cache TMaybe<TYPath> GetFileFromCache( @@ -266,6 +278,18 @@ public: const TString& query, const TSelectRowsOptions& options = {}) override; + std::unique_ptr<IInputStream> ReadTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TMaybe<TFormat>& format, + const TTableReaderOptions& options = {}) override; + + std::unique_ptr<IInputStream> ReadBlobTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TKey& key, + const TBlobTableReaderOptions& options = {}) override; + void AlterTable( TMutationId& mutationId, const TTransactionId& transactionId, diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index a3f10e6c41..a3f01da6fc 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -301,50 +301,6 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node) return result; } -TNode::TListType SkyShareTable( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const std::vector<TYPath>& tablePaths, - const TSkyShareTableOptions& options) -{ - THttpHeader header("POST", "api/v1/share", /*IsApi*/ false); - - auto proxyName = context.ServerName.substr(0, context.ServerName.find('.')); - - auto host = context.Config->SkynetApiHost; - if (host == "") { - host = "skynet." + proxyName + ".yt.yandex.net"; - } - - TSkyShareTableOptions patchedOptions = options; - - if (context.Config->Pool && !patchedOptions.Pool_) { - patchedOptions.Pool(context.Config->Pool); - } - - header.MergeParameters(NRawClient::SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, patchedOptions)); - TClientContext skyApiHost({ .ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient() }); - TResponseInfo response = {}; - - // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu - // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK). - while (response.HttpCode != 200) { - response = RetryRequestWithPolicy(retryPolicy, skyApiHost, header, ""); - TWaitProxy::Get()->Sleep(TDuration::Seconds(5)); - } - - if (options.KeyColumns_) { - return NodeFromJsonString(response.Response)["torrents"].AsList(); - } else { - TNode torrent; - - torrent["key"] = TNode::CreateList(); - torrent["rbtorrent"] = response.Response; - - return TNode::TListType{ torrent }; - } -} - TRichYPath CanonizeYPath( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index c60536c86d..bcc9a4bfd7 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -29,7 +29,6 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node); //////////////////////////////////////////////////////////////////////////////// -// // marks `batchRequest' as executed void ExecuteBatch( IRequestRetryPolicyPtr retryPolicy, @@ -37,14 +36,6 @@ void ExecuteBatch( TRawBatchRequest& batchRequest, const TExecuteBatchOptions& options = {}); -// SkyShare - -TNode::TListType SkyShareTable( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const std::vector<TYPath>& tablePaths, - const TSkyShareTableOptions& options = {}); - // Misc TRichYPath CanonizeYPath( diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp index 8474bd0edc..2869ddcc0f 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -4,6 +4,7 @@ #include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/client_method_options.h> +#include <yt/cpp/mapreduce/interface/fluent.h> #include <yt/cpp/mapreduce/interface/operation.h> #include <yt/cpp/mapreduce/interface/serialize.h> @@ -639,13 +640,60 @@ TNode SerializeParametersForDeleteRows( TNode SerializeParametersForTrimRows( const TString& pathPrefix, const TYPath& path, - const TTrimRowsOptions& /* options*/) + const TTrimRowsOptions& /*options*/) { TNode result; SetPathParam(&result, pathPrefix, path); return result; } +TNode SerializeParamsForReadTable( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TRichYPath& path, + const TTableReaderOptions& options) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + result["control_attributes"] = BuildYsonNodeFluently() + .BeginMap() + .Item("enable_row_index").Value(options.ControlAttributes_.EnableRowIndex_) + .Item("enable_range_index").Value(options.ControlAttributes_.EnableRangeIndex_) + .EndMap(); + return result; +} + +TNode SerializeParamsForReadBlobTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TKey& key, + const TBlobTableReaderOptions& options) +{ + auto lowerLimitKey = key; + lowerLimitKey.Parts_.push_back(options.StartPartIndex_); + auto upperLimitKey = key; + upperLimitKey.Parts_.push_back(std::numeric_limits<i64>::max()); + + TNode result = PathToParamNode( + TRichYPath(path). + AddRange(TReadRange() + .LowerLimit(TReadLimit().Key(lowerLimitKey)) + .UpperLimit(TReadLimit().Key(upperLimitKey)))); + + SetTransactionIdParam(&result, transactionId); + + result["start_part_index"] = options.StartPartIndex_; + result["offset"] = options.Offset_; + if (options.PartIndexColumnName_) { + result["part_index_column_name"] = *options.PartIndexColumnName_; + } + if (options.DataColumnName_) { + result["data_column_name"] = *options.DataColumnName_; + } + result["part_size"] = options.PartSize_; + return result; +} + TNode SerializeParamsForParseYPath(const TRichYPath& path) { TNode result; diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h index 655198248c..acbf003b5c 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h @@ -146,6 +146,18 @@ TNode SerializeParametersForTrimRows( const TYPath& path, const TTrimRowsOptions& options); +TNode SerializeParamsForReadTable( + const TTransactionId& transactionId, + const TString& pathPrefix, + const TRichYPath& path, + const TTableReaderOptions& options); + +TNode SerializeParamsForReadBlobTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TKey& key, + const TBlobTableReaderOptions& options); + TNode SerializeParamsForParseYPath( const TRichYPath& path); |