diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/operation_preparer.h | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit was generated by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt` - the whole implementation of the YT provider is moved into the YDB code base for further export as part of the YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under an `IF (NOT CMAKE_EXPORT)` clause, which excludes them from open-source CMake generation and the ya make build. They will be enabled in subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/operation_preparer.h')
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.h | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h new file mode 100644 index 0000000000..7ced54e3b5 --- /dev/null +++ b/yt/cpp/mapreduce/client/operation_preparer.h @@ -0,0 +1,129 @@ +#pragma once + +#include "client.h" +#include "structured_table_formats.h" + +#include <yt/cpp/mapreduce/interface/operation.h> + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +class TOperation; + +class TOperationPreparer + : public TThrRefBase +{ +public: + TOperationPreparer(TClientPtr client, TTransactionId transactionId); + + const TClientContext& GetContext() const; + TTransactionId GetTransactionId() const; + ITransactionPingerPtr GetTransactionPinger() const; + TClientPtr GetClient() const; + + const TString& GetPreparationId() const; + + void LockFiles(TVector<TRichYPath>* paths); + + TOperationId StartOperation( + TOperation* operation, + const TString& operationType, + const TNode& spec, + bool useStartOperationRequest = false); + + const IClientRetryPolicyPtr& GetClientRetryPolicy() const; + +private: + TClientPtr Client_; + TTransactionId TransactionId_; + THolder<TPingableTransaction> FileTransaction_; + IClientRetryPolicyPtr ClientRetryPolicy_; + const TString PreparationId_; + +private: + void CheckValidity() const; +}; + +using TOperationPreparerPtr = ::TIntrusivePtr<TOperationPreparer>; + +//////////////////////////////////////////////////////////////////////////////// + +struct IItemToUpload +{ + virtual ~IItemToUpload() = default; + + virtual TString CalculateMD5() const = 0; + virtual THolder<IInputStream> CreateInputStream() const = 0; + virtual TString GetDescription() const = 0; + virtual ui64 GetDataSize() const = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TJobPreparer + : private TNonCopyable +{ +public: + TJobPreparer( + TOperationPreparer& operationPreparer, + const TUserJobSpec& 
spec, + const IJob& job, + size_t outputTableCount, + const TVector<TSmallJobFile>& smallFileList, + const TOperationOptions& options); + + TVector<TRichYPath> GetFiles() const; + const TString& GetClassName() const; + const TString& GetCommand() const; + const TUserJobSpec& GetSpec() const; + bool ShouldMountSandbox() const; + ui64 GetTotalFileSize() const; + +private: + TOperationPreparer& OperationPreparer_; + TUserJobSpec Spec_; + TOperationOptions Options_; + + TVector<TRichYPath> CypressFiles_; + TVector<TRichYPath> CachedFiles_; + + TString ClassName_; + TString Command_; + ui64 TotalFileSize_ = 0; + +private: + TString GetFileStorage() const; + TYPath GetCachePath() const; + + bool IsLocalMode() const; + int GetFileCacheReplicationFactor() const; + + void CreateStorage() const; + + void CreateFileInCypress(const TString& path) const; + TString PutFileToCypressCache(const TString& path, const TString& md5Signature, TTransactionId transactionId) const; + TMaybe<TString> GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const; + + TDuration GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const; + TString UploadToRandomPath(const IItemToUpload& itemToUpload) const; + TString UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const; + TMaybe<TString> TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const; + TString UploadToCache(const IItemToUpload& itemToUpload) const; + + void UseFileInCypress(const TRichYPath& file); + + void UploadLocalFile( + const TLocalFilePath& localPath, + const TAddLocalFileOptions& options, + bool isApiFile = false); + + void UploadBinary(const TJobBinaryConfig& jobBinary); + void UploadSmallFile(const TSmallJobFile& smallFile); + + void PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail |