aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbystrovserg <bystrovserg@yandex-team.com>2025-04-09 16:52:06 +0300
committerbystrovserg <bystrovserg@yandex-team.com>2025-04-09 17:08:12 +0300
commit439db1cfe398621d7fa0ef4f2fa59ff4421eda0c (patch)
tree50f734a36929455b1a598e3b6dbc06d7fe93787c
parent1488933736b20bc05fb1b3382aa7939ec75dd489 (diff)
downloadydb-439db1cfe398621d7fa0ef4f2fa59ff4421eda0c.tar.gz
YT-6812, YT-24298: Introducing attributes and events to list_jobs
commit_hash:9395b2e879d30ba83d17dcc8c94af4618a13d2a7
-rw-r--r--yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp22
-rw-r--r--yt/cpp/mapreduce/interface/operation.h47
-rw-r--r--yt/yt/client/api/operation_client.h7
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp3
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.cpp20
-rw-r--r--yt/yt/client/driver/scheduler_commands.cpp7
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto4
7 files changed, 93 insertions, 17 deletions
diff --git a/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp
index 8487a1de8f2..14ab0e1db85 100644
--- a/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp
+++ b/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp
@@ -49,18 +49,10 @@ static void SetPathParam(TNode* node, const TString& pathPrefix, const TYPath& p
(*node)["path"] = std::move(updatedPath);
}
-static TNode SerializeAttributeFilter(const TAttributeFilter& attributeFilter)
+template <typename TFilter>
+static TNode SerializeAttributeFilter(const TFilter& attributeFilter)
{
- TNode result = TNode::CreateList();
- for (const auto& attribute : attributeFilter.Attributes_) {
- result.Add(attribute);
- }
- return result;
-}
-
-static TNode SerializeAttributeFilter(const TOperationAttributeFilter& attributeFilter)
-{
- TNode result = TNode::CreateList();
+ auto result = TNode::CreateList();
for (const auto& attribute : attributeFilter.Attributes_) {
result.Add(ToString(attribute));
}
@@ -513,11 +505,14 @@ TNode SerializeParamsForUpdateOperationParameters(
TNode SerializeParamsForGetJob(
const TOperationId& operationId,
const TJobId& jobId,
- const TGetJobOptions& /* options */)
+ const TGetJobOptions& options)
{
TNode result;
SetOperationIdParam(&result, operationId);
result["job_id"] = GetGuidAsString(jobId);
+ if (options.AttributeFilter_) {
+ result["attributes"] = SerializeAttributeFilter(*options.AttributeFilter_);
+ }
return result;
}
@@ -597,6 +592,9 @@ TNode SerializeParamsForListJobs(
if (options.IncludeControllerAgent_) {
result["include_controller_agent"] = *options.IncludeControllerAgent_;
}
+ if (options.AttributeFilter_) {
+ result["attributes"] = SerializeAttributeFilter(*options.AttributeFilter_);
+ }
return result;
}
diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h
index 1f327b4b5d6..ee3e3b1fa11 100644
--- a/yt/cpp/mapreduce/interface/operation.h
+++ b/yt/cpp/mapreduce/interface/operation.h
@@ -2858,6 +2858,45 @@ enum class EJobSortDirection : int
};
///
+/// @brief Attributes to request for a job.
+enum class EJobAttribute : int
+{
+ Id /* "id" */,
+ Type /* "type" */,
+ State /* "state" */,
+ Address /* "address" */,
+ TaskName /* "task_name" */,
+ StartTime /* "start_time" */,
+ FinishTime /* "finish_time" */,
+ Progress /* "progress" */,
+ StderrSize /* "stderr_size" */,
+ Error /* "error" */,
+ Result /* "result" */,
+ BriefStatistics /* "brief_statistics" */,
+ InputPaths /* "input_paths" */,
+ CoreInfos /* "core_infos" */,
+};
+
+///
+/// @brief A class that specifies which attributes to request when using @ref NYT::IClient::GetJob or @ref NYT::IClient::ListJobs.
+struct TJobAttributeFilter
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TJobAttributeFilter;
+ /// @endcond
+
+ THashSet<EJobAttribute> Attributes_;
+
+ ///
+ /// @brief Add attribute to the filter. Calls are supposed to be chained.
+ TSelf& Add(EJobAttribute attribute)
+ {
+ Attributes_.insert(attribute);
+ return *this;
+ }
+};
+
+///
/// @brief Options for @ref NYT::IClient::ListJobs.
///
/// @see https://ytsaurus.tech/docs/en/api/commands.html#list_jobs
@@ -2921,6 +2960,10 @@ struct TListJobsOptions
/// @brief Search for jobs with filters encoded in token.
FLUENT_FIELD_OPTION(TString, ContinuationToken);
+ ///
+ /// @brief Return only requested job attributes.
+ FLUENT_FIELD_OPTION(TJobAttributeFilter, AttributeFilter);
+
/// @}
///
@@ -3065,6 +3108,10 @@ struct TGetJobOptions
/// @cond Doxygen_Suppress
using TSelf = TGetJobOptions;
/// @endcond
+
+ ///
+ /// @brief Return only requested job attributes.
+ FLUENT_FIELD_OPTION(TJobAttributeFilter, AttributeFilter);
};
///
diff --git a/yt/yt/client/api/operation_client.h b/yt/yt/client/api/operation_client.h
index 8e672c80e5f..7b67508c481 100644
--- a/yt/yt/client/api/operation_client.h
+++ b/yt/yt/client/api/operation_client.h
@@ -217,6 +217,8 @@ struct TListJobsOptions
std::optional<TInstant> FromTime;
std::optional<TInstant> ToTime;
+ std::optional<THashSet<TString>> Attributes;
+
std::optional<TString> ContinuationToken;
TDuration RunningJobsLookbehindPeriod = TDuration::Max();
@@ -394,9 +396,12 @@ struct TJob
NYson::TYsonString ArchiveFeatures;
std::optional<std::string> OperationIncarnation;
std::optional<NScheduler::TAllocationId> AllocationId;
-
std::optional<bool> IsStale;
+ // Service flags which are used to compute "is_stale" attribute in "list_jobs".
+ bool PresentInArchive = false;
+ bool PresentInControllerAgent = false;
+
std::optional<NJobTrackerClient::EJobState> GetState() const;
};
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index a4b77dd1832..fdb29f948b2 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -1482,6 +1482,9 @@ TFuture<TListJobsResult> TClient::ListJobs(
if (options.ContinuationToken) {
req->set_continuation_token(*options.ContinuationToken);
}
+ if (options.Attributes) {
+ ToProto(req->mutable_attributes()->mutable_keys(), *options.Attributes);
+ }
req->set_sort_field(static_cast<NProto::EJobSortField>(options.SortField));
req->set_sort_order(static_cast<NProto::EJobSortDirection>(options.SortOrder));
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp
index 8e1fdd366ec..ba36a57d88a 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.cpp
+++ b/yt/yt/client/api/rpc_proxy/helpers.cpp
@@ -886,6 +886,12 @@ void ToProto(NProto::TJob* protoJob, const NApi::TJob& job)
YT_OPTIONAL_TO_PROTO(protoJob, monitoring_descriptor, job.MonitoringDescriptor);
YT_OPTIONAL_SET_PROTO(protoJob, operation_incarnation, job.OperationIncarnation);
YT_OPTIONAL_TO_PROTO(protoJob, allocation_id, job.AllocationId);
+ if (job.Events) {
+ protoJob->set_events(job.Events.ToString());
+ }
+ if (job.Statistics) {
+ protoJob->set_statistics(job.Statistics.ToString());
+ }
}
void FromProto(NApi::TJob* job, const NProto::TJob& protoJob)
@@ -916,8 +922,6 @@ void FromProto(NApi::TJob* job, const NProto::TJob& protoJob)
job->FailContextSize = YT_OPTIONAL_FROM_PROTO(protoJob, fail_context_size);
if (protoJob.has_has_spec()) {
job->HasSpec = protoJob.has_spec();
- } else {
- job->HasSpec = false;
}
if (protoJob.has_error()) {
job->Error = TYsonString(protoJob.error());
@@ -956,8 +960,6 @@ void FromProto(NApi::TJob* job, const NProto::TJob& protoJob)
}
if (protoJob.has_has_competitors()) {
job->HasCompetitors = protoJob.has_competitors();
- } else {
- job->HasCompetitors = false;
}
job->HasProbingCompetitors = YT_OPTIONAL_FROM_PROTO(protoJob, has_probing_competitors);
job->IsStale = YT_OPTIONAL_FROM_PROTO(protoJob, is_stale);
@@ -966,6 +968,11 @@ void FromProto(NApi::TJob* job, const NProto::TJob& protoJob)
} else {
job->ExecAttributes = TYsonString();
}
+ if (protoJob.has_events()) {
+ job->Events = TYsonString(protoJob.events());
+ } else {
+ job->Events = TYsonString();
+ }
job->TaskName = YT_OPTIONAL_FROM_PROTO(protoJob, task_name);
job->PoolTree = YT_OPTIONAL_FROM_PROTO(protoJob, pool_tree);
job->Pool = YT_OPTIONAL_FROM_PROTO(protoJob, pool);
@@ -982,6 +989,11 @@ void FromProto(NApi::TJob* job, const NProto::TJob& protoJob)
} else {
job->AllocationId = {};
}
+ if (protoJob.has_statistics()) {
+ job->Statistics = TYsonString(protoJob.statistics());
+ } else {
+ job->Statistics = TYsonString();
+ }
}
void ToProto(
diff --git a/yt/yt/client/driver/scheduler_commands.cpp b/yt/yt/client/driver/scheduler_commands.cpp
index 6710ee43e28..8169d7aea30 100644
--- a/yt/yt/client/driver/scheduler_commands.cpp
+++ b/yt/yt/client/driver/scheduler_commands.cpp
@@ -585,6 +585,13 @@ void TListJobsCommand::Register(TRegistrar registrar)
return command->Options.RunningJobsLookbehindPeriod;
})
.Optional(/*init*/ false);
+
+ registrar.ParameterWithUniversalAccessor<std::optional<THashSet<TString>>>(
+ "attributes",
+ [] (TThis* command) -> auto& {
+ return command->Options.Attributes;
+ })
+ .Optional(/*init*/ false);
}
void TListJobsCommand::DoExecute(ICommandContextPtr context)
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index c1f922829e3..25551ed9632 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -2479,6 +2479,8 @@ message TReqListJobs
optional bool with_interruption_info = 26;
+ optional NYT.NYTree.NProto.TAttributeFilter attributes = 27;
+
optional TMasterReadOptions master_read_options = 102;
}
@@ -3216,6 +3218,8 @@ message TJob
optional string operation_incarnation = 31;
optional NYT.NProto.TGuid allocation_id = 32;
optional NNodeTrackerClient.NProto.TAddressMap addresses = 33;
+ optional bytes events = 34; // YSON
+ optional bytes statistics = 35; // YSON
}
message TListJobsStatistics