aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorachulkov2 <achulkov2@yandex-team.com>2023-09-29 18:45:50 +0300
committerachulkov2 <achulkov2@yandex-team.com>2023-09-29 20:07:46 +0300
commitadeed5ebfd6176d95e33751f903a273704cb960d (patch)
tree6e3118eb047273814832fde289f855ae86d6d74f
parentfc8989e974bac5f69e0bab5458c054d3fa8beb62 (diff)
downloadydb-adeed5ebfd6176d95e33751f903a273704cb960d.tar.gz
YT-19517: Add exports from queues into small static tables
-rw-r--r--yt/yt/client/queue_client/config.cpp30
-rw-r--r--yt/yt/client/queue_client/config.h36
-rw-r--r--yt/yt/client/queue_client/public.h1
3 files changed, 66 insertions, 1 deletions
diff --git a/yt/yt/client/queue_client/config.cpp b/yt/yt/client/queue_client/config.cpp
index 0f7354f096..869df92636 100644
--- a/yt/yt/client/queue_client/config.cpp
+++ b/yt/yt/client/queue_client/config.cpp
@@ -54,7 +54,35 @@ void TQueueAutoTrimConfig::Register(TRegistrar registrar)
bool operator==(const TQueueAutoTrimConfig& lhs, const TQueueAutoTrimConfig& rhs)
{
- return lhs.Enable == rhs.Enable && lhs.RetainedRows == rhs.RetainedRows;
+ return std::tie(lhs.Enable, lhs.RetainedRows, lhs.RetainedLifetimeDuration) == std::tie(rhs.Enable, rhs.RetainedRows, rhs.RetainedLifetimeDuration);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TQueueStaticExportConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("export_period", &TThis::ExportPeriod)
+ .GreaterThan(TDuration::Zero());
+ registrar.Parameter("export_directory", &TThis::ExportDirectory);
+
+ registrar.Postprocessor([] (TThis* config) {
+ if (config->ExportPeriod.GetValue() % TDuration::Seconds(1).GetValue() != 0) {
+ THROW_ERROR_EXCEPTION("The value of \"export_period\" must be a multiple of 1000 (1 second)");
+ }
+ });
+}
+
+bool operator==(const TQueueStaticExportConfig& lhs, const TQueueStaticExportConfig& rhs)
+{
+ return std::tie(lhs.ExportPeriod, lhs.ExportDirectory) == std::tie(rhs.ExportPeriod, rhs.ExportDirectory);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TQueueStaticExportDestinationConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("originating_queue_id", &TThis::OriginatingQueueId)
+ .Default();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/queue_client/config.h b/yt/yt/client/queue_client/config.h
index 53c6a2dce0..bd52433fbc 100644
--- a/yt/yt/client/queue_client/config.h
+++ b/yt/yt/client/queue_client/config.h
@@ -2,6 +2,10 @@
#include "public.h"
+#include <yt/yt/client/object_client/public.h>
+
+#include <yt/yt/client/ypath/rich.h>
+
#include <yt/yt/core/ytree/yson_struct.h>
namespace NYT::NQueueClient {
@@ -60,4 +64,36 @@ bool operator==(const TQueueAutoTrimConfig& lhs, const TQueueAutoTrimConfig& rhs
////////////////////////////////////////////////////////////////////////////////
+class TQueueStaticExportConfig
+ : public NYTree::TYsonStructLite
+{
+public:
+ //! Export will be performed at times that are multiple of this period.
+ TDuration ExportPeriod;
+
+ //! Path to directory that will contain resulting static tables with exported data.
+ NYPath::TYPath ExportDirectory;
+
+ REGISTER_YSON_STRUCT_LITE(TQueueStaticExportConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+bool operator==(const TQueueStaticExportConfig& lhs, const TQueueStaticExportConfig& rhs);
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TQueueStaticExportDestinationConfig
+ : public NYTree::TYsonStructLite
+{
+public:
+ NObjectClient::TObjectId OriginatingQueueId;
+
+ REGISTER_YSON_STRUCT_LITE(TQueueStaticExportDestinationConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NQueueClient
diff --git a/yt/yt/client/queue_client/public.h b/yt/yt/client/queue_client/public.h
index ffc1805416..df1761c324 100644
--- a/yt/yt/client/queue_client/public.h
+++ b/yt/yt/client/queue_client/public.h
@@ -23,6 +23,7 @@ DECLARE_REFCOUNTED_STRUCT(ISubConsumerClient)
DECLARE_REFCOUNTED_STRUCT(IPartitionReader)
DECLARE_REFCOUNTED_CLASS(TPartitionReaderConfig)
+DECLARE_REFCOUNTED_CLASS(TQueueStaticExportDestinationConfig)
////////////////////////////////////////////////////////////////////////////////