diff options
field | value | date |
---|---|---|
author | dave11ar <dave11ar@yandex-team.com> | 2025-01-16 12:18:19 +0300 |
committer | dave11ar <dave11ar@yandex-team.com> | 2025-01-16 12:36:55 +0300 |
commit | 1b44ecad7ca01cc2506bd4afbe2b8d911e655122 (patch) | |
tree | 2217bbc6fa60d242f743e744c546a1aa2173ae8a | |
parent | 9ef54447fb808783ad84dd50b946c3e493a99128 (diff) | |
download | ydb-1b44ecad7ca01cc2506bd4afbe2b8d911e655122.tar.gz | |
YT-22886: Add lookup format with timestamp columns
## Changelog entry
* Type: feature
* Component: tablet node
Add lookup format with timestamp columns
commit_hash:b95f7520a6d7f83ff4d9a7b85424cbbf9e948ff2
mode | file | changed lines |
---|---|---|
-rw-r--r-- | yt/yt/client/api/client_common.h | 2 |
-rw-r--r-- | yt/yt/client/api/dynamic_table_client.h | 2 |
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_base.cpp | 6 |
-rw-r--r-- | yt/yt/client/driver/table_commands.cpp | 18 |
-rw-r--r-- | yt/yt/client/table_client/timestamped_schema_helpers.cpp | 67 |
-rw-r--r-- | yt/yt/client/table_client/timestamped_schema_helpers.h | 34 |
-rw-r--r-- | yt/yt/client/ya.make | 1 |
-rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 4 |
8 files changed, 132 insertions, 2 deletions
diff --git a/yt/yt/client/api/client_common.h b/yt/yt/client/api/client_common.h index 87f102cf82..c961b4398b 100644 --- a/yt/yt/client/api/client_common.h +++ b/yt/yt/client/api/client_common.h @@ -140,7 +140,7 @@ struct TSelectRowsOptionsBase //! Expected schemas for tables in a query (used for replica fallback in replicated tables). using TExpectedTableSchemas = THashMap<NYPath::TYPath, NTableClient::TTableSchemaPtr>; TExpectedTableSchemas ExpectedTableSchemas; - //! Add |$timestamp:columnName| to result if read_mode is latest_timestamp. + //! Add |$timestamp:columnName| to result if readMode is latest_timestamp. NTableClient::TVersionedReadOptions VersionedReadOptions; //! Limits range expanding. ui64 RangeExpansionLimit = 200'000; diff --git a/yt/yt/client/api/dynamic_table_client.h b/yt/yt/client/api/dynamic_table_client.h index fec7ff0321..674417e69d 100644 --- a/yt/yt/client/api/dynamic_table_client.h +++ b/yt/yt/client/api/dynamic_table_client.h @@ -18,6 +18,8 @@ struct TLookupRequestOptions bool EnablePartialResult = false; std::optional<bool> UseLookupCache; TDetailedProfilingInfoPtr DetailedProfilingInfo; + //! Add |$timestamp:columnName| to result if readMode is latest_timestamp. 
+ NTableClient::TVersionedReadOptions VersionedReadOptions; }; struct TLookupRowsOptionsBase diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp index 26062e93ee..25152bf3fd 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.cpp +++ b/yt/yt/client/api/rpc_proxy/client_base.cpp @@ -874,6 +874,7 @@ TFuture<TUnversionedLookupRowsResult> TClientBase::LookupRows( req->set_multiplexing_band(static_cast<NProto::EMultiplexingBand>(options.MultiplexingBand)); ToProto(req->mutable_tablet_read_options(), options); + ToProto(req->mutable_versioned_read_options(), options.VersionedReadOptions); return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspLookupRowsPtr& rsp) { auto rowset = DeserializeRowset<TUnversionedRow>( @@ -926,6 +927,10 @@ TFuture<TVersionedLookupRowsResult> TClientBase::VersionedLookupRows( if (options.RetentionConfig) { ToProto(req->mutable_retention_config(), *options.RetentionConfig); } + if (options.VersionedReadOptions.ReadMode != NTableClient::EVersionedIOMode::Default) { + THROW_ERROR_EXCEPTION("Versioned lookup does not support versioned read mode %Qlv", + options.VersionedReadOptions.ReadMode); + } return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspVersionedLookupRowsPtr& rsp) { auto rowset = DeserializeRowset<TVersionedRow>( @@ -971,6 +976,7 @@ TFuture<std::vector<TUnversionedLookupRowsResult>> TClientBase::MultiLookupRows( subrequest.Keys, protoSubrequest->mutable_rowset_descriptor()); protoSubrequest->set_attachment_count(rowset.size()); + ToProto(protoSubrequest->mutable_versioned_read_options(), subrequest.Options.VersionedReadOptions); req->Attachments().insert(req->Attachments().end(), rowset.begin(), rowset.end()); } diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp index e358d2f8ee..c05964a50f 100644 --- a/yt/yt/client/driver/table_commands.cpp +++ b/yt/yt/client/driver/table_commands.cpp @@ -16,6 +16,7 @@ #include 
<yt/yt/client/table_client/row_buffer.h> #include <yt/yt/client/table_client/table_consumer.h> #include <yt/yt/client/table_client/table_output.h> +#include <yt/yt/client/table_client/timestamped_schema_helpers.h> #include <yt/yt/client/table_client/unversioned_writer.h> #include <yt/yt/client/table_client/versioned_writer.h> #include <yt/yt/client/table_client/wire_protocol.h> @@ -1068,6 +1069,13 @@ void TLookupRowsCommand::Register(TRegistrar registrar) return command->Options.ReplicaConsistency; }) .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor<TVersionedReadOptions>( + "versioned_read_options", + [] (TThis* command) -> auto& { + return command->Options.VersionedReadOptions; + }) + .Optional(/*init*/ false); } void TLookupRowsCommand::DoExecute(ICommandContextPtr context) @@ -1092,6 +1100,11 @@ void TLookupRowsCommand::DoExecute(ICommandContextPtr context) << TErrorAttribute("rich_ypath", Path); } + if (Versioned && Options.VersionedReadOptions.ReadMode != NTableClient::EVersionedIOMode::Default) { + THROW_ERROR_EXCEPTION("Versioned lookup does not support versioned read mode %Qlv", + Options.VersionedReadOptions.ReadMode); + } + struct TLookupRowsBufferTag { }; @@ -1112,12 +1125,15 @@ void TLookupRowsCommand::DoExecute(ICommandContextPtr context) auto nameTable = valueConsumer.GetNameTable(); if (ColumnNames) { + auto primarySchema = Options.VersionedReadOptions.ReadMode == NTableClient::EVersionedIOMode::LatestTimestamp + ? 
ToLatestTimestampSchema(tableInfo->Schemas[ETableSchemaKind::Primary]) + : tableInfo->Schemas[ETableSchemaKind::Primary]; TColumnFilter::TIndexes columnFilterIndexes; columnFilterIndexes.reserve(ColumnNames->size()); for (const auto& name : *ColumnNames) { auto optionalIndex = nameTable->FindId(name); if (!optionalIndex) { - if (!tableInfo->Schemas[ETableSchemaKind::Primary]->FindColumn(name)) { + if (!primarySchema->FindColumn(name)) { THROW_ERROR_EXCEPTION("No such column %Qv", name); } diff --git a/yt/yt/client/table_client/timestamped_schema_helpers.cpp b/yt/yt/client/table_client/timestamped_schema_helpers.cpp new file mode 100644 index 0000000000..4b0feff80f --- /dev/null +++ b/yt/yt/client/table_client/timestamped_schema_helpers.cpp @@ -0,0 +1,67 @@ +#include "timestamped_schema_helpers.h" + +#include "row_base.h" +#include "schema.h" + +namespace NYT::NTableClient { + +//////////////////////////////////////////////////////////////////////////////// + +TTableSchemaPtr ToLatestTimestampSchema(const TTableSchemaPtr& schema) +{ + std::vector<TColumnSchema> columns; + columns.reserve(schema->GetColumnCount() + schema->GetValueColumnCount()); + + for (int columnIndex = 0; columnIndex < schema->GetColumnCount(); ++columnIndex) { + columns.push_back(schema->Columns()[columnIndex]); + } + for (int columnIndex = schema->GetKeyColumnCount(); columnIndex < schema->GetColumnCount(); ++columnIndex) { + columns.emplace_back(TimestampColumnPrefix + schema->Columns()[columnIndex].Name(), EValueType::Uint64); + } + + return New<TTableSchema>( + std::move(columns), + schema->GetStrict(), + schema->GetUniqueKeys(), + schema->GetSchemaModification(), + schema->DeletedColumns()); +} + +TColumnFilter ToLatestTimestampColumnFilter( + const TColumnFilter& columnFilter, + const TTimestampReadOptions& timestampReadOptions, + int columnCount) +{ + YT_ASSERT(!timestampReadOptions.TimestampColumnMapping.empty()); + + TColumnFilter::TIndexes indexes; + + std::vector<int> 
timestampOnlyColumns = timestampReadOptions.TimestampOnlyColumns; + std::sort(timestampOnlyColumns.begin(), timestampOnlyColumns.end()); + + auto addIndex = [&] (int columnIndex) { + if (!std::binary_search(timestampOnlyColumns.begin(), timestampOnlyColumns.end(), columnIndex)) { + indexes.push_back(columnIndex); + } + }; + + if (columnFilter.IsUniversal()) { + for (int columnIndex = 0; columnIndex < columnCount; ++columnIndex) { + addIndex(columnIndex); + } + } else { + for (int columnIndex : columnFilter.GetIndexes()) { + addIndex(columnIndex); + } + } + + for (auto [columnIndex, timestampColumnIndex] : timestampReadOptions.TimestampColumnMapping) { + indexes.push_back(timestampColumnIndex); + } + + return TColumnFilter(std::move(indexes)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/timestamped_schema_helpers.h b/yt/yt/client/table_client/timestamped_schema_helpers.h new file mode 100644 index 0000000000..980e124d29 --- /dev/null +++ b/yt/yt/client/table_client/timestamped_schema_helpers.h @@ -0,0 +1,34 @@ +#pragma once + +#include "public.h" + +namespace NYT::NTableClient { + +//////////////////////////////////////////////////////////////////////////////// + +struct TColumnToTimestampColumn +{ + int ColumnIndex; + int TimestampColumnIndex; +}; + +using TTimestampColumnMapping = std::vector<TColumnToTimestampColumn>; + +struct TTimestampReadOptions +{ + TTimestampColumnMapping TimestampColumnMapping; + // Original indexes of columns for which timestamp is requested and value is not. + // TODO(dave11ar): Read only timestamps without value for such columns. 
+ std::vector<int> TimestampOnlyColumns; +}; + +TTableSchemaPtr ToLatestTimestampSchema(const TTableSchemaPtr& schema); + +TColumnFilter ToLatestTimestampColumnFilter( + const TColumnFilter& columnFilter, + const TTimestampReadOptions& timestampReadOptions, + int columnCount); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index f183fca520..0cedbb4cd4 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -108,6 +108,7 @@ SRCS( table_client/public.cpp table_client/adapters.cpp table_client/table_output.cpp + table_client/timestamped_schema_helpers.cpp table_client/blob_reader.cpp table_client/check_schema_compatibility.cpp table_client/chunk_stripe_statistics.cpp diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 92a71cafda..f3309a4f6a 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -498,6 +498,8 @@ message TReqLookupRows required TRowsetDescriptor rowset_descriptor = 200; + optional NYT.NTableClient.NProto.TVersionedReadOptions versioned_read_options = 12; + reserved 5; reserved 6; } @@ -566,6 +568,8 @@ message TReqMultiLookup required TRowsetDescriptor rowset_descriptor = 6; required int32 attachment_count = 7; + + optional NYT.NTableClient.NProto.TVersionedReadOptions versioned_read_options = 8; } repeated TSubrequest subrequests = 1; |