diff options
author | babenko <babenko@yandex-team.com> | 2023-10-05 23:01:25 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2023-10-05 23:24:21 +0300 |
commit | b4e21550041c2ed632474931a6923570badc1902 (patch) | |
tree | fba4c688417a08a6a02d7133a1948ec22b8dba7d | |
parent | 3129b20e900736c3db0cf7498e1a36dc754a5b07 (diff) | |
download | ydb-b4e21550041c2ed632474931a6923570badc1902.tar.gz |
YT-18872: Introduce T(Versioned|Unversioned)LookupRowsResult
-rw-r--r-- | yt/yt/client/api/delegating_client.cpp | 6 | ||||
-rw-r--r-- | yt/yt/client/api/delegating_client.h | 6 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_base.cpp | 29 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_base.h | 6 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.cpp | 6 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.h | 6 | ||||
-rw-r--r-- | yt/yt/client/api/table_client.h | 15 | ||||
-rw-r--r-- | yt/yt/client/driver/table_commands.cpp | 19 | ||||
-rw-r--r-- | yt/yt/client/federated/client.cpp | 24 | ||||
-rw-r--r-- | yt/yt/client/federated/unittests/client_ut.cpp | 49 | ||||
-rw-r--r-- | yt/yt/client/hedging/hedging.cpp | 6 | ||||
-rw-r--r-- | yt/yt/client/queue_client/consumer_client.cpp | 6 | ||||
-rw-r--r-- | yt/yt/client/unittests/mock/client.h | 6 | ||||
-rw-r--r-- | yt/yt/client/unittests/mock/transaction.h | 6 |
14 files changed, 111 insertions, 79 deletions
diff --git a/yt/yt/client/api/delegating_client.cpp b/yt/yt/client/api/delegating_client.cpp index b534b66737c..bc94114454f 100644 --- a/yt/yt/client/api/delegating_client.cpp +++ b/yt/yt/client/api/delegating_client.cpp @@ -25,7 +25,7 @@ TFuture<ITransactionPtr> TDelegatingClient::StartTransaction( return Underlying_->StartTransaction(type, options); } -TFuture<IUnversionedRowsetPtr> TDelegatingClient::LookupRows( +TFuture<TUnversionedLookupRowsResult> TDelegatingClient::LookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, @@ -34,7 +34,7 @@ TFuture<IUnversionedRowsetPtr> TDelegatingClient::LookupRows( return Underlying_->LookupRows(path, std::move(nameTable), keys, options); } -TFuture<IVersionedRowsetPtr> TDelegatingClient::VersionedLookupRows( +TFuture<TVersionedLookupRowsResult> TDelegatingClient::VersionedLookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, @@ -43,7 +43,7 @@ TFuture<IVersionedRowsetPtr> TDelegatingClient::VersionedLookupRows( return Underlying_->VersionedLookupRows(path, std::move(nameTable), keys, options); } -TFuture<std::vector<IUnversionedRowsetPtr>> TDelegatingClient::MultiLookup( +TFuture<std::vector<TUnversionedLookupRowsResult>> TDelegatingClient::MultiLookup( const std::vector<TMultiLookupSubrequest>& subrequests, const TMultiLookupOptions& options) { diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index 65317f5aa62..3eeaaa702e0 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -25,19 +25,19 @@ public: const TTransactionStartOptions& options = {}) override; // Tables - TFuture<IUnversionedRowsetPtr> LookupRows( + TFuture<TUnversionedLookupRowsResult> LookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const TLookupRowsOptions& options = {}) override; - TFuture<IVersionedRowsetPtr> VersionedLookupRows( + TFuture<TVersionedLookupRowsResult> VersionedLookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const TVersionedLookupRowsOptions& options = {}) override; - TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup( + TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup( const std::vector<TMultiLookupSubrequest>& subrequests, const TMultiLookupOptions& options = {}) override; diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp index 7d4b94d7cac..b6884472a31 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.cpp +++ b/yt/yt/client/api/rpc_proxy/client_base.cpp @@ -749,7 +749,7 @@ TFuture<ITableWriterPtr> TClientBase::CreateTableWriter( //////////////////////////////////////////////////////////////////////////////// -TFuture<IUnversionedRowsetPtr> TClientBase::LookupRows( +TFuture<TUnversionedLookupRowsResult> TClientBase::LookupRows( const TYPath& path, TNameTablePtr nameTable, const TSharedRange<TLegacyKey>& keys, @@ -784,13 +784,16 @@ TFuture<IUnversionedRowsetPtr> TClientBase::LookupRows( ToProto(req->mutable_tablet_read_options(), options); return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspLookupRowsPtr& rsp) { - return DeserializeRowset<TUnversionedRow>( + auto rowset = DeserializeRowset<TUnversionedRow>( rsp->rowset_descriptor(), MergeRefsToRef<TRpcProxyClientBufferTag>(rsp->Attachments())); + return TUnversionedLookupRowsResult{ + .Rowset = std::move(rowset), + }; })); } -TFuture<IVersionedRowsetPtr> TClientBase::VersionedLookupRows( +TFuture<TVersionedLookupRowsResult> TClientBase::VersionedLookupRows( const TYPath& path, TNameTablePtr nameTable, const TSharedRange<TLegacyKey>& keys, @@ -825,13 +828,16 @@ TFuture<IVersionedRowsetPtr> TClientBase::VersionedLookupRows( } return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspVersionedLookupRowsPtr& rsp) { - return DeserializeRowset<TVersionedRow>( + auto rowset = DeserializeRowset<TVersionedRow>( rsp->rowset_descriptor(), MergeRefsToRef<TRpcProxyClientBufferTag>(rsp->Attachments())); + return TVersionedLookupRowsResult{ + .Rowset = std::move(rowset), + }; })); } -TFuture<std::vector<IUnversionedRowsetPtr>> TClientBase::MultiLookup( +TFuture<std::vector<TUnversionedLookupRowsResult>> TClientBase::MultiLookup( const std::vector<TMultiLookupSubrequest>& subrequests, const TMultiLookupOptions& options) { @@ -876,7 +882,7 @@ TFuture<std::vector<IUnversionedRowsetPtr>> TClientBase::MultiLookup( return req->Invoke().Apply(BIND([subrequestCount = std::ssize(subrequests)] (const TApiServiceProxy::TRspMultiLookupPtr& rsp) { YT_VERIFY(subrequestCount == rsp->subresponses_size()); - std::vector<IUnversionedRowsetPtr> result; + std::vector<TUnversionedLookupRowsResult> result; result.reserve(subrequestCount); int beginAttachmentIndex = 0; @@ -884,12 +890,15 @@ TFuture<std::vector<IUnversionedRowsetPtr>> TClientBase::MultiLookup( int endAttachmentIndex = beginAttachmentIndex + subresponse.attachment_count(); YT_VERIFY(endAttachmentIndex <= std::ssize(rsp->Attachments())); - std::vector<TSharedRef> subresponseAttachments{ + std::vector<TSharedRef> subresponseAttachments( rsp->Attachments().begin() + beginAttachmentIndex, - rsp->Attachments().begin() + endAttachmentIndex}; - result.push_back(DeserializeRowset<TUnversionedRow>( + rsp->Attachments().begin() + endAttachmentIndex); + auto rowset = DeserializeRowset<TUnversionedRow>( subresponse.rowset_descriptor(), - MergeRefsToRef<TRpcProxyClientBufferTag>(std::move(subresponseAttachments)))); + MergeRefsToRef<TRpcProxyClientBufferTag>(std::move(subresponseAttachments))); + result.push_back({ + .Rowset = std::move(rowset), + }); beginAttachmentIndex = endAttachmentIndex; } diff --git a/yt/yt/client/api/rpc_proxy/client_base.h b/yt/yt/client/api/rpc_proxy/client_base.h index f8df2a67a8f..2a182eaa860 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.h +++ b/yt/yt/client/api/rpc_proxy/client_base.h @@ -51,19 +51,19 @@ public: const NApi::TTransactionStartOptions& options) override; // Tables. - TFuture<NApi::IUnversionedRowsetPtr> LookupRows( + TFuture<TUnversionedLookupRowsResult> LookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const NApi::TLookupRowsOptions& options) override; - TFuture<NApi::IVersionedRowsetPtr> VersionedLookupRows( + TFuture<TVersionedLookupRowsResult> VersionedLookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const NApi::TVersionedLookupRowsOptions& options) override; - TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup( + TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup( const std::vector<TMultiLookupSubrequest>& subrequests, const TMultiLookupOptions& options = {}) override; diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index 20dadc04498..4ab161b34b5 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -492,7 +492,7 @@ TFuture<ITransactionPtr> TTransaction::StartTransaction( PatchTransactionId(options)); } -TFuture<IUnversionedRowsetPtr> TTransaction::LookupRows( +TFuture<TUnversionedLookupRowsResult> TTransaction::LookupRows( const TYPath& path, TNameTablePtr nameTable, const TSharedRange<TLegacyKey>& keys, @@ -506,7 +506,7 @@ TFuture<IUnversionedRowsetPtr> TTransaction::LookupRows( PatchTransactionTimestamp(options)); } -TFuture<IVersionedRowsetPtr> TTransaction::VersionedLookupRows( +TFuture<TVersionedLookupRowsResult> TTransaction::VersionedLookupRows( const TYPath& path, TNameTablePtr nameTable, const TSharedRange<TLegacyKey>& keys, @@ -520,7 +520,7 @@ TFuture<IVersionedRowsetPtr> TTransaction::VersionedLookupRows( PatchTransactionTimestamp(options)); } -TFuture<std::vector<IUnversionedRowsetPtr>> TTransaction::MultiLookup( +TFuture<std::vector<TUnversionedLookupRowsResult>> TTransaction::MultiLookup( const std::vector<TMultiLookupSubrequest>& subrequests, const TMultiLookupOptions& options) { diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h index 415b0b4686c..200e9e7539f 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.h +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h @@ -78,19 +78,19 @@ public: NTransactionClient::ETransactionType type, const NApi::TTransactionStartOptions& options) override; - TFuture<NApi::IUnversionedRowsetPtr> LookupRows( + TFuture<TUnversionedLookupRowsResult> LookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const NApi::TLookupRowsOptions& options) override; - TFuture<NApi::IVersionedRowsetPtr> VersionedLookupRows( + TFuture<TVersionedLookupRowsResult> VersionedLookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const NApi::TVersionedLookupRowsOptions& options) override; - TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup( + TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup( const std::vector<TMultiLookupSubrequest>& subrequests, const TMultiLookupOptions& options) override; diff --git a/yt/yt/client/api/table_client.h b/yt/yt/client/api/table_client.h index 7232cdc23f7..2b9ecc320ed 100644 --- a/yt/yt/client/api/table_client.h +++ b/yt/yt/client/api/table_client.h @@ -94,6 +94,15 @@ struct TSelectRowsResult NQueryClient::TQueryStatistics Statistics; }; +template <class IRowset> +struct TLookupRowsResult +{ + TIntrusivePtr<IRowset> Rowset; +}; + +using TUnversionedLookupRowsResult = TLookupRowsResult<IUnversionedRowset>; +using TVersionedLookupRowsResult = TLookupRowsResult<IVersionedRowset>; + struct TTableReaderOptions : public TTransactionalOptions , public TSuppressableAccessTrackingOptions @@ -404,19 +413,19 @@ struct TLocateSkynetShareOptions struct ITableClientBase { - virtual TFuture<IUnversionedRowsetPtr> LookupRows( + virtual TFuture<TUnversionedLookupRowsResult> LookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const TLookupRowsOptions& options = {}) = 0; - virtual TFuture<IVersionedRowsetPtr> VersionedLookupRows( + virtual TFuture<TVersionedLookupRowsResult> VersionedLookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const TVersionedLookupRowsOptions& options = {}) = 0; - virtual TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup( + virtual TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup( const std::vector<TMultiLookupSubrequest>& subrequests, const TMultiLookupOptions& options = {}) = 0; diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp index f49ed631ef2..4aac4324103 100644 --- a/yt/yt/client/driver/table_commands.cpp +++ b/yt/yt/client/driver/table_commands.cpp @@ -896,18 +896,27 @@ void TLookupRowsCommand::DoExecute(ICommandContextPtr context) versionedOptions.CachedSyncReplicasTimeout = Options.CachedSyncReplicasTimeout; versionedOptions.RetentionConfig = RetentionConfig; versionedOptions.ReplicaConsistency = Options.ReplicaConsistency; - auto asyncRowset = clientBase->VersionedLookupRows(Path.GetPath(), std::move(nameTable), std::move(keyRange), versionedOptions); + auto asyncRowset = clientBase->VersionedLookupRows( + Path.GetPath(), + std::move(nameTable), + std::move(keyRange), + versionedOptions); auto rowset = WaitFor(asyncRowset) - .ValueOrThrow(); + .ValueOrThrow() + .Rowset; auto writer = CreateVersionedWriterForFormat(format, rowset->GetSchema(), output); writer->Write(rowset->GetRows()); WaitFor(writer->Close()) .ThrowOnError(); } else { - auto asyncRowset = clientBase->LookupRows(Path.GetPath(), std::move(nameTable), std::move(keyRange), Options); + auto asyncRowset = clientBase->LookupRows( + Path.GetPath(), + std::move(nameTable), + std::move(keyRange), + Options); auto rowset = WaitFor(asyncRowset) - .ValueOrThrow(); - + .ValueOrThrow() + .Rowset; auto writer = CreateSchemafulWriterForFormat(format, rowset->GetSchema(), output); writer->Write(rowset->GetRows()); WaitFor(writer->Close()) diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index 0bb95300a71..d0bfe7a58a1 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -61,7 +61,7 @@ public: NTransactionClient::ETransactionType type, const TTransactionStartOptions& options = {}) override; - TFuture<IUnversionedRowsetPtr> LookupRows( + TFuture<TUnversionedLookupRowsResult> LookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, @@ -85,12 +85,12 @@ public: TFuture<void> Abort(const TTransactionAbortOptions& options = TTransactionAbortOptions()) override; - TFuture<IVersionedRowsetPtr> VersionedLookupRows( + TFuture<TVersionedLookupRowsResult> VersionedLookupRows( const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&) override; - TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup( + TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup( const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&) override; @@ -231,7 +231,7 @@ class TClient public: TClient(const std::vector<IClientPtr>& underlyingClients, TFederationConfigPtr config); - TFuture<IUnversionedRowsetPtr> LookupRows( + TFuture<TUnversionedLookupRowsResult> LookupRows( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, @@ -239,10 +239,10 @@ public: TFuture<TSelectRowsResult> SelectRows( const TString& query, const TSelectRowsOptions& options = {}) override; - TFuture<std::vector<IUnversionedRowsetPtr>> MultiLookup( + TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookup( const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&) override; - TFuture<IVersionedRowsetPtr> VersionedLookupRows( + TFuture<TVersionedLookupRowsResult> VersionedLookupRows( const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&) override; @@ -469,13 +469,13 @@ TFuture<ResultType> TTransaction::MethodName(Y_METHOD_USED_ARGS_DECLARATION(Args return future; \ } Y_SEMICOLON_GUARD -TRANSACTION_METHOD_IMPL(IUnversionedRowsetPtr, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TLookupRowsOptions&)); +TRANSACTION_METHOD_IMPL(TUnversionedLookupRowsResult, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TLookupRowsOptions&)); TRANSACTION_METHOD_IMPL(TSelectRowsResult, SelectRows, (const TString&, const TSelectRowsOptions&)); TRANSACTION_METHOD_IMPL(void, Ping, (const NApi::TTransactionPingOptions&)); TRANSACTION_METHOD_IMPL(TTransactionCommitResult, Commit, (const TTransactionCommitOptions&)); TRANSACTION_METHOD_IMPL(void, Abort, (const TTransactionAbortOptions&)); -TRANSACTION_METHOD_IMPL(IVersionedRowsetPtr, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); -TRANSACTION_METHOD_IMPL(std::vector<IUnversionedRowsetPtr>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&)); +TRANSACTION_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); +TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&)); TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&)); @@ -631,10 +631,10 @@ TFuture<ResultType> TClient::MethodName(Y_METHOD_USED_ARGS_DECLARATION(Args)) return DoCall<ResultType>(Config_->ClusterRetryAttempts, callee); \ } Y_SEMICOLON_GUARD -CLIENT_METHOD_IMPL(IUnversionedRowsetPtr, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TLegacyKey>&, const TLookupRowsOptions&)); +CLIENT_METHOD_IMPL(TUnversionedLookupRowsResult, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TLegacyKey>&, const TLookupRowsOptions&)); CLIENT_METHOD_IMPL(TSelectRowsResult, SelectRows, (const TString&, const TSelectRowsOptions&)); -CLIENT_METHOD_IMPL(std::vector<IUnversionedRowsetPtr>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&)); -CLIENT_METHOD_IMPL(IVersionedRowsetPtr, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); +CLIENT_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&)); +CLIENT_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); CLIENT_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&)); CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullQueue, (const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&)); CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&)); diff --git a/yt/yt/client/federated/unittests/client_ut.cpp b/yt/yt/client/federated/unittests/client_ut.cpp index 10e14a5b712..4268404dd09 100644 --- a/yt/yt/client/federated/unittests/client_ut.cpp +++ b/yt/yt/client/federated/unittests/client_ut.cpp @@ -42,7 +42,9 @@ struct TTestDataStorage row[1] = rowBuffer->CaptureValue(NTableClient::MakeUnversionedUint64Value(key + 10, TableSchema->GetColumnIndex(ValueColumn))); rows.push_back(NTableClient::TUnversionedRow{row}); } - return NApi::CreateRowset(TableSchema, MakeSharedRange(rows, std::move(rowBuffer))); + return TUnversionedLookupRowsResult{ + .Rowset = NApi::CreateRowset(TableSchema, MakeSharedRange(rows, std::move(rowBuffer))), + }; } (); LookupResult2 = [&]() { @@ -54,7 +56,9 @@ struct TTestDataStorage row[1] = rowBuffer->CaptureValue(NTableClient::MakeUnversionedUint64Value(key + 10, TableSchema->GetColumnIndex(ValueColumn))); rows.push_back(NTableClient::TUnversionedRow{row}); } - return NApi::CreateRowset(TableSchema, MakeSharedRange(rows, std::move(rowBuffer))); + return TUnversionedLookupRowsResult{ + .Rowset = NApi::CreateRowset(TableSchema, MakeSharedRange(rows, std::move(rowBuffer))), + }; } (); NameTable = NYT::New<NTableClient::TNameTable>(); @@ -80,8 +84,8 @@ struct TTestDataStorage const NTableClient::TColumnSchema ValueColumnSchema = NTableClient::TColumnSchema(ValueColumn, NTableClient::EValueType::Uint64); NTableClient::TTableSchemaPtr TableSchema = New<NTableClient::TTableSchema>(std::vector{KeyColumnSchema, ValueColumnSchema}); - IUnversionedRowsetPtr LookupResult1; - IUnversionedRowsetPtr LookupResult2; + NApi::TUnversionedLookupRowsResult LookupResult1; + NApi::TUnversionedLookupRowsResult LookupResult2; NTableClient::TNameTablePtr NameTable; TSharedRange<NTableClient::TUnversionedRow> Keys; }; @@ -115,7 +119,7 @@ TEST(TFederatedClientTest, Basic) .WillRepeatedly(Return(VoidFuture)); // Creation of federated client. - std::vector<IClientPtr> clients = {mockClientSas, mockClientVla}; + std::vector<IClientPtr> clients{mockClientSas, mockClientVla}; auto config = New<TFederationConfig>(); config->ClusterHealthCheckPeriod = TDuration::Seconds(5); config->ClusterRetryAttempts = 1; @@ -127,7 +131,7 @@ TEST(TFederatedClientTest, Basic) EXPECT_CALL(*mockClientVla, LookupRows(data.Path, _, _, _)) .WillOnce(Return(MakeFuture(data.LookupResult1))) - .WillOnce(Return(MakeFuture<IUnversionedRowsetPtr>(TError(NRpc::EErrorCode::Unavailable, "Failure")))); + .WillOnce(Return(MakeFuture<TUnversionedLookupRowsResult>(TError(NRpc::EErrorCode::Unavailable, "Failure")))); EXPECT_CALL(*mockClientSas, LookupRows(data.Path, _, _, _)) .WillOnce(Return(MakeFuture(data.LookupResult2))); @@ -138,7 +142,7 @@ TEST(TFederatedClientTest, Basic) // From `vla`. { auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys); - auto rows = result.Get().Value()->GetRows(); + auto rows = result.Get().Value().Rowset->GetRows(); ASSERT_EQ(2u, rows.Size()); auto actualFirstRow = ToString(rows[0]); ASSERT_EQ("[0#10u, 1#20u]", actualFirstRow); @@ -153,7 +157,7 @@ TEST(TFederatedClientTest, Basic) // From `sas`. { auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys); - auto rows = result.Get().Value()->GetRows(); + auto rows = result.Get().Value().Rowset->GetRows(); ASSERT_EQ(2u, rows.Size()); auto actualFirstRow = ToString(rows[0]); @@ -183,7 +187,7 @@ TEST(TFederatedClientTest, CheckHealth) }); NNet::WriteLocalHostName("a-rpc-proxy.vla.yp-c.yandex.net"); - std::vector<IClientPtr> clients = {mockClientSas, mockClientVla}; + std::vector<IClientPtr> clients{mockClientSas, mockClientVla}; auto config = New<TFederationConfig>(); config->ClusterHealthCheckPeriod = TDuration::Seconds(5); config->ClusterRetryAttempts = 1; @@ -212,7 +216,7 @@ TEST(TFederatedClientTest, CheckHealth) // From `vla`. { auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys); - auto rows = result.Get().Value()->GetRows(); + auto rows = result.Get().Value().Rowset->GetRows(); ASSERT_EQ(2u, rows.Size()); auto actualFirstRow = ToString(rows[0]); ASSERT_EQ("[0#10u, 1#20u]", actualFirstRow); @@ -224,7 +228,7 @@ TEST(TFederatedClientTest, CheckHealth) // From `sas` because `vla` was marked as unhealthy after CheckClustersHealth. { auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys); - auto rows = result.Get().Value()->GetRows(); + auto rows = result.Get().Value().Rowset->GetRows(); ASSERT_EQ(2u, rows.Size()); auto actualFirstRow = ToString(rows[0]); @@ -238,7 +242,7 @@ TEST(TFederatedClientTest, CheckHealth) // From `vla` because it became ok again. { auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys); - auto rows = result.Get().Value()->GetRows(); + auto rows = result.Get().Value().Rowset->GetRows(); ASSERT_EQ(2u, rows.Size()); auto actualFirstRow = ToString(rows[0]); ASSERT_EQ("[0#10u, 1#20u]", actualFirstRow); @@ -274,7 +278,7 @@ TEST(TFederatedClientTest, Transactions) .WillRepeatedly(Return(VoidFuture)); // Creation of federated client. - std::vector<IClientPtr> clients = {mockClientSas, mockClientVla}; + std::vector<IClientPtr> clients{mockClientSas, mockClientVla}; auto config = New<TFederationConfig>(); config->ClusterHealthCheckPeriod = TDuration::Seconds(5); config->ClusterRetryAttempts = 1; @@ -287,7 +291,7 @@ TEST(TFederatedClientTest, Transactions) EXPECT_CALL(*mockTransactionVla, LookupRows(data.Path, _, _, _)) .WillOnce(Return(MakeFuture(data.LookupResult1))) - .WillOnce(Return(MakeFuture<IUnversionedRowsetPtr>(TError(NRpc::EErrorCode::Unavailable, "Failure")))); + .WillOnce(Return(MakeFuture<TUnversionedLookupRowsResult>(TError(NRpc::EErrorCode::Unavailable, "Failure")))); // Wait for the first check of clusters healths. Sleep(TDuration::Seconds(2)); @@ -297,7 +301,7 @@ TEST(TFederatedClientTest, Transactions) // From `vla`. { auto result = transaction->LookupRows(data.Path, data.NameTable, data.Keys); - auto rows = result.Get().Value()->GetRows(); + auto rows = result.Get().Value().Rowset->GetRows(); ASSERT_EQ(2u, rows.Size()); auto actualFirstRow = ToString(rows[0]); ASSERT_EQ("[0#10u, 1#20u]", actualFirstRow); @@ -321,7 +325,7 @@ TEST(TFederatedClientTest, Transactions) transaction = federatedClient->StartTransaction(NTransactionClient::ETransactionType::Tablet).Get().Value(); auto result = transaction->LookupRows(data.Path, data.NameTable, data.Keys); - auto rows = result.Get().Value()->GetRows(); + auto rows = result.Get().Value().Rowset->GetRows(); ASSERT_EQ(2u, rows.Size()); auto actualFirstRow = ToString(rows[0]); @@ -360,7 +364,7 @@ TEST(TFederatedClientTest, RetryWithoutTransaction) .WillRepeatedly(Return(VoidFuture)); // Creation of federated client. - std::vector<IClientPtr> clients = {mockClientSas, mockClientVla}; + std::vector<IClientPtr> clients{mockClientSas, mockClientVla}; auto config = New<TFederationConfig>(); config->ClusterHealthCheckPeriod = TDuration::Seconds(5); auto federatedClient = CreateClient(clients, config); @@ -370,7 +374,7 @@ TEST(TFederatedClientTest, RetryWithoutTransaction) // 3. `sas` client should be used as other cluster. EXPECT_CALL(*mockClientVla, LookupRows(data.Path, _, _, _)) - .WillOnce(Return(MakeFuture<IUnversionedRowsetPtr>(TError(NRpc::EErrorCode::Unavailable, "Failure")))); + .WillOnce(Return(MakeFuture<TUnversionedLookupRowsResult>(TError(NRpc::EErrorCode::Unavailable, "Failure")))); EXPECT_CALL(*mockClientSas, LookupRows(data.Path, _, _, _)) .WillOnce(Return(MakeFuture(data.LookupResult2))); @@ -381,7 +385,7 @@ TEST(TFederatedClientTest, RetryWithoutTransaction) // Go to `vla`, getting error, retry via `sas` and getting response from `sas`. { auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys); - auto rows = result.Get().Value()->GetRows(); + auto rows = result.Get().Value().Rowset->GetRows(); ASSERT_EQ(2u, rows.Size()); auto actualFirstRow = ToString(rows[0]); @@ -398,17 +402,16 @@ TEST(TFederatedClientTest, RetryWithoutTransaction) auto mockTransactionSas = New<TStrictMockTransaction>(); EXPECT_CALL(*mockClientSas, StartTransaction(_, _)) - .WillOnce(Return(MakeFuture(NApi::ITransactionPtr{mockTransactionSas}))); + .WillOnce(Return(MakeFuture(NApi::ITransactionPtr(mockTransactionSas)))); EXPECT_CALL(*mockTransactionSas, LookupRows(data.Path, _, _, _)) .WillOnce(Return(MakeFuture(data.LookupResult2))); - // Try to start transaction in `vla`, getting error, retry via `sas`, creating transaction and getting response from `sas`. { auto transaction = federatedClient->StartTransaction(NTransactionClient::ETransactionType::Tablet).Get().Value(); auto result = transaction->LookupRows(data.Path, data.NameTable, data.Keys); - auto rows = result.Get().Value()->GetRows(); + auto rows = result.Get().Value().Rowset->GetRows(); ASSERT_EQ(2u, rows.Size()); auto actualFirstRow = ToString(rows[0]); @@ -457,7 +460,7 @@ TEST(TFederatedClientTest, AttachTransaction) .WillOnce(Return(mockConnectionVla)); // Creation of federated client. - std::vector<IClientPtr> clients = {mockClientSas, mockClientVla}; + std::vector<IClientPtr> clients{mockClientSas, mockClientVla}; auto config = New<TFederationConfig>(); config->ClusterHealthCheckPeriod = TDuration::Seconds(5); auto federatedClient = CreateClient(clients, config); diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index c30bdee3901..f415a8c614c 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -74,8 +74,8 @@ public: return {}; } - RETRYABLE_METHOD(TFuture<IUnversionedRowsetPtr>, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TLookupRowsOptions&)); - RETRYABLE_METHOD(TFuture<IVersionedRowsetPtr>, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); + RETRYABLE_METHOD(TFuture<TUnversionedLookupRowsResult>, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TLookupRowsOptions&)); + RETRYABLE_METHOD(TFuture<TVersionedLookupRowsResult>, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); RETRYABLE_METHOD(TFuture<TSelectRowsResult>, SelectRows, (const TString&, const TSelectRowsOptions&)); RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueue, (const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&)); RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&)); @@ -88,7 +88,7 @@ public: RETRYABLE_METHOD(TFuture<NYson::TYsonString>, ListNode, (const NYPath::TYPath&, const TListNodeOptions&)); RETRYABLE_METHOD(TFuture<bool>, NodeExists, (const NYPath::TYPath&, const TNodeExistsOptions&)); RETRYABLE_METHOD(TFuture<IFileReaderPtr>, CreateFileReader, (const NYPath::TYPath&, const TFileReaderOptions&)); - RETRYABLE_METHOD(TFuture<std::vector<IUnversionedRowsetPtr>>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&)); + RETRYABLE_METHOD(TFuture<std::vector<TUnversionedLookupRowsResult>>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&)); // Unsupported methods. UNSUPPORTED_METHOD(TFuture<ITransactionPtr>, StartTransaction, (NTransactionClient::ETransactionType, const TTransactionStartOptions&)); diff --git a/yt/yt/client/queue_client/consumer_client.cpp b/yt/yt/client/queue_client/consumer_client.cpp index a0557dbdc6b..cace53eb53e 100644 --- a/yt/yt/client/queue_client/consumer_client.cpp +++ b/yt/yt/client/queue_client/consumer_client.cpp @@ -122,7 +122,8 @@ public: options.RetentionConfig->MaxDataVersions = 1; auto partitionRowset = WaitFor(transaction->VersionedLookupRows(Path_, NameTable_, keyRowsBuilder.Build(), options)) - .ValueOrThrow(); + .ValueOrThrow() + .Rowset; const auto& rows = partitionRowset->GetRows(); // XXX(max42): should we use name table from the rowset, or it coincides with our own name table? @@ -430,7 +431,8 @@ private: NameTable_, builder.Build(), options)) - .ValueOrThrow(); + .ValueOrThrow() + .Rowset; YT_VERIFY(versionedRowset->GetRows().size() == partitionIndices.size()); diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index 1963c5d910b..8c91a172519 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -35,19 +35,19 @@ public: NTransactionClient::ETransactionType type, const TTransactionStartOptions& options), (override)); - MOCK_METHOD(TFuture<IUnversionedRowsetPtr>, LookupRows, ( + MOCK_METHOD(TFuture<TUnversionedLookupRowsResult>, LookupRows, ( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const TLookupRowsOptions& options), (override)); - MOCK_METHOD(TFuture<IVersionedRowsetPtr>, VersionedLookupRows, ( + MOCK_METHOD(TFuture<TVersionedLookupRowsResult>, VersionedLookupRows, ( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const TVersionedLookupRowsOptions& options), (override)); - MOCK_METHOD(TFuture<std::vector<IUnversionedRowsetPtr>>, MultiLookup, ( + MOCK_METHOD(TFuture<std::vector<TUnversionedLookupRowsResult>>, MultiLookup, ( const std::vector<TMultiLookupSubrequest>& subrequests, const TMultiLookupOptions& options), (override)); diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h index d22136846df..06c9c651486 100644 --- a/yt/yt/client/unittests/mock/transaction.h +++ b/yt/yt/client/unittests/mock/transaction.h @@ -21,19 +21,19 @@ public: NTransactionClient::ETransactionType type, const TTransactionStartOptions& options), (override)); - MOCK_METHOD(TFuture<IUnversionedRowsetPtr>, LookupRows, ( + MOCK_METHOD(TFuture<TUnversionedLookupRowsResult>, LookupRows, ( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const TLookupRowsOptions& options), (override)); - MOCK_METHOD(TFuture<IVersionedRowsetPtr>, VersionedLookupRows, ( + MOCK_METHOD(TFuture<TVersionedLookupRowsResult>, VersionedLookupRows, ( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, const TSharedRange<NTableClient::TLegacyKey>& keys, const TVersionedLookupRowsOptions& options), (override)); - MOCK_METHOD(TFuture<std::vector<IUnversionedRowsetPtr>>, MultiLookup, ( + MOCK_METHOD(TFuture<std::vector<TUnversionedLookupRowsResult>>, MultiLookup, ( const std::vector<TMultiLookupSubrequest>& subrequests, const TMultiLookupOptions& options), (override)); |