diff options
author | apachee <apachee@yandex-team.com> | 2024-05-24 16:21:41 +0300 |
---|---|---|
committer | apachee <apachee@yandex-team.com> | 2024-05-24 16:31:03 +0300 |
commit | e2ba2fb36e917cd56f8dd9006d4b6f081a1939ac (patch) | |
tree | bf72832d784543986e62e5e486f5b77df018d220 /yt | |
parent | 17b724c0bfc83bc671073210ba805e7e833f29d8 (diff) | |
download | ydb-e2ba2fb36e917cd56f8dd9006d4b6f081a1939ac.tar.gz |
[queues] YT-21355: Implement commands for creating and removing producer sessions
Implements client methods create_queue_producer_session and remove_queue_producer_session
a1fc793b8bae6faf8aca0772969d36247e5f9545
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/client/api/delegating_client.h | 15 | ||||
-rw-r--r-- | yt/yt/client/api/queue_client.h | 28 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/api_service_proxy.h | 2 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.cpp | 51 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.h | 13 | ||||
-rw-r--r-- | yt/yt/client/driver/driver.cpp | 2 | ||||
-rw-r--r-- | yt/yt/client/driver/queue_commands.cpp | 59 | ||||
-rw-r--r-- | yt/yt/client/driver/queue_commands.h | 37 | ||||
-rw-r--r-- | yt/yt/client/federated/client.cpp | 3 | ||||
-rw-r--r-- | yt/yt/client/hedging/hedging.cpp | 2 | ||||
-rw-r--r-- | yt/yt/client/queue_client/producer_client.cpp | 32 | ||||
-rw-r--r-- | yt/yt/client/queue_client/producer_client.h | 13 | ||||
-rw-r--r-- | yt/yt/client/unittests/mock/client.h | 15 | ||||
-rw-r--r-- | yt/yt/client/ya.make | 1 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 29 |
15 files changed, 256 insertions, 46 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index c5044d846b..f490a050e5 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -124,6 +124,21 @@ public: const TListQueueConsumerRegistrationsOptions& options), (queuePath, consumerPath, options)) + DELEGATE_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, ( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + const std::optional<NYson::TYsonString>& userMeta, + const TCreateQueueProducerSessionOptions& options), + (producerPath, queuePath, sessionId, userMeta, options)) + + DELEGATE_METHOD(TFuture<void>, RemoveQueueProducerSession, ( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + const TRemoveQueueProducerSessionOptions& options), + (producerPath, queuePath, sessionId, options)) + // Cypress DELEGATE_METHOD(TFuture<NYson::TYsonString>, GetNode, ( const NYPath::TYPath& path, diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h index b6c6edeeac..f878e5eacf 100644 --- a/yt/yt/client/api/queue_client.h +++ b/yt/yt/client/api/queue_client.h @@ -73,6 +73,21 @@ struct TListQueueConsumerRegistrationsResult std::optional<std::vector<int>> Partitions; }; +struct TCreateQueueProducerSessionOptions + : public TTimeoutOptions +{ }; + +struct TCreateQueueProducerSessionResult +{ + ui64 SequenceNumber; + ui64 Epoch; + std::optional<NYson::TYsonString> UserMeta; +}; + +struct TRemoveQueueProducerSessionOptions + : public TTimeoutOptions +{ }; + //////////////////////////////////////////////////////////////////////////////// struct IQueueClientBase @@ -125,6 +140,19 @@ struct IQueueClient const std::optional<NYPath::TRichYPath>& queuePath, const std::optional<NYPath::TRichYPath>& consumerPath, const TListQueueConsumerRegistrationsOptions& options = {}) = 0; + + virtual TFuture<TCreateQueueProducerSessionResult> CreateQueueProducerSession( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + const std::optional<NYson::TYsonString>& userMeta, + const TCreateQueueProducerSessionOptions& options = {}) = 0; + + virtual TFuture<void> RemoveQueueProducerSession( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + const TRemoveQueueProducerSessionOptions& options = {}) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h index 219c823f91..5ea29e05e8 100644 --- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h +++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h @@ -91,6 +91,8 @@ public: DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, RegisterQueueConsumer); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, UnregisterQueueConsumer); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ListQueueConsumerRegistrations); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, CreateQueueProducerSession); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, RemoveQueueProducerSession); // Scheduler pools DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, TransferPoolResources); diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index 58f5d5db70..d41847cbe0 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -873,6 +873,57 @@ TFuture<std::vector<TListQueueConsumerRegistrationsResult>> TClient::ListQueueCo })); } +TFuture<TCreateQueueProducerSessionResult> TClient::CreateQueueProducerSession( + const TRichYPath& producerPath, + const TRichYPath& queuePath, + const TString& sessionId, + const std::optional<TYsonString>& userMeta, + const TCreateQueueProducerSessionOptions& options) +{ + auto proxy = CreateApiServiceProxy(); + + auto req = proxy.CreateQueueProducerSession(); + SetTimeoutOptions(*req, options); + + ToProto(req->mutable_producer_path(), producerPath); + ToProto(req->mutable_queue_path(), queuePath); + ToProto(req->mutable_session_id(), sessionId); + if (userMeta) { + ToProto(req->mutable_user_meta(), userMeta->AsStringBuf()); + } + + return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspCreateQueueProducerSessionPtr& rsp) { + std::optional<TYsonString> userMeta; + if (rsp->has_user_meta()) { + userMeta = TYsonString(FromProto<TString>(rsp->user_meta())); + } + + return TCreateQueueProducerSessionResult{ + .SequenceNumber = FromProto<ui64>(rsp->sequence_number()), + .Epoch = FromProto<ui64>(rsp->epoch()), + .UserMeta = std::move(userMeta), + }; + })); +} + +TFuture<void> TClient::RemoveQueueProducerSession( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + const TRemoveQueueProducerSessionOptions& options) +{ + auto proxy = CreateApiServiceProxy(); + + auto req = proxy.RemoveQueueProducerSession(); + SetTimeoutOptions(*req, options); + + ToProto(req->mutable_producer_path(), producerPath); + ToProto(req->mutable_queue_path(), queuePath); + ToProto(req->mutable_session_id(), sessionId); + + return req->Invoke().AsVoid(); +} + TFuture<void> TClient::AddMember( const TString& group, const TString& member, diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index 77054ed4e5..d6bd791e6d 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -161,6 +161,19 @@ public: const std::optional<NYPath::TRichYPath>& consumerPath, const TListQueueConsumerRegistrationsOptions& options = {}) override; + TFuture<TCreateQueueProducerSessionResult> CreateQueueProducerSession( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + const std::optional<NYson::TYsonString>& userMeta, + const TCreateQueueProducerSessionOptions& options = {}) override; + + TFuture<void> RemoveQueueProducerSession( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + const TRemoveQueueProducerSessionOptions& options = {}) override; + // Files. TFuture<NApi::TGetFileFromCacheResult> GetFileFromCache( const TString& md5, diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index d3377c0806..2d9dada291 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -351,6 +351,8 @@ public: REGISTER (TPullQueueCommand, "pull_queue", Null, Tabular, false, true, ApiVersion4); REGISTER (TPullConsumerCommand, "pull_consumer", Null, Tabular, false, true, ApiVersion4); REGISTER (TAdvanceConsumerCommand, "advance_consumer", Null, Structured, true, false, ApiVersion4); + REGISTER (TCreateQueueProducerSessionCommand, "create_queue_producer_session", Null, Structured, true, false, ApiVersion4); + REGISTER (TRemoveQueueProducerSessionCommand, "remove_queue_producer_session", Null, Structured, true, false, ApiVersion4); REGISTER (TStartQueryCommand, "start_query", Null, Structured, true, false, ApiVersion4); REGISTER (TAbortQueryCommand, "abort_query", Null, Structured, true, false, ApiVersion4); diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp index f3504ad8c9..2068cce9d6 100644 --- a/yt/yt/client/driver/queue_commands.cpp +++ b/yt/yt/client/driver/queue_commands.cpp @@ -252,4 +252,63 @@ void TAdvanceConsumerCommand::DoExecute(ICommandContextPtr context) //////////////////////////////////////////////////////////////////////////////// +void TCreateQueueProducerSessionCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("producer_path", &TThis::ProducerPath); + registrar.Parameter("queue_path", &TThis::QueuePath); + registrar.Parameter("session_id", &TThis::SessionId); + registrar.Parameter("user_meta", &TThis::UserMeta) + .Optional(); +} + +void TCreateQueueProducerSessionCommand::DoExecute(ICommandContextPtr context) +{ + auto client = context->GetClient(); + + std::optional<NYson::TYsonString> requestUserMeta; + if (UserMeta) { + requestUserMeta = NYson::ConvertToYsonString(UserMeta); + } + + auto result = WaitFor(client->CreateQueueProducerSession( + ProducerPath, + QueuePath, + SessionId, + requestUserMeta)) + .ValueOrThrow(); + + ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) { + BuildYsonFluently(consumer) + .BeginMap() + .Item("epoch").Value(result.Epoch) + .Item("sequence_number").Value(result.SequenceNumber) + .Item("user_meta").Value(result.UserMeta) + .EndMap(); + }); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TRemoveQueueProducerSessionCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("producer_path", &TThis::ProducerPath); + registrar.Parameter("queue_path", &TThis::QueuePath); + registrar.Parameter("session_id", &TThis::SessionId); +} + +void TRemoveQueueProducerSessionCommand::DoExecute(ICommandContextPtr context) +{ + auto client = context->GetClient(); + + WaitFor(client->RemoveQueueProducerSession( + ProducerPath, + QueuePath, + SessionId)) + .ThrowOnError(); + + ProduceEmptyOutput(context); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NDriver diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h index f2fa8fe5ef..5317985207 100644 --- a/yt/yt/client/driver/queue_commands.h +++ b/yt/yt/client/driver/queue_commands.h @@ -125,4 +125,41 @@ private: //////////////////////////////////////////////////////////////////////////////// +class TCreateQueueProducerSessionCommand + : public TTypedCommand<NApi::TCreateQueueProducerSessionOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TCreateQueueProducerSessionCommand); + + static void Register(TRegistrar registrar); + +private: + NYPath::TRichYPath ProducerPath; + NYPath::TRichYPath QueuePath; + TString SessionId; + NYTree::INodePtr UserMeta; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TRemoveQueueProducerSessionCommand + : public TTypedCommand<NApi::TRemoveQueueProducerSessionOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TRemoveQueueProducerSessionCommand); + + static void Register(TRegistrar registrar); + +private: + NYPath::TRichYPath ProducerPath; + NYPath::TRichYPath QueuePath; + TString SessionId; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NDriver diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index a915ee5a98..efbd61c0c5 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -331,6 +331,9 @@ public: // IClient unsupported methods. UNIMPLEMENTED_METHOD(TFuture<void>, RegisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, bool, const TRegisterQueueConsumerOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, UnregisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TUnregisterQueueConsumerOptions&)); + UNIMPLEMENTED_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const std::optional<NYson::TYsonString>&, const TCreateQueueProducerSessionOptions&)); + UNIMPLEMENTED_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const TRemoveQueueProducerSessionOptions&)); + UNIMPLEMENTED_METHOD(const NChaosClient::IReplicationCardCachePtr&, GetReplicationCardCache, ()); UNIMPLEMENTED_METHOD(TFuture<void>, MountTable, (const NYPath::TYPath&, const TMountTableOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, UnmountTable, (const NYPath::TYPath&, const TUnmountTableOptions&)); diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index 0ceb3aab9c..d5f6bd946a 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -83,6 +83,8 @@ public: RETRYABLE_METHOD(TFuture<void>, RegisterQueueConsumer, (const TRichYPath&, const TRichYPath&, bool, const TRegisterQueueConsumerOptions&)); RETRYABLE_METHOD(TFuture<void>, UnregisterQueueConsumer, (const TRichYPath&, const TRichYPath&, const TUnregisterQueueConsumerOptions&)); RETRYABLE_METHOD(TFuture<std::vector<TListQueueConsumerRegistrationsResult>>, ListQueueConsumerRegistrations, (const std::optional<TRichYPath>&, const std::optional<TRichYPath>&, const TListQueueConsumerRegistrationsOptions&)); + RETRYABLE_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const std::optional<NYson::TYsonString>&, const TCreateQueueProducerSessionOptions&)); + RETRYABLE_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const TRemoveQueueProducerSessionOptions&)); RETRYABLE_METHOD(TFuture<NYson::TYsonString>, ExplainQuery, (const TString&, const TExplainQueryOptions&)); RETRYABLE_METHOD(TFuture<ITableReaderPtr>, CreateTableReader, (const TRichYPath&, const TTableReaderOptions&)); RETRYABLE_METHOD(TFuture<NYson::TYsonString>, GetNode, (const TYPath&, const TGetNodeOptions&)); diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp deleted file mode 100644 index fec3091420..0000000000 --- a/yt/yt/client/queue_client/producer_client.cpp +++ /dev/null @@ -1,32 +0,0 @@ -#include "producer_client.h" - -#include <yt/yt/client/table_client/comparator.h> -#include <yt/yt/client/table_client/helpers.h> -#include <yt/yt/client/table_client/schema.h> - -namespace NYT::NQueueClient { - -using namespace NTableClient; - -//////////////////////////////////////////////////////////////////////////////// - -static const TTableSchemaPtr YTProducerTableSchema = New<TTableSchema>(std::vector<TColumnSchema>{ - TColumnSchema("queue_cluster", EValueType::String, ESortOrder::Ascending).SetRequired(true), - TColumnSchema("queue_path", EValueType::String, ESortOrder::Ascending).SetRequired(true), - TColumnSchema("session_id", EValueType::String, ESortOrder::Ascending).SetRequired(true), - TColumnSchema("sequence_number", EValueType::Uint64).SetRequired(true), - TColumnSchema("epoch", EValueType::Uint64).SetRequired(true), - TColumnSchema("user_meta", EValueType::Any).SetRequired(false), - TColumnSchema("system_meta", EValueType::Any).SetRequired(false), -}, /*strict*/ true, /*uniqueKeys*/ true); - -//////////////////////////////////////////////////////////////////////////////// - -const NTableClient::TTableSchemaPtr& GetProducerSchema() -{ - return YTProducerTableSchema; -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NQueueClient diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h deleted file mode 100644 index e24cdc6b05..0000000000 --- a/yt/yt/client/queue_client/producer_client.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -#include <yt/yt/client/table_client/public.h> - -namespace NYT::NQueueClient { - -//////////////////////////////////////////////////////////////////////////////// - -const NTableClient::TTableSchemaPtr& GetProducerSchema(); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NQueueClient diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index ea199b554e..c283bafe3d 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -102,6 +102,21 @@ public: const TListQueueConsumerRegistrationsOptions& options), (override)); + MOCK_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, ( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + const std::optional<NYson::TYsonString>& userMeta, + const TCreateQueueProducerSessionOptions& options), + (override)); + + MOCK_METHOD(TFuture<void>, RemoveQueueProducerSession, ( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + const TRemoveQueueProducerSessionOptions& options), + (override)); + MOCK_METHOD(TFuture<NYson::TYsonString>, ExplainQuery, ( const TString& query, const TExplainQueryOptions& options), diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index a0d7797583..b06f9fd98c 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -147,7 +147,6 @@ SRCS( queue_client/consumer_client.cpp queue_client/helpers.cpp queue_client/partition_reader.cpp - queue_client/producer_client.cpp queue_client/queue_rowset.cpp ypath/rich.cpp diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 789f83cb99..c9c593f22a 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -760,6 +760,35 @@ message TRspListQueueConsumerRegistrations //////////////////////////////////////////////////////////////////////////////// +message TReqCreateQueueProducerSession +{ + optional string producer_path = 1; + optional string queue_path = 2; + optional string session_id = 3; + optional bytes user_meta = 4; // YSON +} + +message TRspCreateQueueProducerSession +{ + optional uint64 sequence_number = 1; + optional uint64 epoch = 2; + optional bytes user_meta = 3; // YSON +} + +//////////////////////////////////////////////////////////////////////////////// + +message TReqRemoveQueueProducerSession +{ + optional string producer_path = 1; + optional string queue_path = 2; + optional string session_id = 3; +} + +message TRspRemoveQueueProducerSession +{ } + +//////////////////////////////////////////////////////////////////////////////// + message TReqExplainQuery { required string query = 1; |