aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2024-12-24 22:01:20 +0000
committerAlexander Smirnov <alex@ydb.tech>2024-12-24 22:01:20 +0000
commitbd0e2de0b1035962a4d5b9e847eaa6508fad7fcf (patch)
tree79878ca309f9f7fada064f9b78b4223af4635f28 /yt/cpp
parentbe43a4691ebdd4dbe260a8d77df4cd8423b14c05 (diff)
parente6bd80ded127cd064560f7ea471974b602770cb1 (diff)
downloadydb-bd0e2de0b1035962a4d5b9e847eaa6508fad7fcf.tar.gz
Merge branch 'PR'
Diffstat (limited to 'yt/cpp')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp26
-rw-r--r--yt/cpp/mapreduce/client/client_reader.cpp105
-rw-r--r--yt/cpp/mapreduce/client/client_reader.h5
-rw-r--r--yt/cpp/mapreduce/client/file_reader.cpp172
-rw-r--r--yt/cpp/mapreduce/client/file_reader.h26
-rw-r--r--yt/cpp/mapreduce/common/retry_lib.cpp5
-rw-r--r--yt/cpp/mapreduce/common/retry_lib.h1
-rw-r--r--yt/cpp/mapreduce/http/http_client.h29
-rw-r--r--yt/cpp/mapreduce/http/retry_request.cpp19
-rw-r--r--yt/cpp/mapreduce/http/retry_request.h2
-rw-r--r--yt/cpp/mapreduce/interface/client_method_options.h9
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h31
-rw-r--r--yt/cpp/mapreduce/io/helpers.h4
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp187
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h24
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp44
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h9
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp50
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h12
19 files changed, 384 insertions, 376 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 9e3976b144..9fcb82f5b7 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -1352,11 +1352,27 @@ TNode::TListType TClient::SkyShareTable(
const TSkyShareTableOptions& options)
{
CheckShutdown();
- return NRawClient::SkyShareTable(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- Context_,
- tablePaths,
- options);
+
+ // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu
+ // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK).
+ NHttpClient::IHttpResponsePtr response;
+ do {
+ response = RequestWithRetry<NHttpClient::IHttpResponsePtr>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &tablePaths, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->SkyShareTable(tablePaths, options);
+ });
+ TWaitProxy::Get()->Sleep(TDuration::Seconds(5));
+ } while (response->GetStatusCode() != 200);
+
+ if (options.KeyColumns_) {
+ return NodeFromJsonString(response->GetResponse())["torrents"].AsList();
+ } else {
+ TNode torrent;
+ torrent["key"] = TNode::CreateList();
+ torrent["rbtorrent"] = response->GetResponse();
+ return TNode::TListType{torrent};
+ }
}
TCheckPermissionResponse TClient::CheckPermission(
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp
index b312716877..e7538a22da 100644
--- a/yt/cpp/mapreduce/client/client_reader.cpp
+++ b/yt/cpp/mapreduce/client/client_reader.cpp
@@ -166,99 +166,28 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u
CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest();
}
- bool areRangesUpdated = false;
+ auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_);
- while (true) {
- CurrentRequestRetryPolicy_->NotifyNewAttempt();
-
- THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
- if (Context_.ServiceTicketAuth) {
- header.SetServiceTicket(Context_.ServiceTicketAuth->Ptr->IssueServiceTicket());
+ if (rowIndex.Defined()) {
+ auto& ranges = Path_.MutableRanges();
+ if (ranges.Empty()) {
+ ranges.ConstructInPlace(TVector{TReadRange()});
} else {
- header.SetToken(Context_.Token);
- }
-
- if (Context_.ImpersonationUser) {
- header.SetImpersonationUser(*Context_.ImpersonationUser);
- }
-
- auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_);
- header.AddTransactionId(transactionId);
-
- const auto& controlAttributes = Options_.ControlAttributes_;
- header.AddParameter("control_attributes", TNode()
- ("enable_row_index", controlAttributes.EnableRowIndex_)
- ("enable_range_index", controlAttributes.EnableRangeIndex_));
- header.SetOutputFormat(Format_);
-
- header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
-
- if (rowIndex.Defined() && !areRangesUpdated) {
- auto& ranges = Path_.MutableRanges();
- if (ranges.Empty()) {
- ranges.ConstructInPlace(TVector{TReadRange()});
- } else {
- if (rangeIndex.GetOrElse(0) >= ranges->size()) {
- ythrow yexception()
- << "range index " << rangeIndex.GetOrElse(0)
- << " is out of range, input range count is " << ranges->size();
- }
- ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0));
+ if (rangeIndex.GetOrElse(0) >= ranges->size()) {
+ ythrow yexception()
+ << "range index " << rangeIndex.GetOrElse(0)
+ << " is out of range, input range count is " << ranges->size();
}
- ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
- areRangesUpdated = true;
- }
-
- header.MergeParameters(FormIORequestParameters(Path_, Options_));
-
- auto requestId = CreateGuidAsString();
-
- try {
- const auto proxyName = GetProxyForHeavyRequest(Context_);
- UpdateHeaderForProxyIfNeed(proxyName, Context_, header);
- Response_ = Context_.HttpClient->Request(GetFullUrlForProxy(proxyName, Context_, header), requestId, header);
-
- Input_ = Response_->GetResponseStream();
-
- YT_LOG_DEBUG(
- "RSP %v - table stream (RangeIndex: %v, RowIndex: %v)",
- requestId,
- rangeIndex,
- rowIndex);
-
- return;
- } catch (const TErrorResponse& e) {
- LogRequestError(
- requestId,
- header,
- e.what(),
- CurrentRequestRetryPolicy_->GetAttemptDescription());
-
- if (!IsRetriable(e)) {
- throw;
- }
- auto backoff = CurrentRequestRetryPolicy_->OnRetriableError(e);
- if (!backoff) {
- throw;
- }
- NDetail::TWaitProxy::Get()->Sleep(*backoff);
- } catch (const std::exception& e) {
- LogRequestError(
- requestId,
- header,
- e.what(),
- CurrentRequestRetryPolicy_->GetAttemptDescription());
-
- Response_.reset();
- Input_ = nullptr;
-
- auto backoff = CurrentRequestRetryPolicy_->OnGenericError(e);
- if (!backoff) {
- throw;
- }
- NDetail::TWaitProxy::Get()->Sleep(*backoff);
+ ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0));
}
+ ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
}
+
+ Input_ = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>(
+ CurrentRequestRetryPolicy_,
+ [this, &transactionId] (TMutationId /*mutationId*/) {
+ return RawClient_->ReadTable(transactionId, Path_, Format_, Options_);
+ });
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h
index 61bc698340..3f73080046 100644
--- a/yt/cpp/mapreduce/client/client_reader.h
+++ b/yt/cpp/mapreduce/client/client_reader.h
@@ -2,8 +2,6 @@
#include <yt/cpp/mapreduce/common/fwd.h>
-#include <yt/cpp/mapreduce/interface/io.h>
-
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/http.h>
@@ -55,8 +53,7 @@ private:
THolder<TPingableTransaction> ReadTransaction_;
- NHttpClient::IHttpResponsePtr Response_;
- IInputStream* Input_;
+ std::unique_ptr<IInputStream> Input_;
IRequestRetryPolicyPtr CurrentRequestRetryPolicy_;
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp
index 06463d0af2..f88b40e38b 100644
--- a/yt/cpp/mapreduce/client/file_reader.cpp
+++ b/yt/cpp/mapreduce/client/file_reader.cpp
@@ -31,7 +31,7 @@ using ::ToString;
static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) {
if (options.Length_) {
- return options.Offset_.GetOrElse(0) + *options.Length_;
+ return options.Offset_ + *options.Length_;
} else {
return Nothing();
}
@@ -46,7 +46,6 @@ TStreamReaderBase::TStreamReaderBase(
const TClientContext& context,
const TTransactionId& transactionId)
: RawClient_(rawClient)
- , Context_(context)
, ClientRetryPolicy_(std::move(clientRetryPolicy))
, ReadTransaction_(MakeHolder<TPingableTransaction>(
RawClient_,
@@ -64,59 +63,26 @@ TYPath TStreamReaderBase::Snapshot(const TYPath& path)
return NYT::Snapshot(RawClient_, ClientRetryPolicy_, ReadTransaction_->GetId(), path);
}
-TString TStreamReaderBase::GetActiveRequestId() const
-{
- if (Response_) {
- return Response_->GetRequestId();;
- } else {
- return "<no-active-request>";
- }
-}
-
size_t TStreamReaderBase::DoRead(void* buf, size_t len)
{
- const int retryCount = Context_.Config->ReadRetryCount;
- for (int attempt = 1; attempt <= retryCount; ++attempt) {
- try {
- if (!Input_) {
- Response_ = Request(Context_, ReadTransaction_->GetId(), CurrentOffset_);
- Input_ = Response_->GetResponseStream();
- }
- if (len == 0) {
- return 0;
- }
- const size_t read = Input_->Read(buf, len);
- CurrentOffset_ += read;
- return read;
- } catch (TErrorResponse& e) {
- YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)",
- GetActiveRequestId(),
- e.what(),
- attempt,
- retryCount);
-
- if (!IsRetriable(e) || attempt == retryCount) {
- throw;
- }
- TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
- } catch (std::exception& e) {
- YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)",
- GetActiveRequestId(),
- e.what(),
- attempt,
- retryCount);
-
- // Invalidate connection.
- Response_.reset();
-
- if (attempt == retryCount) {
+ if (len == 0) {
+ return 0;
+ }
+ return RequestWithRetry<size_t>(
+ ClientRetryPolicy_->CreatePolicyForReaderRequest(),
+ [this, &buf, len] (TMutationId /*mutationId*/) {
+ try {
+ if (!Input_) {
+ Input_ = Request(ReadTransaction_->GetId(), CurrentOffset_);
+ }
+ const size_t read = Input_->Read(buf, len);
+ CurrentOffset_ += read;
+ return read;
+ } catch (...) {
+ Input_ = nullptr;
throw;
}
- TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
- }
- Input_ = nullptr;
- }
- Y_UNREACHABLE(); // we should either return or throw from loop above
+ });
}
////////////////////////////////////////////////////////////////////////////////
@@ -130,57 +96,25 @@ TFileReader::TFileReader(
const TTransactionId& transactionId,
const TFileReaderOptions& options)
: TStreamReaderBase(rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId)
- , FileReaderOptions_(options)
+ , StartOffset_(options.Offset_)
+ , EndOffset_(GetEndOffset(options))
+ , Options_(options)
, Path_(path)
- , StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0))
- , EndOffset_(GetEndOffset(FileReaderOptions_))
{
Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_);
}
-NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes)
+std::unique_ptr<IInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes)
{
const ui64 currentOffset = StartOffset_ + readBytes;
- TString hostName = GetProxyForHeavyRequest(context);
-
- THttpHeader header("GET", GetReadFileCommand(context.Config->ApiVersion));
- if (context.ServiceTicketAuth) {
- header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
- } else {
- header.SetToken(context.Token);
- }
-
- if (context.ImpersonationUser) {
- header.SetImpersonationUser(*context.ImpersonationUser);
- }
-
- UpdateHeaderForProxyIfNeed(hostName, context, header);
-
- header.AddTransactionId(transactionId);
- header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
if (EndOffset_) {
Y_ABORT_UNLESS(*EndOffset_ >= currentOffset);
- FileReaderOptions_.Length(*EndOffset_ - currentOffset);
- }
- FileReaderOptions_.Offset(currentOffset);
- header.MergeParameters(FormIORequestParameters(Path_, FileReaderOptions_));
-
- header.SetResponseCompression(ToString(context.Config->AcceptEncoding));
-
- auto requestId = CreateGuidAsString();
- NHttpClient::IHttpResponsePtr response;
- try {
- response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
- } catch (const std::exception& ex) {
- LogRequestError(requestId, header, ex.what(), "");
- throw;
+ Options_.Length(*EndOffset_ - currentOffset);
}
- YT_LOG_DEBUG("RSP %v - file stream",
- requestId);
-
- return response;
+ Options_.Offset(currentOffset);
+ return RawClient_->ReadFile(transactionId, Path_, Options_);
}
////////////////////////////////////////////////////////////////////////////////
@@ -195,66 +129,22 @@ TBlobTableReader::TBlobTableReader(
const TTransactionId& transactionId,
const TBlobTableReaderOptions& options)
: TStreamReaderBase(rawClient, std::move(retryPolicy), std::move(transactionPinger), context, transactionId)
+ , StartOffset_(options.Offset_)
, Key_(key)
, Options_(options)
{
Path_ = TStreamReaderBase::Snapshot(path);
}
-NHttpClient::IHttpResponsePtr TBlobTableReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes)
+std::unique_ptr<IInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes)
{
- TString hostName = GetProxyForHeavyRequest(context);
-
- THttpHeader header("GET", "read_blob_table");
- if (context.ServiceTicketAuth) {
- header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
- } else {
- header.SetToken(context.Token);
- }
-
- if (context.ImpersonationUser) {
- header.SetImpersonationUser(*context.ImpersonationUser);
- }
-
- UpdateHeaderForProxyIfNeed(hostName, context, header);
-
- header.AddTransactionId(transactionId);
- header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
-
- const ui64 currentOffset = Options_.Offset_ + readBytes;
+ const i64 currentOffset = StartOffset_ + readBytes;
const i64 startPartIndex = currentOffset / Options_.PartSize_;
- const ui64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex;
- auto lowerLimitKey = Key_;
- lowerLimitKey.Parts_.push_back(startPartIndex);
- auto upperLimitKey = Key_;
- upperLimitKey.Parts_.push_back(std::numeric_limits<i64>::max());
- TNode params = PathToParamNode(TRichYPath(Path_).AddRange(TReadRange()
- .LowerLimit(TReadLimit().Key(lowerLimitKey))
- .UpperLimit(TReadLimit().Key(upperLimitKey))));
- params["start_part_index"] = TNode(startPartIndex);
- params["offset"] = skipBytes;
- if (Options_.PartIndexColumnName_) {
- params["part_index_column_name"] = *Options_.PartIndexColumnName_;
- }
- if (Options_.DataColumnName_) {
- params["data_column_name"] = *Options_.DataColumnName_;
- }
- params["part_size"] = Options_.PartSize_;
- header.MergeParameters(params);
- header.SetResponseCompression(ToString(context.Config->AcceptEncoding));
-
- auto requestId = CreateGuidAsString();
- NHttpClient::IHttpResponsePtr response;
- try {
- response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
- } catch (const std::exception& ex) {
- LogRequestError(requestId, header, ex.what(), "");
- throw;
- }
+ const i64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex;
- YT_LOG_DEBUG("RSP %v - blob table stream",
- requestId);
- return response;
+ Options_.Offset(skipBytes);
+ Options_.StartPartIndex(startPartIndex);
+ return RawClient_->ReadBlobTable(transactionId, Path_, Key_, Options_);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h
index 48248696d3..8aafdc860d 100644
--- a/yt/cpp/mapreduce/client/file_reader.h
+++ b/yt/cpp/mapreduce/client/file_reader.h
@@ -11,7 +11,6 @@ class IInputStream;
namespace NYT {
-class THttpRequest;
class TPingableTransaction;
namespace NDetail {
@@ -35,19 +34,16 @@ protected:
protected:
const IRawClientPtr RawClient_;
- const TClientContext Context_;
private:
size_t DoRead(void* buf, size_t len) override;
- virtual NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) = 0;
- TString GetActiveRequestId() const;
+ virtual std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) = 0;
private:
const IClientRetryPolicyPtr ClientRetryPolicy_;
TFileReaderOptions FileReaderOptions_;
- NHttpClient::IHttpResponsePtr Response_;
- IInputStream* Input_ = nullptr;
+ std::unique_ptr<IInputStream> Input_;
THolder<TPingableTransaction> ReadTransaction_;
@@ -67,17 +63,17 @@ public:
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
const TTransactionId& transactionId,
- const TFileReaderOptions& options = TFileReaderOptions());
+ const TFileReaderOptions& options = {});
private:
- NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) override;
+ std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override;
private:
- TFileReaderOptions FileReaderOptions_;
-
- TRichYPath Path_;
const ui64 StartOffset_;
const TMaybe<ui64> EndOffset_;
+
+ TFileReaderOptions Options_;
+ TRichYPath Path_;
};
////////////////////////////////////////////////////////////////////////////////
@@ -94,14 +90,16 @@ public:
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
const TTransactionId& transactionId,
- const TBlobTableReaderOptions& options);
+ const TBlobTableReaderOptions& options = {});
private:
- NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) override;
+ std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override;
private:
+ const ui64 StartOffset_;
const TKey Key_;
- const TBlobTableReaderOptions Options_;
+
+ TBlobTableReaderOptions Options_;
TYPath Path_;
};
diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp
index 772a2ab0cd..8146eb8b46 100644
--- a/yt/cpp/mapreduce/common/retry_lib.cpp
+++ b/yt/cpp/mapreduce/common/retry_lib.cpp
@@ -118,6 +118,11 @@ public:
return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->StartOperationRetryCount), Config_));
}
+ IRequestRetryPolicyPtr CreatePolicyForReaderRequest() override
+ {
+ return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->ReadRetryCount), Config_));
+ }
+
IRequestRetryPolicyPtr Wrap(IRequestRetryPolicyPtr basePolicy)
{
auto config = RetryConfigProvider_->CreateRetryConfig();
diff --git a/yt/cpp/mapreduce/common/retry_lib.h b/yt/cpp/mapreduce/common/retry_lib.h
index c6c061f614..5b406b075f 100644
--- a/yt/cpp/mapreduce/common/retry_lib.h
+++ b/yt/cpp/mapreduce/common/retry_lib.h
@@ -48,6 +48,7 @@ class IClientRetryPolicy
public:
virtual IRequestRetryPolicyPtr CreatePolicyForGenericRequest() = 0;
virtual IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() = 0;
+ virtual IRequestRetryPolicyPtr CreatePolicyForReaderRequest() = 0;
};
diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h
index 6087eca098..a01b619fab 100644
--- a/yt/cpp/mapreduce/http/http_client.h
+++ b/yt/cpp/mapreduce/http/http_client.h
@@ -42,7 +42,6 @@ public:
virtual IHttpResponsePtr Finish() = 0;
};
-
class IHttpClient
{
public:
@@ -65,6 +64,34 @@ public:
////////////////////////////////////////////////////////////////////////////////
+class THttpResponseStream
+ : public IInputStream
+{
+public:
+ THttpResponseStream(IHttpResponsePtr response)
+ : Response_(std::move(response))
+ {
+ Underlying_ = Response_->GetResponseStream();
+ }
+
+private:
+ size_t DoRead(void *buf, size_t len) override
+ {
+ return Underlying_->Read(buf, len);
+ }
+
+ size_t DoSkip(size_t len) override
+ {
+ return Underlying_->Skip(len);
+ }
+
+private:
+ IHttpResponsePtr Response_;
+ IInputStream* Underlying_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
IHttpClientPtr CreateDefaultHttpClient();
IHttpClientPtr CreateCoreHttpClient(bool useTLS, const TConfigPtr& config);
diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp
index 1d9267009f..a47b2952b1 100644
--- a/yt/cpp/mapreduce/http/retry_request.cpp
+++ b/yt/cpp/mapreduce/http/retry_request.cpp
@@ -20,7 +20,7 @@ namespace NDetail {
////////////////////////////////////////////////////////////////////////////////
-static TResponseInfo Request(
+static NHttpClient::IHttpResponsePtr Request(
const TClientContext& context,
THttpHeader& header,
TMaybe<TStringBuf> body,
@@ -38,16 +38,10 @@ static TResponseInfo Request(
auto url = GetFullUrlForProxy(hostName, context, header);
- auto response = context.HttpClient->Request(url, requestId, config.HttpConfig, header, body);
-
- TResponseInfo result;
- result.RequestId = requestId;
- result.Response = response->GetResponse();
- result.HttpCode = response->GetStatusCode();
- return result;
+ return context.HttpClient->Request(url, requestId, config.HttpConfig, header, body);
}
-TResponseInfo RequestWithoutRetry(
+NHttpClient::IHttpResponsePtr RequestWithoutRetry(
const TClientContext& context,
TMutationId& mutationId,
THttpHeader& header,
@@ -118,7 +112,12 @@ TResponseInfo RetryRequestWithPolicy(
}
}
- return Request(context, header, body, requestId, config);
+ auto response = Request(context, header, body, requestId, config);
+ return TResponseInfo{
+ .RequestId = response->GetRequestId(),
+ .Response = response->GetResponse(),
+ .HttpCode = response->GetStatusCode(),
+ };
} catch (const TErrorResponse& e) {
LogRequestError(requestId, header, e.what(), retryPolicy->GetAttemptDescription());
retryWithSameMutationId = e.IsTransportError();
diff --git a/yt/cpp/mapreduce/http/retry_request.h b/yt/cpp/mapreduce/http/retry_request.h
index 9750d0b541..444ecbbafc 100644
--- a/yt/cpp/mapreduce/http/retry_request.h
+++ b/yt/cpp/mapreduce/http/retry_request.h
@@ -105,7 +105,7 @@ TResponseInfo RetryRequestWithPolicy(
TMaybe<TStringBuf> body = {},
const TRequestConfig& config = TRequestConfig());
-TResponseInfo RequestWithoutRetry(
+NHttpClient::IHttpResponsePtr RequestWithoutRetry(
const TClientContext& context,
TMutationId& mutationId,
THttpHeader& header,
diff --git a/yt/cpp/mapreduce/interface/client_method_options.h b/yt/cpp/mapreduce/interface/client_method_options.h
index 9bfb79753d..d457bf5f43 100644
--- a/yt/cpp/mapreduce/interface/client_method_options.h
+++ b/yt/cpp/mapreduce/interface/client_method_options.h
@@ -287,9 +287,12 @@ struct TBlobTableReaderOptions
///
/// All blob parts except the last part of the blob must be of this size
/// otherwise blob table reader emits error.
- FLUENT_FIELD_DEFAULT(ui64, PartSize, 4 * 1024 * 1024);
+ FLUENT_FIELD_DEFAULT(i64, PartSize, 4 * 1024 * 1024);
- /// @brief Offset from which to start reading
+ /// @brief Part index from which to start reading.
+ FLUENT_FIELD_DEFAULT(i64, StartPartIndex, 0);
+
+ /// @brief Offset from which to start reading.
FLUENT_FIELD_DEFAULT(i64, Offset, 0);
};
@@ -468,7 +471,7 @@ struct TFileReaderOptions
/// @brief Offset to start reading from.
///
/// By default reading is started from the beginning of the file.
- FLUENT_FIELD_OPTION(i64, Offset);
+ FLUENT_FIELD_DEFAULT(i64, Offset, 0);
///
/// @brief Maximum length to read.
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index 32055e3d00..4994826863 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -8,6 +8,13 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+namespace NHttpClient {
+ class IHttpResponse;
+ using IHttpResponsePtr = std::unique_ptr<IHttpResponse>;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
class IRawClient
: public virtual TThrRefBase
{
@@ -196,6 +203,18 @@ public:
const TOperationId& operationId,
const TGetJobTraceOptions& options = {}) = 0;
+ // SkyShare
+
+ virtual NHttpClient::IHttpResponsePtr SkyShareTable(
+ const std::vector<TYPath>& tablePaths,
+ const TSkyShareTableOptions& options = {}) = 0;
+
+ // Files
+ virtual std::unique_ptr<IInputStream> ReadFile(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TFileReaderOptions& options = {}) = 0;
+
// File cache
virtual TMaybe<TYPath> GetFileFromCache(
@@ -266,6 +285,18 @@ public:
const TYPath& path,
const TAlterTableOptions& options = {}) = 0;
+ virtual std::unique_ptr<IInputStream> ReadTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TMaybe<TFormat>& format,
+ const TTableReaderOptions& options = {}) = 0;
+
+ virtual std::unique_ptr<IInputStream> ReadBlobTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TKey& key,
+ const TBlobTableReaderOptions& options = {}) = 0;
+
virtual void AlterTableReplica(
TMutationId& mutationId,
const TReplicaId& replicaId,
diff --git a/yt/cpp/mapreduce/io/helpers.h b/yt/cpp/mapreduce/io/helpers.h
index 0733ff417c..0d3ec40ab6 100644
--- a/yt/cpp/mapreduce/io/helpers.h
+++ b/yt/cpp/mapreduce/io/helpers.h
@@ -63,9 +63,7 @@ inline TNode FormIORequestParameters(
if (options.Config_) {
params[TIOOptionsTraits<TTableReaderOptions>::ConfigName] = *options.Config_;
}
- if (options.Offset_) {
- params["offset"] = *options.Offset_;
- }
+ params["offset"] = options.Offset_;
if (options.Length_) {
params["length"] = *options.Length_;
}
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 71d8d5fba9..65bfa01cea 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -14,6 +14,8 @@
#include <yt/cpp/mapreduce/interface/operation.h>
#include <yt/cpp/mapreduce/interface/tvm.h>
+#include <yt/cpp/mapreduce/io/helpers.h>
+
#include <library/cpp/yson/node/node_io.h>
namespace NYT::NDetail {
@@ -32,7 +34,7 @@ TNode THttpRawClient::Get(
TMutationId mutationId;
THttpHeader header("GET", "get");
header.MergeParameters(NRawClient::SerializeParamsForGet(transactionId, Context_.Config->Prefix, path, options));
- return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header).Response);
+ return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}
TNode THttpRawClient::TryGet(
@@ -61,7 +63,7 @@ void THttpRawClient::Set(
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForSet(transactionId, Context_.Config->Prefix, path, options));
auto body = NodeToYsonString(value);
- RequestWithoutRetry(Context_, mutationId, header, body);
+ RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
}
bool THttpRawClient::Exists(
@@ -72,7 +74,7 @@ bool THttpRawClient::Exists(
TMutationId mutationId;
THttpHeader header("GET", "exists");
header.MergeParameters(NRawClient::SerializeParamsForExists(transactionId, Context_.Config->Prefix, path, options));
- return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
+ return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}
void THttpRawClient::MultisetAttributes(
@@ -86,7 +88,7 @@ void THttpRawClient::MultisetAttributes(
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForMultisetAttributes(transactionId, Context_.Config->Prefix, path, options));
auto body = NodeToYsonString(value);
- RequestWithoutRetry(Context_, mutationId, header, body);
+ RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
}
TNodeId THttpRawClient::Create(
@@ -99,7 +101,7 @@ TNodeId THttpRawClient::Create(
THttpHeader header("POST", "create");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForCreate(transactionId, Context_.Config->Prefix, path, type, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}
TNodeId THttpRawClient::CopyWithoutRetries(
@@ -112,7 +114,7 @@ TNodeId THttpRawClient::CopyWithoutRetries(
THttpHeader header("POST", "copy");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}
TNodeId THttpRawClient::CopyInsideMasterCell(
@@ -129,7 +131,7 @@ TNodeId THttpRawClient::CopyInsideMasterCell(
// Make cross cell copying disable.
params["enable_cross_cell_copying"] = false;
header.MergeParameters(params);
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}
TNodeId THttpRawClient::MoveWithoutRetries(
@@ -142,7 +144,7 @@ TNodeId THttpRawClient::MoveWithoutRetries(
THttpHeader header("POST", "move");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}
TNodeId THttpRawClient::MoveInsideMasterCell(
@@ -159,7 +161,7 @@ TNodeId THttpRawClient::MoveInsideMasterCell(
// Make cross cell copying disable.
params["enable_cross_cell_copying"] = false;
header.MergeParameters(params);
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}
void THttpRawClient::Remove(
@@ -171,7 +173,7 @@ void THttpRawClient::Remove(
THttpHeader header("POST", "remove");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForRemove(transactionId, Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
TNode::TListType THttpRawClient::List(
@@ -190,7 +192,7 @@ TNode::TListType THttpRawClient::List(
}
header.MergeParameters(NRawClient::SerializeParamsForList(transactionId, Context_.Config->Prefix, updatedPath, options));
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NodeFromYsonString(responseInfo.Response).AsList();
+ return NodeFromYsonString(responseInfo->GetResponse()).AsList();
}
TNodeId THttpRawClient::Link(
@@ -203,7 +205,7 @@ TNodeId THttpRawClient::Link(
THttpHeader header("POST", "link");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForLink(transactionId, Context_.Config->Prefix, targetPath, linkPath, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}
TLockId THttpRawClient::Lock(
@@ -216,7 +218,7 @@ TLockId THttpRawClient::Lock(
THttpHeader header("POST", "lock");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForLock(transactionId, Context_.Config->Prefix, path, mode, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}
void THttpRawClient::Unlock(
@@ -228,7 +230,7 @@ void THttpRawClient::Unlock(
THttpHeader header("POST", "unlock");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForUnlock(transactionId, Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::Concatenate(
@@ -241,7 +243,7 @@ void THttpRawClient::Concatenate(
THttpHeader header("POST", "concatenate");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForConcatenate(transactionId, Context_.Config->Prefix, sourcePaths, destinationPath, options));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
TTransactionId THttpRawClient::StartTransaction(
@@ -252,7 +254,7 @@ TTransactionId THttpRawClient::StartTransaction(
THttpHeader header("POST", "start_tx");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, Context_.Config->TxTimeout, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}
void THttpRawClient::PingTransaction(const TTransactionId& transactionId)
@@ -264,7 +266,7 @@ void THttpRawClient::PingTransaction(const TTransactionId& transactionId)
requestConfig.HttpConfig = NHttpClient::THttpConfig{
.SocketTimeout = Context_.Config->PingTimeout
};
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::AbortTransaction(
@@ -274,7 +276,7 @@ void THttpRawClient::AbortTransaction(
THttpHeader header("POST", "abort_tx");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::CommitTransaction(
@@ -284,7 +286,7 @@ void THttpRawClient::CommitTransaction(
THttpHeader header("POST", "commit_tx");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
TOperationAttributes THttpRawClient::GetOperation(
@@ -295,7 +297,7 @@ TOperationAttributes THttpRawClient::GetOperation(
THttpHeader header("GET", "get_operation");
header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options));
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo.Response));
+ return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
}
TOperationAttributes THttpRawClient::GetOperation(
@@ -306,7 +308,7 @@ TOperationAttributes THttpRawClient::GetOperation(
THttpHeader header("GET", "get_operation");
header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options));
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo.Response));
+ return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
}
void THttpRawClient::AbortOperation(
@@ -316,7 +318,7 @@ void THttpRawClient::AbortOperation(
THttpHeader header("POST", "abort_op");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::CompleteOperation(
@@ -326,7 +328,7 @@ void THttpRawClient::CompleteOperation(
THttpHeader header("POST", "complete_op");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::SuspendOperation(
@@ -337,7 +339,7 @@ void THttpRawClient::SuspendOperation(
THttpHeader header("POST", "suspend_op");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::ResumeOperation(
@@ -348,7 +350,7 @@ void THttpRawClient::ResumeOperation(
THttpHeader header("POST", "resume_op");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
template <typename TKey>
@@ -367,7 +369,7 @@ TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOption
THttpHeader header("GET", "list_operations");
header.MergeParameters(NRawClient::SerializeParamsForListOperations(options));
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response);
+ auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
const auto& operationNodesList = resultNode["operations"].AsList();
@@ -417,7 +419,7 @@ NYson::TYsonString THttpRawClient::GetJob(
THttpHeader header("GET", "get_job");
header.MergeParameters(NRawClient::SerializeParamsForGetJob(operationId, jobId, options));
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NYson::TYsonString(responseInfo.Response);
+ return NYson::TYsonString(responseInfo->GetResponse());
}
TListJobsResult THttpRawClient::ListJobs(
@@ -428,7 +430,7 @@ TListJobsResult THttpRawClient::ListJobs(
THttpHeader header("GET", "list_jobs");
header.MergeParameters(NRawClient::SerializeParamsForListJobs(operationId, options));
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response);
+ auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
const auto& jobNodesList = resultNode["jobs"].AsList();
@@ -524,7 +526,7 @@ TString THttpRawClient::GetJobStderrWithRetries(
TRequestConfig config;
config.IsHeavy = true;
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, {}, config);
- return responseInfo.Response;
+ return responseInfo->GetResponse();
}
IFileReaderPtr THttpRawClient::GetJobStderr(
@@ -573,7 +575,7 @@ std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace(
THttpHeader header("GET", "get_job_trace");
header.MergeParameters(NRawClient::SerializeParamsForGetJobTrace(operationId, options));
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response);
+ auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
const auto& traceEventNodesList = resultNode.AsList();
@@ -586,6 +588,50 @@ std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace(
return result;
}
+NHttpClient::IHttpResponsePtr THttpRawClient::SkyShareTable(
+ const std::vector<TYPath>& tablePaths,
+ const TSkyShareTableOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("POST", "api/v1/share", /*IsApi*/ false);
+
+ auto proxyName = Context_.ServerName.substr(0, Context_.ServerName.find('.'));
+
+ auto host = Context_.Config->SkynetApiHost;
+ if (host == "") {
+ host = "skynet." + proxyName + ".yt.yandex.net";
+ }
+
+ TSkyShareTableOptions patchedOptions = options;
+
+ if (Context_.Config->Pool && !patchedOptions.Pool_) {
+ patchedOptions.Pool(Context_.Config->Pool);
+ }
+
+ header.MergeParameters(NRawClient::SerializeParamsForSkyShareTable(proxyName, Context_.Config->Prefix, tablePaths, patchedOptions));
+ TClientContext skyApiHost({.ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient()});
+
+ return RequestWithoutRetry(skyApiHost, mutationId, header, "");
+}
+
+std::unique_ptr<IInputStream> THttpRawClient::ReadFile(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TFileReaderOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", GetReadFileCommand(Context_.Config->ApiVersion));
+ header.AddTransactionId(transactionId);
+ header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
+ header.MergeParameters(FormIORequestParameters(path, options));
+ header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
+
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
+}
+
TMaybe<TYPath> THttpRawClient::GetFileFromCache(
const TTransactionId& transactionId,
const TString& md5Signature,
@@ -596,7 +642,7 @@ TMaybe<TYPath> THttpRawClient::GetFileFromCache(
THttpHeader header("GET", "get_file_from_cache");
header.MergeParameters(NRawClient::SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options));
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response).AsString();
+ auto resultNode = NodeFromYsonString(responseInfo->GetResponse()).AsString();
return resultNode.empty() ? Nothing() : TMaybe<TYPath>(resultNode);
}
@@ -611,7 +657,7 @@ TYPath THttpRawClient::PutFileToCache(
THttpHeader header("POST", "put_file_to_cache");
header.MergeParameters(NRawClient::SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options));
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NodeFromYsonString(responseInfo.Response).AsString();
+ return NodeFromYsonString(responseInfo->GetResponse()).AsString();
}
void THttpRawClient::MountTable(
@@ -626,7 +672,7 @@ void THttpRawClient::MountTable(
header.AddParameter("cell_id", GetGuidAsString(*options.CellId_));
}
header.AddParameter("freeze", options.Freeze_);
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::UnmountTable(
@@ -638,7 +684,7 @@ void THttpRawClient::UnmountTable(
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
header.AddParameter("force", options.Force_);
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::RemountTable(
@@ -649,7 +695,7 @@ void THttpRawClient::RemountTable(
THttpHeader header("POST", "remount_table");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::ReshardTableByPivotKeys(
@@ -662,7 +708,7 @@ void THttpRawClient::ReshardTableByPivotKeys(
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::ReshardTableByTabletCount(
@@ -675,7 +721,7 @@ void THttpRawClient::ReshardTableByTabletCount(
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
header.AddParameter("tablet_count", tabletCount);
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::InsertRows(
@@ -690,7 +736,7 @@ void THttpRawClient::InsertRows(
auto body = NodeListToYsonString(rows);
TRequestConfig config;
config.IsHeavy = true;
- RequestWithoutRetry(Context_, mutationId, header, body, config);
+ RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse();
}
void THttpRawClient::TrimRows(
@@ -706,7 +752,7 @@ void THttpRawClient::TrimRows(
header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options));
TRequestConfig config;
config.IsHeavy = true;
- RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse();
}
TNode::TListType THttpRawClient::LookupRows(
@@ -737,7 +783,7 @@ TNode::TListType THttpRawClient::LookupRows(
TRequestConfig config;
config.IsHeavy = true;
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, body, config);
- return NodeFromYsonString(responseInfo.Response, ::NYson::EYsonType::ListFragment).AsList();
+ return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
}
TNode::TListType THttpRawClient::SelectRows(
@@ -769,7 +815,44 @@ TNode::TListType THttpRawClient::SelectRows(
TRequestConfig config;
config.IsHeavy = true;
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- return NodeFromYsonString(responseInfo.Response, ::NYson::EYsonType::ListFragment).AsList();
+ return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
+}
+
+std::unique_ptr<IInputStream> THttpRawClient::ReadTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TMaybe<TFormat>& format,
+ const TTableReaderOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
+ header.SetOutputFormat(format);
+ header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
+ header.MergeParameters(NRawClient::SerializeParamsForReadTable(transactionId, Context_.Config->Prefix, path, options));
+ header.MergeParameters(FormIORequestParameters(path, options));
+
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
+}
+
+std::unique_ptr<IInputStream> THttpRawClient::ReadBlobTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TKey& key,
+ const TBlobTableReaderOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "read_blob_table");
+ header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
+ header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
+ header.MergeParameters(NRawClient::SerializeParamsForReadBlobTable(transactionId, path, key, options));
+
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
}
void THttpRawClient::AlterTable(
@@ -781,7 +864,7 @@ void THttpRawClient::AlterTable(
THttpHeader header("POST", "alter_table");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForAlterTable(transactionId, Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::AlterTableReplica(
@@ -792,7 +875,7 @@ void THttpRawClient::AlterTableReplica(
THttpHeader header("POST", "alter_table_replica");
header.AddMutationId();
header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::DeleteRows(
@@ -808,7 +891,7 @@ void THttpRawClient::DeleteRows(
auto body = NodeListToYsonString(keys);
TRequestConfig config;
config.IsHeavy = true;
- RequestWithoutRetry(Context_, mutationId, header, body, config);
+ RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse();
}
void THttpRawClient::FreezeTable(
@@ -818,7 +901,7 @@ void THttpRawClient::FreezeTable(
TMutationId mutationId;
THttpHeader header("POST", "freeze_table");
header.MergeParameters(NRawClient::SerializeParamsForFreezeTable(Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
void THttpRawClient::UnfreezeTable(
@@ -828,7 +911,7 @@ void THttpRawClient::UnfreezeTable(
TMutationId mutationId;
THttpHeader header("POST", "unfreeze_table");
header.MergeParameters(NRawClient::SerializeParamsForUnfreezeTable(Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header);
+ RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
TCheckPermissionResponse THttpRawClient::CheckPermission(
@@ -841,7 +924,7 @@ TCheckPermissionResponse THttpRawClient::CheckPermission(
THttpHeader header("GET", "check_permission");
header.MergeParameters(NRawClient::SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options));
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NRawClient::ParseCheckPermissionResponse(NodeFromYsonString(responseInfo.Response));
+ return NRawClient::ParseCheckPermissionResponse(NodeFromYsonString(responseInfo->GetResponse()));
}
TVector<TTabletInfo> THttpRawClient::GetTabletInfos(
@@ -854,7 +937,7 @@ TVector<TTabletInfo> THttpRawClient::GetTabletInfos(
header.MergeParameters(NRawClient::SerializeParamsForGetTabletInfos(Context_.Config->Prefix, path, tabletIndexes, options));
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
TVector<TTabletInfo> result;
- Deserialize(result, *NodeFromYsonString(responseInfo.Response).AsMap().FindPtr("tablets"));
+ Deserialize(result, *NodeFromYsonString(responseInfo->GetResponse()).AsMap().FindPtr("tablets"));
return result;
}
@@ -870,7 +953,7 @@ TVector<TTableColumnarStatistics> THttpRawClient::GetTableColumnarStatistics(
config.IsHeavy = true;
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
TVector<TTableColumnarStatistics> result;
- Deserialize(result, NodeFromYsonString(responseInfo.Response));
+ Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
return result;
}
@@ -886,7 +969,7 @@ TMultiTablePartitions THttpRawClient::GetTablePartitions(
config.IsHeavy = true;
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
TMultiTablePartitions result;
- Deserialize(result, NodeFromYsonString(responseInfo.Response));
+ Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
return result;
}
@@ -897,7 +980,7 @@ ui64 THttpRawClient::GenerateTimestamp()
TRequestConfig config;
config.IsHeavy = true;
auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- return NodeFromYsonString(responseInfo.Response).AsUint64();
+ return NodeFromYsonString(responseInfo->GetResponse()).AsUint64();
}
TAuthorizationInfo THttpRawClient::WhoAmI()
@@ -908,7 +991,7 @@ TAuthorizationInfo THttpRawClient::WhoAmI()
TAuthorizationInfo result;
NJson::TJsonValue jsonValue;
- bool ok = NJson::ReadJsonTree(requestResult.Response, &jsonValue, /*throwOnError*/ true);
+ bool ok = NJson::ReadJsonTree(requestResult->GetResponse(), &jsonValue, /*throwOnError*/ true);
Y_ABORT_UNLESS(ok);
result.Login = jsonValue["login"].GetString();
result.Realm = jsonValue["realm"].GetString();
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index 08015f024f..e540d1b331 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -202,6 +202,18 @@ public:
const TOperationId& operationId,
const TGetJobTraceOptions& options = {}) override;
+ // SkyShare
+
+ NHttpClient::IHttpResponsePtr SkyShareTable(
+ const std::vector<TYPath>& tablePaths,
+ const TSkyShareTableOptions& options = {}) override;
+
+ // Files
+ std::unique_ptr<IInputStream> ReadFile(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TFileReaderOptions& options = {}) override;
+
// File cache
TMaybe<TYPath> GetFileFromCache(
@@ -266,6 +278,18 @@ public:
const TString& query,
const TSelectRowsOptions& options = {}) override;
+ std::unique_ptr<IInputStream> ReadTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TMaybe<TFormat>& format,
+ const TTableReaderOptions& options = {}) override;
+
+ std::unique_ptr<IInputStream> ReadBlobTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TKey& key,
+ const TBlobTableReaderOptions& options = {}) override;
+
void AlterTable(
TMutationId& mutationId,
const TTransactionId& transactionId,
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index a3f10e6c41..a3f01da6fc 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -301,50 +301,6 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node)
return result;
}
-TNode::TListType SkyShareTable(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const std::vector<TYPath>& tablePaths,
- const TSkyShareTableOptions& options)
-{
- THttpHeader header("POST", "api/v1/share", /*IsApi*/ false);
-
- auto proxyName = context.ServerName.substr(0, context.ServerName.find('.'));
-
- auto host = context.Config->SkynetApiHost;
- if (host == "") {
- host = "skynet." + proxyName + ".yt.yandex.net";
- }
-
- TSkyShareTableOptions patchedOptions = options;
-
- if (context.Config->Pool && !patchedOptions.Pool_) {
- patchedOptions.Pool(context.Config->Pool);
- }
-
- header.MergeParameters(NRawClient::SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, patchedOptions));
- TClientContext skyApiHost({ .ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient() });
- TResponseInfo response = {};
-
- // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu
- // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK).
- while (response.HttpCode != 200) {
- response = RetryRequestWithPolicy(retryPolicy, skyApiHost, header, "");
- TWaitProxy::Get()->Sleep(TDuration::Seconds(5));
- }
-
- if (options.KeyColumns_) {
- return NodeFromJsonString(response.Response)["torrents"].AsList();
- } else {
- TNode torrent;
-
- torrent["key"] = TNode::CreateList();
- torrent["rbtorrent"] = response.Response;
-
- return TNode::TListType{ torrent };
- }
-}
-
TRichYPath CanonizeYPath(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index c60536c86d..bcc9a4bfd7 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -29,7 +29,6 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node);
////////////////////////////////////////////////////////////////////////////////
-//
// marks `batchRequest' as executed
void ExecuteBatch(
IRequestRetryPolicyPtr retryPolicy,
@@ -37,14 +36,6 @@ void ExecuteBatch(
TRawBatchRequest& batchRequest,
const TExecuteBatchOptions& options = {});
-// SkyShare
-
-TNode::TListType SkyShareTable(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const std::vector<TYPath>& tablePaths,
- const TSkyShareTableOptions& options = {});
-
// Misc
TRichYPath CanonizeYPath(
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
index 8474bd0edc..2869ddcc0f 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
@@ -4,6 +4,7 @@
#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/client_method_options.h>
+#include <yt/cpp/mapreduce/interface/fluent.h>
#include <yt/cpp/mapreduce/interface/operation.h>
#include <yt/cpp/mapreduce/interface/serialize.h>
@@ -639,13 +640,60 @@ TNode SerializeParametersForDeleteRows(
TNode SerializeParametersForTrimRows(
const TString& pathPrefix,
const TYPath& path,
- const TTrimRowsOptions& /* options*/)
+ const TTrimRowsOptions& /*options*/)
{
TNode result;
SetPathParam(&result, pathPrefix, path);
return result;
}
+TNode SerializeParamsForReadTable(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TRichYPath& path,
+ const TTableReaderOptions& options)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ result["control_attributes"] = BuildYsonNodeFluently()
+ .BeginMap()
+ .Item("enable_row_index").Value(options.ControlAttributes_.EnableRowIndex_)
+ .Item("enable_range_index").Value(options.ControlAttributes_.EnableRangeIndex_)
+ .EndMap();
+ return result;
+}
+
+TNode SerializeParamsForReadBlobTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TKey& key,
+ const TBlobTableReaderOptions& options)
+{
+ auto lowerLimitKey = key;
+ lowerLimitKey.Parts_.push_back(options.StartPartIndex_);
+ auto upperLimitKey = key;
+ upperLimitKey.Parts_.push_back(std::numeric_limits<i64>::max());
+
+ TNode result = PathToParamNode(
+ TRichYPath(path).
+ AddRange(TReadRange()
+ .LowerLimit(TReadLimit().Key(lowerLimitKey))
+ .UpperLimit(TReadLimit().Key(upperLimitKey))));
+
+ SetTransactionIdParam(&result, transactionId);
+
+ result["start_part_index"] = options.StartPartIndex_;
+ result["offset"] = options.Offset_;
+ if (options.PartIndexColumnName_) {
+ result["part_index_column_name"] = *options.PartIndexColumnName_;
+ }
+ if (options.DataColumnName_) {
+ result["data_column_name"] = *options.DataColumnName_;
+ }
+ result["part_size"] = options.PartSize_;
+ return result;
+}
+
TNode SerializeParamsForParseYPath(const TRichYPath& path)
{
TNode result;
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
index 655198248c..acbf003b5c 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
@@ -146,6 +146,18 @@ TNode SerializeParametersForTrimRows(
const TYPath& path,
const TTrimRowsOptions& options);
+TNode SerializeParamsForReadTable(
+ const TTransactionId& transactionId,
+ const TString& pathPrefix,
+ const TRichYPath& path,
+ const TTableReaderOptions& options);
+
+TNode SerializeParamsForReadBlobTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TKey& key,
+ const TBlobTableReaderOptions& options);
+
TNode SerializeParamsForParseYPath(
const TRichYPath& path);