diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-04-24 21:32:12 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-04-24 21:45:26 +0300 |
commit | 183d260ecc8feaf3159307bc412d3de15cc7108d (patch) | |
tree | be772e6141b6acb809492112ef0b41d1b8de643e | |
parent | 51d2493e940c90d551f876c96cac349eb59496d3 (diff) | |
download | ydb-183d260ecc8feaf3159307bc412d3de15cc7108d.tar.gz |
YT-21454: Better resolving queue path in AdvanceConsumer
bcf4bbcfad0f6d4e3f35d1e59b2395c155a84e9d
-rw-r--r-- | yt/yt/client/queue_client/consumer_client.cpp | 12 | ||||
-rw-r--r-- | yt/yt/client/queue_client/helpers.cpp | 48 | ||||
-rw-r--r-- | yt/yt/client/queue_client/helpers.h | 2 |
3 files changed, 35 insertions, 27 deletions
diff --git a/yt/yt/client/queue_client/consumer_client.cpp b/yt/yt/client/queue_client/consumer_client.cpp index 16d9ac366e..1333e1c7b7 100644 --- a/yt/yt/client/queue_client/consumer_client.cpp +++ b/yt/yt/client/queue_client/consumer_client.cpp @@ -537,12 +537,20 @@ private: tabletAndRowIndices.push_back({partitionIndex, offset - 1}); } - auto partitionRowInfos = CollectPartitionRowInfos( + auto partitionRowInfosOrError = WaitFor(CollectPartitionRowInfos( QueueRef_->Path, QueueClusterClient_, std::move(tabletAndRowIndices), params, - Logger); + Logger)); + + if (!partitionRowInfosOrError.IsOK()) { + YT_LOG_DEBUG(partitionRowInfosOrError, "Failed to get partition row infos (Path: %v)", + QueueRef_->Path); + return {}; + } + + auto partitionRowInfos = std::move(partitionRowInfosOrError).Value(); auto partitionIt = partitionRowInfos.find(partitionIndex); if (partitionIt == partitionRowInfos.end()) { diff --git a/yt/yt/client/queue_client/helpers.cpp b/yt/yt/client/queue_client/helpers.cpp index 79847234f8..a924bbef23 100644 --- a/yt/yt/client/queue_client/helpers.cpp +++ b/yt/yt/client/queue_client/helpers.cpp @@ -17,7 +17,7 @@ using namespace NQueueClient; //////////////////////////////////////////////////////////////////////////////// -THashMap<int, THashMap<i64, TPartitionRowInfo>> CollectPartitionRowInfos( +TFuture<THashMap<int, THashMap<i64, TPartitionRowInfo>>> CollectPartitionRowInfos( const TYPath& path, const IClientPtr& client, const std::vector<std::pair<int, i64>>& tabletAndRowIndices, @@ -27,7 +27,7 @@ THashMap<int, THashMap<i64, TPartitionRowInfo>> CollectPartitionRowInfos( const auto& Logger = logger; if (tabletAndRowIndices.empty()) { - return {}; + return MakeFuture<THashMap<int, THashMap<i64, TPartitionRowInfo>>>({}); } TStringBuilder queryBuilder; @@ -70,28 +70,28 @@ THashMap<int, THashMap<i64, TPartitionRowInfo>> CollectPartitionRowInfos( TSelectRowsOptions options; options.ReplicaConsistency = EReplicaConsistency::Sync; - auto selectResult = WaitFor(client->SelectRows(query, options)) - .ValueOrThrow(); - - THashMap<int, THashMap<i64, TPartitionRowInfo>> result; - - for (auto row : selectResult.Rowset->GetRows()) { - YT_VERIFY(static_cast<int>(row.GetCount()) == expectedRowSize); - - auto tabletIndex = FromUnversionedValue<int>(row[0]); - auto rowIndex = FromUnversionedValue<i64>(row[1]); - - result[tabletIndex].emplace(rowIndex, TPartitionRowInfo{ - .CumulativeDataWeight = cumulativeDataWeightColumnId - ? FromUnversionedValue<std::optional<i64>>(row[*cumulativeDataWeightColumnId]) - : std::nullopt, - .Timestamp = timestampColumnId - ? FromUnversionedValue<std::optional<TTimestamp>>(row[*timestampColumnId]) - : std::nullopt, - }); - } - - return result; + return client->SelectRows(query, options) + .Apply(BIND([expectedRowSize, cumulativeDataWeightColumnId, timestampColumnId] (const TSelectRowsResult& selectResult) { + THashMap<int, THashMap<i64, TPartitionRowInfo>> result; + + for (auto row : selectResult.Rowset->GetRows()) { + YT_VERIFY(static_cast<int>(row.GetCount()) == expectedRowSize); + + auto tabletIndex = FromUnversionedValue<int>(row[0]); + auto rowIndex = FromUnversionedValue<i64>(row[1]); + + result[tabletIndex].emplace(rowIndex, TPartitionRowInfo{ + .CumulativeDataWeight = cumulativeDataWeightColumnId + ? FromUnversionedValue<std::optional<i64>>(row[*cumulativeDataWeightColumnId]) + : std::nullopt, + .Timestamp = timestampColumnId + ? FromUnversionedValue<std::optional<TTimestamp>>(row[*timestampColumnId]) + : std::nullopt, + }); + } + + return result; + })); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/queue_client/helpers.h b/yt/yt/client/queue_client/helpers.h index 86b38c69d7..1ab38e32f8 100644 --- a/yt/yt/client/queue_client/helpers.h +++ b/yt/yt/client/queue_client/helpers.h @@ -19,7 +19,7 @@ struct TCollectPartitionRowInfoParams //! Collect info (cumulative data weight and timestamp) from rows with given (tablet_index, row_index) pairs and //! return them as a tablet_index: (row_index: info) map. -THashMap<int, THashMap<i64, TPartitionRowInfo>> CollectPartitionRowInfos( +TFuture<THashMap<int, THashMap<i64, TPartitionRowInfo>>> CollectPartitionRowInfos( const NYPath::TYPath& path, const NApi::IClientPtr& client, const std::vector<std::pair<int, i64>>& tabletAndRowIndices, |