diff options
author | apachee <apachee@yandex-team.com> | 2024-03-22 18:39:09 +0300 |
---|---|---|
committer | apachee <apachee@yandex-team.com> | 2024-03-22 18:58:09 +0300 |
commit | e20d8d7107fe83f612b832be904fac0257138f30 (patch) | |
tree | cbc700f682d9b7f701fc5bd2b70f0725e273c497 | |
parent | b20b35f12ef6cc066d54b12c4289e161f25b1d12 (diff) | |
download | ydb-e20d8d7107fe83f612b832be904fac0257138f30.tar.gz |
YT-21331: Add create producer
977ebaab24db41b1ac370577b3d487d29d7f1419
-rw-r--r-- | yt/yt/client/object_client/helpers.cpp | 3 | ||||
-rw-r--r-- | yt/yt/client/object_client/public.h | 1 | ||||
-rw-r--r-- | yt/yt/client/queue_client/producer_client.cpp | 32 | ||||
-rw-r--r-- | yt/yt/client/queue_client/producer_client.h | 13 | ||||
-rw-r--r-- | yt/yt/client/ya.make | 1 |
5 files changed, 49 insertions, 1 deletions
diff --git a/yt/yt/client/object_client/helpers.cpp b/yt/yt/client/object_client/helpers.cpp index c5ce04def9..945ed6b02f 100644 --- a/yt/yt/client/object_client/helpers.cpp +++ b/yt/yt/client/object_client/helpers.cpp @@ -123,7 +123,8 @@ bool IsVersionedType(EObjectType type) type == EObjectType::ClusterProxyNode || type == EObjectType::SequoiaMapNode || type == EObjectType::Pipeline || - type == EObjectType::Consumer; + type == EObjectType::Consumer || + type == EObjectType::Producer; } bool IsUserType(EObjectType type) diff --git a/yt/yt/client/object_client/public.h b/yt/yt/client/object_client/public.h index 06ab3c6c43..a51cdd2bc4 100644 --- a/yt/yt/client/object_client/public.h +++ b/yt/yt/client/object_client/public.h @@ -342,6 +342,7 @@ DEFINE_ENUM(EObjectType, // Queue stuff ((Consumer) (1700)) + ((Producer) (1701)) ); //! A bit mask marking schema types. diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp new file mode 100644 index 0000000000..24b1470c0f --- /dev/null +++ b/yt/yt/client/queue_client/producer_client.cpp @@ -0,0 +1,32 @@ +#include "producer_client.h" + +#include <yt/yt/client/table_client/comparator.h> +#include <yt/yt/client/table_client/helpers.h> +#include <yt/yt/client/table_client/schema.h> + +namespace NYT::NQueueClient { + +using namespace NTableClient; + +//////////////////////////////////////////////////////////////////////////////// + +static const TTableSchemaPtr YTProducerTableSchema = New<TTableSchema>(std::vector<TColumnSchema>{ + TColumnSchema("queue_cluster", EValueType::String, ESortOrder::Ascending).SetRequired(true), + TColumnSchema("queue_path", EValueType::String, ESortOrder::Ascending).SetRequired(true), + TColumnSchema("session_id", EValueType::String, ESortOrder::Ascending).SetRequired(true), + TColumnSchema("sequence_number", EValueType::Uint64, ESortOrder::Ascending).SetRequired(true), + TColumnSchema("epoch", EValueType::Uint64).SetRequired(true), + TColumnSchema("user_meta", EValueType::Any).SetRequired(false), + TColumnSchema("system_meta", EValueType::Any).SetRequired(false), +}, /*strict*/ true, /*uniqueKeys*/ true); + +//////////////////////////////////////////////////////////////////////////////// + +const NTableClient::TTableSchemaPtr& GetProducerSchema() +{ + return YTProducerTableSchema; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NQueueClient diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h new file mode 100644 index 0000000000..e24cdc6b05 --- /dev/null +++ b/yt/yt/client/queue_client/producer_client.h @@ -0,0 +1,13 @@ +#pragma once + +#include <yt/yt/client/table_client/public.h> + +namespace NYT::NQueueClient { + +//////////////////////////////////////////////////////////////////////////////// + +const NTableClient::TTableSchemaPtr& GetProducerSchema(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NQueueClient diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index 2a3b885d36..859c66ab85 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -147,6 +147,7 @@ SRCS( queue_client/consumer_client.cpp queue_client/helpers.cpp queue_client/partition_reader.cpp + queue_client/producer_client.cpp queue_client/queue_rowset.cpp ypath/rich.cpp |