summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorproller <[email protected]>2024-09-02 19:08:22 +0300
committerrobot-piglet <[email protected]>2024-09-02 19:20:48 +0300
commitdc434a34cd0a6bdd95c85c1990988b1d66a9a9b9 (patch)
tree5a49b41ed7377b972a7a899f15a19b9daf293cf2
parentb516b7aa59fa1515c7575e8a26bb3eb70546123b (diff)
Add pagination for the GetJobStderr command
No description --- Co-authored-by: proller <[email protected]> d538c6346fd862f0cfc76f7ebab84e37c1777c50 Pull Request resolved: https://github.com/ytsaurus/ytsaurus/pull/708 Co-authored-by: ermolovd <[email protected]>
-rw-r--r--yt/yt/client/api/delegating_client.h2
-rw-r--r--yt/yt/client/api/operation_client.cpp47
-rw-r--r--yt/yt/client/api/operation_client.h28
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp13
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h2
-rw-r--r--yt/yt/client/driver/scheduler_commands.cpp26
-rw-r--r--yt/yt/client/driver/scheduler_commands.h1
-rw-r--r--yt/yt/client/federated/client.cpp2
-rw-r--r--yt/yt/client/hedging/hedging.cpp2
-rw-r--r--yt/yt/client/unittests/mock/client.h2
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto4
11 files changed, 118 insertions, 11 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index 3369b70c208..f977dc06c00 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -510,7 +510,7 @@ public:
const TGetJobSpecOptions& options),
(jobId, options))
- DELEGATE_METHOD(TFuture<TSharedRef>, GetJobStderr, (
+ DELEGATE_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
NJobTrackerClient::TJobId jobId,
const TGetJobStderrOptions& options),
diff --git a/yt/yt/client/api/operation_client.cpp b/yt/yt/client/api/operation_client.cpp
index acc0dfe7bb6..4e1816a6e7c 100644
--- a/yt/yt/client/api/operation_client.cpp
+++ b/yt/yt/client/api/operation_client.cpp
@@ -227,5 +227,52 @@ void TListOperationsAccessFilter::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+TGetJobStderrResponse TGetJobStderrResponse::MakeJobStderr(const TSharedRef& data, const TGetJobStderrOptions& options)
+{
+ auto totalSize = std::ssize(data);
+ auto endOffset = totalSize;
+ auto offset = options.Offset.value_or(0);
+ auto limit = options.Limit.value_or(0);
+
+ if (!offset && !limit) {
+ return {
+ .Data = data,
+ .TotalSize = totalSize,
+ .EndOffset = endOffset,
+ };
+ };
+
+ size_t firstPos = 0;
+ if (offset > 0) {
+ firstPos = offset;
+ }
+
+ if (firstPos >= data.size()) {
+ return {
+ .Data = TSharedRef{},
+ .TotalSize = totalSize,
+ .EndOffset = 0,
+ };
+ } else {
+ auto lastPos = firstPos;
+ if (limit > 0) {
+ lastPos += limit;
+ } else {
+ lastPos += data.size();
+ }
+ if (lastPos > data.size()) {
+ lastPos = data.size();
+ }
+ const auto dataCut = data.Slice(firstPos, lastPos);
+ return {
+ .Data = dataCut,
+ .TotalSize = totalSize,
+ .EndOffset = limit ? static_cast<i64>(firstPos + dataCut.size()) : endOffset,
+ };
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NApi
diff --git a/yt/yt/client/api/operation_client.h b/yt/yt/client/api/operation_client.h
index 08fed371f18..48687497da5 100644
--- a/yt/yt/client/api/operation_client.h
+++ b/yt/yt/client/api/operation_client.h
@@ -84,7 +84,10 @@ struct TGetJobSpecOptions
struct TGetJobStderrOptions
: public TTimeoutOptions
, public TMasterReadOptions
-{ };
+{
+ std::optional<i64> Limit;
+ std::optional<i64> Offset;
+};
struct TGetJobFailContextOptions
: public TTimeoutOptions
@@ -361,6 +364,27 @@ struct TListJobsResult
std::vector<TError> Errors;
};
+struct TGetJobStderrResponse
+{
+ // 0
+ // |<- stderr full log ->|
+ // [ [<- Data ->] ]
+ // |<- request.Offset
+ // |<- request.Limit ->|
+ // |<- EndOffset
+ // |<- TotalSize
+
+ TSharedRef Data;
+
+ // Total current stderr size.
+ i64 TotalSize = 0;
+
+ // Index of the last byte of the result in the full stderr.
+ i64 EndOffset = 0;
+
+ static TGetJobStderrResponse MakeJobStderr(const TSharedRef& data, const TGetJobStderrOptions& options = {});
+};
+
////////////////////////////////////////////////////////////////////////////////
struct IOperationClient
@@ -414,7 +438,7 @@ struct IOperationClient
NJobTrackerClient::TJobId jobId,
const TGetJobSpecOptions& options = {}) = 0;
- virtual TFuture<TSharedRef> GetJobStderr(
+ virtual TFuture<TGetJobStderrResponse> GetJobStderr(
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
NJobTrackerClient::TJobId jobId,
const TGetJobStderrOptions& options = {}) = 0;
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index 5f3ee6a1eb0..454d6fd16b5 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -1294,7 +1294,7 @@ TFuture<TYsonString> TClient::GetJobSpec(
}));
}
-TFuture<TSharedRef> TClient::GetJobStderr(
+TFuture<TGetJobStderrResponse> TClient::GetJobStderr(
const TOperationIdOrAlias& operationIdOrAlias,
NJobTrackerClient::TJobId jobId,
const TGetJobStderrOptions& options)
@@ -1306,10 +1306,17 @@ TFuture<TSharedRef> TClient::GetJobStderr(
NScheduler::ToProto(req, operationIdOrAlias);
ToProto(req->mutable_job_id(), jobId);
+ if (options.Limit) {
+ req->set_limit(*options.Limit);
+ }
+ if (options.Offset) {
+ req->set_offset(*options.Offset);
+ }
- return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspGetJobStderrPtr& rsp) {
+ return req->Invoke().Apply(BIND([req = req](const TApiServiceProxy::TRspGetJobStderrPtr& rsp) {
YT_VERIFY(rsp->Attachments().size() == 1);
- return rsp->Attachments().front();
+ TGetJobStderrOptions options{.Limit = req->limit(), .Offset = req->offset()};
+ return TGetJobStderrResponse::MakeJobStderr(rsp->Attachments().front(), options);
}));
}
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index b868d7493fc..9619084ec99 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -273,7 +273,7 @@ public:
NJobTrackerClient::TJobId jobId,
const NApi::TGetJobSpecOptions& options) override;
- TFuture<TSharedRef> GetJobStderr(
+ TFuture<TGetJobStderrResponse> GetJobStderr(
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
NJobTrackerClient::TJobId jobId,
const NApi::TGetJobStderrOptions& options) override;
diff --git a/yt/yt/client/driver/scheduler_commands.cpp b/yt/yt/client/driver/scheduler_commands.cpp
index be52ea9770b..9cb7396725f 100644
--- a/yt/yt/client/driver/scheduler_commands.cpp
+++ b/yt/yt/client/driver/scheduler_commands.cpp
@@ -136,6 +136,25 @@ void TGetJobSpecCommand::DoExecute(ICommandContextPtr context)
void TGetJobStderrCommand::Register(TRegistrar registrar)
{
registrar.Parameter("job_id", &TThis::JobId);
+
+ registrar.ParameterWithUniversalAccessor<std::optional<i64>>(
+ "limit",
+ [] (TThis* command) -> auto& {
+ return command->Options.Limit;
+ })
+ .Optional(/*init*/ true);
+
+ registrar.ParameterWithUniversalAccessor<std::optional<i64>>(
+ "offset",
+ [] (TThis* command) -> auto& {
+ return command->Options.Offset;
+ })
+ .Optional(/*init*/ true);
+}
+
+bool TGetJobStderrCommand::HasResponseParameters() const
+{
+ return true;
}
void TGetJobStderrCommand::DoExecute(ICommandContextPtr context)
@@ -143,8 +162,13 @@ void TGetJobStderrCommand::DoExecute(ICommandContextPtr context)
auto result = WaitFor(context->GetClient()->GetJobStderr(OperationIdOrAlias, JobId, Options))
.ValueOrThrow();
+ ProduceResponseParameters(context, [&] (NYson::IYsonConsumer* consumer) {
+ BuildYsonMapFragmentFluently(consumer)
+ .Item("total_size").Value(result.TotalSize)
+ .Item("end_offset").Value(result.EndOffset);
+ });
auto output = context->Request().OutputStream;
- WaitFor(output->Write(result))
+ WaitFor(output->Write(result.Data))
.ThrowOnError();
}
diff --git a/yt/yt/client/driver/scheduler_commands.h b/yt/yt/client/driver/scheduler_commands.h
index 349b16d7505..280d55e331b 100644
--- a/yt/yt/client/driver/scheduler_commands.h
+++ b/yt/yt/client/driver/scheduler_commands.h
@@ -132,6 +132,7 @@ private:
NJobTrackerClient::TJobId JobId;
void DoExecute(ICommandContextPtr context) override;
+ bool HasResponseParameters() const override;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index 4f221deedb9..fe1992162bf 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -406,7 +406,7 @@ public:
UNIMPLEMENTED_METHOD(TFuture<NConcurrency::IAsyncZeroCopyInputStreamPtr>, GetJobInput, (NJobTrackerClient::TJobId, const TGetJobInputOptions&));
UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobInputPaths, (NJobTrackerClient::TJobId, const TGetJobInputPathsOptions&));
UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobSpec, (NJobTrackerClient::TJobId, const TGetJobSpecOptions&));
- UNIMPLEMENTED_METHOD(TFuture<TSharedRef>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&));
UNIMPLEMENTED_METHOD(TFuture<TSharedRef>, GetJobFailContext, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobFailContextOptions&));
UNIMPLEMENTED_METHOD(TFuture<TListOperationsResult>, ListOperations, (const TListOperationsOptions&));
UNIMPLEMENTED_METHOD(TFuture<TListJobsResult>, ListJobs, (const NScheduler::TOperationIdOrAlias&, const TListJobsOptions&));
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index ca5dee69170..52c4722b279 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -165,7 +165,7 @@ public:
UNSUPPORTED_METHOD(TFuture<NConcurrency::IAsyncZeroCopyInputStreamPtr>, GetJobInput, (NJobTrackerClient::TJobId, const TGetJobInputOptions&));
UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetJobInputPaths, (NJobTrackerClient::TJobId, const TGetJobInputPathsOptions&));
UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetJobSpec, (NJobTrackerClient::TJobId, const TGetJobSpecOptions&));
- UNSUPPORTED_METHOD(TFuture<TSharedRef>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&));
+ UNSUPPORTED_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&));
UNSUPPORTED_METHOD(TFuture<TSharedRef>, GetJobFailContext, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobFailContextOptions&));
UNSUPPORTED_METHOD(TFuture<TListOperationsResult>, ListOperations, (const TListOperationsOptions&));
UNSUPPORTED_METHOD(TFuture<TListJobsResult>, ListJobs, (const NScheduler::TOperationIdOrAlias&, const TListJobsOptions&));
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index f6290bded79..b2b1dae28e5 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -620,7 +620,7 @@ public:
const TGetJobSpecOptions& options),
(override));
- MOCK_METHOD(TFuture<TSharedRef>, GetJobStderr, (
+ MOCK_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
NJobTrackerClient::TJobId jobId,
const TGetJobStderrOptions& options),
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 7b41d983863..f9606ee6bd3 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -2467,10 +2467,14 @@ message TReqGetJobStderr
string operation_alias = 3;
}
required NYT.NProto.TGuid job_id = 2;
+ optional int64 limit = 4;
+ optional int64 offset = 5;
}
message TRspGetJobStderr
{
+ optional int64 end_offset = 1;
+ optional int64 total_size = 2;
}
////////////////////////////////////////////////////////////////////////////////