aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorapachee <apachee@yandex-team.com>2024-05-24 16:21:41 +0300
committerapachee <apachee@yandex-team.com>2024-05-24 16:31:03 +0300
commite2ba2fb36e917cd56f8dd9006d4b6f081a1939ac (patch)
treebf72832d784543986e62e5e486f5b77df018d220 /yt
parent17b724c0bfc83bc671073210ba805e7e833f29d8 (diff)
downloadydb-e2ba2fb36e917cd56f8dd9006d4b6f081a1939ac.tar.gz
[queues] YT-21355: Implement commands for creating and removing producer sessions
Implements client methods create_queue_producer_session and remove_queue_producer_session a1fc793b8bae6faf8aca0772969d36247e5f9545
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/client/api/delegating_client.h15
-rw-r--r--yt/yt/client/api/queue_client.h28
-rw-r--r--yt/yt/client/api/rpc_proxy/api_service_proxy.h2
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp51
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h13
-rw-r--r--yt/yt/client/driver/driver.cpp2
-rw-r--r--yt/yt/client/driver/queue_commands.cpp59
-rw-r--r--yt/yt/client/driver/queue_commands.h37
-rw-r--r--yt/yt/client/federated/client.cpp3
-rw-r--r--yt/yt/client/hedging/hedging.cpp2
-rw-r--r--yt/yt/client/queue_client/producer_client.cpp32
-rw-r--r--yt/yt/client/queue_client/producer_client.h13
-rw-r--r--yt/yt/client/unittests/mock/client.h15
-rw-r--r--yt/yt/client/ya.make1
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto29
15 files changed, 256 insertions, 46 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index c5044d846b..f490a050e5 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -124,6 +124,21 @@ public:
const TListQueueConsumerRegistrationsOptions& options),
(queuePath, consumerPath, options))
+ DELEGATE_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ const std::optional<NYson::TYsonString>& userMeta,
+ const TCreateQueueProducerSessionOptions& options),
+ (producerPath, queuePath, sessionId, userMeta, options))
+
+ DELEGATE_METHOD(TFuture<void>, RemoveQueueProducerSession, (
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ const TRemoveQueueProducerSessionOptions& options),
+ (producerPath, queuePath, sessionId, options))
+
// Cypress
DELEGATE_METHOD(TFuture<NYson::TYsonString>, GetNode, (
const NYPath::TYPath& path,
diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h
index b6c6edeeac..f878e5eacf 100644
--- a/yt/yt/client/api/queue_client.h
+++ b/yt/yt/client/api/queue_client.h
@@ -73,6 +73,21 @@ struct TListQueueConsumerRegistrationsResult
std::optional<std::vector<int>> Partitions;
};
+struct TCreateQueueProducerSessionOptions
+ : public TTimeoutOptions
+{ };
+
+struct TCreateQueueProducerSessionResult
+{
+ ui64 SequenceNumber;
+ ui64 Epoch;
+ std::optional<NYson::TYsonString> UserMeta;
+};
+
+struct TRemoveQueueProducerSessionOptions
+ : public TTimeoutOptions
+{ };
+
////////////////////////////////////////////////////////////////////////////////
struct IQueueClientBase
@@ -125,6 +140,19 @@ struct IQueueClient
const std::optional<NYPath::TRichYPath>& queuePath,
const std::optional<NYPath::TRichYPath>& consumerPath,
const TListQueueConsumerRegistrationsOptions& options = {}) = 0;
+
+ virtual TFuture<TCreateQueueProducerSessionResult> CreateQueueProducerSession(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ const std::optional<NYson::TYsonString>& userMeta,
+ const TCreateQueueProducerSessionOptions& options = {}) = 0;
+
+ virtual TFuture<void> RemoveQueueProducerSession(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ const TRemoveQueueProducerSessionOptions& options = {}) = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
index 219c823f91..5ea29e05e8 100644
--- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h
+++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
@@ -91,6 +91,8 @@ public:
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, RegisterQueueConsumer);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, UnregisterQueueConsumer);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ListQueueConsumerRegistrations);
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, CreateQueueProducerSession);
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, RemoveQueueProducerSession);
// Scheduler pools
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, TransferPoolResources);
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index 58f5d5db70..d41847cbe0 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -873,6 +873,57 @@ TFuture<std::vector<TListQueueConsumerRegistrationsResult>> TClient::ListQueueCo
}));
}
+TFuture<TCreateQueueProducerSessionResult> TClient::CreateQueueProducerSession(
+ const TRichYPath& producerPath,
+ const TRichYPath& queuePath,
+ const TString& sessionId,
+ const std::optional<TYsonString>& userMeta,
+ const TCreateQueueProducerSessionOptions& options)
+{
+ auto proxy = CreateApiServiceProxy();
+
+ auto req = proxy.CreateQueueProducerSession();
+ SetTimeoutOptions(*req, options);
+
+ ToProto(req->mutable_producer_path(), producerPath);
+ ToProto(req->mutable_queue_path(), queuePath);
+ ToProto(req->mutable_session_id(), sessionId);
+ if (userMeta) {
+ ToProto(req->mutable_user_meta(), userMeta->AsStringBuf());
+ }
+
+ return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspCreateQueueProducerSessionPtr& rsp) {
+ std::optional<TYsonString> userMeta;
+ if (rsp->has_user_meta()) {
+ userMeta = TYsonString(FromProto<TString>(rsp->user_meta()));
+ }
+
+ return TCreateQueueProducerSessionResult{
+ .SequenceNumber = FromProto<ui64>(rsp->sequence_number()),
+ .Epoch = FromProto<ui64>(rsp->epoch()),
+ .UserMeta = std::move(userMeta),
+ };
+ }));
+}
+
+TFuture<void> TClient::RemoveQueueProducerSession(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ const TRemoveQueueProducerSessionOptions& options)
+{
+ auto proxy = CreateApiServiceProxy();
+
+ auto req = proxy.RemoveQueueProducerSession();
+ SetTimeoutOptions(*req, options);
+
+ ToProto(req->mutable_producer_path(), producerPath);
+ ToProto(req->mutable_queue_path(), queuePath);
+ ToProto(req->mutable_session_id(), sessionId);
+
+ return req->Invoke().AsVoid();
+}
+
TFuture<void> TClient::AddMember(
const TString& group,
const TString& member,
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index 77054ed4e5..d6bd791e6d 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -161,6 +161,19 @@ public:
const std::optional<NYPath::TRichYPath>& consumerPath,
const TListQueueConsumerRegistrationsOptions& options = {}) override;
+ TFuture<TCreateQueueProducerSessionResult> CreateQueueProducerSession(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ const std::optional<NYson::TYsonString>& userMeta,
+ const TCreateQueueProducerSessionOptions& options = {}) override;
+
+ TFuture<void> RemoveQueueProducerSession(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ const TRemoveQueueProducerSessionOptions& options = {}) override;
+
// Files.
TFuture<NApi::TGetFileFromCacheResult> GetFileFromCache(
const TString& md5,
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index d3377c0806..2d9dada291 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -351,6 +351,8 @@ public:
REGISTER (TPullQueueCommand, "pull_queue", Null, Tabular, false, true, ApiVersion4);
REGISTER (TPullConsumerCommand, "pull_consumer", Null, Tabular, false, true, ApiVersion4);
REGISTER (TAdvanceConsumerCommand, "advance_consumer", Null, Structured, true, false, ApiVersion4);
+ REGISTER (TCreateQueueProducerSessionCommand, "create_queue_producer_session", Null, Structured, true, false, ApiVersion4);
+ REGISTER (TRemoveQueueProducerSessionCommand, "remove_queue_producer_session", Null, Structured, true, false, ApiVersion4);
REGISTER (TStartQueryCommand, "start_query", Null, Structured, true, false, ApiVersion4);
REGISTER (TAbortQueryCommand, "abort_query", Null, Structured, true, false, ApiVersion4);
diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp
index f3504ad8c9..2068cce9d6 100644
--- a/yt/yt/client/driver/queue_commands.cpp
+++ b/yt/yt/client/driver/queue_commands.cpp
@@ -252,4 +252,63 @@ void TAdvanceConsumerCommand::DoExecute(ICommandContextPtr context)
////////////////////////////////////////////////////////////////////////////////
+void TCreateQueueProducerSessionCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("producer_path", &TThis::ProducerPath);
+ registrar.Parameter("queue_path", &TThis::QueuePath);
+ registrar.Parameter("session_id", &TThis::SessionId);
+ registrar.Parameter("user_meta", &TThis::UserMeta)
+ .Optional();
+}
+
+void TCreateQueueProducerSessionCommand::DoExecute(ICommandContextPtr context)
+{
+ auto client = context->GetClient();
+
+ std::optional<NYson::TYsonString> requestUserMeta;
+ if (UserMeta) {
+ requestUserMeta = NYson::ConvertToYsonString(UserMeta);
+ }
+
+ auto result = WaitFor(client->CreateQueueProducerSession(
+ ProducerPath,
+ QueuePath,
+ SessionId,
+ requestUserMeta))
+ .ValueOrThrow();
+
+ ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) {
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("epoch").Value(result.Epoch)
+ .Item("sequence_number").Value(result.SequenceNumber)
+ .Item("user_meta").Value(result.UserMeta)
+ .EndMap();
+ });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TRemoveQueueProducerSessionCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("producer_path", &TThis::ProducerPath);
+ registrar.Parameter("queue_path", &TThis::QueuePath);
+ registrar.Parameter("session_id", &TThis::SessionId);
+}
+
+void TRemoveQueueProducerSessionCommand::DoExecute(ICommandContextPtr context)
+{
+ auto client = context->GetClient();
+
+ WaitFor(client->RemoveQueueProducerSession(
+ ProducerPath,
+ QueuePath,
+ SessionId))
+ .ThrowOnError();
+
+ ProduceEmptyOutput(context);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NDriver
diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h
index f2fa8fe5ef..5317985207 100644
--- a/yt/yt/client/driver/queue_commands.h
+++ b/yt/yt/client/driver/queue_commands.h
@@ -125,4 +125,41 @@ private:
////////////////////////////////////////////////////////////////////////////////
+class TCreateQueueProducerSessionCommand
+ : public TTypedCommand<NApi::TCreateQueueProducerSessionOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TCreateQueueProducerSessionCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NYPath::TRichYPath ProducerPath;
+ NYPath::TRichYPath QueuePath;
+ TString SessionId;
+ NYTree::INodePtr UserMeta;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRemoveQueueProducerSessionCommand
+ : public TTypedCommand<NApi::TRemoveQueueProducerSessionOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TRemoveQueueProducerSessionCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NYPath::TRichYPath ProducerPath;
+ NYPath::TRichYPath QueuePath;
+ TString SessionId;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NDriver
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index a915ee5a98..efbd61c0c5 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -331,6 +331,9 @@ public:
// IClient unsupported methods.
UNIMPLEMENTED_METHOD(TFuture<void>, RegisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, bool, const TRegisterQueueConsumerOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, UnregisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TUnregisterQueueConsumerOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const std::optional<NYson::TYsonString>&, const TCreateQueueProducerSessionOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const TRemoveQueueProducerSessionOptions&));
+
UNIMPLEMENTED_METHOD(const NChaosClient::IReplicationCardCachePtr&, GetReplicationCardCache, ());
UNIMPLEMENTED_METHOD(TFuture<void>, MountTable, (const NYPath::TYPath&, const TMountTableOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, UnmountTable, (const NYPath::TYPath&, const TUnmountTableOptions&));
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index 0ceb3aab9c..d5f6bd946a 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -83,6 +83,8 @@ public:
RETRYABLE_METHOD(TFuture<void>, RegisterQueueConsumer, (const TRichYPath&, const TRichYPath&, bool, const TRegisterQueueConsumerOptions&));
RETRYABLE_METHOD(TFuture<void>, UnregisterQueueConsumer, (const TRichYPath&, const TRichYPath&, const TUnregisterQueueConsumerOptions&));
RETRYABLE_METHOD(TFuture<std::vector<TListQueueConsumerRegistrationsResult>>, ListQueueConsumerRegistrations, (const std::optional<TRichYPath>&, const std::optional<TRichYPath>&, const TListQueueConsumerRegistrationsOptions&));
+ RETRYABLE_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const std::optional<NYson::TYsonString>&, const TCreateQueueProducerSessionOptions&));
+ RETRYABLE_METHOD(TFuture<void>, RemoveQueueProducerSession, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, const TRemoveQueueProducerSessionOptions&));
RETRYABLE_METHOD(TFuture<NYson::TYsonString>, ExplainQuery, (const TString&, const TExplainQueryOptions&));
RETRYABLE_METHOD(TFuture<ITableReaderPtr>, CreateTableReader, (const TRichYPath&, const TTableReaderOptions&));
RETRYABLE_METHOD(TFuture<NYson::TYsonString>, GetNode, (const TYPath&, const TGetNodeOptions&));
diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp
deleted file mode 100644
index fec3091420..0000000000
--- a/yt/yt/client/queue_client/producer_client.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-#include "producer_client.h"
-
-#include <yt/yt/client/table_client/comparator.h>
-#include <yt/yt/client/table_client/helpers.h>
-#include <yt/yt/client/table_client/schema.h>
-
-namespace NYT::NQueueClient {
-
-using namespace NTableClient;
-
-////////////////////////////////////////////////////////////////////////////////
-
-static const TTableSchemaPtr YTProducerTableSchema = New<TTableSchema>(std::vector<TColumnSchema>{
- TColumnSchema("queue_cluster", EValueType::String, ESortOrder::Ascending).SetRequired(true),
- TColumnSchema("queue_path", EValueType::String, ESortOrder::Ascending).SetRequired(true),
- TColumnSchema("session_id", EValueType::String, ESortOrder::Ascending).SetRequired(true),
- TColumnSchema("sequence_number", EValueType::Uint64).SetRequired(true),
- TColumnSchema("epoch", EValueType::Uint64).SetRequired(true),
- TColumnSchema("user_meta", EValueType::Any).SetRequired(false),
- TColumnSchema("system_meta", EValueType::Any).SetRequired(false),
-}, /*strict*/ true, /*uniqueKeys*/ true);
-
-////////////////////////////////////////////////////////////////////////////////
-
-const NTableClient::TTableSchemaPtr& GetProducerSchema()
-{
- return YTProducerTableSchema;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NQueueClient
diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h
deleted file mode 100644
index e24cdc6b05..0000000000
--- a/yt/yt/client/queue_client/producer_client.h
+++ /dev/null
@@ -1,13 +0,0 @@
-#pragma once
-
-#include <yt/yt/client/table_client/public.h>
-
-namespace NYT::NQueueClient {
-
-////////////////////////////////////////////////////////////////////////////////
-
-const NTableClient::TTableSchemaPtr& GetProducerSchema();
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NQueueClient
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index ea199b554e..c283bafe3d 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -102,6 +102,21 @@ public:
const TListQueueConsumerRegistrationsOptions& options),
(override));
+ MOCK_METHOD(TFuture<TCreateQueueProducerSessionResult>, CreateQueueProducerSession, (
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ const std::optional<NYson::TYsonString>& userMeta,
+ const TCreateQueueProducerSessionOptions& options),
+ (override));
+
+ MOCK_METHOD(TFuture<void>, RemoveQueueProducerSession, (
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ const TRemoveQueueProducerSessionOptions& options),
+ (override));
+
MOCK_METHOD(TFuture<NYson::TYsonString>, ExplainQuery, (
const TString& query,
const TExplainQueryOptions& options),
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index a0d7797583..b06f9fd98c 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -147,7 +147,6 @@ SRCS(
queue_client/consumer_client.cpp
queue_client/helpers.cpp
queue_client/partition_reader.cpp
- queue_client/producer_client.cpp
queue_client/queue_rowset.cpp
ypath/rich.cpp
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 789f83cb99..c9c593f22a 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -760,6 +760,35 @@ message TRspListQueueConsumerRegistrations
////////////////////////////////////////////////////////////////////////////////
+message TReqCreateQueueProducerSession
+{
+ optional string producer_path = 1;
+ optional string queue_path = 2;
+ optional string session_id = 3;
+ optional bytes user_meta = 4; // YSON
+}
+
+message TRspCreateQueueProducerSession
+{
+ optional uint64 sequence_number = 1;
+ optional uint64 epoch = 2;
+ optional bytes user_meta = 3; // YSON
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+message TReqRemoveQueueProducerSession
+{
+ optional string producer_path = 1;
+ optional string queue_path = 2;
+ optional string session_id = 3;
+}
+
+message TRspRemoveQueueProducerSession
+{ }
+
+////////////////////////////////////////////////////////////////////////////////
+
message TReqExplainQuery
{
required string query = 1;