aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorapachee <apachee@yandex-team.com>2024-06-26 11:53:13 +0300
committerapachee <apachee@yandex-team.com>2024-06-26 12:03:09 +0300
commitfbc04094a609c84b6c093c08a0321d151fbf9109 (patch)
treeb2e9271272514609d182bf90d8a6ed679914b7c9
parent6870500d6d5eae3b9282920f03ebeda7875c3be5 (diff)
downloadydb-fbc04094a609c84b6c093c08a0321d151fbf9109.tar.gz
[queues] YT-21355, YT-21972: Cosmetics for queue producer API
acd31302ba56ec4c7d5fd55ad7bcc83638a30184
-rw-r--r--yt/yt/client/api/delegating_client.h7
-rw-r--r--yt/yt/client/api/delegating_transaction.cpp4
-rw-r--r--yt/yt/client/api/delegating_transaction.h4
-rw-r--r--yt/yt/client/api/queue_client.h15
-rw-r--r--yt/yt/client/api/queue_transaction.h10
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp17
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h5
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp15
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.h4
-rw-r--r--yt/yt/client/driver/queue_commands.cpp21
-rw-r--r--yt/yt/client/driver/queue_commands.h9
-rw-r--r--yt/yt/client/federated/client.cpp11
-rw-r--r--yt/yt/client/hedging/hedging.cpp4
-rw-r--r--yt/yt/client/queue_client/public.h6
-rw-r--r--yt/yt/client/unittests/mock/client.h5
-rw-r--r--yt/yt/client/unittests/mock/transaction.h4
16 files changed, 74 insertions, 67 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index 09ede07c6a..4976888d0a 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -127,15 +127,14 @@ public:
DELEGATE_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
- const std::optional<NYson::TYsonString>& userMeta,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
const TCreateQueueProducerSessionOptions& options),
- (producerPath, queuePath, sessionId, userMeta, options))
+ (producerPath, queuePath, sessionId, options))
DELEGATE_METHOD(TFuture<void>, RemoveQueueProducerSession, (
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
const TRemoveQueueProducerSessionOptions& options),
(producerPath, queuePath, sessionId, options))
diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp
index 9b04f3451f..33bd596d18 100644
--- a/yt/yt/client/api/delegating_transaction.cpp
+++ b/yt/yt/client/api/delegating_transaction.cpp
@@ -302,8 +302,8 @@ DELEGATE_METHOD(TFuture<void>, AdvanceQueueConsumer, (
DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, (
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
- i64 epoch,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
+ NQueueClient::TQueueProducerEpoch epoch,
NTableClient::TNameTablePtr nameTable,
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options),
diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h
index 2cf1f780e5..39017b5853 100644
--- a/yt/yt/client/api/delegating_transaction.h
+++ b/yt/yt/client/api/delegating_transaction.h
@@ -246,8 +246,8 @@ public:
TFuture<TPushQueueProducerResult> PushQueueProducer(
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
- i64 epoch,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
+ NQueueClient::TQueueProducerEpoch epoch,
NTableClient::TNameTablePtr nameTable,
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options) override;
diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h
index 2c9e970fa6..a50cd9da1b 100644
--- a/yt/yt/client/api/queue_client.h
+++ b/yt/yt/client/api/queue_client.h
@@ -75,13 +75,15 @@ struct TListQueueConsumerRegistrationsResult
struct TCreateQueueProducerSessionOptions
: public TTimeoutOptions
-{ };
+{
+ NYTree::INodePtr UserMeta;
+};
struct TCreateQueueProducerSessionResult
{
- i64 SequenceNumber;
- i64 Epoch;
- std::optional<NYson::TYsonString> UserMeta;
+ NQueueClient::TQueueProducerSequenceNumber SequenceNumber;
+ NQueueClient::TQueueProducerEpoch Epoch;
+ NYTree::INodePtr UserMeta;
};
struct TRemoveQueueProducerSessionOptions
@@ -144,14 +146,13 @@ struct IQueueClient
virtual TFuture<TCreateQueueProducerSessionResult> CreateQueueProducerSession(
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
- const std::optional<NYson::TYsonString>& userMeta = {},
+ const NQueueClient::TQueueProducerSessionId& sessionId,
const TCreateQueueProducerSessionOptions& options = {}) = 0;
virtual TFuture<void> RemoveQueueProducerSession(
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
const TRemoveQueueProducerSessionOptions& options = {}) = 0;
};
diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h
index ebc492db06..d818655aa2 100644
--- a/yt/yt/client/api/queue_transaction.h
+++ b/yt/yt/client/api/queue_transaction.h
@@ -2,6 +2,8 @@
#include "client_common.h"
+#include <yt/yt/client/queue_client/public.h>
+
#include <yt/yt/core/ypath/public.h>
namespace NYT::NApi {
@@ -20,7 +22,7 @@ struct TPushQueueProducerOptions
* Rows in the batch will have such sequence numbers: SequenceNumber, SequenceNumber+1, SequenceNumber+2, ...
* If the option is not set the $sequence_number column must be present in each row.
*/
- std::optional<i64> SequenceNumber;
+ std::optional<NQueueClient::TQueueProducerSequenceNumber> SequenceNumber;
//! Any yson data which will be saved for the session
/*!
@@ -36,7 +38,7 @@ struct TPushQueueProducerResult
/*!
* All rows with greater sequence number will be ignored in future calls of PushQueueProducer.
*/
- i64 LastSequenceNumber = -1;
+ NQueueClient::TQueueProducerSequenceNumber LastSequenceNumber{-1};
//! Count of rows which were skipped because of their sequence number.
/*!
* Skipped rows were written before.
@@ -87,8 +89,8 @@ struct IQueueTransaction
virtual TFuture<TPushQueueProducerResult> PushQueueProducer(
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
- i64 epoch,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
+ NQueueClient::TQueueProducerEpoch epoch,
NTableClient::TNameTablePtr nameTable,
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options = {}) = 0;
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index bc6d64843f..f40496f065 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -877,8 +877,7 @@ TFuture<std::vector<TListQueueConsumerRegistrationsResult>> TClient::ListQueueCo
TFuture<TCreateQueueProducerSessionResult> TClient::CreateQueueProducerSession(
const TRichYPath& producerPath,
const TRichYPath& queuePath,
- const TString& sessionId,
- const std::optional<TYsonString>& userMeta,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
const TCreateQueueProducerSessionOptions& options)
{
auto proxy = CreateApiServiceProxy();
@@ -889,19 +888,19 @@ TFuture<TCreateQueueProducerSessionResult> TClient::CreateQueueProducerSession(
ToProto(req->mutable_producer_path(), producerPath);
ToProto(req->mutable_queue_path(), queuePath);
ToProto(req->mutable_session_id(), sessionId);
- if (userMeta) {
- ToProto(req->mutable_user_meta(), userMeta->AsStringBuf());
+ if (options.UserMeta) {
+ ToProto(req->mutable_user_meta(), ConvertToYsonString(options.UserMeta).ToString());
}
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspCreateQueueProducerSessionPtr& rsp) {
- std::optional<TYsonString> userMeta;
+ INodePtr userMeta;
if (rsp->has_user_meta()) {
- userMeta = TYsonString(FromProto<TString>(rsp->user_meta()));
+ userMeta = ConvertTo<INodePtr>(TYsonString(FromProto<TString>(rsp->user_meta())));
}
return TCreateQueueProducerSessionResult{
- .SequenceNumber = FromProto<i64>(rsp->sequence_number()),
- .Epoch = FromProto<i64>(rsp->epoch()),
+ .SequenceNumber = FromProto<TQueueProducerSequenceNumber>(rsp->sequence_number()),
+ .Epoch = FromProto<TQueueProducerEpoch>(rsp->epoch()),
.UserMeta = std::move(userMeta),
};
}));
@@ -910,7 +909,7 @@ TFuture<TCreateQueueProducerSessionResult> TClient::CreateQueueProducerSession(
TFuture<void> TClient::RemoveQueueProducerSession(
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
const TRemoveQueueProducerSessionOptions& options)
{
auto proxy = CreateApiServiceProxy();
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index 9c336728b8..7f1e145d10 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -164,14 +164,13 @@ public:
TFuture<TCreateQueueProducerSessionResult> CreateQueueProducerSession(
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
- const std::optional<NYson::TYsonString>& userMeta,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
const TCreateQueueProducerSessionOptions& options = {}) override;
TFuture<void> RemoveQueueProducerSession(
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
const TRemoveQueueProducerSessionOptions& options = {}) override;
// Files.
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index a952b7ff97..cadc6d7da0 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -23,6 +23,7 @@ using namespace NApi;
using namespace NYTree;
using namespace NYPath;
using namespace NYson;
+using namespace NQueueClient;
////////////////////////////////////////////////////////////////////////////////
@@ -530,23 +531,23 @@ TFuture<void> TTransaction::AdvanceQueueConsumer(
TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer(
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
- i64 epoch,
+ const TQueueProducerSessionId& sessionId,
+ TQueueProducerEpoch epoch,
NTableClient::TNameTablePtr nameTable,
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options)
{
ValidateTabletTransactionId(GetId());
- THROW_ERROR_EXCEPTION_IF(epoch < 0,
+ THROW_ERROR_EXCEPTION_IF(epoch.Underlying() < 0,
"Epoch number %v cannot be negative", epoch);
- THROW_ERROR_EXCEPTION_IF(options.SequenceNumber && *options.SequenceNumber < 0,
+ THROW_ERROR_EXCEPTION_IF(options.SequenceNumber && options.SequenceNumber->Underlying() < 0,
"Sequence number %v cannot be negative", *options.SequenceNumber);
auto req = Proxy_.PushQueueProducer();
SetTimeoutOptions(*req, options);
if (options.SequenceNumber) {
- req->set_sequence_number(*options.SequenceNumber);
+ req->set_sequence_number(options.SequenceNumber->Underlying());
}
if (NTracing::IsCurrentTraceContextRecorded()) {
@@ -562,7 +563,7 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer(
ToProto(req->mutable_queue_path(), queuePath);
ToProto(req->mutable_session_id(), sessionId);
- req->set_epoch(epoch);
+ req->set_epoch(epoch.Underlying());
if (options.UserMeta) {
ToProto(req->mutable_user_meta(), ConvertToYsonString(options.UserMeta).ToString());
@@ -575,7 +576,7 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer(
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspPushQueueProducerPtr& rsp) {
return TPushQueueProducerResult{
- .LastSequenceNumber = rsp->last_sequence_number(),
+ .LastSequenceNumber = TQueueProducerSequenceNumber(rsp->last_sequence_number()),
.SkippedRowCount = rsp->skipped_row_count(),
};
}));
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h
index 8b907fec2d..db5138879f 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.h
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h
@@ -89,8 +89,8 @@ public:
TFuture<TPushQueueProducerResult> PushQueueProducer(
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
- i64 epoch,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
+ NQueueClient::TQueueProducerEpoch epoch,
NTableClient::TNameTablePtr nameTable,
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options) override;
diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp
index f6db0ff134..b62a26e286 100644
--- a/yt/yt/client/driver/queue_commands.cpp
+++ b/yt/yt/client/driver/queue_commands.cpp
@@ -21,6 +21,7 @@ namespace NYT::NDriver {
using namespace NApi;
using namespace NConcurrency;
+using namespace NQueueClient;
using namespace NTableClient;
using namespace NTabletClient;
using namespace NYTree;
@@ -282,24 +283,23 @@ void TCreateQueueProducerSessionCommand::Register(TRegistrar registrar)
registrar.Parameter("producer_path", &TThis::ProducerPath);
registrar.Parameter("queue_path", &TThis::QueuePath);
registrar.Parameter("session_id", &TThis::SessionId);
- registrar.Parameter("user_meta", &TThis::UserMeta)
- .Optional();
+ registrar.ParameterWithUniversalAccessor<INodePtr>(
+ "user_meta",
+ [] (TThis* command) -> auto& {
+ return command->Options.UserMeta;
+ })
+ .Optional(/*init*/ false);
}
void TCreateQueueProducerSessionCommand::DoExecute(ICommandContextPtr context)
{
auto client = context->GetClient();
- std::optional<NYson::TYsonString> requestUserMeta;
- if (UserMeta) {
- requestUserMeta = NYson::ConvertToYsonString(UserMeta);
- }
-
auto result = WaitFor(client->CreateQueueProducerSession(
ProducerPath,
QueuePath,
SessionId,
- requestUserMeta))
+ Options))
.ValueOrThrow();
ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) {
@@ -328,7 +328,8 @@ void TRemoveQueueProducerSessionCommand::DoExecute(ICommandContextPtr context)
WaitFor(client->RemoveQueueProducerSession(
ProducerPath,
QueuePath,
- SessionId))
+ SessionId,
+ Options))
.ThrowOnError();
ProduceEmptyOutput(context);
@@ -338,7 +339,7 @@ void TRemoveQueueProducerSessionCommand::DoExecute(ICommandContextPtr context)
void TPushQueueProducerCommand::Register(TRegistrar registrar)
{
- registrar.ParameterWithUniversalAccessor<std::optional<i64>>(
+ registrar.ParameterWithUniversalAccessor<std::optional<TQueueProducerSequenceNumber>>(
"sequence_number",
[] (TThis* command) -> auto& {
return command->Options.SequenceNumber;
diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h
index 249ef9bddb..7414f01a8f 100644
--- a/yt/yt/client/driver/queue_commands.h
+++ b/yt/yt/client/driver/queue_commands.h
@@ -136,8 +136,7 @@ public:
private:
NYPath::TRichYPath ProducerPath;
NYPath::TRichYPath QueuePath;
- TString SessionId;
- NYTree::INodePtr UserMeta;
+ NQueueClient::TQueueProducerSessionId SessionId;
void DoExecute(ICommandContextPtr context) override;
};
@@ -155,7 +154,7 @@ public:
private:
NYPath::TRichYPath ProducerPath;
NYPath::TRichYPath QueuePath;
- TString SessionId;
+ NQueueClient::TQueueProducerSessionId SessionId;
void DoExecute(ICommandContextPtr context) override;
};
@@ -178,8 +177,8 @@ public:
private:
NYPath::TRichYPath ProducerPath;
NYPath::TRichYPath QueuePath;
- TString SessionId;
- i64 Epoch;
+ NQueueClient::TQueueProducerSessionId SessionId;
+ NQueueClient::TQueueProducerEpoch Epoch;
void DoExecute(ICommandContextPtr context) override;
};
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index b5f79cee42..0a52560296 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -27,6 +27,7 @@
namespace NYT::NClient::NFederated {
using namespace NApi;
+using namespace NQueueClient;
////////////////////////////////////////////////////////////////////////////////
@@ -93,8 +94,8 @@ public:
TFuture<TPushQueueProducerResult> PushQueueProducer(
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
- i64 epoch,
+ const TQueueProducerSessionId& sessionId,
+ TQueueProducerEpoch epoch,
NTableClient::TNameTablePtr nameTable,
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options) override;
@@ -340,8 +341,8 @@ public:
// IClient unsupported methods.
UNIMPLEMENTED_METHOD(TFuture<void>, RegisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, bool, const TRegisterQueueConsumerOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, UnregisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TUnregisterQueueConsumerOptions&));
- UNIMPLEMENTED_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const std::optional<NYson::TYsonString>&, const TCreateQueueProducerSessionOptions&));
- UNIMPLEMENTED_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const TRemoveQueueProducerSessionOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TQueueProducerSessionId&, const TCreateQueueProducerSessionOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TQueueProducerSessionId&, const TRemoveQueueProducerSessionOptions&));
UNIMPLEMENTED_METHOD(const NChaosClient::IReplicationCardCachePtr&, GetReplicationCardCache, ());
UNIMPLEMENTED_METHOD(TFuture<void>, MountTable, (const NYPath::TYPath&, const TMountTableOptions&));
@@ -517,7 +518,7 @@ TRANSACTION_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const
TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRows, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
TRANSACTION_METHOD_IMPL(void, AdvanceQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceQueueConsumerOptions&));
-TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, i64, NTableClient::TNameTablePtr, TSharedRange<NTableClient::TUnversionedRow>, const TPushQueueProducerOptions&));
+TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TQueueProducerSessionId&, TQueueProducerEpoch, NTableClient::TNameTablePtr, TSharedRange<NTableClient::TUnversionedRow>, const TPushQueueProducerOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&));
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index fa64da3a74..4bc18bebd0 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -84,8 +84,8 @@ public:
RETRYABLE_METHOD(TFuture<void>, RegisterQueueConsumer, (const TRichYPath&, const TRichYPath&, bool, const TRegisterQueueConsumerOptions&));
RETRYABLE_METHOD(TFuture<void>, UnregisterQueueConsumer, (const TRichYPath&, const TRichYPath&, const TUnregisterQueueConsumerOptions&));
RETRYABLE_METHOD(TFuture<std::vector<TListQueueConsumerRegistrationsResult>>, ListQueueConsumerRegistrations, (const std::optional<TRichYPath>&, const std::optional<TRichYPath>&, const TListQueueConsumerRegistrationsOptions&));
- RETRYABLE_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const std::optional<NYson::TYsonString>&, const TCreateQueueProducerSessionOptions&));
- RETRYABLE_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const TRemoveQueueProducerSessionOptions&));
+ RETRYABLE_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const NQueueClient::TQueueProducerSessionId&, const TCreateQueueProducerSessionOptions&));
+ RETRYABLE_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const NQueueClient::TQueueProducerSessionId&, const TRemoveQueueProducerSessionOptions&));
RETRYABLE_METHOD(TFuture<NYson::TYsonString>, ExplainQuery, (const TString&, const TExplainQueryOptions&));
RETRYABLE_METHOD(TFuture<ITableReaderPtr>, CreateTableReader, (const TRichYPath&, const TTableReaderOptions&));
RETRYABLE_METHOD(TFuture<NYson::TYsonString>, GetNode, (const TYPath&, const TGetNodeOptions&));
diff --git a/yt/yt/client/queue_client/public.h b/yt/yt/client/queue_client/public.h
index 72ea49cd2b..12f0378140 100644
--- a/yt/yt/client/queue_client/public.h
+++ b/yt/yt/client/queue_client/public.h
@@ -31,4 +31,10 @@ DECLARE_REFCOUNTED_CLASS(TQueueStaticExportDestinationConfig)
////////////////////////////////////////////////////////////////////////////////
+YT_DEFINE_STRONG_TYPEDEF(TQueueProducerSessionId, TString);
+YT_DEFINE_STRONG_TYPEDEF(TQueueProducerEpoch, i64);
+YT_DEFINE_STRONG_TYPEDEF(TQueueProducerSequenceNumber, i64);
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NQueueClient
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index 8c1403d790..234a1c3131 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -105,15 +105,14 @@ public:
MOCK_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
- const std::optional<NYson::TYsonString>& userMeta,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
const TCreateQueueProducerSessionOptions& options),
(override));
MOCK_METHOD(TFuture<void>, RemoveQueueProducerSession, (
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
const TRemoveQueueProducerSessionOptions& options),
(override));
diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h
index bc0191bf63..d158b665e6 100644
--- a/yt/yt/client/unittests/mock/transaction.h
+++ b/yt/yt/client/unittests/mock/transaction.h
@@ -217,8 +217,8 @@ public:
MOCK_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, (
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
- const TString& sessionId,
- i64 epoch,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
+ NQueueClient::TQueueProducerEpoch epoch,
NTableClient::TNameTablePtr nameTable,
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options), (override));