diff options
author | akozhikhov <akozhikhov@yandex-team.com> | 2023-11-29 21:15:33 +0300 |
---|---|---|
committer | akozhikhov <akozhikhov@yandex-team.com> | 2023-11-29 21:34:30 +0300 |
commit | 14eab774d513ed7e1abc3f8bda032d8266208394 (patch) | |
tree | 49b1e7de2420f3803f3e30a49d376d3f9d2b3507 | |
parent | 30f30da73cc5bb3d45bf1726275921ec159e9abb (diff) | |
download | ydb-14eab774d513ed7e1abc3f8bda032d8266208394.tar.gz |
Queue agent cosmetics
-rw-r--r-- | yt/yt/client/queue_client/common.h | 4 | ||||
-rw-r--r-- | yt/yt/client/queue_client/config.cpp | 7 | ||||
-rw-r--r-- | yt/yt/client/queue_client/config.h | 1 | ||||
-rw-r--r-- | yt/yt/client/queue_client/consumer_client.cpp | 10 | ||||
-rw-r--r-- | yt/yt/client/queue_client/partition_reader.cpp | 23 |
5 files changed, 23 insertions, 22 deletions
diff --git a/yt/yt/client/queue_client/common.h b/yt/yt/client/queue_client/common.h index 36a448ac92..0f008f39dc 100644 --- a/yt/yt/client/queue_client/common.h +++ b/yt/yt/client/queue_client/common.h @@ -16,8 +16,8 @@ struct TCrossClusterReference TString Cluster; NYPath::TYPath Path; - bool operator ==(const TCrossClusterReference& other) const; - bool operator <(const TCrossClusterReference& other) const; + bool operator==(const TCrossClusterReference& other) const; + bool operator<(const TCrossClusterReference& other) const; operator NYPath::TRichYPath() const; diff --git a/yt/yt/client/queue_client/config.cpp b/yt/yt/client/queue_client/config.cpp index 14b3fdb7fe..60127f3820 100644 --- a/yt/yt/client/queue_client/config.cpp +++ b/yt/yt/client/queue_client/config.cpp @@ -37,7 +37,9 @@ void TQueueAutoTrimConfig::Register(TRegistrar registrar) .Default(); registrar.Postprocessor([] (TThis* trimConfig) { - if (trimConfig->RetainedLifetimeDuration && trimConfig->RetainedLifetimeDuration->GetValue() % TDuration::Seconds(1).GetValue() != 0) { + if (trimConfig->RetainedLifetimeDuration && + trimConfig->RetainedLifetimeDuration->GetValue() % TDuration::Seconds(1).GetValue() != 0) + { THROW_ERROR_EXCEPTION("The value of \"retained_lifetime_duration\" must be a multiple of 1000 (1 second)"); } }); @@ -45,7 +47,8 @@ void TQueueAutoTrimConfig::Register(TRegistrar registrar) bool operator==(const TQueueAutoTrimConfig& lhs, const TQueueAutoTrimConfig& rhs) { - return std::tie(lhs.Enable, lhs.RetainedRows, lhs.RetainedLifetimeDuration) == std::tie(rhs.Enable, rhs.RetainedRows, rhs.RetainedLifetimeDuration); + return std::tie(lhs.Enable, lhs.RetainedRows, lhs.RetainedLifetimeDuration) == + std::tie(rhs.Enable, rhs.RetainedRows, rhs.RetainedLifetimeDuration); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/queue_client/config.h b/yt/yt/client/queue_client/config.h index f3b5d9d176..e39dd7f0dc 100644 --- a/yt/yt/client/queue_client/config.h +++ b/yt/yt/client/queue_client/config.h @@ -34,7 +34,6 @@ DEFINE_REFCOUNTED_TYPE(TPartitionReaderConfig) //////////////////////////////////////////////////////////////////////////////// - //! Automatic trimming configuration for a single queue. //! //! All rows up to the smallest offset among vital consumers are considered trimmable. diff --git a/yt/yt/client/queue_client/consumer_client.cpp b/yt/yt/client/queue_client/consumer_client.cpp index cace53eb53..2089ba21aa 100644 --- a/yt/yt/client/queue_client/consumer_client.cpp +++ b/yt/yt/client/queue_client/consumer_client.cpp @@ -73,10 +73,6 @@ public: , OffsetColumnId_(NameTable_->GetId(OffsetColumnName_)) , SubConsumerColumnFilter_{PartitionIndexColumnId_, OffsetColumnId_} , DecrementOffset_(decrementOffset) - , SubConsumerSchema_(New<TTableSchema>(std::vector<TColumnSchema>{ - TColumnSchema(TString(PartitionIndexColumnName_), EValueType::Uint64, ESortOrder::Ascending), - TColumnSchema(TString(OffsetColumnName_), EValueType::Uint64), - }, /*strict*/ true, /*uniqueKeys*/ true)) { if (RowPrefix_.GetCount() == 0) { RowPrefixCondition_ = "1 = 1"; @@ -338,8 +334,6 @@ private: //! should be set to true. bool DecrementOffset_ = false; - TTableSchemaPtr SubConsumerSchema_; - std::vector<TPartitionInfo> DoCollectPartitions( const IClientPtr& client, const TString& selectQuery, @@ -370,7 +364,6 @@ private: std::vector<ui64> partitionIndices; for (auto row : selectRowsResult.Rowset->GetRows()) { - YT_VERIFY(row.GetCount() == 2); const auto& partitionIndexValue = row[*partitionIndexRowsetColumnId]; @@ -514,7 +507,8 @@ public: } private: - TYPath Path_; + const TYPath Path_; + static const TNameTablePtr NameTable_; static const int QueueClusterColumnId_; static const int QueuePathColumnId_; diff --git a/yt/yt/client/queue_client/partition_reader.cpp b/yt/yt/client/queue_client/partition_reader.cpp index f9be907e40..fa2934469d 100644 --- a/yt/yt/client/queue_client/partition_reader.cpp +++ b/yt/yt/client/queue_client/partition_reader.cpp @@ -59,8 +59,9 @@ public: private: const TPartitionReaderConfigPtr Config_; const IClientPtr Client_; - TYPath ConsumerPath_; - int PartitionIndex_; + const TYPath ConsumerPath_; + const int PartitionIndex_; + NLogging::TLogger Logger; bool Opened_ = false; @@ -127,9 +128,9 @@ private: } private: - IQueueRowsetPtr Rowset_; - TWeakPtr<TPartitionReader> PartitionReader_; - i64 CurrentOffset_; + const IQueueRowsetPtr Rowset_; + const TWeakPtr<TPartitionReader> PartitionReader_; + const i64 CurrentOffset_; }; IPersistentQueueRowsetPtr DoRead() @@ -299,6 +300,7 @@ private: const int PartitionIndex_; const TQueueRowBatchReadOptions RowBatchReadOptions_; const NLogging::TLogger Logger; + TPullConsumerOptions PullConsumerOptions_; ISubConsumerClientPtr ConsumerClient_; @@ -308,7 +310,10 @@ private: : public IPersistentQueueRowset { public: - TPersistentQueueRowset(IQueueRowsetPtr rowset, TWeakPtr<TMultiQueueConsumerPartitionReader> partitionReader, i64 currentOffset) + TPersistentQueueRowset( + IQueueRowsetPtr rowset, + TWeakPtr<TMultiQueueConsumerPartitionReader> partitionReader, + i64 currentOffset) : Rowset_(std::move(rowset)) , PartitionReader_(std::move(partitionReader)) , CurrentOffset_(currentOffset) @@ -361,9 +366,9 @@ private: } private: - IQueueRowsetPtr Rowset_; - TWeakPtr<TMultiQueueConsumerPartitionReader> PartitionReader_; - i64 CurrentOffset_; + const IQueueRowsetPtr Rowset_; + const TWeakPtr<TMultiQueueConsumerPartitionReader> PartitionReader_; + const i64 CurrentOffset_; }; IPersistentQueueRowsetPtr DoRead() |