aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/interface/operation.h
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/interface/operation.h
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
downloadydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build. Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/interface/operation.h')
-rw-r--r--yt/cpp/mapreduce/interface/operation.h3494
1 files changed, 3494 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h
new file mode 100644
index 00000000000..171a7e4af79
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/operation.h
@@ -0,0 +1,3494 @@
+#pragma once
+
+///
+/// @file yt/cpp/mapreduce/interface/operation.h
+///
+/// Header containing interface to run operations in YT
+/// and retrieve information about them.
+/// @see [the doc](https://yt.yandex-team.ru/docs/description/mr/map_reduce_overview.html).
+
+#include "client_method_options.h"
+#include "errors.h"
+#include "io.h"
+#include "job_statistics.h"
+#include "job_counters.h"
+
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/type_info/type_info.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/variant.h>
+#include <util/generic/vector.h>
+#include <util/generic/maybe.h>
+#include <util/system/file.h>
+#include <util/system/types.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// Tag class marking that the row type for table is not specified.
+struct TUnspecifiedTableStructure
+{ };
+
+/// Tag class marking that table rows have protobuf type.
+struct TProtobufTableStructure
+{
+ /// @brief Descriptor of the protobuf type of table rows.
+ ///
+ /// @note If table is tagged with @ref ::google::protobuf::Message instead of real proto class
+ /// this descriptor might be null.
+ const ::google::protobuf::Descriptor* Descriptor = nullptr;
+};
+
+
+/// Tag class to specify table row type.
+using TTableStructure = std::variant<
+ TUnspecifiedTableStructure,
+ TProtobufTableStructure
+>;
+
+bool operator==(const TUnspecifiedTableStructure&, const TUnspecifiedTableStructure&);
+bool operator==(const TProtobufTableStructure& lhs, const TProtobufTableStructure& rhs);
+
+/// Table path marked with @ref NYT::TTableStructure tag.
+struct TStructuredTablePath
+{
+ TStructuredTablePath(TRichYPath richYPath = TRichYPath(), TTableStructure description = TUnspecifiedTableStructure())
+ : RichYPath(std::move(richYPath))
+ , Description(std::move(description))
+ { }
+
+ TStructuredTablePath(TRichYPath richYPath, const ::google::protobuf::Descriptor* descriptor)
+ : RichYPath(std::move(richYPath))
+ , Description(TProtobufTableStructure({descriptor}))
+ { }
+
+ TStructuredTablePath(TYPath path)
+ : RichYPath(std::move(path))
+ , Description(TUnspecifiedTableStructure())
+ { }
+
+ TStructuredTablePath(const char* path)
+ : RichYPath(path)
+ , Description(TUnspecifiedTableStructure())
+ { }
+
+ TRichYPath RichYPath;
+ TTableStructure Description;
+};
+
+/// Create marked table path from row type.
+template <typename TRow>
+TStructuredTablePath Structured(TRichYPath richYPath);
+
+/// Create tag class from row type.
+template <typename TRow>
+TTableStructure StructuredTableDescription();
+
+///////////////////////////////////////////////////////////////////////////////
+
+/// Tag class marking that row stream is empty.
+struct TVoidStructuredRowStream
+{ };
+
+/// Tag class marking that row stream consists of `NYT::TNode`.
+struct TTNodeStructuredRowStream
+{ };
+
+/// Tag class marking that row stream consists of @ref NYT::TYaMRRow.
+struct TTYaMRRowStructuredRowStream
+{ };
+
+/// Tag class marking that row stream consists of protobuf rows of given type.
+struct TProtobufStructuredRowStream
+{
+ /// @brief Descriptor of the protobuf type of table rows.
+ ///
+ /// @note If `Descriptor` is nullptr, then row stream consists of multiple message types.
+ const ::google::protobuf::Descriptor* Descriptor = nullptr;
+};
+
+/// Tag class to specify type of rows in an operation row stream
+using TStructuredRowStreamDescription = std::variant<
+ TVoidStructuredRowStream,
+ TTNodeStructuredRowStream,
+ TTYaMRRowStructuredRowStream,
+ TProtobufStructuredRowStream
+>;
+
+///////////////////////////////////////////////////////////////////////////////
+
+/// Tag class marking that current binary should be used in operation.
+struct TJobBinaryDefault
+{ };
+
+/// Tag class marking that binary from specified local path should be used in operation.
+struct TJobBinaryLocalPath
+{
+ TString Path;
+ TMaybe<TString> MD5CheckSum;
+};
+
+/// Tag class marking that binary from specified Cypress path should be used in operation.
+struct TJobBinaryCypressPath
+{
+ TYPath Path;
+ TMaybe<TTransactionId> TransactionId;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+
+/// @cond Doxygen_Suppress
+namespace NDetail {
+ extern i64 OutputTableCount;
+} // namespace NDetail
+/// @endcond
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Auto merge mode.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/automerge
+enum class EAutoMergeMode
+{
+ /// Auto merge is disabled.
+ Disabled /* "disabled" */,
+
+ /// Mode that tries to achieve good chunk sizes and doesn't limit usage of chunk quota for intermediate chunks.
+ Relaxed /* "relaxed" */,
+
+ /// Mode that tries to optimize usage of chunk quota for intermediate chunks, operation might run slower.
+ Economy /* "economy" */,
+
+ ///
+ /// @brief Manual configuration of automerge parameters.
+ ///
+ /// @ref TAutoMergeSpec
+ Manual /* "manual" */,
+};
+
+///
+/// @brief Options for auto merge operation stage.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/automerge
+class TAutoMergeSpec
+{
+public:
+ /// @cond Doxygen_Suppress
+ using TSelf = TAutoMergeSpec;
+ /// @endcond
+
+ /// Mode of the auto merge.
+ FLUENT_FIELD_OPTION(EAutoMergeMode, Mode);
+
+ /// @brief Upper limit for number of intermediate chunks.
+ ///
+ /// Works only for Manual mode.
+ FLUENT_FIELD_OPTION(i64, MaxIntermediateChunkCount);
+
+ /// @brief Number of chunks limit to merge in one job.
+ ///
+ /// Works only for Manual mode.
+ FLUENT_FIELD_OPTION(i64, ChunkCountPerMergeJob);
+
+ /// @brief Automerge will not merge chunks that are larger than `DesiredChunkSize * (ChunkSizeThreshold / 100.)`
+ ///
+ /// Works only for Manual mode.
+ FLUENT_FIELD_OPTION(i64, ChunkSizeThreshold);
+};
+
+/// Base for operations with auto merge options.
+template <class TDerived>
+class TWithAutoMergeSpec
+{
+public:
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ /// @brief Options for auto merge operation stage.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/automerge
+ FLUENT_FIELD_OPTION(TAutoMergeSpec, AutoMerge);
+};
+
+///
+/// @brief Resources controlled by scheduler and used by running operations.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/scheduler/scheduler_and_pools#resursy
+class TSchedulerResources
+{
+public:
+ /// @cond Doxygen_Suppress
+ using TSelf = TSchedulerResources;
+ /// @endcond
+
+ /// Each job consumes exactly one user slot.
+ FLUENT_FIELD_OPTION_ENCAPSULATED(i64, UserSlots);
+
+ /// Number of (virtual) cpu cores consumed by all jobs.
+ FLUENT_FIELD_OPTION_ENCAPSULATED(i64, Cpu);
+
+ /// Amount of memory in bytes.
+ FLUENT_FIELD_OPTION_ENCAPSULATED(i64, Memory);
+};
+
+/// Base for input format hints of a user job.
+template <class TDerived>
+class TUserJobInputFormatHintsBase
+{
+public:
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ /// @brief Fine tune input format of the job.
+ FLUENT_FIELD_OPTION(TFormatHints, InputFormatHints);
+};
+
+/// Base for output format hints of a user job.
+template <class TDerived>
+class TUserJobOutputFormatHintsBase
+{
+public:
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ /// @brief Fine tune output format of the job.
+ FLUENT_FIELD_OPTION(TFormatHints, OutputFormatHints);
+};
+
+/// Base for format hints of a user job.
+template <class TDerived>
+class TUserJobFormatHintsBase
+ : public TUserJobInputFormatHintsBase<TDerived>
+ , public TUserJobOutputFormatHintsBase<TDerived>
+{
+public:
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+};
+
+/// User job format hints.
+class TUserJobFormatHints
+ : public TUserJobFormatHintsBase<TUserJobFormatHints>
+{ };
+
+/// Spec of input and output tables of a raw operation.
+template <class TDerived>
+class TRawOperationIoTableSpec
+{
+public:
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ /// Add input table path to input path list.
+ TDerived& AddInput(const TRichYPath& path);
+
+ /// Set input table path no. `tableIndex`.
+ TDerived& SetInput(size_t tableIndex, const TRichYPath& path);
+
+ /// Add output table path to output path list.
+ TDerived& AddOutput(const TRichYPath& path);
+
+ /// Set output table path no. `tableIndex`.
+ TDerived& SetOutput(size_t tableIndex, const TRichYPath& path);
+
+ /// Get all input table paths.
+ const TVector<TRichYPath>& GetInputs() const;
+
+ /// Get all output table paths.
+ const TVector<TRichYPath>& GetOutputs() const;
+
+private:
+ TVector<TRichYPath> Inputs_;
+ TVector<TRichYPath> Outputs_;
+};
+
+/// Base spec for IO in "simple" raw operations (Map, Reduce etc.).
+template <class TDerived>
+struct TSimpleRawOperationIoSpec
+ : public TRawOperationIoTableSpec<TDerived>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ /// @brief Describes format for both input and output.
+ ///
+ /// @note `Format' is overriden by `InputFormat' and `OutputFormat'.
+ FLUENT_FIELD_OPTION(TFormat, Format);
+
+ /// Describes input format.
+ FLUENT_FIELD_OPTION(TFormat, InputFormat);
+
+ /// Describes output format.
+ FLUENT_FIELD_OPTION(TFormat, OutputFormat);
+};
+
+/// Spec for IO in MapReduce operation.
+template <class TDerived>
+class TRawMapReduceOperationIoSpec
+ : public TRawOperationIoTableSpec<TDerived>
+{
+public:
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ /// @brief Describes format for both input and output of mapper.
+ ///
+ /// @note `MapperFormat' is overriden by `MapperInputFormat' and `MapperOutputFormat'.
+ FLUENT_FIELD_OPTION(TFormat, MapperFormat);
+
+ /// Describes mapper input format.
+ FLUENT_FIELD_OPTION(TFormat, MapperInputFormat);
+
+ /// Describes mapper output format.
+ FLUENT_FIELD_OPTION(TFormat, MapperOutputFormat);
+
+ /// @brief Describes format for both input and output of reduce combiner.
+ ///
+ /// @note `ReduceCombinerFormat' is overriden by `ReduceCombinerInputFormat' and `ReduceCombinerOutputFormat'.
+ FLUENT_FIELD_OPTION(TFormat, ReduceCombinerFormat);
+
+ /// Describes reduce combiner input format.
+ FLUENT_FIELD_OPTION(TFormat, ReduceCombinerInputFormat);
+
+ /// Describes reduce combiner output format.
+ FLUENT_FIELD_OPTION(TFormat, ReduceCombinerOutputFormat);
+
+ /// @brief Describes format for both input and output of reducer.
+ ///
+ /// @note `ReducerFormat' is overriden by `ReducerInputFormat' and `ReducerOutputFormat'.
+ FLUENT_FIELD_OPTION(TFormat, ReducerFormat);
+
+ /// Describes reducer input format.
+ FLUENT_FIELD_OPTION(TFormat, ReducerInputFormat);
+
+ /// Describes reducer output format.
+ FLUENT_FIELD_OPTION(TFormat, ReducerOutputFormat);
+
+ /// Add direct map output table path.
+ TDerived& AddMapOutput(const TRichYPath& path);
+
+ /// Set direct map output table path no. `tableIndex`.
+ TDerived& SetMapOutput(size_t tableIndex, const TRichYPath& path);
+
+ /// Get all direct map output table paths
+ const TVector<TRichYPath>& GetMapOutputs() const;
+
+private:
+ TVector<TRichYPath> MapOutputs_;
+};
+
+///
+/// @brief Base spec of operations with input tables.
+class TOperationInputSpecBase
+{
+public:
+ template <class T, class = void>
+ struct TFormatAdder;
+
+ ///
+ /// @brief Add input table path to input path list and specify type of rows.
+ template <class T>
+ void AddInput(const TRichYPath& path);
+
+ ///
+ /// @brief Add input table path as structured paths.
+ void AddStructuredInput(TStructuredTablePath path);
+
+ ///
+ /// @brief Set input table path and type.
+ template <class T>
+ void SetInput(size_t tableIndex, const TRichYPath& path);
+
+ ///
+ /// @brief All input paths.
+ TVector<TRichYPath> Inputs_;
+
+ ///
+ /// @brief Get all input structured paths.
+ const TVector<TStructuredTablePath>& GetStructuredInputs() const;
+
+private:
+ TVector<TStructuredTablePath> StructuredInputs_;
+ friend struct TOperationIOSpecBase;
+ template <class T>
+ friend struct TOperationIOSpec;
+};
+
+///
+/// @brief Base spec of operations with output tables.
+class TOperationOutputSpecBase
+{
+public:
+ template <class T, class = void>
+ struct TFormatAdder;
+
+ ///
+ /// @brief Add output table path to output path list and specify type of rows.
+ template <class T>
+ void AddOutput(const TRichYPath& path);
+
+ ///
+ /// @brief Add output table path as structured paths.
+ void AddStructuredOutput(TStructuredTablePath path);
+
+ ///
+ /// @brief Set output table path and type.
+ template <class T>
+ void SetOutput(size_t tableIndex, const TRichYPath& path);
+
+ ///
+ /// @brief All output paths.
+ TVector<TRichYPath> Outputs_;
+
+ ///
+ /// @brief Get all output structured paths.
+ const TVector<TStructuredTablePath>& GetStructuredOutputs() const;
+
+private:
+ TVector<TStructuredTablePath> StructuredOutputs_;
+ friend struct TOperationIOSpecBase;
+ template <class T>
+ friend struct TOperationIOSpec;
+};
+
+///
+/// @brief Base spec for operations with inputs and outputs.
+struct TOperationIOSpecBase
+ : public TOperationInputSpecBase
+ , public TOperationOutputSpecBase
+{ };
+
+///
+/// @brief Base spec for operations with inputs and outputs.
+template <class TDerived>
+struct TOperationIOSpec
+ : public TOperationIOSpecBase
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ template <class T>
+ TDerived& AddInput(const TRichYPath& path);
+
+ TDerived& AddStructuredInput(TStructuredTablePath path);
+
+ template <class T>
+ TDerived& SetInput(size_t tableIndex, const TRichYPath& path);
+
+ template <class T>
+ TDerived& AddOutput(const TRichYPath& path);
+
+ TDerived& AddStructuredOutput(TStructuredTablePath path);
+
+ template <class T>
+ TDerived& SetOutput(size_t tableIndex, const TRichYPath& path);
+
+
+ // DON'T USE THESE METHODS! They are left solely for backward compatibility.
+ // These methods are the only way to do equivalent of (Add/Set)(Input/Output)<Message>
+ // but please consider using (Add/Set)(Input/Output)<TConcreteMessage>
+ // (where TConcreteMessage is some descendant of Message)
+ // because they are faster and better (see https://st.yandex-team.ru/YT-6967)
+ TDerived& AddProtobufInput_VerySlow_Deprecated(const TRichYPath& path);
+ TDerived& AddProtobufOutput_VerySlow_Deprecated(const TRichYPath& path);
+};
+
+///
+/// @brief Base spec for all operations.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/operations_options
+template <class TDerived>
+struct TOperationSpecBase
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ ///
+ /// @brief Limit on operation execution time.
+ ///
+ /// If operation doesn't finish in time it will be aborted.
+ FLUENT_FIELD_OPTION(TDuration, TimeLimit);
+
+ /// @brief Title to be shown in web interface.
+ FLUENT_FIELD_OPTION(TString, Title);
+
+ /// @brief Pool to be used for this operation.
+ FLUENT_FIELD_OPTION(TString, Pool);
+
+ /// @brief Weight of operation.
+ ///
+ /// Coefficient defining how much resources operation gets relative to its siblings in the same pool.
+ FLUENT_FIELD_OPTION(double, Weight);
+
+ /// @breif Pool tree list that operation will use.
+ FLUENT_OPTIONAL_VECTOR_FIELD_ENCAPSULATED(TString, PoolTree);
+
+ /// How much resources can be consumed by operation.
+ FLUENT_FIELD_OPTION_ENCAPSULATED(TSchedulerResources, ResourceLimits);
+};
+
+///
+/// @brief Base spec for all operations with user jobs.
+template <class TDerived>
+struct TUserOperationSpecBase
+ : TOperationSpecBase<TDerived>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ /// How many jobs can fail before operation is failed.
+ FLUENT_FIELD_OPTION(ui64, MaxFailedJobCount);
+
+ /// On any unsuccessful job completion (i.e. abortion or failure) force the whole operation to fail.
+ FLUENT_FIELD_OPTION(bool, FailOnJobRestart);
+
+ ///
+ /// @brief Table to save whole stderr of operation.
+ ///
+ /// @see https://clubs.at.yandex-team.ru/yt/1045
+ FLUENT_FIELD_OPTION(TYPath, StderrTablePath);
+
+ ///
+ /// @brief Table to save coredumps of operation.
+ ///
+ /// @see https://clubs.at.yandex-team.ru/yt/1045
+ FLUENT_FIELD_OPTION(TYPath, CoreTablePath);
+
+ ///
+ /// @brief How long should the scheduler wait for the job to be started on a node.
+ ///
+ /// When you run huge jobs that require preemption of all the other jobs on
+ /// a node, the default timeout might be insufficient and your job may be
+ /// aborted with 'waiting_timeout' reason. This is especially problematic
+ /// when you are setting 'FailOnJobRestart' option.
+ ///
+ /// @note The value must be between 10 seconds and 10 minutes.
+ FLUENT_FIELD_OPTION(TDuration, WaitingJobTimeout);
+};
+
+///
+/// @brief Class to provide information on intermediate mapreduce stream protobuf types.
+///
+/// When using protobuf format it is important to know exact types of proto messages
+/// that are used in input/output.
+///
+/// Sometimes such messages cannot be derived from job class
+/// i.e. when job class uses `NYT::TTableReader<::google::protobuf::Message>`
+/// or `NYT::TTableWriter<::google::protobuf::Message>`.
+///
+/// When using such jobs user can provide exact message type using this class.
+///
+/// @note Only input/output that relate to intermediate tables can be hinted.
+/// Input to map and output of reduce is derived from `AddInput`/`AddOutput`.
+template <class TDerived>
+struct TIntermediateTablesHintSpec
+{
+ /// Specify intermediate map output type.
+ template <class T>
+ TDerived& HintMapOutput();
+
+ /// Specify reduce combiner input.
+ template <class T>
+ TDerived& HintReduceCombinerInput();
+
+ /// Specify reduce combiner output.
+ template <class T>
+ TDerived& HintReduceCombinerOutput();
+
+ /// Specify reducer input.
+ template <class T>
+ TDerived& HintReduceInput();
+
+ ///
+ /// @brief Add output of map stage.
+ ///
+ /// Mapper output table #0 is always intermediate table that is going to be reduced later.
+ /// Rows that mapper write to tables #1, #2, ... are saved in MapOutput tables.
+ template <class T>
+ TDerived& AddMapOutput(const TRichYPath& path);
+
+ TVector<TRichYPath> MapOutputs_;
+
+ const TVector<TStructuredTablePath>& GetStructuredMapOutputs() const;
+ const TMaybe<TTableStructure>& GetIntermediateMapOutputDescription() const;
+ const TMaybe<TTableStructure>& GetIntermediateReduceCombinerInputDescription() const;
+ const TMaybe<TTableStructure>& GetIntermediateReduceCombinerOutputDescription() const;
+ const TMaybe<TTableStructure>& GetIntermediateReducerInputDescription() const;
+
+private:
+ TVector<TStructuredTablePath> StructuredMapOutputs_;
+ TMaybe<TTableStructure> IntermediateMapOutputDescription_;
+ TMaybe<TTableStructure> IntermediateReduceCombinerInputDescription_;
+ TMaybe<TTableStructure> IntermediateReduceCombinerOutputDescription_;
+ TMaybe<TTableStructure> IntermediateReducerInputDescription_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TAddLocalFileOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TAddLocalFileOptions;
+ /// @endcond
+
+ ///
+ /// @brief Path by which job will see the uploaded file.
+ ///
+ /// Defaults to basename of the local path.
+ FLUENT_FIELD_OPTION(TString, PathInJob);
+
+ ///
+ /// @brief MD5 checksum of uploaded file.
+ ///
+ /// If not specified it is computed by this library.
+ /// If this argument is provided, the user can some cpu and disk IO.
+ FLUENT_FIELD_OPTION(TString, MD5CheckSum);
+
+ ///
+ /// @brief Do not put file into node cache
+ ///
+ /// @see NYT::TRichYPath::BypassArtifactCache
+ FLUENT_FIELD_OPTION(bool, BypassArtifactCache);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Binary to run job profiler on.
+enum class EProfilingBinary
+{
+ /// Profile job proxy.
+ JobProxy /* "job_proxy" */,
+
+ /// Profile user job.
+ UserJob /* "user_job" */,
+};
+
+/// @brief Type of job profiler.
+enum class EProfilerType
+{
+ /// Profile CPU usage.
+ Cpu /* "cpu" */,
+
+ /// Profile memory usage.
+ Memory /* "memory" */,
+
+ /// Profiler peak memory usage.
+ PeakMemory /* "peak_memory" */,
+};
+
+/// @brief Specifies a job profiler.
+struct TJobProfilerSpec
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TJobProfilerSpec;
+ /// @endcond
+
+ /// @brief Binary to profile.
+ FLUENT_FIELD_OPTION(EProfilingBinary, ProfilingBinary);
+
+ /// @brief Type of the profiler.
+ FLUENT_FIELD_OPTION(EProfilerType, ProfilerType);
+
+ /// @brief Probabiliy of the job being selected for profiling.
+ FLUENT_FIELD_OPTION(double, ProfilingProbability);
+
+ /// @brief For sampling profilers, sets the number of samples per second.
+ FLUENT_FIELD_OPTION(int, SamplingFrequency);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Spec of user job.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/operations_options#user_script_options
+struct TUserJobSpec
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TUserJobSpec;
+ /// @endcond
+
+ ///
+ /// @brief Specify a local file to upload to Cypress and prepare for use in job.
+ TSelf& AddLocalFile(const TLocalFilePath& path, const TAddLocalFileOptions& options = TAddLocalFileOptions());
+
+ ///
+ /// @brief Get the list of all added local files.
+ TVector<std::tuple<TLocalFilePath, TAddLocalFileOptions>> GetLocalFiles() const;
+
+ /// @brief Paths to files in Cypress to use in job.
+ FLUENT_VECTOR_FIELD(TRichYPath, File);
+
+ ///
+ /// @brief MemoryLimit specifies how much memory job process can use.
+ ///
+ /// @note
+ /// If job uses tmpfs (check @ref NYT::TOperationOptions::MountSandboxInTmpfs)
+ /// YT computes its memory usage as total of:
+ /// - memory usage of job process itself (including mapped files);
+ /// - total size of tmpfs used by this job.
+ ///
+ /// @note
+ /// When @ref NYT::TOperationOptions::MountSandboxInTmpfs is enabled library will compute
+ /// total size of all files used by this job and add this total size to MemoryLimit.
+ /// Thus you shouldn't include size of your files (e.g. binary file) into MemoryLimit.
+ ///
+ /// @note
+ /// Final memory memory_limit passed to YT is calculated as follows:
+ ///
+ /// @note
+ /// ```
+ /// memory_limit = MemoryLimit + <total-size-of-used-files> + ExtraTmpfsSize
+ /// ```
+ ///
+ /// @see NYT::TUserJobSpec::ExtraTmpfsSize
+ FLUENT_FIELD_OPTION(i64, MemoryLimit);
+
+ ///
+ /// @brief Size of data that is going to be written to tmpfs.
+ ///
+ /// This option should be used if job writes data to tmpfs.
+ ///
+ /// ExtraTmpfsSize should not include size of files specified with
+ /// @ref NYT::TUserJobSpec::AddLocalFile or @ref NYT::TUserJobSpec::AddFile
+ /// These files are copied to tmpfs automatically and their total size
+ /// is computed automatically.
+ ///
+ /// @see NYT::TOperationOptions::MountSandboxInTmpfs
+ /// @see NYT::TUserJobSpec::MemoryLimit
+ FLUENT_FIELD_OPTION(i64, ExtraTmpfsSize);
+
+ ///
+ /// @brief Maximum number of CPU cores for a single job to use.
+ FLUENT_FIELD_OPTION(double, CpuLimit);
+
+ ///
+ /// @brief Fraction of @ref NYT::TUserJobSpec::MemoryLimit that job gets at start.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/operations_options#memory_reserve_factor
+ FLUENT_FIELD_OPTION(double, MemoryReserveFactor);
+
+ ///
+ /// @brief Local path to executable to be used inside jobs.
+ ////
+ /// Provided executable must use C++ YT API library (this library)
+ /// and implement job class that is going to be used.
+ ///
+ /// This option might be useful if we want to start operation from nonlinux machines
+ /// (in that case we use `JobBinary` to provide path to the same program compiled for linux).
+ /// Other example of using this option is uploading executable to cypress in advance
+ /// and save the time required to upload current executable to cache.
+ /// `md5` argument can be used to save cpu time and disk IO when binary MD5 checksum is known.
+ /// When argument is not provided library will compute it itself.
+ TUserJobSpec& JobBinaryLocalPath(TString path, TMaybe<TString> md5 = Nothing());
+
+ ///
+ /// @brief Cypress path to executable to be used inside jobs.
+ TUserJobSpec& JobBinaryCypressPath(TString path, TMaybe<TTransactionId> transactionId = Nothing());
+
+ ///
+ /// @brief String that will be prepended to the command.
+ ///
+ /// This option overrides @ref NYT::TOperationOptions::JobCommandPrefix.
+ FLUENT_FIELD(TString, JobCommandPrefix);
+
+ ///
+ /// @brief String that will be appended to the command.
+ ///
+ /// This option overrides @ref NYT::TOperationOptions::JobCommandSuffix.
+ FLUENT_FIELD(TString, JobCommandSuffix);
+
+ ///
+ /// @brief Map of environment variables that will be set for jobs.
+ FLUENT_MAP_FIELD(TString, TString, Environment);
+
+ ///
+ /// @brief Limit for all files inside job sandbox (in bytes).
+ FLUENT_FIELD_OPTION(ui64, DiskSpaceLimit);
+
+ ///
+ /// @brief Number of ports reserved for the job (passed through environment in YT_PORT_0, YT_PORT_1, ...).
+ FLUENT_FIELD_OPTION(ui16, PortCount);
+
+ ///
+ /// @brief Network project used to isolate job network.
+ FLUENT_FIELD_OPTION(TString, NetworkProject);
+
+ ///
+ /// @brief Limit on job execution time.
+ ///
+ /// Jobs that exceed this limit will be considered failed.
+ FLUENT_FIELD_OPTION(TDuration, JobTimeLimit);
+
+ ///
+ /// @brief Get job binary config.
+ const TJobBinaryConfig& GetJobBinary() const;
+
+ ///
+ /// @brief List of profilers to run.
+ FLUENT_VECTOR_FIELD(TJobProfilerSpec, JobProfiler);
+
+private:
+ TVector<std::tuple<TLocalFilePath, TAddLocalFileOptions>> LocalFiles_;
+ TJobBinaryConfig JobBinary_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Spec of Map operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/map
+template <typename TDerived>
+struct TMapOperationSpecBase
+ : public TUserOperationSpecBase<TDerived>
+ , public TWithAutoMergeSpec<TDerived>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ ///
+ /// @brief Spec of mapper job.
+ FLUENT_FIELD(TUserJobSpec, MapperSpec);
+
+ ///
+ /// @brief Whether to guarantee the order of rows passed to mapper matches the order in the table.
+ ///
+ /// When `Ordered' is false (by default), there is no guaranties about order of reading rows.
+ /// In this case mapper might work slightly faster because row delivered from fast node can be processed YT waits
+ /// response from slow nodes.
+ /// When `Ordered' is true, rows will come in order in which they are stored in input tables.
+ FLUENT_FIELD_OPTION(bool, Ordered);
+
+ ///
+ /// @brief Recommended number of jobs to run.
+ ///
+ /// `JobCount' has higher priority than @ref NYT::TMapOperationSpecBase::DataSizePerJob.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui32, JobCount);
+
+ ///
+ /// @brief Recommended of data size for each job.
+ ///
+ /// `DataSizePerJob` has lower priority that @ref NYT::TMapOperationSpecBase::JobCount.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui64, DataSizePerJob);
+};
+
+///
+/// @brief Spec of Map operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/map
+struct TMapOperationSpec
+ : public TMapOperationSpecBase<TMapOperationSpec>
+ , public TOperationIOSpec<TMapOperationSpec>
+ , public TUserJobFormatHintsBase<TMapOperationSpec>
+{ };
+
+///
+/// @brief Spec of raw Map operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/map
+struct TRawMapOperationSpec
+ : public TMapOperationSpecBase<TRawMapOperationSpec>
+ , public TSimpleRawOperationIoSpec<TRawMapOperationSpec>
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Spec of Reduce operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/reduce
+template <typename TDerived>
+struct TReduceOperationSpecBase
+ : public TUserOperationSpecBase<TDerived>
+ , public TWithAutoMergeSpec<TDerived>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ ///
+ /// @brief Spec of reduce job.
+ FLUENT_FIELD(TUserJobSpec, ReducerSpec);
+
+ ///
+ /// @brief Columns to sort rows by (must include `ReduceBy` as prefix).
+ FLUENT_FIELD(TSortColumns, SortBy);
+
+ ///
+ /// @brief Columns to group rows by.
+ FLUENT_FIELD(TSortColumns, ReduceBy);
+
+ ///
+ /// @brief Columns to join foreign tables by (must be prefix of `ReduceBy`).
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/reduce#foreign_tables
+ FLUENT_FIELD_OPTION(TSortColumns, JoinBy);
+
+ ///
+ /// @brief Guarantee to feed all rows with same `ReduceBy` columns to a single job (`true` by default).
+ FLUENT_FIELD_OPTION(bool, EnableKeyGuarantee);
+
+ ///
+ /// @brief Recommended number of jobs to run.
+ ///
+ /// `JobCount' has higher priority than @ref NYT::TReduceOperationSpecBase::DataSizePerJob.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui32, JobCount);
+
+ ///
+ /// @brief Recommended of data size for each job.
+ ///
+ /// `DataSizePerJob` has lower priority that @ref NYT::TReduceOperationSpecBase::JobCount.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui64, DataSizePerJob);
+};
+
+///
+/// @brief Spec of Reduce operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/reduce
+struct TReduceOperationSpec
+ : public TReduceOperationSpecBase<TReduceOperationSpec>
+ , public TOperationIOSpec<TReduceOperationSpec>
+ , public TUserJobFormatHintsBase<TReduceOperationSpec>
+{ };
+
+///
+/// @brief Spec of raw Reduce operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/reduce
+struct TRawReduceOperationSpec
+ : public TReduceOperationSpecBase<TRawReduceOperationSpec>
+ , public TSimpleRawOperationIoSpec<TRawReduceOperationSpec>
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Spec of JoinReduce operation.
+///
+/// @deprecated Instead the user should run a reduce operation
+/// with @ref NYT::TReduceOperationSpec::EnableKeyGuarantee set to `false`.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/reduce#foreign_tables
+template <typename TDerived>
+struct TJoinReduceOperationSpecBase
+ : public TUserOperationSpecBase<TDerived>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ ///
+ /// @brief Spec of reduce job.
+ FLUENT_FIELD(TUserJobSpec, ReducerSpec);
+
+ ///
+ /// @brief Columns to join foreign tables by (must be prefix of `ReduceBy`).
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/reduce#foreign_tables
+ FLUENT_FIELD(TSortColumns, JoinBy);
+
+ ///
+ /// @brief Recommended number of jobs to run.
+ ///
+ /// `JobCount' has higher priority than @ref NYT::TJoinReduceOperationSpecBase::DataSizePerJob.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui32, JobCount);
+
+ ///
+ /// @brief Recommended of data size for each job.
+ ///
+ /// `DataSizePerJob` has lower priority that @ref NYT::TJoinReduceOperationSpecBase::JobCount.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui64, DataSizePerJob);
+};
+
+///
+/// @brief Spec of JoinReduce operation.
+///
+/// @deprecated Instead the user should run a reduce operation
+/// with @ref NYT::TReduceOperationSpec::EnableKeyGuarantee set to `false`.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/reduce#foreign_tables
+struct TJoinReduceOperationSpec
+ : public TJoinReduceOperationSpecBase<TJoinReduceOperationSpec>
+ , public TOperationIOSpec<TJoinReduceOperationSpec>
+ , public TUserJobFormatHintsBase<TJoinReduceOperationSpec>
+{ };
+
+///
+/// @brief Spec of raw JoinReduce operation.
+///
+/// @deprecated Instead the user should run a reduce operation
+/// with @ref NYT::TReduceOperationSpec::EnableKeyGuarantee set to `false`.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/reduce#foreign_tables
+struct TRawJoinReduceOperationSpec
+ : public TJoinReduceOperationSpecBase<TRawJoinReduceOperationSpec>
+ , public TSimpleRawOperationIoSpec<TRawJoinReduceOperationSpec>
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Spec of MapReduce operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce
+template <typename TDerived>
+struct TMapReduceOperationSpecBase
+ : public TUserOperationSpecBase<TDerived>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TDerived;
+ /// @endcond
+
+ ///
+ /// @brief Spec of map job.
+ FLUENT_FIELD(TUserJobSpec, MapperSpec);
+
+ ///
+ /// @brief Spec of reduce job.
+ FLUENT_FIELD(TUserJobSpec, ReducerSpec);
+
+ ///
+ /// @brief Spec of reduce combiner.
+ FLUENT_FIELD(TUserJobSpec, ReduceCombinerSpec);
+
+ ///
+ /// @brief Columns to sort rows by (must include `ReduceBy` as prefix).
+ FLUENT_FIELD(TSortColumns, SortBy);
+
+ ///
+ /// @brief Columns to group rows by.
+ FLUENT_FIELD(TSortColumns, ReduceBy);
+
+ ///
+ /// @brief Recommended number of map jobs to run.
+ ///
+ /// `JobCount' has higher priority than @ref NYT::TMapReduceOperationSpecBase::DataSizePerMapJob.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui32, MapJobCount);
+
+ ///
+ /// @brief Recommended of data size for each map job.
+ ///
+ /// `DataSizePerJob` has lower priority that @ref NYT::TMapReduceOperationSpecBase::MapJobCount.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui64, DataSizePerMapJob);
+
+ ///
+ /// @brief Recommended number of intermediate data partitions.
+ FLUENT_FIELD_OPTION(ui64, PartitionCount);
+
+ ///
+ /// @brief Recommended size of intermediate data partitions.
+ FLUENT_FIELD_OPTION(ui64, PartitionDataSize);
+
+ ///
+ /// @brief Account to use for intermediate data.
+ FLUENT_FIELD_OPTION(TString, IntermediateDataAccount);
+
+ ///
+ /// @brief Replication factor for intermediate data (1 by default).
+ FLUENT_FIELD_OPTION(ui64, IntermediateDataReplicationFactor);
+
+ ///
+ /// @brief Recommended size of data to be passed to a single reduce combiner.
+ FLUENT_FIELD_OPTION(ui64, DataSizePerSortJob);
+
+ ///
+ /// @brief Whether to guarantee the order of rows passed to mapper matches the order in the table.
+ ///
+ /// @see @ref NYT::TMapOperationSpec::Ordered for more info.
+ FLUENT_FIELD_OPTION(bool, Ordered);
+
+ ///
+ /// @brief Guarantee to run reduce combiner before reducer.
+ FLUENT_FIELD_OPTION(bool, ForceReduceCombiners);
+};
+
+///
+/// @brief Spec of MapReduce operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce
+struct TMapReduceOperationSpec
+ : public TMapReduceOperationSpecBase<TMapReduceOperationSpec>
+ , public TOperationIOSpec<TMapReduceOperationSpec>
+ , public TIntermediateTablesHintSpec<TMapReduceOperationSpec>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TMapReduceOperationSpec;
+ /// @endcond
+
+ ///
+ /// @brief Format hints for mapper.
+ FLUENT_FIELD_DEFAULT(TUserJobFormatHints, MapperFormatHints, TUserJobFormatHints());
+
+ ///
+ /// @brief Format hints for reducer.
+ FLUENT_FIELD_DEFAULT(TUserJobFormatHints, ReducerFormatHints, TUserJobFormatHints());
+
+ ///
+ /// @brief Format hints for reduce combiner.
+ FLUENT_FIELD_DEFAULT(TUserJobFormatHints, ReduceCombinerFormatHints, TUserJobFormatHints());
+};
+
+///
+/// @brief Spec of raw MapReduce operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce
+struct TRawMapReduceOperationSpec
+ : public TMapReduceOperationSpecBase<TRawMapReduceOperationSpec>
+ , public TRawMapReduceOperationIoSpec<TRawMapReduceOperationSpec>
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Schema inference mode.
+///
+/// @see https://yt.yandex-team.ru/docs/description/storage/static_schema.html#schema_inference
+enum class ESchemaInferenceMode : int
+{
+ FromInput /* "from_input" */,
+ FromOutput /* "from_output" */,
+ Auto /* "auto" */,
+};
+
+///
+/// @brief Spec of Sort operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/sort
+struct TSortOperationSpec
+ : TOperationSpecBase<TSortOperationSpec>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TSortOperationSpec;
+ /// @endcond
+
+ ///
+ /// @brief Paths to input tables.
+ FLUENT_VECTOR_FIELD(TRichYPath, Input);
+
+ ///
+ /// @brief Path to output table.
+ FLUENT_FIELD(TRichYPath, Output);
+
+ ///
+ /// @brief Columns to sort table by.
+ FLUENT_FIELD(TSortColumns, SortBy);
+
+ ///
+ /// @brief Recommended number of intermediate data partitions.
+ FLUENT_FIELD_OPTION(ui64, PartitionCount);
+
+ ///
+ /// @brief Recommended size of intermediate data partitions.
+ FLUENT_FIELD_OPTION(ui64, PartitionDataSize);
+
+ ///
+ /// @brief Recommended number of partition jobs to run.
+ ///
+ /// `JobCount' has higher priority than @ref NYT::TSortOperationSpec::DataSizePerPartitionJob.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui64, PartitionJobCount);
+
+ ///
+ /// @brief Recommended of data size for each partition job.
+ ///
+ /// `DataSizePerJob` has lower priority that @ref NYT::TSortOperationSpec::PartitionJobCount.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui64, DataSizePerPartitionJob);
+
+ ///
+ /// @brief Inference mode for output table schema.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/storage/static_schema.html#schema_inference
+ FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode);
+
+ ///
+ /// @brief Account to use for intermediate data.
+ FLUENT_FIELD_OPTION(TString, IntermediateDataAccount);
+
+ ///
+ /// @brief Replication factor for intermediate data (1 by default).
+ FLUENT_FIELD_OPTION(ui64, IntermediateDataReplicationFactor);
+};
+
+
+///
+/// @brief Merge mode.
+enum EMergeMode : int
+{
+ MM_UNORDERED /* "unordered" */,
+ MM_ORDERED /* "ordered" */,
+ MM_SORTED /* "sorted" */,
+};
+
+///
+/// @brief Spec of Merge operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/merge
+struct TMergeOperationSpec
+ : TOperationSpecBase<TMergeOperationSpec>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TMergeOperationSpec;
+ /// @endcond
+
+ ///
+ /// @brief Paths to input tables.
+ FLUENT_VECTOR_FIELD(TRichYPath, Input);
+
+ ///
+ /// @brief Path to output table.
+ FLUENT_FIELD(TRichYPath, Output);
+
+ ///
+ /// @brief Columns by which to merge (for @ref NYT::EMergeMode::MM_SORTED).
+ FLUENT_FIELD(TSortColumns, MergeBy);
+
+ ///
+ /// @brief Merge mode.
+ FLUENT_FIELD_DEFAULT(EMergeMode, Mode, MM_UNORDERED);
+
+ ///
+ /// @brief Combine output chunks to larger ones.
+ FLUENT_FIELD_DEFAULT(bool, CombineChunks, false);
+
+ ///
+ /// @brief Guarantee that all input chunks will be read.
+ FLUENT_FIELD_DEFAULT(bool, ForceTransform, false);
+
+ ///
+ /// @brief Recommended number of jobs to run.
+ ///
+ /// `JobCount' has higher priority than @ref NYT::TMergeOperationSpec::DataSizePerJob.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui32, JobCount);
+
+ ///
+ /// @brief Recommended of data size for each job.
+ ///
+ /// `DataSizePerJob` has lower priority that @ref NYT::TMergeOperationSpec::JobCount.
+ /// This option only provide a recommendation and may be ignored if conflicting with YT internal limits.
+ FLUENT_FIELD_OPTION(ui64, DataSizePerJob);
+
+ ///
+ /// @brief Inference mode for output table schema.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/storage/static_schema.html#schema_inference
+ FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode);
+};
+
+///
+/// @brief Spec of Erase operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/erase
+struct TEraseOperationSpec
+ : TOperationSpecBase<TEraseOperationSpec>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TEraseOperationSpec;
+ /// @endcond
+
+ ///
+ /// @brief Which table (or row range) to erase.
+ FLUENT_FIELD(TRichYPath, TablePath);
+
+ ///
+ /// Combine output chunks to larger ones.
+ FLUENT_FIELD_DEFAULT(bool, CombineChunks, false);
+
+ ///
+ /// @brief Inference mode for output table schema.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/storage/static_schema.html#schema_inference
+ FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode);
+};
+
+///
+/// @brief Spec of RemoteCopy operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/remote_copy
+struct TRemoteCopyOperationSpec
+ : TOperationSpecBase<TRemoteCopyOperationSpec>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TRemoteCopyOperationSpec;
+ /// @endcond
+
+ ///
+ /// @brief Source cluster name.
+ FLUENT_FIELD(TString, ClusterName);
+
+ ///
+ /// @brief Network to use for copy (all remote cluster nodes must have it configured).
+ FLUENT_FIELD_OPTION(TString, NetworkName);
+
+ ///
+ /// @brief Paths to input tables.
+ FLUENT_VECTOR_FIELD(TRichYPath, Input);
+
+ ///
+ /// @brief Path to output table.
+ FLUENT_FIELD(TRichYPath, Output);
+
+ ///
+ /// @brief Inference mode for output table schema.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/storage/static_schema.html#schema_inference
+ FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode);
+
+ ///
+ /// @brief Copy user attributes from input to output table (allowed only for single input table).
+ FLUENT_FIELD_DEFAULT(bool, CopyAttributes, false);
+
+ ///
+ /// @brief Names of user attributes to copy from input to output table.
+ ///
+ /// @note To make this option make sense set @ref NYT::TRemoteCopyOperationSpec::CopyAttributes to `true`.
+ FLUENT_VECTOR_FIELD(TString, AttributeKey);
+
+private:
+
+ ///
+ /// @brief Config for remote cluster connection.
+ FLUENT_FIELD_OPTION(TNode, ClusterConnection);
+};
+
+class IVanillaJobBase;
+
+///
+/// @brief Task of Vanilla operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/vanilla
+struct TVanillaTask
+ : public TOperationOutputSpecBase
+ , public TUserJobOutputFormatHintsBase<TVanillaTask>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TVanillaTask;
+ /// @endcond
+
+ ///
+ /// @brief Add output table path and specify the task output type (i.e. TMyProtoMessage).
+ template <class T>
+ TSelf& AddOutput(const TRichYPath& path);
+
+ ///
+ /// @brief Add output table path as structured path.
+ TSelf& AddStructuredOutput(TStructuredTablePath path);
+
+ ///
+ /// @brief Set output table path and specify the task output type (i.e. TMyProtoMessage).
+ template <class T>
+ TSelf& SetOutput(size_t tableIndex, const TRichYPath& path);
+
+ ///
+ /// @brief Task name.
+ FLUENT_FIELD(TString, Name);
+
+ ///
+ /// @brief Job to be executed in this task.
+ FLUENT_FIELD(::TIntrusivePtr<IVanillaJobBase>, Job);
+
+ ///
+ /// @brief User job spec.
+ FLUENT_FIELD(TUserJobSpec, Spec);
+
+ ///
+ /// @brief Number of jobs to run and wait for successful completion.
+ ///
+ /// @note If @ref NYT::TUserOperationSpecBase::FailOnJobRestart is `false`, a failed job will be restarted
+ /// and will not count in this amount.
+ FLUENT_FIELD(ui64, JobCount);
+
+ ///
+ /// @brief Network project name.
+ FLUENT_FIELD(TMaybe<TString>, NetworkProject);
+
+};
+
+///
+/// @brief Spec of Vanilla operation.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/vanilla
+struct TVanillaOperationSpec
+ : TUserOperationSpecBase<TVanillaOperationSpec>
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TVanillaOperationSpec;
+ /// @endcond
+
+ ///
+ /// @brief Description of tasks to run in this operation.
+ FLUENT_VECTOR_FIELD(TVanillaTask, Task);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Options for @ref NYT::IOperationClient::Map and other operation start commands.
+struct TOperationOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TOperationOptions;
+ /// @endcond
+
+ ///
+ /// @brief Additional field to put to operation spec.
+ FLUENT_FIELD_OPTION(TNode, Spec);
+
+ ///
+ /// @brief Start operation mode.
+ enum class EStartOperationMode : int
+ {
+ ///
+ /// @brief Prepare operation asynchronously. Call IOperation::Start() to start operation.
+ AsyncPrepare,
+
+ ///
+ /// @brief Prepare and start operation asynchronously. Don't wait for operation completion.
+ AsyncStart,
+
+ ///
+ /// @brief Prepare and start operation synchronously. Don't wait for operation completion.
+ SyncStart,
+
+ ///
+ /// @brief Prepare, start and wait for operation completion synchronously.
+ SyncWait,
+ };
+
+ ///
+ /// @brief Start operation mode.
+ FLUENT_FIELD_DEFAULT(EStartOperationMode, StartOperationMode, EStartOperationMode::SyncWait);
+
+ ///
+ /// @brief Wait for operation finish synchronously.
+ ///
+ /// @deprecated Use StartOperationMode() instead.
+ TSelf& Wait(bool value) {
+ StartOperationMode_ = value ? EStartOperationMode::SyncWait : EStartOperationMode::SyncStart;
+ return static_cast<TSelf&>(*this);
+ }
+
+ ///
+ ///
+ /// @brief Use format from table attribute (for YAMR-like format).
+ ///
+ /// @deprecated
+ FLUENT_FIELD_DEFAULT(bool, UseTableFormats, false);
+
+ ///
+ /// @brief Prefix for bash command running the jobs.
+ ///
+ /// Can be overridden for the specific job type in the @ref NYT::TUserJobSpec.
+ FLUENT_FIELD(TString, JobCommandPrefix);
+
+ ///
+ /// @brief Suffix for bash command running the jobs.
+ ///
+ /// Can be overridden for the specific job type in the @ref NYT::TUserJobSpec.
+ FLUENT_FIELD(TString, JobCommandSuffix);
+
+ ///
+ /// @brief Put all files required by the job into tmpfs.
+ ///
+ /// This option can be set globally using @ref NYT::TConfig::MountSandboxInTmpfs.
+ /// @see https://yt.yandex-team.ru/docs/problems/woodpeckers
+ FLUENT_FIELD_DEFAULT(bool, MountSandboxInTmpfs, false);
+
+ ///
+ /// @brief Path to directory to store temporary files.
+ FLUENT_FIELD_OPTION(TString, FileStorage);
+
+ ///
+ /// @brief Expiration timeout for uploaded files.
+ FLUENT_FIELD_OPTION(TDuration, FileExpirationTimeout);
+
+ ///
+ /// @brief Info to be passed securely to the job.
+ FLUENT_FIELD_OPTION(TNode, SecureVault);
+
+ ///
+ /// @brief File cache mode.
+ enum class EFileCacheMode : int
+ {
+ ///
+ /// @brief Use YT API commands "get_file_from_cache" and "put_file_to_cache".
+ ApiCommandBased,
+
+ ///
+ /// @brief Upload files to random paths inside @ref NYT::TOperationOptions::FileStorage without caching.
+ CachelessRandomPathUpload,
+ };
+
+ ///
+ /// @brief File cache mode.
+ FLUENT_FIELD_DEFAULT(EFileCacheMode, FileCacheMode, EFileCacheMode::ApiCommandBased);
+
+ ///
+ /// @brief Id of transaction within which all Cypress file storage entries will be checked/created.
+ ///
+ /// By default, the root transaction is used.
+ ///
+ /// @note Set a specific transaction only if you
+ /// 1. specify non-default file storage path in @ref NYT::TOperationOptions::FileStorage or in @ref NYT::TConfig::RemoteTempFilesDirectory.
+ /// 2. use `CachelessRandomPathUpload` caching mode (@ref NYT::TOperationOptions::FileCacheMode).
+ FLUENT_FIELD(TTransactionId, FileStorageTransactionId);
+
+ ///
+ /// @brief Ensure stderr and core tables exist before starting operation.
+ ///
+ /// If set to `false`, it is user's responsibility to ensure these tables exist.
+ FLUENT_FIELD_DEFAULT(bool, CreateDebugOutputTables, true);
+
+ ///
+ /// @brief Ensure output tables exist before starting operation.
+ ///
+ /// If set to `false`, it is user's responsibility to ensure output tables exist.
+ FLUENT_FIELD_DEFAULT(bool, CreateOutputTables, true);
+
+ ///
+ /// @brief Try to infer schema of inexistent table from the type of written rows.
+ ///
+ /// @note Default values for this option may differ depending on the row type.
+ /// For protobuf it's currently `false` by default.
+ FLUENT_FIELD_OPTION(bool, InferOutputSchema);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Get operation secure vault (specified in @ref NYT::TOperationOptions::SecureVault) inside a job.
+const TNode& GetJobSecureVault();
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Context passed to @ref NYT::IRawJob::Do.
+class TRawJobContext
+{
+public:
+ explicit TRawJobContext(size_t outputTableCount);
+
+ ///
+ /// @brief Get file corresponding to input stream.
+ const TFile& GetInputFile() const;
+
+ ///
+ /// @brief Get files corresponding to output streams.
+ const TVector<TFile>& GetOutputFileList() const;
+
+private:
+ TFile InputFile_;
+ TVector<TFile> OutputFileList_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Interface for classes that can be Saved/Loaded (to be used with @ref Y_SAVELOAD_JOB).
+class ISerializableForJob
+{
+public:
+ virtual ~ISerializableForJob() = default;
+
+ ///
+ /// @brief Dump state to output stream to be restored in job.
+ virtual void Save(IOutputStream& stream) const = 0;
+
+ ///
+ /// @brief Load state from a stream.
+ virtual void Load(IInputStream& stream) = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Provider of information about operation inputs/outputs during @ref NYT::IJob::PrepareOperation.
+class IOperationPreparationContext
+{
+public:
+ virtual ~IOperationPreparationContext() = default;
+
+ /// @brief Get the number of input tables.
+ virtual int GetInputCount() const = 0;
+
+ /// @brief Get the number of output tables.
+ virtual int GetOutputCount() const = 0;
+
+ /// @brief Get the schema of input table no. `index`.
+ virtual const TTableSchema& GetInputSchema(int index) const = 0;
+
+ /// @brief Get all the input table schemas.
+ virtual const TVector<TTableSchema>& GetInputSchemas() const = 0;
+
+ /// @brief Path to the input table if available (`Nothing()` for intermediate tables).
+ virtual TMaybe<TYPath> GetInputPath(int index) const = 0;
+
+ /// @brief Path to the output table if available (`Nothing()` for intermediate tables).
+ virtual TMaybe<TYPath> GetOutputPath(int index) const = 0;
+};
+
+///
+/// @brief Fluent builder class for @ref NYT::IJob::PrepareOperation.
+///
+/// @note Method calls are supposed to be chained.
+class TJobOperationPreparer
+{
+public:
+
+ ///
+ /// @brief Group of input tables that allows to specify properties on all of them at once.
+ ///
+ /// The instances are created with @ref NYT::TJobOperationPreparer::BeginInputGroup, not directly.
+ class TInputGroup
+ {
+ public:
+ TInputGroup(TJobOperationPreparer& preparer, TVector<int> indices);
+
+ /// @brief Specify the type of input rows.
+ template <typename TRow>
+ TInputGroup& Description();
+
+ /// @brief Specify renaming of input columns.
+ TInputGroup& ColumnRenaming(const THashMap<TString, TString>& renaming);
+
+ /// @brief Specify what input columns to send to job
+ ///
+ /// @note Filter is applied before renaming, so it must specify original column names.
+ TInputGroup& ColumnFilter(const TVector<TString>& columns);
+
+ /// @brief Finish describing the input group.
+ TJobOperationPreparer& EndInputGroup();
+
+ private:
+ TJobOperationPreparer& Preparer_;
+ TVector<int> Indices_;
+ };
+
+ ///
+ /// @brief Group of output tables that allows to specify properties on all of them at once.
+ ///
+ /// The instances are created with @ref NYT::TJobOperationPreparer::BeginOutputGroup, not directly.
+ class TOutputGroup
+ {
+ public:
+ TOutputGroup(TJobOperationPreparer& preparer, TVector<int> indices);
+
+ /// @brief Specify the type of output rows.
+ ///
+ /// @tparam TRow type of output rows from tables of this group.
+ /// @param inferSchema Infer schema from `TRow` and specify it for these output tables.
+ template <typename TRow>
+ TOutputGroup& Description(bool inferSchema = true);
+
+ /// @brief Specify schema for these tables.
+ TOutputGroup& Schema(const TTableSchema& schema);
+
+ /// @brief Specify that all the the tables in this group are unschematized.
+ ///
+ /// It is equivalent of `.Schema(TTableSchema().Strict(false)`.
+ TOutputGroup& NoSchema();
+
+ /// @brief Finish describing the output group.
+ TJobOperationPreparer& EndOutputGroup();
+
+ private:
+ TJobOperationPreparer& Preparer_;
+ TVector<int> Indices_;
+ };
+
+public:
+ explicit TJobOperationPreparer(const IOperationPreparationContext& context);
+
+ /// @brief Begin input group consisting of tables with indices `[begin, end)`.
+ ///
+ /// @param begin First index.
+ /// @param end Index after the last one.
+ TInputGroup BeginInputGroup(int begin, int end);
+
+ /// @brief Begin input group consisting of tables with indices from `indices`.
+ ///
+ /// @tparam TCont Container with integers. Must support `std::begin` and `std::end` functions.
+ /// @param indices Indices of tables to include in the group.
+ template <typename TCont>
+ TInputGroup BeginInputGroup(const TCont& indices);
+
+ /// @brief Begin output group consisting of tables with indices `[begin, end)`.
+ ///
+ /// @param begin First index.
+ /// @param end Index after the last one.
+ TOutputGroup BeginOutputGroup(int begin, int end);
+
+ /// @brief Begin input group consisting of tables with indices from `indices`.
+ ///
+ /// @tparam TCont Container with integers. Must support `std::begin` and `std::end` functions.
+ /// @param indices Indices of tables to include in the group.
+ template <typename TCont>
+ TOutputGroup BeginOutputGroup(const TCont& indices);
+
+ /// @brief Specify the schema for output table no `tableIndex`.
+ ///
+ /// @note All the output schemas must be specified either with this method, `NoOutputSchema` or `OutputDescription` with `inferSchema == true`
+ TJobOperationPreparer& OutputSchema(int tableIndex, TTableSchema schema);
+
+ /// @brief Mark the output table no. `tableIndex` as unschematized.
+ TJobOperationPreparer& NoOutputSchema(int tableIndex);
+
+ /// @brief Specify renaming of input columns for table no. `tableIndex`.
+ TJobOperationPreparer& InputColumnRenaming(int tableIndex, const THashMap<TString, TString>& renaming);
+
+ /// @brief Specify what input columns of table no. `tableIndex` to send to job
+ ///
+ /// @note Filter is applied before renaming, so it must specify original column names.
+ TJobOperationPreparer& InputColumnFilter(int tableIndex, const TVector<TString>& columns);
+
+ /// @brief Specify the type of input rows for table no. `tableIndex`.
+ ///
+ /// @tparam TRow type of input rows.
+ template <typename TRow>
+ TJobOperationPreparer& InputDescription(int tableIndex);
+
+ /// @brief Specify the type of output rows for table no. `tableIndex`.
+ ///
+ /// @tparam TRow type of output rows.
+ /// @param inferSchema Infer schema from `TRow` and specify it for the output tables.
+ template <typename TRow>
+ TJobOperationPreparer& OutputDescription(int tableIndex, bool inferSchema = true);
+
+ /// @brief Set type of output rows for table no. `tableIndex` to TNode
+ ///
+ /// @note Set schema via `OutputSchema` if needed
+ TJobOperationPreparer& NodeOutput(int tableIndex);
+
+ /// @brief Specify input format hints.
+ ///
+ /// These hints have lower priority than ones specified in spec.
+ TJobOperationPreparer& InputFormatHints(TFormatHints hints);
+
+ /// @brief Specify output format hints.
+ ///
+ /// These hints have lower priority than ones specified in spec.
+ TJobOperationPreparer& OutputFormatHints(TFormatHints hints);
+
+ /// @brief Specify format hints.
+ ///
+ /// These hints have lower priority than ones specified in spec.
+ TJobOperationPreparer& FormatHints(TUserJobFormatHints newFormatHints);
+
+ /// @name "Private" members
+ /// The following methods should not be used by clients in @ref NYT::IJob::PrepareOperation
+ ///@{
+
+ /// @brief Finish the building process.
+ void Finish();
+
+ /// @brief Get output table schemas as specified by the user.
+ TVector<TTableSchema> GetOutputSchemas();
+
+ /// @brief Get input column renamings as specified by the user.
+ const TVector<THashMap<TString, TString>>& GetInputColumnRenamings() const;
+
+ /// @brief Get input column filters as specified by the user.
+ const TVector<TMaybe<TVector<TString>>>& GetInputColumnFilters() const;
+
+ /// @brief Get input column descriptions as specified by the user.
+ const TVector<TMaybe<TTableStructure>>& GetInputDescriptions() const;
+
+ /// @brief Get output column descriptions as specified by the user.
+ const TVector<TMaybe<TTableStructure>>& GetOutputDescriptions() const;
+
+ /// @brief Get format hints as specified by the user.
+ const TUserJobFormatHints& GetFormatHints() const;
+
+ ///@}
+private:
+
+ /// @brief Validate that schema for output table no. `tableIndex` has not been set yet.
+ void ValidateMissingOutputSchema(int tableIndex) const;
+
+ /// @brief Validate that description for input table no. `tableIndex` has not been set yet.
+ void ValidateMissingInputDescription(int tableIndex) const;
+
+ /// @brief Validate that description for output table no. `tableIndex` has not been set yet.
+ void ValidateMissingOutputDescription(int tableIndex) const;
+
+ /// @brief Validate that `tableIndex` is in correct range for input table indices.
+ ///
+ /// @param message Message to add to the exception in case of violation.
+ void ValidateInputTableIndex(int tableIndex, TStringBuf message) const;
+
+ /// @brief Validate that `tableIndex` is in correct range for output table indices.
+ ///
+ /// @param message Message to add to the exception in case of violation.
+ void ValidateOutputTableIndex(int tableIndex, TStringBuf message) const;
+
+ /// @brief Validate that all the output schemas has been set.
+ void FinallyValidate() const;
+
+ static TTableSchema EmptyNonstrictSchema();
+
+private:
+ const IOperationPreparationContext& Context_;
+
+ TVector<TMaybe<TTableSchema>> OutputSchemas_;
+ TVector<THashMap<TString, TString>> InputColumnRenamings_;
+ TVector<TMaybe<TVector<TString>>> InputColumnFilters_;
+ TVector<TMaybe<TTableStructure>> InputTableDescriptions_;
+ TVector<TMaybe<TTableStructure>> OutputTableDescriptions_;
+ TUserJobFormatHints FormatHints_ = {};
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Interface for all user jobs.
+class IJob
+ : public TThrRefBase
+{
+public:
+
+ ///
+ /// @brief Type of job.
+ enum EType
+ {
+ Mapper,
+ Reducer,
+ ReducerAggregator,
+ RawJob,
+ VanillaJob,
+ };
+
+ ///
+ /// @brief Save job state to stream to be restored on cluster nodes.
+ virtual void Save(IOutputStream& stream) const
+ {
+ Y_UNUSED(stream);
+ }
+
+ ///
+ /// @brief Restore job state from a stream.
+ virtual void Load(IInputStream& stream)
+ {
+ Y_UNUSED(stream);
+ }
+
+ ///
+ /// @brief Get operation secure vault (specified in @ref NYT::TOperationOptions::SecureVault) inside a job.
+ const TNode& SecureVault() const
+ {
+ return GetJobSecureVault();
+ }
+
+ ///
+ /// @brief Get number of output tables.
+ i64 GetOutputTableCount() const
+ {
+ Y_VERIFY(NDetail::OutputTableCount > 0);
+
+ return NDetail::OutputTableCount;
+ }
+
+ ///
+ /// @brief Method allowing user to control some properties of input and output tables and formats.
+ ///
+ /// User can override this method in their job class to:
+ /// - specify output table schemas.
+ /// The most natural way is usually through @ref NYT::TJobOperationPreparer::OutputDescription (especially for protobuf),
+ /// but you can use @ref NYT::TJobOperationPreparer::OutputSchema directly
+ /// - specify output row type (@ref NYT::TJobOperationPreparer::OutputDescription)
+ /// - specify input row type (@ref NYT::TJobOperationPreparer::InputDescription)
+ /// - specify input column filter and renaming (@ref NYT::TJobOperationPreparer::InputColumnFilter and @ref NYT::TJobOperationPreparer::InputColumnRenaming)
+ /// - specify format hints (@ref NYT::TJobOperationPreparer::InputFormatHints,
+ /// NYT::TJobOperationPreparer::OutputFormatHints and @ref NYT::TJobOperationPreparer::FormatHints)
+ /// - maybe something more, cf. the methods of @ref NYT::TJobOperationPreparer.
+ ///
+ /// If one has several similar tables, groups can be used.
+ /// Groups are delimited by @ref NYT::TJobOperationPreparer::BeginInputGroup /
+ /// @ref NYT::TJobOperationPreparer::TInputGroup::EndInputGroup and
+ /// @ref NYT::TJobOperationPreparer::BeginOutputGroup /
+ /// @ref NYT::TJobOperationPreparer::TOutputGroup::EndOutputGroup.
+ /// Example:
+ /// @code{.cpp}
+ /// preparer
+ /// .BeginInputGroup({1,2,4,8})
+ /// .ColumnRenaming({{"a", "b"}, {"c", "d"}})
+ /// .ColumnFilter({"a", "c"})
+ /// .EndInputGroup();
+ /// @endcode
+ ///
+ /// @note All the output table schemas must be set
+ /// (possibly as empty nonstrict using @ref NYT::TJobOperationPreparer::NoOutputSchema or
+ /// @ref NYT::TJobOperationPreparer::TOutputGroup::NoSchema).
+ /// By default all the output table schemas are marked as empty nonstrict.
+ virtual void PrepareOperation(const IOperationPreparationContext& context, TJobOperationPreparer& preparer) const;
+};
+
+///
+/// @brief Declare what fields of currently declared job class to save and restore on cluster node.
+#define Y_SAVELOAD_JOB(...) \
+ virtual void Save(IOutputStream& stream) const override { Save(&stream); } \
+ virtual void Load(IInputStream& stream) override { Load(&stream); } \
+ Y_PASS_VA_ARGS(Y_SAVELOAD_DEFINE(__VA_ARGS__))
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Interface for jobs with typed inputs and outputs.
+class IStructuredJob
+ : public IJob
+{
+public:
+ ///
+ /// @brief This methods are called when creating table reader and writer for the job.
+ ///
+ /// Override them if you want to implement custom input logic. (e.g. addtitional bufferization)
+ virtual TRawTableReaderPtr CreateCustomRawJobReader(int fd) const;
+ virtual THolder<IProxyOutput> CreateCustomRawJobWriter(size_t outputTableCount) const;
+
+ virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const = 0;
+ virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Create default raw job reader.
+TRawTableReaderPtr CreateRawJobReader(int fd = 0);
+
+///
+/// @brief Create default raw job writer.
+THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount);
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Base interface for structured (typed) map jobs.
+class IMapperBase
+ : public IStructuredJob
+{ };
+
+///
+/// @brief Base interface for structured (typed) map jobs with given reader and writer.
+template <class TR, class TW>
+class IMapper
+ : public IMapperBase
+{
+public:
+ using TReader = TR;
+ using TWriter = TW;
+
+public:
+ /// Type of job implemented by this class.
+ static constexpr EType JobType = EType::Mapper;
+
+ ///
+ /// @brief This method is called before feeding input rows to mapper (before `Do` method).
+ virtual void Start(TWriter* writer)
+ {
+ Y_UNUSED(writer);
+ }
+
+ ///
+ /// @brief This method is called exactly once for the whole job input.
+ ///
+ /// Read input rows from `reader` and write output ones to `writer`.
+ virtual void Do(TReader* reader, TWriter* writer) = 0;
+
+ ///
+ /// @brief This method is called after feeding input rows to mapper (after `Do` method).
+ virtual void Finish(TWriter* writer)
+ {
+ Y_UNUSED(writer);
+ }
+
+ virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const override;
+ virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Base interface for structured (typed) reduce jobs.
+///
+/// It is common base for @ref NYT::IReducer and @ref NYT::IAggregatorReducer.
+class IReducerBase
+ : public IStructuredJob
+{ };
+
+///
+/// @brief Base interface for structured (typed) reduce jobs with given reader and writer.
+template <class TR, class TW>
+class IReducer
+ : public IReducerBase
+{
+public:
+ using TReader = TR;
+ using TWriter = TW;
+
+public:
+ /// Type of job implemented by this class.
+ static constexpr EType JobType = EType::Reducer;
+
+public:
+
+ ///
+ /// @brief This method is called before feeding input rows to reducer (before `Do` method).
+ virtual void Start(TWriter* writer)
+ {
+ Y_UNUSED(writer);
+ }
+
+ ///
+ /// @brief This method is called exactly once for each range with same value of `ReduceBy` (or `JoinBy`) keys.
+ virtual void Do(TReader* reader, TWriter* writer) = 0;
+
+ ///
+ /// @brief This method is called after feeding input rows to reducer (after `Do` method).
+ virtual void Finish(TWriter* writer)
+ {
+ Y_UNUSED(writer);
+ }
+
+ ///
+ /// @brief Refuse to process the remaining row ranges and finish the job (successfully).
+ void Break();
+
+ virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const override;
+ virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Base interface of jobs used inside reduce operations.
+///
+/// Unlike @ref NYT::IReducer jobs their `Do' method is called only once
+/// and takes whole range of records split by key boundaries.
+///
+/// Template argument `TR` must be @ref NYT::TTableRangesReader.
+template <class TR, class TW>
+class IAggregatorReducer
+ : public IReducerBase
+{
+public:
+ using TReader = TR;
+ using TWriter = TW;
+
+public:
+ /// Type of job implemented by this class.
+ static constexpr EType JobType = EType::ReducerAggregator;
+
+public:
+ ///
+ /// @brief This method is called before feeding input rows to reducer (before `Do` method).
+ virtual void Start(TWriter* writer)
+ {
+ Y_UNUSED(writer);
+ }
+
+ ///
+ /// @brief This method is called exactly once for the whole job input.
+ virtual void Do(TReader* reader, TWriter* writer) = 0;
+
+ ///
+ /// @brief This method is called after feeding input rows to reducer (after `Do` method).
+ virtual void Finish(TWriter* writer)
+ {
+ Y_UNUSED(writer);
+ }
+
+ virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const override;
+ virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Interface for raw jobs (i.e. reading and writing byte streams).
+class IRawJob
+ : public IJob
+{
+public:
+ /// Type of job implemented by this class.
+ static constexpr EType JobType = EType::RawJob;
+
+ ///
+ /// @brief This method is called exactly once for the whole job input.
+ virtual void Do(const TRawJobContext& jobContext) = 0;
+};
+
+///
+/// @brief Interface of jobs that run the given bash command.
+class ICommandJob
+ : public IJob
+{
+public:
+ ///
+ /// @brief Get bash command to run.
+ ///
+ /// @note This method is called on the client side.
+ virtual const TString& GetCommand() const = 0;
+};
+
+///
+/// @brief Raw job executing given bash command.
+///
+/// @note The binary will not be uploaded.
+class TCommandRawJob
+ : public IRawJob
+ , public ICommandJob
+{
+public:
+ ///
+ /// @brief Create job with specified command.
+ ///
+ /// @param command Bash command to run.
+ explicit TCommandRawJob(TStringBuf command = {});
+
+ const TString& GetCommand() const override;
+ void Do(const TRawJobContext& jobContext) override;
+
+private:
+ TString Command_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Base interface for vanilla jobs.
+///
+/// @see https://yt.yandex-team.ru/docs/description/mr/vanilla
+class IVanillaJobBase
+ : public virtual IStructuredJob
+{
+public:
+ /// Type of job implemented by this class.
+ static constexpr EType JobType = EType::VanillaJob;
+};
+
+template <class TW = void>
+class IVanillaJob;
+
+///
+/// @brief Interface of vanilla job without outputs.
+template <>
+class IVanillaJob<void>
+ : public IVanillaJobBase
+{
+public:
+ ///
+ /// @brief This method is called exactly once for each vanilla job.
+ virtual void Do() = 0;
+
+ virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const override;
+ virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const override;
+};
+
+///
+/// @brief Vanilla job executing given bash command.
+///
+/// @note The binary will not be uploaded.
+class TCommandVanillaJob
+ : public IVanillaJob<>
+ , public ICommandJob
+{
+public:
+ ///
+ /// @brief Create job with specified command.
+ ///
+ /// @param command Bash command to run.
+ explicit TCommandVanillaJob(TStringBuf command = {});
+
+ const TString& GetCommand() const override;
+ void Do() override;
+
+private:
+ TString Command_;
+};
+
+///
+/// @brief Interface for vanilla jobs with output tables.
+template <class TW>
+class IVanillaJob
+ : public IVanillaJobBase
+{
+public:
+ using TWriter = TW;
+
+public:
+ ///
+ /// @brief This method is called before `Do` method.
+ virtual void Start(TWriter* /* writer */)
+ { }
+
+ ///
+ /// @brief This method is called exactly once for each vanilla job.
+ ///
+ /// Write output rows to `writer`.
+ virtual void Do(TWriter* writer) = 0;
+
+ ///
+ /// @brief This method is called after `Do` method.
+ virtual void Finish(TWriter* /* writer */)
+ { }
+
+ virtual TStructuredRowStreamDescription GetInputRowStreamDescription() const override;
+ virtual TStructuredRowStreamDescription GetOutputRowStreamDescription() const override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Attributes to request for an operation.
+enum class EOperationAttribute : int
+{
+ Id /* "id" */,
+ Type /* "type" */,
+ State /* "state" */,
+ AuthenticatedUser /* "authenticated_user" */,
+ StartTime /* "start_time" */,
+ FinishTime /* "finish_time" */,
+ BriefProgress /* "brief_progress" */,
+ BriefSpec /* "brief_spec" */,
+ Suspended /* "suspended" */,
+ Result /* "result" */,
+ Progress /* "progress" */,
+ Events /* "events" */,
+ Spec /* "spec" */,
+ FullSpec /* "full_spec" */,
+ UnrecognizedSpec /* "unrecognized_spec" */,
+};
+
+///
+/// @brief Class describing which attributes to request in @ref NYT::IClient::GetOperation or @ref NYT::IClient::ListOperations.
+struct TOperationAttributeFilter
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TOperationAttributeFilter;
+ /// @endcond
+
+ TVector<EOperationAttribute> Attributes_;
+
+ ///
+ /// @brief Add attribute to the filter. Calls are supposed to be chained.
+ TSelf& Add(EOperationAttribute attribute)
+ {
+ Attributes_.push_back(attribute);
+ return *this;
+ }
+};
+
+///
+/// @brief Options for @ref NYT::IClient::GetOperation call.
+struct TGetOperationOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TGetOperationOptions;
+ /// @endcond
+
+ ///
+ /// @brief What attributes to request (if omitted, the default set of attributes will be requested).
+ FLUENT_FIELD_OPTION(TOperationAttributeFilter, AttributeFilter);
+};
+
+///
+/// @brief "Coarse-grained" state of an operation.
+enum class EOperationBriefState : int
+{
+ InProgress /* "in_progress" */,
+ Completed /* "completed" */,
+ Aborted /* "aborted" */,
+
+ /// Failed
+ Failed /* "failed" */,
+};
+
+///
+/// @brief Operation type.
+enum class EOperationType : int
+{
+ Map /* "map" */,
+ Merge /* "merge" */,
+ Erase /* "erase" */,
+ Sort /* "sort" */,
+ Reduce /* "reduce" */,
+ MapReduce /* "map_reduce" */,
+ RemoteCopy /* "remote_copy" */,
+ JoinReduce /* "join_reduce" */,
+ Vanilla /* "vanilla" */,
+};
+
+///
+/// @brief Operation progress.
+struct TOperationProgress
+{
+ ///
+ /// @brief Total job statistics.
+ TJobStatistics JobStatistics;
+
+ ///
+ /// @brief Job counter for various job states with hierarchy.
+ TJobCounters JobCounters;
+
+ ///
+ /// @brief Time when this progress was built on scheduler or CA.
+ TMaybe<TInstant> BuildTime;
+};
+
+///
+/// @brief Brief operation progress (numbers of jobs in these states).
+struct TOperationBriefProgress
+{
+ ui64 Aborted = 0;
+ ui64 Completed = 0;
+ ui64 Failed = 0;
+ ui64 Lost = 0;
+ ui64 Pending = 0;
+ ui64 Running = 0;
+ ui64 Total = 0;
+};
+
+///
+/// @brief Operation result.
+struct TOperationResult
+{
+ ///
+ /// @brief For a unsuccessfully finished operation: description of error.
+ TMaybe<TYtError> Error;
+};
+
+///
+/// @brief Operation event (change of state).
+struct TOperationEvent
+{
+ ///
+ /// @brief New state of operation.
+ TString State;
+
+ ///
+ /// @brief Time of state change.
+ TInstant Time;
+};
+
+///
+/// @brief Operation info.
+///
+/// A field may be `Nothing()` either if it was not requested (see @ref NYT::TGetOperationOptions::AttributeFilter)
+/// or it is not available (i.e. `FinishTime` for a running operation).
+/// @see https://yt.yandex-team.ru/docs/api/commands#get_operation
+struct TOperationAttributes
+{
+ ///
+ /// @brief Operation id.
+ TMaybe<TOperationId> Id;
+
+ ///
+ /// @brief Operation type.
+ TMaybe<EOperationType> Type;
+
+ ///
+ /// @brief Operation state.
+ TMaybe<TString> State;
+
+ ///
+ /// @brief "Coarse-grained" operation state.
+ TMaybe<EOperationBriefState> BriefState;
+
+ ///
+ /// @brief Name of user that started the operation.
+ TMaybe<TString> AuthenticatedUser;
+
+ ///
+ /// @brief Operation start time.
+ TMaybe<TInstant> StartTime;
+
+ ///
+ /// @brief Operation finish time (if the operation has finished).
+ TMaybe<TInstant> FinishTime;
+
+ ///
+ /// @brief Brief progress of the operation.
+ TMaybe<TOperationBriefProgress> BriefProgress;
+
+ ///
+ /// @brief Brief spec of operation (light-weight fields only).
+ TMaybe<TNode> BriefSpec;
+
+ ///
+ /// @brief Spec of the operation as provided by the user.
+ TMaybe<TNode> Spec;
+
+ ///
+ /// @brief Full spec of operation (all fields not specified by user are filled with default values).
+ TMaybe<TNode> FullSpec;
+
+ ///
+ /// @brief Fields not recognized by scheduler.
+ TMaybe<TNode> UnrecognizedSpec;
+
+ ///
+ /// @brief Is operation suspended.
+ TMaybe<bool> Suspended;
+
+ ///
+ /// @brief Operation result.
+ TMaybe<TOperationResult> Result;
+
+ ///
+ /// @brief Operation progress.
+ TMaybe<TOperationProgress> Progress;
+
+ ///
+ /// @brief List of operation events (changes of state).
+ TMaybe<TVector<TOperationEvent>> Events;
+
+ ///
+ /// @brief Map from alert name to its description.
+ TMaybe<THashMap<TString, TYtError>> Alerts;
+};
+
+///
+/// @brief Direction of cursor for paging, see @ref NYT::TListOperationsOptions::CursorDirection.
+enum class ECursorDirection
+{
+ Past /* "past" */,
+ Future /* "future" */,
+};
+
+///
+/// @brief Options of @ref NYT::IClient::ListOperations command.
+///
+/// @see https://yt.yandex-team.ru/docs/api/commands.html#list_operations
+struct TListOperationsOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TListOperationsOptions;
+ /// @endcond
+
+ ///
+ /// @name Time range specification
+ ///
+ /// List operations with start time in half-closed interval
+ /// `[CursorTime, ToTime)` if `CursorDirection == Future` or
+ /// `[FromTime, CursorTime)` if `CursorDirection == Past`.
+ ///@{
+
+ ///
+ /// @brief Search for operations with start time >= `FromTime`.
+ FLUENT_FIELD_OPTION(TInstant, FromTime);
+
+ ///
+ /// @brief Search for operations with start time < `ToTime`.
+ FLUENT_FIELD_OPTION(TInstant, ToTime);
+
+ ///
+ /// @brief Additional restriction on operation start time (useful for pagination).
+ ///
+ /// Search for operations with start time >= `CursorTime` if `CursorDirection == Future`
+ /// and with start time < `CursorTime` if `CursorDirection == Past`
+ FLUENT_FIELD_OPTION(TInstant, CursorTime);
+
+ ///
+ /// @brief Direction of pagination (see @ref NYT::TListOperationsOptions::CursorTime).
+ FLUENT_FIELD_OPTION(ECursorDirection, CursorDirection);
+
+ ///@}
+
+ ///
+ /// @name Filters
+ /// Choose operations satisfying given filters.
+ ///@{
+
+ ///
+ /// @brief Search for `Filter` as a substring in operation text factors
+ /// (e.g. title or input/output table paths).
+ FLUENT_FIELD_OPTION(TString, Filter);
+
+ ///
+ /// @brief Choose operations whose pools include `Pool`.
+ FLUENT_FIELD_OPTION(TString, Pool);
+
+ ///
+ /// @brief Choose operations with given @ref NYT::TOperationAttributes::AuthenticatedUser.
+ FLUENT_FIELD_OPTION(TString, User);
+
+ ///
+ /// @brief Choose operations with given @ref NYT::TOperationAttributes::State.
+ FLUENT_FIELD_OPTION(TString, State);
+
+ ///
+ /// @brief Choose operations with given @ref NYT::TOperationAttributes::Type.
+ FLUENT_FIELD_OPTION(EOperationType, Type);
+
+ ///
+ /// @brief Choose operations having (or not having) any failed jobs.
+ FLUENT_FIELD_OPTION(bool, WithFailedJobs);
+
+ ///@}
+
+ ///
+ /// @brief Search for operations in the archive in addition to Cypress.
+ FLUENT_FIELD_OPTION(bool, IncludeArchive);
+
+ ///
+ /// @brief Include the counters for different filter parameters in the response.
+ ///
+ /// Include number of operations for each pool, user, state, type
+ /// and the number of operations having failed jobs.
+ FLUENT_FIELD_OPTION(bool, IncludeCounters);
+
+ ///
+ /// @brief Return no more than `Limit` operations (current default and maximum value is 1000).
+ FLUENT_FIELD_OPTION(i64, Limit);
+};
+
+///
+/// @brief Response for @ref NYT::IClient::ListOperations command.
+struct TListOperationsResult
+{
+ ///
+ /// @brief Found operations' attributes.
+ TVector<TOperationAttributes> Operations;
+
+ ///
+ /// @name Counters for different filter.
+ ///
+ /// If counters were requested (@ref NYT::TListOperationsOptions::IncludeCounters is `true`)
+ /// the maps contain the number of operations found for each pool, user, state and type.
+ /// NOTE:
+ /// 1) Counters ignore CursorTime and CursorDirection,
+ /// they always are collected in the whole [FromTime, ToTime) interval.
+ /// 2) Each next counter in the sequence [pool, user, state, type, with_failed_jobs]
+ /// takes into account all the previous filters (i.e. if you set User filter to "some-user"
+ /// type counts describe only operations with user "some-user").
+ /// @{
+
+ ///
+ /// @brief Number of operations for each pool.
+ TMaybe<THashMap<TString, i64>> PoolCounts;
+
+ ///
+ /// @brief Number of operations for each user (subject to previous filters).
+ TMaybe<THashMap<TString, i64>> UserCounts;
+
+ ///
+ /// @brief Number of operations for each state (subject to previous filters).
+ TMaybe<THashMap<TString, i64>> StateCounts;
+
+ ///
+ /// @brief Number of operations for each type (subject to previous filters).
+ TMaybe<THashMap<EOperationType, i64>> TypeCounts;
+
+ ///
+ /// @brief Number of operations having failed jobs (subject to all previous filters).
+ TMaybe<i64> WithFailedJobsCount;
+
+ /// @}
+
+ ///
+ /// @brief Whether some operations were not returned due to @ref NYT::TListOperationsOptions::Limit.
+ ///
+ /// `Incomplete == true` means that not all operations satisfying filters
+ /// were returned (limit exceeded) and you need to repeat the request with new @ref NYT::TListOperationsOptions::CursorTime
+ /// (e.g. `CursorTime == *Operations.back().StartTime`, but don't forget to
+ /// remove the duplicates).
+ bool Incomplete;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Data source for @ref NYT::IClient::ListJobs command.
+enum class EListJobsDataSource : int
+{
+ Runtime /* "runtime" */,
+ Archive /* "archive" */,
+ Auto /* "auto" */,
+ Manual /* "manual" */,
+};
+
+///
+/// @brief Job type.
+enum class EJobType : int
+{
+ SchedulerFirst /* "scheduler_first" */,
+ Map /* "map" */,
+ PartitionMap /* "partition_map" */,
+ SortedMerge /* "sorted_merge" */,
+ OrderedMerge /* "ordered_merge" */,
+ UnorderedMerge /* "unordered_merge" */,
+ Partition /* "partition" */,
+ SimpleSort /* "simple_sort" */,
+ FinalSort /* "final_sort" */,
+ SortedReduce /* "sorted_reduce" */,
+ PartitionReduce /* "partition_reduce" */,
+ ReduceCombiner /* "reduce_combiner" */,
+ RemoteCopy /* "remote_copy" */,
+ IntermediateSort /* "intermediate_sort" */,
+ OrderedMap /* "ordered_map" */,
+ JoinReduce /* "join_reduce" */,
+ Vanilla /* "vanilla" */,
+ SchedulerUnknown /* "scheduler_unknown" */,
+ SchedulerLast /* "scheduler_last" */,
+ ReplicatorFirst /* "replicator_first" */,
+ ReplicateChunk /* "replicate_chunk" */,
+ RemoveChunk /* "remove_chunk" */,
+ RepairChunk /* "repair_chunk" */,
+ SealChunk /* "seal_chunk" */,
+ ReplicatorLast /* "replicator_last" */,
+};
+
+///
+/// @brief Well-known task names.
+enum class ETaskName : int
+{
+ Map /* "map" */,
+ PartitionMap0 /* "partition_map(0)" */,
+ SortedMerge /* "sorted_merge" */,
+ OrderedMerge /* "ordered_merge" */,
+ UnorderedMerge /* "unordered_merge" */,
+ Partition0 /* "partition(0)" */,
+ Partition1 /* "partition(1)" */,
+ Partition2 /* "partition(2)" */,
+ SimpleSort /* "simple_sort" */,
+ FinalSort /* "final_sort" */,
+ SortedReduce /* "sorted_reduce" */,
+ PartitionReduce /* "partition_reduce" */,
+ ReduceCombiner /* "reduce_combiner" */,
+ RemoteCopy /* "remote_copy" */,
+ IntermediateSort /* "intermediate_sort" */,
+ OrderedMap /* "ordered_map" */,
+ JoinReduce /* "join_reduce" */,
+};
+
+///
+/// @brief Task name (can either well-known or just a string).
+class TTaskName
+{
+public:
+
+ // Constructors are implicit by design.
+
+ ///
+ /// @brief Construct a custom task name.
+ TTaskName(TString taskName);
+
+ ///
+ /// @brief Construct a custom task name.
+ TTaskName(const char* taskName);
+
+ ///
+ /// @brief Construct a well-known task name.
+ TTaskName(ETaskName taskName);
+
+ const TString& Get() const;
+
+private:
+ TString TaskName_;
+};
+
+///
+/// @brief Job state.
+enum class EJobState : int
+{
+ None /* "none" */,
+ Waiting /* "waiting" */,
+ Running /* "running" */,
+ Aborting /* "aborting" */,
+ Completed /* "completed" */,
+ Failed /* "failed" */,
+ Aborted /* "aborted" */,
+ Lost /* "lost" */,
+};
+
+///
+/// @brief Job sort field.
+///
+/// @see @ref NYT::TListJobsOptions.
+enum class EJobSortField : int
+{
+ Type /* "type" */,
+ State /* "state" */,
+ StartTime /* "start_time" */,
+ FinishTime /* "finish_time" */,
+ Address /* "address" */,
+ Duration /* "duration" */,
+ Progress /* "progress" */,
+ Id /* "id" */,
+};
+
+///
+/// @brief Job sort direction.
+///
+/// @see @ref NYT::TListJobsOptions.
+enum class EJobSortDirection : int
+{
+ Ascending /* "ascending" */,
+ Descending /* "descending" */,
+};
+
+///
+/// @brief Options for @ref NYT::IClient::ListJobs.
+///
+/// @see https://yt.yandex-team.ru/docs/api/commands.html#list_jobs
+struct TListJobsOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TListJobsOptions;
+ /// @endcond
+
+ ///
+ /// @name Filters
+ /// Return only jobs with given value of parameter (type, state, address and existence of stderr).
+ /// If a field is `Nothing()`, return jobs with all possible values of the corresponding parameter.
+ /// @{
+
+ ///
+ /// @brief Job type.
+ FLUENT_FIELD_OPTION(EJobType, Type);
+
+ ///
+ /// @brief Job state.
+ FLUENT_FIELD_OPTION(EJobState, State);
+
+ ///
+ /// @brief Address of the cluster node where job was running.
+ FLUENT_FIELD_OPTION(TString, Address);
+
+ ///
+ /// @brief Return only jobs whose stderr has been saved.
+ FLUENT_FIELD_OPTION(bool, WithStderr);
+
+ ///
+ /// @brief Return only jobs whose spec has been saved.
+ FLUENT_FIELD_OPTION(bool, WithSpec);
+
+ ///
+ /// @brief Return only jobs whose fail context has been saved.
+ FLUENT_FIELD_OPTION(bool, WithFailContext);
+
+ /// @}
+
+ ///
+ /// @name Sort options
+ /// @{
+
+ ///
+ /// @brief Sort by this field.
+ FLUENT_FIELD_OPTION(EJobSortField, SortField);
+
+ ///
+ /// @brief Sort order.
+ FLUENT_FIELD_OPTION(ESortOrder, SortOrder);
+
+ /// @}
+
+ ///
+ /// @brief Data source.
+ ///
+ /// Where to search for jobs: in scheduler and Cypress ('Runtime'), in archive ('Archive'),
+ /// automatically basing on operation presence in Cypress ('Auto') or choose manually (`Manual').
+ FLUENT_FIELD_OPTION(EListJobsDataSource, DataSource);
+
+ /// @deprecated
+ FLUENT_FIELD_OPTION(bool, IncludeCypress);
+
+ /// @deprecated
+ FLUENT_FIELD_OPTION(bool, IncludeControllerAgent);
+
+ /// @deprecated
+ FLUENT_FIELD_OPTION(bool, IncludeArchive);
+
+ ///
+ /// @brief Maximum number of jobs to return.
+ FLUENT_FIELD_OPTION(i64, Limit);
+
+ ///
+ /// @brief Number of jobs (in specified sort order) to skip.
+ ///
+ /// Together with @ref NYT::TListJobsOptions::Limit may be used for pagination.
+ FLUENT_FIELD_OPTION(i64, Offset);
+};
+
+///
+/// @brief Description of a core dump that happened in the job.
+struct TCoreInfo
+{
+ i64 ProcessId;
+ TString ExecutableName;
+ TMaybe<ui64> Size;
+ TMaybe<TYtError> Error;
+};
+
+///
+/// @brief Job attributes.
+///
+/// A field may be `Nothing()` if it is not available (i.e. `FinishTime` for a running job).
+///
+/// @see https://yt.yandex-team.ru/docs/api/commands#get_job
+struct TJobAttributes
+{
+ ///
+ /// @brief Job id.
+ TMaybe<TJobId> Id;
+
+ ///
+ /// @brief Job type
+ TMaybe<EJobType> Type;
+
+ ///
+ /// @brief Job state.
+ TMaybe<EJobState> State;
+
+ ///
+ /// @brief Address of a cluster node where job was running.
+ TMaybe<TString> Address;
+
+ ///
+ /// @brief The name of the task that job corresponds to.
+ TMaybe<TString> TaskName;
+
+ ///
+ /// @brief Job start time.
+ TMaybe<TInstant> StartTime;
+
+ ///
+ /// @brief Job finish time (for a finished job).
+ TMaybe<TInstant> FinishTime;
+
+ ///
+ /// @brief Estimated ratio of job's completed work.
+ TMaybe<double> Progress;
+
+ ///
+ /// @brief Size of saved job stderr.
+ TMaybe<i64> StderrSize;
+
+ ///
+ /// @brief Error for a unsuccessfully finished job.
+ TMaybe<TYtError> Error;
+
+ ///
+ /// @brief Job brief statistics.
+ TMaybe<TNode> BriefStatistics;
+
+ ///
+ /// @brief Job input paths (with ranges).
+ TMaybe<TVector<TRichYPath>> InputPaths;
+
+ ///
+ /// @brief Infos for core dumps produced by job.
+ TMaybe<TVector<TCoreInfo>> CoreInfos;
+};
+
+///
+/// @brief Response for @ref NYT::IOperation::ListJobs.
+struct TListJobsResult
+{
+ ///
+ /// @brief Jobs.
+ TVector<TJobAttributes> Jobs;
+
+ ///
+ /// @deprecated
+ TMaybe<i64> CypressJobCount;
+
+ ///
+ /// @brief Number of jobs retrieved from controller agent.
+ TMaybe<i64> ControllerAgentJobCount;
+
+ ///
+ /// @brief Number of jobs retrieved from archive.
+ TMaybe<i64> ArchiveJobCount;
+};
+
+////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Options for @ref NYT::IClient::GetJob.
+struct TGetJobOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TGetJobOptions;
+ /// @endcond
+};
+
+///
+/// @brief Options for @ref NYT::IClient::GetJobInput.
+struct TGetJobInputOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TGetJobInputOptions;
+ /// @endcond
+};
+
+///
+/// @brief Options for @ref NYT::IClient::GetJobFailContext.
+struct TGetJobFailContextOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TGetJobFailContextOptions;
+ /// @endcond
+};
+
+///
+/// @brief Options for @ref NYT::IClient::GetJobStderr.
+struct TGetJobStderrOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TGetJobStderrOptions;
+ /// @endcond
+};
+
+////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Options for @ref NYT::IOperation::GetFailedJobInfo.
+struct TGetFailedJobInfoOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TGetFailedJobInfoOptions;
+ /// @endcond
+
+ ///
+ /// @brief How many jobs to download. Which jobs will be chosen is undefined.
+ FLUENT_FIELD_DEFAULT(ui64, MaxJobCount, 10);
+
+ ///
+ /// @brief How much of stderr tail should be downloaded.
+ FLUENT_FIELD_DEFAULT(ui64, StderrTailSize, 64 * 1024);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Interface representing an operation.
+struct IOperation
+ : public TThrRefBase
+{
+ virtual ~IOperation() = default;
+
+ ///
+ /// @brief Get operation id.
+ virtual const TOperationId& GetId() const = 0;
+
+ ///
+ /// @brief Get URL of the operation in YT Web UI.
+ virtual TString GetWebInterfaceUrl() const = 0;
+
+ ///
+ /// @brief Get last error for not started operations. Get state on YT cluster for started operations.
+ ///
+ /// For not started operations last error is an error that's being retried during operation
+ /// preparation/start (e.g. lock files, start operation request).
+ virtual TString GetStatus() const = 0;
+
+ ///
+ /// @brief Get preparation future.
+ ///
+ /// @return future that is set when operation is prepared.
+ virtual ::NThreading::TFuture<void> GetPreparedFuture() = 0;
+
+ ///
+ /// @brief Start operation synchronously.
+ ///
+ /// @note: Do NOT call this method twice.
+ ///
+ /// If operation is not prepared yet, Start() will block waiting for preparation finish.
+ /// Be ready to catch exception if operation preparation or start failed.
+ virtual void Start() = 0;
+
+ ///
+ /// @brief Is the operation started
+ ///
+ /// Returns true if the operation is started on the cluster
+ virtual bool IsStarted() const = 0;
+
+ ///
+ /// @brief Get start future.
+ ///
+ /// @return future that is set when operation is started.
+ virtual ::NThreading::TFuture<void> GetStartedFuture() = 0;
+
+ ///
+ /// @brief Start watching operation.
+ ///
+ /// @return future that is set when operation is complete.
+ ///
+ /// @note: the user should check value of returned future to ensure that operation completed successfully e.g.
+ /// @code{.cpp}
+ /// auto operationComplete = operation->Watch();
+ /// operationComplete.Wait();
+ /// operationComplete.GetValue(); /// will throw if operation completed with errors
+ /// @endcode
+ ///
+ /// If operation is completed successfully the returned future contains void value.
+ /// If operation is completed with error future contains @ref NYT::TOperationFailedError.
+ /// In rare cases when error occurred while waiting (e.g. YT become unavailable) future might contain other exception.
+ virtual ::NThreading::TFuture<void> Watch() = 0;
+
+ ///
+ /// @brief Get information about failed jobs.
+ ///
+ /// Can be called for operation in any stage.
+ /// Though user should keep in mind that this method always fetches info from cypress
+ /// and doesn't work when operation is archived. Successfully completed operations can be archived
+ /// quite quickly (in about ~30 seconds).
+ virtual TVector<TFailedJobInfo> GetFailedJobInfo(const TGetFailedJobInfoOptions& options = TGetFailedJobInfoOptions()) = 0;
+
+ ///
+ /// Get operation brief state.
+ virtual EOperationBriefState GetBriefState() = 0;
+
+ ///
+ /// @brief Get error (if operation has failed).
+ ///
+ /// @return `Nothing()` if operation is in 'Completed' or 'InProgress' state (or reason for failed / aborted operation).
+ virtual TMaybe<TYtError> GetError() = 0;
+
+ ///
+ /// Get job statistics.
+ virtual TJobStatistics GetJobStatistics() = 0;
+
+ ///
+ /// Get operation progress.
+ ///
+ /// @return `Nothing()` if operation has no running jobs yet, e.g. when it is in "materializing" or "pending" state.
+ virtual TMaybe<TOperationBriefProgress> GetBriefProgress() = 0;
+
+ ///
+ /// @brief Abort operation.
+ ///
+ /// Operation will be finished immediately.
+ /// All results of completed/running jobs will be lost.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/api/commands#abort_op
+ virtual void AbortOperation() = 0;
+
+ ///
+ /// @brief Complete operation.
+ ///
+ /// Operation will be finished immediately.
+ /// All results of completed jobs will appear in output tables.
+ /// All results of running (not completed) jobs will be lost.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/api/commands#complete_op
+ virtual void CompleteOperation() = 0;
+
+ ///
+ /// @brief Suspend operation.
+ ///
+ /// Jobs will not be aborted by default, c.f. @ref NYT::TSuspendOperationOptions.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/api/commands#suspend_op
+ virtual void SuspendOperation(
+ const TSuspendOperationOptions& options = TSuspendOperationOptions()) = 0;
+
+ ///
+ /// @brief Resume previously suspended operation.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/api/commands#resume_op
+ virtual void ResumeOperation(
+ const TResumeOperationOptions& options = TResumeOperationOptions()) = 0;
+
+ ///
+ /// @brief Get operation attributes.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/api/commands#get_operation
+ virtual TOperationAttributes GetAttributes(
+ const TGetOperationOptions& options = TGetOperationOptions()) = 0;
+
+ ///
+ /// @brief Update operation runtime parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/api/commands#update_op_parameters
+ virtual void UpdateParameters(
+ const TUpdateOperationParametersOptions& options = TUpdateOperationParametersOptions()) = 0;
+
+ ///
+ /// @brief Get job attributes.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/api/commands#get_job
+ virtual TJobAttributes GetJob(
+ const TJobId& jobId,
+ const TGetJobOptions& options = TGetJobOptions()) = 0;
+
+ ///
+ /// List jobs satisfying given filters (see @ref NYT::TListJobsOptions).
+ ///
+ /// @see https://yt.yandex-team.ru/docs/api/commands#list_jobs
+ virtual TListJobsResult ListJobs(
+ const TListJobsOptions& options = TListJobsOptions()) = 0;
+};
+
+///
+/// @brief Interface of client capable of managing operations.
+struct IOperationClient
+{
+ ///
+ /// @brief Run Map operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param mapper Instance of a job to run.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/map
+ IOperationPtr Map(
+ const TMapOperationSpec& spec,
+ ::TIntrusivePtr<IMapperBase> mapper,
+ const TOperationOptions& options = TOperationOptions());
+
+ ///
+ /// @brief Run Map operation.
+ ///
+ /// @param mapper Instance of a job to run.
+ /// @param input Input table(s)
+ /// @param output Output table(s)
+ /// @param spec Operation spec.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/map
+ IOperationPtr Map(
+ ::TIntrusivePtr<IMapperBase> mapper,
+ const TOneOrMany<TStructuredTablePath>& input,
+ const TOneOrMany<TStructuredTablePath>& output,
+ const TMapOperationSpec& spec = TMapOperationSpec(),
+ const TOperationOptions& options = TOperationOptions());
+
+ ///
+ /// @brief Run raw Map operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param rawJob Instance of a raw mapper to run.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/map
+ virtual IOperationPtr RawMap(
+ const TRawMapOperationSpec& spec,
+ ::TIntrusivePtr<IRawJob> rawJob,
+ const TOperationOptions& options = TOperationOptions()) = 0;
+
+ ///
+ /// @brief Run Reduce operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param reducer Instance of a job to run.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/reduce
+ IOperationPtr Reduce(
+ const TReduceOperationSpec& spec,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOperationOptions& options = TOperationOptions());
+
+ ///
+ /// @brief Run Reduce operation.
+ ///
+ /// @param reducer Instance of a job to run.
+ /// @param input Input table(s)
+ /// @param output Output table(s)
+ /// @param reduceBy Columns to group rows by.
+ /// @param spec Operation spec.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/reduce
+ IOperationPtr Reduce(
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOneOrMany<TStructuredTablePath>& input,
+ const TOneOrMany<TStructuredTablePath>& output,
+ const TSortColumns& reduceBy,
+ const TReduceOperationSpec& spec = TReduceOperationSpec(),
+ const TOperationOptions& options = TOperationOptions());
+
+ ///
+ /// @brief Run raw Reduce operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param rawJob Instance of a raw reducer to run.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/reduce
+ virtual IOperationPtr RawReduce(
+ const TRawReduceOperationSpec& spec,
+ ::TIntrusivePtr<IRawJob> rawJob,
+ const TOperationOptions& options = TOperationOptions()) = 0;
+
+ ///
+ /// @brief Run JoinReduce operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param reducer Instance of a job to run.
+ /// @param options Optional parameters.
+ ///
+ /// @deprecated Use @ref NYT::IOperationClient::Reduce with @ref NYT::TReduceOperationSpec::EnableKeyGuarantee set to `false.
+ IOperationPtr JoinReduce(
+ const TJoinReduceOperationSpec& spec,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOperationOptions& options = TOperationOptions());
+
+ ///
+ /// @brief Run raw JoinReduce operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param rawJob Instance of a raw reducer to run.
+ /// @param options Optional parameters.
+ ///
+ /// @deprecated Use @ref NYT::IOperationClient::RawReduce with @ref NYT::TReduceOperationSpec::EnableKeyGuarantee set to `false.
+ virtual IOperationPtr RawJoinReduce(
+ const TRawJoinReduceOperationSpec& spec,
+ ::TIntrusivePtr<IRawJob> rawJob,
+ const TOperationOptions& options = TOperationOptions()) = 0;
+
+ ///
+ /// @brief Run MapReduce operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param mapper Instance of a map job to run (identity mapper if `nullptr`).
+ /// @param reducer Instance of a reduce job to run.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce
+ IOperationPtr MapReduce(
+ const TMapReduceOperationSpec& spec,
+ ::TIntrusivePtr<IMapperBase> mapper,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOperationOptions& options = TOperationOptions());
+
+ ///
+ /// @brief Run MapReduce operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param mapper Instance of a map job to run (identity mapper if `nullptr`).
+ /// @param reducerCombiner Instance of a reduce combiner to run (identity reduce combiner if `nullptr`).
+ /// @param reducer Instance of a reduce job to run.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce
+ IOperationPtr MapReduce(
+ const TMapReduceOperationSpec& spec,
+ ::TIntrusivePtr<IMapperBase> mapper,
+ ::TIntrusivePtr<IReducerBase> reduceCombiner,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOperationOptions& options = TOperationOptions());
+
+ ///
+ /// @brief Run MapReduce operation.
+ ///
+ /// @param mapper Instance of mapper to run (identity mapper if `nullptr`).
+ /// @param reducer Instance of reducer to run.
+ /// @param input Input table(s)
+ /// @param output Output table(s)
+ /// @param reduceBy Columns to group rows by.
+ /// @param spec Operation spec.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce
+ IOperationPtr MapReduce(
+ ::TIntrusivePtr<IMapperBase> mapper,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOneOrMany<TStructuredTablePath>& input,
+ const TOneOrMany<TStructuredTablePath>& output,
+ const TSortColumns& reduceBy,
+ TMapReduceOperationSpec spec = TMapReduceOperationSpec(),
+ const TOperationOptions& options = TOperationOptions());
+
+ ///
+ /// @brief Run MapReduce operation.
+ ///
+ /// @param mapper Instance of mapper to run (identity mapper if `nullptr`).
+ /// @param reduceCombiner Instance of reduceCombiner to run (identity reduce combiner if `nullptr`).
+ /// @param reducer Instance of reducer to run.
+ /// @param input Input table(s)
+ /// @param output Output table(s)
+ /// @param reduceBy Columns to group rows by.
+ /// @param spec Operation spec.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce
+ IOperationPtr MapReduce(
+ ::TIntrusivePtr<IMapperBase> mapper,
+ ::TIntrusivePtr<IReducerBase> reduceCombiner,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOneOrMany<TStructuredTablePath>& input,
+ const TOneOrMany<TStructuredTablePath>& output,
+ const TSortColumns& reduceBy,
+ TMapReduceOperationSpec spec = TMapReduceOperationSpec(),
+ const TOperationOptions& options = TOperationOptions());
+
+ ///
+ /// @brief Run raw MapReduce operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param mapper Instance of a raw mapper to run (identity mapper if `nullptr`).
+ /// @param mapper Instance of a raw reduce combiner to run (identity reduce combiner if `nullptr`).
+ /// @param mapper Instance of a raw reducer to run.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/mapreduce
+ virtual IOperationPtr RawMapReduce(
+ const TRawMapReduceOperationSpec& spec,
+ ::TIntrusivePtr<IRawJob> mapper,
+ ::TIntrusivePtr<IRawJob> reduceCombiner,
+ ::TIntrusivePtr<IRawJob> reducer,
+ const TOperationOptions& options = TOperationOptions()) = 0;
+
+ ///
+ /// @brief Run Sort operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/sort
+ virtual IOperationPtr Sort(
+ const TSortOperationSpec& spec,
+ const TOperationOptions& options = TOperationOptions()) = 0;
+
+ ///
+ /// @brief Run Sort operation.
+ ///
+ /// @param input Input table(s).
+ /// @param output Output table.
+ /// @param sortBy Columns to sort input rows by.
+ /// @param spec Operation spec.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/sort
+ IOperationPtr Sort(
+ const TOneOrMany<TRichYPath>& input,
+ const TRichYPath& output,
+ const TSortColumns& sortBy,
+ const TSortOperationSpec& spec = TSortOperationSpec(),
+ const TOperationOptions& options = TOperationOptions());
+
+ ///
+ /// @brief Run Merge operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/merge
+ virtual IOperationPtr Merge(
+ const TMergeOperationSpec& spec,
+ const TOperationOptions& options = TOperationOptions()) = 0;
+
+ ///
+ /// @brief Run Erase operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/erase
+ virtual IOperationPtr Erase(
+ const TEraseOperationSpec& spec,
+ const TOperationOptions& options = TOperationOptions()) = 0;
+
+ ///
+ /// @brief Run RemoteCopy operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/remote_copy
+ virtual IOperationPtr RemoteCopy(
+ const TRemoteCopyOperationSpec& spec,
+ const TOperationOptions& options = TOperationOptions()) = 0;
+
+ ///
+ /// @brief Run Vanilla operation.
+ ///
+ /// @param spec Operation spec.
+ /// @param options Optional parameters.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/description/mr/vanilla
+ virtual IOperationPtr RunVanilla(
+ const TVanillaOperationSpec& spec,
+ const TOperationOptions& options = TOperationOptions()) = 0;
+
+ ///
+ /// @brief Abort operation.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/api/commands#abort_op
+ virtual void AbortOperation(
+ const TOperationId& operationId) = 0;
+
+ ///
+ /// @brief Complete operation.
+ ///
+ /// @see https://yt.yandex-team.ru/docs/api/commands#complete_op
+ virtual void CompleteOperation(
+ const TOperationId& operationId) = 0;
+
+ ///
+ /// @brief Wait for operation to finish.
+ virtual void WaitForOperation(
+ const TOperationId& operationId) = 0;
+
+ ///
+ /// @brief Check and return operation status.
+ ///
+ /// @note this function will never return @ref NYT::EOperationBriefState::Failed or @ref NYT::EOperationBriefState::Aborted status,
+ /// it will throw @ref NYT::TOperationFailedError instead.
+ virtual EOperationBriefState CheckOperation(
+ const TOperationId& operationId) = 0;
+
+ ///
+ /// @brief Create an operation object given operation id.
+ ///
+ /// @throw @ref NYT::TErrorResponse if the operation doesn't exist.
+ virtual IOperationPtr AttachOperation(const TOperationId& operationId) = 0;
+
+private:
+ virtual IOperationPtr DoMap(
+ const TMapOperationSpec& spec,
+ ::TIntrusivePtr<IStructuredJob> mapper,
+ const TOperationOptions& options) = 0;
+
+ virtual IOperationPtr DoReduce(
+ const TReduceOperationSpec& spec,
+ ::TIntrusivePtr<IStructuredJob> reducer,
+ const TOperationOptions& options) = 0;
+
+ virtual IOperationPtr DoJoinReduce(
+ const TJoinReduceOperationSpec& spec,
+ ::TIntrusivePtr<IStructuredJob> reducer,
+ const TOperationOptions& options) = 0;
+
+ virtual IOperationPtr DoMapReduce(
+ const TMapReduceOperationSpec& spec,
+ ::TIntrusivePtr<IStructuredJob> mapper,
+ ::TIntrusivePtr<IStructuredJob> reduceCombiner,
+ ::TIntrusivePtr<IStructuredJob> reducer,
+ const TOperationOptions& options) = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define OPERATION_INL_H_
+#include "operation-inl.h"
+#undef OPERATION_INL_H_