aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2024-08-13 18:49:24 +0300
committerbabenko <babenko@yandex-team.com>2024-08-13 19:47:49 +0300
commit70ed6114541643f807012c222d8b8cc4c03a0ae3 (patch)
tree46f1d20656e55b34dd8cec84767078112d1af6f0
parente691ffb61aa4fb15314af64307d9b7ffa6ddb89e (diff)
downloadydb-70ed6114541643f807012c222d8b8cc4c03a0ae3.tar.gz
YT-18872: Add TLookupRowsResult::UnavailableKeyIndexes
a6b83fed73df68e59b190c5f4789d0d4671a8270
-rw-r--r--yt/yt/client/api/dynamic_table_client.h10
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.cpp2
-rw-r--r--yt/yt/client/driver/table_commands.cpp38
-rw-r--r--yt/yt/client/driver/table_commands.h1
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto4
5 files changed, 42 insertions, 13 deletions
diff --git a/yt/yt/client/api/dynamic_table_client.h b/yt/yt/client/api/dynamic_table_client.h
index 1a12180a27..fec7ff0321 100644
--- a/yt/yt/client/api/dynamic_table_client.h
+++ b/yt/yt/client/api/dynamic_table_client.h
@@ -69,6 +69,16 @@ template <class IRowset>
struct TLookupRowsResult
{
TIntrusivePtr<IRowset> Rowset;
+
+ //! If TLookupRequestOptions::EnablePartialResult is set, this vector contains
+ //! indexes of keys that were not available (due to timeout or other failure).
+ //! If TLookupRequestOptions::KeepMissingRows is false then the corresponding rows are just
+ //! omitted from #Rowset. Otherwise these rows are present (to ensure 1-1 mapping
+ //! between the keys and the returned rows) but are null.
+ //! In the latter case, this vector helps distinguishing between missing and
+ //! unavailable keys.
+ //! Indexes are guaranteed to be unique and increasing.
+ std::vector<int> UnavailableKeyIndexes;
};
using TUnversionedLookupRowsResult = TLookupRowsResult<IUnversionedRowset>;
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp
index 4b36ee2e0e..31c9950f71 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_base.cpp
@@ -878,6 +878,7 @@ TFuture<TVersionedLookupRowsResult> TClientBase::VersionedLookupRows(
MergeRefsToRef<TRpcProxyClientBufferTag>(rsp->Attachments()));
return TVersionedLookupRowsResult{
.Rowset = std::move(rowset),
+ .UnavailableKeyIndexes = FromProto<std::vector<int>>(rsp->unavailable_key_indexes()),
};
}));
}
@@ -952,6 +953,7 @@ TFuture<std::vector<TUnversionedLookupRowsResult>> TClientBase::MultiLookupRows(
MergeRefsToRef<TRpcProxyClientBufferTag>(std::move(subresponseAttachments)));
result.push_back({
.Rowset = std::move(rowset),
+ .UnavailableKeyIndexes = FromProto<std::vector<int>>(subresponse.unavailable_key_indexes()),
});
beginAttachmentIndex = endAttachmentIndex;
diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp
index c8d2e427d1..d454ef00e7 100644
--- a/yt/yt/client/driver/table_commands.cpp
+++ b/yt/yt/client/driver/table_commands.cpp
@@ -1103,6 +1103,15 @@ void TLookupRowsCommand::DoExecute(ICommandContextPtr context)
auto clientBase = GetClientBase(context);
+ auto produceResponseParameters = [&] (const auto& result) {
+ ProduceResponseParameters(context, [&] (NYson::IYsonConsumer* consumer) {
+ if (!result.UnavailableKeyIndexes.empty()) {
+ BuildYsonMapFragmentFluently(consumer)
+ .Item("unavailable_key_indexes").Value(result.UnavailableKeyIndexes);
+ }
+ });
+ };
+
if (Versioned) {
TVersionedLookupRowsOptions versionedOptions;
versionedOptions.ColumnFilter = Options.ColumnFilter;
@@ -1113,34 +1122,39 @@ void TLookupRowsCommand::DoExecute(ICommandContextPtr context)
versionedOptions.CachedSyncReplicasTimeout = Options.CachedSyncReplicasTimeout;
versionedOptions.RetentionConfig = RetentionConfig;
versionedOptions.ReplicaConsistency = Options.ReplicaConsistency;
- auto asyncRowset = clientBase->VersionedLookupRows(
+ auto resultFuture = clientBase->VersionedLookupRows(
Path.GetPath(),
std::move(nameTable),
std::move(keyRange),
versionedOptions);
- auto rowset = WaitFor(asyncRowset)
- .ValueOrThrow()
- .Rowset;
- auto writer = CreateVersionedWriterForFormat(format, rowset->GetSchema(), output);
- Y_UNUSED(writer->Write(rowset->GetRows()));
+ auto result = WaitFor(resultFuture)
+ .ValueOrThrow();
+ produceResponseParameters(result);
+ auto writer = CreateVersionedWriterForFormat(format, result.Rowset->GetSchema(), output);
+ Y_UNUSED(writer->Write(result.Rowset->GetRows()));
WaitFor(writer->Close())
.ThrowOnError();
} else {
- auto asyncRowset = clientBase->LookupRows(
+ auto resultFuture = clientBase->LookupRows(
Path.GetPath(),
std::move(nameTable),
std::move(keyRange),
Options);
- auto rowset = WaitFor(asyncRowset)
- .ValueOrThrow()
- .Rowset;
- auto writer = CreateSchemafulWriterForFormat(format, rowset->GetSchema(), output);
- Y_UNUSED(writer->Write(rowset->GetRows()));
+ auto result = WaitFor(resultFuture)
+ .ValueOrThrow();
+ produceResponseParameters(result);
+ auto writer = CreateSchemafulWriterForFormat(format, result.Rowset->GetSchema(), output);
+ Y_UNUSED(writer->Write(result.Rowset->GetRows()));
WaitFor(writer->Close())
.ThrowOnError();
}
}
+bool TLookupRowsCommand::HasResponseParameters() const
+{
+ return true;
+}
+
////////////////////////////////////////////////////////////////////////////////
void TPullRowsCommand::Register(TRegistrar registrar)
diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h
index 4587063dff..744d0220b2 100644
--- a/yt/yt/client/driver/table_commands.h
+++ b/yt/yt/client/driver/table_commands.h
@@ -374,6 +374,7 @@ private:
NTableClient::TRetentionConfigPtr RetentionConfig;
void DoExecute(ICommandContextPtr context) override;
+ bool HasResponseParameters() const override;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 0333b8d4f0..67c8ba88bd 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -504,6 +504,7 @@ message TReqLookupRows
message TRspLookupRows
{
required TRowsetDescriptor rowset_descriptor = 200;
+ repeated int32 unavailable_key_indexes = 201;
}
////////////////////////////////////////////////////////////////////////////////
@@ -544,6 +545,7 @@ message TReqVersionedLookupRows
message TRspVersionedLookupRows
{
required TRowsetDescriptor rowset_descriptor = 200;
+ repeated int32 unavailable_key_indexes = 201;
}
////////////////////////////////////////////////////////////////////////////////
@@ -579,8 +581,8 @@ message TRspMultiLookup
message TSubresponse
{
required TRowsetDescriptor rowset_descriptor = 1;
-
required int32 attachment_count = 2;
+ repeated int32 unavailable_key_indexes = 3;
}
repeated TSubresponse subresponses = 1;