diff options
author | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
commit | 73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch) | |
tree | 188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/client/client.h | |
parent | 528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff) | |
download | ydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz |
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build.
Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/client/client.h')
-rw-r--r-- | yt/cpp/mapreduce/client/client.h | 506 |
1 files changed, 506 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h new file mode 100644 index 0000000000..0f4df09d0b --- /dev/null +++ b/yt/cpp/mapreduce/client/client.h @@ -0,0 +1,506 @@ +#pragma once + +#include "client_reader.h" +#include "client_writer.h" +#include "transaction_pinger.h" + +#include <yt/cpp/mapreduce/interface/client.h> + +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/http/requests.h> + +namespace NYT { +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +class TYtPoller; + +class TClientBase; +using TClientBasePtr = ::TIntrusivePtr<TClientBase>; + +class TClient; +using TClientPtr = ::TIntrusivePtr<TClient>; + +//////////////////////////////////////////////////////////////////////////////// + +class TClientBase + : virtual public IClientBase +{ +public: + TClientBase( + const TClientContext& context, + const TTransactionId& transactionId, + IClientRetryPolicyPtr retryPolicy); + + ITransactionPtr StartTransaction( + const TStartTransactionOptions& options) override; + + // cypress + + TNodeId Create( + const TYPath& path, + ENodeType type, + const TCreateOptions& options) override; + + void Remove( + const TYPath& path, + const TRemoveOptions& options) override; + + bool Exists( + const TYPath& path, + const TExistsOptions& options) override; + + TNode Get( + const TYPath& path, + const TGetOptions& options) override; + + void Set( + const TYPath& path, + const TNode& value, + const TSetOptions& options) override; + + void MultisetAttributes( + const TYPath& path, + const TNode::TMapType& value, + const TMultisetAttributesOptions& options) override; + + TNode::TListType List( + const TYPath& path, + const TListOptions& options) override; + + TNodeId Copy( + const TYPath& sourcePath, + const TYPath& destinationPath, + const TCopyOptions& options) override; + + TNodeId Move( + const TYPath& sourcePath, + const TYPath& destinationPath, + const TMoveOptions& options) override; + + TNodeId Link( + const TYPath& targetPath, + const TYPath& linkPath, + const TLinkOptions& options) override; + + void Concatenate( + const TVector<TRichYPath>& sourcePaths, + const TRichYPath& destinationPath, + const TConcatenateOptions& options) override; + + TRichYPath CanonizeYPath(const TRichYPath& path) override; + + TVector<TTableColumnarStatistics> GetTableColumnarStatistics( + const TVector<TRichYPath>& paths, + const TGetTableColumnarStatisticsOptions& options) override; + + TMultiTablePartitions GetTablePartitions( + const TVector<TRichYPath>& paths, + const TGetTablePartitionsOptions& options) override; + + TMaybe<TYPath> GetFileFromCache( + const TString& md5Signature, + const TYPath& cachePath, + const TGetFileFromCacheOptions& options = TGetFileFromCacheOptions()) override; + + TYPath PutFileToCache( + const TYPath& filePath, + const TString& md5Signature, + const TYPath& cachePath, + const TPutFileToCacheOptions& options = TPutFileToCacheOptions()) override; + + IFileReaderPtr CreateFileReader( + const TRichYPath& path, + const TFileReaderOptions& options) override; + + IFileWriterPtr CreateFileWriter( + const TRichYPath& path, + const TFileWriterOptions& options) override; + + TTableWriterPtr<::google::protobuf::Message> CreateTableWriter( + const TRichYPath& path, + const ::google::protobuf::Descriptor& descriptor, + const TTableWriterOptions& options) override; + + TRawTableReaderPtr CreateRawReader( + const TRichYPath& path, + const TFormat& format, + const TTableReaderOptions& options) override; + + TRawTableWriterPtr CreateRawWriter( + const TRichYPath& path, + const TFormat& format, + const TTableWriterOptions& options) override; + + IFileReaderPtr CreateBlobTableReader( + const TYPath& path, + const TKey& key, + const TBlobTableReaderOptions& options) override; + + // operations + + IOperationPtr DoMap( + const TMapOperationSpec& spec, + ::TIntrusivePtr<IStructuredJob> mapper, + const TOperationOptions& options) override; + + IOperationPtr RawMap( + const TRawMapOperationSpec& spec, + ::TIntrusivePtr<IRawJob> mapper, + const TOperationOptions& options) override; + + IOperationPtr DoReduce( + const TReduceOperationSpec& spec, + ::TIntrusivePtr<IStructuredJob> reducer, + const TOperationOptions& options) override; + + IOperationPtr RawReduce( + const TRawReduceOperationSpec& spec, + ::TIntrusivePtr<IRawJob> mapper, + const TOperationOptions& options) override; + + IOperationPtr DoJoinReduce( + const TJoinReduceOperationSpec& spec, + ::TIntrusivePtr<IStructuredJob> reducer, + const TOperationOptions& options) override; + + IOperationPtr RawJoinReduce( + const TRawJoinReduceOperationSpec& spec, + ::TIntrusivePtr<IRawJob> mapper, + const TOperationOptions& options) override; + + IOperationPtr DoMapReduce( + const TMapReduceOperationSpec& spec, + ::TIntrusivePtr<IStructuredJob> mapper, + ::TIntrusivePtr<IStructuredJob> reduceCombiner, + ::TIntrusivePtr<IStructuredJob> reducer, + const TOperationOptions& options) override; + + IOperationPtr RawMapReduce( + const TRawMapReduceOperationSpec& spec, + ::TIntrusivePtr<IRawJob> mapper, + ::TIntrusivePtr<IRawJob> reduceCombiner, + ::TIntrusivePtr<IRawJob> reducer, + const TOperationOptions& options) override; + + IOperationPtr Sort( + const TSortOperationSpec& spec, + const TOperationOptions& options) override; + + IOperationPtr Merge( + const TMergeOperationSpec& spec, + const TOperationOptions& options) override; + + IOperationPtr Erase( + const TEraseOperationSpec& spec, + const TOperationOptions& options) override; + + IOperationPtr RemoteCopy( + const TRemoteCopyOperationSpec& spec, + const TOperationOptions& options = TOperationOptions()) override; + + IOperationPtr RunVanilla( + const TVanillaOperationSpec& spec, + const TOperationOptions& options = TOperationOptions()) override; + + IOperationPtr AttachOperation(const TOperationId& operationId) override; + + EOperationBriefState CheckOperation(const TOperationId& operationId) override; + + void AbortOperation(const TOperationId& operationId) override; + + void CompleteOperation(const TOperationId& operationId) override; + + void WaitForOperation(const TOperationId& operationId) override; + + void AlterTable( + const TYPath& path, + const TAlterTableOptions& options) override; + + TBatchRequestPtr CreateBatchRequest() override; + + IClientPtr GetParentClient() override; + + const TClientContext& GetContext() const; + + const IClientRetryPolicyPtr& GetRetryPolicy() const; + + virtual ITransactionPingerPtr GetTransactionPinger() = 0; + +protected: + virtual TClientPtr GetParentClientImpl() = 0; + +protected: + const TClientContext Context_; + TTransactionId TransactionId_; + IClientRetryPolicyPtr ClientRetryPolicy_; + +private: + ::TIntrusivePtr<TClientReader> CreateClientReader( + const TRichYPath& path, + const TFormat& format, + const TTableReaderOptions& options, + bool useFormatFromTableAttributes = false); + + THolder<TClientWriter> CreateClientWriter( + const TRichYPath& path, + const TFormat& format, + const TTableWriterOptions& options); + + ::TIntrusivePtr<INodeReaderImpl> CreateNodeReader( + const TRichYPath& path, const TTableReaderOptions& options) override; + + ::TIntrusivePtr<IYaMRReaderImpl> CreateYaMRReader( + const TRichYPath& path, const TTableReaderOptions& options) override; + + ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader( + const TRichYPath& path, + const TTableReaderOptions& options, + const Message* prototype) override; + + ::TIntrusivePtr<ISkiffRowReaderImpl> CreateSkiffRowReader( + const TRichYPath& path, + const TTableReaderOptions& options, + const ISkiffRowSkipperPtr& skipper, + const NSkiff::TSkiffSchemaPtr& schema) override; + + ::TIntrusivePtr<INodeWriterImpl> CreateNodeWriter( + const TRichYPath& path, const TTableWriterOptions& options) override; + + ::TIntrusivePtr<IYaMRWriterImpl> CreateYaMRWriter( + const TRichYPath& path, const TTableWriterOptions& options) override; + + ::TIntrusivePtr<IProtoWriterImpl> CreateProtoWriter( + const TRichYPath& path, + const TTableWriterOptions& options, + const Message* prototype) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TTransaction + : public ITransaction + , public TClientBase +{ +public: + // + // Start a new transaction. + TTransaction( + TClientPtr parentClient, + const TClientContext& context, + const TTransactionId& parentTransactionId, + const TStartTransactionOptions& options); + + // + // Attach an existing transaction. + TTransaction( + TClientPtr parentClient, + const TClientContext& context, + const TTransactionId& transactionId, + const TAttachTransactionOptions& options); + + const TTransactionId& GetId() const override; + + ILockPtr Lock( + const TYPath& path, + ELockMode mode, + const TLockOptions& options) override; + + void Unlock( + const TYPath& path, + const TUnlockOptions& options) override; + + void Commit() override; + + void Abort() override; + + void Ping() override; + + void Detach() override; + + ITransactionPingerPtr GetTransactionPinger() override; + +protected: + TClientPtr GetParentClientImpl() override; + +private: + ITransactionPingerPtr TransactionPinger_; + THolder<TPingableTransaction> PingableTx_; + TClientPtr ParentClient_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TClient + : public IClient + , public TClientBase +{ +public: + TClient( + const TClientContext& context, + const TTransactionId& globalId, + IClientRetryPolicyPtr retryPolicy); + + ~TClient(); + + ITransactionPtr AttachTransaction( + const TTransactionId& transactionId, + const TAttachTransactionOptions& options) override; + + void MountTable( + const TYPath& path, + const TMountTableOptions& options) override; + + void UnmountTable( + const TYPath& path, + const TUnmountTableOptions& options) override; + + void RemountTable( + const TYPath& path, + const TRemountTableOptions& options) override; + + void FreezeTable( + const TYPath& path, + const TFreezeTableOptions& options) override; + + void UnfreezeTable( + const TYPath& path, + const TUnfreezeTableOptions& options) override; + + void ReshardTable( + const TYPath& path, + const TVector<TKey>& keys, + const TReshardTableOptions& options) override; + + void ReshardTable( + const TYPath& path, + i64 tabletCount, + const TReshardTableOptions& options) override; + + void InsertRows( + const TYPath& path, + const TNode::TListType& rows, + const TInsertRowsOptions& options) override; + + void DeleteRows( + const TYPath& path, + const TNode::TListType& keys, + const TDeleteRowsOptions& options) override; + + void TrimRows( + const TYPath& path, + i64 tabletIndex, + i64 rowCount, + const TTrimRowsOptions& options) override; + + TNode::TListType LookupRows( + const TYPath& path, + const TNode::TListType& keys, + const TLookupRowsOptions& options) override; + + TNode::TListType SelectRows( + const TString& query, + const TSelectRowsOptions& options) override; + + void AlterTableReplica( + const TReplicaId& replicaId, + const TAlterTableReplicaOptions& alterTableReplicaOptions) override; + + ui64 GenerateTimestamp() override; + + TAuthorizationInfo WhoAmI() override; + + TOperationAttributes GetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options) override; + + TListOperationsResult ListOperations( + const TListOperationsOptions& options) override; + + void UpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options) override; + + TJobAttributes GetJob( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobOptions& options) override; + + TListJobsResult ListJobs( + const TOperationId& operationId, + const TListJobsOptions& options = TListJobsOptions()) override; + + IFileReaderPtr GetJobInput( + const TJobId& jobId, + const TGetJobInputOptions& options = TGetJobInputOptions()) override; + + IFileReaderPtr GetJobFailContext( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobFailContextOptions& options = TGetJobFailContextOptions()) override; + + IFileReaderPtr GetJobStderr( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobStderrOptions& options = TGetJobStderrOptions()) override; + + TNode::TListType SkyShareTable( + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options = TSkyShareTableOptions()) override; + + TCheckPermissionResponse CheckPermission( + const TString& user, + EPermission permission, + const TYPath& path, + const TCheckPermissionOptions& options) override; + + TVector<TTabletInfo> GetTabletInfos( + const TYPath& path, + const TVector<int>& tabletIndexes, + const TGetTabletInfosOptions& options) override; + + void SuspendOperation( + const TOperationId& operationId, + const TSuspendOperationOptions& options) override; + + void ResumeOperation( + const TOperationId& operationId, + const TResumeOperationOptions& options) override; + + void Shutdown() override; + + ITransactionPingerPtr GetTransactionPinger() override; + + // Helper methods + TYtPoller& GetYtPoller(); + +protected: + TClientPtr GetParentClientImpl() override; + +private: + template <class TOptions> + void SetTabletParams( + THttpHeader& header, + const TYPath& path, + const TOptions& options); + + void CheckShutdown() const; + + ITransactionPingerPtr TransactionPinger_; + + std::atomic<bool> Shutdown_ = false; + TMutex YtPollerLock_; + THolder<TYtPoller> YtPoller_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TClientPtr CreateClientImpl( + const TString& serverName, + const TCreateClientOptions& options = TCreateClientOptions()); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT |