diff options
| author | max42 <[email protected]> | 2023-06-30 11:13:34 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 11:13:34 +0300 |
| commit | 3e1899838408bbad47622007aa382bc8a2b01f87 (patch) | |
| tree | 0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/interface/operation.cpp | |
| parent | 5463eb3f5e72a86f858a3d27c886470a724ede34 (diff) | |
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing
changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/interface/operation.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/interface/operation.cpp | 663 |
1 files changed, 0 insertions, 663 deletions
diff --git a/yt/cpp/mapreduce/interface/operation.cpp b/yt/cpp/mapreduce/interface/operation.cpp deleted file mode 100644 index 706fc4caa4c..00000000000 --- a/yt/cpp/mapreduce/interface/operation.cpp +++ /dev/null @@ -1,663 +0,0 @@ -#include "operation.h" - -#include <util/generic/iterator_range.h> - -namespace NYT { - -//////////////////////////////////////////////////////////////////////////////// - -namespace NDetail { - i64 OutputTableCount = -1; -} // namespace NDetail - -//////////////////////////////////////////////////////////////////////////////// - -TTaskName::TTaskName(TString taskName) - : TaskName_(std::move(taskName)) -{ } - -TTaskName::TTaskName(const char* taskName) - : TaskName_(taskName) -{ } - -TTaskName::TTaskName(ETaskName taskName) - : TaskName_(ToString(taskName)) -{ } - -const TString& TTaskName::Get() const -{ - return TaskName_; -} - -//////////////////////////////////////////////////////////////////////////////// - -TCommandRawJob::TCommandRawJob(TStringBuf command) - : Command_(command) -{ } - -const TString& TCommandRawJob::GetCommand() const -{ - return Command_; -} - -void TCommandRawJob::Do(const TRawJobContext& /* jobContext */) -{ - Y_FAIL("TCommandRawJob::Do must not be called"); -} - -REGISTER_NAMED_RAW_JOB("NYT::TCommandRawJob", TCommandRawJob) - -//////////////////////////////////////////////////////////////////////////////// - -TCommandVanillaJob::TCommandVanillaJob(TStringBuf command) - : Command_(command) -{ } - -const TString& TCommandVanillaJob::GetCommand() const -{ - return Command_; -} - -void TCommandVanillaJob::Do() -{ - Y_FAIL("TCommandVanillaJob::Do must not be called"); -} - -REGISTER_NAMED_VANILLA_JOB("NYT::TCommandVanillaJob", TCommandVanillaJob); - -//////////////////////////////////////////////////////////////////////////////// - -bool operator==(const TUnspecifiedTableStructure&, const TUnspecifiedTableStructure&) -{ - return true; -} - -bool operator==(const TProtobufTableStructure& lhs, const TProtobufTableStructure& rhs) -{ - return lhs.Descriptor == rhs.Descriptor; -} - -//////////////////////////////////////////////////////////////////////////////// - -const TVector<TStructuredTablePath>& TOperationInputSpecBase::GetStructuredInputs() const -{ - return StructuredInputs_; -} - -const TVector<TStructuredTablePath>& TOperationOutputSpecBase::GetStructuredOutputs() const -{ - return StructuredOutputs_; -} - -void TOperationInputSpecBase::AddStructuredInput(TStructuredTablePath path) -{ - Inputs_.push_back(path.RichYPath); - StructuredInputs_.push_back(std::move(path)); -} - -void TOperationOutputSpecBase::AddStructuredOutput(TStructuredTablePath path) -{ - Outputs_.push_back(path.RichYPath); - StructuredOutputs_.push_back(std::move(path)); -} - -//////////////////////////////////////////////////////////////////////////////// - -TVanillaTask& TVanillaTask::AddStructuredOutput(TStructuredTablePath path) -{ - TOperationOutputSpecBase::AddStructuredOutput(std::move(path)); - return *this; -} - -//////////////////////////////////////////////////////////////////////////////// - -TStructuredRowStreamDescription IVanillaJob<void>::GetInputRowStreamDescription() const -{ - return TVoidStructuredRowStream(); -} - -TStructuredRowStreamDescription IVanillaJob<void>::GetOutputRowStreamDescription() const -{ - return TVoidStructuredRowStream(); -} - -//////////////////////////////////////////////////////////////////////////////// - -TRawJobContext::TRawJobContext(size_t outputTableCount) - : InputFile_(Duplicate(0)) -{ - for (size_t i = 0; i != outputTableCount; ++i) { - OutputFileList_.emplace_back(Duplicate(3 * i + 1)); - } -} - -const TFile& TRawJobContext::GetInputFile() const -{ - return InputFile_; -} - -const TVector<TFile>& TRawJobContext::GetOutputFileList() const -{ - return OutputFileList_; -} - -//////////////////////////////////////////////////////////////////////////////// - -TUserJobSpec& TUserJobSpec::AddLocalFile( - const TLocalFilePath& path, - const TAddLocalFileOptions& options) -{ - LocalFiles_.emplace_back(path, options); - return *this; -} - -TUserJobSpec& TUserJobSpec::JobBinaryLocalPath(TString path, TMaybe<TString> md5) -{ - JobBinary_ = TJobBinaryLocalPath{path, md5}; - return *this; -} - -TUserJobSpec& TUserJobSpec::JobBinaryCypressPath(TString path, TMaybe<TTransactionId> transactionId) -{ - JobBinary_ = TJobBinaryCypressPath{path, transactionId}; - return *this; -} - -const TJobBinaryConfig& TUserJobSpec::GetJobBinary() const -{ - return JobBinary_; -} - -TVector<std::tuple<TLocalFilePath, TAddLocalFileOptions>> TUserJobSpec::GetLocalFiles() const -{ - return LocalFiles_; -} - -//////////////////////////////////////////////////////////////////////////////// - -TJobOperationPreparer::TInputGroup::TInputGroup(TJobOperationPreparer& preparer, TVector<int> indices) - : Preparer_(preparer) - , Indices_(std::move(indices)) -{ } - -TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::ColumnRenaming(const THashMap<TString, TString>& renaming) -{ - for (auto i : Indices_) { - Preparer_.InputColumnRenaming(i, renaming); - } - return *this; -} - -TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::ColumnFilter(const TVector<TString>& columns) -{ - for (auto i : Indices_) { - Preparer_.InputColumnFilter(i, columns); - } - return *this; -} - -TJobOperationPreparer& TJobOperationPreparer::TInputGroup::EndInputGroup() -{ - return Preparer_; -} - -TJobOperationPreparer::TOutputGroup::TOutputGroup(TJobOperationPreparer& preparer, TVector<int> indices) - : Preparer_(preparer) - , Indices_(std::move(indices)) -{ } - -TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::Schema(const TTableSchema &schema) -{ - for (auto i : Indices_) { - Preparer_.OutputSchema(i, schema); - } - return *this; -} - -TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::NoSchema() -{ - for (auto i : Indices_) { - Preparer_.NoOutputSchema(i); - } - return *this; -} - -TJobOperationPreparer& TJobOperationPreparer::TOutputGroup::EndOutputGroup() -{ - return Preparer_; -} - -//////////////////////////////////////////////////////////////////////////////// - -TJobOperationPreparer::TJobOperationPreparer(const IOperationPreparationContext& context) - : Context_(context) - , OutputSchemas_(context.GetOutputCount()) - , InputColumnRenamings_(context.GetInputCount()) - , InputColumnFilters_(context.GetInputCount()) - , InputTableDescriptions_(context.GetInputCount()) - , OutputTableDescriptions_(context.GetOutputCount()) -{ } - -TJobOperationPreparer::TInputGroup TJobOperationPreparer::BeginInputGroup(int begin, int end) -{ - Y_ENSURE_EX(begin <= end, TApiUsageError() - << "BeginInputGroup(): begin must not exceed end, got " << begin << ", " << end); - TVector<int> indices; - for (int i = begin; i < end; ++i) { - ValidateInputTableIndex(i, TStringBuf("BeginInputGroup()")); - indices.push_back(i); - } - return TInputGroup(*this, std::move(indices)); -} - - -TJobOperationPreparer::TOutputGroup TJobOperationPreparer::BeginOutputGroup(int begin, int end) -{ - Y_ENSURE_EX(begin <= end, TApiUsageError() - << "BeginOutputGroup(): begin must not exceed end, got " << begin << ", " << end); - TVector<int> indices; - for (int i = begin; i < end; ++i) { - ValidateOutputTableIndex(i, TStringBuf("BeginOutputGroup()")); - indices.push_back(i); - } - return TOutputGroup(*this, std::move(indices)); -} - -TJobOperationPreparer& TJobOperationPreparer::NodeOutput(int tableIndex) -{ - ValidateMissingOutputDescription(tableIndex); - OutputTableDescriptions_[tableIndex] = StructuredTableDescription<TNode>(); - return *this; -} - -TJobOperationPreparer& TJobOperationPreparer::OutputSchema(int tableIndex, TTableSchema schema) -{ - ValidateMissingOutputSchema(tableIndex); - OutputSchemas_[tableIndex] = std::move(schema); - return *this; -} - -TJobOperationPreparer& TJobOperationPreparer::NoOutputSchema(int tableIndex) -{ - ValidateMissingOutputSchema(tableIndex); - OutputSchemas_[tableIndex] = EmptyNonstrictSchema(); - return *this; -} - -TJobOperationPreparer& TJobOperationPreparer::InputColumnRenaming( - int tableIndex, - const THashMap<TString,TString>& renaming) -{ - ValidateInputTableIndex(tableIndex, TStringBuf("InputColumnRenaming()")); - InputColumnRenamings_[tableIndex] = renaming; - return *this; -} - -TJobOperationPreparer& TJobOperationPreparer::InputColumnFilter(int tableIndex, const TVector<TString>& columns) -{ - ValidateInputTableIndex(tableIndex, TStringBuf("InputColumnFilter()")); - InputColumnFilters_[tableIndex] = columns; - return *this; -} - -TJobOperationPreparer& TJobOperationPreparer::FormatHints(TUserJobFormatHints newFormatHints) -{ - FormatHints_ = newFormatHints; - return *this; -} - -void TJobOperationPreparer::Finish() -{ - FinallyValidate(); -} - -TVector<TTableSchema> TJobOperationPreparer::GetOutputSchemas() -{ - TVector<TTableSchema> result; - result.reserve(OutputSchemas_.size()); - for (auto& schema : OutputSchemas_) { - Y_VERIFY(schema.Defined()); - result.push_back(std::move(*schema)); - schema.Clear(); - } - return result; -} - -void TJobOperationPreparer::FinallyValidate() const -{ - TVector<int> illegallyMissingSchemaIndices; - for (int i = 0; i < static_cast<int>(OutputSchemas_.size()); ++i) { - if (!OutputSchemas_[i]) { - illegallyMissingSchemaIndices.push_back(i); - } - } - if (illegallyMissingSchemaIndices.empty()) { - return; - } - TApiUsageError error; - error << "Output table schemas are missing: "; - for (auto i : illegallyMissingSchemaIndices) { - error << "no. " << i; - if (auto path = Context_.GetInputPath(i)) { - error << "(" << *path << ")"; - } - error << "; "; - } - ythrow std::move(error); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TJobOperationPreparer::ValidateInputTableIndex(int tableIndex, TStringBuf message) const -{ - Y_ENSURE_EX( - 0 <= tableIndex && tableIndex < static_cast<int>(Context_.GetInputCount()), - TApiUsageError() << - message << ": input table index " << tableIndex << " us out of range [0;" << - OutputSchemas_.size() << ")"); -} - -void TJobOperationPreparer::ValidateOutputTableIndex(int tableIndex, TStringBuf message) const -{ - Y_ENSURE_EX( - 0 <= tableIndex && tableIndex < static_cast<int>(Context_.GetOutputCount()), - TApiUsageError() << - message << ": output table index " << tableIndex << " us out of range [0;" << - OutputSchemas_.size() << ")"); -} - -void TJobOperationPreparer::ValidateMissingOutputSchema(int tableIndex) const -{ - ValidateOutputTableIndex(tableIndex, "ValidateMissingOutputSchema()"); - Y_ENSURE_EX(!OutputSchemas_[tableIndex], - TApiUsageError() << - "Output table schema no. " << tableIndex << " " << - "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " << - "is already set"); -} - -void TJobOperationPreparer::ValidateMissingInputDescription(int tableIndex) const -{ - ValidateInputTableIndex(tableIndex, "ValidateMissingInputDescription()"); - Y_ENSURE_EX(!InputTableDescriptions_[tableIndex], - TApiUsageError() << - "Description for input no. " << tableIndex << " " << - "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " << - "is already set"); -} - -void TJobOperationPreparer::ValidateMissingOutputDescription(int tableIndex) const -{ - ValidateOutputTableIndex(tableIndex, "ValidateMissingOutputDescription()"); - Y_ENSURE_EX(!OutputTableDescriptions_[tableIndex], - TApiUsageError() << - "Description for output no. " << tableIndex << " " << - "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " << - "is already set"); -} - -TTableSchema TJobOperationPreparer::EmptyNonstrictSchema() { - return TTableSchema().Strict(false); -} - -//////////////////////////////////////////////////////////////////////////////// - -const TVector<THashMap<TString, TString>>& TJobOperationPreparer::GetInputColumnRenamings() const -{ - return InputColumnRenamings_; -} - -const TVector<TMaybe<TVector<TString>>>& TJobOperationPreparer::GetInputColumnFilters() const -{ - return InputColumnFilters_; -} - -const TVector<TMaybe<TTableStructure>>& TJobOperationPreparer::GetInputDescriptions() const -{ - return InputTableDescriptions_; -} - -const TVector<TMaybe<TTableStructure>>& TJobOperationPreparer::GetOutputDescriptions() const -{ - return OutputTableDescriptions_; -} - -const TUserJobFormatHints& TJobOperationPreparer::GetFormatHints() const -{ - return FormatHints_; -} - -TJobOperationPreparer& TJobOperationPreparer::InputFormatHints(TFormatHints hints) -{ - FormatHints_.InputFormatHints(hints); - return *this; -} - -TJobOperationPreparer& TJobOperationPreparer::OutputFormatHints(TFormatHints hints) -{ - FormatHints_.OutputFormatHints(hints); - return *this; -} - -//////////////////////////////////////////////////////////////////////////////// - -void IJob::PrepareOperation(const IOperationPreparationContext& context, TJobOperationPreparer& resultBuilder) const -{ - for (int i = 0; i < context.GetOutputCount(); ++i) { - resultBuilder.NoOutputSchema(i); - } -} - -//////////////////////////////////////////////////////////////////////////////// - -IOperationPtr IOperationClient::Map( - const TMapOperationSpec& spec, - ::TIntrusivePtr<IMapperBase> mapper, - const TOperationOptions& options) -{ - Y_VERIFY(mapper.Get()); - - return DoMap( - spec, - std::move(mapper), - options); -} - -IOperationPtr IOperationClient::Map( - ::TIntrusivePtr<IMapperBase> mapper, - const TOneOrMany<TStructuredTablePath>& input, - const TOneOrMany<TStructuredTablePath>& output, - const TMapOperationSpec& spec, - const TOperationOptions& options) -{ - Y_ENSURE_EX(spec.Inputs_.empty(), - TApiUsageError() << "TMapOperationSpec::Inputs MUST be empty"); - Y_ENSURE_EX(spec.Outputs_.empty(), - TApiUsageError() << "TMapOperationSpec::Outputs MUST be empty"); - - auto mapSpec = spec; - for (const auto& inputPath : input.Parts_) { - mapSpec.AddStructuredInput(inputPath); - } - for (const auto& outputPath : output.Parts_) { - mapSpec.AddStructuredOutput(outputPath); - } - return Map(mapSpec, std::move(mapper), options); -} - -IOperationPtr IOperationClient::Reduce( - const TReduceOperationSpec& spec, - ::TIntrusivePtr<IReducerBase> reducer, - const TOperationOptions& options) -{ - Y_VERIFY(reducer.Get()); - - return DoReduce( - spec, - std::move(reducer), - options); -} - -IOperationPtr IOperationClient::Reduce( - ::TIntrusivePtr<IReducerBase> reducer, - const TOneOrMany<TStructuredTablePath>& input, - const TOneOrMany<TStructuredTablePath>& output, - const TSortColumns& reduceBy, - const TReduceOperationSpec& spec, - const TOperationOptions& options) -{ - Y_ENSURE_EX(spec.Inputs_.empty(), - TApiUsageError() << "TReduceOperationSpec::Inputs MUST be empty"); - Y_ENSURE_EX(spec.Outputs_.empty(), - TApiUsageError() << "TReduceOperationSpec::Outputs MUST be empty"); - Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(), - TApiUsageError() << "TReduceOperationSpec::ReduceBy MUST be empty"); - - auto reduceSpec = spec; - for (const auto& inputPath : input.Parts_) { - reduceSpec.AddStructuredInput(inputPath); - } - for (const auto& outputPath : output.Parts_) { - reduceSpec.AddStructuredOutput(outputPath); - } - reduceSpec.ReduceBy(reduceBy); - return Reduce(reduceSpec, std::move(reducer), options); -} - -IOperationPtr IOperationClient::JoinReduce( - const TJoinReduceOperationSpec& spec, - ::TIntrusivePtr<IReducerBase> reducer, - const TOperationOptions& options) -{ - Y_VERIFY(reducer.Get()); - - return DoJoinReduce( - spec, - std::move(reducer), - options); -} - -IOperationPtr IOperationClient::MapReduce( - const TMapReduceOperationSpec& spec, - ::TIntrusivePtr<IMapperBase> mapper, - ::TIntrusivePtr<IReducerBase> reducer, - const TOperationOptions& options) -{ - Y_VERIFY(reducer.Get()); - - return DoMapReduce( - spec, - std::move(mapper), - nullptr, - std::move(reducer), - options); -} - -IOperationPtr IOperationClient::MapReduce( - const TMapReduceOperationSpec& spec, - ::TIntrusivePtr<IMapperBase> mapper, - ::TIntrusivePtr<IReducerBase> reduceCombiner, - ::TIntrusivePtr<IReducerBase> reducer, - const TOperationOptions& options) -{ - Y_VERIFY(reducer.Get()); - - return DoMapReduce( - spec, - std::move(mapper), - std::move(reduceCombiner), - std::move(reducer), - options); -} - -IOperationPtr IOperationClient::MapReduce( - ::TIntrusivePtr<IMapperBase> mapper, - ::TIntrusivePtr<IReducerBase> reducer, - const TOneOrMany<TStructuredTablePath>& input, - const TOneOrMany<TStructuredTablePath>& output, - const TSortColumns& reduceBy, - TMapReduceOperationSpec spec, - const TOperationOptions& options) -{ - Y_ENSURE_EX(spec.Inputs_.empty(), - TApiUsageError() << "TMapReduceOperationSpec::Inputs MUST be empty"); - Y_ENSURE_EX(spec.Outputs_.empty(), - TApiUsageError() << "TMapReduceOperationSpec::Outputs MUST be empty"); - Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(), - TApiUsageError() << "TMapReduceOperationSpec::ReduceBy MUST be empty"); - - for (const auto& inputPath : input.Parts_) { - spec.AddStructuredInput(inputPath); - } - for (const auto& outputPath : output.Parts_) { - spec.AddStructuredOutput(outputPath); - } - spec.ReduceBy(reduceBy); - return MapReduce(spec, std::move(mapper), std::move(reducer), options); -} - -IOperationPtr IOperationClient::MapReduce( - ::TIntrusivePtr<IMapperBase> mapper, - ::TIntrusivePtr<IReducerBase> reduceCombiner, - ::TIntrusivePtr<IReducerBase> reducer, - const TOneOrMany<TStructuredTablePath>& input, - const TOneOrMany<TStructuredTablePath>& output, - const TSortColumns& reduceBy, - TMapReduceOperationSpec spec, - const TOperationOptions& options) -{ - Y_ENSURE_EX(spec.Inputs_.empty(), - TApiUsageError() << "TMapReduceOperationSpec::Inputs MUST be empty"); - Y_ENSURE_EX(spec.Outputs_.empty(), - TApiUsageError() << "TMapReduceOperationSpec::Outputs MUST be empty"); - Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(), - TApiUsageError() << "TMapReduceOperationSpec::ReduceBy MUST be empty"); - - for (const auto& inputPath : input.Parts_) { - spec.AddStructuredInput(inputPath); - } - for (const auto& outputPath : output.Parts_) { - spec.AddStructuredOutput(outputPath); - } - spec.ReduceBy(reduceBy); - return MapReduce(spec, std::move(mapper), std::move(reduceCombiner), std::move(reducer), options); -} - -IOperationPtr IOperationClient::Sort( - const TOneOrMany<TRichYPath>& input, - const TRichYPath& output, - const TSortColumns& sortBy, - const TSortOperationSpec& spec, - const TOperationOptions& options) -{ - Y_ENSURE_EX(spec.Inputs_.empty(), - TApiUsageError() << "TSortOperationSpec::Inputs MUST be empty"); - Y_ENSURE_EX(spec.Output_.Path_.empty(), - TApiUsageError() << "TSortOperationSpec::Output MUST be empty"); - Y_ENSURE_EX(spec.SortBy_.Parts_.empty(), - TApiUsageError() << "TSortOperationSpec::SortBy MUST be empty"); - - auto sortSpec = spec; - for (const auto& inputPath : input.Parts_) { - sortSpec.AddInput(inputPath); - } - sortSpec.Output(output); - sortSpec.SortBy(sortBy); - return Sort(sortSpec, options); -} - -//////////////////////////////////////////////////////////////////////////////// - -TRawTableReaderPtr IStructuredJob::CreateCustomRawJobReader(int) const -{ - return nullptr; -} - -THolder<IProxyOutput> IStructuredJob::CreateCustomRawJobWriter(size_t) const -{ - return nullptr; -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT |
