aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author dave11ar <dave11ar@yandex-team.com> 2025-01-16 12:18:19 +0300
committer dave11ar <dave11ar@yandex-team.com> 2025-01-16 12:36:55 +0300
commit 1b44ecad7ca01cc2506bd4afbe2b8d911e655122 (patch)
tree 2217bbc6fa60d242f743e744c546a1aa2173ae8a
parent 9ef54447fb808783ad84dd50b946c3e493a99128 (diff)
download ydb-1b44ecad7ca01cc2506bd4afbe2b8d911e655122.tar.gz
YT-22886: Add lookup format with timestamp columns
## Changelog entry
* Type: feature
* Component: tablet node

Add lookup format with timestamp columns

commit_hash:b95f7520a6d7f83ff4d9a7b85424cbbf9e948ff2
-rw-r--r-- yt/yt/client/api/client_common.h 2
-rw-r--r-- yt/yt/client/api/dynamic_table_client.h 2
-rw-r--r-- yt/yt/client/api/rpc_proxy/client_base.cpp 6
-rw-r--r-- yt/yt/client/driver/table_commands.cpp 18
-rw-r--r-- yt/yt/client/table_client/timestamped_schema_helpers.cpp 67
-rw-r--r-- yt/yt/client/table_client/timestamped_schema_helpers.h 34
-rw-r--r-- yt/yt/client/ya.make 1
-rw-r--r-- yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto 4
8 files changed, 132 insertions, 2 deletions
diff --git a/yt/yt/client/api/client_common.h b/yt/yt/client/api/client_common.h
index 87f102cf82..c961b4398b 100644
--- a/yt/yt/client/api/client_common.h
+++ b/yt/yt/client/api/client_common.h
@@ -140,7 +140,7 @@ struct TSelectRowsOptionsBase
//! Expected schemas for tables in a query (used for replica fallback in replicated tables).
using TExpectedTableSchemas = THashMap<NYPath::TYPath, NTableClient::TTableSchemaPtr>;
TExpectedTableSchemas ExpectedTableSchemas;
- //! Add |$timestamp:columnName| to result if read_mode is latest_timestamp.
+ //! Add |$timestamp:columnName| to result if readMode is latest_timestamp.
NTableClient::TVersionedReadOptions VersionedReadOptions;
//! Limits range expanding.
ui64 RangeExpansionLimit = 200'000;
diff --git a/yt/yt/client/api/dynamic_table_client.h b/yt/yt/client/api/dynamic_table_client.h
index fec7ff0321..674417e69d 100644
--- a/yt/yt/client/api/dynamic_table_client.h
+++ b/yt/yt/client/api/dynamic_table_client.h
@@ -18,6 +18,8 @@ struct TLookupRequestOptions
bool EnablePartialResult = false;
std::optional<bool> UseLookupCache;
TDetailedProfilingInfoPtr DetailedProfilingInfo;
+ //! Add |$timestamp:columnName| to result if readMode is latest_timestamp.
+ NTableClient::TVersionedReadOptions VersionedReadOptions;
};
struct TLookupRowsOptionsBase
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp
index 26062e93ee..25152bf3fd 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_base.cpp
@@ -874,6 +874,7 @@ TFuture<TUnversionedLookupRowsResult> TClientBase::LookupRows(
req->set_multiplexing_band(static_cast<NProto::EMultiplexingBand>(options.MultiplexingBand));
ToProto(req->mutable_tablet_read_options(), options);
+ ToProto(req->mutable_versioned_read_options(), options.VersionedReadOptions);
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspLookupRowsPtr& rsp) {
auto rowset = DeserializeRowset<TUnversionedRow>(
@@ -926,6 +927,10 @@ TFuture<TVersionedLookupRowsResult> TClientBase::VersionedLookupRows(
if (options.RetentionConfig) {
ToProto(req->mutable_retention_config(), *options.RetentionConfig);
}
+ if (options.VersionedReadOptions.ReadMode != NTableClient::EVersionedIOMode::Default) {
+ THROW_ERROR_EXCEPTION("Versioned lookup does not support versioned read mode %Qlv",
+ options.VersionedReadOptions.ReadMode);
+ }
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspVersionedLookupRowsPtr& rsp) {
auto rowset = DeserializeRowset<TVersionedRow>(
@@ -971,6 +976,7 @@ TFuture<std::vector<TUnversionedLookupRowsResult>> TClientBase::MultiLookupRows(
subrequest.Keys,
protoSubrequest->mutable_rowset_descriptor());
protoSubrequest->set_attachment_count(rowset.size());
+ ToProto(protoSubrequest->mutable_versioned_read_options(), subrequest.Options.VersionedReadOptions);
req->Attachments().insert(req->Attachments().end(), rowset.begin(), rowset.end());
}
diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp
index e358d2f8ee..c05964a50f 100644
--- a/yt/yt/client/driver/table_commands.cpp
+++ b/yt/yt/client/driver/table_commands.cpp
@@ -16,6 +16,7 @@
#include <yt/yt/client/table_client/row_buffer.h>
#include <yt/yt/client/table_client/table_consumer.h>
#include <yt/yt/client/table_client/table_output.h>
+#include <yt/yt/client/table_client/timestamped_schema_helpers.h>
#include <yt/yt/client/table_client/unversioned_writer.h>
#include <yt/yt/client/table_client/versioned_writer.h>
#include <yt/yt/client/table_client/wire_protocol.h>
@@ -1068,6 +1069,13 @@ void TLookupRowsCommand::Register(TRegistrar registrar)
return command->Options.ReplicaConsistency;
})
.Optional(/*init*/ false);
+
+ registrar.ParameterWithUniversalAccessor<TVersionedReadOptions>(
+ "versioned_read_options",
+ [] (TThis* command) -> auto& {
+ return command->Options.VersionedReadOptions;
+ })
+ .Optional(/*init*/ false);
}
void TLookupRowsCommand::DoExecute(ICommandContextPtr context)
@@ -1092,6 +1100,11 @@ void TLookupRowsCommand::DoExecute(ICommandContextPtr context)
<< TErrorAttribute("rich_ypath", Path);
}
+ if (Versioned && Options.VersionedReadOptions.ReadMode != NTableClient::EVersionedIOMode::Default) {
+ THROW_ERROR_EXCEPTION("Versioned lookup does not support versioned read mode %Qlv",
+ Options.VersionedReadOptions.ReadMode);
+ }
+
struct TLookupRowsBufferTag
{ };
@@ -1112,12 +1125,15 @@ void TLookupRowsCommand::DoExecute(ICommandContextPtr context)
auto nameTable = valueConsumer.GetNameTable();
if (ColumnNames) {
+ auto primarySchema = Options.VersionedReadOptions.ReadMode == NTableClient::EVersionedIOMode::LatestTimestamp
+ ? ToLatestTimestampSchema(tableInfo->Schemas[ETableSchemaKind::Primary])
+ : tableInfo->Schemas[ETableSchemaKind::Primary];
TColumnFilter::TIndexes columnFilterIndexes;
columnFilterIndexes.reserve(ColumnNames->size());
for (const auto& name : *ColumnNames) {
auto optionalIndex = nameTable->FindId(name);
if (!optionalIndex) {
- if (!tableInfo->Schemas[ETableSchemaKind::Primary]->FindColumn(name)) {
+ if (!primarySchema->FindColumn(name)) {
THROW_ERROR_EXCEPTION("No such column %Qv",
name);
}
diff --git a/yt/yt/client/table_client/timestamped_schema_helpers.cpp b/yt/yt/client/table_client/timestamped_schema_helpers.cpp
new file mode 100644
index 0000000000..4b0feff80f
--- /dev/null
+++ b/yt/yt/client/table_client/timestamped_schema_helpers.cpp
@@ -0,0 +1,67 @@
+#include "timestamped_schema_helpers.h"
+
+#include "row_base.h"
+#include "schema.h"
+
+namespace NYT::NTableClient {
+
+////////////////////////////////////////////////////////////////////////////////
+//! Returns a new schema: all columns of |schema| in their original order, then one Uint64 column named |TimestampColumnPrefix + name| for each column past the key columns.
+TTableSchemaPtr ToLatestTimestampSchema(const TTableSchemaPtr& schema)
+{
+ std::vector<TColumnSchema> columns;
+ columns.reserve(schema->GetColumnCount() + schema->GetValueColumnCount()); // Copies plus appended timestamp columns (assumes value count == total - key count; TODO confirm).
+ // Pass 1: copy the original columns first, so each keeps its index in the resulting schema.
+ for (int columnIndex = 0; columnIndex < schema->GetColumnCount(); ++columnIndex) {
+ columns.push_back(schema->Columns()[columnIndex]);
+ }
+ for (int columnIndex = schema->GetKeyColumnCount(); columnIndex < schema->GetColumnCount(); ++columnIndex) { // Pass 2: starts after the key columns, so key columns get no timestamp column.
+ columns.emplace_back(TimestampColumnPrefix + schema->Columns()[columnIndex].Name(), EValueType::Uint64);
+ }
+ // Strictness, unique-keys flag, schema modification and deleted columns are taken from |schema| unchanged.
+ return New<TTableSchema>(
+ std::move(columns),
+ schema->GetStrict(),
+ schema->GetUniqueKeys(),
+ schema->GetSchemaModification(),
+ schema->DeletedColumns());
+}
+//! Builds an index-based column filter: the indexes selected by |columnFilter| minus |TimestampOnlyColumns|, followed by every |TimestampColumnIndex| from the mapping.
+TColumnFilter ToLatestTimestampColumnFilter(
+ const TColumnFilter& columnFilter,
+ const TTimestampReadOptions& timestampReadOptions,
+ int columnCount)
+{
+ YT_ASSERT(!timestampReadOptions.TimestampColumnMapping.empty()); // Callers are expected to request at least one timestamp column.
+
+ TColumnFilter::TIndexes indexes;
+ // Work on a sorted copy so that membership can be tested with std::binary_search; the caller's vector is left untouched.
+ std::vector<int> timestampOnlyColumns = timestampReadOptions.TimestampOnlyColumns;
+ std::sort(timestampOnlyColumns.begin(), timestampOnlyColumns.end());
+ // Appends |columnIndex| to the result unless it is listed among the timestamp-only columns.
+ auto addIndex = [&] (int columnIndex) {
+ if (!std::binary_search(timestampOnlyColumns.begin(), timestampOnlyColumns.end(), columnIndex)) {
+ indexes.push_back(columnIndex);
+ }
+ };
+ // A universal filter is expanded to indexes [0, columnCount); otherwise the filter's explicit index list is used.
+ if (columnFilter.IsUniversal()) {
+ for (int columnIndex = 0; columnIndex < columnCount; ++columnIndex) {
+ addIndex(columnIndex);
+ }
+ } else {
+ for (int columnIndex : columnFilter.GetIndexes()) {
+ addIndex(columnIndex);
+ }
+ }
+ // Timestamp column indexes go last, in mapping order; the |columnIndex| half of each entry is not used here.
+ for (auto [columnIndex, timestampColumnIndex] : timestampReadOptions.TimestampColumnMapping) {
+ indexes.push_back(timestampColumnIndex);
+ }
+
+ return TColumnFilter(std::move(indexes));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient
diff --git a/yt/yt/client/table_client/timestamped_schema_helpers.h b/yt/yt/client/table_client/timestamped_schema_helpers.h
new file mode 100644
index 0000000000..980e124d29
--- /dev/null
+++ b/yt/yt/client/table_client/timestamped_schema_helpers.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include "public.h"
+
+namespace NYT::NTableClient {
+
+////////////////////////////////////////////////////////////////////////////////
+//! One entry of a column -> timestamp column mapping.
+struct TColumnToTimestampColumn
+{
+ int ColumnIndex; // NOTE(review): presumably the index of the data column whose timestamp is requested — confirm against the code that fills the mapping.
+ int TimestampColumnIndex; // The index that ToLatestTimestampColumnFilter appends to the resulting filter.
+};
+
+using TTimestampColumnMapping = std::vector<TColumnToTimestampColumn>; // Element order is the order in which timestamp indexes are appended to the filter.
+//! Options consumed by ToLatestTimestampColumnFilter.
+struct TTimestampReadOptions
+{
+ TTimestampColumnMapping TimestampColumnMapping; // Asserted to be non-empty by ToLatestTimestampColumnFilter.
+ // Original indexes of columns for which timestamp is requested and value is not.
+ // TODO(dave11ar): Read only timestamps without value for such columns.
+ std::vector<int> TimestampOnlyColumns; // Need not be sorted: ToLatestTimestampColumnFilter sorts its own copy.
+};
+//! Returns |schema| extended with one Uint64 |TimestampColumnPrefix + name| column for every column past the key columns.
+TTableSchemaPtr ToLatestTimestampSchema(const TTableSchemaPtr& schema);
+//! Returns the indexes selected by |columnFilter| (all of [0, columnCount) when it is universal) without |TimestampOnlyColumns|, followed by the mapped timestamp column indexes.
+TColumnFilter ToLatestTimestampColumnFilter(
+ const TColumnFilter& columnFilter,
+ const TTimestampReadOptions& timestampReadOptions,
+ int columnCount);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index f183fca520..0cedbb4cd4 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -108,6 +108,7 @@ SRCS(
table_client/public.cpp
table_client/adapters.cpp
table_client/table_output.cpp
+ table_client/timestamped_schema_helpers.cpp
table_client/blob_reader.cpp
table_client/check_schema_compatibility.cpp
table_client/chunk_stripe_statistics.cpp
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 92a71cafda..f3309a4f6a 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -498,6 +498,8 @@ message TReqLookupRows
required TRowsetDescriptor rowset_descriptor = 200;
+ optional NYT.NTableClient.NProto.TVersionedReadOptions versioned_read_options = 12;
+
reserved 5;
reserved 6;
}
@@ -566,6 +568,8 @@ message TReqMultiLookup
required TRowsetDescriptor rowset_descriptor = 6;
required int32 attachment_count = 7;
+
+ optional NYT.NTableClient.NProto.TVersionedReadOptions versioned_read_options = 8;
}
repeated TSubrequest subrequests = 1;