aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/interface/operation.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/interface/operation.cpp
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
downloadydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build. Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/interface/operation.cpp')
-rw-r--r--yt/cpp/mapreduce/interface/operation.cpp663
1 files changed, 663 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/interface/operation.cpp b/yt/cpp/mapreduce/interface/operation.cpp
new file mode 100644
index 00000000000..706fc4caa4c
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/operation.cpp
@@ -0,0 +1,663 @@
+#include "operation.h"
+
+#include <util/generic/iterator_range.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace NDetail {
+ i64 OutputTableCount = -1;
+} // namespace NDetail
+
+////////////////////////////////////////////////////////////////////////////////
+
+TTaskName::TTaskName(TString taskName)
+ : TaskName_(std::move(taskName))
+{ }
+
+TTaskName::TTaskName(const char* taskName)
+ : TaskName_(taskName)
+{ }
+
+TTaskName::TTaskName(ETaskName taskName)
+ : TaskName_(ToString(taskName))
+{ }
+
+const TString& TTaskName::Get() const
+{
+ return TaskName_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TCommandRawJob::TCommandRawJob(TStringBuf command)
+ : Command_(command)
+{ }
+
+const TString& TCommandRawJob::GetCommand() const
+{
+ return Command_;
+}
+
+void TCommandRawJob::Do(const TRawJobContext& /* jobContext */)
+{
+ Y_FAIL("TCommandRawJob::Do must not be called");
+}
+
+REGISTER_NAMED_RAW_JOB("NYT::TCommandRawJob", TCommandRawJob)
+
+////////////////////////////////////////////////////////////////////////////////
+
+TCommandVanillaJob::TCommandVanillaJob(TStringBuf command)
+ : Command_(command)
+{ }
+
+const TString& TCommandVanillaJob::GetCommand() const
+{
+ return Command_;
+}
+
+void TCommandVanillaJob::Do()
+{
+ Y_FAIL("TCommandVanillaJob::Do must not be called");
+}
+
+REGISTER_NAMED_VANILLA_JOB("NYT::TCommandVanillaJob", TCommandVanillaJob);
+
+////////////////////////////////////////////////////////////////////////////////
+
+bool operator==(const TUnspecifiedTableStructure&, const TUnspecifiedTableStructure&)
+{
+ return true;
+}
+
+bool operator==(const TProtobufTableStructure& lhs, const TProtobufTableStructure& rhs)
+{
+ return lhs.Descriptor == rhs.Descriptor;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+const TVector<TStructuredTablePath>& TOperationInputSpecBase::GetStructuredInputs() const
+{
+ return StructuredInputs_;
+}
+
+const TVector<TStructuredTablePath>& TOperationOutputSpecBase::GetStructuredOutputs() const
+{
+ return StructuredOutputs_;
+}
+
+void TOperationInputSpecBase::AddStructuredInput(TStructuredTablePath path)
+{
+ Inputs_.push_back(path.RichYPath);
+ StructuredInputs_.push_back(std::move(path));
+}
+
+void TOperationOutputSpecBase::AddStructuredOutput(TStructuredTablePath path)
+{
+ Outputs_.push_back(path.RichYPath);
+ StructuredOutputs_.push_back(std::move(path));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TVanillaTask& TVanillaTask::AddStructuredOutput(TStructuredTablePath path)
+{
+ TOperationOutputSpecBase::AddStructuredOutput(std::move(path));
+ return *this;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TStructuredRowStreamDescription IVanillaJob<void>::GetInputRowStreamDescription() const
+{
+ return TVoidStructuredRowStream();
+}
+
+TStructuredRowStreamDescription IVanillaJob<void>::GetOutputRowStreamDescription() const
+{
+ return TVoidStructuredRowStream();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TRawJobContext::TRawJobContext(size_t outputTableCount)
+ : InputFile_(Duplicate(0))
+{
+ for (size_t i = 0; i != outputTableCount; ++i) {
+ OutputFileList_.emplace_back(Duplicate(3 * i + 1));
+ }
+}
+
+const TFile& TRawJobContext::GetInputFile() const
+{
+ return InputFile_;
+}
+
+const TVector<TFile>& TRawJobContext::GetOutputFileList() const
+{
+ return OutputFileList_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TUserJobSpec& TUserJobSpec::AddLocalFile(
+ const TLocalFilePath& path,
+ const TAddLocalFileOptions& options)
+{
+ LocalFiles_.emplace_back(path, options);
+ return *this;
+}
+
+TUserJobSpec& TUserJobSpec::JobBinaryLocalPath(TString path, TMaybe<TString> md5)
+{
+ JobBinary_ = TJobBinaryLocalPath{path, md5};
+ return *this;
+}
+
+TUserJobSpec& TUserJobSpec::JobBinaryCypressPath(TString path, TMaybe<TTransactionId> transactionId)
+{
+ JobBinary_ = TJobBinaryCypressPath{path, transactionId};
+ return *this;
+}
+
+const TJobBinaryConfig& TUserJobSpec::GetJobBinary() const
+{
+ return JobBinary_;
+}
+
+TVector<std::tuple<TLocalFilePath, TAddLocalFileOptions>> TUserJobSpec::GetLocalFiles() const
+{
+ return LocalFiles_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TJobOperationPreparer::TInputGroup::TInputGroup(TJobOperationPreparer& preparer, TVector<int> indices)
+ : Preparer_(preparer)
+ , Indices_(std::move(indices))
+{ }
+
+TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::ColumnRenaming(const THashMap<TString, TString>& renaming)
+{
+ for (auto i : Indices_) {
+ Preparer_.InputColumnRenaming(i, renaming);
+ }
+ return *this;
+}
+
+TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::ColumnFilter(const TVector<TString>& columns)
+{
+ for (auto i : Indices_) {
+ Preparer_.InputColumnFilter(i, columns);
+ }
+ return *this;
+}
+
+TJobOperationPreparer& TJobOperationPreparer::TInputGroup::EndInputGroup()
+{
+ return Preparer_;
+}
+
+TJobOperationPreparer::TOutputGroup::TOutputGroup(TJobOperationPreparer& preparer, TVector<int> indices)
+ : Preparer_(preparer)
+ , Indices_(std::move(indices))
+{ }
+
+TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::Schema(const TTableSchema &schema)
+{
+ for (auto i : Indices_) {
+ Preparer_.OutputSchema(i, schema);
+ }
+ return *this;
+}
+
+TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::NoSchema()
+{
+ for (auto i : Indices_) {
+ Preparer_.NoOutputSchema(i);
+ }
+ return *this;
+}
+
+TJobOperationPreparer& TJobOperationPreparer::TOutputGroup::EndOutputGroup()
+{
+ return Preparer_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TJobOperationPreparer::TJobOperationPreparer(const IOperationPreparationContext& context)
+ : Context_(context)
+ , OutputSchemas_(context.GetOutputCount())
+ , InputColumnRenamings_(context.GetInputCount())
+ , InputColumnFilters_(context.GetInputCount())
+ , InputTableDescriptions_(context.GetInputCount())
+ , OutputTableDescriptions_(context.GetOutputCount())
+{ }
+
+TJobOperationPreparer::TInputGroup TJobOperationPreparer::BeginInputGroup(int begin, int end)
+{
+ Y_ENSURE_EX(begin <= end, TApiUsageError()
+ << "BeginInputGroup(): begin must not exceed end, got " << begin << ", " << end);
+ TVector<int> indices;
+ for (int i = begin; i < end; ++i) {
+ ValidateInputTableIndex(i, TStringBuf("BeginInputGroup()"));
+ indices.push_back(i);
+ }
+ return TInputGroup(*this, std::move(indices));
+}
+
+
+TJobOperationPreparer::TOutputGroup TJobOperationPreparer::BeginOutputGroup(int begin, int end)
+{
+ Y_ENSURE_EX(begin <= end, TApiUsageError()
+ << "BeginOutputGroup(): begin must not exceed end, got " << begin << ", " << end);
+ TVector<int> indices;
+ for (int i = begin; i < end; ++i) {
+ ValidateOutputTableIndex(i, TStringBuf("BeginOutputGroup()"));
+ indices.push_back(i);
+ }
+ return TOutputGroup(*this, std::move(indices));
+}
+
+TJobOperationPreparer& TJobOperationPreparer::NodeOutput(int tableIndex)
+{
+ ValidateMissingOutputDescription(tableIndex);
+ OutputTableDescriptions_[tableIndex] = StructuredTableDescription<TNode>();
+ return *this;
+}
+
+TJobOperationPreparer& TJobOperationPreparer::OutputSchema(int tableIndex, TTableSchema schema)
+{
+ ValidateMissingOutputSchema(tableIndex);
+ OutputSchemas_[tableIndex] = std::move(schema);
+ return *this;
+}
+
+TJobOperationPreparer& TJobOperationPreparer::NoOutputSchema(int tableIndex)
+{
+ ValidateMissingOutputSchema(tableIndex);
+ OutputSchemas_[tableIndex] = EmptyNonstrictSchema();
+ return *this;
+}
+
+TJobOperationPreparer& TJobOperationPreparer::InputColumnRenaming(
+ int tableIndex,
+ const THashMap<TString,TString>& renaming)
+{
+ ValidateInputTableIndex(tableIndex, TStringBuf("InputColumnRenaming()"));
+ InputColumnRenamings_[tableIndex] = renaming;
+ return *this;
+}
+
+TJobOperationPreparer& TJobOperationPreparer::InputColumnFilter(int tableIndex, const TVector<TString>& columns)
+{
+ ValidateInputTableIndex(tableIndex, TStringBuf("InputColumnFilter()"));
+ InputColumnFilters_[tableIndex] = columns;
+ return *this;
+}
+
+TJobOperationPreparer& TJobOperationPreparer::FormatHints(TUserJobFormatHints newFormatHints)
+{
+ FormatHints_ = newFormatHints;
+ return *this;
+}
+
+void TJobOperationPreparer::Finish()
+{
+ FinallyValidate();
+}
+
+TVector<TTableSchema> TJobOperationPreparer::GetOutputSchemas()
+{
+ TVector<TTableSchema> result;
+ result.reserve(OutputSchemas_.size());
+ for (auto& schema : OutputSchemas_) {
+ Y_VERIFY(schema.Defined());
+ result.push_back(std::move(*schema));
+ schema.Clear();
+ }
+ return result;
+}
+
+void TJobOperationPreparer::FinallyValidate() const
+{
+ TVector<int> illegallyMissingSchemaIndices;
+ for (int i = 0; i < static_cast<int>(OutputSchemas_.size()); ++i) {
+ if (!OutputSchemas_[i]) {
+ illegallyMissingSchemaIndices.push_back(i);
+ }
+ }
+ if (illegallyMissingSchemaIndices.empty()) {
+ return;
+ }
+ TApiUsageError error;
+ error << "Output table schemas are missing: ";
+ for (auto i : illegallyMissingSchemaIndices) {
+ error << "no. " << i;
+ if (auto path = Context_.GetInputPath(i)) {
+ error << "(" << *path << ")";
+ }
+ error << "; ";
+ }
+ ythrow std::move(error);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TJobOperationPreparer::ValidateInputTableIndex(int tableIndex, TStringBuf message) const
+{
+ Y_ENSURE_EX(
+ 0 <= tableIndex && tableIndex < static_cast<int>(Context_.GetInputCount()),
+ TApiUsageError() <<
+ message << ": input table index " << tableIndex << " us out of range [0;" <<
+ OutputSchemas_.size() << ")");
+}
+
+void TJobOperationPreparer::ValidateOutputTableIndex(int tableIndex, TStringBuf message) const
+{
+ Y_ENSURE_EX(
+ 0 <= tableIndex && tableIndex < static_cast<int>(Context_.GetOutputCount()),
+ TApiUsageError() <<
+ message << ": output table index " << tableIndex << " us out of range [0;" <<
+ OutputSchemas_.size() << ")");
+}
+
+void TJobOperationPreparer::ValidateMissingOutputSchema(int tableIndex) const
+{
+ ValidateOutputTableIndex(tableIndex, "ValidateMissingOutputSchema()");
+ Y_ENSURE_EX(!OutputSchemas_[tableIndex],
+ TApiUsageError() <<
+ "Output table schema no. " << tableIndex << " " <<
+ "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " <<
+ "is already set");
+}
+
+void TJobOperationPreparer::ValidateMissingInputDescription(int tableIndex) const
+{
+ ValidateInputTableIndex(tableIndex, "ValidateMissingInputDescription()");
+ Y_ENSURE_EX(!InputTableDescriptions_[tableIndex],
+ TApiUsageError() <<
+ "Description for input no. " << tableIndex << " " <<
+ "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " <<
+ "is already set");
+}
+
+void TJobOperationPreparer::ValidateMissingOutputDescription(int tableIndex) const
+{
+ ValidateOutputTableIndex(tableIndex, "ValidateMissingOutputDescription()");
+ Y_ENSURE_EX(!OutputTableDescriptions_[tableIndex],
+ TApiUsageError() <<
+ "Description for output no. " << tableIndex << " " <<
+ "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " <<
+ "is already set");
+}
+
+TTableSchema TJobOperationPreparer::EmptyNonstrictSchema() {
+ return TTableSchema().Strict(false);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+const TVector<THashMap<TString, TString>>& TJobOperationPreparer::GetInputColumnRenamings() const
+{
+ return InputColumnRenamings_;
+}
+
+const TVector<TMaybe<TVector<TString>>>& TJobOperationPreparer::GetInputColumnFilters() const
+{
+ return InputColumnFilters_;
+}
+
+const TVector<TMaybe<TTableStructure>>& TJobOperationPreparer::GetInputDescriptions() const
+{
+ return InputTableDescriptions_;
+}
+
+const TVector<TMaybe<TTableStructure>>& TJobOperationPreparer::GetOutputDescriptions() const
+{
+ return OutputTableDescriptions_;
+}
+
+const TUserJobFormatHints& TJobOperationPreparer::GetFormatHints() const
+{
+ return FormatHints_;
+}
+
+TJobOperationPreparer& TJobOperationPreparer::InputFormatHints(TFormatHints hints)
+{
+ FormatHints_.InputFormatHints(hints);
+ return *this;
+}
+
+TJobOperationPreparer& TJobOperationPreparer::OutputFormatHints(TFormatHints hints)
+{
+ FormatHints_.OutputFormatHints(hints);
+ return *this;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void IJob::PrepareOperation(const IOperationPreparationContext& context, TJobOperationPreparer& resultBuilder) const
+{
+ for (int i = 0; i < context.GetOutputCount(); ++i) {
+ resultBuilder.NoOutputSchema(i);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+IOperationPtr IOperationClient::Map(
+ const TMapOperationSpec& spec,
+ ::TIntrusivePtr<IMapperBase> mapper,
+ const TOperationOptions& options)
+{
+ Y_VERIFY(mapper.Get());
+
+ return DoMap(
+ spec,
+ std::move(mapper),
+ options);
+}
+
+IOperationPtr IOperationClient::Map(
+ ::TIntrusivePtr<IMapperBase> mapper,
+ const TOneOrMany<TStructuredTablePath>& input,
+ const TOneOrMany<TStructuredTablePath>& output,
+ const TMapOperationSpec& spec,
+ const TOperationOptions& options)
+{
+ Y_ENSURE_EX(spec.Inputs_.empty(),
+ TApiUsageError() << "TMapOperationSpec::Inputs MUST be empty");
+ Y_ENSURE_EX(spec.Outputs_.empty(),
+ TApiUsageError() << "TMapOperationSpec::Outputs MUST be empty");
+
+ auto mapSpec = spec;
+ for (const auto& inputPath : input.Parts_) {
+ mapSpec.AddStructuredInput(inputPath);
+ }
+ for (const auto& outputPath : output.Parts_) {
+ mapSpec.AddStructuredOutput(outputPath);
+ }
+ return Map(mapSpec, std::move(mapper), options);
+}
+
+IOperationPtr IOperationClient::Reduce(
+ const TReduceOperationSpec& spec,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOperationOptions& options)
+{
+ Y_VERIFY(reducer.Get());
+
+ return DoReduce(
+ spec,
+ std::move(reducer),
+ options);
+}
+
+IOperationPtr IOperationClient::Reduce(
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOneOrMany<TStructuredTablePath>& input,
+ const TOneOrMany<TStructuredTablePath>& output,
+ const TSortColumns& reduceBy,
+ const TReduceOperationSpec& spec,
+ const TOperationOptions& options)
+{
+ Y_ENSURE_EX(spec.Inputs_.empty(),
+ TApiUsageError() << "TReduceOperationSpec::Inputs MUST be empty");
+ Y_ENSURE_EX(spec.Outputs_.empty(),
+ TApiUsageError() << "TReduceOperationSpec::Outputs MUST be empty");
+ Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(),
+ TApiUsageError() << "TReduceOperationSpec::ReduceBy MUST be empty");
+
+ auto reduceSpec = spec;
+ for (const auto& inputPath : input.Parts_) {
+ reduceSpec.AddStructuredInput(inputPath);
+ }
+ for (const auto& outputPath : output.Parts_) {
+ reduceSpec.AddStructuredOutput(outputPath);
+ }
+ reduceSpec.ReduceBy(reduceBy);
+ return Reduce(reduceSpec, std::move(reducer), options);
+}
+
+IOperationPtr IOperationClient::JoinReduce(
+ const TJoinReduceOperationSpec& spec,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOperationOptions& options)
+{
+ Y_VERIFY(reducer.Get());
+
+ return DoJoinReduce(
+ spec,
+ std::move(reducer),
+ options);
+}
+
+IOperationPtr IOperationClient::MapReduce(
+ const TMapReduceOperationSpec& spec,
+ ::TIntrusivePtr<IMapperBase> mapper,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOperationOptions& options)
+{
+ Y_VERIFY(reducer.Get());
+
+ return DoMapReduce(
+ spec,
+ std::move(mapper),
+ nullptr,
+ std::move(reducer),
+ options);
+}
+
+IOperationPtr IOperationClient::MapReduce(
+ const TMapReduceOperationSpec& spec,
+ ::TIntrusivePtr<IMapperBase> mapper,
+ ::TIntrusivePtr<IReducerBase> reduceCombiner,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOperationOptions& options)
+{
+ Y_VERIFY(reducer.Get());
+
+ return DoMapReduce(
+ spec,
+ std::move(mapper),
+ std::move(reduceCombiner),
+ std::move(reducer),
+ options);
+}
+
+IOperationPtr IOperationClient::MapReduce(
+ ::TIntrusivePtr<IMapperBase> mapper,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOneOrMany<TStructuredTablePath>& input,
+ const TOneOrMany<TStructuredTablePath>& output,
+ const TSortColumns& reduceBy,
+ TMapReduceOperationSpec spec,
+ const TOperationOptions& options)
+{
+ Y_ENSURE_EX(spec.Inputs_.empty(),
+ TApiUsageError() << "TMapReduceOperationSpec::Inputs MUST be empty");
+ Y_ENSURE_EX(spec.Outputs_.empty(),
+ TApiUsageError() << "TMapReduceOperationSpec::Outputs MUST be empty");
+ Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(),
+ TApiUsageError() << "TMapReduceOperationSpec::ReduceBy MUST be empty");
+
+ for (const auto& inputPath : input.Parts_) {
+ spec.AddStructuredInput(inputPath);
+ }
+ for (const auto& outputPath : output.Parts_) {
+ spec.AddStructuredOutput(outputPath);
+ }
+ spec.ReduceBy(reduceBy);
+ return MapReduce(spec, std::move(mapper), std::move(reducer), options);
+}
+
+IOperationPtr IOperationClient::MapReduce(
+ ::TIntrusivePtr<IMapperBase> mapper,
+ ::TIntrusivePtr<IReducerBase> reduceCombiner,
+ ::TIntrusivePtr<IReducerBase> reducer,
+ const TOneOrMany<TStructuredTablePath>& input,
+ const TOneOrMany<TStructuredTablePath>& output,
+ const TSortColumns& reduceBy,
+ TMapReduceOperationSpec spec,
+ const TOperationOptions& options)
+{
+ Y_ENSURE_EX(spec.Inputs_.empty(),
+ TApiUsageError() << "TMapReduceOperationSpec::Inputs MUST be empty");
+ Y_ENSURE_EX(spec.Outputs_.empty(),
+ TApiUsageError() << "TMapReduceOperationSpec::Outputs MUST be empty");
+ Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(),
+ TApiUsageError() << "TMapReduceOperationSpec::ReduceBy MUST be empty");
+
+ for (const auto& inputPath : input.Parts_) {
+ spec.AddStructuredInput(inputPath);
+ }
+ for (const auto& outputPath : output.Parts_) {
+ spec.AddStructuredOutput(outputPath);
+ }
+ spec.ReduceBy(reduceBy);
+ return MapReduce(spec, std::move(mapper), std::move(reduceCombiner), std::move(reducer), options);
+}
+
+IOperationPtr IOperationClient::Sort(
+ const TOneOrMany<TRichYPath>& input,
+ const TRichYPath& output,
+ const TSortColumns& sortBy,
+ const TSortOperationSpec& spec,
+ const TOperationOptions& options)
+{
+ Y_ENSURE_EX(spec.Inputs_.empty(),
+ TApiUsageError() << "TSortOperationSpec::Inputs MUST be empty");
+ Y_ENSURE_EX(spec.Output_.Path_.empty(),
+ TApiUsageError() << "TSortOperationSpec::Output MUST be empty");
+ Y_ENSURE_EX(spec.SortBy_.Parts_.empty(),
+ TApiUsageError() << "TSortOperationSpec::SortBy MUST be empty");
+
+ auto sortSpec = spec;
+ for (const auto& inputPath : input.Parts_) {
+ sortSpec.AddInput(inputPath);
+ }
+ sortSpec.Output(output);
+ sortSpec.SortBy(sortBy);
+ return Sort(sortSpec, options);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TRawTableReaderPtr IStructuredJob::CreateCustomRawJobReader(int) const
+{
+ return nullptr;
+}
+
+THolder<IProxyOutput> IStructuredJob::CreateCustomRawJobWriter(size_t) const
+{
+ return nullptr;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT