diff options
author | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
commit | 73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch) | |
tree | 188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/interface/operation-inl.h | |
parent | 528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff) | |
download | ydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz |
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build.
Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/interface/operation-inl.h')
-rw-r--r-- | yt/cpp/mapreduce/interface/operation-inl.h | 928 |
1 files changed, 928 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/interface/operation-inl.h b/yt/cpp/mapreduce/interface/operation-inl.h new file mode 100644 index 0000000000..8d53cd446f --- /dev/null +++ b/yt/cpp/mapreduce/interface/operation-inl.h @@ -0,0 +1,928 @@ +#pragma once + +#ifndef OPERATION_INL_H_ +#error "Direct inclusion of this file is not allowed, use operation.h" +#include "operation.h" +#endif +#undef OPERATION_INL_H_ + +#include "errors.h" + +#include <util/generic/bt_exception.h> +#include <util/generic/singleton.h> +#include <util/system/type_name.h> + +#include <util/stream/file.h> +#include <util/stream/buffer.h> +#include <util/string/subst.h> + +#include <typeindex> + +namespace NYT { + +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +template<class T> +void Assign(TVector<T>& array, size_t idx, const T& value) { + array.resize(std::max(array.size(), idx + 1)); + array[idx] = value; +} + +//////////////////////////////////////////////////////////////////////////////// + +template <typename TRow> +TStructuredRowStreamDescription GetStructuredRowStreamDescription() +{ + if constexpr (std::is_same_v<TRow, NYT::TNode>) { + return TTNodeStructuredRowStream{}; + } else if constexpr (std::is_same_v<TRow, NYT::TYaMRRow>) { + return TTYaMRRowStructuredRowStream{}; + } else if constexpr (std::is_same_v<::google::protobuf::Message, TRow>) { + return TProtobufStructuredRowStream{nullptr}; + } else if constexpr (TIsBaseOf<::google::protobuf::Message, TRow>::Value) { + return TProtobufStructuredRowStream{TRow::descriptor()}; + } else if constexpr (TIsProtoOneOf<TRow>::value) { + return TProtobufStructuredRowStream{nullptr}; + } else { + static_assert(TDependentFalse<TRow>, "Unknown row type"); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail + +//////////////////////////////////////////////////////////////////////////////// + +template <typename TRow> +TStructuredTablePath Structured(TRichYPath richYPath) +{ + return TStructuredTablePath(std::move(richYPath), StructuredTableDescription<TRow>()); +} + +template <typename TRow> +TTableStructure StructuredTableDescription() +{ + if constexpr (std::is_same_v<TRow, NYT::TNode>) { + return TUnspecifiedTableStructure{}; + } else if constexpr (std::is_same_v<TRow, NYT::TYaMRRow>) { + return TUnspecifiedTableStructure{}; + } else if constexpr (std::is_base_of_v<::google::protobuf::Message, TRow>) { + if constexpr (std::is_same_v<::google::protobuf::Message, TRow>) { + static_assert(TDependentFalse<TRow>, "Cannot use ::google::protobuf::Message as table descriptor"); + } else { + return TProtobufTableStructure{TRow::descriptor()}; + } + } else { + static_assert(TDependentFalse<TRow>, "Unknown row type"); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +template <typename TDerived> +TDerived& TRawOperationIoTableSpec<TDerived>::AddInput(const TRichYPath& path) +{ + Inputs_.push_back(path); + return static_cast<TDerived&>(*this); +} + +template <typename TDerived> +TDerived& TRawOperationIoTableSpec<TDerived>::SetInput(size_t tableIndex, const TRichYPath& path) +{ + NDetail::Assign(Inputs_, tableIndex, path); +} + +template <typename TDerived> +TDerived& TRawOperationIoTableSpec<TDerived>::AddOutput(const TRichYPath& path) +{ + Outputs_.push_back(path); + return static_cast<TDerived&>(*this); +} + +template <typename TDerived> +TDerived& TRawOperationIoTableSpec<TDerived>::SetOutput(size_t tableIndex, const TRichYPath& path) +{ + NDetail::Assign(Outputs_, tableIndex, path); +} + +template <typename TDerived> +const TVector<TRichYPath>& TRawOperationIoTableSpec<TDerived>::GetInputs() const +{ + return Inputs_; +} + +template <typename TDerived> +const TVector<TRichYPath>& TRawOperationIoTableSpec<TDerived>::GetOutputs() const +{ + return Outputs_; +} + +//////////////////////////////////////////////////////////////////////////////// + +template <typename TDerived> +TDerived& TRawMapReduceOperationIoSpec<TDerived>::AddMapOutput(const TRichYPath& path) +{ + MapOutputs_.push_back(path); + return static_cast<TDerived&>(*this); +} + +template <typename TDerived> +TDerived& TRawMapReduceOperationIoSpec<TDerived>::SetMapOutput(size_t tableIndex, const TRichYPath& path) +{ + NDetail::Assign(MapOutputs_, tableIndex, path); +} + +template <typename TDerived> +const TVector<TRichYPath>& TRawMapReduceOperationIoSpec<TDerived>::GetMapOutputs() const +{ + return MapOutputs_; +} + +//////////////////////////////////////////////////////////////////////////////// + +::TIntrusivePtr<INodeReaderImpl> CreateJobNodeReader(TRawTableReaderPtr rawTableReader); +::TIntrusivePtr<IYaMRReaderImpl> CreateJobYaMRReader(TRawTableReaderPtr rawTableReader); +::TIntrusivePtr<IProtoReaderImpl> CreateJobProtoReader(TRawTableReaderPtr rawTableReader); + +::TIntrusivePtr<INodeWriterImpl> CreateJobNodeWriter(THolder<IProxyOutput> rawTableWriter); +::TIntrusivePtr<IYaMRWriterImpl> CreateJobYaMRWriter(THolder<IProxyOutput> rawTableWriter); +::TIntrusivePtr<IProtoWriterImpl> CreateJobProtoWriter(THolder<IProxyOutput> rawTableWriter); + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +inline ::TIntrusivePtr<typename TRowTraits<T>::IReaderImpl> CreateJobReaderImpl(TRawTableReaderPtr rawTableReader); + +template <> +inline ::TIntrusivePtr<INodeReaderImpl> CreateJobReaderImpl<TNode>(TRawTableReaderPtr rawTableReader) +{ + return CreateJobNodeReader(rawTableReader); +} + +template <> +inline ::TIntrusivePtr<IYaMRReaderImpl> CreateJobReaderImpl<TYaMRRow>(TRawTableReaderPtr rawTableReader) +{ + return CreateJobYaMRReader(rawTableReader); +} + +template <> +inline ::TIntrusivePtr<IProtoReaderImpl> CreateJobReaderImpl<Message>(TRawTableReaderPtr rawTableReader) +{ + return CreateJobProtoReader(rawTableReader); +} + +template <class T> +inline ::TIntrusivePtr<typename TRowTraits<T>::IReaderImpl> CreateJobReaderImpl(TRawTableReaderPtr rawTableReader) +{ + if constexpr (TIsBaseOf<Message, T>::Value || NDetail::TIsProtoOneOf<T>::value) { + return CreateJobProtoReader(rawTableReader); + } else { + static_assert(TDependentFalse<T>, "Unknown row type"); + } +} + +template <class T> +inline TTableReaderPtr<T> CreateJobReader(TRawTableReaderPtr rawTableReader) +{ + return new TTableReader<T>(CreateJobReaderImpl<T>(rawTableReader)); +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +TTableWriterPtr<T> CreateJobWriter(THolder<IProxyOutput> rawJobWriter); + +template <> +inline TTableWriterPtr<TNode> CreateJobWriter<TNode>(THolder<IProxyOutput> rawJobWriter) +{ + return new TTableWriter<TNode>(CreateJobNodeWriter(std::move(rawJobWriter))); +} + +template <> +inline TTableWriterPtr<TYaMRRow> CreateJobWriter<TYaMRRow>(THolder<IProxyOutput> rawJobWriter) +{ + return new TTableWriter<TYaMRRow>(CreateJobYaMRWriter(std::move(rawJobWriter))); +} + +template <> +inline TTableWriterPtr<Message> CreateJobWriter<Message>(THolder<IProxyOutput> rawJobWriter) +{ + return new TTableWriter<Message>(CreateJobProtoWriter(std::move(rawJobWriter))); +} + +template <class T, class = void> +struct TProtoWriterCreator; + +template <class T> +struct TProtoWriterCreator<T, std::enable_if_t<TIsBaseOf<Message, T>::Value>> +{ + static TTableWriterPtr<T> Create(::TIntrusivePtr<IProtoWriterImpl> writer) + { + return new TTableWriter<T>(writer); + } +}; + +template <class T> +inline TTableWriterPtr<T> CreateJobWriter(THolder<IProxyOutput> rawJobWriter) +{ + if constexpr (TIsBaseOf<Message, T>::Value) { + return TProtoWriterCreator<T>::Create(CreateJobProtoWriter(std::move(rawJobWriter))); + } else { + static_assert(TDependentFalse<T>, "Unknown row type"); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +void TOperationInputSpecBase::AddInput(const TRichYPath& path) +{ + Inputs_.push_back(path); + StructuredInputs_.emplace_back(Structured<T>(path)); +} + +template <class T> +void TOperationInputSpecBase::SetInput(size_t tableIndex, const TRichYPath& path) +{ + NDetail::Assign(Inputs_, tableIndex, path); + NDetail::Assign(StructuredInputs_, tableIndex, Structured<T>(path)); +} + + +template <class T> +void TOperationOutputSpecBase::AddOutput(const TRichYPath& path) +{ + Outputs_.push_back(path); + StructuredOutputs_.emplace_back(Structured<T>(path)); +} + +template <class T> +void TOperationOutputSpecBase::SetOutput(size_t tableIndex, const TRichYPath& path) +{ + NDetail::Assign(Outputs_, tableIndex, path); + NDetail::Assign(StructuredOutputs_, tableIndex, Structured<T>(path)); +} + +template <class TDerived> +template <class T> +TDerived& TOperationIOSpec<TDerived>::AddInput(const TRichYPath& path) +{ + static_assert(!std::is_same<T, Message>::value, "input type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)"); + TOperationInputSpecBase::AddInput<T>(path); + return *static_cast<TDerived*>(this); +} + +template <class TDerived> +template <class T> +TDerived& TOperationIOSpec<TDerived>::SetInput(size_t tableIndex, const TRichYPath& path) +{ + static_assert(!std::is_same<T, Message>::value, "input type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)"); + TOperationInputSpecBase::SetInput<T>(tableIndex, path); + return *static_cast<TDerived*>(this); +} + + +template <class TDerived> +template <class T> +TDerived& TOperationIOSpec<TDerived>::AddOutput(const TRichYPath& path) +{ + static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)"); + TOperationOutputSpecBase::AddOutput<T>(path); + return *static_cast<TDerived*>(this); +} + +template <class TDerived> +template <class T> +TDerived& TOperationIOSpec<TDerived>::SetOutput(size_t tableIndex, const TRichYPath& path) +{ + static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)"); + TOperationOutputSpecBase::SetOutput<T>(tableIndex, path); + return *static_cast<TDerived*>(this); +} + +template <class TDerived> +TDerived& TOperationIOSpec<TDerived>::AddStructuredInput(TStructuredTablePath path) +{ + TOperationInputSpecBase::AddStructuredInput(std::move(path)); + return *static_cast<TDerived*>(this); +} + +template <class TDerived> +TDerived& TOperationIOSpec<TDerived>::AddStructuredOutput(TStructuredTablePath path) +{ + TOperationOutputSpecBase::AddStructuredOutput(std::move(path)); + return *static_cast<TDerived*>(this); +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +TVanillaTask& TVanillaTask::AddOutput(const TRichYPath& path) +{ + static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)"); + TOperationOutputSpecBase::AddOutput<T>(path); + return *this; +} + +template <class T> +TVanillaTask& TVanillaTask::SetOutput(size_t tableIndex, const TRichYPath& path) +{ + static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)"); + TOperationOutputSpecBase::SetOutput<T>(tableIndex, path); + return *this; +} + +//////////////////////////////////////////////////////////////////////////////// + +namespace NDetail { + +void ResetUseClientProtobuf(const char* methodName); + +} // namespace NDetail + +template <class TDerived> +TDerived& TOperationIOSpec<TDerived>::AddProtobufInput_VerySlow_Deprecated(const TRichYPath& path) +{ + NDetail::ResetUseClientProtobuf("AddProtobufInput_VerySlow_Deprecated"); + Inputs_.push_back(path); + StructuredInputs_.emplace_back(TStructuredTablePath(path, TProtobufTableStructure{nullptr})); + return *static_cast<TDerived*>(this); +} + +template <class TDerived> +TDerived& TOperationIOSpec<TDerived>::AddProtobufOutput_VerySlow_Deprecated(const TRichYPath& path) +{ + NDetail::ResetUseClientProtobuf("AddProtobufOutput_VerySlow_Deprecated"); + Outputs_.push_back(path); + StructuredOutputs_.emplace_back(TStructuredTablePath(path, TProtobufTableStructure{nullptr})); + return *static_cast<TDerived*>(this); +} + +//////////////////////////////////////////////////////////////////////////////// + +template <typename TRow> +TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::Description() +{ + for (auto i : Indices_) { + Preparer_.InputDescription<TRow>(i); + } + return *this; +} + +template <typename TRow> +TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::Description(bool inferSchema) +{ + for (auto i : Indices_) { + Preparer_.OutputDescription<TRow>(i, inferSchema); + } + return *this; +} + +//////////////////////////////////////////////////////////////////////////////// + +template <typename TCont> +TJobOperationPreparer::TInputGroup TJobOperationPreparer::BeginInputGroup(const TCont& indices) +{ + for (auto i : indices) { + ValidateInputTableIndex(i, TStringBuf("BeginInputGroup()")); + } + return TInputGroup(*this, TVector<int>(std::begin(indices), std::end(indices))); +} + +template <typename TCont> +TJobOperationPreparer::TOutputGroup TJobOperationPreparer::BeginOutputGroup(const TCont& indices) +{ + for (auto i : indices) { + ValidateOutputTableIndex(i, TStringBuf("BeginOutputGroup()")); + } + return TOutputGroup(*this, indices); +} + + +template <typename TRow> +TJobOperationPreparer& TJobOperationPreparer::InputDescription(int tableIndex) +{ + ValidateMissingInputDescription(tableIndex); + InputTableDescriptions_[tableIndex] = StructuredTableDescription<TRow>(); + return *this; +} + +template <typename TRow> +TJobOperationPreparer& TJobOperationPreparer::OutputDescription(int tableIndex, bool inferSchema) +{ + ValidateMissingOutputDescription(tableIndex); + OutputTableDescriptions_[tableIndex] = StructuredTableDescription<TRow>(); + if (inferSchema && !OutputSchemas_[tableIndex]) { + OutputSchemas_[tableIndex] = CreateTableSchema<TRow>(); + } + return *this; +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class TDerived> +template <class TRow> +TDerived& TIntermediateTablesHintSpec<TDerived>::HintMapOutput() +{ + IntermediateMapOutputDescription_ = StructuredTableDescription<TRow>(); + return *static_cast<TDerived*>(this); +} + +template <class TDerived> +template <class TRow> +TDerived& TIntermediateTablesHintSpec<TDerived>::AddMapOutput(const TRichYPath& path) +{ + MapOutputs_.push_back(path); + StructuredMapOutputs_.emplace_back(Structured<TRow>(path)); + return *static_cast<TDerived*>(this); +} + +template <class TDerived> +template <class TRow> +TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceCombinerInput() +{ + IntermediateReduceCombinerInputDescription_ = StructuredTableDescription<TRow>(); + return *static_cast<TDerived*>(this); +} + +template <class TDerived> +template <class TRow> +TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceCombinerOutput() +{ + IntermediateReduceCombinerOutputDescription_ = StructuredTableDescription<TRow>(); + return *static_cast<TDerived*>(this); +} + +template <class TDerived> +template <class TRow> +TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceInput() +{ + IntermediateReducerInputDescription_ = StructuredTableDescription<TRow>(); + return *static_cast<TDerived*>(this); +} + +template <class TDerived> +const TVector<TStructuredTablePath>& TIntermediateTablesHintSpec<TDerived>::GetStructuredMapOutputs() const +{ + return StructuredMapOutputs_; +} + +template <class TDerived> +const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateMapOutputDescription() const +{ + return IntermediateMapOutputDescription_; +} + +template <class TDerived> +const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReduceCombinerInputDescription() const +{ + return IntermediateReduceCombinerInputDescription_; +} + +template <class TDerived> +const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReduceCombinerOutputDescription() const +{ + return IntermediateReduceCombinerOutputDescription_; +} + +template <class TDerived> +const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReducerInputDescription() const +{ + return IntermediateReducerInputDescription_; +} + +//////////////////////////////////////////////////////////////////////////////// + +struct TReducerContext +{ + bool Break = false; + static TReducerContext* Get() { return Singleton<TReducerContext>(); } +}; + +template <class TR, class TW> +inline void IReducer<TR, TW>::Break() +{ + TReducerContext::Get()->Break = true; +} + +template <typename TReader, typename TWriter> +void FeedJobInput( + IMapper<TReader, TWriter>* mapper, + typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl, + TWriter* writer) +{ + using TInputRow = typename TReader::TRowType; + + auto reader = MakeIntrusive<TTableReader<TInputRow>>(readerImpl); + mapper->Do(reader.Get(), writer); +} + +template <typename TReader, typename TWriter> +void FeedJobInput( + IReducer<TReader, TWriter>* reducer, + typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl, + TWriter* writer) +{ + using TInputRow = typename TReader::TRowType; + + auto rangesReader = MakeIntrusive<TTableRangesReader<TInputRow>>(readerImpl); + for (; rangesReader->IsValid(); rangesReader->Next()) { + reducer->Do(&rangesReader->GetRange(), writer); + if (TReducerContext::Get()->Break) { + break; + } + } +} + +template <typename TReader, typename TWriter> +void FeedJobInput( + IAggregatorReducer<TReader, TWriter>* reducer, + typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl, + TWriter* writer) +{ + using TInputRow = typename TReader::TRowType; + + auto rangesReader = MakeIntrusive<TTableRangesReader<TInputRow>>(readerImpl); + reducer->Do(rangesReader.Get(), writer); +} + +template <class TRawJob> +int RunRawJob(size_t outputTableCount, IInputStream& jobStateStream) +{ + TRawJobContext context(outputTableCount); + + TRawJob job; + job.Load(jobStateStream); + job.Do(context); + return 0; +} + +template <> +inline int RunRawJob<TCommandRawJob>(size_t /* outputTableCount */, IInputStream& /* jobStateStream */) +{ + Y_FAIL(); +} + +template <class TVanillaJob> +int RunVanillaJob(size_t outputTableCount, IInputStream& jobStateStream) +{ + TVanillaJob job; + job.Load(jobStateStream); + + if constexpr (std::is_base_of<IVanillaJob<>, TVanillaJob>::value) { + Y_VERIFY(outputTableCount == 0, "Void vanilla job expects zero 'outputTableCount'"); + job.Do(); + } else { + Y_VERIFY(outputTableCount, "Vanilla job with table writer expects nonzero 'outputTableCount'"); + using TOutputRow = typename TVanillaJob::TWriter::TRowType; + + THolder<IProxyOutput> rawJobWriter; + if (auto customWriter = job.CreateCustomRawJobWriter(outputTableCount)) { + rawJobWriter = std::move(customWriter); + } else { + rawJobWriter = CreateRawJobWriter(outputTableCount); + } + auto writer = CreateJobWriter<TOutputRow>(std::move(rawJobWriter)); + + job.Start(writer.Get()); + job.Do(writer.Get()); + job.Finish(writer.Get()); + + writer->Finish(); + } + return 0; +} + +template <> +inline int RunVanillaJob<TCommandVanillaJob>(size_t /* outputTableCount */, IInputStream& /* jobStateStream */) +{ + Y_FAIL(); +} + +template <class TJob> + requires TIsBaseOf<IStructuredJob, TJob>::Value +int RunJob(size_t outputTableCount, IInputStream& jobStateStream) +{ + using TInputRow = typename TJob::TReader::TRowType; + using TOutputRow = typename TJob::TWriter::TRowType; + + auto job = MakeIntrusive<TJob>(); + job->Load(jobStateStream); + + TRawTableReaderPtr rawJobReader; + if (auto customReader = job->CreateCustomRawJobReader(/*fd*/ 0)) { + rawJobReader = customReader; + } else { + rawJobReader = CreateRawJobReader(/*fd*/ 0); + } + auto readerImpl = CreateJobReaderImpl<TInputRow>(rawJobReader); + + // Many users don't expect to have jobs with empty input so we skip such jobs. + if (!readerImpl->IsValid()) { + return 0; + } + + THolder<IProxyOutput> rawJobWriter; + if (auto customWriter = job->CreateCustomRawJobWriter(outputTableCount)) { + rawJobWriter = std::move(customWriter); + } else { + rawJobWriter = CreateRawJobWriter(outputTableCount); + } + auto writer = CreateJobWriter<TOutputRow>(std::move(rawJobWriter)); + + job->Start(writer.Get()); + FeedJobInput(job.Get(), readerImpl.Get(), writer.Get()); + job->Finish(writer.Get()); + + writer->Finish(); + + return 0; +} + +// +// We leave RunMapJob/RunReduceJob/RunAggregatorReducer for backward compatibility, +// some user use them already. :( + +template <class TMapper> +int RunMapJob(size_t outputTableCount, IInputStream& jobStateStream) +{ + return RunJob<TMapper>(outputTableCount, jobStateStream); +} + +template <class TReducer> +int RunReduceJob(size_t outputTableCount, IInputStream& jobStateStream) +{ + return RunJob<TReducer>(outputTableCount, jobStateStream); +} + +template <class TReducer> +int RunAggregatorReducer(size_t outputTableCount, IInputStream& jobStateStream) +{ + return RunJob<TReducer>(outputTableCount, jobStateStream); +} + +//////////////////////////////////////////////////////////////////////////////// + +template <typename T, typename = void> +struct TIsConstructibleFromNode + : std::false_type +{ }; + +template <typename T> +struct TIsConstructibleFromNode<T, std::void_t<decltype(T::FromNode(std::declval<TNode&>()))>> + : std::true_type +{ }; + +template <class TJob> +::TIntrusivePtr<NYT::IStructuredJob> ConstructJobFromNode(const TNode& node) +{ + if constexpr (TIsConstructibleFromNode<TJob>::value) { + Y_ENSURE(node.GetType() != TNode::Undefined, + "job has FromNode method but constructor arguments were not provided"); + return TJob::FromNode(node); + } else { + Y_ENSURE(node.GetType() == TNode::Undefined, + "constructor arguments provided but job does not contain FromNode method"); + return MakeIntrusive<TJob>(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +using TJobFunction = int (*)(size_t, IInputStream&); +using TConstructJobFunction = ::TIntrusivePtr<NYT::IStructuredJob> (*)(const TNode&); + +class TJobFactory +{ +public: + static TJobFactory* Get() + { + return Singleton<TJobFactory>(); + } + + template <class TJob> + void RegisterJob(const char* name) + { + RegisterJobImpl<TJob>(name, RunJob<TJob>); + JobConstructors[name] = ConstructJobFromNode<TJob>; + } + + template <class TRawJob> + void RegisterRawJob(const char* name) + { + RegisterJobImpl<TRawJob>(name, RunRawJob<TRawJob>); + } + + template <class TVanillaJob> + void RegisterVanillaJob(const char* name) + { + RegisterJobImpl<TVanillaJob>(name, RunVanillaJob<TVanillaJob>); + } + + TString GetJobName(const IJob* job) + { + const auto typeIndex = std::type_index(typeid(*job)); + CheckJobRegistered(typeIndex); + return JobNames[typeIndex]; + } + + TJobFunction GetJobFunction(const char* name) + { + CheckNameRegistered(name); + return JobFunctions[name]; + } + + TConstructJobFunction GetConstructingFunction(const char* name) + { + CheckNameRegistered(name); + return JobConstructors[name]; + } + +private: + TMap<std::type_index, TString> JobNames; + THashMap<TString, TJobFunction> JobFunctions; + THashMap<TString, TConstructJobFunction> JobConstructors; + + template <typename TJob, typename TRunner> + void RegisterJobImpl(const char* name, TRunner runner) { + const auto typeIndex = std::type_index(typeid(TJob)); + CheckNotRegistered(typeIndex, name); + JobNames[typeIndex] = name; + JobFunctions[name] = runner; + } + + void CheckNotRegistered(const std::type_index& typeIndex, const char* name) + { + Y_ENSURE(!JobNames.contains(typeIndex), + "type_info '" << typeIndex.name() << "'" + "is already registered under name '" << JobNames[typeIndex] << "'"); + Y_ENSURE(!JobFunctions.contains(name), + "job with name '" << name << "' is already registered"); + } + + void CheckJobRegistered(const std::type_index& typeIndex) + { + Y_ENSURE(JobNames.contains(typeIndex), + "type_info '" << typeIndex.name() << "' is not registered, use REGISTER_* macros"); + } + + void CheckNameRegistered(const char* name) + { + Y_ENSURE(JobFunctions.contains(name), + "job with name '" << name << "' is not registered, use REGISTER_* macros"); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +template <class TMapper> +struct TMapperRegistrator +{ + TMapperRegistrator(const char* name) + { + static_assert(TMapper::JobType == IJob::EType::Mapper, + "REGISTER_MAPPER is not compatible with this job class"); + + NYT::TJobFactory::Get()->RegisterJob<TMapper>(name); + } +}; + +template <class TReducer> +struct TReducerRegistrator +{ + TReducerRegistrator(const char* name) + { + static_assert(TReducer::JobType == IJob::EType::Reducer || + TReducer::JobType == IJob::EType::ReducerAggregator, + "REGISTER_REDUCER is not compatible with this job class"); + + NYT::TJobFactory::Get()->RegisterJob<TReducer>(name); + } +}; + +template <class TRawJob> +struct TRawJobRegistrator +{ + TRawJobRegistrator(const char* name) + { + static_assert(TRawJob::JobType == IJob::EType::RawJob, + "REGISTER_RAW_JOB is not compatible with this job class"); + NYT::TJobFactory::Get()->RegisterRawJob<TRawJob>(name); + } +}; + +template <class TVanillaJob> +struct TVanillaJobRegistrator +{ + TVanillaJobRegistrator(const char* name) + { + static_assert(TVanillaJob::JobType == IJob::EType::VanillaJob, + "REGISTER_VANILLA_JOB is not compatible with this job class"); + NYT::TJobFactory::Get()->RegisterVanillaJob<TVanillaJob>(name); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +inline TString YtRegistryTypeName(const TString& name) { + TString res = name; +#ifdef _win_ + SubstGlobal(res, "class ", ""); +#endif + return res; +} + +//////////////////////////////////////////////////////////////////////////////// + +#define REGISTER_MAPPER(...) \ +static const NYT::TMapperRegistrator<__VA_ARGS__> \ +Y_GENERATE_UNIQUE_ID(TJobRegistrator)(NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()); + +#define REGISTER_NAMED_MAPPER(name, ...) \ +static const NYT::TMapperRegistrator<__VA_ARGS__> \ +Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name); + +#define REGISTER_REDUCER(...) \ +static const NYT::TReducerRegistrator<__VA_ARGS__> \ +Y_GENERATE_UNIQUE_ID(TJobRegistrator)(NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()); + +#define REGISTER_NAMED_REDUCER(name, ...) \ +static const NYT::TReducerRegistrator<__VA_ARGS__> \ +Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name); + +#define REGISTER_NAMED_RAW_JOB(name, ...) \ +static const NYT::TRawJobRegistrator<__VA_ARGS__> \ +Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name); + +#define REGISTER_RAW_JOB(...) \ +REGISTER_NAMED_RAW_JOB((NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()), __VA_ARGS__) + +#define REGISTER_NAMED_VANILLA_JOB(name, ...) \ +static NYT::TVanillaJobRegistrator<__VA_ARGS__> \ +Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name); + +#define REGISTER_VANILLA_JOB(...) \ +REGISTER_NAMED_VANILLA_JOB((NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()), __VA_ARGS__) + +//////////////////////////////////////////////////////////////////////////////// + +template <typename TReader, typename TWriter> +TStructuredRowStreamDescription IMapper<TReader, TWriter>::GetInputRowStreamDescription() const +{ + return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>(); +} + +template <typename TReader, typename TWriter> +TStructuredRowStreamDescription IMapper<TReader, TWriter>::GetOutputRowStreamDescription() const +{ + return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>(); +} + +//////////////////////////////////////////////////////////////////////////////// + +template <typename TReader, typename TWriter> +TStructuredRowStreamDescription IReducer<TReader, TWriter>::GetInputRowStreamDescription() const +{ + return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>(); +} + +template <typename TReader, typename TWriter> +TStructuredRowStreamDescription IReducer<TReader, TWriter>::GetOutputRowStreamDescription() const +{ + return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>(); +} + +//////////////////////////////////////////////////////////////////////////////// + +template <typename TReader, typename TWriter> +TStructuredRowStreamDescription IAggregatorReducer<TReader, TWriter>::GetInputRowStreamDescription() const +{ + return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>(); +} + +template <typename TReader, typename TWriter> +TStructuredRowStreamDescription IAggregatorReducer<TReader, TWriter>::GetOutputRowStreamDescription() const +{ + return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>(); +} + +//////////////////////////////////////////////////////////////////////////////// + +template <typename TWriter> +TStructuredRowStreamDescription IVanillaJob<TWriter>::GetInputRowStreamDescription() const +{ + return TVoidStructuredRowStream(); +} + +template <typename TWriter> +TStructuredRowStreamDescription IVanillaJob<TWriter>::GetOutputRowStreamDescription() const +{ + return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |