diff options
author | apollo1321 <apollo1321@yandex-team.com> | 2024-09-06 11:24:31 +0300 |
---|---|---|
committer | apollo1321 <apollo1321@yandex-team.com> | 2024-09-06 11:39:18 +0300 |
commit | cb6a33d90405395afaebf6de999efeb187f8f8e6 (patch) | |
tree | 3cde48046a8dd2d0c78c6f851a6e5c6aedc3aee9 | |
parent | 894993a7fd2ccb1a2368c47d79fd013c846716f6 (diff) | |
download | ydb-cb6a33d90405395afaebf6de999efeb187f8f8e6.tar.gz |
YT-21709: Empty template for shuffle service
YT-21709: Empty template for shuffle service
6dde8f6e6f5eeb4fc7ad91fe623e980439565545
-rw-r--r-- | yt/yt/client/api/client.h | 2 | ||||
-rw-r--r-- | yt/yt/client/api/delegating_client.h | 24 | ||||
-rw-r--r-- | yt/yt/client/api/internal_client.h | 22 | ||||
-rw-r--r-- | yt/yt/client/api/public.h | 2 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/api_service_proxy.h | 8 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.cpp | 33 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.h | 20 | ||||
-rw-r--r-- | yt/yt/client/api/shuffle_client.cpp | 19 | ||||
-rw-r--r-- | yt/yt/client/api/shuffle_client.h | 62 | ||||
-rw-r--r-- | yt/yt/client/federated/client.cpp | 4 | ||||
-rw-r--r-- | yt/yt/client/hedging/hedging.cpp | 4 | ||||
-rw-r--r-- | yt/yt/client/unittests/mock/client.h | 23 | ||||
-rw-r--r-- | yt/yt/client/ya.make | 1 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 59 |
14 files changed, 281 insertions, 2 deletions
diff --git a/yt/yt/client/api/client.h b/yt/yt/client/api/client.h index d65619c86e..c4d6ad47cf 100644 --- a/yt/yt/client/api/client.h +++ b/yt/yt/client/api/client.h @@ -15,6 +15,7 @@ #include "queue_client.h" #include "query_tracker_client.h" #include "flow_client.h" +#include "shuffle_client.h" #include <yt/yt/client/bundle_controller_client/bundle_controller_client.h> @@ -73,6 +74,7 @@ struct IClient , public NBundleControllerClient::IBundleControllerClient , public IFlowClient , public IDistributedTableClient + , public IShuffleClient { //! Terminates all channels. //! Aborts all pending uncommitted transactions. diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index f977dc06c0..c14f2d49d6 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -856,6 +856,30 @@ public: const TParticipantTableWriterOptions& options), (cookie, options)) + // Shuffle Service + DELEGATE_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, ( + const TString& account, + int partitionCount, + const TStartShuffleOptions& options), + (account, partitionCount, options)) + + DELEGATE_METHOD(TFuture<void>, FinishShuffle, ( + const TShuffleHandlePtr& shuffleHandle, + const TFinishShuffleOptions& options), + (shuffleHandle, options)) + + DELEGATE_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, ( + const TShuffleHandlePtr& shuffleHandle, + int partitionIndex, + const NTableClient::TTableReaderConfigPtr& config), + (shuffleHandle, partitionIndex, config)) + + DELEGATE_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, ( + const TShuffleHandlePtr& shuffleHandle, + const TString& partitionColumn, + const NTableClient::TTableWriterConfigPtr& config), + (shuffleHandle, partitionColumn, config)) + #undef DELEGATE_METHOD protected: diff --git a/yt/yt/client/api/internal_client.h b/yt/yt/client/api/internal_client.h index cf2ed5a30c..7421b08337 100644 --- a/yt/yt/client/api/internal_client.h +++ b/yt/yt/client/api/internal_client.h @@ -106,6 +106,18 @@ struct TGetOrderedTabletSafeTrimRowCountRequest //////////////////////////////////////////////////////////////////////////////// +struct TRegisterShuffleChunksOptions + : public TTimeoutOptions +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +struct TFetchShuffleChunksOptions + : public TTimeoutOptions +{ }; + +//////////////////////////////////////////////////////////////////////////////// + //! Provides a set of private APIs. /*! * Only native clients are expected to implement this. @@ -173,6 +185,16 @@ struct IInternalClient NObjectClient::TObjectId leaseId, bool persistent, const TUnreferenceLeaseOptions& options = {}) = 0; + + virtual TFuture<void> RegisterShuffleChunks( + const TShuffleHandlePtr& handle, + const std::vector<NChunkClient::NProto::TChunkSpec>& chunkSpecs, + const TRegisterShuffleChunksOptions& options = {}) = 0; + + virtual TFuture<std::vector<NChunkClient::NProto::TChunkSpec>> FetchShuffleChunks( + const TShuffleHandlePtr& handle, + int partitionIndex, + const TFetchShuffleChunksOptions& options = {}) = 0; }; DEFINE_REFCOUNTED_TYPE(IInternalClient) diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h index 349408ae31..847a60ebe5 100644 --- a/yt/yt/client/api/public.h +++ b/yt/yt/client/api/public.h @@ -190,6 +190,8 @@ DECLARE_REFCOUNTED_STRUCT(TListOperationsAccessFilter) DECLARE_REFCOUNTED_CLASS(TDistributedWriteSession) DECLARE_REFCOUNTED_CLASS(TDistributedWriteCookie) +DECLARE_REFCOUNTED_STRUCT(TShuffleHandle) + //////////////////////////////////////////////////////////////////////////////// inline const TString ClusterNamePath("//sys/@cluster_name"); diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h index 14edf0f3c2..6b00c7470a 100644 --- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h +++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h @@ -210,6 +210,14 @@ public: DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ParticipantWriteTable, .SetStreamingEnabled(true)); + // Shuffle service + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, StartShuffle); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, FinishShuffle); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, WriteShuffleData, + .SetStreamingEnabled(true)); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ReadShuffleData, + .SetStreamingEnabled(true)); + // Misc DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, CheckClusterLiveness); }; diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index bd432d3d9e..cc9e093381 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -2628,6 +2628,37 @@ TFuture<TGetFlowViewResult> TClient::GetFlowView( })); } +TFuture<TShuffleHandlePtr> TClient::StartShuffle( + const TString& /*account*/, + int /*partitionCount*/, + const TStartShuffleOptions& /*options*/) +{ + YT_UNIMPLEMENTED(); +} + +TFuture<void> TClient::FinishShuffle( + const TShuffleHandlePtr& /*shuffleHandle*/, + const TFinishShuffleOptions& /*options*/) +{ + YT_UNIMPLEMENTED(); +} + +TFuture<IRowBatchReaderPtr> TClient::CreateShuffleReader( + const TShuffleHandlePtr& /*shuffleHandle*/, + int /*partitionIndex*/, + const TTableReaderConfigPtr& /*config*/) +{ + YT_UNIMPLEMENTED(); +} + +TFuture<IRowBatchWriterPtr> TClient::CreateShuffleWriter( + const TShuffleHandlePtr& /*shuffleHandle*/, + const TString& /*partitionColumn*/, + const TTableWriterConfigPtr& /*config*/) +{ + YT_UNIMPLEMENTED(); +} + //////////////////////////////////////////////////////////////////////////////// -} // namespace NYT::NRpcProxy +} // namespace NYT::NApi::NRpcProxy diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index 9619084ec9..d3905300e3 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -564,6 +564,26 @@ public: const NYPath::TYPath& viewPath, const TGetFlowViewOptions& options) override; + // Shuffle service client + TFuture<TShuffleHandlePtr> StartShuffle( + const TString& account, + int partitionCount, + const TStartShuffleOptions& options) override; + + TFuture<void> FinishShuffle( + const TShuffleHandlePtr& shuffleHandle, + const TFinishShuffleOptions& options) override; + + TFuture<IRowBatchReaderPtr> CreateShuffleReader( + const TShuffleHandlePtr& shuffleHandle, + int partitionIndex, + const NTableClient::TTableReaderConfigPtr& config) override; + + TFuture<IRowBatchWriterPtr> CreateShuffleWriter( + const TShuffleHandlePtr& shuffleHandle, + const TString& partitionColumn, + const NTableClient::TTableWriterConfigPtr& config) override; + private: const TConnectionPtr Connection_; const NRpc::TDynamicChannelPoolPtr ChannelPool_; diff --git a/yt/yt/client/api/shuffle_client.cpp b/yt/yt/client/api/shuffle_client.cpp new file mode 100644 index 0000000000..d64fda36b5 --- /dev/null +++ b/yt/yt/client/api/shuffle_client.cpp @@ -0,0 +1,19 @@ +#include "shuffle_client.h" + +namespace NYT::NApi { + +using namespace NYson; + +//////////////////////////////////////////////////////////////////////////////// + +void TShuffleHandle::Register(TRegistrar registrar) +{ + registrar.Parameter("transaction_id", &TThis::TransactionId); + registrar.Parameter("coordinator_address", &TThis::CoordinatorAddress); + registrar.Parameter("account", &TThis::Account); + registrar.Parameter("partition_count", &TThis::PartitionCount); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/shuffle_client.h b/yt/yt/client/api/shuffle_client.h new file mode 100644 index 0000000000..eb07ccc6d2 --- /dev/null +++ b/yt/yt/client/api/shuffle_client.h @@ -0,0 +1,62 @@ +#pragma once + +#include "client_common.h" + +namespace NYT::NApi { + +//////////////////////////////////////////////////////////////////////////////// + +struct TShuffleHandle + : public NYTree::TYsonStruct +{ + TGuid TransactionId; + TString CoordinatorAddress; + TString Account; + int PartitionCount; + + REGISTER_YSON_STRUCT(TShuffleHandle); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TShuffleHandle) + +//////////////////////////////////////////////////////////////////////////////// + +struct TStartShuffleOptions + : public TTimeoutOptions +{ }; + +struct TFinishShuffleOptions + : public TTimeoutOptions +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +struct IShuffleClient +{ + virtual ~IShuffleClient() = default; + + virtual TFuture<TShuffleHandlePtr> StartShuffle( + const TString& account, + int partitionCount, + const TStartShuffleOptions& options) = 0; + + virtual TFuture<void> FinishShuffle( + const TShuffleHandlePtr& shuffleHandle, + const TFinishShuffleOptions& options) = 0; + + virtual TFuture<IRowBatchReaderPtr> CreateShuffleReader( + const TShuffleHandlePtr& shuffleHandle, + int partitionIndex, + const NTableClient::TTableReaderConfigPtr& config = New<NTableClient::TTableReaderConfig>()) = 0; + + virtual TFuture<IRowBatchWriterPtr> CreateShuffleWriter( + const TShuffleHandlePtr& shuffleHandle, + const TString& partitionColumn, + const NTableClient::TTableWriterConfigPtr& config = New<NTableClient::TTableWriterConfig>()) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index fe1992162b..a3410ff50a 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -476,6 +476,10 @@ public: UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&)); UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&)); + UNIMPLEMENTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const TString& , int, const TStartShuffleOptions&)); + UNIMPLEMENTED_METHOD(TFuture<void>, FinishShuffle, (const TShuffleHandlePtr&, const TFinishShuffleOptions&)); + UNIMPLEMENTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, const NTableClient::TTableReaderConfigPtr&)); + UNIMPLEMENTED_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (const TShuffleHandlePtr&, const TString&, const NTableClient::TTableWriterConfigPtr&)); private: friend class TTransaction; diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index 7c24690fa4..ae0376bcda 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -227,6 +227,10 @@ public: UNSUPPORTED_METHOD(TFuture<void>, PausePipeline, (const TYPath&, const TPausePipelineOptions&)); UNSUPPORTED_METHOD(TFuture<TPipelineState>, GetPipelineState, (const TYPath&, const TGetPipelineStateOptions&)); UNSUPPORTED_METHOD(TFuture<TGetFlowViewResult>, GetFlowView, (const NYPath::TYPath&, const NYPath::TYPath&, const TGetFlowViewOptions&)); + UNSUPPORTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const TString&, int, const TStartShuffleOptions&)); + UNSUPPORTED_METHOD(TFuture<void>, FinishShuffle, (const TShuffleHandlePtr&, const TFinishShuffleOptions&)); + UNSUPPORTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, const NTableClient::TTableReaderConfigPtr&)); + UNSUPPORTED_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (const TShuffleHandlePtr&, const TString&, const NTableClient::TTableWriterConfigPtr&)); private: const THedgingExecutorPtr Executor_; diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index b2b1dae28e..b8217c8657 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -841,6 +841,29 @@ public: const TParticipantTableWriterOptions& options), (override)); + MOCK_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, ( + const TString& account, + int partitionCount, + const TStartShuffleOptions& options), + (override)); + + MOCK_METHOD(TFuture<void>, FinishShuffle, ( + const TShuffleHandlePtr& shuffleHandle, + const TFinishShuffleOptions& options), + (override)); + + MOCK_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, ( + const TShuffleHandlePtr& shuffleHandle, + int partitionIndex, + const NTableClient::TTableReaderConfigPtr& config), + (override)); + + MOCK_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, ( + const TShuffleHandlePtr& shuffleHandle, + const TString& partitionColumn, + const NTableClient::TTableWriterConfigPtr& config), + (override)); + private: NTabletClient::ITableMountCachePtr TableMountCache_; NTransactionClient::ITimestampProviderPtr TimestampProvider_; diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index b9fc1d7953..ec0e33b3a8 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -31,6 +31,7 @@ SRCS( api/persistent_queue.cpp api/sticky_transaction_pool.cpp api/options.cpp + api/shuffle_client.cpp api/rpc_proxy/address_helpers.cpp api/rpc_proxy/public.cpp diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 467bb6a78a..607dcdb06a 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -3379,7 +3379,7 @@ message TRspGetQueryTrackerInfo message TReqStartDistributedWriteSession { optional string path = 1; - + // TDistributedWriteSessionStartOptions contents... } @@ -3414,3 +3414,60 @@ message TRspParticipantWriteTable { required bytes cookie = 1; // YSON-serialized TDistributedWriteCookie } + +/////////////////////////////////////////////////////////////////////////////// +// NB(apollo1321): Under construction. +// Shuffle Service +//////////////////////////////////////////////////////////////////////////////// + +// Following requests can be handled only by proxies with shuffle manager enabled. + +message TReqStartShuffle +{ + required string account = 1; + required int32 partition_count = 2; +} + +message TRspStartShuffle +{ + required bytes shuffle_handle = 1; // YSON-serialized TShuffleHandle +} + +//////////////////////////////////////////////////////////////////////////////// + +message TReqFinishShuffle +{ + required bytes shuffle_handle = 1; // YSON-serialized TShuffleHandle +} + +message TRspFinishShuffle +{ +} + +//////////////////////////////////////////////////////////////////////////////// + +// Following requests may be handled by any proxies. + +message TReqReadShuffleData +{ + required bytes shuffle_handle = 1; // YSON-serialized TShuffleHandle + optional bytes reader_config = 2; // YSON-serialized TTableReaderConfig + required int32 partition_index = 3; +} + +message TRspReadShuffleData +{ +} + +//////////////////////////////////////////////////////////////////////////////// + +message TReqWriteShuffleData +{ + required bytes shuffle_handle = 1; // YSON-serialized TShuffleHandle + optional bytes writer_config = 2; // YSON-serialized TTableWriterConfig + required string partition_column = 3; +} + +message TRspWriteShuffleData +{ +} |