aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2023-10-05 23:01:25 +0300
committerbabenko <babenko@yandex-team.com>2023-10-05 23:24:21 +0300
commitb4e21550041c2ed632474931a6923570badc1902 (patch)
treefba4c688417a08a6a02d7133a1948ec22b8dba7d
parent3129b20e900736c3db0cf7498e1a36dc754a5b07 (diff)
downloadydb-b4e21550041c2ed632474931a6923570badc1902.tar.gz
YT-18872: Introduce T(Versioned|Unversioned)LookupRowsResult
-rw-r--r--yt/yt/client/api/delegating_client.cpp6
-rw-r--r--yt/yt/client/api/delegating_client.h6
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.cpp29
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.h6
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp6
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.h6
-rw-r--r--yt/yt/client/api/table_client.h15
-rw-r--r--yt/yt/client/driver/table_commands.cpp19
-rw-r--r--yt/yt/client/federated/client.cpp24
-rw-r--r--yt/yt/client/federated/unittests/client_ut.cpp49
-rw-r--r--yt/yt/client/hedging/hedging.cpp6
-rw-r--r--yt/yt/client/queue_client/consumer_client.cpp6
-rw-r--r--yt/yt/client/unittests/mock/client.h6
-rw-r--r--yt/yt/client/unittests/mock/transaction.h6
14 files changed, 111 insertions, 79 deletions
diff --git a/yt/yt/client/api/delegating_client.cpp b/yt/yt/client/api/delegating_client.cpp
index b534b66737c..bc94114454f 100644
--- a/yt/yt/client/api/delegating_client.cpp
+++ b/yt/yt/client/api/delegating_client.cpp
@@ -25,7 +25,7 @@ TFuture<ITransactionPtr> TDelegatingClient::StartTransaction(
return Underlying_->StartTransaction(type, options);
}
-TFuture<IUnversionedRowsetPtr> TDelegatingClient::LookupRows(
+TFuture<TUnversionedLookupRowsResult> TDelegatingClient::LookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
@@ -34,7 +34,7 @@ TFuture<IUnversionedRowsetPtr> TDelegatingClient::LookupRows(
return Underlying_->LookupRows(path, std::move(nameTable), keys, options);
}
-TFuture<IVersionedRowsetPtr> TDelegatingClient::VersionedLookupRows(
+TFuture<TVersionedLookupRowsResult> TDelegatingClient::VersionedLookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
@@ -43,7 +43,7 @@ TFuture<IVersionedRowsetPtr> TDelegatingClient::VersionedLookupRows(
return Underlying_->VersionedLookupRows(path, std::move(nameTable), keys, options);
}
-TFuture<std::vector<IUnversionedRowsetPtr>> TDelegatingClient::MultiLookup(
+TFuture<std::vector<TUnversionedLookupRowsResult>> TDelegatingClient::MultiLookup(
const std::vector<TMultiLookupSubrequest>& subrequests,
const TMultiLookupOptions& options)
{
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index 65317f5aa62..3eeaaa702e0 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -25,19 +25,19 @@ public:
const TTransactionStartOptions& options = {}) override;
// Tables
- TFuture<IUnversionedRowsetPtr> LookupRows(
+ TFuture<TUnversionedLookupRowsResult> LookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const TLookupRowsOptions& options = {}) override;
- TFuture<IVersionedRowsetPtr> VersionedLookupRows(
+ TFuture<TVersionedLookupRowsResult> VersionedLookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const TVersionedLookupRowsOptions& options = {}) override;
- TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup(
+ TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup(
const std::vector<TMultiLookupSubrequest>& subrequests,
const TMultiLookupOptions& options = {}) override;
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp
index 7d4b94d7cac..b6884472a31 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_base.cpp
@@ -749,7 +749,7 @@ TFuture<ITableWriterPtr> TClientBase::CreateTableWriter(
////////////////////////////////////////////////////////////////////////////////
-TFuture<IUnversionedRowsetPtr> TClientBase::LookupRows(
+TFuture<TUnversionedLookupRowsResult> TClientBase::LookupRows(
const TYPath& path,
TNameTablePtr nameTable,
const TSharedRange<TLegacyKey>& keys,
@@ -784,13 +784,16 @@ TFuture<IUnversionedRowsetPtr> TClientBase::LookupRows(
ToProto(req->mutable_tablet_read_options(), options);
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspLookupRowsPtr& rsp) {
- return DeserializeRowset<TUnversionedRow>(
+ auto rowset = DeserializeRowset<TUnversionedRow>(
rsp->rowset_descriptor(),
MergeRefsToRef<TRpcProxyClientBufferTag>(rsp->Attachments()));
+ return TUnversionedLookupRowsResult{
+ .Rowset = std::move(rowset),
+ };
}));
}
-TFuture<IVersionedRowsetPtr> TClientBase::VersionedLookupRows(
+TFuture<TVersionedLookupRowsResult> TClientBase::VersionedLookupRows(
const TYPath& path,
TNameTablePtr nameTable,
const TSharedRange<TLegacyKey>& keys,
@@ -825,13 +828,16 @@ TFuture<IVersionedRowsetPtr> TClientBase::VersionedLookupRows(
}
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspVersionedLookupRowsPtr& rsp) {
- return DeserializeRowset<TVersionedRow>(
+ auto rowset = DeserializeRowset<TVersionedRow>(
rsp->rowset_descriptor(),
MergeRefsToRef<TRpcProxyClientBufferTag>(rsp->Attachments()));
+ return TVersionedLookupRowsResult{
+ .Rowset = std::move(rowset),
+ };
}));
}
-TFuture<std::vector<IUnversionedRowsetPtr>> TClientBase::MultiLookup(
+TFuture<std::vector<TUnversionedLookupRowsResult>> TClientBase::MultiLookup(
const std::vector<TMultiLookupSubrequest>& subrequests,
const TMultiLookupOptions& options)
{
@@ -876,7 +882,7 @@ TFuture<std::vector<IUnversionedRowsetPtr>> TClientBase::MultiLookup(
return req->Invoke().Apply(BIND([subrequestCount = std::ssize(subrequests)] (const TApiServiceProxy::TRspMultiLookupPtr& rsp) {
YT_VERIFY(subrequestCount == rsp->subresponses_size());
- std::vector<IUnversionedRowsetPtr> result;
+ std::vector<TUnversionedLookupRowsResult> result;
result.reserve(subrequestCount);
int beginAttachmentIndex = 0;
@@ -884,12 +890,15 @@ TFuture<std::vector<IUnversionedRowsetPtr>> TClientBase::MultiLookup(
int endAttachmentIndex = beginAttachmentIndex + subresponse.attachment_count();
YT_VERIFY(endAttachmentIndex <= std::ssize(rsp->Attachments()));
- std::vector<TSharedRef> subresponseAttachments{
+ std::vector<TSharedRef> subresponseAttachments(
rsp->Attachments().begin() + beginAttachmentIndex,
- rsp->Attachments().begin() + endAttachmentIndex};
- result.push_back(DeserializeRowset<TUnversionedRow>(
+ rsp->Attachments().begin() + endAttachmentIndex);
+ auto rowset = DeserializeRowset<TUnversionedRow>(
subresponse.rowset_descriptor(),
- MergeRefsToRef<TRpcProxyClientBufferTag>(std::move(subresponseAttachments))));
+ MergeRefsToRef<TRpcProxyClientBufferTag>(std::move(subresponseAttachments)));
+ result.push_back({
+ .Rowset = std::move(rowset),
+ });
beginAttachmentIndex = endAttachmentIndex;
}
diff --git a/yt/yt/client/api/rpc_proxy/client_base.h b/yt/yt/client/api/rpc_proxy/client_base.h
index f8df2a67a8f..2a182eaa860 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.h
+++ b/yt/yt/client/api/rpc_proxy/client_base.h
@@ -51,19 +51,19 @@ public:
const NApi::TTransactionStartOptions& options) override;
// Tables.
- TFuture<NApi::IUnversionedRowsetPtr> LookupRows(
+ TFuture<TUnversionedLookupRowsResult> LookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const NApi::TLookupRowsOptions& options) override;
- TFuture<NApi::IVersionedRowsetPtr> VersionedLookupRows(
+ TFuture<TVersionedLookupRowsResult> VersionedLookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const NApi::TVersionedLookupRowsOptions& options) override;
- TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup(
+ TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup(
const std::vector<TMultiLookupSubrequest>& subrequests,
const TMultiLookupOptions& options = {}) override;
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index 20dadc04498..4ab161b34b5 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -492,7 +492,7 @@ TFuture<ITransactionPtr> TTransaction::StartTransaction(
PatchTransactionId(options));
}
-TFuture<IUnversionedRowsetPtr> TTransaction::LookupRows(
+TFuture<TUnversionedLookupRowsResult> TTransaction::LookupRows(
const TYPath& path,
TNameTablePtr nameTable,
const TSharedRange<TLegacyKey>& keys,
@@ -506,7 +506,7 @@ TFuture<IUnversionedRowsetPtr> TTransaction::LookupRows(
PatchTransactionTimestamp(options));
}
-TFuture<IVersionedRowsetPtr> TTransaction::VersionedLookupRows(
+TFuture<TVersionedLookupRowsResult> TTransaction::VersionedLookupRows(
const TYPath& path,
TNameTablePtr nameTable,
const TSharedRange<TLegacyKey>& keys,
@@ -520,7 +520,7 @@ TFuture<IVersionedRowsetPtr> TTransaction::VersionedLookupRows(
PatchTransactionTimestamp(options));
}
-TFuture<std::vector<IUnversionedRowsetPtr>> TTransaction::MultiLookup(
+TFuture<std::vector<TUnversionedLookupRowsResult>> TTransaction::MultiLookup(
const std::vector<TMultiLookupSubrequest>& subrequests,
const TMultiLookupOptions& options)
{
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h
index 415b0b4686c..200e9e7539f 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.h
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h
@@ -78,19 +78,19 @@ public:
NTransactionClient::ETransactionType type,
const NApi::TTransactionStartOptions& options) override;
- TFuture<NApi::IUnversionedRowsetPtr> LookupRows(
+ TFuture<TUnversionedLookupRowsResult> LookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const NApi::TLookupRowsOptions& options) override;
- TFuture<NApi::IVersionedRowsetPtr> VersionedLookupRows(
+ TFuture<TVersionedLookupRowsResult> VersionedLookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const NApi::TVersionedLookupRowsOptions& options) override;
- TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup(
+ TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup(
const std::vector<TMultiLookupSubrequest>& subrequests,
const TMultiLookupOptions& options) override;
diff --git a/yt/yt/client/api/table_client.h b/yt/yt/client/api/table_client.h
index 7232cdc23f7..2b9ecc320ed 100644
--- a/yt/yt/client/api/table_client.h
+++ b/yt/yt/client/api/table_client.h
@@ -94,6 +94,15 @@ struct TSelectRowsResult
NQueryClient::TQueryStatistics Statistics;
};
+template <class IRowset>
+struct TLookupRowsResult
+{
+ TIntrusivePtr<IRowset> Rowset;
+};
+
+using TUnversionedLookupRowsResult = TLookupRowsResult<IUnversionedRowset>;
+using TVersionedLookupRowsResult = TLookupRowsResult<IVersionedRowset>;
+
struct TTableReaderOptions
: public TTransactionalOptions
, public TSuppressableAccessTrackingOptions
@@ -404,19 +413,19 @@ struct TLocateSkynetShareOptions
struct ITableClientBase
{
- virtual TFuture<IUnversionedRowsetPtr> LookupRows(
+ virtual TFuture<TUnversionedLookupRowsResult> LookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const TLookupRowsOptions& options = {}) = 0;
- virtual TFuture<IVersionedRowsetPtr> VersionedLookupRows(
+ virtual TFuture<TVersionedLookupRowsResult> VersionedLookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const TVersionedLookupRowsOptions& options = {}) = 0;
- virtual TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup(
+ virtual TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup(
const std::vector<TMultiLookupSubrequest>& subrequests,
const TMultiLookupOptions& options = {}) = 0;
diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp
index f49ed631ef2..4aac4324103 100644
--- a/yt/yt/client/driver/table_commands.cpp
+++ b/yt/yt/client/driver/table_commands.cpp
@@ -896,18 +896,27 @@ void TLookupRowsCommand::DoExecute(ICommandContextPtr context)
versionedOptions.CachedSyncReplicasTimeout = Options.CachedSyncReplicasTimeout;
versionedOptions.RetentionConfig = RetentionConfig;
versionedOptions.ReplicaConsistency = Options.ReplicaConsistency;
- auto asyncRowset = clientBase->VersionedLookupRows(Path.GetPath(), std::move(nameTable), std::move(keyRange), versionedOptions);
+ auto asyncRowset = clientBase->VersionedLookupRows(
+ Path.GetPath(),
+ std::move(nameTable),
+ std::move(keyRange),
+ versionedOptions);
auto rowset = WaitFor(asyncRowset)
- .ValueOrThrow();
+ .ValueOrThrow()
+ .Rowset;
auto writer = CreateVersionedWriterForFormat(format, rowset->GetSchema(), output);
writer->Write(rowset->GetRows());
WaitFor(writer->Close())
.ThrowOnError();
} else {
- auto asyncRowset = clientBase->LookupRows(Path.GetPath(), std::move(nameTable), std::move(keyRange), Options);
+ auto asyncRowset = clientBase->LookupRows(
+ Path.GetPath(),
+ std::move(nameTable),
+ std::move(keyRange),
+ Options);
auto rowset = WaitFor(asyncRowset)
- .ValueOrThrow();
-
+ .ValueOrThrow()
+ .Rowset;
auto writer = CreateSchemafulWriterForFormat(format, rowset->GetSchema(), output);
writer->Write(rowset->GetRows());
WaitFor(writer->Close())
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index 0bb95300a71..d0bfe7a58a1 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -61,7 +61,7 @@ public:
NTransactionClient::ETransactionType type,
const TTransactionStartOptions& options = {}) override;
- TFuture<IUnversionedRowsetPtr> LookupRows(
+ TFuture<TUnversionedLookupRowsResult> LookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
@@ -85,12 +85,12 @@ public:
TFuture<void> Abort(const TTransactionAbortOptions& options = TTransactionAbortOptions()) override;
- TFuture<IVersionedRowsetPtr> VersionedLookupRows(
+ TFuture<TVersionedLookupRowsResult> VersionedLookupRows(
const NYPath::TYPath&, NTableClient::TNameTablePtr,
const TSharedRange<NTableClient::TUnversionedRow>&,
const TVersionedLookupRowsOptions&) override;
- TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup(
+ TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup(
const std::vector<TMultiLookupSubrequest>&,
const TMultiLookupOptions&) override;
@@ -231,7 +231,7 @@ class TClient
public:
TClient(const std::vector<IClientPtr>& underlyingClients, TFederationConfigPtr config);
- TFuture<IUnversionedRowsetPtr> LookupRows(
+ TFuture<TUnversionedLookupRowsResult> LookupRows(
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
@@ -239,10 +239,10 @@ public:
TFuture<TSelectRowsResult> SelectRows(
const TString& query,
const TSelectRowsOptions& options = {}) override;
- TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup(
+ TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup(
const std::vector<TMultiLookupSubrequest>&,
const TMultiLookupOptions&) override;
- TFuture<IVersionedRowsetPtr> VersionedLookupRows(
+ TFuture<TVersionedLookupRowsResult> VersionedLookupRows(
const NYPath::TYPath&, NTableClient::TNameTablePtr,
const TSharedRange<NTableClient::TUnversionedRow>&,
const TVersionedLookupRowsOptions&) override;
@@ -469,13 +469,13 @@ TFuture<ResultType> TTransaction::MethodName(Y_METHOD_USED_ARGS_DECLARATION(Args
return future; \
} Y_SEMICOLON_GUARD
-TRANSACTION_METHOD_IMPL(IUnversionedRowsetPtr, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TLookupRowsOptions&));
+TRANSACTION_METHOD_IMPL(TUnversionedLookupRowsResult, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TLookupRowsOptions&));
TRANSACTION_METHOD_IMPL(TSelectRowsResult, SelectRows, (const TString&, const TSelectRowsOptions&));
TRANSACTION_METHOD_IMPL(void, Ping, (const NApi::TTransactionPingOptions&));
TRANSACTION_METHOD_IMPL(TTransactionCommitResult, Commit, (const TTransactionCommitOptions&));
TRANSACTION_METHOD_IMPL(void, Abort, (const TTransactionAbortOptions&));
-TRANSACTION_METHOD_IMPL(IVersionedRowsetPtr, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
-TRANSACTION_METHOD_IMPL(std::vector<IUnversionedRowsetPtr>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
+TRANSACTION_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
+TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&));
@@ -631,10 +631,10 @@ TFuture<ResultType> TClient::MethodName(Y_METHOD_USED_ARGS_DECLARATION(Args))
return DoCall<ResultType>(Config_->ClusterRetryAttempts, callee); \
} Y_SEMICOLON_GUARD
-CLIENT_METHOD_IMPL(IUnversionedRowsetPtr, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TLegacyKey>&, const TLookupRowsOptions&));
+CLIENT_METHOD_IMPL(TUnversionedLookupRowsResult, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TLegacyKey>&, const TLookupRowsOptions&));
CLIENT_METHOD_IMPL(TSelectRowsResult, SelectRows, (const TString&, const TSelectRowsOptions&));
-CLIENT_METHOD_IMPL(std::vector<IUnversionedRowsetPtr>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
-CLIENT_METHOD_IMPL(IVersionedRowsetPtr, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
+CLIENT_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
+CLIENT_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
CLIENT_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullQueue, (const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&));
CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&));
diff --git a/yt/yt/client/federated/unittests/client_ut.cpp b/yt/yt/client/federated/unittests/client_ut.cpp
index 10e14a5b712..4268404dd09 100644
--- a/yt/yt/client/federated/unittests/client_ut.cpp
+++ b/yt/yt/client/federated/unittests/client_ut.cpp
@@ -42,7 +42,9 @@ struct TTestDataStorage
row[1] = rowBuffer->CaptureValue(NTableClient::MakeUnversionedUint64Value(key + 10, TableSchema->GetColumnIndex(ValueColumn)));
rows.push_back(NTableClient::TUnversionedRow{row});
}
- return NApi::CreateRowset(TableSchema, MakeSharedRange(rows, std::move(rowBuffer)));
+ return TUnversionedLookupRowsResult{
+ .Rowset = NApi::CreateRowset(TableSchema, MakeSharedRange(rows, std::move(rowBuffer))),
+ };
} ();
LookupResult2 = [&]() {
@@ -54,7 +56,9 @@ struct TTestDataStorage
row[1] = rowBuffer->CaptureValue(NTableClient::MakeUnversionedUint64Value(key + 10, TableSchema->GetColumnIndex(ValueColumn)));
rows.push_back(NTableClient::TUnversionedRow{row});
}
- return NApi::CreateRowset(TableSchema, MakeSharedRange(rows, std::move(rowBuffer)));
+ return TUnversionedLookupRowsResult{
+ .Rowset = NApi::CreateRowset(TableSchema, MakeSharedRange(rows, std::move(rowBuffer))),
+ };
} ();
NameTable = NYT::New<NTableClient::TNameTable>();
@@ -80,8 +84,8 @@ struct TTestDataStorage
const NTableClient::TColumnSchema ValueColumnSchema = NTableClient::TColumnSchema(ValueColumn, NTableClient::EValueType::Uint64);
NTableClient::TTableSchemaPtr TableSchema = New<NTableClient::TTableSchema>(std::vector{KeyColumnSchema, ValueColumnSchema});
- IUnversionedRowsetPtr LookupResult1;
- IUnversionedRowsetPtr LookupResult2;
+ NApi::TUnversionedLookupRowsResult LookupResult1;
+ NApi::TUnversionedLookupRowsResult LookupResult2;
NTableClient::TNameTablePtr NameTable;
TSharedRange<NTableClient::TUnversionedRow> Keys;
};
@@ -115,7 +119,7 @@ TEST(TFederatedClientTest, Basic)
.WillRepeatedly(Return(VoidFuture));
// Creation of federated client.
- std::vector<IClientPtr> clients = {mockClientSas, mockClientVla};
+ std::vector<IClientPtr> clients{mockClientSas, mockClientVla};
auto config = New<TFederationConfig>();
config->ClusterHealthCheckPeriod = TDuration::Seconds(5);
config->ClusterRetryAttempts = 1;
@@ -127,7 +131,7 @@ TEST(TFederatedClientTest, Basic)
EXPECT_CALL(*mockClientVla, LookupRows(data.Path, _, _, _))
.WillOnce(Return(MakeFuture(data.LookupResult1)))
- .WillOnce(Return(MakeFuture<IUnversionedRowsetPtr>(TError(NRpc::EErrorCode::Unavailable, "Failure"))));
+ .WillOnce(Return(MakeFuture<TUnversionedLookupRowsResult>(TError(NRpc::EErrorCode::Unavailable, "Failure"))));
EXPECT_CALL(*mockClientSas, LookupRows(data.Path, _, _, _))
.WillOnce(Return(MakeFuture(data.LookupResult2)));
@@ -138,7 +142,7 @@ TEST(TFederatedClientTest, Basic)
// From `vla`.
{
auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys);
- auto rows = result.Get().Value()->GetRows();
+ auto rows = result.Get().Value().Rowset->GetRows();
ASSERT_EQ(2u, rows.Size());
auto actualFirstRow = ToString(rows[0]);
ASSERT_EQ("[0#10u, 1#20u]", actualFirstRow);
@@ -153,7 +157,7 @@ TEST(TFederatedClientTest, Basic)
// From `sas`.
{
auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys);
- auto rows = result.Get().Value()->GetRows();
+ auto rows = result.Get().Value().Rowset->GetRows();
ASSERT_EQ(2u, rows.Size());
auto actualFirstRow = ToString(rows[0]);
@@ -183,7 +187,7 @@ TEST(TFederatedClientTest, CheckHealth)
});
NNet::WriteLocalHostName("a-rpc-proxy.vla.yp-c.yandex.net");
- std::vector<IClientPtr> clients = {mockClientSas, mockClientVla};
+ std::vector<IClientPtr> clients{mockClientSas, mockClientVla};
auto config = New<TFederationConfig>();
config->ClusterHealthCheckPeriod = TDuration::Seconds(5);
config->ClusterRetryAttempts = 1;
@@ -212,7 +216,7 @@ TEST(TFederatedClientTest, CheckHealth)
// From `vla`.
{
auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys);
- auto rows = result.Get().Value()->GetRows();
+ auto rows = result.Get().Value().Rowset->GetRows();
ASSERT_EQ(2u, rows.Size());
auto actualFirstRow = ToString(rows[0]);
ASSERT_EQ("[0#10u, 1#20u]", actualFirstRow);
@@ -224,7 +228,7 @@ TEST(TFederatedClientTest, CheckHealth)
// From `sas` because `vla` was marked as unhealthy after CheckClustersHealth.
{
auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys);
- auto rows = result.Get().Value()->GetRows();
+ auto rows = result.Get().Value().Rowset->GetRows();
ASSERT_EQ(2u, rows.Size());
auto actualFirstRow = ToString(rows[0]);
@@ -238,7 +242,7 @@ TEST(TFederatedClientTest, CheckHealth)
// From `vla` because it became ok again.
{
auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys);
- auto rows = result.Get().Value()->GetRows();
+ auto rows = result.Get().Value().Rowset->GetRows();
ASSERT_EQ(2u, rows.Size());
auto actualFirstRow = ToString(rows[0]);
ASSERT_EQ("[0#10u, 1#20u]", actualFirstRow);
@@ -274,7 +278,7 @@ TEST(TFederatedClientTest, Transactions)
.WillRepeatedly(Return(VoidFuture));
// Creation of federated client.
- std::vector<IClientPtr> clients = {mockClientSas, mockClientVla};
+ std::vector<IClientPtr> clients{mockClientSas, mockClientVla};
auto config = New<TFederationConfig>();
config->ClusterHealthCheckPeriod = TDuration::Seconds(5);
config->ClusterRetryAttempts = 1;
@@ -287,7 +291,7 @@ TEST(TFederatedClientTest, Transactions)
EXPECT_CALL(*mockTransactionVla, LookupRows(data.Path, _, _, _))
.WillOnce(Return(MakeFuture(data.LookupResult1)))
- .WillOnce(Return(MakeFuture<IUnversionedRowsetPtr>(TError(NRpc::EErrorCode::Unavailable, "Failure"))));
+ .WillOnce(Return(MakeFuture<TUnversionedLookupRowsResult>(TError(NRpc::EErrorCode::Unavailable, "Failure"))));
// Wait for the first check of clusters healths.
Sleep(TDuration::Seconds(2));
@@ -297,7 +301,7 @@ TEST(TFederatedClientTest, Transactions)
// From `vla`.
{
auto result = transaction->LookupRows(data.Path, data.NameTable, data.Keys);
- auto rows = result.Get().Value()->GetRows();
+ auto rows = result.Get().Value().Rowset->GetRows();
ASSERT_EQ(2u, rows.Size());
auto actualFirstRow = ToString(rows[0]);
ASSERT_EQ("[0#10u, 1#20u]", actualFirstRow);
@@ -321,7 +325,7 @@ TEST(TFederatedClientTest, Transactions)
transaction = federatedClient->StartTransaction(NTransactionClient::ETransactionType::Tablet).Get().Value();
auto result = transaction->LookupRows(data.Path, data.NameTable, data.Keys);
- auto rows = result.Get().Value()->GetRows();
+ auto rows = result.Get().Value().Rowset->GetRows();
ASSERT_EQ(2u, rows.Size());
auto actualFirstRow = ToString(rows[0]);
@@ -360,7 +364,7 @@ TEST(TFederatedClientTest, RetryWithoutTransaction)
.WillRepeatedly(Return(VoidFuture));
// Creation of federated client.
- std::vector<IClientPtr> clients = {mockClientSas, mockClientVla};
+ std::vector<IClientPtr> clients{mockClientSas, mockClientVla};
auto config = New<TFederationConfig>();
config->ClusterHealthCheckPeriod = TDuration::Seconds(5);
auto federatedClient = CreateClient(clients, config);
@@ -370,7 +374,7 @@ TEST(TFederatedClientTest, RetryWithoutTransaction)
// 3. `sas` client should be used as other cluster.
EXPECT_CALL(*mockClientVla, LookupRows(data.Path, _, _, _))
- .WillOnce(Return(MakeFuture<IUnversionedRowsetPtr>(TError(NRpc::EErrorCode::Unavailable, "Failure"))));
+ .WillOnce(Return(MakeFuture<TUnversionedLookupRowsResult>(TError(NRpc::EErrorCode::Unavailable, "Failure"))));
EXPECT_CALL(*mockClientSas, LookupRows(data.Path, _, _, _))
.WillOnce(Return(MakeFuture(data.LookupResult2)));
@@ -381,7 +385,7 @@ TEST(TFederatedClientTest, RetryWithoutTransaction)
// Go to `vla`, getting error, retry via `sas` and getting response from `sas`.
{
auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys);
- auto rows = result.Get().Value()->GetRows();
+ auto rows = result.Get().Value().Rowset->GetRows();
ASSERT_EQ(2u, rows.Size());
auto actualFirstRow = ToString(rows[0]);
@@ -398,17 +402,16 @@ TEST(TFederatedClientTest, RetryWithoutTransaction)
auto mockTransactionSas = New<TStrictMockTransaction>();
EXPECT_CALL(*mockClientSas, StartTransaction(_, _))
- .WillOnce(Return(MakeFuture(NApi::ITransactionPtr{mockTransactionSas})));
+ .WillOnce(Return(MakeFuture(NApi::ITransactionPtr(mockTransactionSas))));
EXPECT_CALL(*mockTransactionSas, LookupRows(data.Path, _, _, _))
.WillOnce(Return(MakeFuture(data.LookupResult2)));
-
// Try to start transaction in `vla`, getting error, retry via `sas`, creating transaction and getting response from `sas`.
{
auto transaction = federatedClient->StartTransaction(NTransactionClient::ETransactionType::Tablet).Get().Value();
auto result = transaction->LookupRows(data.Path, data.NameTable, data.Keys);
- auto rows = result.Get().Value()->GetRows();
+ auto rows = result.Get().Value().Rowset->GetRows();
ASSERT_EQ(2u, rows.Size());
auto actualFirstRow = ToString(rows[0]);
@@ -457,7 +460,7 @@ TEST(TFederatedClientTest, AttachTransaction)
.WillOnce(Return(mockConnectionVla));
// Creation of federated client.
- std::vector<IClientPtr> clients = {mockClientSas, mockClientVla};
+ std::vector<IClientPtr> clients{mockClientSas, mockClientVla};
auto config = New<TFederationConfig>();
config->ClusterHealthCheckPeriod = TDuration::Seconds(5);
auto federatedClient = CreateClient(clients, config);
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index c30bdee3901..f415a8c614c 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -74,8 +74,8 @@ public:
return {};
}
- RETRYABLE_METHOD(TFuture<IUnversionedRowsetPtr>, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TLookupRowsOptions&));
- RETRYABLE_METHOD(TFuture<IVersionedRowsetPtr>, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
+ RETRYABLE_METHOD(TFuture<TUnversionedLookupRowsResult>, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TLookupRowsOptions&));
+ RETRYABLE_METHOD(TFuture<TVersionedLookupRowsResult>, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
RETRYABLE_METHOD(TFuture<TSelectRowsResult>, SelectRows, (const TString&, const TSelectRowsOptions&));
RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueue, (const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&));
RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&));
@@ -88,7 +88,7 @@ public:
RETRYABLE_METHOD(TFuture<NYson::TYsonString>, ListNode, (const NYPath::TYPath&, const TListNodeOptions&));
RETRYABLE_METHOD(TFuture<bool>, NodeExists, (const NYPath::TYPath&, const TNodeExistsOptions&));
RETRYABLE_METHOD(TFuture<IFileReaderPtr>, CreateFileReader, (const NYPath::TYPath&, const TFileReaderOptions&));
- RETRYABLE_METHOD(TFuture<std::vector<IUnversionedRowsetPtr>>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
+ RETRYABLE_METHOD(TFuture<std::vector<TUnversionedLookupRowsResult>>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
// Unsupported methods.
UNSUPPORTED_METHOD(TFuture<ITransactionPtr>, StartTransaction, (NTransactionClient::ETransactionType, const TTransactionStartOptions&));
diff --git a/yt/yt/client/queue_client/consumer_client.cpp b/yt/yt/client/queue_client/consumer_client.cpp
index a0557dbdc6b..cace53eb53e 100644
--- a/yt/yt/client/queue_client/consumer_client.cpp
+++ b/yt/yt/client/queue_client/consumer_client.cpp
@@ -122,7 +122,8 @@ public:
options.RetentionConfig->MaxDataVersions = 1;
auto partitionRowset = WaitFor(transaction->VersionedLookupRows(Path_, NameTable_, keyRowsBuilder.Build(), options))
- .ValueOrThrow();
+ .ValueOrThrow()
+ .Rowset;
const auto& rows = partitionRowset->GetRows();
// XXX(max42): should we use name table from the rowset, or it coincides with our own name table?
@@ -430,7 +431,8 @@ private:
NameTable_,
builder.Build(),
options))
- .ValueOrThrow();
+ .ValueOrThrow()
+ .Rowset;
YT_VERIFY(versionedRowset->GetRows().size() == partitionIndices.size());
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index 1963c5d910b..8c91a172519 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -35,19 +35,19 @@ public:
NTransactionClient::ETransactionType type,
const TTransactionStartOptions& options), (override));
- MOCK_METHOD(TFuture<IUnversionedRowsetPtr>, LookupRows, (
+ MOCK_METHOD(TFuture<TUnversionedLookupRowsResult>, LookupRows, (
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const TLookupRowsOptions& options), (override));
- MOCK_METHOD(TFuture<IVersionedRowsetPtr>, VersionedLookupRows, (
+ MOCK_METHOD(TFuture<TVersionedLookupRowsResult>, VersionedLookupRows, (
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const TVersionedLookupRowsOptions& options), (override));
- MOCK_METHOD(TFuture<std::vector<IUnversionedRowsetPtr>>, MultiLookup, (
+ MOCK_METHOD(TFuture<std::vector<TUnversionedLookupRowsResult>>, MultiLookup, (
const std::vector<TMultiLookupSubrequest>& subrequests,
const TMultiLookupOptions& options), (override));
diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h
index d22136846df..06c9c651486 100644
--- a/yt/yt/client/unittests/mock/transaction.h
+++ b/yt/yt/client/unittests/mock/transaction.h
@@ -21,19 +21,19 @@ public:
NTransactionClient::ETransactionType type,
const TTransactionStartOptions& options), (override));
- MOCK_METHOD(TFuture<IUnversionedRowsetPtr>, LookupRows, (
+ MOCK_METHOD(TFuture<TUnversionedLookupRowsResult>, LookupRows, (
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const TLookupRowsOptions& options), (override));
- MOCK_METHOD(TFuture<IVersionedRowsetPtr>, VersionedLookupRows, (
+ MOCK_METHOD(TFuture<TVersionedLookupRowsResult>, VersionedLookupRows, (
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
const TSharedRange<NTableClient::TLegacyKey>& keys,
const TVersionedLookupRowsOptions& options), (override));
- MOCK_METHOD(TFuture<std::vector<IUnversionedRowsetPtr>>, MultiLookup, (
+ MOCK_METHOD(TFuture<std::vector<TUnversionedLookupRowsResult>>, MultiLookup, (
const std::vector<TMultiLookupSubrequest>& subrequests,
const TMultiLookupOptions& options), (override));