diff options
author | babenko <babenko@yandex-team.com> | 2024-08-13 18:49:24 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2024-08-13 19:47:49 +0300 |
commit | 70ed6114541643f807012c222d8b8cc4c03a0ae3 (patch) | |
tree | 46f1d20656e55b34dd8cec84767078112d1af6f0 | |
parent | e691ffb61aa4fb15314af64307d9b7ffa6ddb89e (diff) | |
download | ydb-70ed6114541643f807012c222d8b8cc4c03a0ae3.tar.gz |
YT-18872: Add TLookupRowsResult::UnavailableKeyIndexes
a6b83fed73df68e59b190c5f4789d0d4671a8270
-rw-r--r-- | yt/yt/client/api/dynamic_table_client.h | 10 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_base.cpp | 2 | ||||
-rw-r--r-- | yt/yt/client/driver/table_commands.cpp | 38 | ||||
-rw-r--r-- | yt/yt/client/driver/table_commands.h | 1 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 4 |
5 files changed, 42 insertions, 13 deletions
diff --git a/yt/yt/client/api/dynamic_table_client.h b/yt/yt/client/api/dynamic_table_client.h index 1a12180a27..fec7ff0321 100644 --- a/yt/yt/client/api/dynamic_table_client.h +++ b/yt/yt/client/api/dynamic_table_client.h @@ -69,6 +69,16 @@ template <class IRowset> struct TLookupRowsResult { TIntrusivePtr<IRowset> Rowset; + + //! If TLookupRequestOptions::EnablePartialResult is set, this vector contains + //! indexes of keys that were not available (due to timeout or other failure). + //! If TLookupRequestOptions::KeepMissingRows is false then the corresponding rows are just + //! omitted from #Rowset. Otherwise these rows are present (to ensure 1-1 mapping + //! between the keys and the returned rows) but are null. + //! In the latter case, this vector helps distinguishing between missing and + //! unavailable keys. + //! Indexes are guaranteed to be unique and increasing. + std::vector<int> UnavailableKeyIndexes; }; using TUnversionedLookupRowsResult = TLookupRowsResult<IUnversionedRowset>; diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp index 4b36ee2e0e..31c9950f71 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.cpp +++ b/yt/yt/client/api/rpc_proxy/client_base.cpp @@ -878,6 +878,7 @@ TFuture<TVersionedLookupRowsResult> TClientBase::VersionedLookupRows( MergeRefsToRef<TRpcProxyClientBufferTag>(rsp->Attachments())); return TVersionedLookupRowsResult{ .Rowset = std::move(rowset), + .UnavailableKeyIndexes = FromProto<std::vector<int>>(rsp->unavailable_key_indexes()), }; })); } @@ -952,6 +953,7 @@ TFuture<std::vector<TUnversionedLookupRowsResult>> TClientBase::MultiLookupRows( MergeRefsToRef<TRpcProxyClientBufferTag>(std::move(subresponseAttachments))); result.push_back({ .Rowset = std::move(rowset), + .UnavailableKeyIndexes = FromProto<std::vector<int>>(subresponse.unavailable_key_indexes()), }); beginAttachmentIndex = endAttachmentIndex; diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp index c8d2e427d1..d454ef00e7 100644 --- a/yt/yt/client/driver/table_commands.cpp +++ b/yt/yt/client/driver/table_commands.cpp @@ -1103,6 +1103,15 @@ void TLookupRowsCommand::DoExecute(ICommandContextPtr context) auto clientBase = GetClientBase(context); + auto produceResponseParameters = [&] (const auto& result) { + ProduceResponseParameters(context, [&] (NYson::IYsonConsumer* consumer) { + if (!result.UnavailableKeyIndexes.empty()) { + BuildYsonMapFragmentFluently(consumer) + .Item("unavailable_key_indexes").Value(result.UnavailableKeyIndexes); + } + }); + }; + if (Versioned) { TVersionedLookupRowsOptions versionedOptions; versionedOptions.ColumnFilter = Options.ColumnFilter; @@ -1113,34 +1122,39 @@ void TLookupRowsCommand::DoExecute(ICommandContextPtr context) versionedOptions.CachedSyncReplicasTimeout = Options.CachedSyncReplicasTimeout; versionedOptions.RetentionConfig = RetentionConfig; versionedOptions.ReplicaConsistency = Options.ReplicaConsistency; - auto asyncRowset = clientBase->VersionedLookupRows( + auto resultFuture = clientBase->VersionedLookupRows( Path.GetPath(), std::move(nameTable), std::move(keyRange), versionedOptions); - auto rowset = WaitFor(asyncRowset) - .ValueOrThrow() - .Rowset; - auto writer = CreateVersionedWriterForFormat(format, rowset->GetSchema(), output); - Y_UNUSED(writer->Write(rowset->GetRows())); + auto result = WaitFor(resultFuture) + .ValueOrThrow(); + produceResponseParameters(result); + auto writer = CreateVersionedWriterForFormat(format, result.Rowset->GetSchema(), output); + Y_UNUSED(writer->Write(result.Rowset->GetRows())); WaitFor(writer->Close()) .ThrowOnError(); } else { - auto asyncRowset = clientBase->LookupRows( + auto resultFuture = clientBase->LookupRows( Path.GetPath(), std::move(nameTable), std::move(keyRange), Options); - auto rowset = WaitFor(asyncRowset) - .ValueOrThrow() - .Rowset; - auto writer = CreateSchemafulWriterForFormat(format, rowset->GetSchema(), output); - Y_UNUSED(writer->Write(rowset->GetRows())); + auto result = WaitFor(resultFuture) + .ValueOrThrow(); + produceResponseParameters(result); + auto writer = CreateSchemafulWriterForFormat(format, result.Rowset->GetSchema(), output); + Y_UNUSED(writer->Write(result.Rowset->GetRows())); WaitFor(writer->Close()) .ThrowOnError(); } } +bool TLookupRowsCommand::HasResponseParameters() const +{ + return true; +} + //////////////////////////////////////////////////////////////////////////////// void TPullRowsCommand::Register(TRegistrar registrar) diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h index 4587063dff..744d0220b2 100644 --- a/yt/yt/client/driver/table_commands.h +++ b/yt/yt/client/driver/table_commands.h @@ -374,6 +374,7 @@ private: NTableClient::TRetentionConfigPtr RetentionConfig; void DoExecute(ICommandContextPtr context) override; + bool HasResponseParameters() const override; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 0333b8d4f0..67c8ba88bd 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -504,6 +504,7 @@ message TReqLookupRows message TRspLookupRows { required TRowsetDescriptor rowset_descriptor = 200; + repeated int32 unavailable_key_indexes = 201; } //////////////////////////////////////////////////////////////////////////////// @@ -544,6 +545,7 @@ message TReqVersionedLookupRows message TRspVersionedLookupRows { required TRowsetDescriptor rowset_descriptor = 200; + repeated int32 unavailable_key_indexes = 201; } //////////////////////////////////////////////////////////////////////////////// @@ -579,8 +581,8 @@ message TRspMultiLookup message TSubresponse { required TRowsetDescriptor rowset_descriptor = 1; - required int32 attachment_count = 2; + repeated int32 unavailable_key_indexes = 3; } repeated TSubresponse subresponses = 1; |