aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation_preparer.h
diff options
context:
space:
mode:
author max42 <max42@yandex-team.com> 2023-06-30 03:37:03 +0300
committer max42 <max42@yandex-team.com> 2023-06-30 03:37:03 +0300
commit fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
tree b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/operation_preparer.h
parent 7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
download ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt` - the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under an `IF (NOT CMAKE_EXPORT)` clause, which excludes them from open-source cmake generation and from the ya make build. They will be enabled in subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/operation_preparer.h')
-rw-r--r-- yt/cpp/mapreduce/client/operation_preparer.h 129
1 files changed, 129 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h
new file mode 100644
index 0000000000..7ced54e3b5
--- /dev/null
+++ b/yt/cpp/mapreduce/client/operation_preparer.h
@@ -0,0 +1,129 @@
+#pragma once
+
+#include "client.h"
+#include "structured_table_formats.h"
+
+#include <yt/cpp/mapreduce/interface/operation.h>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TOperation;
+
+class TOperationPreparer // Bundles a client, a transaction id, a retry policy and a preparation id (see the private members).
+ : public TThrRefBase // Instances are referenced through TOperationPreparerPtr (::TIntrusivePtr), declared right after the class.
+{
+public:
+ TOperationPreparer(TClientPtr client, TTransactionId transactionId); // Built from a client handle and a transaction id.
+
+ const TClientContext& GetContext() const; // Returns a TClientContext by const reference; presumably the client's context — confirm in the .cpp.
+ TTransactionId GetTransactionId() const; // Presumably returns TransactionId_ — confirm in the .cpp.
+ ITransactionPingerPtr GetTransactionPinger() const; // Returns a transaction pinger; where it comes from is not visible in this header.
+ TClientPtr GetClient() const; // Presumably returns Client_ — confirm in the .cpp.
+
+ const TString& GetPreparationId() const; // Presumably returns PreparationId_ — confirm in the .cpp.
+
+ void LockFiles(TVector<TRichYPath>* paths); // The path list is passed by mutable pointer, so entries may be modified in place — confirm in the .cpp.
+
+ TOperationId StartOperation( // Returns a TOperationId for the given operation object, operation type name and spec.
+ TOperation* operation,
+ const TString& operationType,
+ const TNode& spec,
+ bool useStartOperationRequest = false); // Defaults to false when the caller omits it.
+
+ const IClientRetryPolicyPtr& GetClientRetryPolicy() const; // Presumably returns ClientRetryPolicy_ — confirm in the .cpp.
+
+private:
+ TClientPtr Client_;
+ TTransactionId TransactionId_;
+ THolder<TPingableTransaction> FileTransaction_; // Owned through THolder. NOTE(review): likely tied to LockFiles — confirm in the .cpp.
+ IClientRetryPolicyPtr ClientRetryPolicy_;
+ const TString PreparationId_; // const member: set once at construction.
+
+private:
+ void CheckValidity() const; // Internal check; what it verifies is not visible in this header.
+};
+
+using TOperationPreparerPtr = ::TIntrusivePtr<TOperationPreparer>; // Intrusive smart-pointer alias for TOperationPreparer.
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IItemToUpload // Abstract interface: every member function below is pure virtual and const.
+{
+ virtual ~IItemToUpload() = default; // Virtual destructor so implementations can be destroyed through this interface.
+
+ virtual TString CalculateMD5() const = 0; // Returns an MD5 value as a TString; the exact encoding is up to the implementation — TODO confirm.
+ virtual THolder<IInputStream> CreateInputStream() const = 0; // Returns an input stream whose ownership passes to the caller (THolder).
+ virtual TString GetDescription() const = 0; // Returns a textual description of the item.
+ virtual ui64 GetDataSize() const = 0; // Returns the item's data size as ui64; presumably in bytes — TODO confirm.
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TJobPreparer // Built from an operation preparer, a user job spec, a job, an output table count, small files and operation options.
+ : private TNonCopyable // Private TNonCopyable base: instances are not meant to be copied.
+{
+public:
+ TJobPreparer(
+ TOperationPreparer& operationPreparer, // Taken by reference and kept as the reference member OperationPreparer_.
+ const TUserJobSpec& spec,
+ const IJob& job,
+ size_t outputTableCount, // NOTE(review): size_t here, but PrepareJobBinary below takes int.
+ const TVector<TSmallJobFile>& smallFileList,
+ const TOperationOptions& options);
+
+ TVector<TRichYPath> GetFiles() const; // Returns the file list by value; presumably built from CypressFiles_ and CachedFiles_ — confirm in the .cpp.
+ const TString& GetClassName() const; // Presumably returns ClassName_ — confirm in the .cpp.
+ const TString& GetCommand() const; // Presumably returns Command_ — confirm in the .cpp.
+ const TUserJobSpec& GetSpec() const; // Presumably returns Spec_ — confirm in the .cpp.
+ bool ShouldMountSandbox() const; // Decision logic is not visible in this header.
+ ui64 GetTotalFileSize() const; // Presumably returns TotalFileSize_ — confirm in the .cpp.
+
+private:
+ TOperationPreparer& OperationPreparer_; // Reference member: the referenced TOperationPreparer must outlive this object.
+ TUserJobSpec Spec_; // Stored by value.
+ TOperationOptions Options_; // Stored by value.
+
+ TVector<TRichYPath> CypressFiles_; // Two separate TRichYPath lists are kept; how each is filled is defined in the .cpp.
+ TVector<TRichYPath> CachedFiles_;
+
+ TString ClassName_;
+ TString Command_;
+ ui64 TotalFileSize_ = 0; // Starts at 0 via the in-class initializer.
+
+private:
+ TString GetFileStorage() const; // The helpers from here through UploadToCache are const member functions.
+ TYPath GetCachePath() const;
+
+ bool IsLocalMode() const;
+ int GetFileCacheReplicationFactor() const;
+
+ void CreateStorage() const;
+
+ void CreateFileInCypress(const TString& path) const;
+ TString PutFileToCypressCache(const TString& path, const TString& md5Signature, TTransactionId transactionId) const; // The transaction id is passed explicitly by the caller.
+ TMaybe<TString> GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const; // TMaybe result: may be empty; presumably on a cache miss — confirm in the .cpp.
+
+ TDuration GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const;
+ TString UploadToRandomPath(const IItemToUpload& itemToUpload) const;
+ TString UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const;
+ TMaybe<TString> TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const; // TMaybe result: may be empty; the conditions are defined in the .cpp.
+ TString UploadToCache(const IItemToUpload& itemToUpload) const;
+
+ void UseFileInCypress(const TRichYPath& file); // Non-const from here on: these members may modify the object's state.
+
+ void UploadLocalFile(
+ const TLocalFilePath& localPath,
+ const TAddLocalFileOptions& options,
+ bool isApiFile = false); // Defaults to false when the caller omits it.
+
+ void UploadBinary(const TJobBinaryConfig& jobBinary);
+ void UploadSmallFile(const TSmallJobFile& smallFile);
+
+ void PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState); // NOTE(review): int here vs size_t in the constructor — confirm the narrowing is intended.
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail