diff options
| -rw-r--r-- | yt/cpp/mapreduce/interface/operation.h | 12 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp | 9 | ||||
| -rw-r--r-- | yt/yt/client/api/operation_client.cpp | 101 | ||||
| -rw-r--r-- | yt/yt/client/api/operation_client.h | 53 | ||||
| -rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.cpp | 9 | ||||
| -rw-r--r-- | yt/yt/client/api/rpc_proxy/helpers.cpp | 8 | ||||
| -rw-r--r-- | yt/yt/client/driver/scheduler_commands.cpp | 16 | ||||
| -rw-r--r-- | yt/yt/client/ya.make | 1 | ||||
| -rw-r--r-- | yt/yt/core/ytree/yson_struct-inl.h | 13 | ||||
| -rw-r--r-- | yt/yt/core/ytree/yson_struct.h | 3 | ||||
| -rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 7 |
11 files changed, 222 insertions, 10 deletions
diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h index 76c4eabdfe6..a87ddc30b4a 100644 --- a/yt/cpp/mapreduce/interface/operation.h +++ b/yt/cpp/mapreduce/interface/operation.h @@ -2855,6 +2855,18 @@ struct TListJobsOptions /// @brief Return only jobs with monitoring descriptor. FLUENT_FIELD_OPTION(bool, WithMonitoringDescriptor); + /// + /// @brief Search for jobs with start time >= `FromTime`. + FLUENT_FIELD_OPTION(TInstant, FromTime); + + /// + /// @brief Search for jobs with start time <= `ToTime`. + FLUENT_FIELD_OPTION(TInstant, ToTime); + + /// + /// @brief Search for jobs with filters encoded in token. + FLUENT_FIELD_OPTION(TString, ContinuationToken); + /// @} /// diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp index bebc59ec917..3a78ed3319f 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -562,6 +562,15 @@ TNode SerializeParamsForListJobs( if (options.WithMonitoringDescriptor_) { result["with_monitoring_descriptor"] = *options.WithMonitoringDescriptor_; } + if (options.FromTime_) { + result["from_time"] = ToString(options.FromTime_); + } + if (options.ToTime_) { + result["to_time"] = ToString(options.ToTime_); + } + if (options.ContinuationToken_) { + result["continuation_token"] = *options.ContinuationToken_; + } if (options.SortField_) { result["sort_field"] = ToString(*options.SortField_); diff --git a/yt/yt/client/api/operation_client.cpp b/yt/yt/client/api/operation_client.cpp index 8f8dbb355b0..07253e5ee52 100644 --- a/yt/yt/client/api/operation_client.cpp +++ b/yt/yt/client/api/operation_client.cpp @@ -3,14 +3,115 @@ #include <yt/yt/client/job_tracker_client/helpers.h> #include <yt/yt/core/ytree/fluent.h> +#include <yt/yt/core/ytree/yson_struct.h> + +#include <library/cpp/string_utils/base64/base64.h> namespace NYT::NApi { using namespace NYTree; using namespace NJobTrackerClient; +using namespace NYson; //////////////////////////////////////////////////////////////////////////////// +void TListJobsContinuationTokenSerializer::Register(TRegistrar registrar) +{ + registrar.ExternalClassParameter("version", &TThat::Version) + .Default(0) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("job_competition_id", &TListJobsOptions::JobCompetitionId) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("type", &TThat::Type) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("state", &TThat::State) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("address", &TThat::Address) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("with_stderr", &TThat::WithStderr) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("with_fail_context", &TThat::WithFailContext) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("with_spec", &TThat::WithSpec) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("with_competitors", &TThat::WithCompetitors) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("with_monitoring_gescriptor", &TThat::WithMonitoringDescriptor) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("task_name", &TThat::TaskName) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("running_jobs_lookbehind_period", &TThat::RunningJobsLookbehindPeriod) + .Default(TDuration::Max()) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("sort_field", &TThat::SortField) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("sort_order", &TThat::SortOrder) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("offset", &TThat::Offset) + .Default(0) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("limit", &TThat::Limit) + .Default(1000) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("include_archive", &TThat::IncludeArchive) + .Default(false) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("include_cypress", &TThat::IncludeCypress) + .Default(false) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("include_controller_agent", &TThat::IncludeControllerAgent) + .Default(false) + .DontSerializeDefault(); +} + +TString EncodeNewToken(TListJobsOptions&& options, int jobCount) +{ + options.Offset += jobCount; + options.ContinuationToken.reset(); + + TListJobsContinuationToken token; + static_cast<TListJobsOptions&>(token) = std::move(options); + + auto optionsYson = ConvertToYsonString(token); + return Base64Encode(optionsYson.ToString()); +} + +TListJobsOptions DecodeListJobsOptionsFromToken(const TString& continuationToken) +{ + auto optionsYson = TYsonString(Base64StrictDecode(continuationToken)); + return ConvertTo<TListJobsContinuationToken>(optionsYson); +} + void Serialize( const TOperation& operation, NYson::IYsonConsumer* consumer, diff --git a/yt/yt/client/api/operation_client.h b/yt/yt/client/api/operation_client.h index 19cea82ad71..ae11054bd18 100644 --- a/yt/yt/client/api/operation_client.h +++ b/yt/yt/client/api/operation_client.h @@ -166,16 +166,16 @@ struct TPollJobShellResponse }; DEFINE_ENUM(EJobSortField, - ((None) (0)) - ((Type) (1)) - ((State) (2)) - ((StartTime) (3)) - ((FinishTime) (4)) - ((Address) (5)) - ((Duration) (6)) - ((Progress) (7)) - ((Id) (8)) - ((TaskName) (9)) + ((None) (0)) + ((Type) (1)) + ((State) (2)) + ((StartTime) (3)) + ((FinishTime) (4)) + ((Address) (5)) + ((Duration) (6)) + ((Progress) (7)) + ((Id) (8)) + ((TaskName) (9)) ); DEFINE_ENUM(EJobSortDirection, @@ -206,6 +206,11 @@ struct TListJobsOptions std::optional<bool> WithMonitoringDescriptor; std::optional<TString> TaskName; + std::optional<TInstant> FromTime; + std::optional<TInstant> ToTime; + + std::optional<TString> ContinuationToken; + TDuration RunningJobsLookbehindPeriod = TDuration::Max(); EJobSortField SortField = EJobSortField::None; @@ -221,6 +226,32 @@ struct TListJobsOptions EDataSource DataSource = EDataSource::Auto; }; +struct TListJobsContinuationToken + : public TListJobsOptions +{ + int Version = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TListJobsContinuationTokenSerializer + : public virtual NYTree::TExternalizedYsonStruct +{ +public: + REGISTER_EXTERNALIZED_YSON_STRUCT(TListJobsContinuationToken, TListJobsContinuationTokenSerializer); + + static void Register(TRegistrar registrar); +}; + +ASSIGN_EXTERNAL_YSON_SERIALIZER(TListJobsContinuationToken, TListJobsContinuationTokenSerializer); + +//////////////////////////////////////////////////////////////////////////////// + +TString EncodeNewToken(TListJobsOptions&& options, int jobCount); +TListJobsOptions DecodeListJobsOptionsFromToken(const TString& continuationToken); + +//////////////////////////////////////////////////////////////////////////////// + struct TAbandonJobOptions : public TTimeoutOptions { }; @@ -387,6 +418,8 @@ struct TListJobsResult TListJobsStatistics Statistics; std::vector<TError> Errors; + + std::optional<TString> ContinuationToken; }; struct TGetJobStderrResponse diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index a763772b7e5..f7397d2f373 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -1486,6 +1486,15 @@ TFuture<TListJobsResult> TClient::ListJobs( if (options.TaskName) { req->set_task_name(*options.TaskName); } + if (options.FromTime) { + req->set_from_time(NYT::ToProto(*options.FromTime)); + } + if (options.ToTime) { + req->set_to_time(NYT::ToProto(*options.ToTime)); + } + if (options.ContinuationToken) { + req->set_continuation_token(*options.ContinuationToken); + } req->set_sort_field(static_cast<NProto::EJobSortField>(options.SortField)); req->set_sort_order(static_cast<NProto::EJobSortDirection>(options.SortOrder)); diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index 88c12aa793a..dba32868407 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -407,6 +407,9 @@ void ToProto( if (result.ArchiveJobCount) { proto->set_archive_job_count(*result.ArchiveJobCount); } + if (result.ContinuationToken) { + proto->set_continuation_token(*result.ContinuationToken); + } ToProto(proto->mutable_statistics(), result.Statistics); ToProto(proto->mutable_errors(), result.Errors); @@ -433,6 +436,11 @@ void FromProto( } else { result->ArchiveJobCount.reset(); } + if (proto.has_continuation_token()) { + result->ContinuationToken = proto.continuation_token(); + } else { + result->ContinuationToken.reset(); + } FromProto(&result->Statistics, proto.statistics()); FromProto(&result->Errors, proto.errors()); diff --git a/yt/yt/client/driver/scheduler_commands.cpp b/yt/yt/client/driver/scheduler_commands.cpp index 53f82c7bf27..d64f69d3336 100644 --- a/yt/yt/client/driver/scheduler_commands.cpp +++ b/yt/yt/client/driver/scheduler_commands.cpp @@ -490,6 +490,21 @@ void TListJobsCommand::Register(TRegistrar registrar) [] (TThis* command) -> auto& { return command->Options.WithMonitoringDescriptor; }) .Optional(/*init*/ false); + registrar.ParameterWithUniversalAccessor<std::optional<TInstant>>( + "from_time", + [] (TThis* command) -> auto& { return command->Options.FromTime; }) + .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor<std::optional<TInstant>>( + "to_time", + [] (TThis* command) -> auto& { return command->Options.ToTime; }) + .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor<std::optional<TString>>( + "continuation_token", + [] (TThis* command) -> auto& { return command->Options.ContinuationToken; }) + .Optional(/*init*/ false); + registrar.ParameterWithUniversalAccessor<TJobId>( "job_competition_id", [] (TThis* command) -> auto& { return command->Options.JobCompetitionId; }) @@ -593,6 +608,7 @@ void TListJobsCommand::DoExecute(ICommandContextPtr context) } }) .Item("errors").Value(result.Errors) + .Item("continuation_token").Value(result.ContinuationToken) .EndMap()); } diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index fef0c109381..2b384fe8189 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -220,6 +220,7 @@ PEERDIR( yt/yt/library/quantile_digest yt/yt_proto/yt/client library/cpp/json + library/cpp/string_utils/base64 contrib/libs/pfr ) diff --git a/yt/yt/core/ytree/yson_struct-inl.h b/yt/yt/core/ytree/yson_struct-inl.h index 92bd2d65fae..4471056588d 100644 --- a/yt/yt/core/ytree/yson_struct-inl.h +++ b/yt/yt/core/ytree/yson_struct-inl.h @@ -249,6 +249,19 @@ void TYsonStructRegistrar<TStruct>::ExternalPostprocessor(TExternalPostprocessor } template <class TStruct> +template <class TBase, class TValue> +TYsonStructParameter<TValue>& TYsonStructRegistrar<TStruct>::ExternalBaseClassParameter(const TString& key, TValue(TBase::*field)) +{ + static_assert(std::derived_from<TStruct, TExternalizedYsonStruct>); + static_assert(std::derived_from<typename TStruct::TExternal, TBase>); + auto universalAccessor = [field] (TStruct* serializer) -> auto& { + return serializer->That_->*field; + }; + + return ParameterWithUniversalAccessor<TValue>(key, universalAccessor); +} + +template <class TStruct> void TYsonStructRegistrar<TStruct>::UnrecognizedStrategy(EUnrecognizedStrategy strategy) { Meta_->SetUnrecognizedStrategy(strategy); diff --git a/yt/yt/core/ytree/yson_struct.h b/yt/yt/core/ytree/yson_struct.h index ae23668f0c5..097addf65a6 100644 --- a/yt/yt/core/ytree/yson_struct.h +++ b/yt/yt/core/ytree/yson_struct.h @@ -313,6 +313,9 @@ public: // requires std::derived_from<TStruct, TExternalizedYsonStruct<TExternal, TStruct>> TYsonStructParameter<TValue>& ExternalClassParameter(const TString& key, TValue(TExternal::*field)); + template <class TBase, class TValue> + TYsonStructParameter<TValue>& ExternalBaseClassParameter(const TString& key, TValue(TBase::*field)); + template <class TExternalPreprocessor> // requires (CInvocable<TExternalPreprocessor, void(typename TStruct::TExternal*)>) void ExternalPreprocessor(TExternalPreprocessor preprocessor); diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 00d0104d4a2..2ea1b46e2a6 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -2409,6 +2409,11 @@ message TReqListJobs optional bool with_monitoring_descriptor = 21; + optional uint64 from_time = 22; // TInstant + optional uint64 to_time = 23; // TInstant + + optional string continuation_token = 24; + optional TMasterReadOptions master_read_options = 102; } @@ -3179,6 +3184,8 @@ message TListJobsResult required TListJobsStatistics statistics = 5; repeated NYT.NProto.TError errors = 6; + + optional string continuation_token = 7; } message TJobTraceEvent |
