diff options
author | achulkov2 <achulkov2@yandex-team.com> | 2023-09-29 18:45:50 +0300 |
---|---|---|
committer | achulkov2 <achulkov2@yandex-team.com> | 2023-09-29 20:07:46 +0300 |
commit | adeed5ebfd6176d95e33751f903a273704cb960d (patch) | |
tree | 6e3118eb047273814832fde289f855ae86d6d74f | |
parent | fc8989e974bac5f69e0bab5458c054d3fa8beb62 (diff) | |
download | ydb-adeed5ebfd6176d95e33751f903a273704cb960d.tar.gz |
YT-19517: Add exports from queues into small static tables
-rw-r--r-- | yt/yt/client/queue_client/config.cpp | 30 | ||||
-rw-r--r-- | yt/yt/client/queue_client/config.h | 36 | ||||
-rw-r--r-- | yt/yt/client/queue_client/public.h | 1 |
3 files changed, 66 insertions, 1 deletions
diff --git a/yt/yt/client/queue_client/config.cpp b/yt/yt/client/queue_client/config.cpp index 0f7354f096..869df92636 100644 --- a/yt/yt/client/queue_client/config.cpp +++ b/yt/yt/client/queue_client/config.cpp @@ -54,7 +54,35 @@ void TQueueAutoTrimConfig::Register(TRegistrar registrar) bool operator==(const TQueueAutoTrimConfig& lhs, const TQueueAutoTrimConfig& rhs) { - return lhs.Enable == rhs.Enable && lhs.RetainedRows == rhs.RetainedRows; + return std::tie(lhs.Enable, lhs.RetainedRows, lhs.RetainedLifetimeDuration) == std::tie(rhs.Enable, rhs.RetainedRows, rhs.RetainedLifetimeDuration); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TQueueStaticExportConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("export_period", &TThis::ExportPeriod) + .GreaterThan(TDuration::Zero()); + registrar.Parameter("export_directory", &TThis::ExportDirectory); + + registrar.Postprocessor([] (TThis* config) { + if (config->ExportPeriod.GetValue() % TDuration::Seconds(1).GetValue() != 0) { + THROW_ERROR_EXCEPTION("The value of \"export_period\" must be a multiple of 1000 (1 second)"); + } + }); +} + +bool operator==(const TQueueStaticExportConfig& lhs, const TQueueStaticExportConfig& rhs) +{ + return std::tie(lhs.ExportPeriod, lhs.ExportDirectory) == std::tie(rhs.ExportPeriod, rhs.ExportDirectory); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TQueueStaticExportDestinationConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("originating_queue_id", &TThis::OriginatingQueueId) + .Default(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/queue_client/config.h b/yt/yt/client/queue_client/config.h index 53c6a2dce0..bd52433fbc 100644 --- a/yt/yt/client/queue_client/config.h +++ b/yt/yt/client/queue_client/config.h @@ -2,6 +2,10 @@ #include "public.h" +#include <yt/yt/client/object_client/public.h> + +#include <yt/yt/client/ypath/rich.h> + #include <yt/yt/core/ytree/yson_struct.h> namespace NYT::NQueueClient { @@ -60,4 +64,36 @@ bool operator==(const TQueueAutoTrimConfig& lhs, const TQueueAutoTrimConfig& rhs //////////////////////////////////////////////////////////////////////////////// +class TQueueStaticExportConfig + : public NYTree::TYsonStructLite +{ +public: + //! Export will be performed at times that are multiple of this period. + TDuration ExportPeriod; + + //! Path to directory that will contain resulting static tables with exported data. + NYPath::TYPath ExportDirectory; + + REGISTER_YSON_STRUCT_LITE(TQueueStaticExportConfig); + + static void Register(TRegistrar registrar); +}; + +bool operator==(const TQueueStaticExportConfig& lhs, const TQueueStaticExportConfig& rhs); + +//////////////////////////////////////////////////////////////////////////////// + +class TQueueStaticExportDestinationConfig + : public NYTree::TYsonStructLite +{ +public: + NObjectClient::TObjectId OriginatingQueueId; + + REGISTER_YSON_STRUCT_LITE(TQueueStaticExportDestinationConfig); + + static void Register(TRegistrar registrar); +}; + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NQueueClient diff --git a/yt/yt/client/queue_client/public.h b/yt/yt/client/queue_client/public.h index ffc1805416..df1761c324 100644 --- a/yt/yt/client/queue_client/public.h +++ b/yt/yt/client/queue_client/public.h @@ -23,6 +23,7 @@ DECLARE_REFCOUNTED_STRUCT(ISubConsumerClient) DECLARE_REFCOUNTED_STRUCT(IPartitionReader) DECLARE_REFCOUNTED_CLASS(TPartitionReaderConfig) +DECLARE_REFCOUNTED_CLASS(TQueueStaticExportDestinationConfig) //////////////////////////////////////////////////////////////////////////////// |