diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-06-28 12:24:17 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-06-28 12:34:32 +0300 |
commit | d0dc9572233e6c5a1040c5eb91227066c876b832 (patch) | |
tree | 75ca12bfd9def59378d8a9b7b0397c25117b318b | |
parent | 71a71fcf019dad59ed18261f0f0099ec40465fd1 (diff) | |
download | ydb-d0dc9572233e6c5a1040c5eb91227066c876b832.tar.gz |
[queues] YT-21356: PushQueueProducer cosmetics and fixes
7ddfadbb0cd05828c28d6723314d2d3c1192c282
-rw-r--r-- | yt/yt/client/api/queue_transaction.h | 4 | ||||
-rw-r--r-- | yt/yt/client/driver/queue_commands.cpp | 12 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema.cpp | 92 |
3 files changed, 75 insertions, 33 deletions
diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h index d818655aa2..646174f8f1 100644 --- a/yt/yt/client/api/queue_transaction.h +++ b/yt/yt/client/api/queue_transaction.h @@ -39,7 +39,7 @@ struct TPushQueueProducerResult * All rows with greater sequence number will be ignored in future calls of PushQueueProducer. */ NQueueClient::TQueueProducerSequenceNumber LastSequenceNumber{-1}; - //! Count of rows which were skipped because of their sequence number. + //! Number of rows which were skipped because of their sequence number. /*! * Skipped rows were written before. */ @@ -80,7 +80,7 @@ struct IQueueTransaction i64 newOffset, const TAdvanceQueueConsumerOptions& options) = 0; - //! Write rows in the queue with checking their sequence number. + //! Writes rows in the queue with checking their sequence number. /*! * If row sequence number is less than sequence number saved in producer table, then this row will not be written. * #sessionId - an identificator of write session, for example, `<host>-<filename>`. diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp index b62a26e286..1db333dc5d 100644 --- a/yt/yt/client/driver/queue_commands.cpp +++ b/yt/yt/client/driver/queue_commands.cpp @@ -29,7 +29,9 @@ using namespace NYson; //////////////////////////////////////////////////////////////////////////////// -static NLogging::TLogger WithCommandTag( +namespace { + +NLogging::TLogger WithCommandTag( const NLogging::TLogger& logger, const ICommandContextPtr& context) { @@ -37,6 +39,8 @@ static NLogging::TLogger WithCommandTag( context->Request().CommandName); } +} // namespace + //////////////////////////////////////////////////////////////////////////////// void TRegisterQueueConsumerCommand::Register(TRegistrar registrar) @@ -367,10 +371,12 @@ void TPushQueueProducerCommand::DoExecute(ICommandContextPtr context) auto queueTableInfoFuture = tableMountCache->GetTableInfo(QueuePath.GetPath()); auto producerTableInfoFuture = tableMountCache->GetTableInfo(ProducerPath.GetPath()); - auto queueTableInfo = WaitFor(queueTableInfoFuture).ValueOrThrow("Path %v is not valid queue", QueuePath); + auto queueTableInfo = WaitFor(queueTableInfoFuture) + .ValueOrThrow("Path %v does not point to a valid queue", QueuePath); queueTableInfo->ValidateOrdered(); - auto producerTableInfo = WaitFor(producerTableInfoFuture).ValueOrThrow("Path %v is not valid producer", ProducerPath); + auto producerTableInfo = WaitFor(producerTableInfoFuture) + .ValueOrThrow("Path %v does not point to a valid producer", ProducerPath); producerTableInfo->ValidateSorted(); struct TPushQueueProducerBufferTag diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp index b191f8c6b5..f11012bfbc 100644 --- a/yt/yt/client/table_client/schema.cpp +++ b/yt/yt/client/table_client/schema.cpp @@ -983,8 +983,12 @@ TTableSchemaPtr TTableSchema::ToWriteViaQueueProducer() const } } } - return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_, - ETableSchemaModification::None, DeletedColumns()); + return New<TTableSchema>( + std::move(columns), + Strict_, + UniqueKeys_, + ETableSchemaModification::None, + DeletedColumns()); } TTableSchemaPtr TTableSchema::ToWrite() const @@ -1007,8 +1011,12 @@ TTableSchemaPtr TTableSchema::ToWrite() const } } } - return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_, - ETableSchemaModification::None, DeletedColumns()); + return New<TTableSchema>( + std::move(columns), + Strict_, + UniqueKeys_, + ETableSchemaModification::None, + DeletedColumns()); } TTableSchemaPtr TTableSchema::WithTabletIndex() const @@ -1019,8 +1027,12 @@ TTableSchemaPtr TTableSchema::WithTabletIndex() const auto columns = Columns(); // XXX: Is it ok? $tablet_index is usually a key column. columns.push_back(TColumnSchema(TabletIndexColumnName, ESimpleLogicalValueType::Int64)); - return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_, - ETableSchemaModification::None, DeletedColumns()); + return New<TTableSchema>( + std::move(columns), + Strict_, + UniqueKeys_, + ETableSchemaModification::None, + DeletedColumns()); } } @@ -1032,8 +1044,12 @@ TTableSchemaPtr TTableSchema::ToVersionedWrite() const auto columns = Columns(); columns.insert(columns.begin(), TColumnSchema(TabletIndexColumnName, ESimpleLogicalValueType::Int64) .SetSortOrder(ESortOrder::Ascending)); - return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_, - ETableSchemaModification::None, DeletedColumns()); + return New<TTableSchema>( + std::move(columns), + Strict_, + UniqueKeys_, + ETableSchemaModification::None, + DeletedColumns()); } } @@ -1075,8 +1091,12 @@ TTableSchemaPtr TTableSchema::ToKeys() const } const auto& info = *ColumnInfo_; std::vector<TColumnSchema> columns(info.Columns.begin(), info.Columns.begin() + KeyColumnCount_); - return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_, - ETableSchemaModification::None, info.DeletedColumns); + return New<TTableSchema>( + std::move(columns), + Strict_, + UniqueKeys_, + ETableSchemaModification::None, + info.DeletedColumns); } TTableSchemaPtr TTableSchema::ToUniqueKeys() const @@ -1090,8 +1110,12 @@ TTableSchemaPtr TTableSchema::ToUniqueKeys() const std::vector<TDeletedColumn>()); } const auto& info = *ColumnInfo_; - return New<TTableSchema>(info.Columns, Strict_, /*uniqueKeys*/ true, - ETableSchemaModification::None, info.DeletedColumns); + return New<TTableSchema>( + info.Columns, + Strict_, + /*uniqueKeys*/ true, + ETableSchemaModification::None, + info.DeletedColumns); } TTableSchemaPtr TTableSchema::ToStrippedColumnAttributes() const @@ -1110,8 +1134,12 @@ TTableSchemaPtr TTableSchema::ToStrippedColumnAttributes() const auto& strippedColumn = strippedColumns.emplace_back(column.Name(), column.LogicalType()); strippedColumn.SetStableName(column.StableName()); } - return New<TTableSchema>(std::move(strippedColumns), Strict_, /*uniqueKeys*/ false, - ETableSchemaModification::None, info.DeletedColumns); + return New<TTableSchema>( + std::move(strippedColumns), + Strict_, + /*uniqueKeys*/ false, + ETableSchemaModification::None, + info.DeletedColumns); } TTableSchemaPtr TTableSchema::ToSortedStrippedColumnAttributes() const @@ -1130,8 +1158,12 @@ TTableSchemaPtr TTableSchema::ToSortedStrippedColumnAttributes() const auto& strippedColumn = strippedColumns.emplace_back(column.Name(), column.LogicalType(), column.SortOrder()); strippedColumn.SetStableName(column.StableName()); } - return New<TTableSchema>(std::move(strippedColumns), Strict_, UniqueKeys_, - ETableSchemaModification::None, info.DeletedColumns); + return New<TTableSchema>( + std::move(strippedColumns), + Strict_, + UniqueKeys_, + ETableSchemaModification::None, + info.DeletedColumns); } TTableSchemaPtr TTableSchema::ToCanonical() const @@ -1152,8 +1184,12 @@ TTableSchemaPtr TTableSchema::ToCanonical() const [] (const TColumnSchema& lhs, const TColumnSchema& rhs) { return lhs.Name() < rhs.Name(); }); - return New<TTableSchema>(columns, Strict_, UniqueKeys_, - ETableSchemaModification::None, info.DeletedColumns); + return New<TTableSchema>( + columns, + Strict_, + UniqueKeys_, + ETableSchemaModification::None, + info.DeletedColumns); } TTableSchemaPtr TTableSchema::ToSorted(const TKeyColumns& keyColumns) const @@ -1252,11 +1288,11 @@ TTableSchemaPtr TTableSchema::ToReplicationLog() const columns.push_back(TColumnSchema(TReplicationLogTable::ValueColumnNamePrefix + TabletIndexColumnName, ESimpleLogicalValueType::Int64)); } return New<TTableSchema>( - std::move(columns), - /* strict */ true, - /* uniqueKeys */ false, - ETableSchemaModification::None, - DeletedColumns()); + std::move(columns), + /* strict */ true, + /* uniqueKeys */ false, + ETableSchemaModification::None, + DeletedColumns()); } TTableSchemaPtr TTableSchema::ToUnversionedUpdate(bool sorted) const @@ -1294,11 +1330,11 @@ TTableSchemaPtr TTableSchema::ToUnversionedUpdate(bool sorted) const } return New<TTableSchema>( - std::move(columns), - /*strict*/ true, - /*uniqueKeys*/ sorted, - ETableSchemaModification::None, - info.DeletedColumns); + std::move(columns), + /*strict*/ true, + /*uniqueKeys*/ sorted, + ETableSchemaModification::None, + info.DeletedColumns); } TTableSchemaPtr TTableSchema::ToModifiedSchema(ETableSchemaModification schemaModification) const |