diff options
author | bystrovserg <bystrovserg@yandex-team.com> | 2025-04-09 16:52:06 +0300 |
---|---|---|
committer | bystrovserg <bystrovserg@yandex-team.com> | 2025-04-09 17:08:12 +0300 |
commit | 439db1cfe398621d7fa0ef4f2fa59ff4421eda0c (patch) | |
tree | 50f734a36929455b1a598e3b6dbc06d7fe93787c | |
parent | 1488933736b20bc05fb1b3382aa7939ec75dd489 (diff) | |
download | ydb-439db1cfe398621d7fa0ef4f2fa59ff4421eda0c.tar.gz |
YT-6812, YT-24298: Introducing attributes and events to list_jobs
commit_hash:9395b2e879d30ba83d17dcc8c94af4618a13d2a7
-rw-r--r-- | yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp | 22 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/operation.h | 47 | ||||
-rw-r--r-- | yt/yt/client/api/operation_client.h | 7 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.cpp | 3 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/helpers.cpp | 20 | ||||
-rw-r--r-- | yt/yt/client/driver/scheduler_commands.cpp | 7 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 4 |
7 files changed, 93 insertions, 17 deletions
diff --git a/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp index 8487a1de8f2..14ab0e1db85 100644 --- a/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp @@ -49,18 +49,10 @@ static void SetPathParam(TNode* node, const TString& pathPrefix, const TYPath& p (*node)["path"] = std::move(updatedPath); } -static TNode SerializeAttributeFilter(const TAttributeFilter& attributeFilter) +template <typename TFilter> +static TNode SerializeAttributeFilter(const TFilter& attributeFilter) { - TNode result = TNode::CreateList(); - for (const auto& attribute : attributeFilter.Attributes_) { - result.Add(attribute); - } - return result; -} - -static TNode SerializeAttributeFilter(const TOperationAttributeFilter& attributeFilter) -{ - TNode result = TNode::CreateList(); + auto result = TNode::CreateList(); for (const auto& attribute : attributeFilter.Attributes_) { result.Add(ToString(attribute)); } @@ -513,11 +505,14 @@ TNode SerializeParamsForUpdateOperationParameters( TNode SerializeParamsForGetJob( const TOperationId& operationId, const TJobId& jobId, - const TGetJobOptions& /* options */) + const TGetJobOptions& options) { TNode result; SetOperationIdParam(&result, operationId); result["job_id"] = GetGuidAsString(jobId); + if (options.AttributeFilter_) { + result["attributes"] = SerializeAttributeFilter(*options.AttributeFilter_); + } return result; } @@ -597,6 +592,9 @@ TNode SerializeParamsForListJobs( if (options.IncludeControllerAgent_) { result["include_controller_agent"] = *options.IncludeControllerAgent_; } + if (options.AttributeFilter_) { + result["attributes"] = SerializeAttributeFilter(*options.AttributeFilter_); + } return result; } diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h index 1f327b4b5d6..ee3e3b1fa11 100644 --- a/yt/cpp/mapreduce/interface/operation.h +++ b/yt/cpp/mapreduce/interface/operation.h @@ -2858,6 +2858,45 @@ enum class EJobSortDirection : int }; /// +/// @brief Attributes to request for a job. +enum class EJobAttribute : int +{ + Id /* "id" */, + Type /* "type" */, + State /* "state" */, + Address /* "address" */, + TaskName /* "task_name" */, + StartTime /* "start_time" */, + FinishTime /* "finish_time" */, + Progress /* "progress" */, + StderrSize /* "stderr_size" */, + Error /* "error" */, + Result /* "result" */, + BriefStatistics /* "brief_statistics" */, + InputPaths /* "input_paths" */, + CoreInfos /* "core_infos" */, +}; + +/// +/// @brief A class that specifies which attributes to request when using @ref NYT::IClient::GetJob or @ref NYT::IClient::ListJobs. +struct TJobAttributeFilter +{ + /// @cond Doxygen_Suppress + using TSelf = TJobAttributeFilter; + /// @endcond + + THashSet<EJobAttribute> Attributes_; + + /// + /// @brief Add attribute to the filter. Calls are supposed to be chained. + TSelf& Add(EJobAttribute attribute) + { + Attributes_.insert(attribute); + return *this; + } +}; + +/// /// @brief Options for @ref NYT::IClient::ListJobs. /// /// @see https://ytsaurus.tech/docs/en/api/commands.html#list_jobs @@ -2921,6 +2960,10 @@ struct TListJobsOptions /// @brief Search for jobs with filters encoded in token. FLUENT_FIELD_OPTION(TString, ContinuationToken); + /// + /// @brief Return only requested job attributes. + FLUENT_FIELD_OPTION(TJobAttributeFilter, AttributeFilter); + /// @} /// @@ -3065,6 +3108,10 @@ struct TGetJobOptions /// @cond Doxygen_Suppress using TSelf = TGetJobOptions; /// @endcond + + /// + /// @brief Return only requested job attributes. + FLUENT_FIELD_OPTION(TJobAttributeFilter, AttributeFilter); }; /// diff --git a/yt/yt/client/api/operation_client.h b/yt/yt/client/api/operation_client.h index 8e672c80e5f..7b67508c481 100644 --- a/yt/yt/client/api/operation_client.h +++ b/yt/yt/client/api/operation_client.h @@ -217,6 +217,8 @@ struct TListJobsOptions std::optional<TInstant> FromTime; std::optional<TInstant> ToTime; + std::optional<THashSet<TString>> Attributes; + std::optional<TString> ContinuationToken; TDuration RunningJobsLookbehindPeriod = TDuration::Max(); @@ -394,9 +396,12 @@ struct TJob NYson::TYsonString ArchiveFeatures; std::optional<std::string> OperationIncarnation; std::optional<NScheduler::TAllocationId> AllocationId; - std::optional<bool> IsStale; + // Service flags which are used to compute "is_stale" attribute in "list_jobs". + bool PresentInArchive = false; + bool PresentInControllerAgent = false; + std::optional<NJobTrackerClient::EJobState> GetState() const; }; diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index a4b77dd1832..fdb29f948b2 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -1482,6 +1482,9 @@ TFuture<TListJobsResult> TClient::ListJobs( if (options.ContinuationToken) { req->set_continuation_token(*options.ContinuationToken); } + if (options.Attributes) { + ToProto(req->mutable_attributes()->mutable_keys(), *options.Attributes); + } req->set_sort_field(static_cast<NProto::EJobSortField>(options.SortField)); req->set_sort_order(static_cast<NProto::EJobSortDirection>(options.SortOrder)); diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index 8e1fdd366ec..ba36a57d88a 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -886,6 +886,12 @@ void ToProto(NProto::TJob* protoJob, const NApi::TJob& job) YT_OPTIONAL_TO_PROTO(protoJob, monitoring_descriptor, job.MonitoringDescriptor); YT_OPTIONAL_SET_PROTO(protoJob, operation_incarnation, job.OperationIncarnation); YT_OPTIONAL_TO_PROTO(protoJob, allocation_id, job.AllocationId); + if (job.Events) { + protoJob->set_events(job.Events.ToString()); + } + if (job.Statistics) { + protoJob->set_statistics(job.Statistics.ToString()); + } } void FromProto(NApi::TJob* job, const NProto::TJob& protoJob) @@ -916,8 +922,6 @@ void FromProto(NApi::TJob* job, const NProto::TJob& protoJob) job->FailContextSize = YT_OPTIONAL_FROM_PROTO(protoJob, fail_context_size); if (protoJob.has_has_spec()) { job->HasSpec = protoJob.has_spec(); - } else { - job->HasSpec = false; } if (protoJob.has_error()) { job->Error = TYsonString(protoJob.error()); @@ -956,8 +960,6 @@ void FromProto(NApi::TJob* job, const NProto::TJob& protoJob) } if (protoJob.has_has_competitors()) { job->HasCompetitors = protoJob.has_competitors(); - } else { - job->HasCompetitors = false; } job->HasProbingCompetitors = YT_OPTIONAL_FROM_PROTO(protoJob, has_probing_competitors); job->IsStale = YT_OPTIONAL_FROM_PROTO(protoJob, is_stale); @@ -966,6 +968,11 @@ void FromProto(NApi::TJob* job, const NProto::TJob& protoJob) } else { job->ExecAttributes = TYsonString(); } + if (protoJob.has_events()) { + job->Events = TYsonString(protoJob.events()); + } else { + job->Events = TYsonString(); + } job->TaskName = YT_OPTIONAL_FROM_PROTO(protoJob, task_name); job->PoolTree = YT_OPTIONAL_FROM_PROTO(protoJob, pool_tree); job->Pool = YT_OPTIONAL_FROM_PROTO(protoJob, pool); @@ -982,6 +989,11 @@ void FromProto(NApi::TJob* job, const NProto::TJob& protoJob) } else { job->AllocationId = {}; } + if (protoJob.has_statistics()) { + job->Statistics = TYsonString(protoJob.statistics()); + } else { + job->Statistics = TYsonString(); + } } void ToProto( diff --git a/yt/yt/client/driver/scheduler_commands.cpp b/yt/yt/client/driver/scheduler_commands.cpp index 6710ee43e28..8169d7aea30 100644 --- a/yt/yt/client/driver/scheduler_commands.cpp +++ b/yt/yt/client/driver/scheduler_commands.cpp @@ -585,6 +585,13 @@ void TListJobsCommand::Register(TRegistrar registrar) return command->Options.RunningJobsLookbehindPeriod; }) .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor<std::optional<THashSet<TString>>>( + "attributes", + [] (TThis* command) -> auto& { + return command->Options.Attributes; + }) + .Optional(/*init*/ false); } void TListJobsCommand::DoExecute(ICommandContextPtr context) diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index c1f922829e3..25551ed9632 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -2479,6 +2479,8 @@ message TReqListJobs optional bool with_interruption_info = 26; + optional NYT.NYTree.NProto.TAttributeFilter attributes = 27; + optional TMasterReadOptions master_read_options = 102; } @@ -3216,6 +3218,8 @@ message TJob optional string operation_incarnation = 31; optional NYT.NProto.TGuid allocation_id = 32; optional NNodeTrackerClient.NProto.TAddressMap addresses = 33; + optional bytes events = 34; // YSON + optional bytes statistics = 35; // YSON } message TListJobsStatistics |