aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorapachee <apachee@yandex-team.com>2024-03-22 18:39:09 +0300
committerapachee <apachee@yandex-team.com>2024-03-22 18:58:09 +0300
commite20d8d7107fe83f612b832be904fac0257138f30 (patch)
treecbc700f682d9b7f701fc5bd2b70f0725e273c497
parentb20b35f12ef6cc066d54b12c4289e161f25b1d12 (diff)
downloadydb-e20d8d7107fe83f612b832be904fac0257138f30.tar.gz
YT-21331: Add create producer
977ebaab24db41b1ac370577b3d487d29d7f1419
-rw-r--r--yt/yt/client/object_client/helpers.cpp3
-rw-r--r--yt/yt/client/object_client/public.h1
-rw-r--r--yt/yt/client/queue_client/producer_client.cpp32
-rw-r--r--yt/yt/client/queue_client/producer_client.h13
-rw-r--r--yt/yt/client/ya.make1
5 files changed, 49 insertions, 1 deletions
diff --git a/yt/yt/client/object_client/helpers.cpp b/yt/yt/client/object_client/helpers.cpp
index c5ce04def9..945ed6b02f 100644
--- a/yt/yt/client/object_client/helpers.cpp
+++ b/yt/yt/client/object_client/helpers.cpp
@@ -123,7 +123,8 @@ bool IsVersionedType(EObjectType type)
type == EObjectType::ClusterProxyNode ||
type == EObjectType::SequoiaMapNode ||
type == EObjectType::Pipeline ||
- type == EObjectType::Consumer;
+ type == EObjectType::Consumer ||
+ type == EObjectType::Producer;
}
bool IsUserType(EObjectType type)
diff --git a/yt/yt/client/object_client/public.h b/yt/yt/client/object_client/public.h
index 06ab3c6c43..a51cdd2bc4 100644
--- a/yt/yt/client/object_client/public.h
+++ b/yt/yt/client/object_client/public.h
@@ -342,6 +342,7 @@ DEFINE_ENUM(EObjectType,
// Queue stuff
((Consumer) (1700))
+ ((Producer) (1701))
);
//! A bit mask marking schema types.
diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp
new file mode 100644
index 0000000000..24b1470c0f
--- /dev/null
+++ b/yt/yt/client/queue_client/producer_client.cpp
@@ -0,0 +1,32 @@
+#include "producer_client.h"
+
+#include <yt/yt/client/table_client/comparator.h>
+#include <yt/yt/client/table_client/helpers.h>
+#include <yt/yt/client/table_client/schema.h>
+
+namespace NYT::NQueueClient {
+
+using namespace NTableClient;
+
+////////////////////////////////////////////////////////////////////////////////
+
+static const TTableSchemaPtr YTProducerTableSchema = New<TTableSchema>(std::vector<TColumnSchema>{
+ TColumnSchema("queue_cluster", EValueType::String, ESortOrder::Ascending).SetRequired(true),
+ TColumnSchema("queue_path", EValueType::String, ESortOrder::Ascending).SetRequired(true),
+ TColumnSchema("session_id", EValueType::String, ESortOrder::Ascending).SetRequired(true),
+ TColumnSchema("sequence_number", EValueType::Uint64, ESortOrder::Ascending).SetRequired(true),
+ TColumnSchema("epoch", EValueType::Uint64).SetRequired(true),
+ TColumnSchema("user_meta", EValueType::Any).SetRequired(false),
+ TColumnSchema("system_meta", EValueType::Any).SetRequired(false),
+}, /*strict*/ true, /*uniqueKeys*/ true);
+
+////////////////////////////////////////////////////////////////////////////////
+
+const NTableClient::TTableSchemaPtr& GetProducerSchema()
+{
+ return YTProducerTableSchema;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NQueueClient
diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h
new file mode 100644
index 0000000000..e24cdc6b05
--- /dev/null
+++ b/yt/yt/client/queue_client/producer_client.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <yt/yt/client/table_client/public.h>
+
+namespace NYT::NQueueClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+const NTableClient::TTableSchemaPtr& GetProducerSchema();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NQueueClient
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index 2a3b885d36..859c66ab85 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -147,6 +147,7 @@ SRCS(
queue_client/consumer_client.cpp
queue_client/helpers.cpp
queue_client/partition_reader.cpp
+ queue_client/producer_client.cpp
queue_client/queue_rowset.cpp
ypath/rich.cpp