summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorapachee <[email protected]>2024-07-02 16:53:30 +0300
committerapachee <[email protected]>2024-07-02 17:05:47 +0300
commit04a099c3da3ea1315d4897079ed4ee3aad614eb8 (patch)
tree590298d8d1be28488529e3789d18e66ef50d081d
parentf2f13f03dddc1856da3cd2955a597a0203bf8d28 (diff)
YT-22096: Remove BigRT consumers & related entities from YT
e9f848fd6dec24dbe50c1874d60b970a980a475f
-rw-r--r--yt/yt/client/api/delegating_transaction.cpp7
-rw-r--r--yt/yt/client/api/delegating_transaction.h5
-rw-r--r--yt/yt/client/api/queue_transaction.h7
-rw-r--r--yt/yt/client/api/queue_transaction_mixin.cpp12
-rw-r--r--yt/yt/client/api/queue_transaction_mixin.h7
-rw-r--r--yt/yt/client/queue_client/consumer_client.cpp55
-rw-r--r--yt/yt/client/queue_client/consumer_client.h18
-rw-r--r--yt/yt/client/queue_client/partition_reader.cpp233
-rw-r--r--yt/yt/client/queue_client/partition_reader.h6
9 files changed, 0 insertions, 350 deletions
diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp
index 33bd596d18e..2b1cb55d323 100644
--- a/yt/yt/client/api/delegating_transaction.cpp
+++ b/yt/yt/client/api/delegating_transaction.cpp
@@ -276,13 +276,6 @@ DELEGATE_METHOD(void, ModifyRows, (
(path, nameTable, modifications, options))
DELEGATE_METHOD(void, AdvanceConsumer, (
- const NYPath::TYPath& path,
- int partitionIndex,
- std::optional<i64> oldOffset,
- i64 newOffset),
- (path, partitionIndex, oldOffset, newOffset))
-
-DELEGATE_METHOD(void, AdvanceConsumer, (
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
int partitionIndex,
diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h
index 39017b5853c..37aeb46f4a7 100644
--- a/yt/yt/client/api/delegating_transaction.h
+++ b/yt/yt/client/api/delegating_transaction.h
@@ -225,11 +225,6 @@ public:
// Queues
void AdvanceConsumer(
- const NYPath::TYPath& path,
- int partitionIndex,
- std::optional<i64> oldOffset,
- i64 newOffset) override;
- void AdvanceConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
int partitionIndex,
diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h
index 646174f8f13..4c474456195 100644
--- a/yt/yt/client/api/queue_transaction.h
+++ b/yt/yt/client/api/queue_transaction.h
@@ -54,13 +54,6 @@ struct IQueueTransaction
// TODO(nadya73): Remove it: YT-20712
virtual void AdvanceConsumer(
- const NYPath::TYPath& path,
- int partitionIndex,
- std::optional<i64> oldOffset,
- i64 newOffset) = 0;
-
- // TODO(nadya73): Remove it: YT-20712
- virtual void AdvanceConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
int partitionIndex,
diff --git a/yt/yt/client/api/queue_transaction_mixin.cpp b/yt/yt/client/api/queue_transaction_mixin.cpp
index cac63774ae4..cbd93b5b74c 100644
--- a/yt/yt/client/api/queue_transaction_mixin.cpp
+++ b/yt/yt/client/api/queue_transaction_mixin.cpp
@@ -12,18 +12,6 @@ using namespace NConcurrency;
/////////////////////////////////////////////////////////////////////////////
void TQueueTransactionMixin::AdvanceConsumer(
- const NYPath::TYPath& path,
- int partitionIndex,
- std::optional<i64> oldOffset,
- i64 newOffset)
-{
- THROW_ERROR_EXCEPTION_IF(newOffset < 0, "Queue consumer offset %v cannot be negative", newOffset);
-
- auto consumerClient = CreateBigRTConsumerClient(GetClient(), path);
- consumerClient->Advance(this, partitionIndex, oldOffset, newOffset);
-}
-
-void TQueueTransactionMixin::AdvanceConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
int partitionIndex,
diff --git a/yt/yt/client/api/queue_transaction_mixin.h b/yt/yt/client/api/queue_transaction_mixin.h
index 36a5ed80149..60cc056fca0 100644
--- a/yt/yt/client/api/queue_transaction_mixin.h
+++ b/yt/yt/client/api/queue_transaction_mixin.h
@@ -14,13 +14,6 @@ class TQueueTransactionMixin
public:
// TODO(nadya73): Remove it: YT-20712
void AdvanceConsumer(
- const NYPath::TYPath& path,
- int partitionIndex,
- std::optional<i64> oldOffset,
- i64 newOffset) override;
-
- // TODO(nadya73): Remove it: YT-20712
- void AdvanceConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
int partitionIndex,
diff --git a/yt/yt/client/queue_client/consumer_client.cpp b/yt/yt/client/queue_client/consumer_client.cpp
index 656972c76b6..1de69d46e23 100644
--- a/yt/yt/client/queue_client/consumer_client.cpp
+++ b/yt/yt/client/queue_client/consumer_client.cpp
@@ -60,11 +60,6 @@ static const TTableSchemaPtr YTConsumerTableSchema = New<TTableSchema>(std::vect
TColumnSchema("meta", EValueType::Any).SetRequired(false),
}, /*strict*/ true, /*uniqueKeys*/ true);
-static const TTableSchemaPtr BigRTConsumerTableSchema = New<TTableSchema>(std::vector<TColumnSchema>{
- TColumnSchema("ShardId", EValueType::Uint64, ESortOrder::Ascending),
- TColumnSchema("Offset", EValueType::Uint64),
-}, /*strict*/ true, /*uniqueKeys*/ true);
-
////////////////////////////////////////////////////////////////////////////////
void TConsumerMeta::Register(TRegistrar registrar)
@@ -340,14 +335,6 @@ public:
}));
}
- TFuture<TCrossClusterReference> FetchTargetQueue() const override
- {
- return ConsumerClusterClient_->GetNode(ConsumerPath_ + "/@target_queue")
- .Apply(BIND([] (const TYsonString& ysonString) {
- return TCrossClusterReference::FromString(ConvertTo<TString>(ysonString));
- }));
- }
-
TFuture<TPartitionStatistics> FetchPartitionStatistics(
const TYPath& queuePath,
int partitionIndex) const override
@@ -590,48 +577,6 @@ private:
////////////////////////////////////////////////////////////////////////////////
-ISubConsumerClientPtr CreateBigRTConsumerClient(
- const IClientPtr& client,
- const TYPath& path,
- const TTableSchema& schema)
-{
- if (!schema.IsUniqueKeys()) {
- THROW_ERROR_EXCEPTION("Consumer schema must have unique keys, schema does not")
- << TErrorAttribute("actual_schema", schema);
- }
-
- if (schema == *BigRTConsumerTableSchema) {
- return New<TGenericConsumerClient>(
- client,
- client,
- path,
- /*queuePath*/ std::nullopt,
- TUnversionedOwningRow(),
- "ShardId",
- "Offset",
- /*decrementOffset*/ true,
- BigRTConsumerTableSchema,
- /*queueTableSchema*/ nullptr);
- } else {
- THROW_ERROR_EXCEPTION("Table schema is not recognized as a valid BigRT consumer schema")
- << TErrorAttribute("expected_schema", *BigRTConsumerTableSchema)
- << TErrorAttribute("actual_schema", schema);
- }
-}
-
-ISubConsumerClientPtr CreateBigRTConsumerClient(
- const IClientPtr& client,
- const TYPath& path)
-{
- auto tableInfo = WaitFor(client->GetTableMountCache()->GetTableInfo(path))
- .ValueOrThrow();
- auto schema = tableInfo->Schemas[ETableSchemaKind::Primary];
-
- return CreateBigRTConsumerClient(client, path, *schema);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
class TYTConsumerClient
: public IConsumerClient
{
diff --git a/yt/yt/client/queue_client/consumer_client.h b/yt/yt/client/queue_client/consumer_client.h
index e51f905a9e7..4ec901ec159 100644
--- a/yt/yt/client/queue_client/consumer_client.h
+++ b/yt/yt/client/queue_client/consumer_client.h
@@ -66,9 +66,6 @@ struct ISubConsumerClient
const std::vector<int>& partitionIndexes,
bool withLastConsumeTime = false) const = 0;
- //! Fetch and parse the target_queue attribute of this consumer.
- virtual TFuture<TCrossClusterReference> FetchTargetQueue() const = 0;
-
struct TPartitionStatistics
{
i64 FlushedDataWeight = 0;
@@ -95,21 +92,6 @@ DEFINE_REFCOUNTED_TYPE(IConsumerClient)
////////////////////////////////////////////////////////////////////////////////
-// TODO(max42): get rid of the following two methods.
-// They are left as a temporary mean to keep API tests working.
-
-//! Creates a BigRT single-queue consumer client.
-ISubConsumerClientPtr CreateBigRTConsumerClient(
- const NApi::IClientPtr& client,
- const NYPath::TYPath& path,
- const NTableClient::TTableSchema& schema);
-
-//! Uses the table mount cache to fetch the consumer's schema and make
-//! sure the consumer actually has BigRT consumer schema.
-ISubConsumerClientPtr CreateBigRTConsumerClient(
- const NApi::IClientPtr& client,
- const NYPath::TYPath& path);
-
//! Creates a native YT multi-queue consumer client.
IConsumerClientPtr CreateConsumerClient(
const NApi::IClientPtr& consumerClusterClient,
diff --git a/yt/yt/client/queue_client/partition_reader.cpp b/yt/yt/client/queue_client/partition_reader.cpp
index a7c4b3a5635..c46d60b42d6 100644
--- a/yt/yt/client/queue_client/partition_reader.cpp
+++ b/yt/yt/client/queue_client/partition_reader.cpp
@@ -22,239 +22,6 @@ using namespace NYTree;
////////////////////////////////////////////////////////////////////////////////
-class TPartitionReader
- : public IPartitionReader
-{
-public:
- TPartitionReader(
- TPartitionReaderConfigPtr config,
- IClientPtr client,
- TYPath consumerPath,
- int partitionIndex)
- : Config_(std::move(config))
- , Client_(std::move(client))
- , ConsumerPath_(std::move(consumerPath))
- , PartitionIndex_(partitionIndex)
- , Logger(QueueClientLogger().WithTag("Consumer: %v, Partition: %v", ConsumerPath_, PartitionIndex_))
- , RowBatchReadOptions_({Config_->MaxRowCount, Config_->MaxDataWeight, Config_->DataWeightPerRowHint})
- , PullQueueOptions_({.UseNativeTabletNodeApi = Config_->UseNativeTabletNodeApi})
- { }
-
- TFuture<void> Open() override
- {
- return BIND(&TPartitionReader::DoOpen, MakeStrong(this))
- .AsyncVia(GetCurrentInvoker())
- .Run();
- }
-
- TFuture<IPersistentQueueRowsetPtr> Read() override
- {
- YT_VERIFY(Opened_);
-
- return BIND(&TPartitionReader::DoRead, MakeStrong(this))
- .AsyncVia(GetCurrentInvoker())
- .Run();
- }
-
-private:
- const TPartitionReaderConfigPtr Config_;
- const IClientPtr Client_;
- const TYPath ConsumerPath_;
- const int PartitionIndex_;
-
- NLogging::TLogger Logger;
-
- bool Opened_ = false;
-
- TYPath QueuePath_;
- i64 ApproximateDataWeightPerRow_ = 0;
- ISubConsumerClientPtr ConsumerClient_;
- TQueueRowBatchReadOptions RowBatchReadOptions_;
- TPullQueueOptions PullQueueOptions_;
-
- class TPersistentQueueRowset
- : public IPersistentQueueRowset
- {
- public:
- TPersistentQueueRowset(IQueueRowsetPtr rowset, TWeakPtr<TPartitionReader> partitionReader, i64 currentOffset)
- : Rowset_(std::move(rowset))
- , PartitionReader_(std::move(partitionReader))
- , CurrentOffset_(currentOffset)
- { }
-
- const NTableClient::TTableSchemaPtr& GetSchema() const override
- {
- return Rowset_->GetSchema();
- }
-
- const NTableClient::TNameTablePtr& GetNameTable() const override
- {
- return Rowset_->GetNameTable();
- }
-
- TSharedRange<NTableClient::TUnversionedRow> GetRows() const override
- {
- return Rowset_->GetRows();
- }
-
- i64 GetStartOffset() const override
- {
- return Rowset_->GetStartOffset();
- }
-
- i64 GetFinishOffset() const override
- {
- return Rowset_->GetFinishOffset();
- }
-
- void Commit(const NApi::ITransactionPtr& transaction) override
- {
- YT_VERIFY(transaction);
-
- if (auto partitionReader = PartitionReader_.Lock()) {
- // TODO(achulkov2): Check that this is the first uncommitted batch returned to the user and crash otherwise.
- // Will be much easier to do once we figure out how prefetching & batch storage will look like.
-
- // TODO(achulkov2): Mark this batch as committed in the partition reader.
-
- transaction->AdvanceConsumer(
- partitionReader->ConsumerPath_,
- partitionReader->PartitionIndex_,
- CurrentOffset_,
- GetFinishOffset());
- } else {
- THROW_ERROR_EXCEPTION("Partition reader destroyed");
- }
- }
-
- private:
- const IQueueRowsetPtr Rowset_;
- const TWeakPtr<TPartitionReader> PartitionReader_;
- const i64 CurrentOffset_;
- };
-
- IPersistentQueueRowsetPtr DoRead()
- {
- YT_LOG_DEBUG("Reading rowset");
- TWallTimer timer;
-
- if (!Config_->DataWeightPerRowHint && ApproximateDataWeightPerRow_) {
- RowBatchReadOptions_.DataWeightPerRowHint = ApproximateDataWeightPerRow_;
- }
-
- auto currentOffset = FetchCurrentOffset();
-
- // TODO(achulkov2): Log the options in the RPC method (via SetRequestInfo) instead?
- YT_LOG_DEBUG(
- "Pulling from queue (Offset: %v, MaxRowCount: %v, MaxDataWeight: %v, DataWeightPerRowHint: %v)",
- currentOffset,
- RowBatchReadOptions_.MaxRowCount,
- RowBatchReadOptions_.MaxDataWeight,
- RowBatchReadOptions_.DataWeightPerRowHint);
- auto rowset = WaitFor(Client_->PullQueue(
- QueuePath_,
- currentOffset,
- PartitionIndex_,
- RowBatchReadOptions_,
- PullQueueOptions_))
- .ValueOrThrow();
-
- HandleRowset(rowset);
-
- YT_LOG_DEBUG("Rowset read (WallTime: %v)", timer.GetElapsedTime());
-
- return New<TPersistentQueueRowset>(rowset, MakeWeak(this), currentOffset);
-
- }
-
- void HandleRowset(const IQueueRowsetPtr& rowset)
- {
- // TODO(achulkov2): When prefetching is implemented we'll have some sort of struct for holding the batch + stats.
- // TODO(achulkov2): Don't burn CPU here and get the data weight from PullQueue somehow.
- auto dataWeight = static_cast<i64>(GetDataWeight(rowset->GetRows()));
- i64 rowCount = std::ssize(rowset->GetRows());
-
- RecomputeApproximateDataWeightPerRow(dataWeight, rowCount);
-
- YT_LOG_DEBUG(
- "Rowset obtained (RowCount: %v, DataWeight: %v, StartOffset: %v, FinishOffset: %v)",
- rowCount,
- dataWeight,
- rowset->GetStartOffset(),
- rowset->GetFinishOffset());
- }
-
- void RecomputeApproximateDataWeightPerRow(i64 dataWeight, i64 rowCount)
- {
- if (rowCount == 0) {
- return;
- }
-
- auto newDataWeightPerRowHint = dataWeight / rowCount;
-
- if (ApproximateDataWeightPerRow_) {
- // TODO(achulkov2): Anything better? Variable coefficient?
- ApproximateDataWeightPerRow_ = (ApproximateDataWeightPerRow_ + newDataWeightPerRowHint) / 2;
- } else {
- ApproximateDataWeightPerRow_ = newDataWeightPerRowHint;
- }
-
- YT_LOG_DEBUG("Recomputed approximate data weight per row (ApproximateDataWeightPerRow: %v)", ApproximateDataWeightPerRow_);
- }
-
- // NB: Can throw.
- i64 FetchCurrentOffset() const
- {
- TWallTimer timer;
-
- std::vector<int> partitionIndexesToFetch{PartitionIndex_};
- auto partitions = WaitFor(ConsumerClient_->CollectPartitions(partitionIndexesToFetch))
- .ValueOrThrow();
-
- YT_VERIFY(partitions.size() <= 1);
-
- i64 currentOffset = 0;
-
- if (!partitions.empty()) {
- YT_VERIFY(partitions[0].PartitionIndex == PartitionIndex_);
- currentOffset = partitions[0].NextRowIndex;
- }
-
- YT_LOG_DEBUG("Fetched current offset (Offset: %v, WallTime: %v)", currentOffset, timer.GetElapsedTime());
- return currentOffset;
- }
-
- void DoOpen()
- {
- YT_LOG_DEBUG("Opening partition reader");
-
- ConsumerClient_ = CreateBigRTConsumerClient(Client_, ConsumerPath_);
-
- QueuePath_ = WaitFor(ConsumerClient_->FetchTargetQueue())
- .ValueOrThrow().Path;
-
- Logger.AddTag("Queue: %v", QueuePath_);
-
- auto partitionStatistics = WaitFor(ConsumerClient_->FetchPartitionStatistics(QueuePath_, PartitionIndex_))
- .ValueOrThrow();
-
- RecomputeApproximateDataWeightPerRow(partitionStatistics.FlushedDataWeight, partitionStatistics.FlushedRowCount);
-
- Opened_ = true;
-
- YT_LOG_DEBUG("Partition reader opened");
- }
-};
-
-IPartitionReaderPtr CreatePartitionReader(
- TPartitionReaderConfigPtr config,
- IClientPtr client,
- TYPath path,
- int partitionIndex)
-{
- return New<TPartitionReader>(std::move(config), std::move(client), std::move(path), partitionIndex);
-}
-
class TMultiQueueConsumerPartitionReader
: public IPartitionReader
{
diff --git a/yt/yt/client/queue_client/partition_reader.h b/yt/yt/client/queue_client/partition_reader.h
index 019c7189c66..1629a9e063b 100644
--- a/yt/yt/client/queue_client/partition_reader.h
+++ b/yt/yt/client/queue_client/partition_reader.h
@@ -29,12 +29,6 @@ struct IPartitionReader
DEFINE_REFCOUNTED_TYPE(IPartitionReader)
-IPartitionReaderPtr CreatePartitionReader(
- TPartitionReaderConfigPtr config,
- NApi::IClientPtr client,
- NYPath::TYPath path,
- int partitionIndex);
-
IPartitionReaderPtr CreateMultiQueueConsumerPartitionReader(
TPartitionReaderConfigPtr config,
NApi::IClientPtr client,