diff options
| author | apachee <[email protected]> | 2024-07-02 16:53:30 +0300 |
|---|---|---|
| committer | apachee <[email protected]> | 2024-07-02 17:05:47 +0300 |
| commit | 04a099c3da3ea1315d4897079ed4ee3aad614eb8 (patch) | |
| tree | 590298d8d1be28488529e3789d18e66ef50d081d | |
| parent | f2f13f03dddc1856da3cd2955a597a0203bf8d28 (diff) | |
YT-22096: Remove BigRT consumers & related entities from YT
e9f848fd6dec24dbe50c1874d60b970a980a475f
| -rw-r--r-- | yt/yt/client/api/delegating_transaction.cpp | 7 | ||||
| -rw-r--r-- | yt/yt/client/api/delegating_transaction.h | 5 | ||||
| -rw-r--r-- | yt/yt/client/api/queue_transaction.h | 7 | ||||
| -rw-r--r-- | yt/yt/client/api/queue_transaction_mixin.cpp | 12 | ||||
| -rw-r--r-- | yt/yt/client/api/queue_transaction_mixin.h | 7 | ||||
| -rw-r--r-- | yt/yt/client/queue_client/consumer_client.cpp | 55 | ||||
| -rw-r--r-- | yt/yt/client/queue_client/consumer_client.h | 18 | ||||
| -rw-r--r-- | yt/yt/client/queue_client/partition_reader.cpp | 233 | ||||
| -rw-r--r-- | yt/yt/client/queue_client/partition_reader.h | 6 |
9 files changed, 0 insertions, 350 deletions
diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp index 33bd596d18e..2b1cb55d323 100644 --- a/yt/yt/client/api/delegating_transaction.cpp +++ b/yt/yt/client/api/delegating_transaction.cpp @@ -276,13 +276,6 @@ DELEGATE_METHOD(void, ModifyRows, ( (path, nameTable, modifications, options)) DELEGATE_METHOD(void, AdvanceConsumer, ( - const NYPath::TYPath& path, - int partitionIndex, - std::optional<i64> oldOffset, - i64 newOffset), - (path, partitionIndex, oldOffset, newOffset)) - -DELEGATE_METHOD(void, AdvanceConsumer, ( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, int partitionIndex, diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h index 39017b5853c..37aeb46f4a7 100644 --- a/yt/yt/client/api/delegating_transaction.h +++ b/yt/yt/client/api/delegating_transaction.h @@ -225,11 +225,6 @@ public: // Queues void AdvanceConsumer( - const NYPath::TYPath& path, - int partitionIndex, - std::optional<i64> oldOffset, - i64 newOffset) override; - void AdvanceConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, int partitionIndex, diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h index 646174f8f13..4c474456195 100644 --- a/yt/yt/client/api/queue_transaction.h +++ b/yt/yt/client/api/queue_transaction.h @@ -54,13 +54,6 @@ struct IQueueTransaction // TODO(nadya73): Remove it: YT-20712 virtual void AdvanceConsumer( - const NYPath::TYPath& path, - int partitionIndex, - std::optional<i64> oldOffset, - i64 newOffset) = 0; - - // TODO(nadya73): Remove it: YT-20712 - virtual void AdvanceConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, int partitionIndex, diff --git a/yt/yt/client/api/queue_transaction_mixin.cpp b/yt/yt/client/api/queue_transaction_mixin.cpp index cac63774ae4..cbd93b5b74c 100644 --- a/yt/yt/client/api/queue_transaction_mixin.cpp +++ b/yt/yt/client/api/queue_transaction_mixin.cpp @@ -12,18 +12,6 @@ using namespace NConcurrency; ///////////////////////////////////////////////////////////////////////////// void TQueueTransactionMixin::AdvanceConsumer( - const NYPath::TYPath& path, - int partitionIndex, - std::optional<i64> oldOffset, - i64 newOffset) -{ - THROW_ERROR_EXCEPTION_IF(newOffset < 0, "Queue consumer offset %v cannot be negative", newOffset); - - auto consumerClient = CreateBigRTConsumerClient(GetClient(), path); - consumerClient->Advance(this, partitionIndex, oldOffset, newOffset); -} - -void TQueueTransactionMixin::AdvanceConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, int partitionIndex, diff --git a/yt/yt/client/api/queue_transaction_mixin.h b/yt/yt/client/api/queue_transaction_mixin.h index 36a5ed80149..60cc056fca0 100644 --- a/yt/yt/client/api/queue_transaction_mixin.h +++ b/yt/yt/client/api/queue_transaction_mixin.h @@ -14,13 +14,6 @@ class TQueueTransactionMixin public: // TODO(nadya73): Remove it: YT-20712 void AdvanceConsumer( - const NYPath::TYPath& path, - int partitionIndex, - std::optional<i64> oldOffset, - i64 newOffset) override; - - // TODO(nadya73): Remove it: YT-20712 - void AdvanceConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, int partitionIndex, diff --git a/yt/yt/client/queue_client/consumer_client.cpp b/yt/yt/client/queue_client/consumer_client.cpp index 656972c76b6..1de69d46e23 100644 --- a/yt/yt/client/queue_client/consumer_client.cpp +++ b/yt/yt/client/queue_client/consumer_client.cpp @@ -60,11 +60,6 @@ static const TTableSchemaPtr YTConsumerTableSchema = New<TTableSchema>(std::vect TColumnSchema("meta", EValueType::Any).SetRequired(false), }, /*strict*/ true, /*uniqueKeys*/ true); -static const TTableSchemaPtr BigRTConsumerTableSchema = New<TTableSchema>(std::vector<TColumnSchema>{ - TColumnSchema("ShardId", EValueType::Uint64, ESortOrder::Ascending), - TColumnSchema("Offset", EValueType::Uint64), -}, /*strict*/ true, /*uniqueKeys*/ true); - //////////////////////////////////////////////////////////////////////////////// void TConsumerMeta::Register(TRegistrar registrar) @@ -340,14 +335,6 @@ public: })); } - TFuture<TCrossClusterReference> FetchTargetQueue() const override - { - return ConsumerClusterClient_->GetNode(ConsumerPath_ + "/@target_queue") - .Apply(BIND([] (const TYsonString& ysonString) { - return TCrossClusterReference::FromString(ConvertTo<TString>(ysonString)); - })); - } - TFuture<TPartitionStatistics> FetchPartitionStatistics( const TYPath& queuePath, int partitionIndex) const override @@ -590,48 +577,6 @@ private: //////////////////////////////////////////////////////////////////////////////// -ISubConsumerClientPtr CreateBigRTConsumerClient( - const IClientPtr& client, - const TYPath& path, - const TTableSchema& schema) -{ - if (!schema.IsUniqueKeys()) { - THROW_ERROR_EXCEPTION("Consumer schema must have unique keys, schema does not") - << TErrorAttribute("actual_schema", schema); - } - - if (schema == *BigRTConsumerTableSchema) { - return New<TGenericConsumerClient>( - client, - client, - path, - /*queuePath*/ std::nullopt, - TUnversionedOwningRow(), - "ShardId", - "Offset", - /*decrementOffset*/ true, - BigRTConsumerTableSchema, - /*queueTableSchema*/ nullptr); - } else { - THROW_ERROR_EXCEPTION("Table schema is not recognized as a valid BigRT consumer schema") - << TErrorAttribute("expected_schema", *BigRTConsumerTableSchema) - << TErrorAttribute("actual_schema", schema); - } -} - -ISubConsumerClientPtr CreateBigRTConsumerClient( - const IClientPtr& client, - const TYPath& path) -{ - auto tableInfo = WaitFor(client->GetTableMountCache()->GetTableInfo(path)) - .ValueOrThrow(); - auto schema = tableInfo->Schemas[ETableSchemaKind::Primary]; - - return CreateBigRTConsumerClient(client, path, *schema); -} - -//////////////////////////////////////////////////////////////////////////////// - class TYTConsumerClient : public IConsumerClient { diff --git a/yt/yt/client/queue_client/consumer_client.h b/yt/yt/client/queue_client/consumer_client.h index e51f905a9e7..4ec901ec159 100644 --- a/yt/yt/client/queue_client/consumer_client.h +++ b/yt/yt/client/queue_client/consumer_client.h @@ -66,9 +66,6 @@ struct ISubConsumerClient const std::vector<int>& partitionIndexes, bool withLastConsumeTime = false) const = 0; - //! Fetch and parse the target_queue attribute of this consumer. - virtual TFuture<TCrossClusterReference> FetchTargetQueue() const = 0; - struct TPartitionStatistics { i64 FlushedDataWeight = 0; @@ -95,21 +92,6 @@ DEFINE_REFCOUNTED_TYPE(IConsumerClient) //////////////////////////////////////////////////////////////////////////////// -// TODO(max42): get rid of the following two methods. -// They are left as a temporary mean to keep API tests working. - -//! Creates a BigRT single-queue consumer client. -ISubConsumerClientPtr CreateBigRTConsumerClient( - const NApi::IClientPtr& client, - const NYPath::TYPath& path, - const NTableClient::TTableSchema& schema); - -//! Uses the table mount cache to fetch the consumer's schema and make -//! sure the consumer actually has BigRT consumer schema. -ISubConsumerClientPtr CreateBigRTConsumerClient( - const NApi::IClientPtr& client, - const NYPath::TYPath& path); - //! Creates a native YT multi-queue consumer client. IConsumerClientPtr CreateConsumerClient( const NApi::IClientPtr& consumerClusterClient, diff --git a/yt/yt/client/queue_client/partition_reader.cpp b/yt/yt/client/queue_client/partition_reader.cpp index a7c4b3a5635..c46d60b42d6 100644 --- a/yt/yt/client/queue_client/partition_reader.cpp +++ b/yt/yt/client/queue_client/partition_reader.cpp @@ -22,239 +22,6 @@ using namespace NYTree; //////////////////////////////////////////////////////////////////////////////// -class TPartitionReader - : public IPartitionReader -{ -public: - TPartitionReader( - TPartitionReaderConfigPtr config, - IClientPtr client, - TYPath consumerPath, - int partitionIndex) - : Config_(std::move(config)) - , Client_(std::move(client)) - , ConsumerPath_(std::move(consumerPath)) - , PartitionIndex_(partitionIndex) - , Logger(QueueClientLogger().WithTag("Consumer: %v, Partition: %v", ConsumerPath_, PartitionIndex_)) - , RowBatchReadOptions_({Config_->MaxRowCount, Config_->MaxDataWeight, Config_->DataWeightPerRowHint}) - , PullQueueOptions_({.UseNativeTabletNodeApi = Config_->UseNativeTabletNodeApi}) - { } - - TFuture<void> Open() override - { - return BIND(&TPartitionReader::DoOpen, MakeStrong(this)) - .AsyncVia(GetCurrentInvoker()) - .Run(); - } - - TFuture<IPersistentQueueRowsetPtr> Read() override - { - YT_VERIFY(Opened_); - - return BIND(&TPartitionReader::DoRead, MakeStrong(this)) - .AsyncVia(GetCurrentInvoker()) - .Run(); - } - -private: - const TPartitionReaderConfigPtr Config_; - const IClientPtr Client_; - const TYPath ConsumerPath_; - const int PartitionIndex_; - - NLogging::TLogger Logger; - - bool Opened_ = false; - - TYPath QueuePath_; - i64 ApproximateDataWeightPerRow_ = 0; - ISubConsumerClientPtr ConsumerClient_; - TQueueRowBatchReadOptions RowBatchReadOptions_; - TPullQueueOptions PullQueueOptions_; - - class TPersistentQueueRowset - : public IPersistentQueueRowset - { - public: - TPersistentQueueRowset(IQueueRowsetPtr rowset, TWeakPtr<TPartitionReader> partitionReader, i64 currentOffset) - : Rowset_(std::move(rowset)) - , PartitionReader_(std::move(partitionReader)) - , CurrentOffset_(currentOffset) - { } - - const NTableClient::TTableSchemaPtr& GetSchema() const override - { - return Rowset_->GetSchema(); - } - - const NTableClient::TNameTablePtr& GetNameTable() const override - { - return Rowset_->GetNameTable(); - } - - TSharedRange<NTableClient::TUnversionedRow> GetRows() const override - { - return Rowset_->GetRows(); - } - - i64 GetStartOffset() const override - { - return Rowset_->GetStartOffset(); - } - - i64 GetFinishOffset() const override - { - return Rowset_->GetFinishOffset(); - } - - void Commit(const NApi::ITransactionPtr& transaction) override - { - YT_VERIFY(transaction); - - if (auto partitionReader = PartitionReader_.Lock()) { - // TODO(achulkov2): Check that this is the first uncommitted batch returned to the user and crash otherwise. - // Will be much easier to do once we figure out how prefetching & batch storage will look like. - - // TODO(achulkov2): Mark this batch as committed in the partition reader. - - transaction->AdvanceConsumer( - partitionReader->ConsumerPath_, - partitionReader->PartitionIndex_, - CurrentOffset_, - GetFinishOffset()); - } else { - THROW_ERROR_EXCEPTION("Partition reader destroyed"); - } - } - - private: - const IQueueRowsetPtr Rowset_; - const TWeakPtr<TPartitionReader> PartitionReader_; - const i64 CurrentOffset_; - }; - - IPersistentQueueRowsetPtr DoRead() - { - YT_LOG_DEBUG("Reading rowset"); - TWallTimer timer; - - if (!Config_->DataWeightPerRowHint && ApproximateDataWeightPerRow_) { - RowBatchReadOptions_.DataWeightPerRowHint = ApproximateDataWeightPerRow_; - } - - auto currentOffset = FetchCurrentOffset(); - - // TODO(achulkov2): Log the options in the RPC method (via SetRequestInfo) instead? - YT_LOG_DEBUG( - "Pulling from queue (Offset: %v, MaxRowCount: %v, MaxDataWeight: %v, DataWeightPerRowHint: %v)", - currentOffset, - RowBatchReadOptions_.MaxRowCount, - RowBatchReadOptions_.MaxDataWeight, - RowBatchReadOptions_.DataWeightPerRowHint); - auto rowset = WaitFor(Client_->PullQueue( - QueuePath_, - currentOffset, - PartitionIndex_, - RowBatchReadOptions_, - PullQueueOptions_)) - .ValueOrThrow(); - - HandleRowset(rowset); - - YT_LOG_DEBUG("Rowset read (WallTime: %v)", timer.GetElapsedTime()); - - return New<TPersistentQueueRowset>(rowset, MakeWeak(this), currentOffset); - - } - - void HandleRowset(const IQueueRowsetPtr& rowset) - { - // TODO(achulkov2): When prefetching is implemented we'll have some sort of struct for holding the batch + stats. - // TODO(achulkov2): Don't burn CPU here and get the data weight from PullQueue somehow. - auto dataWeight = static_cast<i64>(GetDataWeight(rowset->GetRows())); - i64 rowCount = std::ssize(rowset->GetRows()); - - RecomputeApproximateDataWeightPerRow(dataWeight, rowCount); - - YT_LOG_DEBUG( - "Rowset obtained (RowCount: %v, DataWeight: %v, StartOffset: %v, FinishOffset: %v)", - rowCount, - dataWeight, - rowset->GetStartOffset(), - rowset->GetFinishOffset()); - } - - void RecomputeApproximateDataWeightPerRow(i64 dataWeight, i64 rowCount) - { - if (rowCount == 0) { - return; - } - - auto newDataWeightPerRowHint = dataWeight / rowCount; - - if (ApproximateDataWeightPerRow_) { - // TODO(achulkov2): Anything better? Variable coefficient? - ApproximateDataWeightPerRow_ = (ApproximateDataWeightPerRow_ + newDataWeightPerRowHint) / 2; - } else { - ApproximateDataWeightPerRow_ = newDataWeightPerRowHint; - } - - YT_LOG_DEBUG("Recomputed approximate data weight per row (ApproximateDataWeightPerRow: %v)", ApproximateDataWeightPerRow_); - } - - // NB: Can throw. - i64 FetchCurrentOffset() const - { - TWallTimer timer; - - std::vector<int> partitionIndexesToFetch{PartitionIndex_}; - auto partitions = WaitFor(ConsumerClient_->CollectPartitions(partitionIndexesToFetch)) - .ValueOrThrow(); - - YT_VERIFY(partitions.size() <= 1); - - i64 currentOffset = 0; - - if (!partitions.empty()) { - YT_VERIFY(partitions[0].PartitionIndex == PartitionIndex_); - currentOffset = partitions[0].NextRowIndex; - } - - YT_LOG_DEBUG("Fetched current offset (Offset: %v, WallTime: %v)", currentOffset, timer.GetElapsedTime()); - return currentOffset; - } - - void DoOpen() - { - YT_LOG_DEBUG("Opening partition reader"); - - ConsumerClient_ = CreateBigRTConsumerClient(Client_, ConsumerPath_); - - QueuePath_ = WaitFor(ConsumerClient_->FetchTargetQueue()) - .ValueOrThrow().Path; - - Logger.AddTag("Queue: %v", QueuePath_); - - auto partitionStatistics = WaitFor(ConsumerClient_->FetchPartitionStatistics(QueuePath_, PartitionIndex_)) - .ValueOrThrow(); - - RecomputeApproximateDataWeightPerRow(partitionStatistics.FlushedDataWeight, partitionStatistics.FlushedRowCount); - - Opened_ = true; - - YT_LOG_DEBUG("Partition reader opened"); - } -}; - -IPartitionReaderPtr CreatePartitionReader( - TPartitionReaderConfigPtr config, - IClientPtr client, - TYPath path, - int partitionIndex) -{ - return New<TPartitionReader>(std::move(config), std::move(client), std::move(path), partitionIndex); -} - class TMultiQueueConsumerPartitionReader : public IPartitionReader { diff --git a/yt/yt/client/queue_client/partition_reader.h b/yt/yt/client/queue_client/partition_reader.h index 019c7189c66..1629a9e063b 100644 --- a/yt/yt/client/queue_client/partition_reader.h +++ b/yt/yt/client/queue_client/partition_reader.h @@ -29,12 +29,6 @@ struct IPartitionReader DEFINE_REFCOUNTED_TYPE(IPartitionReader) -IPartitionReaderPtr CreatePartitionReader( - TPartitionReaderConfigPtr config, - NApi::IClientPtr client, - NYPath::TYPath path, - int partitionIndex); - IPartitionReaderPtr CreateMultiQueueConsumerPartitionReader( TPartitionReaderConfigPtr config, NApi::IClientPtr client, |
