diff options
| author | proller <[email protected]> | 2024-09-02 19:08:22 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2024-09-02 19:20:48 +0300 |
| commit | dc434a34cd0a6bdd95c85c1990988b1d66a9a9b9 (patch) | |
| tree | 5a49b41ed7377b972a7a899f15a19b9daf293cf2 | |
| parent | b516b7aa59fa1515c7575e8a26bb3eb70546123b (diff) | |
Add pagination for the GetJobStderr command
No description
---
Co-authored-by: proller <[email protected]>
d538c6346fd862f0cfc76f7ebab84e37c1777c50
Pull Request resolved: https://github.com/ytsaurus/ytsaurus/pull/708
Co-authored-by: ermolovd <[email protected]>
| -rw-r--r-- | yt/yt/client/api/delegating_client.h | 2 | ||||
| -rw-r--r-- | yt/yt/client/api/operation_client.cpp | 47 | ||||
| -rw-r--r-- | yt/yt/client/api/operation_client.h | 28 | ||||
| -rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.cpp | 13 | ||||
| -rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.h | 2 | ||||
| -rw-r--r-- | yt/yt/client/driver/scheduler_commands.cpp | 26 | ||||
| -rw-r--r-- | yt/yt/client/driver/scheduler_commands.h | 1 | ||||
| -rw-r--r-- | yt/yt/client/federated/client.cpp | 2 | ||||
| -rw-r--r-- | yt/yt/client/hedging/hedging.cpp | 2 | ||||
| -rw-r--r-- | yt/yt/client/unittests/mock/client.h | 2 | ||||
| -rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 4 |
11 files changed, 118 insertions, 11 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index 3369b70c208..f977dc06c00 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -510,7 +510,7 @@ public: const TGetJobSpecOptions& options), (jobId, options)) - DELEGATE_METHOD(TFuture<TSharedRef>, GetJobStderr, ( + DELEGATE_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, ( const NScheduler::TOperationIdOrAlias& operationIdOrAlias, NJobTrackerClient::TJobId jobId, const TGetJobStderrOptions& options), diff --git a/yt/yt/client/api/operation_client.cpp b/yt/yt/client/api/operation_client.cpp index acc0dfe7bb6..4e1816a6e7c 100644 --- a/yt/yt/client/api/operation_client.cpp +++ b/yt/yt/client/api/operation_client.cpp @@ -227,5 +227,52 @@ void TListOperationsAccessFilter::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +TGetJobStderrResponse TGetJobStderrResponse::MakeJobStderr(const TSharedRef& data, const TGetJobStderrOptions& options) +{ + auto totalSize = std::ssize(data); + auto endOffset = totalSize; + auto offset = options.Offset.value_or(0); + auto limit = options.Limit.value_or(0); + + if (!offset && !limit) { + return { + .Data = data, + .TotalSize = totalSize, + .EndOffset = endOffset, + }; + }; + + size_t firstPos = 0; + if (offset > 0) { + firstPos = offset; + } + + if (firstPos >= data.size()) { + return { + .Data = TSharedRef{}, + .TotalSize = totalSize, + .EndOffset = 0, + }; + } else { + auto lastPos = firstPos; + if (limit > 0) { + lastPos += limit; + } else { + lastPos += data.size(); + } + if (lastPos > data.size()) { + lastPos = data.size(); + } + const auto dataCut = data.Slice(firstPos, lastPos); + return { + .Data = dataCut, + .TotalSize = totalSize, + .EndOffset = limit ? static_cast<i64>(firstPos + dataCut.size()) : endOffset, + }; + } +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NApi diff --git a/yt/yt/client/api/operation_client.h b/yt/yt/client/api/operation_client.h index 08fed371f18..48687497da5 100644 --- a/yt/yt/client/api/operation_client.h +++ b/yt/yt/client/api/operation_client.h @@ -84,7 +84,10 @@ struct TGetJobSpecOptions struct TGetJobStderrOptions : public TTimeoutOptions , public TMasterReadOptions -{ }; +{ + std::optional<i64> Limit; + std::optional<i64> Offset; +}; struct TGetJobFailContextOptions : public TTimeoutOptions @@ -361,6 +364,27 @@ struct TListJobsResult std::vector<TError> Errors; }; +struct TGetJobStderrResponse +{ + // 0 + // |<- stderr full log ->| + // [ [<- Data ->] ] + // |<- request.Offset + // |<- request.Limit ->| + // |<- EndOffset + // |<- TotalSize + + TSharedRef Data; + + // Total current stderr size. + i64 TotalSize = 0; + + // Index of the last byte of the result in the full stderr. + i64 EndOffset = 0; + + static TGetJobStderrResponse MakeJobStderr(const TSharedRef& data, const TGetJobStderrOptions& options = {}); +}; + //////////////////////////////////////////////////////////////////////////////// struct IOperationClient @@ -414,7 +438,7 @@ struct IOperationClient NJobTrackerClient::TJobId jobId, const TGetJobSpecOptions& options = {}) = 0; - virtual TFuture<TSharedRef> GetJobStderr( + virtual TFuture<TGetJobStderrResponse> GetJobStderr( const NScheduler::TOperationIdOrAlias& operationIdOrAlias, NJobTrackerClient::TJobId jobId, const TGetJobStderrOptions& options = {}) = 0; diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index 5f3ee6a1eb0..454d6fd16b5 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -1294,7 +1294,7 @@ TFuture<TYsonString> TClient::GetJobSpec( })); } -TFuture<TSharedRef> TClient::GetJobStderr( +TFuture<TGetJobStderrResponse> TClient::GetJobStderr( const TOperationIdOrAlias& operationIdOrAlias, NJobTrackerClient::TJobId jobId, const TGetJobStderrOptions& options) @@ -1306,10 +1306,17 @@ TFuture<TSharedRef> TClient::GetJobStderr( NScheduler::ToProto(req, operationIdOrAlias); ToProto(req->mutable_job_id(), jobId); + if (options.Limit) { + req->set_limit(*options.Limit); + } + if (options.Offset) { + req->set_offset(*options.Offset); + } - return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspGetJobStderrPtr& rsp) { + return req->Invoke().Apply(BIND([req = req](const TApiServiceProxy::TRspGetJobStderrPtr& rsp) { YT_VERIFY(rsp->Attachments().size() == 1); - return rsp->Attachments().front(); + TGetJobStderrOptions options{.Limit = req->limit(), .Offset = req->offset()}; + return TGetJobStderrResponse::MakeJobStderr(rsp->Attachments().front(), options); })); } diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index b868d7493fc..9619084ec99 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -273,7 +273,7 @@ public: NJobTrackerClient::TJobId jobId, const NApi::TGetJobSpecOptions& options) override; - TFuture<TSharedRef> GetJobStderr( + TFuture<TGetJobStderrResponse> GetJobStderr( const NScheduler::TOperationIdOrAlias& operationIdOrAlias, NJobTrackerClient::TJobId jobId, const NApi::TGetJobStderrOptions& options) override; diff --git a/yt/yt/client/driver/scheduler_commands.cpp b/yt/yt/client/driver/scheduler_commands.cpp index be52ea9770b..9cb7396725f 100644 --- a/yt/yt/client/driver/scheduler_commands.cpp +++ b/yt/yt/client/driver/scheduler_commands.cpp @@ -136,6 +136,25 @@ void TGetJobSpecCommand::DoExecute(ICommandContextPtr context) void TGetJobStderrCommand::Register(TRegistrar registrar) { registrar.Parameter("job_id", &TThis::JobId); + + registrar.ParameterWithUniversalAccessor<std::optional<i64>>( + "limit", + [] (TThis* command) -> auto& { + return command->Options.Limit; + }) + .Optional(/*init*/ true); + + registrar.ParameterWithUniversalAccessor<std::optional<i64>>( + "offset", + [] (TThis* command) -> auto& { + return command->Options.Offset; + }) + .Optional(/*init*/ true); +} + +bool TGetJobStderrCommand::HasResponseParameters() const +{ + return true; } void TGetJobStderrCommand::DoExecute(ICommandContextPtr context) @@ -143,8 +162,13 @@ void TGetJobStderrCommand::DoExecute(ICommandContextPtr context) auto result = WaitFor(context->GetClient()->GetJobStderr(OperationIdOrAlias, JobId, Options)) .ValueOrThrow(); + ProduceResponseParameters(context, [&] (NYson::IYsonConsumer* consumer) { + BuildYsonMapFragmentFluently(consumer) + .Item("total_size").Value(result.TotalSize) + .Item("end_offset").Value(result.EndOffset); + }); auto output = context->Request().OutputStream; - WaitFor(output->Write(result)) + WaitFor(output->Write(result.Data)) .ThrowOnError(); } diff --git a/yt/yt/client/driver/scheduler_commands.h b/yt/yt/client/driver/scheduler_commands.h index 349b16d7505..280d55e331b 100644 --- a/yt/yt/client/driver/scheduler_commands.h +++ b/yt/yt/client/driver/scheduler_commands.h @@ -132,6 +132,7 @@ private: NJobTrackerClient::TJobId JobId; void DoExecute(ICommandContextPtr context) override; + bool HasResponseParameters() const override; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index 4f221deedb9..fe1992162bf 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -406,7 +406,7 @@ public: UNIMPLEMENTED_METHOD(TFuture<NConcurrency::IAsyncZeroCopyInputStreamPtr>, GetJobInput, (NJobTrackerClient::TJobId, const TGetJobInputOptions&)); UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobInputPaths, (NJobTrackerClient::TJobId, const TGetJobInputPathsOptions&)); UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobSpec, (NJobTrackerClient::TJobId, const TGetJobSpecOptions&)); - UNIMPLEMENTED_METHOD(TFuture<TSharedRef>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&)); + UNIMPLEMENTED_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&)); UNIMPLEMENTED_METHOD(TFuture<TSharedRef>, GetJobFailContext, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobFailContextOptions&)); UNIMPLEMENTED_METHOD(TFuture<TListOperationsResult>, ListOperations, (const TListOperationsOptions&)); UNIMPLEMENTED_METHOD(TFuture<TListJobsResult>, ListJobs, (const NScheduler::TOperationIdOrAlias&, const TListJobsOptions&)); diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index ca5dee69170..52c4722b279 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -165,7 +165,7 @@ public: UNSUPPORTED_METHOD(TFuture<NConcurrency::IAsyncZeroCopyInputStreamPtr>, GetJobInput, (NJobTrackerClient::TJobId, const TGetJobInputOptions&)); UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetJobInputPaths, (NJobTrackerClient::TJobId, const TGetJobInputPathsOptions&)); UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetJobSpec, (NJobTrackerClient::TJobId, const TGetJobSpecOptions&)); - UNSUPPORTED_METHOD(TFuture<TSharedRef>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&)); + UNSUPPORTED_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&)); UNSUPPORTED_METHOD(TFuture<TSharedRef>, GetJobFailContext, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobFailContextOptions&)); UNSUPPORTED_METHOD(TFuture<TListOperationsResult>, ListOperations, (const TListOperationsOptions&)); UNSUPPORTED_METHOD(TFuture<TListJobsResult>, ListJobs, (const NScheduler::TOperationIdOrAlias&, const TListJobsOptions&)); diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index f6290bded79..b2b1dae28e5 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -620,7 +620,7 @@ public: const TGetJobSpecOptions& options), (override)); - MOCK_METHOD(TFuture<TSharedRef>, GetJobStderr, ( + MOCK_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, ( const NScheduler::TOperationIdOrAlias& operationIdOrAlias, NJobTrackerClient::TJobId jobId, const TGetJobStderrOptions& options), diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 7b41d983863..f9606ee6bd3 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -2467,10 +2467,14 @@ message TReqGetJobStderr string operation_alias = 3; } required NYT.NProto.TGuid job_id = 2; + optional int64 limit = 4; + optional int64 offset = 5; } message TRspGetJobStderr { + optional int64 end_offset = 1; + optional int64 total_size = 2; } //////////////////////////////////////////////////////////////////////////////// |
