aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-04-24 21:32:12 +0300
committernadya73 <nadya73@yandex-team.com>2024-04-24 21:45:26 +0300
commit183d260ecc8feaf3159307bc412d3de15cc7108d (patch)
treebe772e6141b6acb809492112ef0b41d1b8de643e
parent51d2493e940c90d551f876c96cac349eb59496d3 (diff)
downloadydb-183d260ecc8feaf3159307bc412d3de15cc7108d.tar.gz
YT-21454: Better resolving queue path in AdvanceConsumer
bcf4bbcfad0f6d4e3f35d1e59b2395c155a84e9d
-rw-r--r--yt/yt/client/queue_client/consumer_client.cpp12
-rw-r--r--yt/yt/client/queue_client/helpers.cpp48
-rw-r--r--yt/yt/client/queue_client/helpers.h2
3 files changed, 35 insertions, 27 deletions
diff --git a/yt/yt/client/queue_client/consumer_client.cpp b/yt/yt/client/queue_client/consumer_client.cpp
index 16d9ac366e..1333e1c7b7 100644
--- a/yt/yt/client/queue_client/consumer_client.cpp
+++ b/yt/yt/client/queue_client/consumer_client.cpp
@@ -537,12 +537,20 @@ private:
tabletAndRowIndices.push_back({partitionIndex, offset - 1});
}
- auto partitionRowInfos = CollectPartitionRowInfos(
+ auto partitionRowInfosOrError = WaitFor(CollectPartitionRowInfos(
QueueRef_->Path,
QueueClusterClient_,
std::move(tabletAndRowIndices),
params,
- Logger);
+ Logger));
+
+ if (!partitionRowInfosOrError.IsOK()) {
+ YT_LOG_DEBUG(partitionRowInfosOrError, "Failed to get partition row infos (Path: %v)",
+ QueueRef_->Path);
+ return {};
+ }
+
+ auto partitionRowInfos = std::move(partitionRowInfosOrError).Value();
auto partitionIt = partitionRowInfos.find(partitionIndex);
if (partitionIt == partitionRowInfos.end()) {
diff --git a/yt/yt/client/queue_client/helpers.cpp b/yt/yt/client/queue_client/helpers.cpp
index 79847234f8..a924bbef23 100644
--- a/yt/yt/client/queue_client/helpers.cpp
+++ b/yt/yt/client/queue_client/helpers.cpp
@@ -17,7 +17,7 @@ using namespace NQueueClient;
////////////////////////////////////////////////////////////////////////////////
-THashMap<int, THashMap<i64, TPartitionRowInfo>> CollectPartitionRowInfos(
+TFuture<THashMap<int, THashMap<i64, TPartitionRowInfo>>> CollectPartitionRowInfos(
const TYPath& path,
const IClientPtr& client,
const std::vector<std::pair<int, i64>>& tabletAndRowIndices,
@@ -27,7 +27,7 @@ THashMap<int, THashMap<i64, TPartitionRowInfo>> CollectPartitionRowInfos(
const auto& Logger = logger;
if (tabletAndRowIndices.empty()) {
- return {};
+ return MakeFuture<THashMap<int, THashMap<i64, TPartitionRowInfo>>>({});
}
TStringBuilder queryBuilder;
@@ -70,28 +70,28 @@ THashMap<int, THashMap<i64, TPartitionRowInfo>> CollectPartitionRowInfos(
TSelectRowsOptions options;
options.ReplicaConsistency = EReplicaConsistency::Sync;
- auto selectResult = WaitFor(client->SelectRows(query, options))
- .ValueOrThrow();
-
- THashMap<int, THashMap<i64, TPartitionRowInfo>> result;
-
- for (auto row : selectResult.Rowset->GetRows()) {
- YT_VERIFY(static_cast<int>(row.GetCount()) == expectedRowSize);
-
- auto tabletIndex = FromUnversionedValue<int>(row[0]);
- auto rowIndex = FromUnversionedValue<i64>(row[1]);
-
- result[tabletIndex].emplace(rowIndex, TPartitionRowInfo{
- .CumulativeDataWeight = cumulativeDataWeightColumnId
- ? FromUnversionedValue<std::optional<i64>>(row[*cumulativeDataWeightColumnId])
- : std::nullopt,
- .Timestamp = timestampColumnId
- ? FromUnversionedValue<std::optional<TTimestamp>>(row[*timestampColumnId])
- : std::nullopt,
- });
- }
-
- return result;
+ return client->SelectRows(query, options)
+ .Apply(BIND([expectedRowSize, cumulativeDataWeightColumnId, timestampColumnId] (const TSelectRowsResult& selectResult) {
+ THashMap<int, THashMap<i64, TPartitionRowInfo>> result;
+
+ for (auto row : selectResult.Rowset->GetRows()) {
+ YT_VERIFY(static_cast<int>(row.GetCount()) == expectedRowSize);
+
+ auto tabletIndex = FromUnversionedValue<int>(row[0]);
+ auto rowIndex = FromUnversionedValue<i64>(row[1]);
+
+ result[tabletIndex].emplace(rowIndex, TPartitionRowInfo{
+ .CumulativeDataWeight = cumulativeDataWeightColumnId
+ ? FromUnversionedValue<std::optional<i64>>(row[*cumulativeDataWeightColumnId])
+ : std::nullopt,
+ .Timestamp = timestampColumnId
+ ? FromUnversionedValue<std::optional<TTimestamp>>(row[*timestampColumnId])
+ : std::nullopt,
+ });
+ }
+
+ return result;
+ }));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/queue_client/helpers.h b/yt/yt/client/queue_client/helpers.h
index 86b38c69d7..1ab38e32f8 100644
--- a/yt/yt/client/queue_client/helpers.h
+++ b/yt/yt/client/queue_client/helpers.h
@@ -19,7 +19,7 @@ struct TCollectPartitionRowInfoParams
//! Collect info (cumulative data weight and timestamp) from rows with given (tablet_index, row_index) pairs and
//! return them as a tablet_index: (row_index: info) map.
-THashMap<int, THashMap<i64, TPartitionRowInfo>> CollectPartitionRowInfos(
+TFuture<THashMap<int, THashMap<i64, TPartitionRowInfo>>> CollectPartitionRowInfos(
const NYPath::TYPath& path,
const NApi::IClientPtr& client,
const std::vector<std::pair<int, i64>>& tabletAndRowIndices,