aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/interface/operation-inl.h
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/interface/operation-inl.h
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
downloadydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build. Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/interface/operation-inl.h')
-rw-r--r--yt/cpp/mapreduce/interface/operation-inl.h928
1 files changed, 928 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/interface/operation-inl.h b/yt/cpp/mapreduce/interface/operation-inl.h
new file mode 100644
index 0000000000..8d53cd446f
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/operation-inl.h
@@ -0,0 +1,928 @@
+#pragma once
+
+#ifndef OPERATION_INL_H_
+#error "Direct inclusion of this file is not allowed, use operation.h"
+#include "operation.h"
+#endif
+#undef OPERATION_INL_H_
+
+#include "errors.h"
+
+#include <util/generic/bt_exception.h>
+#include <util/generic/singleton.h>
+#include <util/system/type_name.h>
+
+#include <util/stream/file.h>
+#include <util/stream/buffer.h>
+#include <util/string/subst.h>
+
+#include <typeindex>
+
+namespace NYT {
+
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template<class T>
+void Assign(TVector<T>& array, size_t idx, const T& value) {
+ array.resize(std::max(array.size(), idx + 1));
+ array[idx] = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename TRow>
+TStructuredRowStreamDescription GetStructuredRowStreamDescription()
+{
+ if constexpr (std::is_same_v<TRow, NYT::TNode>) {
+ return TTNodeStructuredRowStream{};
+ } else if constexpr (std::is_same_v<TRow, NYT::TYaMRRow>) {
+ return TTYaMRRowStructuredRowStream{};
+ } else if constexpr (std::is_same_v<::google::protobuf::Message, TRow>) {
+ return TProtobufStructuredRowStream{nullptr};
+ } else if constexpr (TIsBaseOf<::google::protobuf::Message, TRow>::Value) {
+ return TProtobufStructuredRowStream{TRow::descriptor()};
+ } else if constexpr (TIsProtoOneOf<TRow>::value) {
+ return TProtobufStructuredRowStream{nullptr};
+ } else {
+ static_assert(TDependentFalse<TRow>, "Unknown row type");
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename TRow>
+TStructuredTablePath Structured(TRichYPath richYPath)
+{
+ return TStructuredTablePath(std::move(richYPath), StructuredTableDescription<TRow>());
+}
+
+template <typename TRow>
+TTableStructure StructuredTableDescription()
+{
+ if constexpr (std::is_same_v<TRow, NYT::TNode>) {
+ return TUnspecifiedTableStructure{};
+ } else if constexpr (std::is_same_v<TRow, NYT::TYaMRRow>) {
+ return TUnspecifiedTableStructure{};
+ } else if constexpr (std::is_base_of_v<::google::protobuf::Message, TRow>) {
+ if constexpr (std::is_same_v<::google::protobuf::Message, TRow>) {
+ static_assert(TDependentFalse<TRow>, "Cannot use ::google::protobuf::Message as table descriptor");
+ } else {
+ return TProtobufTableStructure{TRow::descriptor()};
+ }
+ } else {
+ static_assert(TDependentFalse<TRow>, "Unknown row type");
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename TDerived>
+TDerived& TRawOperationIoTableSpec<TDerived>::AddInput(const TRichYPath& path)
+{
+ Inputs_.push_back(path);
+ return static_cast<TDerived&>(*this);
+}
+
+template <typename TDerived>
+TDerived& TRawOperationIoTableSpec<TDerived>::SetInput(size_t tableIndex, const TRichYPath& path)
+{
+ NDetail::Assign(Inputs_, tableIndex, path);
+}
+
+template <typename TDerived>
+TDerived& TRawOperationIoTableSpec<TDerived>::AddOutput(const TRichYPath& path)
+{
+ Outputs_.push_back(path);
+ return static_cast<TDerived&>(*this);
+}
+
+template <typename TDerived>
+TDerived& TRawOperationIoTableSpec<TDerived>::SetOutput(size_t tableIndex, const TRichYPath& path)
+{
+ NDetail::Assign(Outputs_, tableIndex, path);
+}
+
+template <typename TDerived>
+const TVector<TRichYPath>& TRawOperationIoTableSpec<TDerived>::GetInputs() const
+{
+ return Inputs_;
+}
+
+template <typename TDerived>
+const TVector<TRichYPath>& TRawOperationIoTableSpec<TDerived>::GetOutputs() const
+{
+ return Outputs_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename TDerived>
+TDerived& TRawMapReduceOperationIoSpec<TDerived>::AddMapOutput(const TRichYPath& path)
+{
+ MapOutputs_.push_back(path);
+ return static_cast<TDerived&>(*this);
+}
+
+template <typename TDerived>
+TDerived& TRawMapReduceOperationIoSpec<TDerived>::SetMapOutput(size_t tableIndex, const TRichYPath& path)
+{
+ NDetail::Assign(MapOutputs_, tableIndex, path);
+}
+
+template <typename TDerived>
+const TVector<TRichYPath>& TRawMapReduceOperationIoSpec<TDerived>::GetMapOutputs() const
+{
+ return MapOutputs_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+::TIntrusivePtr<INodeReaderImpl> CreateJobNodeReader(TRawTableReaderPtr rawTableReader);
+::TIntrusivePtr<IYaMRReaderImpl> CreateJobYaMRReader(TRawTableReaderPtr rawTableReader);
+::TIntrusivePtr<IProtoReaderImpl> CreateJobProtoReader(TRawTableReaderPtr rawTableReader);
+
+::TIntrusivePtr<INodeWriterImpl> CreateJobNodeWriter(THolder<IProxyOutput> rawTableWriter);
+::TIntrusivePtr<IYaMRWriterImpl> CreateJobYaMRWriter(THolder<IProxyOutput> rawTableWriter);
+::TIntrusivePtr<IProtoWriterImpl> CreateJobProtoWriter(THolder<IProxyOutput> rawTableWriter);
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+inline ::TIntrusivePtr<typename TRowTraits<T>::IReaderImpl> CreateJobReaderImpl(TRawTableReaderPtr rawTableReader);
+
+template <>
+inline ::TIntrusivePtr<INodeReaderImpl> CreateJobReaderImpl<TNode>(TRawTableReaderPtr rawTableReader)
+{
+ return CreateJobNodeReader(rawTableReader);
+}
+
+template <>
+inline ::TIntrusivePtr<IYaMRReaderImpl> CreateJobReaderImpl<TYaMRRow>(TRawTableReaderPtr rawTableReader)
+{
+ return CreateJobYaMRReader(rawTableReader);
+}
+
+template <>
+inline ::TIntrusivePtr<IProtoReaderImpl> CreateJobReaderImpl<Message>(TRawTableReaderPtr rawTableReader)
+{
+ return CreateJobProtoReader(rawTableReader);
+}
+
+template <class T>
+inline ::TIntrusivePtr<typename TRowTraits<T>::IReaderImpl> CreateJobReaderImpl(TRawTableReaderPtr rawTableReader)
+{
+ if constexpr (TIsBaseOf<Message, T>::Value || NDetail::TIsProtoOneOf<T>::value) {
+ return CreateJobProtoReader(rawTableReader);
+ } else {
+ static_assert(TDependentFalse<T>, "Unknown row type");
+ }
+}
+
+template <class T>
+inline TTableReaderPtr<T> CreateJobReader(TRawTableReaderPtr rawTableReader)
+{
+ return new TTableReader<T>(CreateJobReaderImpl<T>(rawTableReader));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+TTableWriterPtr<T> CreateJobWriter(THolder<IProxyOutput> rawJobWriter);
+
+template <>
+inline TTableWriterPtr<TNode> CreateJobWriter<TNode>(THolder<IProxyOutput> rawJobWriter)
+{
+ return new TTableWriter<TNode>(CreateJobNodeWriter(std::move(rawJobWriter)));
+}
+
+template <>
+inline TTableWriterPtr<TYaMRRow> CreateJobWriter<TYaMRRow>(THolder<IProxyOutput> rawJobWriter)
+{
+ return new TTableWriter<TYaMRRow>(CreateJobYaMRWriter(std::move(rawJobWriter)));
+}
+
+template <>
+inline TTableWriterPtr<Message> CreateJobWriter<Message>(THolder<IProxyOutput> rawJobWriter)
+{
+ return new TTableWriter<Message>(CreateJobProtoWriter(std::move(rawJobWriter)));
+}
+
+template <class T, class = void>
+struct TProtoWriterCreator;
+
+template <class T>
+struct TProtoWriterCreator<T, std::enable_if_t<TIsBaseOf<Message, T>::Value>>
+{
+ static TTableWriterPtr<T> Create(::TIntrusivePtr<IProtoWriterImpl> writer)
+ {
+ return new TTableWriter<T>(writer);
+ }
+};
+
+template <class T>
+inline TTableWriterPtr<T> CreateJobWriter(THolder<IProxyOutput> rawJobWriter)
+{
+ if constexpr (TIsBaseOf<Message, T>::Value) {
+ return TProtoWriterCreator<T>::Create(CreateJobProtoWriter(std::move(rawJobWriter)));
+ } else {
+ static_assert(TDependentFalse<T>, "Unknown row type");
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+void TOperationInputSpecBase::AddInput(const TRichYPath& path)
+{
+ Inputs_.push_back(path);
+ StructuredInputs_.emplace_back(Structured<T>(path));
+}
+
+template <class T>
+void TOperationInputSpecBase::SetInput(size_t tableIndex, const TRichYPath& path)
+{
+ NDetail::Assign(Inputs_, tableIndex, path);
+ NDetail::Assign(StructuredInputs_, tableIndex, Structured<T>(path));
+}
+
+
+template <class T>
+void TOperationOutputSpecBase::AddOutput(const TRichYPath& path)
+{
+ Outputs_.push_back(path);
+ StructuredOutputs_.emplace_back(Structured<T>(path));
+}
+
+template <class T>
+void TOperationOutputSpecBase::SetOutput(size_t tableIndex, const TRichYPath& path)
+{
+ NDetail::Assign(Outputs_, tableIndex, path);
+ NDetail::Assign(StructuredOutputs_, tableIndex, Structured<T>(path));
+}
+
+template <class TDerived>
+template <class T>
+TDerived& TOperationIOSpec<TDerived>::AddInput(const TRichYPath& path)
+{
+ static_assert(!std::is_same<T, Message>::value, "input type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
+ TOperationInputSpecBase::AddInput<T>(path);
+ return *static_cast<TDerived*>(this);
+}
+
+template <class TDerived>
+template <class T>
+TDerived& TOperationIOSpec<TDerived>::SetInput(size_t tableIndex, const TRichYPath& path)
+{
+ static_assert(!std::is_same<T, Message>::value, "input type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
+ TOperationInputSpecBase::SetInput<T>(tableIndex, path);
+ return *static_cast<TDerived*>(this);
+}
+
+
+template <class TDerived>
+template <class T>
+TDerived& TOperationIOSpec<TDerived>::AddOutput(const TRichYPath& path)
+{
+ static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
+ TOperationOutputSpecBase::AddOutput<T>(path);
+ return *static_cast<TDerived*>(this);
+}
+
+template <class TDerived>
+template <class T>
+TDerived& TOperationIOSpec<TDerived>::SetOutput(size_t tableIndex, const TRichYPath& path)
+{
+ static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
+ TOperationOutputSpecBase::SetOutput<T>(tableIndex, path);
+ return *static_cast<TDerived*>(this);
+}
+
+template <class TDerived>
+TDerived& TOperationIOSpec<TDerived>::AddStructuredInput(TStructuredTablePath path)
+{
+ TOperationInputSpecBase::AddStructuredInput(std::move(path));
+ return *static_cast<TDerived*>(this);
+}
+
+template <class TDerived>
+TDerived& TOperationIOSpec<TDerived>::AddStructuredOutput(TStructuredTablePath path)
+{
+ TOperationOutputSpecBase::AddStructuredOutput(std::move(path));
+ return *static_cast<TDerived*>(this);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+TVanillaTask& TVanillaTask::AddOutput(const TRichYPath& path)
+{
+ static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
+ TOperationOutputSpecBase::AddOutput<T>(path);
+ return *this;
+}
+
+template <class T>
+TVanillaTask& TVanillaTask::SetOutput(size_t tableIndex, const TRichYPath& path)
+{
+ static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
+ TOperationOutputSpecBase::SetOutput<T>(tableIndex, path);
+ return *this;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace NDetail {
+
+void ResetUseClientProtobuf(const char* methodName);
+
+} // namespace NDetail
+
+template <class TDerived>
+TDerived& TOperationIOSpec<TDerived>::AddProtobufInput_VerySlow_Deprecated(const TRichYPath& path)
+{
+ NDetail::ResetUseClientProtobuf("AddProtobufInput_VerySlow_Deprecated");
+ Inputs_.push_back(path);
+ StructuredInputs_.emplace_back(TStructuredTablePath(path, TProtobufTableStructure{nullptr}));
+ return *static_cast<TDerived*>(this);
+}
+
+template <class TDerived>
+TDerived& TOperationIOSpec<TDerived>::AddProtobufOutput_VerySlow_Deprecated(const TRichYPath& path)
+{
+ NDetail::ResetUseClientProtobuf("AddProtobufOutput_VerySlow_Deprecated");
+ Outputs_.push_back(path);
+ StructuredOutputs_.emplace_back(TStructuredTablePath(path, TProtobufTableStructure{nullptr}));
+ return *static_cast<TDerived*>(this);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename TRow>
+TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::Description()
+{
+ for (auto i : Indices_) {
+ Preparer_.InputDescription<TRow>(i);
+ }
+ return *this;
+}
+
+template <typename TRow>
+TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::Description(bool inferSchema)
+{
+ for (auto i : Indices_) {
+ Preparer_.OutputDescription<TRow>(i, inferSchema);
+ }
+ return *this;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename TCont>
+TJobOperationPreparer::TInputGroup TJobOperationPreparer::BeginInputGroup(const TCont& indices)
+{
+ for (auto i : indices) {
+ ValidateInputTableIndex(i, TStringBuf("BeginInputGroup()"));
+ }
+ return TInputGroup(*this, TVector<int>(std::begin(indices), std::end(indices)));
+}
+
+template <typename TCont>
+TJobOperationPreparer::TOutputGroup TJobOperationPreparer::BeginOutputGroup(const TCont& indices)
+{
+ for (auto i : indices) {
+ ValidateOutputTableIndex(i, TStringBuf("BeginOutputGroup()"));
+ }
+ return TOutputGroup(*this, indices);
+}
+
+
+template <typename TRow>
+TJobOperationPreparer& TJobOperationPreparer::InputDescription(int tableIndex)
+{
+ ValidateMissingInputDescription(tableIndex);
+ InputTableDescriptions_[tableIndex] = StructuredTableDescription<TRow>();
+ return *this;
+}
+
+template <typename TRow>
+TJobOperationPreparer& TJobOperationPreparer::OutputDescription(int tableIndex, bool inferSchema)
+{
+ ValidateMissingOutputDescription(tableIndex);
+ OutputTableDescriptions_[tableIndex] = StructuredTableDescription<TRow>();
+ if (inferSchema && !OutputSchemas_[tableIndex]) {
+ OutputSchemas_[tableIndex] = CreateTableSchema<TRow>();
+ }
+ return *this;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TDerived>
+template <class TRow>
+TDerived& TIntermediateTablesHintSpec<TDerived>::HintMapOutput()
+{
+ IntermediateMapOutputDescription_ = StructuredTableDescription<TRow>();
+ return *static_cast<TDerived*>(this);
+}
+
+template <class TDerived>
+template <class TRow>
+TDerived& TIntermediateTablesHintSpec<TDerived>::AddMapOutput(const TRichYPath& path)
+{
+ MapOutputs_.push_back(path);
+ StructuredMapOutputs_.emplace_back(Structured<TRow>(path));
+ return *static_cast<TDerived*>(this);
+}
+
+template <class TDerived>
+template <class TRow>
+TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceCombinerInput()
+{
+ IntermediateReduceCombinerInputDescription_ = StructuredTableDescription<TRow>();
+ return *static_cast<TDerived*>(this);
+}
+
+template <class TDerived>
+template <class TRow>
+TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceCombinerOutput()
+{
+ IntermediateReduceCombinerOutputDescription_ = StructuredTableDescription<TRow>();
+ return *static_cast<TDerived*>(this);
+}
+
+template <class TDerived>
+template <class TRow>
+TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceInput()
+{
+ IntermediateReducerInputDescription_ = StructuredTableDescription<TRow>();
+ return *static_cast<TDerived*>(this);
+}
+
+template <class TDerived>
+const TVector<TStructuredTablePath>& TIntermediateTablesHintSpec<TDerived>::GetStructuredMapOutputs() const
+{
+ return StructuredMapOutputs_;
+}
+
+template <class TDerived>
+const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateMapOutputDescription() const
+{
+ return IntermediateMapOutputDescription_;
+}
+
+template <class TDerived>
+const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReduceCombinerInputDescription() const
+{
+ return IntermediateReduceCombinerInputDescription_;
+}
+
+template <class TDerived>
+const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReduceCombinerOutputDescription() const
+{
+ return IntermediateReduceCombinerOutputDescription_;
+}
+
+template <class TDerived>
+const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReducerInputDescription() const
+{
+ return IntermediateReducerInputDescription_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReducerContext
+{
+ bool Break = false;
+ static TReducerContext* Get() { return Singleton<TReducerContext>(); }
+};
+
+template <class TR, class TW>
+inline void IReducer<TR, TW>::Break()
+{
+ TReducerContext::Get()->Break = true;
+}
+
+template <typename TReader, typename TWriter>
+void FeedJobInput(
+ IMapper<TReader, TWriter>* mapper,
+ typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl,
+ TWriter* writer)
+{
+ using TInputRow = typename TReader::TRowType;
+
+ auto reader = MakeIntrusive<TTableReader<TInputRow>>(readerImpl);
+ mapper->Do(reader.Get(), writer);
+}
+
+template <typename TReader, typename TWriter>
+void FeedJobInput(
+ IReducer<TReader, TWriter>* reducer,
+ typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl,
+ TWriter* writer)
+{
+ using TInputRow = typename TReader::TRowType;
+
+ auto rangesReader = MakeIntrusive<TTableRangesReader<TInputRow>>(readerImpl);
+ for (; rangesReader->IsValid(); rangesReader->Next()) {
+ reducer->Do(&rangesReader->GetRange(), writer);
+ if (TReducerContext::Get()->Break) {
+ break;
+ }
+ }
+}
+
+template <typename TReader, typename TWriter>
+void FeedJobInput(
+ IAggregatorReducer<TReader, TWriter>* reducer,
+ typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl,
+ TWriter* writer)
+{
+ using TInputRow = typename TReader::TRowType;
+
+ auto rangesReader = MakeIntrusive<TTableRangesReader<TInputRow>>(readerImpl);
+ reducer->Do(rangesReader.Get(), writer);
+}
+
+template <class TRawJob>
+int RunRawJob(size_t outputTableCount, IInputStream& jobStateStream)
+{
+ TRawJobContext context(outputTableCount);
+
+ TRawJob job;
+ job.Load(jobStateStream);
+ job.Do(context);
+ return 0;
+}
+
+template <>
+inline int RunRawJob<TCommandRawJob>(size_t /* outputTableCount */, IInputStream& /* jobStateStream */)
+{
+ Y_FAIL();
+}
+
+template <class TVanillaJob>
+int RunVanillaJob(size_t outputTableCount, IInputStream& jobStateStream)
+{
+ TVanillaJob job;
+ job.Load(jobStateStream);
+
+ if constexpr (std::is_base_of<IVanillaJob<>, TVanillaJob>::value) {
+ Y_VERIFY(outputTableCount == 0, "Void vanilla job expects zero 'outputTableCount'");
+ job.Do();
+ } else {
+ Y_VERIFY(outputTableCount, "Vanilla job with table writer expects nonzero 'outputTableCount'");
+ using TOutputRow = typename TVanillaJob::TWriter::TRowType;
+
+ THolder<IProxyOutput> rawJobWriter;
+ if (auto customWriter = job.CreateCustomRawJobWriter(outputTableCount)) {
+ rawJobWriter = std::move(customWriter);
+ } else {
+ rawJobWriter = CreateRawJobWriter(outputTableCount);
+ }
+ auto writer = CreateJobWriter<TOutputRow>(std::move(rawJobWriter));
+
+ job.Start(writer.Get());
+ job.Do(writer.Get());
+ job.Finish(writer.Get());
+
+ writer->Finish();
+ }
+ return 0;
+}
+
+template <>
+inline int RunVanillaJob<TCommandVanillaJob>(size_t /* outputTableCount */, IInputStream& /* jobStateStream */)
+{
+ Y_FAIL();
+}
+
+template <class TJob>
+ requires TIsBaseOf<IStructuredJob, TJob>::Value
+int RunJob(size_t outputTableCount, IInputStream& jobStateStream)
+{
+ using TInputRow = typename TJob::TReader::TRowType;
+ using TOutputRow = typename TJob::TWriter::TRowType;
+
+ auto job = MakeIntrusive<TJob>();
+ job->Load(jobStateStream);
+
+ TRawTableReaderPtr rawJobReader;
+ if (auto customReader = job->CreateCustomRawJobReader(/*fd*/ 0)) {
+ rawJobReader = customReader;
+ } else {
+ rawJobReader = CreateRawJobReader(/*fd*/ 0);
+ }
+ auto readerImpl = CreateJobReaderImpl<TInputRow>(rawJobReader);
+
+ // Many users don't expect to have jobs with empty input so we skip such jobs.
+ if (!readerImpl->IsValid()) {
+ return 0;
+ }
+
+ THolder<IProxyOutput> rawJobWriter;
+ if (auto customWriter = job->CreateCustomRawJobWriter(outputTableCount)) {
+ rawJobWriter = std::move(customWriter);
+ } else {
+ rawJobWriter = CreateRawJobWriter(outputTableCount);
+ }
+ auto writer = CreateJobWriter<TOutputRow>(std::move(rawJobWriter));
+
+ job->Start(writer.Get());
+ FeedJobInput(job.Get(), readerImpl.Get(), writer.Get());
+ job->Finish(writer.Get());
+
+ writer->Finish();
+
+ return 0;
+}
+
+//
+// We leave RunMapJob/RunReduceJob/RunAggregatorReducer for backward compatibility,
+// some user use them already. :(
+
+template <class TMapper>
+int RunMapJob(size_t outputTableCount, IInputStream& jobStateStream)
+{
+ return RunJob<TMapper>(outputTableCount, jobStateStream);
+}
+
+template <class TReducer>
+int RunReduceJob(size_t outputTableCount, IInputStream& jobStateStream)
+{
+ return RunJob<TReducer>(outputTableCount, jobStateStream);
+}
+
+template <class TReducer>
+int RunAggregatorReducer(size_t outputTableCount, IInputStream& jobStateStream)
+{
+ return RunJob<TReducer>(outputTableCount, jobStateStream);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename T, typename = void>
+struct TIsConstructibleFromNode
+ : std::false_type
+{ };
+
+template <typename T>
+struct TIsConstructibleFromNode<T, std::void_t<decltype(T::FromNode(std::declval<TNode&>()))>>
+ : std::true_type
+{ };
+
+template <class TJob>
+::TIntrusivePtr<NYT::IStructuredJob> ConstructJobFromNode(const TNode& node)
+{
+ if constexpr (TIsConstructibleFromNode<TJob>::value) {
+ Y_ENSURE(node.GetType() != TNode::Undefined,
+ "job has FromNode method but constructor arguments were not provided");
+ return TJob::FromNode(node);
+ } else {
+ Y_ENSURE(node.GetType() == TNode::Undefined,
+ "constructor arguments provided but job does not contain FromNode method");
+ return MakeIntrusive<TJob>();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+using TJobFunction = int (*)(size_t, IInputStream&);
+using TConstructJobFunction = ::TIntrusivePtr<NYT::IStructuredJob> (*)(const TNode&);
+
+class TJobFactory
+{
+public:
+ static TJobFactory* Get()
+ {
+ return Singleton<TJobFactory>();
+ }
+
+ template <class TJob>
+ void RegisterJob(const char* name)
+ {
+ RegisterJobImpl<TJob>(name, RunJob<TJob>);
+ JobConstructors[name] = ConstructJobFromNode<TJob>;
+ }
+
+ template <class TRawJob>
+ void RegisterRawJob(const char* name)
+ {
+ RegisterJobImpl<TRawJob>(name, RunRawJob<TRawJob>);
+ }
+
+ template <class TVanillaJob>
+ void RegisterVanillaJob(const char* name)
+ {
+ RegisterJobImpl<TVanillaJob>(name, RunVanillaJob<TVanillaJob>);
+ }
+
+ TString GetJobName(const IJob* job)
+ {
+ const auto typeIndex = std::type_index(typeid(*job));
+ CheckJobRegistered(typeIndex);
+ return JobNames[typeIndex];
+ }
+
+ TJobFunction GetJobFunction(const char* name)
+ {
+ CheckNameRegistered(name);
+ return JobFunctions[name];
+ }
+
+ TConstructJobFunction GetConstructingFunction(const char* name)
+ {
+ CheckNameRegistered(name);
+ return JobConstructors[name];
+ }
+
+private:
+ TMap<std::type_index, TString> JobNames;
+ THashMap<TString, TJobFunction> JobFunctions;
+ THashMap<TString, TConstructJobFunction> JobConstructors;
+
+ template <typename TJob, typename TRunner>
+ void RegisterJobImpl(const char* name, TRunner runner) {
+ const auto typeIndex = std::type_index(typeid(TJob));
+ CheckNotRegistered(typeIndex, name);
+ JobNames[typeIndex] = name;
+ JobFunctions[name] = runner;
+ }
+
+ void CheckNotRegistered(const std::type_index& typeIndex, const char* name)
+ {
+ Y_ENSURE(!JobNames.contains(typeIndex),
+ "type_info '" << typeIndex.name() << "'"
+ "is already registered under name '" << JobNames[typeIndex] << "'");
+ Y_ENSURE(!JobFunctions.contains(name),
+ "job with name '" << name << "' is already registered");
+ }
+
+ void CheckJobRegistered(const std::type_index& typeIndex)
+ {
+ Y_ENSURE(JobNames.contains(typeIndex),
+ "type_info '" << typeIndex.name() << "' is not registered, use REGISTER_* macros");
+ }
+
+ void CheckNameRegistered(const char* name)
+ {
+ Y_ENSURE(JobFunctions.contains(name),
+ "job with name '" << name << "' is not registered, use REGISTER_* macros");
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TMapper>
+struct TMapperRegistrator
+{
+ TMapperRegistrator(const char* name)
+ {
+ static_assert(TMapper::JobType == IJob::EType::Mapper,
+ "REGISTER_MAPPER is not compatible with this job class");
+
+ NYT::TJobFactory::Get()->RegisterJob<TMapper>(name);
+ }
+};
+
+template <class TReducer>
+struct TReducerRegistrator
+{
+ TReducerRegistrator(const char* name)
+ {
+ static_assert(TReducer::JobType == IJob::EType::Reducer ||
+ TReducer::JobType == IJob::EType::ReducerAggregator,
+ "REGISTER_REDUCER is not compatible with this job class");
+
+ NYT::TJobFactory::Get()->RegisterJob<TReducer>(name);
+ }
+};
+
+template <class TRawJob>
+struct TRawJobRegistrator
+{
+ TRawJobRegistrator(const char* name)
+ {
+ static_assert(TRawJob::JobType == IJob::EType::RawJob,
+ "REGISTER_RAW_JOB is not compatible with this job class");
+ NYT::TJobFactory::Get()->RegisterRawJob<TRawJob>(name);
+ }
+};
+
+template <class TVanillaJob>
+struct TVanillaJobRegistrator
+{
+ TVanillaJobRegistrator(const char* name)
+ {
+ static_assert(TVanillaJob::JobType == IJob::EType::VanillaJob,
+ "REGISTER_VANILLA_JOB is not compatible with this job class");
+ NYT::TJobFactory::Get()->RegisterVanillaJob<TVanillaJob>(name);
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+inline TString YtRegistryTypeName(const TString& name) {
+ TString res = name;
+#ifdef _win_
+ SubstGlobal(res, "class ", "");
+#endif
+ return res;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+#define REGISTER_MAPPER(...) \
+static const NYT::TMapperRegistrator<__VA_ARGS__> \
+Y_GENERATE_UNIQUE_ID(TJobRegistrator)(NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data());
+
+#define REGISTER_NAMED_MAPPER(name, ...) \
+static const NYT::TMapperRegistrator<__VA_ARGS__> \
+Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name);
+
+#define REGISTER_REDUCER(...) \
+static const NYT::TReducerRegistrator<__VA_ARGS__> \
+Y_GENERATE_UNIQUE_ID(TJobRegistrator)(NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data());
+
+#define REGISTER_NAMED_REDUCER(name, ...) \
+static const NYT::TReducerRegistrator<__VA_ARGS__> \
+Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name);
+
+#define REGISTER_NAMED_RAW_JOB(name, ...) \
+static const NYT::TRawJobRegistrator<__VA_ARGS__> \
+Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name);
+
+#define REGISTER_RAW_JOB(...) \
+REGISTER_NAMED_RAW_JOB((NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()), __VA_ARGS__)
+
+#define REGISTER_NAMED_VANILLA_JOB(name, ...) \
+static NYT::TVanillaJobRegistrator<__VA_ARGS__> \
+Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name);
+
+#define REGISTER_VANILLA_JOB(...) \
+REGISTER_NAMED_VANILLA_JOB((NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()), __VA_ARGS__)
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename TReader, typename TWriter>
+TStructuredRowStreamDescription IMapper<TReader, TWriter>::GetInputRowStreamDescription() const
+{
+ return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>();
+}
+
+template <typename TReader, typename TWriter>
+TStructuredRowStreamDescription IMapper<TReader, TWriter>::GetOutputRowStreamDescription() const
+{
+ return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename TReader, typename TWriter>
+TStructuredRowStreamDescription IReducer<TReader, TWriter>::GetInputRowStreamDescription() const
+{
+ return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>();
+}
+
+template <typename TReader, typename TWriter>
+TStructuredRowStreamDescription IReducer<TReader, TWriter>::GetOutputRowStreamDescription() const
+{
+ return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename TReader, typename TWriter>
+TStructuredRowStreamDescription IAggregatorReducer<TReader, TWriter>::GetInputRowStreamDescription() const
+{
+ return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>();
+}
+
+template <typename TReader, typename TWriter>
+TStructuredRowStreamDescription IAggregatorReducer<TReader, TWriter>::GetOutputRowStreamDescription() const
+{
+ return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename TWriter>
+TStructuredRowStreamDescription IVanillaJob<TWriter>::GetInputRowStreamDescription() const
+{
+ return TVoidStructuredRowStream();
+}
+
+template <typename TWriter>
+TStructuredRowStreamDescription IVanillaJob<TWriter>::GetOutputRowStreamDescription() const
+{
+ return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT