summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoromgronny <[email protected]>2024-11-22 15:50:21 +0300
committeromgronny <[email protected]>2024-11-22 16:06:35 +0300
commit145d1a017c8b4a1d1d2d900d87df267373c7479e (patch)
treeb2acdbaaac31d99af6f0743ba6c3bfcccad352f7
parent06ad4bcf8a7b739bedbd565fc385c6273baaa346 (diff)
YT-22455: Introduce list jobs continuation token
[nodiff:caesar] commit_hash:d45b3da99e7b19120e02298ca6e87c02cc800ea2
-rw-r--r--yt/cpp/mapreduce/interface/operation.h12
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp9
-rw-r--r--yt/yt/client/api/operation_client.cpp101
-rw-r--r--yt/yt/client/api/operation_client.h53
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp9
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.cpp8
-rw-r--r--yt/yt/client/driver/scheduler_commands.cpp16
-rw-r--r--yt/yt/client/ya.make1
-rw-r--r--yt/yt/core/ytree/yson_struct-inl.h13
-rw-r--r--yt/yt/core/ytree/yson_struct.h3
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto7
11 files changed, 222 insertions, 10 deletions
diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h
index 76c4eabdfe6..a87ddc30b4a 100644
--- a/yt/cpp/mapreduce/interface/operation.h
+++ b/yt/cpp/mapreduce/interface/operation.h
@@ -2855,6 +2855,18 @@ struct TListJobsOptions
/// @brief Return only jobs with monitoring descriptor.
FLUENT_FIELD_OPTION(bool, WithMonitoringDescriptor);
+ ///
+ /// @brief Search for jobs with start time >= `FromTime`.
+ FLUENT_FIELD_OPTION(TInstant, FromTime);
+
+ ///
+ /// @brief Search for jobs with start time <= `ToTime`.
+ FLUENT_FIELD_OPTION(TInstant, ToTime);
+
+ ///
+ /// @brief Search for jobs with filters encoded in token.
+ FLUENT_FIELD_OPTION(TString, ContinuationToken);
+
/// @}
///
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
index bebc59ec917..3a78ed3319f 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
@@ -562,6 +562,15 @@ TNode SerializeParamsForListJobs(
if (options.WithMonitoringDescriptor_) {
result["with_monitoring_descriptor"] = *options.WithMonitoringDescriptor_;
}
+ if (options.FromTime_) {
+ result["from_time"] = ToString(options.FromTime_);
+ }
+ if (options.ToTime_) {
+ result["to_time"] = ToString(options.ToTime_);
+ }
+ if (options.ContinuationToken_) {
+ result["continuation_token"] = *options.ContinuationToken_;
+ }
if (options.SortField_) {
result["sort_field"] = ToString(*options.SortField_);
diff --git a/yt/yt/client/api/operation_client.cpp b/yt/yt/client/api/operation_client.cpp
index 8f8dbb355b0..07253e5ee52 100644
--- a/yt/yt/client/api/operation_client.cpp
+++ b/yt/yt/client/api/operation_client.cpp
@@ -3,14 +3,115 @@
#include <yt/yt/client/job_tracker_client/helpers.h>
#include <yt/yt/core/ytree/fluent.h>
+#include <yt/yt/core/ytree/yson_struct.h>
+
+#include <library/cpp/string_utils/base64/base64.h>
namespace NYT::NApi {
using namespace NYTree;
using namespace NJobTrackerClient;
+using namespace NYson;
////////////////////////////////////////////////////////////////////////////////
+void TListJobsContinuationTokenSerializer::Register(TRegistrar registrar)
+{
+ registrar.ExternalClassParameter("version", &TThat::Version)
+ .Default(0)
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("job_competition_id", &TListJobsOptions::JobCompetitionId)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("type", &TThat::Type)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("state", &TThat::State)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("address", &TThat::Address)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("with_stderr", &TThat::WithStderr)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("with_fail_context", &TThat::WithFailContext)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("with_spec", &TThat::WithSpec)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("with_competitors", &TThat::WithCompetitors)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("with_monitoring_gescriptor", &TThat::WithMonitoringDescriptor)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("task_name", &TThat::TaskName)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("running_jobs_lookbehind_period", &TThat::RunningJobsLookbehindPeriod)
+ .Default(TDuration::Max())
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("sort_field", &TThat::SortField)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("sort_order", &TThat::SortOrder)
+ .Default()
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("offset", &TThat::Offset)
+ .Default(0)
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("limit", &TThat::Limit)
+ .Default(1000)
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("include_archive", &TThat::IncludeArchive)
+ .Default(false)
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("include_cypress", &TThat::IncludeCypress)
+ .Default(false)
+ .DontSerializeDefault();
+
+ registrar.ExternalBaseClassParameter("include_controller_agent", &TThat::IncludeControllerAgent)
+ .Default(false)
+ .DontSerializeDefault();
+}
+
+TString EncodeNewToken(TListJobsOptions&& options, int jobCount)
+{
+ options.Offset += jobCount;
+ options.ContinuationToken.reset();
+
+ TListJobsContinuationToken token;
+ static_cast<TListJobsOptions&>(token) = std::move(options);
+
+ auto optionsYson = ConvertToYsonString(token);
+ return Base64Encode(optionsYson.ToString());
+}
+
+TListJobsOptions DecodeListJobsOptionsFromToken(const TString& continuationToken)
+{
+ auto optionsYson = TYsonString(Base64StrictDecode(continuationToken));
+ return ConvertTo<TListJobsContinuationToken>(optionsYson);
+}
+
void Serialize(
const TOperation& operation,
NYson::IYsonConsumer* consumer,
diff --git a/yt/yt/client/api/operation_client.h b/yt/yt/client/api/operation_client.h
index 19cea82ad71..ae11054bd18 100644
--- a/yt/yt/client/api/operation_client.h
+++ b/yt/yt/client/api/operation_client.h
@@ -166,16 +166,16 @@ struct TPollJobShellResponse
};
DEFINE_ENUM(EJobSortField,
- ((None) (0))
- ((Type) (1))
- ((State) (2))
- ((StartTime) (3))
- ((FinishTime) (4))
- ((Address) (5))
- ((Duration) (6))
- ((Progress) (7))
- ((Id) (8))
- ((TaskName) (9))
+ ((None) (0))
+ ((Type) (1))
+ ((State) (2))
+ ((StartTime) (3))
+ ((FinishTime) (4))
+ ((Address) (5))
+ ((Duration) (6))
+ ((Progress) (7))
+ ((Id) (8))
+ ((TaskName) (9))
);
DEFINE_ENUM(EJobSortDirection,
@@ -206,6 +206,11 @@ struct TListJobsOptions
std::optional<bool> WithMonitoringDescriptor;
std::optional<TString> TaskName;
+ std::optional<TInstant> FromTime;
+ std::optional<TInstant> ToTime;
+
+ std::optional<TString> ContinuationToken;
+
TDuration RunningJobsLookbehindPeriod = TDuration::Max();
EJobSortField SortField = EJobSortField::None;
@@ -221,6 +226,32 @@ struct TListJobsOptions
EDataSource DataSource = EDataSource::Auto;
};
+struct TListJobsContinuationToken
+ : public TListJobsOptions
+{
+ int Version = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TListJobsContinuationTokenSerializer
+ : public virtual NYTree::TExternalizedYsonStruct
+{
+public:
+ REGISTER_EXTERNALIZED_YSON_STRUCT(TListJobsContinuationToken, TListJobsContinuationTokenSerializer);
+
+ static void Register(TRegistrar registrar);
+};
+
+ASSIGN_EXTERNAL_YSON_SERIALIZER(TListJobsContinuationToken, TListJobsContinuationTokenSerializer);
+
+////////////////////////////////////////////////////////////////////////////////
+
+TString EncodeNewToken(TListJobsOptions&& options, int jobCount);
+TListJobsOptions DecodeListJobsOptionsFromToken(const TString& continuationToken);
+
+////////////////////////////////////////////////////////////////////////////////
+
struct TAbandonJobOptions
: public TTimeoutOptions
{ };
@@ -387,6 +418,8 @@ struct TListJobsResult
TListJobsStatistics Statistics;
std::vector<TError> Errors;
+
+ std::optional<TString> ContinuationToken;
};
struct TGetJobStderrResponse
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index a763772b7e5..f7397d2f373 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -1486,6 +1486,15 @@ TFuture<TListJobsResult> TClient::ListJobs(
if (options.TaskName) {
req->set_task_name(*options.TaskName);
}
+ if (options.FromTime) {
+ req->set_from_time(NYT::ToProto(*options.FromTime));
+ }
+ if (options.ToTime) {
+ req->set_to_time(NYT::ToProto(*options.ToTime));
+ }
+ if (options.ContinuationToken) {
+ req->set_continuation_token(*options.ContinuationToken);
+ }
req->set_sort_field(static_cast<NProto::EJobSortField>(options.SortField));
req->set_sort_order(static_cast<NProto::EJobSortDirection>(options.SortOrder));
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp
index 88c12aa793a..dba32868407 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.cpp
+++ b/yt/yt/client/api/rpc_proxy/helpers.cpp
@@ -407,6 +407,9 @@ void ToProto(
if (result.ArchiveJobCount) {
proto->set_archive_job_count(*result.ArchiveJobCount);
}
+ if (result.ContinuationToken) {
+ proto->set_continuation_token(*result.ContinuationToken);
+ }
ToProto(proto->mutable_statistics(), result.Statistics);
ToProto(proto->mutable_errors(), result.Errors);
@@ -433,6 +436,11 @@ void FromProto(
} else {
result->ArchiveJobCount.reset();
}
+ if (proto.has_continuation_token()) {
+ result->ContinuationToken = proto.continuation_token();
+ } else {
+ result->ContinuationToken.reset();
+ }
FromProto(&result->Statistics, proto.statistics());
FromProto(&result->Errors, proto.errors());
diff --git a/yt/yt/client/driver/scheduler_commands.cpp b/yt/yt/client/driver/scheduler_commands.cpp
index 53f82c7bf27..d64f69d3336 100644
--- a/yt/yt/client/driver/scheduler_commands.cpp
+++ b/yt/yt/client/driver/scheduler_commands.cpp
@@ -490,6 +490,21 @@ void TListJobsCommand::Register(TRegistrar registrar)
[] (TThis* command) -> auto& { return command->Options.WithMonitoringDescriptor; })
.Optional(/*init*/ false);
+ registrar.ParameterWithUniversalAccessor<std::optional<TInstant>>(
+ "from_time",
+ [] (TThis* command) -> auto& { return command->Options.FromTime; })
+ .Optional(/*init*/ false);
+
+ registrar.ParameterWithUniversalAccessor<std::optional<TInstant>>(
+ "to_time",
+ [] (TThis* command) -> auto& { return command->Options.ToTime; })
+ .Optional(/*init*/ false);
+
+ registrar.ParameterWithUniversalAccessor<std::optional<TString>>(
+ "continuation_token",
+ [] (TThis* command) -> auto& { return command->Options.ContinuationToken; })
+ .Optional(/*init*/ false);
+
registrar.ParameterWithUniversalAccessor<TJobId>(
"job_competition_id",
[] (TThis* command) -> auto& { return command->Options.JobCompetitionId; })
@@ -593,6 +608,7 @@ void TListJobsCommand::DoExecute(ICommandContextPtr context)
}
})
.Item("errors").Value(result.Errors)
+ .Item("continuation_token").Value(result.ContinuationToken)
.EndMap());
}
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index fef0c109381..2b384fe8189 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -220,6 +220,7 @@ PEERDIR(
yt/yt/library/quantile_digest
yt/yt_proto/yt/client
library/cpp/json
+ library/cpp/string_utils/base64
contrib/libs/pfr
)
diff --git a/yt/yt/core/ytree/yson_struct-inl.h b/yt/yt/core/ytree/yson_struct-inl.h
index 92bd2d65fae..4471056588d 100644
--- a/yt/yt/core/ytree/yson_struct-inl.h
+++ b/yt/yt/core/ytree/yson_struct-inl.h
@@ -249,6 +249,19 @@ void TYsonStructRegistrar<TStruct>::ExternalPostprocessor(TExternalPostprocessor
}
template <class TStruct>
+template <class TBase, class TValue>
+TYsonStructParameter<TValue>& TYsonStructRegistrar<TStruct>::ExternalBaseClassParameter(const TString& key, TValue(TBase::*field))
+{
+ static_assert(std::derived_from<TStruct, TExternalizedYsonStruct>);
+ static_assert(std::derived_from<typename TStruct::TExternal, TBase>);
+ auto universalAccessor = [field] (TStruct* serializer) -> auto& {
+ return serializer->That_->*field;
+ };
+
+ return ParameterWithUniversalAccessor<TValue>(key, universalAccessor);
+}
+
+template <class TStruct>
void TYsonStructRegistrar<TStruct>::UnrecognizedStrategy(EUnrecognizedStrategy strategy)
{
Meta_->SetUnrecognizedStrategy(strategy);
diff --git a/yt/yt/core/ytree/yson_struct.h b/yt/yt/core/ytree/yson_struct.h
index ae23668f0c5..097addf65a6 100644
--- a/yt/yt/core/ytree/yson_struct.h
+++ b/yt/yt/core/ytree/yson_struct.h
@@ -313,6 +313,9 @@ public:
// requires std::derived_from<TStruct, TExternalizedYsonStruct<TExternal, TStruct>>
TYsonStructParameter<TValue>& ExternalClassParameter(const TString& key, TValue(TExternal::*field));
+ template <class TBase, class TValue>
+ TYsonStructParameter<TValue>& ExternalBaseClassParameter(const TString& key, TValue(TBase::*field));
+
template <class TExternalPreprocessor>
// requires (CInvocable<TExternalPreprocessor, void(typename TStruct::TExternal*)>)
void ExternalPreprocessor(TExternalPreprocessor preprocessor);
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 00d0104d4a2..2ea1b46e2a6 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -2409,6 +2409,11 @@ message TReqListJobs
optional bool with_monitoring_descriptor = 21;
+ optional uint64 from_time = 22; // TInstant
+ optional uint64 to_time = 23; // TInstant
+
+ optional string continuation_token = 24;
+
optional TMasterReadOptions master_read_options = 102;
}
@@ -3179,6 +3184,8 @@ message TListJobsResult
required TListJobsStatistics statistics = 5;
repeated NYT.NProto.TError errors = 6;
+
+ optional string continuation_token = 7;
}
message TJobTraceEvent