aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorapollo1321 <apollo1321@yandex-team.com>2024-09-06 11:24:31 +0300
committerapollo1321 <apollo1321@yandex-team.com>2024-09-06 11:39:18 +0300
commitcb6a33d90405395afaebf6de999efeb187f8f8e6 (patch)
tree3cde48046a8dd2d0c78c6f851a6e5c6aedc3aee9
parent894993a7fd2ccb1a2368c47d79fd013c846716f6 (diff)
downloadydb-cb6a33d90405395afaebf6de999efeb187f8f8e6.tar.gz
YT-21709: Empty template for shuffle service
YT-21709: Empty template for shuffle service 6dde8f6e6f5eeb4fc7ad91fe623e980439565545
-rw-r--r--yt/yt/client/api/client.h2
-rw-r--r--yt/yt/client/api/delegating_client.h24
-rw-r--r--yt/yt/client/api/internal_client.h22
-rw-r--r--yt/yt/client/api/public.h2
-rw-r--r--yt/yt/client/api/rpc_proxy/api_service_proxy.h8
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp33
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h20
-rw-r--r--yt/yt/client/api/shuffle_client.cpp19
-rw-r--r--yt/yt/client/api/shuffle_client.h62
-rw-r--r--yt/yt/client/federated/client.cpp4
-rw-r--r--yt/yt/client/hedging/hedging.cpp4
-rw-r--r--yt/yt/client/unittests/mock/client.h23
-rw-r--r--yt/yt/client/ya.make1
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto59
14 files changed, 281 insertions, 2 deletions
diff --git a/yt/yt/client/api/client.h b/yt/yt/client/api/client.h
index d65619c86e..c4d6ad47cf 100644
--- a/yt/yt/client/api/client.h
+++ b/yt/yt/client/api/client.h
@@ -15,6 +15,7 @@
#include "queue_client.h"
#include "query_tracker_client.h"
#include "flow_client.h"
+#include "shuffle_client.h"
#include <yt/yt/client/bundle_controller_client/bundle_controller_client.h>
@@ -73,6 +74,7 @@ struct IClient
, public NBundleControllerClient::IBundleControllerClient
, public IFlowClient
, public IDistributedTableClient
+ , public IShuffleClient
{
//! Terminates all channels.
//! Aborts all pending uncommitted transactions.
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index f977dc06c0..c14f2d49d6 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -856,6 +856,30 @@ public:
const TParticipantTableWriterOptions& options),
(cookie, options))
+ // Shuffle Service
+ DELEGATE_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (
+ const TString& account,
+ int partitionCount,
+ const TStartShuffleOptions& options),
+ (account, partitionCount, options))
+
+ DELEGATE_METHOD(TFuture<void>, FinishShuffle, (
+ const TShuffleHandlePtr& shuffleHandle,
+ const TFinishShuffleOptions& options),
+ (shuffleHandle, options))
+
+ DELEGATE_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (
+ const TShuffleHandlePtr& shuffleHandle,
+ int partitionIndex,
+ const NTableClient::TTableReaderConfigPtr& config),
+ (shuffleHandle, partitionIndex, config))
+
+ DELEGATE_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (
+ const TShuffleHandlePtr& shuffleHandle,
+ const TString& partitionColumn,
+ const NTableClient::TTableWriterConfigPtr& config),
+ (shuffleHandle, partitionColumn, config))
+
#undef DELEGATE_METHOD
protected:
diff --git a/yt/yt/client/api/internal_client.h b/yt/yt/client/api/internal_client.h
index cf2ed5a30c..7421b08337 100644
--- a/yt/yt/client/api/internal_client.h
+++ b/yt/yt/client/api/internal_client.h
@@ -106,6 +106,18 @@ struct TGetOrderedTabletSafeTrimRowCountRequest
////////////////////////////////////////////////////////////////////////////////
+struct TRegisterShuffleChunksOptions
+ : public TTimeoutOptions
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TFetchShuffleChunksOptions
+ : public TTimeoutOptions
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
//! Provides a set of private APIs.
/*!
* Only native clients are expected to implement this.
@@ -173,6 +185,16 @@ struct IInternalClient
NObjectClient::TObjectId leaseId,
bool persistent,
const TUnreferenceLeaseOptions& options = {}) = 0;
+
+ virtual TFuture<void> RegisterShuffleChunks(
+ const TShuffleHandlePtr& handle,
+ const std::vector<NChunkClient::NProto::TChunkSpec>& chunkSpecs,
+ const TRegisterShuffleChunksOptions& options = {}) = 0;
+
+ virtual TFuture<std::vector<NChunkClient::NProto::TChunkSpec>> FetchShuffleChunks(
+ const TShuffleHandlePtr& handle,
+ int partitionIndex,
+ const TFetchShuffleChunksOptions& options = {}) = 0;
};
DEFINE_REFCOUNTED_TYPE(IInternalClient)
diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h
index 349408ae31..847a60ebe5 100644
--- a/yt/yt/client/api/public.h
+++ b/yt/yt/client/api/public.h
@@ -190,6 +190,8 @@ DECLARE_REFCOUNTED_STRUCT(TListOperationsAccessFilter)
DECLARE_REFCOUNTED_CLASS(TDistributedWriteSession)
DECLARE_REFCOUNTED_CLASS(TDistributedWriteCookie)
+DECLARE_REFCOUNTED_STRUCT(TShuffleHandle)
+
////////////////////////////////////////////////////////////////////////////////
inline const TString ClusterNamePath("//sys/@cluster_name");
diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
index 14edf0f3c2..6b00c7470a 100644
--- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h
+++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
@@ -210,6 +210,14 @@ public:
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ParticipantWriteTable,
.SetStreamingEnabled(true));
+ // Shuffle service
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, StartShuffle);
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, FinishShuffle);
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, WriteShuffleData,
+ .SetStreamingEnabled(true));
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ReadShuffleData,
+ .SetStreamingEnabled(true));
+
// Misc
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, CheckClusterLiveness);
};
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index bd432d3d9e..cc9e093381 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -2628,6 +2628,37 @@ TFuture<TGetFlowViewResult> TClient::GetFlowView(
}));
}
+TFuture<TShuffleHandlePtr> TClient::StartShuffle(
+ const TString& /*account*/,
+ int /*partitionCount*/,
+ const TStartShuffleOptions& /*options*/)
+{
+ YT_UNIMPLEMENTED();
+}
+
+TFuture<void> TClient::FinishShuffle(
+ const TShuffleHandlePtr& /*shuffleHandle*/,
+ const TFinishShuffleOptions& /*options*/)
+{
+ YT_UNIMPLEMENTED();
+}
+
+TFuture<IRowBatchReaderPtr> TClient::CreateShuffleReader(
+ const TShuffleHandlePtr& /*shuffleHandle*/,
+ int /*partitionIndex*/,
+ const TTableReaderConfigPtr& /*config*/)
+{
+ YT_UNIMPLEMENTED();
+}
+
+TFuture<IRowBatchWriterPtr> TClient::CreateShuffleWriter(
+ const TShuffleHandlePtr& /*shuffleHandle*/,
+ const TString& /*partitionColumn*/,
+ const TTableWriterConfigPtr& /*config*/)
+{
+ YT_UNIMPLEMENTED();
+}
+
////////////////////////////////////////////////////////////////////////////////
-} // namespace NYT::NRpcProxy
+} // namespace NYT::NApi::NRpcProxy
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index 9619084ec9..d3905300e3 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -564,6 +564,26 @@ public:
const NYPath::TYPath& viewPath,
const TGetFlowViewOptions& options) override;
+ // Shuffle service client
+ TFuture<TShuffleHandlePtr> StartShuffle(
+ const TString& account,
+ int partitionCount,
+ const TStartShuffleOptions& options) override;
+
+ TFuture<void> FinishShuffle(
+ const TShuffleHandlePtr& shuffleHandle,
+ const TFinishShuffleOptions& options) override;
+
+ TFuture<IRowBatchReaderPtr> CreateShuffleReader(
+ const TShuffleHandlePtr& shuffleHandle,
+ int partitionIndex,
+ const NTableClient::TTableReaderConfigPtr& config) override;
+
+ TFuture<IRowBatchWriterPtr> CreateShuffleWriter(
+ const TShuffleHandlePtr& shuffleHandle,
+ const TString& partitionColumn,
+ const NTableClient::TTableWriterConfigPtr& config) override;
+
private:
const TConnectionPtr Connection_;
const NRpc::TDynamicChannelPoolPtr ChannelPool_;
diff --git a/yt/yt/client/api/shuffle_client.cpp b/yt/yt/client/api/shuffle_client.cpp
new file mode 100644
index 0000000000..d64fda36b5
--- /dev/null
+++ b/yt/yt/client/api/shuffle_client.cpp
@@ -0,0 +1,19 @@
+#include "shuffle_client.h"
+
+namespace NYT::NApi {
+
+using namespace NYson;
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TShuffleHandle::Register(TRegistrar registrar)
+{
+ registrar.Parameter("transaction_id", &TThis::TransactionId);
+ registrar.Parameter("coordinator_address", &TThis::CoordinatorAddress);
+ registrar.Parameter("account", &TThis::Account);
+ registrar.Parameter("partition_count", &TThis::PartitionCount);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/shuffle_client.h b/yt/yt/client/api/shuffle_client.h
new file mode 100644
index 0000000000..eb07ccc6d2
--- /dev/null
+++ b/yt/yt/client/api/shuffle_client.h
@@ -0,0 +1,62 @@
+#pragma once
+
+#include "client_common.h"
+
+namespace NYT::NApi {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TShuffleHandle
+ : public NYTree::TYsonStruct
+{
+ TGuid TransactionId;
+ TString CoordinatorAddress;
+ TString Account;
+ int PartitionCount;
+
+ REGISTER_YSON_STRUCT(TShuffleHandle);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TShuffleHandle)
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TStartShuffleOptions
+ : public TTimeoutOptions
+{ };
+
+struct TFinishShuffleOptions
+ : public TTimeoutOptions
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IShuffleClient
+{
+ virtual ~IShuffleClient() = default;
+
+ virtual TFuture<TShuffleHandlePtr> StartShuffle(
+ const TString& account,
+ int partitionCount,
+ const TStartShuffleOptions& options) = 0;
+
+ virtual TFuture<void> FinishShuffle(
+ const TShuffleHandlePtr& shuffleHandle,
+ const TFinishShuffleOptions& options) = 0;
+
+ virtual TFuture<IRowBatchReaderPtr> CreateShuffleReader(
+ const TShuffleHandlePtr& shuffleHandle,
+ int partitionIndex,
+ const NTableClient::TTableReaderConfigPtr& config = New<NTableClient::TTableReaderConfig>()) = 0;
+
+ virtual TFuture<IRowBatchWriterPtr> CreateShuffleWriter(
+ const TShuffleHandlePtr& shuffleHandle,
+ const TString& partitionColumn,
+ const NTableClient::TTableWriterConfigPtr& config = New<NTableClient::TTableWriterConfig>()) = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index fe1992162b..a3410ff50a 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -476,6 +476,10 @@ public:
UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&));
UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const TString& , int, const TStartShuffleOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<void>, FinishShuffle, (const TShuffleHandlePtr&, const TFinishShuffleOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, const NTableClient::TTableReaderConfigPtr&));
+ UNIMPLEMENTED_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (const TShuffleHandlePtr&, const TString&, const NTableClient::TTableWriterConfigPtr&));
private:
friend class TTransaction;
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index 7c24690fa4..ae0376bcda 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -227,6 +227,10 @@ public:
UNSUPPORTED_METHOD(TFuture<void>, PausePipeline, (const TYPath&, const TPausePipelineOptions&));
UNSUPPORTED_METHOD(TFuture<TPipelineState>, GetPipelineState, (const TYPath&, const TGetPipelineStateOptions&));
UNSUPPORTED_METHOD(TFuture<TGetFlowViewResult>, GetFlowView, (const NYPath::TYPath&, const NYPath::TYPath&, const TGetFlowViewOptions&));
+ UNSUPPORTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const TString&, int, const TStartShuffleOptions&));
+ UNSUPPORTED_METHOD(TFuture<void>, FinishShuffle, (const TShuffleHandlePtr&, const TFinishShuffleOptions&));
+ UNSUPPORTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, const NTableClient::TTableReaderConfigPtr&));
+ UNSUPPORTED_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (const TShuffleHandlePtr&, const TString&, const NTableClient::TTableWriterConfigPtr&));
private:
const THedgingExecutorPtr Executor_;
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index b2b1dae28e..b8217c8657 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -841,6 +841,29 @@ public:
const TParticipantTableWriterOptions& options),
(override));
+ MOCK_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (
+ const TString& account,
+ int partitionCount,
+ const TStartShuffleOptions& options),
+ (override));
+
+ MOCK_METHOD(TFuture<void>, FinishShuffle, (
+ const TShuffleHandlePtr& shuffleHandle,
+ const TFinishShuffleOptions& options),
+ (override));
+
+ MOCK_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (
+ const TShuffleHandlePtr& shuffleHandle,
+ int partitionIndex,
+ const NTableClient::TTableReaderConfigPtr& config),
+ (override));
+
+ MOCK_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (
+ const TShuffleHandlePtr& shuffleHandle,
+ const TString& partitionColumn,
+ const NTableClient::TTableWriterConfigPtr& config),
+ (override));
+
private:
NTabletClient::ITableMountCachePtr TableMountCache_;
NTransactionClient::ITimestampProviderPtr TimestampProvider_;
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index b9fc1d7953..ec0e33b3a8 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -31,6 +31,7 @@ SRCS(
api/persistent_queue.cpp
api/sticky_transaction_pool.cpp
api/options.cpp
+ api/shuffle_client.cpp
api/rpc_proxy/address_helpers.cpp
api/rpc_proxy/public.cpp
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 467bb6a78a..607dcdb06a 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -3379,7 +3379,7 @@ message TRspGetQueryTrackerInfo
message TReqStartDistributedWriteSession
{
optional string path = 1;
-
+
// TDistributedWriteSessionStartOptions contents...
}
@@ -3414,3 +3414,60 @@ message TRspParticipantWriteTable
{
required bytes cookie = 1; // YSON-serialized TDistributedWriteCookie
}
+
+///////////////////////////////////////////////////////////////////////////////
+// NB(apollo1321): Under construction.
+// Shuffle Service
+////////////////////////////////////////////////////////////////////////////////
+
+// Following requests can be handled only by proxies with shuffle manager enabled.
+
+message TReqStartShuffle
+{
+ required string account = 1;
+ required int32 partition_count = 2;
+}
+
+message TRspStartShuffle
+{
+ required bytes shuffle_handle = 1; // YSON-serialized TShuffleHandle
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+message TReqFinishShuffle
+{
+ required bytes shuffle_handle = 1; // YSON-serialized TShuffleHandle
+}
+
+message TRspFinishShuffle
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Following requests may be handled by any proxies.
+
+message TReqReadShuffleData
+{
+ required bytes shuffle_handle = 1; // YSON-serialized TShuffleHandle
+ optional bytes reader_config = 2; // YSON-serialized TTableReaderConfig
+ required int32 partition_index = 3;
+}
+
+message TRspReadShuffleData
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+message TReqWriteShuffleData
+{
+ required bytes shuffle_handle = 1; // YSON-serialized TShuffleHandle
+ optional bytes writer_config = 2; // YSON-serialized TTableWriterConfig
+ required string partition_column = 3;
+}
+
+message TRspWriteShuffleData
+{
+}