diff options
author | apachee <apachee@yandex-team.com> | 2024-06-26 11:53:13 +0300 |
---|---|---|
committer | apachee <apachee@yandex-team.com> | 2024-06-26 12:03:09 +0300 |
commit | fbc04094a609c84b6c093c08a0321d151fbf9109 (patch) | |
tree | b2e9271272514609d182bf90d8a6ed679914b7c9 | |
parent | 6870500d6d5eae3b9282920f03ebeda7875c3be5 (diff) | |
download | ydb-fbc04094a609c84b6c093c08a0321d151fbf9109.tar.gz |
[queues] YT-21355, YT-21972: Cosmetics for queue producer API
acd31302ba56ec4c7d5fd55ad7bcc83638a30184
-rw-r--r-- | yt/yt/client/api/delegating_client.h | 7 | ||||
-rw-r--r-- | yt/yt/client/api/delegating_transaction.cpp | 4 | ||||
-rw-r--r-- | yt/yt/client/api/delegating_transaction.h | 4 | ||||
-rw-r--r-- | yt/yt/client/api/queue_client.h | 15 | ||||
-rw-r--r-- | yt/yt/client/api/queue_transaction.h | 10 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.cpp | 17 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.h | 5 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.cpp | 15 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.h | 4 | ||||
-rw-r--r-- | yt/yt/client/driver/queue_commands.cpp | 21 | ||||
-rw-r--r-- | yt/yt/client/driver/queue_commands.h | 9 | ||||
-rw-r--r-- | yt/yt/client/federated/client.cpp | 11 | ||||
-rw-r--r-- | yt/yt/client/hedging/hedging.cpp | 4 | ||||
-rw-r--r-- | yt/yt/client/queue_client/public.h | 6 | ||||
-rw-r--r-- | yt/yt/client/unittests/mock/client.h | 5 | ||||
-rw-r--r-- | yt/yt/client/unittests/mock/transaction.h | 4 |
16 files changed, 74 insertions, 67 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index 09ede07c6a..4976888d0a 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -127,15 +127,14 @@ public: DELEGATE_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, ( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, - const std::optional<NYson::TYsonString>& userMeta, + const NQueueClient::TQueueProducerSessionId& sessionId, const TCreateQueueProducerSessionOptions& options), - (producerPath, queuePath, sessionId, userMeta, options)) + (producerPath, queuePath, sessionId, options)) DELEGATE_METHOD(TFuture<void>, RemoveQueueProducerSession, ( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, + const NQueueClient::TQueueProducerSessionId& sessionId, const TRemoveQueueProducerSessionOptions& options), (producerPath, queuePath, sessionId, options)) diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp index 9b04f3451f..33bd596d18 100644 --- a/yt/yt/client/api/delegating_transaction.cpp +++ b/yt/yt/client/api/delegating_transaction.cpp @@ -302,8 +302,8 @@ DELEGATE_METHOD(TFuture<void>, AdvanceQueueConsumer, ( DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, ( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, - i64 epoch, + const NQueueClient::TQueueProducerSessionId& sessionId, + NQueueClient::TQueueProducerEpoch epoch, NTableClient::TNameTablePtr nameTable, TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options), diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h index 2cf1f780e5..39017b5853 100644 --- a/yt/yt/client/api/delegating_transaction.h +++ b/yt/yt/client/api/delegating_transaction.h @@ -246,8 +246,8 @@ public: TFuture<TPushQueueProducerResult> PushQueueProducer( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, - i64 epoch, + const NQueueClient::TQueueProducerSessionId& sessionId, + NQueueClient::TQueueProducerEpoch epoch, NTableClient::TNameTablePtr nameTable, TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options) override; diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h index 2c9e970fa6..a50cd9da1b 100644 --- a/yt/yt/client/api/queue_client.h +++ b/yt/yt/client/api/queue_client.h @@ -75,13 +75,15 @@ struct TListQueueConsumerRegistrationsResult struct TCreateQueueProducerSessionOptions : public TTimeoutOptions -{ }; +{ + NYTree::INodePtr UserMeta; +}; struct TCreateQueueProducerSessionResult { - i64 SequenceNumber; - i64 Epoch; - std::optional<NYson::TYsonString> UserMeta; + NQueueClient::TQueueProducerSequenceNumber SequenceNumber; + NQueueClient::TQueueProducerEpoch Epoch; + NYTree::INodePtr UserMeta; }; struct TRemoveQueueProducerSessionOptions @@ -144,14 +146,13 @@ struct IQueueClient virtual TFuture<TCreateQueueProducerSessionResult> CreateQueueProducerSession( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, - const std::optional<NYson::TYsonString>& userMeta = {}, + const NQueueClient::TQueueProducerSessionId& sessionId, const TCreateQueueProducerSessionOptions& options = {}) = 0; virtual TFuture<void> RemoveQueueProducerSession( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, + const NQueueClient::TQueueProducerSessionId& sessionId, const TRemoveQueueProducerSessionOptions& options = {}) = 0; }; diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h index ebc492db06..d818655aa2 100644 --- a/yt/yt/client/api/queue_transaction.h +++ b/yt/yt/client/api/queue_transaction.h @@ -2,6 +2,8 @@ #include "client_common.h" +#include <yt/yt/client/queue_client/public.h> + #include <yt/yt/core/ypath/public.h> namespace NYT::NApi { @@ -20,7 +22,7 @@ struct TPushQueueProducerOptions * Rows in the batch will have such sequence numbers: SequenceNumber, SequenceNumber+1, SequenceNumber+2, ... * If the option is not set the $sequence_number column must be present in each row. */ - std::optional<i64> SequenceNumber; + std::optional<NQueueClient::TQueueProducerSequenceNumber> SequenceNumber; //! Any yson data which will be saved for the session /*! @@ -36,7 +38,7 @@ struct TPushQueueProducerResult /*! * All rows with greater sequence number will be ignored in future calls of PushQueueProducer. */ - i64 LastSequenceNumber = -1; + NQueueClient::TQueueProducerSequenceNumber LastSequenceNumber{-1}; //! Count of rows which were skipped because of their sequence number. /*! * Skipped rows were written before. @@ -87,8 +89,8 @@ struct IQueueTransaction virtual TFuture<TPushQueueProducerResult> PushQueueProducer( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, - i64 epoch, + const NQueueClient::TQueueProducerSessionId& sessionId, + NQueueClient::TQueueProducerEpoch epoch, NTableClient::TNameTablePtr nameTable, TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options = {}) = 0; diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index bc6d64843f..f40496f065 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -877,8 +877,7 @@ TFuture<std::vector<TListQueueConsumerRegistrationsResult>> TClient::ListQueueCo TFuture<TCreateQueueProducerSessionResult> TClient::CreateQueueProducerSession( const TRichYPath& producerPath, const TRichYPath& queuePath, - const TString& sessionId, - const std::optional<TYsonString>& userMeta, + const NQueueClient::TQueueProducerSessionId& sessionId, const TCreateQueueProducerSessionOptions& options) { auto proxy = CreateApiServiceProxy(); @@ -889,19 +888,19 @@ TFuture<TCreateQueueProducerSessionResult> TClient::CreateQueueProducerSession( ToProto(req->mutable_producer_path(), producerPath); ToProto(req->mutable_queue_path(), queuePath); ToProto(req->mutable_session_id(), sessionId); - if (userMeta) { - ToProto(req->mutable_user_meta(), userMeta->AsStringBuf()); + if (options.UserMeta) { + ToProto(req->mutable_user_meta(), ConvertToYsonString(options.UserMeta).ToString()); } return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspCreateQueueProducerSessionPtr& rsp) { - std::optional<TYsonString> userMeta; + INodePtr userMeta; if (rsp->has_user_meta()) { - userMeta = TYsonString(FromProto<TString>(rsp->user_meta())); + userMeta = ConvertTo<INodePtr>(TYsonString(FromProto<TString>(rsp->user_meta()))); } return TCreateQueueProducerSessionResult{ - .SequenceNumber = FromProto<i64>(rsp->sequence_number()), - .Epoch = FromProto<i64>(rsp->epoch()), + .SequenceNumber = FromProto<TQueueProducerSequenceNumber>(rsp->sequence_number()), + .Epoch = FromProto<TQueueProducerEpoch>(rsp->epoch()), .UserMeta = std::move(userMeta), }; })); @@ -910,7 +909,7 @@ TFuture<TCreateQueueProducerSessionResult> TClient::CreateQueueProducerSession( TFuture<void> TClient::RemoveQueueProducerSession( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, + const NQueueClient::TQueueProducerSessionId& sessionId, const TRemoveQueueProducerSessionOptions& options) { auto proxy = CreateApiServiceProxy(); diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index 9c336728b8..7f1e145d10 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -164,14 +164,13 @@ public: TFuture<TCreateQueueProducerSessionResult> CreateQueueProducerSession( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, - const std::optional<NYson::TYsonString>& userMeta, + const NQueueClient::TQueueProducerSessionId& sessionId, const TCreateQueueProducerSessionOptions& options = {}) override; TFuture<void> RemoveQueueProducerSession( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, + const NQueueClient::TQueueProducerSessionId& sessionId, const TRemoveQueueProducerSessionOptions& options = {}) override; // Files. diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index a952b7ff97..cadc6d7da0 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -23,6 +23,7 @@ using namespace NApi; using namespace NYTree; using namespace NYPath; using namespace NYson; +using namespace NQueueClient; //////////////////////////////////////////////////////////////////////////////// @@ -530,23 +531,23 @@ TFuture<void> TTransaction::AdvanceQueueConsumer( TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, - i64 epoch, + const TQueueProducerSessionId& sessionId, + TQueueProducerEpoch epoch, NTableClient::TNameTablePtr nameTable, TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options) { ValidateTabletTransactionId(GetId()); - THROW_ERROR_EXCEPTION_IF(epoch < 0, + THROW_ERROR_EXCEPTION_IF(epoch.Underlying() < 0, "Epoch number %v cannot be negative", epoch); - THROW_ERROR_EXCEPTION_IF(options.SequenceNumber && *options.SequenceNumber < 0, + THROW_ERROR_EXCEPTION_IF(options.SequenceNumber && options.SequenceNumber->Underlying() < 0, "Sequence number %v cannot be negative", *options.SequenceNumber); auto req = Proxy_.PushQueueProducer(); SetTimeoutOptions(*req, options); if (options.SequenceNumber) { - req->set_sequence_number(*options.SequenceNumber); + req->set_sequence_number(options.SequenceNumber->Underlying()); } if (NTracing::IsCurrentTraceContextRecorded()) { @@ -562,7 +563,7 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer( ToProto(req->mutable_queue_path(), queuePath); ToProto(req->mutable_session_id(), sessionId); - req->set_epoch(epoch); + req->set_epoch(epoch.Underlying()); if (options.UserMeta) { ToProto(req->mutable_user_meta(), ConvertToYsonString(options.UserMeta).ToString()); @@ -575,7 +576,7 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer( return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspPushQueueProducerPtr& rsp) { return TPushQueueProducerResult{ - .LastSequenceNumber = rsp->last_sequence_number(), + .LastSequenceNumber = TQueueProducerSequenceNumber(rsp->last_sequence_number()), .SkippedRowCount = rsp->skipped_row_count(), }; })); diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h index 8b907fec2d..db5138879f 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.h +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h @@ -89,8 +89,8 @@ public: TFuture<TPushQueueProducerResult> PushQueueProducer( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, - i64 epoch, + const NQueueClient::TQueueProducerSessionId& sessionId, + NQueueClient::TQueueProducerEpoch epoch, NTableClient::TNameTablePtr nameTable, TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options) override; diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp index f6db0ff134..b62a26e286 100644 --- a/yt/yt/client/driver/queue_commands.cpp +++ b/yt/yt/client/driver/queue_commands.cpp @@ -21,6 +21,7 @@ namespace NYT::NDriver { using namespace NApi; using namespace NConcurrency; +using namespace NQueueClient; using namespace NTableClient; using namespace NTabletClient; using namespace NYTree; @@ -282,24 +283,23 @@ void TCreateQueueProducerSessionCommand::Register(TRegistrar registrar) registrar.Parameter("producer_path", &TThis::ProducerPath); registrar.Parameter("queue_path", &TThis::QueuePath); registrar.Parameter("session_id", &TThis::SessionId); - registrar.Parameter("user_meta", &TThis::UserMeta) - .Optional(); + registrar.ParameterWithUniversalAccessor<INodePtr>( + "user_meta", + [] (TThis* command) -> auto& { + return command->Options.UserMeta; + }) + .Optional(/*init*/ false); } void TCreateQueueProducerSessionCommand::DoExecute(ICommandContextPtr context) { auto client = context->GetClient(); - std::optional<NYson::TYsonString> requestUserMeta; - if (UserMeta) { - requestUserMeta = NYson::ConvertToYsonString(UserMeta); - } - auto result = WaitFor(client->CreateQueueProducerSession( ProducerPath, QueuePath, SessionId, - requestUserMeta)) + Options)) .ValueOrThrow(); ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) { @@ -328,7 +328,8 @@ void TRemoveQueueProducerSessionCommand::DoExecute(ICommandContextPtr context) WaitFor(client->RemoveQueueProducerSession( ProducerPath, QueuePath, - SessionId)) + SessionId, + Options)) .ThrowOnError(); ProduceEmptyOutput(context); @@ -338,7 +339,7 @@ void TRemoveQueueProducerSessionCommand::DoExecute(ICommandContextPtr context) void TPushQueueProducerCommand::Register(TRegistrar registrar) { - registrar.ParameterWithUniversalAccessor<std::optional<i64>>( + registrar.ParameterWithUniversalAccessor<std::optional<TQueueProducerSequenceNumber>>( "sequence_number", [] (TThis* command) -> auto& { return command->Options.SequenceNumber; diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h index 249ef9bddb..7414f01a8f 100644 --- a/yt/yt/client/driver/queue_commands.h +++ b/yt/yt/client/driver/queue_commands.h @@ -136,8 +136,7 @@ public: private: NYPath::TRichYPath ProducerPath; NYPath::TRichYPath QueuePath; - TString SessionId; - NYTree::INodePtr UserMeta; + NQueueClient::TQueueProducerSessionId SessionId; void DoExecute(ICommandContextPtr context) override; }; @@ -155,7 +154,7 @@ public: private: NYPath::TRichYPath ProducerPath; NYPath::TRichYPath QueuePath; - TString SessionId; + NQueueClient::TQueueProducerSessionId SessionId; void DoExecute(ICommandContextPtr context) override; }; @@ -178,8 +177,8 @@ public: private: NYPath::TRichYPath ProducerPath; NYPath::TRichYPath QueuePath; - TString SessionId; - i64 Epoch; + NQueueClient::TQueueProducerSessionId SessionId; + NQueueClient::TQueueProducerEpoch Epoch; void DoExecute(ICommandContextPtr context) override; }; diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index b5f79cee42..0a52560296 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -27,6 +27,7 @@ namespace NYT::NClient::NFederated { using namespace NApi; +using namespace NQueueClient; //////////////////////////////////////////////////////////////////////////////// @@ -93,8 +94,8 @@ public: TFuture<TPushQueueProducerResult> PushQueueProducer( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, - i64 epoch, + const TQueueProducerSessionId& sessionId, + TQueueProducerEpoch epoch, NTableClient::TNameTablePtr nameTable, TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options) override; @@ -340,8 +341,8 @@ public: // IClient unsupported methods. UNIMPLEMENTED_METHOD(TFuture<void>, RegisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, bool, const TRegisterQueueConsumerOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, UnregisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TUnregisterQueueConsumerOptions&)); - UNIMPLEMENTED_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const std::optional<NYson::TYsonString>&, const TCreateQueueProducerSessionOptions&)); - UNIMPLEMENTED_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const TRemoveQueueProducerSessionOptions&)); + UNIMPLEMENTED_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TQueueProducerSessionId&, const TCreateQueueProducerSessionOptions&)); + UNIMPLEMENTED_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TQueueProducerSessionId&, const TRemoveQueueProducerSessionOptions&)); UNIMPLEMENTED_METHOD(const NChaosClient::IReplicationCardCachePtr&, GetReplicationCardCache, ()); UNIMPLEMENTED_METHOD(TFuture<void>, MountTable, (const NYPath::TYPath&, const TMountTableOptions&)); @@ -517,7 +518,7 @@ TRANSACTION_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRows, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&)); TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&)); TRANSACTION_METHOD_IMPL(void, AdvanceQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceQueueConsumerOptions&)); -TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, i64, NTableClient::TNameTablePtr, TSharedRange<NTableClient::TUnversionedRow>, const TPushQueueProducerOptions&)); +TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TQueueProducerSessionId&, TQueueProducerEpoch, NTableClient::TNameTablePtr, TSharedRange<NTableClient::TUnversionedRow>, const TPushQueueProducerOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&)); diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index fa64da3a74..4bc18bebd0 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -84,8 +84,8 @@ public: RETRYABLE_METHOD(TFuture<void>, RegisterQueueConsumer, (const TRichYPath&, const TRichYPath&, bool, const TRegisterQueueConsumerOptions&)); RETRYABLE_METHOD(TFuture<void>, UnregisterQueueConsumer, (const TRichYPath&, const TRichYPath&, const TUnregisterQueueConsumerOptions&)); RETRYABLE_METHOD(TFuture<std::vector<TListQueueConsumerRegistrationsResult>>, ListQueueConsumerRegistrations, (const std::optional<TRichYPath>&, const std::optional<TRichYPath>&, const TListQueueConsumerRegistrationsOptions&)); - RETRYABLE_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const std::optional<NYson::TYsonString>&, const TCreateQueueProducerSessionOptions&)); - RETRYABLE_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const TRemoveQueueProducerSessionOptions&)); + RETRYABLE_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const NQueueClient::TQueueProducerSessionId&, const TCreateQueueProducerSessionOptions&)); + RETRYABLE_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const NQueueClient::TQueueProducerSessionId&, const TRemoveQueueProducerSessionOptions&)); RETRYABLE_METHOD(TFuture<NYson::TYsonString>, ExplainQuery, (const TString&, const TExplainQueryOptions&)); RETRYABLE_METHOD(TFuture<ITableReaderPtr>, CreateTableReader, (const TRichYPath&, const TTableReaderOptions&)); RETRYABLE_METHOD(TFuture<NYson::TYsonString>, GetNode, (const TYPath&, const TGetNodeOptions&)); diff --git a/yt/yt/client/queue_client/public.h b/yt/yt/client/queue_client/public.h index 72ea49cd2b..12f0378140 100644 --- a/yt/yt/client/queue_client/public.h +++ b/yt/yt/client/queue_client/public.h @@ -31,4 +31,10 @@ DECLARE_REFCOUNTED_CLASS(TQueueStaticExportDestinationConfig) //////////////////////////////////////////////////////////////////////////////// +YT_DEFINE_STRONG_TYPEDEF(TQueueProducerSessionId, TString); +YT_DEFINE_STRONG_TYPEDEF(TQueueProducerEpoch, i64); +YT_DEFINE_STRONG_TYPEDEF(TQueueProducerSequenceNumber, i64); + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NQueueClient diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index 8c1403d790..234a1c3131 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -105,15 +105,14 @@ public: MOCK_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, ( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, - const std::optional<NYson::TYsonString>& userMeta, + const NQueueClient::TQueueProducerSessionId& sessionId, const TCreateQueueProducerSessionOptions& options), (override)); MOCK_METHOD(TFuture<void>, RemoveQueueProducerSession, ( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, + const NQueueClient::TQueueProducerSessionId& sessionId, const TRemoveQueueProducerSessionOptions& options), (override)); diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h index bc0191bf63..d158b665e6 100644 --- a/yt/yt/client/unittests/mock/transaction.h +++ b/yt/yt/client/unittests/mock/transaction.h @@ -217,8 +217,8 @@ public: MOCK_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, ( const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, - const TString& sessionId, - i64 epoch, + const NQueueClient::TQueueProducerSessionId& sessionId, + NQueueClient::TQueueProducerEpoch epoch, NTableClient::TNameTablePtr nameTable, TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options), (override)); |