aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-06-28 12:24:17 +0300
committernadya73 <nadya73@yandex-team.com>2024-06-28 12:34:32 +0300
commitd0dc9572233e6c5a1040c5eb91227066c876b832 (patch)
tree75ca12bfd9def59378d8a9b7b0397c25117b318b
parent71a71fcf019dad59ed18261f0f0099ec40465fd1 (diff)
downloadydb-d0dc9572233e6c5a1040c5eb91227066c876b832.tar.gz
[queues] YT-21356: PushQueueProducer cosmetics and fixes
7ddfadbb0cd05828c28d6723314d2d3c1192c282
-rw-r--r--yt/yt/client/api/queue_transaction.h4
-rw-r--r--yt/yt/client/driver/queue_commands.cpp12
-rw-r--r--yt/yt/client/table_client/schema.cpp92
3 files changed, 75 insertions, 33 deletions
diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h
index d818655aa2..646174f8f1 100644
--- a/yt/yt/client/api/queue_transaction.h
+++ b/yt/yt/client/api/queue_transaction.h
@@ -39,7 +39,7 @@ struct TPushQueueProducerResult
* All rows with greater sequence number will be ignored in future calls of PushQueueProducer.
*/
NQueueClient::TQueueProducerSequenceNumber LastSequenceNumber{-1};
- //! Count of rows which were skipped because of their sequence number.
+ //! Number of rows which were skipped because of their sequence number.
/*!
* Skipped rows were written before.
*/
@@ -80,7 +80,7 @@ struct IQueueTransaction
i64 newOffset,
const TAdvanceQueueConsumerOptions& options) = 0;
- //! Write rows in the queue with checking their sequence number.
+ //! Writes rows in the queue with checking their sequence number.
/*!
* If row sequence number is less than sequence number saved in producer table, then this row will not be written.
* #sessionId - an identificator of write session, for example, `<host>-<filename>`.
diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp
index b62a26e286..1db333dc5d 100644
--- a/yt/yt/client/driver/queue_commands.cpp
+++ b/yt/yt/client/driver/queue_commands.cpp
@@ -29,7 +29,9 @@ using namespace NYson;
////////////////////////////////////////////////////////////////////////////////
-static NLogging::TLogger WithCommandTag(
+namespace {
+
+NLogging::TLogger WithCommandTag(
const NLogging::TLogger& logger,
const ICommandContextPtr& context)
{
@@ -37,6 +39,8 @@ static NLogging::TLogger WithCommandTag(
context->Request().CommandName);
}
+} // namespace
+
////////////////////////////////////////////////////////////////////////////////
void TRegisterQueueConsumerCommand::Register(TRegistrar registrar)
@@ -367,10 +371,12 @@ void TPushQueueProducerCommand::DoExecute(ICommandContextPtr context)
auto queueTableInfoFuture = tableMountCache->GetTableInfo(QueuePath.GetPath());
auto producerTableInfoFuture = tableMountCache->GetTableInfo(ProducerPath.GetPath());
- auto queueTableInfo = WaitFor(queueTableInfoFuture).ValueOrThrow("Path %v is not valid queue", QueuePath);
+ auto queueTableInfo = WaitFor(queueTableInfoFuture)
+ .ValueOrThrow("Path %v does not point to a valid queue", QueuePath);
queueTableInfo->ValidateOrdered();
- auto producerTableInfo = WaitFor(producerTableInfoFuture).ValueOrThrow("Path %v is not valid producer", ProducerPath);
+ auto producerTableInfo = WaitFor(producerTableInfoFuture)
+ .ValueOrThrow("Path %v does not point to a valid producer", ProducerPath);
producerTableInfo->ValidateSorted();
struct TPushQueueProducerBufferTag
diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp
index b191f8c6b5..f11012bfbc 100644
--- a/yt/yt/client/table_client/schema.cpp
+++ b/yt/yt/client/table_client/schema.cpp
@@ -983,8 +983,12 @@ TTableSchemaPtr TTableSchema::ToWriteViaQueueProducer() const
}
}
}
- return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_,
- ETableSchemaModification::None, DeletedColumns());
+ return New<TTableSchema>(
+ std::move(columns),
+ Strict_,
+ UniqueKeys_,
+ ETableSchemaModification::None,
+ DeletedColumns());
}
TTableSchemaPtr TTableSchema::ToWrite() const
@@ -1007,8 +1011,12 @@ TTableSchemaPtr TTableSchema::ToWrite() const
}
}
}
- return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_,
- ETableSchemaModification::None, DeletedColumns());
+ return New<TTableSchema>(
+ std::move(columns),
+ Strict_,
+ UniqueKeys_,
+ ETableSchemaModification::None,
+ DeletedColumns());
}
TTableSchemaPtr TTableSchema::WithTabletIndex() const
@@ -1019,8 +1027,12 @@ TTableSchemaPtr TTableSchema::WithTabletIndex() const
auto columns = Columns();
// XXX: Is it ok? $tablet_index is usually a key column.
columns.push_back(TColumnSchema(TabletIndexColumnName, ESimpleLogicalValueType::Int64));
- return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_,
- ETableSchemaModification::None, DeletedColumns());
+ return New<TTableSchema>(
+ std::move(columns),
+ Strict_,
+ UniqueKeys_,
+ ETableSchemaModification::None,
+ DeletedColumns());
}
}
@@ -1032,8 +1044,12 @@ TTableSchemaPtr TTableSchema::ToVersionedWrite() const
auto columns = Columns();
columns.insert(columns.begin(), TColumnSchema(TabletIndexColumnName, ESimpleLogicalValueType::Int64)
.SetSortOrder(ESortOrder::Ascending));
- return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_,
- ETableSchemaModification::None, DeletedColumns());
+ return New<TTableSchema>(
+ std::move(columns),
+ Strict_,
+ UniqueKeys_,
+ ETableSchemaModification::None,
+ DeletedColumns());
}
}
@@ -1075,8 +1091,12 @@ TTableSchemaPtr TTableSchema::ToKeys() const
}
const auto& info = *ColumnInfo_;
std::vector<TColumnSchema> columns(info.Columns.begin(), info.Columns.begin() + KeyColumnCount_);
- return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_,
- ETableSchemaModification::None, info.DeletedColumns);
+ return New<TTableSchema>(
+ std::move(columns),
+ Strict_,
+ UniqueKeys_,
+ ETableSchemaModification::None,
+ info.DeletedColumns);
}
TTableSchemaPtr TTableSchema::ToUniqueKeys() const
@@ -1090,8 +1110,12 @@ TTableSchemaPtr TTableSchema::ToUniqueKeys() const
std::vector<TDeletedColumn>());
}
const auto& info = *ColumnInfo_;
- return New<TTableSchema>(info.Columns, Strict_, /*uniqueKeys*/ true,
- ETableSchemaModification::None, info.DeletedColumns);
+ return New<TTableSchema>(
+ info.Columns,
+ Strict_,
+ /*uniqueKeys*/ true,
+ ETableSchemaModification::None,
+ info.DeletedColumns);
}
TTableSchemaPtr TTableSchema::ToStrippedColumnAttributes() const
@@ -1110,8 +1134,12 @@ TTableSchemaPtr TTableSchema::ToStrippedColumnAttributes() const
auto& strippedColumn = strippedColumns.emplace_back(column.Name(), column.LogicalType());
strippedColumn.SetStableName(column.StableName());
}
- return New<TTableSchema>(std::move(strippedColumns), Strict_, /*uniqueKeys*/ false,
- ETableSchemaModification::None, info.DeletedColumns);
+ return New<TTableSchema>(
+ std::move(strippedColumns),
+ Strict_,
+ /*uniqueKeys*/ false,
+ ETableSchemaModification::None,
+ info.DeletedColumns);
}
TTableSchemaPtr TTableSchema::ToSortedStrippedColumnAttributes() const
@@ -1130,8 +1158,12 @@ TTableSchemaPtr TTableSchema::ToSortedStrippedColumnAttributes() const
auto& strippedColumn = strippedColumns.emplace_back(column.Name(), column.LogicalType(), column.SortOrder());
strippedColumn.SetStableName(column.StableName());
}
- return New<TTableSchema>(std::move(strippedColumns), Strict_, UniqueKeys_,
- ETableSchemaModification::None, info.DeletedColumns);
+ return New<TTableSchema>(
+ std::move(strippedColumns),
+ Strict_,
+ UniqueKeys_,
+ ETableSchemaModification::None,
+ info.DeletedColumns);
}
TTableSchemaPtr TTableSchema::ToCanonical() const
@@ -1152,8 +1184,12 @@ TTableSchemaPtr TTableSchema::ToCanonical() const
[] (const TColumnSchema& lhs, const TColumnSchema& rhs) {
return lhs.Name() < rhs.Name();
});
- return New<TTableSchema>(columns, Strict_, UniqueKeys_,
- ETableSchemaModification::None, info.DeletedColumns);
+ return New<TTableSchema>(
+ columns,
+ Strict_,
+ UniqueKeys_,
+ ETableSchemaModification::None,
+ info.DeletedColumns);
}
TTableSchemaPtr TTableSchema::ToSorted(const TKeyColumns& keyColumns) const
@@ -1252,11 +1288,11 @@ TTableSchemaPtr TTableSchema::ToReplicationLog() const
columns.push_back(TColumnSchema(TReplicationLogTable::ValueColumnNamePrefix + TabletIndexColumnName, ESimpleLogicalValueType::Int64));
}
return New<TTableSchema>(
- std::move(columns),
- /* strict */ true,
- /* uniqueKeys */ false,
- ETableSchemaModification::None,
- DeletedColumns());
+ std::move(columns),
+ /* strict */ true,
+ /* uniqueKeys */ false,
+ ETableSchemaModification::None,
+ DeletedColumns());
}
TTableSchemaPtr TTableSchema::ToUnversionedUpdate(bool sorted) const
@@ -1294,11 +1330,11 @@ TTableSchemaPtr TTableSchema::ToUnversionedUpdate(bool sorted) const
}
return New<TTableSchema>(
- std::move(columns),
- /*strict*/ true,
- /*uniqueKeys*/ sorted,
- ETableSchemaModification::None,
- info.DeletedColumns);
+ std::move(columns),
+ /*strict*/ true,
+ /*uniqueKeys*/ sorted,
+ ETableSchemaModification::None,
+ info.DeletedColumns);
}
TTableSchemaPtr TTableSchema::ToModifiedSchema(ETableSchemaModification schemaModification) const