aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorakozhikhov <akozhikhov@yandex-team.com>2023-11-29 21:15:33 +0300
committerakozhikhov <akozhikhov@yandex-team.com>2023-11-29 21:34:30 +0300
commit14eab774d513ed7e1abc3f8bda032d8266208394 (patch)
tree49b1e7de2420f3803f3e30a49d376d3f9d2b3507
parent30f30da73cc5bb3d45bf1726275921ec159e9abb (diff)
downloadydb-14eab774d513ed7e1abc3f8bda032d8266208394.tar.gz
Queue agent cosmetics
-rw-r--r--yt/yt/client/queue_client/common.h4
-rw-r--r--yt/yt/client/queue_client/config.cpp7
-rw-r--r--yt/yt/client/queue_client/config.h1
-rw-r--r--yt/yt/client/queue_client/consumer_client.cpp10
-rw-r--r--yt/yt/client/queue_client/partition_reader.cpp23
5 files changed, 23 insertions, 22 deletions
diff --git a/yt/yt/client/queue_client/common.h b/yt/yt/client/queue_client/common.h
index 36a448ac92..0f008f39dc 100644
--- a/yt/yt/client/queue_client/common.h
+++ b/yt/yt/client/queue_client/common.h
@@ -16,8 +16,8 @@ struct TCrossClusterReference
TString Cluster;
NYPath::TYPath Path;
- bool operator ==(const TCrossClusterReference& other) const;
- bool operator <(const TCrossClusterReference& other) const;
+ bool operator==(const TCrossClusterReference& other) const;
+ bool operator<(const TCrossClusterReference& other) const;
operator NYPath::TRichYPath() const;
diff --git a/yt/yt/client/queue_client/config.cpp b/yt/yt/client/queue_client/config.cpp
index 14b3fdb7fe..60127f3820 100644
--- a/yt/yt/client/queue_client/config.cpp
+++ b/yt/yt/client/queue_client/config.cpp
@@ -37,7 +37,9 @@ void TQueueAutoTrimConfig::Register(TRegistrar registrar)
.Default();
registrar.Postprocessor([] (TThis* trimConfig) {
- if (trimConfig->RetainedLifetimeDuration && trimConfig->RetainedLifetimeDuration->GetValue() % TDuration::Seconds(1).GetValue() != 0) {
+ if (trimConfig->RetainedLifetimeDuration &&
+ trimConfig->RetainedLifetimeDuration->GetValue() % TDuration::Seconds(1).GetValue() != 0)
+ {
THROW_ERROR_EXCEPTION("The value of \"retained_lifetime_duration\" must be a multiple of 1000 (1 second)");
}
});
@@ -45,7 +47,8 @@ void TQueueAutoTrimConfig::Register(TRegistrar registrar)
bool operator==(const TQueueAutoTrimConfig& lhs, const TQueueAutoTrimConfig& rhs)
{
- return std::tie(lhs.Enable, lhs.RetainedRows, lhs.RetainedLifetimeDuration) == std::tie(rhs.Enable, rhs.RetainedRows, rhs.RetainedLifetimeDuration);
+ return std::tie(lhs.Enable, lhs.RetainedRows, lhs.RetainedLifetimeDuration) ==
+ std::tie(rhs.Enable, rhs.RetainedRows, rhs.RetainedLifetimeDuration);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/queue_client/config.h b/yt/yt/client/queue_client/config.h
index f3b5d9d176..e39dd7f0dc 100644
--- a/yt/yt/client/queue_client/config.h
+++ b/yt/yt/client/queue_client/config.h
@@ -34,7 +34,6 @@ DEFINE_REFCOUNTED_TYPE(TPartitionReaderConfig)
////////////////////////////////////////////////////////////////////////////////
-
//! Automatic trimming configuration for a single queue.
//!
//! All rows up to the smallest offset among vital consumers are considered trimmable.
diff --git a/yt/yt/client/queue_client/consumer_client.cpp b/yt/yt/client/queue_client/consumer_client.cpp
index cace53eb53..2089ba21aa 100644
--- a/yt/yt/client/queue_client/consumer_client.cpp
+++ b/yt/yt/client/queue_client/consumer_client.cpp
@@ -73,10 +73,6 @@ public:
, OffsetColumnId_(NameTable_->GetId(OffsetColumnName_))
, SubConsumerColumnFilter_{PartitionIndexColumnId_, OffsetColumnId_}
, DecrementOffset_(decrementOffset)
- , SubConsumerSchema_(New<TTableSchema>(std::vector<TColumnSchema>{
- TColumnSchema(TString(PartitionIndexColumnName_), EValueType::Uint64, ESortOrder::Ascending),
- TColumnSchema(TString(OffsetColumnName_), EValueType::Uint64),
- }, /*strict*/ true, /*uniqueKeys*/ true))
{
if (RowPrefix_.GetCount() == 0) {
RowPrefixCondition_ = "1 = 1";
@@ -338,8 +334,6 @@ private:
//! should be set to true.
bool DecrementOffset_ = false;
- TTableSchemaPtr SubConsumerSchema_;
-
std::vector<TPartitionInfo> DoCollectPartitions(
const IClientPtr& client,
const TString& selectQuery,
@@ -370,7 +364,6 @@ private:
std::vector<ui64> partitionIndices;
for (auto row : selectRowsResult.Rowset->GetRows()) {
-
YT_VERIFY(row.GetCount() == 2);
const auto& partitionIndexValue = row[*partitionIndexRowsetColumnId];
@@ -514,7 +507,8 @@ public:
}
private:
- TYPath Path_;
+ const TYPath Path_;
+
static const TNameTablePtr NameTable_;
static const int QueueClusterColumnId_;
static const int QueuePathColumnId_;
diff --git a/yt/yt/client/queue_client/partition_reader.cpp b/yt/yt/client/queue_client/partition_reader.cpp
index f9be907e40..fa2934469d 100644
--- a/yt/yt/client/queue_client/partition_reader.cpp
+++ b/yt/yt/client/queue_client/partition_reader.cpp
@@ -59,8 +59,9 @@ public:
private:
const TPartitionReaderConfigPtr Config_;
const IClientPtr Client_;
- TYPath ConsumerPath_;
- int PartitionIndex_;
+ const TYPath ConsumerPath_;
+ const int PartitionIndex_;
+
NLogging::TLogger Logger;
bool Opened_ = false;
@@ -127,9 +128,9 @@ private:
}
private:
- IQueueRowsetPtr Rowset_;
- TWeakPtr<TPartitionReader> PartitionReader_;
- i64 CurrentOffset_;
+ const IQueueRowsetPtr Rowset_;
+ const TWeakPtr<TPartitionReader> PartitionReader_;
+ const i64 CurrentOffset_;
};
IPersistentQueueRowsetPtr DoRead()
@@ -299,6 +300,7 @@ private:
const int PartitionIndex_;
const TQueueRowBatchReadOptions RowBatchReadOptions_;
const NLogging::TLogger Logger;
+
TPullConsumerOptions PullConsumerOptions_;
ISubConsumerClientPtr ConsumerClient_;
@@ -308,7 +310,10 @@ private:
: public IPersistentQueueRowset
{
public:
- TPersistentQueueRowset(IQueueRowsetPtr rowset, TWeakPtr<TMultiQueueConsumerPartitionReader> partitionReader, i64 currentOffset)
+ TPersistentQueueRowset(
+ IQueueRowsetPtr rowset,
+ TWeakPtr<TMultiQueueConsumerPartitionReader> partitionReader,
+ i64 currentOffset)
: Rowset_(std::move(rowset))
, PartitionReader_(std::move(partitionReader))
, CurrentOffset_(currentOffset)
@@ -361,9 +366,9 @@ private:
}
private:
- IQueueRowsetPtr Rowset_;
- TWeakPtr<TMultiQueueConsumerPartitionReader> PartitionReader_;
- i64 CurrentOffset_;
+ const IQueueRowsetPtr Rowset_;
+ const TWeakPtr<TMultiQueueConsumerPartitionReader> PartitionReader_;
+ const i64 CurrentOffset_;
};
IPersistentQueueRowsetPtr DoRead()