diff options
author | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
commit | 73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch) | |
tree | 188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/interface/operation.h | |
parent | 528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff) | |
download | ydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz |
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build.
Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/interface/operation.h')
-rw-r--r-- | yt/cpp/mapreduce/interface/operation.h | 3494 |
1 files changed, 3494 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h new file mode 100644 index 00000000000..171a7e4af79 --- /dev/null +++ b/yt/cpp/mapreduce/interface/operation.h @@ -0,0 +1,3494 @@ +#pragma once + +/// +/// @file yt/cpp/mapreduce/interface/operation.h +/// +/// Header containing interface to run operations in YT +/// and retrieve information about them. +/// @see [the doc](https://yt.yandex-team.ru/docs/description/mr/map_reduce_overview.html). + +#include "client_method_options.h" +#include "errors.h" +#include "io.h" +#include "job_statistics.h" +#include "job_counters.h" + +#include <library/cpp/threading/future/future.h> +#include <library/cpp/type_info/type_info.h> + +#include <util/datetime/base.h> +#include <util/generic/variant.h> +#include <util/generic/vector.h> +#include <util/generic/maybe.h> +#include <util/system/file.h> +#include <util/system/types.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +/// Tag class marking that the row type for table is not specified. +struct TUnspecifiedTableStructure +{ }; + +/// Tag class marking that table rows have protobuf type. +struct TProtobufTableStructure +{ + /// @brief Descriptor of the protobuf type of table rows. + /// + /// @note If table is tagged with @ref ::google::protobuf::Message instead of real proto class + /// this descriptor might be null. + const ::google::protobuf::Descriptor* Descriptor = nullptr; +}; + + +/// Tag class to specify table row type. +using TTableStructure = std::variant< + TUnspecifiedTableStructure, + TProtobufTableStructure +>; + +bool operator==(const TUnspecifiedTableStructure&, const TUnspecifiedTableStructure&); +bool operator==(const TProtobufTableStructure& lhs, const TProtobufTableStructure& rhs); + +/// Table path marked with @ref NYT::TTableStructure tag. +struct TStructuredTablePath +{ + TStructuredTablePath(TRichYPath richYPath = TRichYPath(), TTableStructure description = TUnspecifiedTableStructure()) + : RichYPath(std::move(richYPath)) + , Description(std::move(description)) + { } + + TStructuredTablePath(TRichYPath richYPath, const ::google::protobuf::Descriptor* descriptor) + : RichYPath(std::move(richYPath)) + , Description(TProtobufTableStructure({descriptor})) + { } + + TStructuredTablePath(TYPath path) + : RichYPath(std::move(path)) + , Description(TUnspecifiedTableStructure()) + { } + + TStructuredTablePath(const char* path) + : RichYPath(path) + , Description(TUnspecifiedTableStructure()) + { } + + TRichYPath RichYPath; + TTableStructure Description; +}; + +/// Create marked table path from row type. +template <typename TRow> +TStructuredTablePath Structured(TRichYPath richYPath); + +/// Create tag class from row type. +template <typename TRow> +TTableStructure StructuredTableDescription(); + +/////////////////////////////////////////////////////////////////////////////// + +/// Tag class marking that row stream is empty. +struct TVoidStructuredRowStream +{ }; + +/// Tag class marking that row stream consists of `NYT::TNode`. +struct TTNodeStructuredRowStream +{ }; + +/// Tag class marking that row stream consists of @ref NYT::TYaMRRow. +struct TTYaMRRowStructuredRowStream +{ }; + +/// Tag class marking that row stream consists of protobuf rows of given type. +struct TProtobufStructuredRowStream +{ + /// @brief Descriptor of the protobuf type of table rows. + /// + /// @note If `Descriptor` is nullptr, then row stream consists of multiple message types. + const ::google::protobuf::Descriptor* Descriptor = nullptr; +}; + +/// Tag class to specify type of rows in an operation row stream +using TStructuredRowStreamDescription = std::variant< + TVoidStructuredRowStream, + TTNodeStructuredRowStream, + TTYaMRRowStructuredRowStream, + TProtobufStructuredRowStream +>; + +/////////////////////////////////////////////////////////////////////////////// + +/// Tag class marking that current binary should be used in operation. +struct TJobBinaryDefault +{ }; + +/// Tag class marking that binary from specified local path should be used in operation. +struct TJobBinaryLocalPath +{ + TString Path; + TMaybe<TString> MD5CheckSum; +}; + +/// Tag class marking that binary from specified Cypress path should be used in operation. +struct TJobBinaryCypressPath +{ + TYPath Path; + TMaybe<TTransactionId> TransactionId; +}; + +//////////////////////////////////////////////////////////////////////////////// + + +/// @cond Doxygen_Suppress +namespace NDetail { + extern i64 OutputTableCount; +} // namespace NDetail +/// @endcond + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Auto merge mode. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/automerge +enum class EAutoMergeMode +{ + /// Auto merge is disabled. + Disabled /* "disabled" */, + + /// Mode that tries to achieve good chunk sizes and doesn't limit usage of chunk quota for intermediate chunks. + Relaxed /* "relaxed" */, + + /// Mode that tries to optimize usage of chunk quota for intermediate chunks, operation might run slower. + Economy /* "economy" */, + + /// + /// @brief Manual configuration of automerge parameters. + /// + /// @ref TAutoMergeSpec + Manual /* "manual" */, +}; + +/// +/// @brief Options for auto merge operation stage. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/automerge +class TAutoMergeSpec +{ +public: + /// @cond Doxygen_Suppress + using TSelf = TAutoMergeSpec; + /// @endcond + + /// Mode of the auto merge. + FLUENT_FIELD_OPTION(EAutoMergeMode, Mode); + + /// @brief Upper limit for number of intermediate chunks. + /// + /// Works only for Manual mode. + FLUENT_FIELD_OPTION(i64, MaxIntermediateChunkCount); + + /// @brief Number of chunks limit to merge in one job. + /// + /// Works only for Manual mode. + FLUENT_FIELD_OPTION(i64, ChunkCountPerMergeJob); + + /// @brief Automerge will not merge chunks that are larger than `DesiredChunkSize * (ChunkSizeThreshold / 100.)` + /// + /// Works only for Manual mode. + FLUENT_FIELD_OPTION(i64, ChunkSizeThreshold); +}; + +/// Base for operations with auto merge options. +template <class TDerived> +class TWithAutoMergeSpec +{ +public: + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// @brief Options for auto merge operation stage. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/automerge + FLUENT_FIELD_OPTION(TAutoMergeSpec, AutoMerge); +}; + +/// +/// @brief Resources controlled by scheduler and used by running operations. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/scheduler/scheduler_and_pools#resursy +class TSchedulerResources +{ +public: + /// @cond Doxygen_Suppress + using TSelf = TSchedulerResources; + /// @endcond + + /// Each job consumes exactly one user slot. + FLUENT_FIELD_OPTION_ENCAPSULATED(i64, UserSlots); + + /// Number of (virtual) cpu cores consumed by all jobs. + FLUENT_FIELD_OPTION_ENCAPSULATED(i64, Cpu); + + /// Amount of memory in bytes. + FLUENT_FIELD_OPTION_ENCAPSULATED(i64, Memory); +}; + +/// Base for input format hints of a user job. +template <class TDerived> +class TUserJobInputFormatHintsBase +{ +public: + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// @brief Fine tune input format of the job. + FLUENT_FIELD_OPTION(TFormatHints, InputFormatHints); +}; + +/// Base for output format hints of a user job. +template <class TDerived> +class TUserJobOutputFormatHintsBase +{ +public: + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// @brief Fine tune output format of the job. + FLUENT_FIELD_OPTION(TFormatHints, OutputFormatHints); +}; + +/// Base for format hints of a user job. +template <class TDerived> +class TUserJobFormatHintsBase + : public TUserJobInputFormatHintsBase<TDerived> + , public TUserJobOutputFormatHintsBase<TDerived> +{ +public: + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond +}; + +/// User job format hints. +class TUserJobFormatHints + : public TUserJobFormatHintsBase<TUserJobFormatHints> +{ }; + +/// Spec of input and output tables of a raw operation. +template <class TDerived> +class TRawOperationIoTableSpec +{ +public: + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// Add input table path to input path list. + TDerived& AddInput(const TRichYPath& path); + + /// Set input table path no. `tableIndex`. + TDerived& SetInput(size_t tableIndex, const TRichYPath& path); + + /// Add output table path to output path list. + TDerived& AddOutput(const TRichYPath& path); + + /// Set output table path no. `tableIndex`. + TDerived& SetOutput(size_t tableIndex, const TRichYPath& path); + + /// Get all input table paths. + const TVector<TRichYPath>& GetInputs() const; + + /// Get all output table paths. + const TVector<TRichYPath>& GetOutputs() const; + +private: + TVector<TRichYPath> Inputs_; + TVector<TRichYPath> Outputs_; +}; + +/// Base spec for IO in "simple" raw operations (Map, Reduce etc.). +template <class TDerived> +struct TSimpleRawOperationIoSpec + : public TRawOperationIoTableSpec<TDerived> +{ + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// @brief Describes format for both input and output. + /// + /// @note `Format' is overriden by `InputFormat' and `OutputFormat'. + FLUENT_FIELD_OPTION(TFormat, Format); + + /// Describes input format. + FLUENT_FIELD_OPTION(TFormat, InputFormat); + + /// Describes output format. + FLUENT_FIELD_OPTION(TFormat, OutputFormat); +}; + +/// Spec for IO in MapReduce operation. +template <class TDerived> +class TRawMapReduceOperationIoSpec + : public TRawOperationIoTableSpec<TDerived> +{ +public: + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// @brief Describes format for both input and output of mapper. + /// + /// @note `MapperFormat' is overriden by `MapperInputFormat' and `MapperOutputFormat'. + FLUENT_FIELD_OPTION(TFormat, MapperFormat); + + /// Describes mapper input format. + FLUENT_FIELD_OPTION(TFormat, MapperInputFormat); + + /// Describes mapper output format. + FLUENT_FIELD_OPTION(TFormat, MapperOutputFormat); + + /// @brief Describes format for both input and output of reduce combiner. + /// + /// @note `ReduceCombinerFormat' is overriden by `ReduceCombinerInputFormat' and `ReduceCombinerOutputFormat'. + FLUENT_FIELD_OPTION(TFormat, ReduceCombinerFormat); + + /// Describes reduce combiner input format. + FLUENT_FIELD_OPTION(TFormat, ReduceCombinerInputFormat); + + /// Describes reduce combiner output format. + FLUENT_FIELD_OPTION(TFormat, ReduceCombinerOutputFormat); + + /// @brief Describes format for both input and output of reducer. + /// + /// @note `ReducerFormat' is overriden by `ReducerInputFormat' and `ReducerOutputFormat'. + FLUENT_FIELD_OPTION(TFormat, ReducerFormat); + + /// Describes reducer input format. + FLUENT_FIELD_OPTION(TFormat, ReducerInputFormat); + + /// Describes reducer output format. + FLUENT_FIELD_OPTION(TFormat, ReducerOutputFormat); + + /// Add direct map output table path. + TDerived& AddMapOutput(const TRichYPath& path); + + /// Set direct map output table path no. `tableIndex`. + TDerived& SetMapOutput(size_t tableIndex, const TRichYPath& path); + + /// Get all direct map output table paths + const TVector<TRichYPath>& GetMapOutputs() const; + +private: + TVector<TRichYPath> MapOutputs_; +}; + +/// +/// @brief Base spec of operations with input tables. +class TOperationInputSpecBase +{ +public: + template <class T, class = void> + struct TFormatAdder; + + /// + /// @brief Add input table path to input path list and specify type of rows. + template <class T> + void AddInput(const TRichYPath& path); + + /// + /// @brief Add input table path as structured paths. + void AddStructuredInput(TStructuredTablePath path); + + /// + /// @brief Set input table path and type. + template <class T> + void SetInput(size_t tableIndex, const TRichYPath& path); + + /// + /// @brief All input paths. + TVector<TRichYPath> Inputs_; + + /// + /// @brief Get all input structured paths. + const TVector<TStructuredTablePath>& GetStructuredInputs() const; + +private: + TVector<TStructuredTablePath> StructuredInputs_; + friend struct TOperationIOSpecBase; + template <class T> + friend struct TOperationIOSpec; +}; + +/// +/// @brief Base spec of operations with output tables. +class TOperationOutputSpecBase +{ +public: + template <class T, class = void> + struct TFormatAdder; + + /// + /// @brief Add output table path to output path list and specify type of rows. + template <class T> + void AddOutput(const TRichYPath& path); + + /// + /// @brief Add output table path as structured paths. + void AddStructuredOutput(TStructuredTablePath path); + + /// + /// @brief Set output table path and type. + template <class T> + void SetOutput(size_t tableIndex, const TRichYPath& path); + + /// + /// @brief All output paths. + TVector<TRichYPath> Outputs_; + + /// + /// @brief Get all output structured paths. + const TVector<TStructuredTablePath>& GetStructuredOutputs() const; + +private: + TVector<TStructuredTablePath> StructuredOutputs_; + friend struct TOperationIOSpecBase; + template <class T> + friend struct TOperationIOSpec; +}; + +/// +/// @brief Base spec for operations with inputs and outputs. +struct TOperationIOSpecBase + : public TOperationInputSpecBase + , public TOperationOutputSpecBase +{ }; + +/// +/// @brief Base spec for operations with inputs and outputs. +template <class TDerived> +struct TOperationIOSpec + : public TOperationIOSpecBase +{ + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + template <class T> + TDerived& AddInput(const TRichYPath& path); + + TDerived& AddStructuredInput(TStructuredTablePath path); + + template <class T> + TDerived& SetInput(size_t tableIndex, const TRichYPath& path); + + template <class T> + TDerived& AddOutput(const TRichYPath& path); + + TDerived& AddStructuredOutput(TStructuredTablePath path); + + template <class T> + TDerived& SetOutput(size_t tableIndex, const TRichYPath& path); + + + // DON'T USE THESE METHODS! They are left solely for backward compatibility. + // These methods are the only way to do equivalent of (Add/Set)(Input/Output)<Message> + // but please consider using (Add/Set)(Input/Output)<TConcreteMessage> + // (where TConcreteMessage is some descendant of Message) + // because they are faster and better (see https://st.yandex-team.ru/YT-6967) + TDerived& AddProtobufInput_VerySlow_Deprecated(const TRichYPath& path); + TDerived& AddProtobufOutput_VerySlow_Deprecated(const TRichYPath& path); +}; + +/// +/// @brief Base spec for all operations. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/operations_options +template <class TDerived> +struct TOperationSpecBase +{ + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// + /// @brief Limit on operation execution time. + /// + /// If operation doesn't finish in time it will be aborted. + FLUENT_FIELD_OPTION(TDuration, TimeLimit); + + /// @brief Title to be shown in web interface. + FLUENT_FIELD_OPTION(TString, Title); + + /// @brief Pool to be used for this operation. + FLUENT_FIELD_OPTION(TString, Pool); + + /// @brief Weight of operation. + /// + /// Coefficient defining how much resources operation gets relative to its siblings in the same pool. + FLUENT_FIELD_OPTION(double, Weight); + + /// @breif Pool tree list that operation will use. + FLUENT_OPTIONAL_VECTOR_FIELD_ENCAPSULATED(TString, PoolTree); + + /// How much resources can be consumed by operation. + FLUENT_FIELD_OPTION_ENCAPSULATED(TSchedulerResources, ResourceLimits); +}; + +/// +/// @brief Base spec for all operations with user jobs. +template <class TDerived> +struct TUserOperationSpecBase + : TOperationSpecBase<TDerived> +{ + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// How many jobs can fail before operation is failed. + FLUENT_FIELD_OPTION(ui64, MaxFailedJobCount); + + /// On any unsuccessful job completion (i.e. abortion or failure) force the whole operation to fail. + FLUENT_FIELD_OPTION(bool, FailOnJobRestart); + + /// + /// @brief Table to save whole stderr of operation. + /// + /// @see https://clubs.at.yandex-team.ru/yt/1045 + FLUENT_FIELD_OPTION(TYPath, StderrTablePath); + + /// + /// @brief Table to save coredumps of operation. + /// + /// @see https://clubs.at.yandex-team.ru/yt/1045 + FLUENT_FIELD_OPTION(TYPath, CoreTablePath); + + /// + /// @brief How long should the scheduler wait for the job to be started on a node. + /// + /// When you run huge jobs that require preemption of all the other jobs on + /// a node, the default timeout might be insufficient and your job may be + /// aborted with 'waiting_timeout' reason. This is especially problematic + /// when you are setting 'FailOnJobRestart' option. + /// + /// @note The value must be between 10 seconds and 10 minutes. + FLUENT_FIELD_OPTION(TDuration, WaitingJobTimeout); +}; + +/// +/// @brief Class to provide information on intermediate mapreduce stream protobuf types. +/// +/// When using protobuf format it is important to know exact types of proto messages +/// that are used in input/output. +/// +/// Sometimes such messages cannot be derived from job class +/// i.e. when job class uses `NYT::TTableReader<::google::protobuf::Message>` +/// or `NYT::TTableWriter<::google::protobuf::Message>`. +/// +/// When using such jobs user can provide exact message type using this class. +/// +/// @note Only input/output that relate to intermediate tables can be hinted. +/// Input to map and output of reduce is derived from `AddInput`/`AddOutput`. +template <class TDerived> +struct TIntermediateTablesHintSpec +{ + /// Specify intermediate map output type. + template <class T> + TDerived& HintMapOutput(); + + /// Specify reduce combiner input. + template <class T> + TDerived& HintReduceCombinerInput(); + + /// Specify reduce combiner output. + template <class T> + TDerived& HintReduceCombinerOutput(); + + /// Specify reducer input. + template <class T> + TDerived& HintReduceInput(); + + /// + /// @brief Add output of map stage. + /// + /// Mapper output table #0 is always intermediate table that is going to be reduced later. + /// Rows that mapper write to tables #1, #2, ... are saved in MapOutput tables. + template <class T> + TDerived& AddMapOutput(const TRichYPath& path); + + TVector<TRichYPath> MapOutputs_; + + const TVector<TStructuredTablePath>& GetStructuredMapOutputs() const; + const TMaybe<TTableStructure>& GetIntermediateMapOutputDescription() const; + const TMaybe<TTableStructure>& GetIntermediateReduceCombinerInputDescription() const; + const TMaybe<TTableStructure>& GetIntermediateReduceCombinerOutputDescription() const; + const TMaybe<TTableStructure>& GetIntermediateReducerInputDescription() const; + +private: + TVector<TStructuredTablePath> StructuredMapOutputs_; + TMaybe<TTableStructure> IntermediateMapOutputDescription_; + TMaybe<TTableStructure> IntermediateReduceCombinerInputDescription_; + TMaybe<TTableStructure> IntermediateReduceCombinerOutputDescription_; + TMaybe<TTableStructure> IntermediateReducerInputDescription_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TAddLocalFileOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TAddLocalFileOptions; + /// @endcond + + /// + /// @brief Path by which job will see the uploaded file. + /// + /// Defaults to basename of the local path. + FLUENT_FIELD_OPTION(TString, PathInJob); + + /// + /// @brief MD5 checksum of uploaded file. + /// + /// If not specified it is computed by this library. + /// If this argument is provided, the user can some cpu and disk IO. + FLUENT_FIELD_OPTION(TString, MD5CheckSum); + + /// + /// @brief Do not put file into node cache + /// + /// @see NYT::TRichYPath::BypassArtifactCache + FLUENT_FIELD_OPTION(bool, BypassArtifactCache); +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// @brief Binary to run job profiler on. +enum class EProfilingBinary +{ + /// Profile job proxy. + JobProxy /* "job_proxy" */, + + /// Profile user job. + UserJob /* "user_job" */, +}; + +/// @brief Type of job profiler. +enum class EProfilerType +{ + /// Profile CPU usage. + Cpu /* "cpu" */, + + /// Profile memory usage. + Memory /* "memory" */, + + /// Profiler peak memory usage. + PeakMemory /* "peak_memory" */, +}; + +/// @brief Specifies a job profiler. +struct TJobProfilerSpec +{ + /// @cond Doxygen_Suppress + using TSelf = TJobProfilerSpec; + /// @endcond + + /// @brief Binary to profile. + FLUENT_FIELD_OPTION(EProfilingBinary, ProfilingBinary); + + /// @brief Type of the profiler. + FLUENT_FIELD_OPTION(EProfilerType, ProfilerType); + + /// @brief Probabiliy of the job being selected for profiling. + FLUENT_FIELD_OPTION(double, ProfilingProbability); + + /// @brief For sampling profilers, sets the number of samples per second. + FLUENT_FIELD_OPTION(int, SamplingFrequency); +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Spec of user job. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/operations_options#user_script_options +struct TUserJobSpec +{ + /// @cond Doxygen_Suppress + using TSelf = TUserJobSpec; + /// @endcond + + /// + /// @brief Specify a local file to upload to Cypress and prepare for use in job. + TSelf& AddLocalFile(const TLocalFilePath& path, const TAddLocalFileOptions& options = TAddLocalFileOptions()); + + /// + /// @brief Get the list of all added local files. + TVector<std::tuple<TLocalFilePath, TAddLocalFileOptions>> GetLocalFiles() const; + + /// @brief Paths to files in Cypress to use in job. + FLUENT_VECTOR_FIELD(TRichYPath, File); + + /// + /// @brief MemoryLimit specifies how much memory job process can use. + /// + /// @note + /// If job uses tmpfs (check @ref NYT::TOperationOptions::MountSandboxInTmpfs) + /// YT computes its memory usage as total of: + /// - memory usage of job process itself (including mapped files); + /// - total size of tmpfs used by this job. + /// + /// @note + /// When @ref NYT::TOperationOptions::MountSandboxInTmpfs is enabled library will compute + /// total size of all files used by this job and add this total size to MemoryLimit. + /// Thus you shouldn't include size of your files (e.g. binary file) into MemoryLimit. + /// + /// @note + /// Final memory memory_limit passed to YT is calculated as follows: + /// + /// @note + /// ``` + /// memory_limit = MemoryLimit + <total-size-of-used-files> + ExtraTmpfsSize + /// ``` + /// + /// @see NYT::TUserJobSpec::ExtraTmpfsSize + FLUENT_FIELD_OPTION(i64, MemoryLimit); + + /// + /// @brief Size of data that is going to be written to tmpfs. + /// + /// This option should be used if job writes data to tmpfs. + /// + /// ExtraTmpfsSize should not include size of files specified with + /// @ref NYT::TUserJobSpec::AddLocalFile or @ref NYT::TUserJobSpec::AddFile + /// These files are copied to tmpfs automatically and their total size + /// is computed automatically. + /// + /// @see NYT::TOperationOptions::MountSandboxInTmpfs + /// @see NYT::TUserJobSpec::MemoryLimit + FLUENT_FIELD_OPTION(i64, ExtraTmpfsSize); + + /// + /// @brief Maximum number of CPU cores for a single job to use. + FLUENT_FIELD_OPTION(double, CpuLimit); + + /// + /// @brief Fraction of @ref NYT::TUserJobSpec::MemoryLimit that job gets at start. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/operations_options#memory_reserve_factor + FLUENT_FIELD_OPTION(double, MemoryReserveFactor); + + /// + /// @brief Local path to executable to be used inside jobs. + //// + /// Provided executable must use C++ YT API library (this library) + /// and implement job class that is going to be used. + /// + /// This option might be useful if we want to start operation from nonlinux machines + /// (in that case we use `JobBinary` to provide path to the same program compiled for linux). + /// Other example of using this option is uploading executable to cypress in advance + /// and save the time required to upload current executable to cache. + /// `md5` argument can be used to save cpu time and disk IO when binary MD5 checksum is known. + /// When argument is not provided library will compute it itself. + TUserJobSpec& JobBinaryLocalPath(TString path, TMaybe<TString> md5 = Nothing()); + + /// + /// @brief Cypress path to executable to be used inside jobs. + TUserJobSpec& JobBinaryCypressPath(TString path, TMaybe<TTransactionId> transactionId = Nothing()); + + /// + /// @brief String that will be prepended to the command. + /// + /// This option overrides @ref NYT::TOperationOptions::JobCommandPrefix. + FLUENT_FIELD(TString, JobCommandPrefix); + + /// + /// @brief String that will be appended to the command. + /// + /// This option overrides @ref NYT::TOperationOptions::JobCommandSuffix. + FLUENT_FIELD(TString, JobCommandSuffix); + + /// + /// @brief Map of environment variables that will be set for jobs. + FLUENT_MAP_FIELD(TString, TString, Environment); + + /// + /// @brief Limit for all files inside job sandbox (in bytes). + FLUENT_FIELD_OPTION(ui64, DiskSpaceLimit); + + /// + /// @brief Number of ports reserved for the job (passed through environment in YT_PORT_0, YT_PORT_1, ...). + FLUENT_FIELD_OPTION(ui16, PortCount); + + /// + /// @brief Network project used to isolate job network. + FLUENT_FIELD_OPTION(TString, NetworkProject); + + /// + /// @brief Limit on job execution time. + /// + /// Jobs that exceed this limit will be considered failed. + FLUENT_FIELD_OPTION(TDuration, JobTimeLimit); + + /// + /// @brief Get job binary config. + const TJobBinaryConfig& GetJobBinary() const; + + /// + /// @brief List of profilers to run. + FLUENT_VECTOR_FIELD(TJobProfilerSpec, JobProfiler); + +private: + TVector<std::tuple<TLocalFilePath, TAddLocalFileOptions>> LocalFiles_; + TJobBinaryConfig JobBinary_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Spec of Map operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/map +template <typename TDerived> +struct TMapOperationSpecBase + : public TUserOperationSpecBase<TDerived> + , public TWithAutoMergeSpec<TDerived> +{ + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// + /// @brief Spec of mapper job. + FLUENT_FIELD(TUserJobSpec, MapperSpec); + + /// + /// @brief Whether to guarantee the order of rows passed to mapper matches the order in the table. + /// + /// When `Ordered' is false (by default), there is no guaranties about order of reading rows. + /// In this case mapper might work slightly faster because row delivered from fast node can be processed YT waits + /// response from slow nodes. + /// When `Ordered' is true, rows will come in order in which they are stored in input tables. + FLUENT_FIELD_OPTION(bool, Ordered); + + /// + /// @brief Recommended number of jobs to run. + /// + /// `JobCount' has higher priority than @ref NYT::TMapOperationSpecBase::DataSizePerJob. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui32, JobCount); + + /// + /// @brief Recommended of data size for each job. + /// + /// `DataSizePerJob` has lower priority that @ref NYT::TMapOperationSpecBase::JobCount. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui64, DataSizePerJob); +}; + +/// +/// @brief Spec of Map operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/map +struct TMapOperationSpec + : public TMapOperationSpecBase<TMapOperationSpec> + , public TOperationIOSpec<TMapOperationSpec> + , public TUserJobFormatHintsBase<TMapOperationSpec> +{ }; + +/// +/// @brief Spec of raw Map operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/map +struct TRawMapOperationSpec + : public TMapOperationSpecBase<TRawMapOperationSpec> + , public TSimpleRawOperationIoSpec<TRawMapOperationSpec> +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Spec of Reduce operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/reduce +template <typename TDerived> +struct TReduceOperationSpecBase + : public TUserOperationSpecBase<TDerived> + , public TWithAutoMergeSpec<TDerived> +{ + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// + /// @brief Spec of reduce job. + FLUENT_FIELD(TUserJobSpec, ReducerSpec); + + /// + /// @brief Columns to sort rows by (must include `ReduceBy` as prefix). + FLUENT_FIELD(TSortColumns, SortBy); + + /// + /// @brief Columns to group rows by. + FLUENT_FIELD(TSortColumns, ReduceBy); + + /// + /// @brief Columns to join foreign tables by (must be prefix of `ReduceBy`). + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/reduce#foreign_tables + FLUENT_FIELD_OPTION(TSortColumns, JoinBy); + + /// + /// @brief Guarantee to feed all rows with same `ReduceBy` columns to a single job (`true` by default). + FLUENT_FIELD_OPTION(bool, EnableKeyGuarantee); + + /// + /// @brief Recommended number of jobs to run. + /// + /// `JobCount' has higher priority than @ref NYT::TReduceOperationSpecBase::DataSizePerJob. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui32, JobCount); + + /// + /// @brief Recommended of data size for each job. + /// + /// `DataSizePerJob` has lower priority that @ref NYT::TReduceOperationSpecBase::JobCount. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui64, DataSizePerJob); +}; + +/// +/// @brief Spec of Reduce operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/reduce +struct TReduceOperationSpec + : public TReduceOperationSpecBase<TReduceOperationSpec> + , public TOperationIOSpec<TReduceOperationSpec> + , public TUserJobFormatHintsBase<TReduceOperationSpec> +{ }; + +/// +/// @brief Spec of raw Reduce operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/reduce +struct TRawReduceOperationSpec + : public TReduceOperationSpecBase<TRawReduceOperationSpec> + , public TSimpleRawOperationIoSpec<TRawReduceOperationSpec> +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Spec of JoinReduce operation. +/// +/// @deprecated Instead the user should run a reduce operation +/// with @ref NYT::TReduceOperationSpec::EnableKeyGuarantee set to `false`. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/reduce#foreign_tables +template <typename TDerived> +struct TJoinReduceOperationSpecBase + : public TUserOperationSpecBase<TDerived> +{ + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// + /// @brief Spec of reduce job. + FLUENT_FIELD(TUserJobSpec, ReducerSpec); + + /// + /// @brief Columns to join foreign tables by (must be prefix of `ReduceBy`). + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/reduce#foreign_tables + FLUENT_FIELD(TSortColumns, JoinBy); + + /// + /// @brief Recommended number of jobs to run. + /// + /// `JobCount' has higher priority than @ref NYT::TJoinReduceOperationSpecBase::DataSizePerJob. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui32, JobCount); + + /// + /// @brief Recommended of data size for each job. + /// + /// `DataSizePerJob` has lower priority that @ref NYT::TJoinReduceOperationSpecBase::JobCount. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui64, DataSizePerJob); +}; + +/// +/// @brief Spec of JoinReduce operation. +/// +/// @deprecated Instead the user should run a reduce operation +/// with @ref NYT::TReduceOperationSpec::EnableKeyGuarantee set to `false`. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/reduce#foreign_tables +struct TJoinReduceOperationSpec + : public TJoinReduceOperationSpecBase<TJoinReduceOperationSpec> + , public TOperationIOSpec<TJoinReduceOperationSpec> + , public TUserJobFormatHintsBase<TJoinReduceOperationSpec> +{ }; + +/// +/// @brief Spec of raw JoinReduce operation. +/// +/// @deprecated Instead the user should run a reduce operation +/// with @ref NYT::TReduceOperationSpec::EnableKeyGuarantee set to `false`. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/reduce#foreign_tables +struct TRawJoinReduceOperationSpec + : public TJoinReduceOperationSpecBase<TRawJoinReduceOperationSpec> + , public TSimpleRawOperationIoSpec<TRawJoinReduceOperationSpec> +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Spec of MapReduce operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce +template <typename TDerived> +struct TMapReduceOperationSpecBase + : public TUserOperationSpecBase<TDerived> +{ + /// @cond Doxygen_Suppress + using TSelf = TDerived; + /// @endcond + + /// + /// @brief Spec of map job. + FLUENT_FIELD(TUserJobSpec, MapperSpec); + + /// + /// @brief Spec of reduce job. + FLUENT_FIELD(TUserJobSpec, ReducerSpec); + + /// + /// @brief Spec of reduce combiner. + FLUENT_FIELD(TUserJobSpec, ReduceCombinerSpec); + + /// + /// @brief Columns to sort rows by (must include `ReduceBy` as prefix). + FLUENT_FIELD(TSortColumns, SortBy); + + /// + /// @brief Columns to group rows by. + FLUENT_FIELD(TSortColumns, ReduceBy); + + /// + /// @brief Recommended number of map jobs to run. + /// + /// `JobCount' has higher priority than @ref NYT::TMapReduceOperationSpecBase::DataSizePerMapJob. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui32, MapJobCount); + + /// + /// @brief Recommended of data size for each map job. + /// + /// `DataSizePerJob` has lower priority that @ref NYT::TMapReduceOperationSpecBase::MapJobCount. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui64, DataSizePerMapJob); + + /// + /// @brief Recommended number of intermediate data partitions. + FLUENT_FIELD_OPTION(ui64, PartitionCount); + + /// + /// @brief Recommended size of intermediate data partitions. + FLUENT_FIELD_OPTION(ui64, PartitionDataSize); + + /// + /// @brief Account to use for intermediate data. + FLUENT_FIELD_OPTION(TString, IntermediateDataAccount); + + /// + /// @brief Replication factor for intermediate data (1 by default). + FLUENT_FIELD_OPTION(ui64, IntermediateDataReplicationFactor); + + /// + /// @brief Recommended size of data to be passed to a single reduce combiner. + FLUENT_FIELD_OPTION(ui64, DataSizePerSortJob); + + /// + /// @brief Whether to guarantee the order of rows passed to mapper matches the order in the table. + /// + /// @see @ref NYT::TMapOperationSpec::Ordered for more info. + FLUENT_FIELD_OPTION(bool, Ordered); + + /// + /// @brief Guarantee to run reduce combiner before reducer. + FLUENT_FIELD_OPTION(bool, ForceReduceCombiners); +}; + +/// +/// @brief Spec of MapReduce operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce +struct TMapReduceOperationSpec + : public TMapReduceOperationSpecBase<TMapReduceOperationSpec> + , public TOperationIOSpec<TMapReduceOperationSpec> + , public TIntermediateTablesHintSpec<TMapReduceOperationSpec> +{ + /// @cond Doxygen_Suppress + using TSelf = TMapReduceOperationSpec; + /// @endcond + + /// + /// @brief Format hints for mapper. + FLUENT_FIELD_DEFAULT(TUserJobFormatHints, MapperFormatHints, TUserJobFormatHints()); + + /// + /// @brief Format hints for reducer. + FLUENT_FIELD_DEFAULT(TUserJobFormatHints, ReducerFormatHints, TUserJobFormatHints()); + + /// + /// @brief Format hints for reduce combiner. + FLUENT_FIELD_DEFAULT(TUserJobFormatHints, ReduceCombinerFormatHints, TUserJobFormatHints()); +}; + +/// +/// @brief Spec of raw MapReduce operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce +struct TRawMapReduceOperationSpec + : public TMapReduceOperationSpecBase<TRawMapReduceOperationSpec> + , public TRawMapReduceOperationIoSpec<TRawMapReduceOperationSpec> +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Schema inference mode. +/// +/// @see https://yt.yandex-team.ru/docs/description/storage/static_schema.html#schema_inference +enum class ESchemaInferenceMode : int +{ + FromInput /* "from_input" */, + FromOutput /* "from_output" */, + Auto /* "auto" */, +}; + +/// +/// @brief Spec of Sort operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/sort +struct TSortOperationSpec + : TOperationSpecBase<TSortOperationSpec> +{ + /// @cond Doxygen_Suppress + using TSelf = TSortOperationSpec; + /// @endcond + + /// + /// @brief Paths to input tables. + FLUENT_VECTOR_FIELD(TRichYPath, Input); + + /// + /// @brief Path to output table. + FLUENT_FIELD(TRichYPath, Output); + + /// + /// @brief Columns to sort table by. + FLUENT_FIELD(TSortColumns, SortBy); + + /// + /// @brief Recommended number of intermediate data partitions. + FLUENT_FIELD_OPTION(ui64, PartitionCount); + + /// + /// @brief Recommended size of intermediate data partitions. + FLUENT_FIELD_OPTION(ui64, PartitionDataSize); + + /// + /// @brief Recommended number of partition jobs to run. + /// + /// `JobCount' has higher priority than @ref NYT::TSortOperationSpec::DataSizePerPartitionJob. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui64, PartitionJobCount); + + /// + /// @brief Recommended of data size for each partition job. + /// + /// `DataSizePerJob` has lower priority that @ref NYT::TSortOperationSpec::PartitionJobCount. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui64, DataSizePerPartitionJob); + + /// + /// @brief Inference mode for output table schema. + /// + /// @see https://yt.yandex-team.ru/docs/description/storage/static_schema.html#schema_inference + FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode); + + /// + /// @brief Account to use for intermediate data. + FLUENT_FIELD_OPTION(TString, IntermediateDataAccount); + + /// + /// @brief Replication factor for intermediate data (1 by default). + FLUENT_FIELD_OPTION(ui64, IntermediateDataReplicationFactor); +}; + + +/// +/// @brief Merge mode. +enum EMergeMode : int +{ + MM_UNORDERED /* "unordered" */, + MM_ORDERED /* "ordered" */, + MM_SORTED /* "sorted" */, +}; + +/// +/// @brief Spec of Merge operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/merge +struct TMergeOperationSpec + : TOperationSpecBase<TMergeOperationSpec> +{ + /// @cond Doxygen_Suppress + using TSelf = TMergeOperationSpec; + /// @endcond + + /// + /// @brief Paths to input tables. + FLUENT_VECTOR_FIELD(TRichYPath, Input); + + /// + /// @brief Path to output table. + FLUENT_FIELD(TRichYPath, Output); + + /// + /// @brief Columns by which to merge (for @ref NYT::EMergeMode::MM_SORTED). + FLUENT_FIELD(TSortColumns, MergeBy); + + /// + /// @brief Merge mode. + FLUENT_FIELD_DEFAULT(EMergeMode, Mode, MM_UNORDERED); + + /// + /// @brief Combine output chunks to larger ones. + FLUENT_FIELD_DEFAULT(bool, CombineChunks, false); + + /// + /// @brief Guarantee that all input chunks will be read. + FLUENT_FIELD_DEFAULT(bool, ForceTransform, false); + + /// + /// @brief Recommended number of jobs to run. + /// + /// `JobCount' has higher priority than @ref NYT::TMergeOperationSpec::DataSizePerJob. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui32, JobCount); + + /// + /// @brief Recommended of data size for each job. + /// + /// `DataSizePerJob` has lower priority that @ref NYT::TMergeOperationSpec::JobCount. + /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits. + FLUENT_FIELD_OPTION(ui64, DataSizePerJob); + + /// + /// @brief Inference mode for output table schema. + /// + /// @see https://yt.yandex-team.ru/docs/description/storage/static_schema.html#schema_inference + FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode); +}; + +/// +/// @brief Spec of Erase operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/erase +struct TEraseOperationSpec + : TOperationSpecBase<TEraseOperationSpec> +{ + /// @cond Doxygen_Suppress + using TSelf = TEraseOperationSpec; + /// @endcond + + /// + /// @brief Which table (or row range) to erase. + FLUENT_FIELD(TRichYPath, TablePath); + + /// + /// Combine output chunks to larger ones. + FLUENT_FIELD_DEFAULT(bool, CombineChunks, false); + + /// + /// @brief Inference mode for output table schema. + /// + /// @see https://yt.yandex-team.ru/docs/description/storage/static_schema.html#schema_inference + FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode); +}; + +/// +/// @brief Spec of RemoteCopy operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/remote_copy +struct TRemoteCopyOperationSpec + : TOperationSpecBase<TRemoteCopyOperationSpec> +{ + /// @cond Doxygen_Suppress + using TSelf = TRemoteCopyOperationSpec; + /// @endcond + + /// + /// @brief Source cluster name. + FLUENT_FIELD(TString, ClusterName); + + /// + /// @brief Network to use for copy (all remote cluster nodes must have it configured). + FLUENT_FIELD_OPTION(TString, NetworkName); + + /// + /// @brief Paths to input tables. + FLUENT_VECTOR_FIELD(TRichYPath, Input); + + /// + /// @brief Path to output table. + FLUENT_FIELD(TRichYPath, Output); + + /// + /// @brief Inference mode for output table schema. + /// + /// @see https://yt.yandex-team.ru/docs/description/storage/static_schema.html#schema_inference + FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode); + + /// + /// @brief Copy user attributes from input to output table (allowed only for single input table). + FLUENT_FIELD_DEFAULT(bool, CopyAttributes, false); + + /// + /// @brief Names of user attributes to copy from input to output table. + /// + /// @note To make this option make sense set @ref NYT::TRemoteCopyOperationSpec::CopyAttributes to `true`. + FLUENT_VECTOR_FIELD(TString, AttributeKey); + +private: + + /// + /// @brief Config for remote cluster connection. + FLUENT_FIELD_OPTION(TNode, ClusterConnection); +}; + +class IVanillaJobBase; + +/// +/// @brief Task of Vanilla operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/vanilla +struct TVanillaTask + : public TOperationOutputSpecBase + , public TUserJobOutputFormatHintsBase<TVanillaTask> +{ + /// @cond Doxygen_Suppress + using TSelf = TVanillaTask; + /// @endcond + + /// + /// @brief Add output table path and specify the task output type (i.e. TMyProtoMessage). + template <class T> + TSelf& AddOutput(const TRichYPath& path); + + /// + /// @brief Add output table path as structured path. + TSelf& AddStructuredOutput(TStructuredTablePath path); + + /// + /// @brief Set output table path and specify the task output type (i.e. TMyProtoMessage). + template <class T> + TSelf& SetOutput(size_t tableIndex, const TRichYPath& path); + + /// + /// @brief Task name. + FLUENT_FIELD(TString, Name); + + /// + /// @brief Job to be executed in this task. + FLUENT_FIELD(::TIntrusivePtr<IVanillaJobBase>, Job); + + /// + /// @brief User job spec. + FLUENT_FIELD(TUserJobSpec, Spec); + + /// + /// @brief Number of jobs to run and wait for successful completion. + /// + /// @note If @ref NYT::TUserOperationSpecBase::FailOnJobRestart is `false`, a failed job will be restarted + /// and will not count in this amount. + FLUENT_FIELD(ui64, JobCount); + + /// + /// @brief Network project name. + FLUENT_FIELD(TMaybe<TString>, NetworkProject); + +}; + +/// +/// @brief Spec of Vanilla operation. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/vanilla +struct TVanillaOperationSpec + : TUserOperationSpecBase<TVanillaOperationSpec> +{ + /// @cond Doxygen_Suppress + using TSelf = TVanillaOperationSpec; + /// @endcond + + /// + /// @brief Description of tasks to run in this operation. + FLUENT_VECTOR_FIELD(TVanillaTask, Task); +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Options for @ref NYT::IOperationClient::Map and other operation start commands. +struct TOperationOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TOperationOptions; + /// @endcond + + /// + /// @brief Additional field to put to operation spec. + FLUENT_FIELD_OPTION(TNode, Spec); + + /// + /// @brief Start operation mode. + enum class EStartOperationMode : int + { + /// + /// @brief Prepare operation asynchronously. Call IOperation::Start() to start operation. + AsyncPrepare, + + /// + /// @brief Prepare and start operation asynchronously. Don't wait for operation completion. + AsyncStart, + + /// + /// @brief Prepare and start operation synchronously. Don't wait for operation completion. + SyncStart, + + /// + /// @brief Prepare, start and wait for operation completion synchronously. + SyncWait, + }; + + /// + /// @brief Start operation mode. + FLUENT_FIELD_DEFAULT(EStartOperationMode, StartOperationMode, EStartOperationMode::SyncWait); + + /// + /// @brief Wait for operation finish synchronously. + /// + /// @deprecated Use StartOperationMode() instead. + TSelf& Wait(bool value) { + StartOperationMode_ = value ? EStartOperationMode::SyncWait : EStartOperationMode::SyncStart; + return static_cast<TSelf&>(*this); + } + + /// + /// + /// @brief Use format from table attribute (for YAMR-like format). + /// + /// @deprecated + FLUENT_FIELD_DEFAULT(bool, UseTableFormats, false); + + /// + /// @brief Prefix for bash command running the jobs. + /// + /// Can be overridden for the specific job type in the @ref NYT::TUserJobSpec. + FLUENT_FIELD(TString, JobCommandPrefix); + + /// + /// @brief Suffix for bash command running the jobs. + /// + /// Can be overridden for the specific job type in the @ref NYT::TUserJobSpec. + FLUENT_FIELD(TString, JobCommandSuffix); + + /// + /// @brief Put all files required by the job into tmpfs. + /// + /// This option can be set globally using @ref NYT::TConfig::MountSandboxInTmpfs. + /// @see https://yt.yandex-team.ru/docs/problems/woodpeckers + FLUENT_FIELD_DEFAULT(bool, MountSandboxInTmpfs, false); + + /// + /// @brief Path to directory to store temporary files. + FLUENT_FIELD_OPTION(TString, FileStorage); + + /// + /// @brief Expiration timeout for uploaded files. + FLUENT_FIELD_OPTION(TDuration, FileExpirationTimeout); + + /// + /// @brief Info to be passed securely to the job. + FLUENT_FIELD_OPTION(TNode, SecureVault); + + /// + /// @brief File cache mode. + enum class EFileCacheMode : int + { + /// + /// @brief Use YT API commands "get_file_from_cache" and "put_file_to_cache". + ApiCommandBased, + + /// + /// @brief Upload files to random paths inside @ref NYT::TOperationOptions::FileStorage without caching. + CachelessRandomPathUpload, + }; + + /// + /// @brief File cache mode. + FLUENT_FIELD_DEFAULT(EFileCacheMode, FileCacheMode, EFileCacheMode::ApiCommandBased); + + /// + /// @brief Id of transaction within which all Cypress file storage entries will be checked/created. + /// + /// By default, the root transaction is used. + /// + /// @note Set a specific transaction only if you + /// 1. specify non-default file storage path in @ref NYT::TOperationOptions::FileStorage or in @ref NYT::TConfig::RemoteTempFilesDirectory. + /// 2. use `CachelessRandomPathUpload` caching mode (@ref NYT::TOperationOptions::FileCacheMode). + FLUENT_FIELD(TTransactionId, FileStorageTransactionId); + + /// + /// @brief Ensure stderr and core tables exist before starting operation. + /// + /// If set to `false`, it is user's responsibility to ensure these tables exist. + FLUENT_FIELD_DEFAULT(bool, CreateDebugOutputTables, true); + + /// + /// @brief Ensure output tables exist before starting operation. + /// + /// If set to `false`, it is user's responsibility to ensure output tables exist. + FLUENT_FIELD_DEFAULT(bool, CreateOutputTables, true); + + /// + /// @brief Try to infer schema of inexistent table from the type of written rows. + /// + /// @note Default values for this option may differ depending on the row type. + /// For protobuf it's currently `false` by default. + FLUENT_FIELD_OPTION(bool, InferOutputSchema); +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Get operation secure vault (specified in @ref NYT::TOperationOptions::SecureVault) inside a job. +const TNode& GetJobSecureVault(); + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Context passed to @ref NYT::IRawJob::Do. +class TRawJobContext +{ +public: + explicit TRawJobContext(size_t outputTableCount); + + /// + /// @brief Get file corresponding to input stream. + const TFile& GetInputFile() const; + + /// + /// @brief Get files corresponding to output streams. + const TVector<TFile>& GetOutputFileList() const; + +private: + TFile InputFile_; + TVector<TFile> OutputFileList_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Interface for classes that can be Saved/Loaded (to be used with @ref Y_SAVELOAD_JOB). +class ISerializableForJob +{ +public: + virtual ~ISerializableForJob() = default; + + /// + /// @brief Dump state to output stream to be restored in job. + virtual void Save(IOutputStream& stream) const = 0; + + /// + /// @brief Load state from a stream. + virtual void Load(IInputStream& stream) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Provider of information about operation inputs/outputs during @ref NYT::IJob::PrepareOperation. +class IOperationPreparationContext +{ +public: + virtual ~IOperationPreparationContext() = default; + + /// @brief Get the number of input tables. + virtual int GetInputCount() const = 0; + + /// @brief Get the number of output tables. + virtual int GetOutputCount() const = 0; + + /// @brief Get the schema of input table no. `index`. + virtual const TTableSchema& GetInputSchema(int index) const = 0; + + /// @brief Get all the input table schemas. + virtual const TVector<TTableSchema>& GetInputSchemas() const = 0; + + /// @brief Path to the input table if available (`Nothing()` for intermediate tables). + virtual TMaybe<TYPath> GetInputPath(int index) const = 0; + + /// @brief Path to the output table if available (`Nothing()` for intermediate tables). + virtual TMaybe<TYPath> GetOutputPath(int index) const = 0; +}; + +/// +/// @brief Fluent builder class for @ref NYT::IJob::PrepareOperation. +/// +/// @note Method calls are supposed to be chained. +class TJobOperationPreparer +{ +public: + + /// + /// @brief Group of input tables that allows to specify properties on all of them at once. + /// + /// The instances are created with @ref NYT::TJobOperationPreparer::BeginInputGroup, not directly. + class TInputGroup + { + public: + TInputGroup(TJobOperationPreparer& preparer, TVector<int> indices); + + /// @brief Specify the type of input rows. + template <typename TRow> + TInputGroup& Description(); + + /// @brief Specify renaming of input columns. + TInputGroup& ColumnRenaming(const THashMap<TString, TString>& renaming); + + /// @brief Specify what input columns to send to job + /// + /// @note Filter is applied before renaming, so it must specify original column names. + TInputGroup& ColumnFilter(const TVector<TString>& columns); + + /// @brief Finish describing the input group. + TJobOperationPreparer& EndInputGroup(); + + private: + TJobOperationPreparer& Preparer_; + TVector<int> Indices_; + }; + + /// + /// @brief Group of output tables that allows to specify properties on all of them at once. + /// + /// The instances are created with @ref NYT::TJobOperationPreparer::BeginOutputGroup, not directly. + class TOutputGroup + { + public: + TOutputGroup(TJobOperationPreparer& preparer, TVector<int> indices); + + /// @brief Specify the type of output rows. + /// + /// @tparam TRow type of output rows from tables of this group. + /// @param inferSchema Infer schema from `TRow` and specify it for these output tables. + template <typename TRow> + TOutputGroup& Description(bool inferSchema = true); + + /// @brief Specify schema for these tables. + TOutputGroup& Schema(const TTableSchema& schema); + + /// @brief Specify that all the the tables in this group are unschematized. + /// + /// It is equivalent of `.Schema(TTableSchema().Strict(false)`. + TOutputGroup& NoSchema(); + + /// @brief Finish describing the output group. + TJobOperationPreparer& EndOutputGroup(); + + private: + TJobOperationPreparer& Preparer_; + TVector<int> Indices_; + }; + +public: + explicit TJobOperationPreparer(const IOperationPreparationContext& context); + + /// @brief Begin input group consisting of tables with indices `[begin, end)`. + /// + /// @param begin First index. + /// @param end Index after the last one. + TInputGroup BeginInputGroup(int begin, int end); + + /// @brief Begin input group consisting of tables with indices from `indices`. + /// + /// @tparam TCont Container with integers. Must support `std::begin` and `std::end` functions. + /// @param indices Indices of tables to include in the group. + template <typename TCont> + TInputGroup BeginInputGroup(const TCont& indices); + + /// @brief Begin output group consisting of tables with indices `[begin, end)`. + /// + /// @param begin First index. + /// @param end Index after the last one. + TOutputGroup BeginOutputGroup(int begin, int end); + + /// @brief Begin input group consisting of tables with indices from `indices`. + /// + /// @tparam TCont Container with integers. Must support `std::begin` and `std::end` functions. + /// @param indices Indices of tables to include in the group. + template <typename TCont> + TOutputGroup BeginOutputGroup(const TCont& indices); + + /// @brief Specify the schema for output table no `tableIndex`. + /// + /// @note All the output schemas must be specified either with this method, `NoOutputSchema` or `OutputDescription` with `inferSchema == true` + TJobOperationPreparer& OutputSchema(int tableIndex, TTableSchema schema); + + /// @brief Mark the output table no. `tableIndex` as unschematized. + TJobOperationPreparer& NoOutputSchema(int tableIndex); + + /// @brief Specify renaming of input columns for table no. `tableIndex`. + TJobOperationPreparer& InputColumnRenaming(int tableIndex, const THashMap<TString, TString>& renaming); + + /// @brief Specify what input columns of table no. `tableIndex` to send to job + /// + /// @note Filter is applied before renaming, so it must specify original column names. + TJobOperationPreparer& InputColumnFilter(int tableIndex, const TVector<TString>& columns); + + /// @brief Specify the type of input rows for table no. `tableIndex`. + /// + /// @tparam TRow type of input rows. + template <typename TRow> + TJobOperationPreparer& InputDescription(int tableIndex); + + /// @brief Specify the type of output rows for table no. `tableIndex`. + /// + /// @tparam TRow type of output rows. + /// @param inferSchema Infer schema from `TRow` and specify it for the output tables. + template <typename TRow> + TJobOperationPreparer& OutputDescription(int tableIndex, bool inferSchema = true); + + /// @brief Set type of output rows for table no. `tableIndex` to TNode + /// + /// @note Set schema via `OutputSchema` if needed + TJobOperationPreparer& NodeOutput(int tableIndex); + + /// @brief Specify input format hints. + /// + /// These hints have lower priority than ones specified in spec. + TJobOperationPreparer& InputFormatHints(TFormatHints hints); + + /// @brief Specify output format hints. + /// + /// These hints have lower priority than ones specified in spec. + TJobOperationPreparer& OutputFormatHints(TFormatHints hints); + + /// @brief Specify format hints. + /// + /// These hints have lower priority than ones specified in spec. + TJobOperationPreparer& FormatHints(TUserJobFormatHints newFormatHints); + + /// @name "Private" members + /// The following methods should not be used by clients in @ref NYT::IJob::PrepareOperation + ///@{ + + /// @brief Finish the building process. + void Finish(); + + /// @brief Get output table schemas as specified by the user. + TVector<TTableSchema> GetOutputSchemas(); + + /// @brief Get input column renamings as specified by the user. + const TVector<THashMap<TString, TString>>& GetInputColumnRenamings() const; + + /// @brief Get input column filters as specified by the user. + const TVector<TMaybe<TVector<TString>>>& GetInputColumnFilters() const; + + /// @brief Get input column descriptions as specified by the user. + const TVector<TMaybe<TTableStructure>>& GetInputDescriptions() const; + + /// @brief Get output column descriptions as specified by the user. + const TVector<TMaybe<TTableStructure>>& GetOutputDescriptions() const; + + /// @brief Get format hints as specified by the user. + const TUserJobFormatHints& GetFormatHints() const; + + ///@} +private: + + /// @brief Validate that schema for output table no. `tableIndex` has not been set yet. + void ValidateMissingOutputSchema(int tableIndex) const; + + /// @brief Validate that description for input table no. `tableIndex` has not been set yet. + void ValidateMissingInputDescription(int tableIndex) const; + + /// @brief Validate that description for output table no. `tableIndex` has not been set yet. + void ValidateMissingOutputDescription(int tableIndex) const; + + /// @brief Validate that `tableIndex` is in correct range for input table indices. + /// + /// @param message Message to add to the exception in case of violation. + void ValidateInputTableIndex(int tableIndex, TStringBuf message) const; + + /// @brief Validate that `tableIndex` is in correct range for output table indices. + /// + /// @param message Message to add to the exception in case of violation. + void ValidateOutputTableIndex(int tableIndex, TStringBuf message) const; + + /// @brief Validate that all the output schemas has been set. + void FinallyValidate() const; + + static TTableSchema EmptyNonstrictSchema(); + +private: + const IOperationPreparationContext& Context_; + + TVector<TMaybe<TTableSchema>> OutputSchemas_; + TVector<THashMap<TString, TString>> InputColumnRenamings_; + TVector<TMaybe<TVector<TString>>> InputColumnFilters_; + TVector<TMaybe<TTableStructure>> InputTableDescriptions_; + TVector<TMaybe<TTableStructure>> OutputTableDescriptions_; + TUserJobFormatHints FormatHints_ = {}; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Interface for all user jobs. +class IJob + : public TThrRefBase +{ +public: + + /// + /// @brief Type of job. + enum EType + { + Mapper, + Reducer, + ReducerAggregator, + RawJob, + VanillaJob, + }; + + /// + /// @brief Save job state to stream to be restored on cluster nodes. + virtual void Save(IOutputStream& stream) const + { + Y_UNUSED(stream); + } + + /// + /// @brief Restore job state from a stream. + virtual void Load(IInputStream& stream) + { + Y_UNUSED(stream); + } + + /// + /// @brief Get operation secure vault (specified in @ref NYT::TOperationOptions::SecureVault) inside a job. + const TNode& SecureVault() const + { + return GetJobSecureVault(); + } + + /// + /// @brief Get number of output tables. + i64 GetOutputTableCount() const + { + Y_VERIFY(NDetail::OutputTableCount > 0); + + return NDetail::OutputTableCount; + } + + /// + /// @brief Method allowing user to control some properties of input and output tables and formats. + /// + /// User can override this method in their job class to: + /// - specify output table schemas. + /// The most natural way is usually through @ref NYT::TJobOperationPreparer::OutputDescription (especially for protobuf), + /// but you can use @ref NYT::TJobOperationPreparer::OutputSchema directly + /// - specify output row type (@ref NYT::TJobOperationPreparer::OutputDescription) + /// - specify input row type (@ref NYT::TJobOperationPreparer::InputDescription) + /// - specify input column filter and renaming (@ref NYT::TJobOperationPreparer::InputColumnFilter and @ref NYT::TJobOperationPreparer::InputColumnRenaming) + /// - specify format hints (@ref NYT::TJobOperationPreparer::InputFormatHints, + /// NYT::TJobOperationPreparer::OutputFormatHints and @ref NYT::TJobOperationPreparer::FormatHints) + /// - maybe something more, cf. the methods of @ref NYT::TJobOperationPreparer. + /// + /// If one has several similar tables, groups can be used. + /// Groups are delimited by @ref NYT::TJobOperationPreparer::BeginInputGroup / + /// @ref NYT::TJobOperationPreparer::TInputGroup::EndInputGroup and + /// @ref NYT::TJobOperationPreparer::BeginOutputGroup / + /// @ref NYT::TJobOperationPreparer::TOutputGroup::EndOutputGroup. + /// Example: + /// @code{.cpp} + /// preparer + /// .BeginInputGroup({1,2,4,8}) + /// .ColumnRenaming({{"a", "b"}, {"c", "d"}}) + /// .ColumnFilter({"a", "c"}) + /// .EndInputGroup(); + /// @endcode + /// + /// @note All the output table schemas must be set + /// (possibly as empty nonstrict using @ref NYT::TJobOperationPreparer::NoOutputSchema or + /// @ref NYT::TJobOperationPreparer::TOutputGroup::NoSchema). + /// By default all the output table schemas are marked as empty nonstrict. + virtual void PrepareOperation(const IOperationPreparationContext& context, TJobOperationPreparer& preparer) const; +}; + +/// +/// @brief Declare what fields of currently declared job class to save and restore on cluster node. +#define Y_SAVELOAD_JOB(...) \ + virtual void Save(IOutputStream& stream) const override { Save(&stream); } \ + virtual void Load(IInputStream& stream) override { Load(&stream); } \ + Y_PASS_VA_ARGS(Y_SAVELOAD_DEFINE(__VA_ARGS__)) + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Interface for jobs with typed inputs and outputs. +class IStructuredJob + : public IJob +{ +public: + /// + /// @brief This methods are called when creating table reader and writer for the job. + /// + /// Override them if you want to implement custom input logic. (e.g. addtitional bufferization) + virtual TRawTableReaderPtr CreateCustomRawJobReader(int fd) const; + virtual THolder<IProxyOutput> CreateCustomRawJobWriter(size_t outputTableCount) const; + + virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const = 0; + virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Create default raw job reader. +TRawTableReaderPtr CreateRawJobReader(int fd = 0); + +/// +/// @brief Create default raw job writer. +THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount); + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Base interface for structured (typed) map jobs. +class IMapperBase + : public IStructuredJob +{ }; + +/// +/// @brief Base interface for structured (typed) map jobs with given reader and writer. +template <class TR, class TW> +class IMapper + : public IMapperBase +{ +public: + using TReader = TR; + using TWriter = TW; + +public: + /// Type of job implemented by this class. + static constexpr EType JobType = EType::Mapper; + + /// + /// @brief This method is called before feeding input rows to mapper (before `Do` method). + virtual void Start(TWriter* writer) + { + Y_UNUSED(writer); + } + + /// + /// @brief This method is called exactly once for the whole job input. + /// + /// Read input rows from `reader` and write output ones to `writer`. + virtual void Do(TReader* reader, TWriter* writer) = 0; + + /// + /// @brief This method is called after feeding input rows to mapper (after `Do` method). + virtual void Finish(TWriter* writer) + { + Y_UNUSED(writer); + } + + virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const override; + virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Base interface for structured (typed) reduce jobs. +/// +/// It is common base for @ref NYT::IReducer and @ref NYT::IAggregatorReducer. +class IReducerBase + : public IStructuredJob +{ }; + +/// +/// @brief Base interface for structured (typed) reduce jobs with given reader and writer. +template <class TR, class TW> +class IReducer + : public IReducerBase +{ +public: + using TReader = TR; + using TWriter = TW; + +public: + /// Type of job implemented by this class. + static constexpr EType JobType = EType::Reducer; + +public: + + /// + /// @brief This method is called before feeding input rows to reducer (before `Do` method). + virtual void Start(TWriter* writer) + { + Y_UNUSED(writer); + } + + /// + /// @brief This method is called exactly once for each range with same value of `ReduceBy` (or `JoinBy`) keys. + virtual void Do(TReader* reader, TWriter* writer) = 0; + + /// + /// @brief This method is called after feeding input rows to reducer (after `Do` method). + virtual void Finish(TWriter* writer) + { + Y_UNUSED(writer); + } + + /// + /// @brief Refuse to process the remaining row ranges and finish the job (successfully). + void Break(); + + virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const override; + virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Base interface of jobs used inside reduce operations. +/// +/// Unlike @ref NYT::IReducer jobs their `Do' method is called only once +/// and takes whole range of records split by key boundaries. +/// +/// Template argument `TR` must be @ref NYT::TTableRangesReader. +template <class TR, class TW> +class IAggregatorReducer + : public IReducerBase +{ +public: + using TReader = TR; + using TWriter = TW; + +public: + /// Type of job implemented by this class. + static constexpr EType JobType = EType::ReducerAggregator; + +public: + /// + /// @brief This method is called before feeding input rows to reducer (before `Do` method). + virtual void Start(TWriter* writer) + { + Y_UNUSED(writer); + } + + /// + /// @brief This method is called exactly once for the whole job input. + virtual void Do(TReader* reader, TWriter* writer) = 0; + + /// + /// @brief This method is called after feeding input rows to reducer (after `Do` method). + virtual void Finish(TWriter* writer) + { + Y_UNUSED(writer); + } + + virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const override; + virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Interface for raw jobs (i.e. reading and writing byte streams). +class IRawJob + : public IJob +{ +public: + /// Type of job implemented by this class. + static constexpr EType JobType = EType::RawJob; + + /// + /// @brief This method is called exactly once for the whole job input. + virtual void Do(const TRawJobContext& jobContext) = 0; +}; + +/// +/// @brief Interface of jobs that run the given bash command. +class ICommandJob + : public IJob +{ +public: + /// + /// @brief Get bash command to run. + /// + /// @note This method is called on the client side. + virtual const TString& GetCommand() const = 0; +}; + +/// +/// @brief Raw job executing given bash command. +/// +/// @note The binary will not be uploaded. +class TCommandRawJob + : public IRawJob + , public ICommandJob +{ +public: + /// + /// @brief Create job with specified command. + /// + /// @param command Bash command to run. + explicit TCommandRawJob(TStringBuf command = {}); + + const TString& GetCommand() const override; + void Do(const TRawJobContext& jobContext) override; + +private: + TString Command_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Base interface for vanilla jobs. +/// +/// @see https://yt.yandex-team.ru/docs/description/mr/vanilla +class IVanillaJobBase + : public virtual IStructuredJob +{ +public: + /// Type of job implemented by this class. + static constexpr EType JobType = EType::VanillaJob; +}; + +template <class TW = void> +class IVanillaJob; + +/// +/// @brief Interface of vanilla job without outputs. +template <> +class IVanillaJob<void> + : public IVanillaJobBase +{ +public: + /// + /// @brief This method is called exactly once for each vanilla job. + virtual void Do() = 0; + + virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const override; + virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const override; +}; + +/// +/// @brief Vanilla job executing given bash command. +/// +/// @note The binary will not be uploaded. +class TCommandVanillaJob + : public IVanillaJob<> + , public ICommandJob +{ +public: + /// + /// @brief Create job with specified command. + /// + /// @param command Bash command to run. + explicit TCommandVanillaJob(TStringBuf command = {}); + + const TString& GetCommand() const override; + void Do() override; + +private: + TString Command_; +}; + +/// +/// @brief Interface for vanilla jobs with output tables. +template <class TW> +class IVanillaJob + : public IVanillaJobBase +{ +public: + using TWriter = TW; + +public: + /// + /// @brief This method is called before `Do` method. + virtual void Start(TWriter* /* writer */) + { } + + /// + /// @brief This method is called exactly once for each vanilla job. + /// + /// Write output rows to `writer`. + virtual void Do(TWriter* writer) = 0; + + /// + /// @brief This method is called after `Do` method. + virtual void Finish(TWriter* /* writer */) + { } + + virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const override; + virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Attributes to request for an operation. +enum class EOperationAttribute : int +{ + Id /* "id" */, + Type /* "type" */, + State /* "state" */, + AuthenticatedUser /* "authenticated_user" */, + StartTime /* "start_time" */, + FinishTime /* "finish_time" */, + BriefProgress /* "brief_progress" */, + BriefSpec /* "brief_spec" */, + Suspended /* "suspended" */, + Result /* "result" */, + Progress /* "progress" */, + Events /* "events" */, + Spec /* "spec" */, + FullSpec /* "full_spec" */, + UnrecognizedSpec /* "unrecognized_spec" */, +}; + +/// +/// @brief Class describing which attributes to request in @ref NYT::IClient::GetOperation or @ref NYT::IClient::ListOperations. +struct TOperationAttributeFilter +{ + /// @cond Doxygen_Suppress + using TSelf = TOperationAttributeFilter; + /// @endcond + + TVector<EOperationAttribute> Attributes_; + + /// + /// @brief Add attribute to the filter. Calls are supposed to be chained. + TSelf& Add(EOperationAttribute attribute) + { + Attributes_.push_back(attribute); + return *this; + } +}; + +/// +/// @brief Options for @ref NYT::IClient::GetOperation call. +struct TGetOperationOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TGetOperationOptions; + /// @endcond + + /// + /// @brief What attributes to request (if omitted, the default set of attributes will be requested). + FLUENT_FIELD_OPTION(TOperationAttributeFilter, AttributeFilter); +}; + +/// +/// @brief "Coarse-grained" state of an operation. +enum class EOperationBriefState : int +{ + InProgress /* "in_progress" */, + Completed /* "completed" */, + Aborted /* "aborted" */, + + /// Failed + Failed /* "failed" */, +}; + +/// +/// @brief Operation type. +enum class EOperationType : int +{ + Map /* "map" */, + Merge /* "merge" */, + Erase /* "erase" */, + Sort /* "sort" */, + Reduce /* "reduce" */, + MapReduce /* "map_reduce" */, + RemoteCopy /* "remote_copy" */, + JoinReduce /* "join_reduce" */, + Vanilla /* "vanilla" */, +}; + +/// +/// @brief Operation progress. +struct TOperationProgress +{ + /// + /// @brief Total job statistics. + TJobStatistics JobStatistics; + + /// + /// @brief Job counter for various job states with hierarchy. + TJobCounters JobCounters; + + /// + /// @brief Time when this progress was built on scheduler or CA. + TMaybe<TInstant> BuildTime; +}; + +/// +/// @brief Brief operation progress (numbers of jobs in these states). +struct TOperationBriefProgress +{ + ui64 Aborted = 0; + ui64 Completed = 0; + ui64 Failed = 0; + ui64 Lost = 0; + ui64 Pending = 0; + ui64 Running = 0; + ui64 Total = 0; +}; + +/// +/// @brief Operation result. +struct TOperationResult +{ + /// + /// @brief For a unsuccessfully finished operation: description of error. + TMaybe<TYtError> Error; +}; + +/// +/// @brief Operation event (change of state). +struct TOperationEvent +{ + /// + /// @brief New state of operation. + TString State; + + /// + /// @brief Time of state change. + TInstant Time; +}; + +/// +/// @brief Operation info. +/// +/// A field may be `Nothing()` either if it was not requested (see @ref NYT::TGetOperationOptions::AttributeFilter) +/// or it is not available (i.e. `FinishTime` for a running operation). +/// @see https://yt.yandex-team.ru/docs/api/commands#get_operation +struct TOperationAttributes +{ + /// + /// @brief Operation id. + TMaybe<TOperationId> Id; + + /// + /// @brief Operation type. + TMaybe<EOperationType> Type; + + /// + /// @brief Operation state. + TMaybe<TString> State; + + /// + /// @brief "Coarse-grained" operation state. + TMaybe<EOperationBriefState> BriefState; + + /// + /// @brief Name of user that started the operation. + TMaybe<TString> AuthenticatedUser; + + /// + /// @brief Operation start time. + TMaybe<TInstant> StartTime; + + /// + /// @brief Operation finish time (if the operation has finished). + TMaybe<TInstant> FinishTime; + + /// + /// @brief Brief progress of the operation. + TMaybe<TOperationBriefProgress> BriefProgress; + + /// + /// @brief Brief spec of operation (light-weight fields only). + TMaybe<TNode> BriefSpec; + + /// + /// @brief Spec of the operation as provided by the user. + TMaybe<TNode> Spec; + + /// + /// @brief Full spec of operation (all fields not specified by user are filled with default values). + TMaybe<TNode> FullSpec; + + /// + /// @brief Fields not recognized by scheduler. + TMaybe<TNode> UnrecognizedSpec; + + /// + /// @brief Is operation suspended. + TMaybe<bool> Suspended; + + /// + /// @brief Operation result. + TMaybe<TOperationResult> Result; + + /// + /// @brief Operation progress. + TMaybe<TOperationProgress> Progress; + + /// + /// @brief List of operation events (changes of state). + TMaybe<TVector<TOperationEvent>> Events; + + /// + /// @brief Map from alert name to its description. + TMaybe<THashMap<TString, TYtError>> Alerts; +}; + +/// +/// @brief Direction of cursor for paging, see @ref NYT::TListOperationsOptions::CursorDirection. +enum class ECursorDirection +{ + Past /* "past" */, + Future /* "future" */, +}; + +/// +/// @brief Options of @ref NYT::IClient::ListOperations command. +/// +/// @see https://yt.yandex-team.ru/docs/api/commands.html#list_operations +struct TListOperationsOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TListOperationsOptions; + /// @endcond + + /// + /// @name Time range specification + /// + /// List operations with start time in half-closed interval + /// `[CursorTime, ToTime)` if `CursorDirection == Future` or + /// `[FromTime, CursorTime)` if `CursorDirection == Past`. + ///@{ + + /// + /// @brief Search for operations with start time >= `FromTime`. + FLUENT_FIELD_OPTION(TInstant, FromTime); + + /// + /// @brief Search for operations with start time < `ToTime`. + FLUENT_FIELD_OPTION(TInstant, ToTime); + + /// + /// @brief Additional restriction on operation start time (useful for pagination). + /// + /// Search for operations with start time >= `CursorTime` if `CursorDirection == Future` + /// and with start time < `CursorTime` if `CursorDirection == Past` + FLUENT_FIELD_OPTION(TInstant, CursorTime); + + /// + /// @brief Direction of pagination (see @ref NYT::TListOperationsOptions::CursorTime). + FLUENT_FIELD_OPTION(ECursorDirection, CursorDirection); + + ///@} + + /// + /// @name Filters + /// Choose operations satisfying given filters. + ///@{ + + /// + /// @brief Search for `Filter` as a substring in operation text factors + /// (e.g. title or input/output table paths). + FLUENT_FIELD_OPTION(TString, Filter); + + /// + /// @brief Choose operations whose pools include `Pool`. + FLUENT_FIELD_OPTION(TString, Pool); + + /// + /// @brief Choose operations with given @ref NYT::TOperationAttributes::AuthenticatedUser. + FLUENT_FIELD_OPTION(TString, User); + + /// + /// @brief Choose operations with given @ref NYT::TOperationAttributes::State. + FLUENT_FIELD_OPTION(TString, State); + + /// + /// @brief Choose operations with given @ref NYT::TOperationAttributes::Type. + FLUENT_FIELD_OPTION(EOperationType, Type); + + /// + /// @brief Choose operations having (or not having) any failed jobs. + FLUENT_FIELD_OPTION(bool, WithFailedJobs); + + ///@} + + /// + /// @brief Search for operations in the archive in addition to Cypress. + FLUENT_FIELD_OPTION(bool, IncludeArchive); + + /// + /// @brief Include the counters for different filter parameters in the response. + /// + /// Include number of operations for each pool, user, state, type + /// and the number of operations having failed jobs. + FLUENT_FIELD_OPTION(bool, IncludeCounters); + + /// + /// @brief Return no more than `Limit` operations (current default and maximum value is 1000). + FLUENT_FIELD_OPTION(i64, Limit); +}; + +/// +/// @brief Response for @ref NYT::IClient::ListOperations command. +struct TListOperationsResult +{ + /// + /// @brief Found operations' attributes. + TVector<TOperationAttributes> Operations; + + /// + /// @name Counters for different filter. + /// + /// If counters were requested (@ref NYT::TListOperationsOptions::IncludeCounters is `true`) + /// the maps contain the number of operations found for each pool, user, state and type. + /// NOTE: + /// 1) Counters ignore CursorTime and CursorDirection, + /// they always are collected in the whole [FromTime, ToTime) interval. + /// 2) Each next counter in the sequence [pool, user, state, type, with_failed_jobs] + /// takes into account all the previous filters (i.e. if you set User filter to "some-user" + /// type counts describe only operations with user "some-user"). + /// @{ + + /// + /// @brief Number of operations for each pool. + TMaybe<THashMap<TString, i64>> PoolCounts; + + /// + /// @brief Number of operations for each user (subject to previous filters). + TMaybe<THashMap<TString, i64>> UserCounts; + + /// + /// @brief Number of operations for each state (subject to previous filters). + TMaybe<THashMap<TString, i64>> StateCounts; + + /// + /// @brief Number of operations for each type (subject to previous filters). + TMaybe<THashMap<EOperationType, i64>> TypeCounts; + + /// + /// @brief Number of operations having failed jobs (subject to all previous filters). + TMaybe<i64> WithFailedJobsCount; + + /// @} + + /// + /// @brief Whether some operations were not returned due to @ref NYT::TListOperationsOptions::Limit. + /// + /// `Incomplete == true` means that not all operations satisfying filters + /// were returned (limit exceeded) and you need to repeat the request with new @ref NYT::TListOperationsOptions::CursorTime + /// (e.g. `CursorTime == *Operations.back().StartTime`, but don't forget to + /// remove the duplicates). + bool Incomplete; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Data source for @ref NYT::IClient::ListJobs command. +enum class EListJobsDataSource : int +{ + Runtime /* "runtime" */, + Archive /* "archive" */, + Auto /* "auto" */, + Manual /* "manual" */, +}; + +/// +/// @brief Job type. +enum class EJobType : int +{ + SchedulerFirst /* "scheduler_first" */, + Map /* "map" */, + PartitionMap /* "partition_map" */, + SortedMerge /* "sorted_merge" */, + OrderedMerge /* "ordered_merge" */, + UnorderedMerge /* "unordered_merge" */, + Partition /* "partition" */, + SimpleSort /* "simple_sort" */, + FinalSort /* "final_sort" */, + SortedReduce /* "sorted_reduce" */, + PartitionReduce /* "partition_reduce" */, + ReduceCombiner /* "reduce_combiner" */, + RemoteCopy /* "remote_copy" */, + IntermediateSort /* "intermediate_sort" */, + OrderedMap /* "ordered_map" */, + JoinReduce /* "join_reduce" */, + Vanilla /* "vanilla" */, + SchedulerUnknown /* "scheduler_unknown" */, + SchedulerLast /* "scheduler_last" */, + ReplicatorFirst /* "replicator_first" */, + ReplicateChunk /* "replicate_chunk" */, + RemoveChunk /* "remove_chunk" */, + RepairChunk /* "repair_chunk" */, + SealChunk /* "seal_chunk" */, + ReplicatorLast /* "replicator_last" */, +}; + +/// +/// @brief Well-known task names. +enum class ETaskName : int +{ + Map /* "map" */, + PartitionMap0 /* "partition_map(0)" */, + SortedMerge /* "sorted_merge" */, + OrderedMerge /* "ordered_merge" */, + UnorderedMerge /* "unordered_merge" */, + Partition0 /* "partition(0)" */, + Partition1 /* "partition(1)" */, + Partition2 /* "partition(2)" */, + SimpleSort /* "simple_sort" */, + FinalSort /* "final_sort" */, + SortedReduce /* "sorted_reduce" */, + PartitionReduce /* "partition_reduce" */, + ReduceCombiner /* "reduce_combiner" */, + RemoteCopy /* "remote_copy" */, + IntermediateSort /* "intermediate_sort" */, + OrderedMap /* "ordered_map" */, + JoinReduce /* "join_reduce" */, +}; + +/// +/// @brief Task name (can either well-known or just a string). +class TTaskName +{ +public: + + // Constructors are implicit by design. + + /// + /// @brief Construct a custom task name. + TTaskName(TString taskName); + + /// + /// @brief Construct a custom task name. + TTaskName(const char* taskName); + + /// + /// @brief Construct a well-known task name. + TTaskName(ETaskName taskName); + + const TString& Get() const; + +private: + TString TaskName_; +}; + +/// +/// @brief Job state. +enum class EJobState : int +{ + None /* "none" */, + Waiting /* "waiting" */, + Running /* "running" */, + Aborting /* "aborting" */, + Completed /* "completed" */, + Failed /* "failed" */, + Aborted /* "aborted" */, + Lost /* "lost" */, +}; + +/// +/// @brief Job sort field. +/// +/// @see @ref NYT::TListJobsOptions. +enum class EJobSortField : int +{ + Type /* "type" */, + State /* "state" */, + StartTime /* "start_time" */, + FinishTime /* "finish_time" */, + Address /* "address" */, + Duration /* "duration" */, + Progress /* "progress" */, + Id /* "id" */, +}; + +/// +/// @brief Job sort direction. +/// +/// @see @ref NYT::TListJobsOptions. +enum class EJobSortDirection : int +{ + Ascending /* "ascending" */, + Descending /* "descending" */, +}; + +/// +/// @brief Options for @ref NYT::IClient::ListJobs. +/// +/// @see https://yt.yandex-team.ru/docs/api/commands.html#list_jobs +struct TListJobsOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TListJobsOptions; + /// @endcond + + /// + /// @name Filters + /// Return only jobs with given value of parameter (type, state, address and existence of stderr). + /// If a field is `Nothing()`, return jobs with all possible values of the corresponding parameter. + /// @{ + + /// + /// @brief Job type. + FLUENT_FIELD_OPTION(EJobType, Type); + + /// + /// @brief Job state. + FLUENT_FIELD_OPTION(EJobState, State); + + /// + /// @brief Address of the cluster node where job was running. + FLUENT_FIELD_OPTION(TString, Address); + + /// + /// @brief Return only jobs whose stderr has been saved. + FLUENT_FIELD_OPTION(bool, WithStderr); + + /// + /// @brief Return only jobs whose spec has been saved. + FLUENT_FIELD_OPTION(bool, WithSpec); + + /// + /// @brief Return only jobs whose fail context has been saved. + FLUENT_FIELD_OPTION(bool, WithFailContext); + + /// @} + + /// + /// @name Sort options + /// @{ + + /// + /// @brief Sort by this field. + FLUENT_FIELD_OPTION(EJobSortField, SortField); + + /// + /// @brief Sort order. + FLUENT_FIELD_OPTION(ESortOrder, SortOrder); + + /// @} + + /// + /// @brief Data source. + /// + /// Where to search for jobs: in scheduler and Cypress ('Runtime'), in archive ('Archive'), + /// automatically basing on operation presence in Cypress ('Auto') or choose manually (`Manual'). + FLUENT_FIELD_OPTION(EListJobsDataSource, DataSource); + + /// @deprecated + FLUENT_FIELD_OPTION(bool, IncludeCypress); + + /// @deprecated + FLUENT_FIELD_OPTION(bool, IncludeControllerAgent); + + /// @deprecated + FLUENT_FIELD_OPTION(bool, IncludeArchive); + + /// + /// @brief Maximum number of jobs to return. + FLUENT_FIELD_OPTION(i64, Limit); + + /// + /// @brief Number of jobs (in specified sort order) to skip. + /// + /// Together with @ref NYT::TListJobsOptions::Limit may be used for pagination. + FLUENT_FIELD_OPTION(i64, Offset); +}; + +/// +/// @brief Description of a core dump that happened in the job. +struct TCoreInfo +{ + i64 ProcessId; + TString ExecutableName; + TMaybe<ui64> Size; + TMaybe<TYtError> Error; +}; + +/// +/// @brief Job attributes. +/// +/// A field may be `Nothing()` if it is not available (i.e. `FinishTime` for a running job). +/// +/// @see https://yt.yandex-team.ru/docs/api/commands#get_job +struct TJobAttributes +{ + /// + /// @brief Job id. + TMaybe<TJobId> Id; + + /// + /// @brief Job type + TMaybe<EJobType> Type; + + /// + /// @brief Job state. + TMaybe<EJobState> State; + + /// + /// @brief Address of a cluster node where job was running. + TMaybe<TString> Address; + + /// + /// @brief The name of the task that job corresponds to. + TMaybe<TString> TaskName; + + /// + /// @brief Job start time. + TMaybe<TInstant> StartTime; + + /// + /// @brief Job finish time (for a finished job). + TMaybe<TInstant> FinishTime; + + /// + /// @brief Estimated ratio of job's completed work. + TMaybe<double> Progress; + + /// + /// @brief Size of saved job stderr. + TMaybe<i64> StderrSize; + + /// + /// @brief Error for a unsuccessfully finished job. + TMaybe<TYtError> Error; + + /// + /// @brief Job brief statistics. + TMaybe<TNode> BriefStatistics; + + /// + /// @brief Job input paths (with ranges). + TMaybe<TVector<TRichYPath>> InputPaths; + + /// + /// @brief Infos for core dumps produced by job. + TMaybe<TVector<TCoreInfo>> CoreInfos; +}; + +/// +/// @brief Response for @ref NYT::IOperation::ListJobs. +struct TListJobsResult +{ + /// + /// @brief Jobs. + TVector<TJobAttributes> Jobs; + + /// + /// @deprecated + TMaybe<i64> CypressJobCount; + + /// + /// @brief Number of jobs retrieved from controller agent. + TMaybe<i64> ControllerAgentJobCount; + + /// + /// @brief Number of jobs retrieved from archive. + TMaybe<i64> ArchiveJobCount; +}; + +//////////////////////////////////////////////////////////////////// + +/// +/// @brief Options for @ref NYT::IClient::GetJob. +struct TGetJobOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TGetJobOptions; + /// @endcond +}; + +/// +/// @brief Options for @ref NYT::IClient::GetJobInput. +struct TGetJobInputOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TGetJobInputOptions; + /// @endcond +}; + +/// +/// @brief Options for @ref NYT::IClient::GetJobFailContext. +struct TGetJobFailContextOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TGetJobFailContextOptions; + /// @endcond +}; + +/// +/// @brief Options for @ref NYT::IClient::GetJobStderr. +struct TGetJobStderrOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TGetJobStderrOptions; + /// @endcond +}; + +//////////////////////////////////////////////////////////////////// + +/// +/// @brief Options for @ref NYT::IOperation::GetFailedJobInfo. +struct TGetFailedJobInfoOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TGetFailedJobInfoOptions; + /// @endcond + + /// + /// @brief How many jobs to download. Which jobs will be chosen is undefined. + FLUENT_FIELD_DEFAULT(ui64, MaxJobCount, 10); + + /// + /// @brief How much of stderr tail should be downloaded. + FLUENT_FIELD_DEFAULT(ui64, StderrTailSize, 64 * 1024); +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// +/// @brief Interface representing an operation. +struct IOperation + : public TThrRefBase +{ + virtual ~IOperation() = default; + + /// + /// @brief Get operation id. + virtual const TOperationId& GetId() const = 0; + + /// + /// @brief Get URL of the operation in YT Web UI. + virtual TString GetWebInterfaceUrl() const = 0; + + /// + /// @brief Get last error for not started operations. Get state on YT cluster for started operations. + /// + /// For not started operations last error is an error that's being retried during operation + /// preparation/start (e.g. lock files, start operation request). + virtual TString GetStatus() const = 0; + + /// + /// @brief Get preparation future. + /// + /// @return future that is set when operation is prepared. + virtual ::NThreading::TFuture<void> GetPreparedFuture() = 0; + + /// + /// @brief Start operation synchronously. + /// + /// @note: Do NOT call this method twice. + /// + /// If operation is not prepared yet, Start() will block waiting for preparation finish. + /// Be ready to catch exception if operation preparation or start failed. + virtual void Start() = 0; + + /// + /// @brief Is the operation started + /// + /// Returns true if the operation is started on the cluster + virtual bool IsStarted() const = 0; + + /// + /// @brief Get start future. + /// + /// @return future that is set when operation is started. + virtual ::NThreading::TFuture<void> GetStartedFuture() = 0; + + /// + /// @brief Start watching operation. + /// + /// @return future that is set when operation is complete. + /// + /// @note: the user should check value of returned future to ensure that operation completed successfully e.g. + /// @code{.cpp} + /// auto operationComplete = operation->Watch(); + /// operationComplete.Wait(); + /// operationComplete.GetValue(); /// will throw if operation completed with errors + /// @endcode + /// + /// If operation is completed successfully the returned future contains void value. + /// If operation is completed with error future contains @ref NYT::TOperationFailedError. + /// In rare cases when error occurred while waiting (e.g. YT become unavailable) future might contain other exception. + virtual ::NThreading::TFuture<void> Watch() = 0; + + /// + /// @brief Get information about failed jobs. + /// + /// Can be called for operation in any stage. + /// Though user should keep in mind that this method always fetches info from cypress + /// and doesn't work when operation is archived. Successfully completed operations can be archived + /// quite quickly (in about ~30 seconds). + virtual TVector<TFailedJobInfo> GetFailedJobInfo(const TGetFailedJobInfoOptions& options = TGetFailedJobInfoOptions()) = 0; + + /// + /// Get operation brief state. + virtual EOperationBriefState GetBriefState() = 0; + + /// + /// @brief Get error (if operation has failed). + /// + /// @return `Nothing()` if operation is in 'Completed' or 'InProgress' state (or reason for failed / aborted operation). + virtual TMaybe<TYtError> GetError() = 0; + + /// + /// Get job statistics. + virtual TJobStatistics GetJobStatistics() = 0; + + /// + /// Get operation progress. + /// + /// @return `Nothing()` if operation has no running jobs yet, e.g. when it is in "materializing" or "pending" state. + virtual TMaybe<TOperationBriefProgress> GetBriefProgress() = 0; + + /// + /// @brief Abort operation. + /// + /// Operation will be finished immediately. + /// All results of completed/running jobs will be lost. + /// + /// @see https://yt.yandex-team.ru/docs/api/commands#abort_op + virtual void AbortOperation() = 0; + + /// + /// @brief Complete operation. + /// + /// Operation will be finished immediately. + /// All results of completed jobs will appear in output tables. + /// All results of running (not completed) jobs will be lost. + /// + /// @see https://yt.yandex-team.ru/docs/api/commands#complete_op + virtual void CompleteOperation() = 0; + + /// + /// @brief Suspend operation. + /// + /// Jobs will not be aborted by default, c.f. @ref NYT::TSuspendOperationOptions. + /// + /// @see https://yt.yandex-team.ru/docs/api/commands#suspend_op + virtual void SuspendOperation( + const TSuspendOperationOptions& options = TSuspendOperationOptions()) = 0; + + /// + /// @brief Resume previously suspended operation. + /// + /// @see https://yt.yandex-team.ru/docs/api/commands#resume_op + virtual void ResumeOperation( + const TResumeOperationOptions& options = TResumeOperationOptions()) = 0; + + /// + /// @brief Get operation attributes. + /// + /// @see https://yt.yandex-team.ru/docs/api/commands#get_operation + virtual TOperationAttributes GetAttributes( + const TGetOperationOptions& options = TGetOperationOptions()) = 0; + + /// + /// @brief Update operation runtime parameters. + /// + /// @see https://yt.yandex-team.ru/docs/api/commands#update_op_parameters + virtual void UpdateParameters( + const TUpdateOperationParametersOptions& options = TUpdateOperationParametersOptions()) = 0; + + /// + /// @brief Get job attributes. + /// + /// @see https://yt.yandex-team.ru/docs/api/commands#get_job + virtual TJobAttributes GetJob( + const TJobId& jobId, + const TGetJobOptions& options = TGetJobOptions()) = 0; + + /// + /// List jobs satisfying given filters (see @ref NYT::TListJobsOptions). + /// + /// @see https://yt.yandex-team.ru/docs/api/commands#list_jobs + virtual TListJobsResult ListJobs( + const TListJobsOptions& options = TListJobsOptions()) = 0; +}; + +/// +/// @brief Interface of client capable of managing operations. +struct IOperationClient +{ + /// + /// @brief Run Map operation. + /// + /// @param spec Operation spec. + /// @param mapper Instance of a job to run. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/map + IOperationPtr Map( + const TMapOperationSpec& spec, + ::TIntrusivePtr<IMapperBase> mapper, + const TOperationOptions& options = TOperationOptions()); + + /// + /// @brief Run Map operation. + /// + /// @param mapper Instance of a job to run. + /// @param input Input table(s) + /// @param output Output table(s) + /// @param spec Operation spec. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/map + IOperationPtr Map( + ::TIntrusivePtr<IMapperBase> mapper, + const TOneOrMany<TStructuredTablePath>& input, + const TOneOrMany<TStructuredTablePath>& output, + const TMapOperationSpec& spec = TMapOperationSpec(), + const TOperationOptions& options = TOperationOptions()); + + /// + /// @brief Run raw Map operation. + /// + /// @param spec Operation spec. + /// @param rawJob Instance of a raw mapper to run. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/map + virtual IOperationPtr RawMap( + const TRawMapOperationSpec& spec, + ::TIntrusivePtr<IRawJob> rawJob, + const TOperationOptions& options = TOperationOptions()) = 0; + + /// + /// @brief Run Reduce operation. + /// + /// @param spec Operation spec. + /// @param reducer Instance of a job to run. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/reduce + IOperationPtr Reduce( + const TReduceOperationSpec& spec, + ::TIntrusivePtr<IReducerBase> reducer, + const TOperationOptions& options = TOperationOptions()); + + /// + /// @brief Run Reduce operation. + /// + /// @param reducer Instance of a job to run. + /// @param input Input table(s) + /// @param output Output table(s) + /// @param reduceBy Columns to group rows by. + /// @param spec Operation spec. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/reduce + IOperationPtr Reduce( + ::TIntrusivePtr<IReducerBase> reducer, + const TOneOrMany<TStructuredTablePath>& input, + const TOneOrMany<TStructuredTablePath>& output, + const TSortColumns& reduceBy, + const TReduceOperationSpec& spec = TReduceOperationSpec(), + const TOperationOptions& options = TOperationOptions()); + + /// + /// @brief Run raw Reduce operation. + /// + /// @param spec Operation spec. + /// @param rawJob Instance of a raw reducer to run. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/reduce + virtual IOperationPtr RawReduce( + const TRawReduceOperationSpec& spec, + ::TIntrusivePtr<IRawJob> rawJob, + const TOperationOptions& options = TOperationOptions()) = 0; + + /// + /// @brief Run JoinReduce operation. + /// + /// @param spec Operation spec. + /// @param reducer Instance of a job to run. + /// @param options Optional parameters. + /// + /// @deprecated Use @ref NYT::IOperationClient::Reduce with @ref NYT::TReduceOperationSpec::EnableKeyGuarantee set to `false. + IOperationPtr JoinReduce( + const TJoinReduceOperationSpec& spec, + ::TIntrusivePtr<IReducerBase> reducer, + const TOperationOptions& options = TOperationOptions()); + + /// + /// @brief Run raw JoinReduce operation. + /// + /// @param spec Operation spec. + /// @param rawJob Instance of a raw reducer to run. + /// @param options Optional parameters. + /// + /// @deprecated Use @ref NYT::IOperationClient::RawReduce with @ref NYT::TReduceOperationSpec::EnableKeyGuarantee set to `false. + virtual IOperationPtr RawJoinReduce( + const TRawJoinReduceOperationSpec& spec, + ::TIntrusivePtr<IRawJob> rawJob, + const TOperationOptions& options = TOperationOptions()) = 0; + + /// + /// @brief Run MapReduce operation. + /// + /// @param spec Operation spec. + /// @param mapper Instance of a map job to run (identity mapper if `nullptr`). + /// @param reducer Instance of a reduce job to run. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce + IOperationPtr MapReduce( + const TMapReduceOperationSpec& spec, + ::TIntrusivePtr<IMapperBase> mapper, + ::TIntrusivePtr<IReducerBase> reducer, + const TOperationOptions& options = TOperationOptions()); + + /// + /// @brief Run MapReduce operation. + /// + /// @param spec Operation spec. + /// @param mapper Instance of a map job to run (identity mapper if `nullptr`). + /// @param reducerCombiner Instance of a reduce combiner to run (identity reduce combiner if `nullptr`). + /// @param reducer Instance of a reduce job to run. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce + IOperationPtr MapReduce( + const TMapReduceOperationSpec& spec, + ::TIntrusivePtr<IMapperBase> mapper, + ::TIntrusivePtr<IReducerBase> reduceCombiner, + ::TIntrusivePtr<IReducerBase> reducer, + const TOperationOptions& options = TOperationOptions()); + + /// + /// @brief Run MapReduce operation. + /// + /// @param mapper Instance of mapper to run (identity mapper if `nullptr`). + /// @param reducer Instance of reducer to run. + /// @param input Input table(s) + /// @param output Output table(s) + /// @param reduceBy Columns to group rows by. + /// @param spec Operation spec. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce + IOperationPtr MapReduce( + ::TIntrusivePtr<IMapperBase> mapper, + ::TIntrusivePtr<IReducerBase> reducer, + const TOneOrMany<TStructuredTablePath>& input, + const TOneOrMany<TStructuredTablePath>& output, + const TSortColumns& reduceBy, + TMapReduceOperationSpec spec = TMapReduceOperationSpec(), + const TOperationOptions& options = TOperationOptions()); + + /// + /// @brief Run MapReduce operation. + /// + /// @param mapper Instance of mapper to run (identity mapper if `nullptr`). + /// @param reduceCombiner Instance of reduceCombiner to run (identity reduce combiner if `nullptr`). + /// @param reducer Instance of reducer to run. + /// @param input Input table(s) + /// @param output Output table(s) + /// @param reduceBy Columns to group rows by. + /// @param spec Operation spec. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce + IOperationPtr MapReduce( + ::TIntrusivePtr<IMapperBase> mapper, + ::TIntrusivePtr<IReducerBase> reduceCombiner, + ::TIntrusivePtr<IReducerBase> reducer, + const TOneOrMany<TStructuredTablePath>& input, + const TOneOrMany<TStructuredTablePath>& output, + const TSortColumns& reduceBy, + TMapReduceOperationSpec spec = TMapReduceOperationSpec(), + const TOperationOptions& options = TOperationOptions()); + + /// + /// @brief Run raw MapReduce operation. + /// + /// @param spec Operation spec. + /// @param mapper Instance of a raw mapper to run (identity mapper if `nullptr`). + /// @param mapper Instance of a raw reduce combiner to run (identity reduce combiner if `nullptr`). + /// @param mapper Instance of a raw reducer to run. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce + virtual IOperationPtr RawMapReduce( + const TRawMapReduceOperationSpec& spec, + ::TIntrusivePtr<IRawJob> mapper, + ::TIntrusivePtr<IRawJob> reduceCombiner, + ::TIntrusivePtr<IRawJob> reducer, + const TOperationOptions& options = TOperationOptions()) = 0; + + /// + /// @brief Run Sort operation. + /// + /// @param spec Operation spec. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/sort + virtual IOperationPtr Sort( + const TSortOperationSpec& spec, + const TOperationOptions& options = TOperationOptions()) = 0; + + /// + /// @brief Run Sort operation. + /// + /// @param input Input table(s). + /// @param output Output table. + /// @param sortBy Columns to sort input rows by. + /// @param spec Operation spec. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/sort + IOperationPtr Sort( + const TOneOrMany<TRichYPath>& input, + const TRichYPath& output, + const TSortColumns& sortBy, + const TSortOperationSpec& spec = TSortOperationSpec(), + const TOperationOptions& options = TOperationOptions()); + + /// + /// @brief Run Merge operation. + /// + /// @param spec Operation spec. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/merge + virtual IOperationPtr Merge( + const TMergeOperationSpec& spec, + const TOperationOptions& options = TOperationOptions()) = 0; + + /// + /// @brief Run Erase operation. + /// + /// @param spec Operation spec. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/erase + virtual IOperationPtr Erase( + const TEraseOperationSpec& spec, + const TOperationOptions& options = TOperationOptions()) = 0; + + /// + /// @brief Run RemoteCopy operation. + /// + /// @param spec Operation spec. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/remote_copy + virtual IOperationPtr RemoteCopy( + const TRemoteCopyOperationSpec& spec, + const TOperationOptions& options = TOperationOptions()) = 0; + + /// + /// @brief Run Vanilla operation. + /// + /// @param spec Operation spec. + /// @param options Optional parameters. + /// + /// @see https://yt.yandex-team.ru/docs/description/mr/vanilla + virtual IOperationPtr RunVanilla( + const TVanillaOperationSpec& spec, + const TOperationOptions& options = TOperationOptions()) = 0; + + /// + /// @brief Abort operation. + /// + /// @see https://yt.yandex-team.ru/docs/api/commands#abort_op + virtual void AbortOperation( + const TOperationId& operationId) = 0; + + /// + /// @brief Complete operation. + /// + /// @see https://yt.yandex-team.ru/docs/api/commands#complete_op + virtual void CompleteOperation( + const TOperationId& operationId) = 0; + + /// + /// @brief Wait for operation to finish. + virtual void WaitForOperation( + const TOperationId& operationId) = 0; + + /// + /// @brief Check and return operation status. + /// + /// @note this function will never return @ref NYT::EOperationBriefState::Failed or @ref NYT::EOperationBriefState::Aborted status, + /// it will throw @ref NYT::TOperationFailedError instead. + virtual EOperationBriefState CheckOperation( + const TOperationId& operationId) = 0; + + /// + /// @brief Create an operation object given operation id. + /// + /// @throw @ref NYT::TErrorResponse if the operation doesn't exist. + virtual IOperationPtr AttachOperation(const TOperationId& operationId) = 0; + +private: + virtual IOperationPtr DoMap( + const TMapOperationSpec& spec, + ::TIntrusivePtr<IStructuredJob> mapper, + const TOperationOptions& options) = 0; + + virtual IOperationPtr DoReduce( + const TReduceOperationSpec& spec, + ::TIntrusivePtr<IStructuredJob> reducer, + const TOperationOptions& options) = 0; + + virtual IOperationPtr DoJoinReduce( + const TJoinReduceOperationSpec& spec, + ::TIntrusivePtr<IStructuredJob> reducer, + const TOperationOptions& options) = 0; + + virtual IOperationPtr DoMapReduce( + const TMapReduceOperationSpec& spec, + ::TIntrusivePtr<IStructuredJob> mapper, + ::TIntrusivePtr<IStructuredJob> reduceCombiner, + ::TIntrusivePtr<IStructuredJob> reducer, + const TOperationOptions& options) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define OPERATION_INL_H_ +#include "operation-inl.h" +#undef OPERATION_INL_H_ |