aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation.h
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/client/operation.h
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
downloadydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build. Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.h')
-rw-r--r--yt/cpp/mapreduce/client/operation.h203
1 files changed, 203 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/operation.h b/yt/cpp/mapreduce/client/operation.h
new file mode 100644
index 00000000000..141161b0b72
--- /dev/null
+++ b/yt/cpp/mapreduce/client/operation.h
@@ -0,0 +1,203 @@
+#pragma once
+
+#include "fwd.h"
+#include "structured_table_formats.h"
+#include "operation_preparer.h"
+
+#include <yt/cpp/mapreduce/http/fwd.h>
+
+#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
+#include <yt/cpp/mapreduce/interface/retry_policy.h>
+
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TOperation
+ : public IOperation
+{
+public:
+ class TOperationImpl;
+
+public:
+ explicit TOperation(TClientPtr client);
+ TOperation(TOperationId id, TClientPtr client);
+ virtual const TOperationId& GetId() const override;
+ virtual TString GetWebInterfaceUrl() const override;
+
+ void OnPrepared();
+ void SetDelayedStartFunction(std::function<TOperationId()> start);
+ virtual void Start() override;
+ void OnPreparationException(std::exception_ptr e);
+ virtual bool IsStarted() const override;
+
+ virtual TString GetStatus() const override;
+ void OnStatusUpdated(const TString& newStatus);
+
+ virtual ::NThreading::TFuture<void> GetPreparedFuture() override;
+ virtual ::NThreading::TFuture<void> GetStartedFuture() override;
+ virtual ::NThreading::TFuture<void> Watch() override;
+
+ virtual TVector<TFailedJobInfo> GetFailedJobInfo(const TGetFailedJobInfoOptions& options = TGetFailedJobInfoOptions()) override;
+ virtual EOperationBriefState GetBriefState() override;
+ virtual TMaybe<TYtError> GetError() override;
+ virtual TJobStatistics GetJobStatistics() override;
+ virtual TMaybe<TOperationBriefProgress> GetBriefProgress() override;
+ virtual void AbortOperation() override;
+ virtual void CompleteOperation() override;
+ virtual void SuspendOperation(const TSuspendOperationOptions& options) override;
+ virtual void ResumeOperation(const TResumeOperationOptions& options) override;
+ virtual TOperationAttributes GetAttributes(const TGetOperationOptions& options) override;
+ virtual void UpdateParameters(const TUpdateOperationParametersOptions& options) override;
+ virtual TJobAttributes GetJob(const TJobId& jobId, const TGetJobOptions& options) override;
+ virtual TListJobsResult ListJobs(const TListJobsOptions& options) override;
+
+private:
+ TClientPtr Client_;
+ ::TIntrusivePtr<TOperationImpl> Impl_;
+};
+
+using TOperationPtr = ::TIntrusivePtr<TOperation>;
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TSimpleOperationIo
+{
+ TVector<TRichYPath> Inputs;
+ TVector<TRichYPath> Outputs;
+
+ TFormat InputFormat;
+ TFormat OutputFormat;
+
+ TVector<TSmallJobFile> JobFiles;
+};
+
+TSimpleOperationIo CreateSimpleOperationIoHelper(
+ const IStructuredJob& structuredJob,
+ const TOperationPreparer& preparer,
+ const TOperationOptions& options,
+ TStructuredJobTableList structuredInputs,
+ TStructuredJobTableList structuredOutputs,
+ TUserJobFormatHints hints,
+ ENodeReaderFormat nodeReaderFormat,
+ const THashSet<TString>& columnsUsedInOperations);
+
+////////////////////////////////////////////////////////////////////////////////
+
+void ExecuteMap(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TMapOperationSpec& spec,
+ const ::TIntrusivePtr<IStructuredJob>& mapper,
+ const TOperationOptions& options);
+
+void ExecuteRawMap(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TRawMapOperationSpec& spec,
+ const ::TIntrusivePtr<IRawJob>& mapper,
+ const TOperationOptions& options);
+
+void ExecuteReduce(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TReduceOperationSpec& spec,
+ const ::TIntrusivePtr<IStructuredJob>& reducer,
+ const TOperationOptions& options);
+
+void ExecuteRawReduce(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TRawReduceOperationSpec& spec,
+ const ::TIntrusivePtr<IRawJob>& reducer,
+ const TOperationOptions& options);
+
+void ExecuteJoinReduce(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TJoinReduceOperationSpec& spec,
+ const ::TIntrusivePtr<IStructuredJob>& reducer,
+ const TOperationOptions& options);
+
+void ExecuteRawJoinReduce(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TRawJoinReduceOperationSpec& spec,
+ const ::TIntrusivePtr<IRawJob>& reducer,
+ const TOperationOptions& options);
+
+void ExecuteMapReduce(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TMapReduceOperationSpec& spec,
+ const ::TIntrusivePtr<IStructuredJob>& mapper,
+ const ::TIntrusivePtr<IStructuredJob>& reduceCombiner,
+ const ::TIntrusivePtr<IStructuredJob>& reducer,
+ const TOperationOptions& options);
+
+void ExecuteRawMapReduce(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TRawMapReduceOperationSpec& spec,
+ const ::TIntrusivePtr<IRawJob>& mapper,
+ const ::TIntrusivePtr<IRawJob>& reduceCombiner,
+ const ::TIntrusivePtr<IRawJob>& reducer,
+ const TOperationOptions& options);
+
+void ExecuteSort(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TSortOperationSpec& spec,
+ const TOperationOptions& options);
+
+void ExecuteMerge(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TMergeOperationSpec& spec,
+ const TOperationOptions& options);
+
+void ExecuteErase(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TEraseOperationSpec& spec,
+ const TOperationOptions& options);
+
+void ExecuteRemoteCopy(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TRemoteCopyOperationSpec& spec,
+ const TOperationOptions& options);
+
+void ExecuteVanilla(
+ const TOperationPtr& operation,
+ const TOperationPreparerPtr& preparer,
+ const TVanillaOperationSpec& spec,
+ const TOperationOptions& options);
+
+EOperationBriefState CheckOperation(
+ const IClientRetryPolicyPtr& clientRetryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId);
+
+void WaitForOperation(
+ const IClientRetryPolicyPtr& clientRetryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId);
+
+////////////////////////////////////////////////////////////////////////////////
+
+::TIntrusivePtr<TOperation> ProcessOperation(
+ NYT::NDetail::TClientPtr client,
+ std::function<void()> prepare,
+ ::TIntrusivePtr<TOperation> operation,
+ const TOperationOptions& options);
+
+void WaitIfRequired(const TOperationPtr& operation, const TClientPtr& client, const TOperationOptions& options);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail