diff options
author | Iuliia Sidorina <ulya.sidorina@gmail.com> | 2022-06-29 17:08:27 +0300 |
---|---|---|
committer | Iuliia Sidorina <ulya.sidorina@gmail.com> | 2022-06-29 17:08:27 +0300 |
commit | f75f14f22fe3ff4f20a19495c6befcbfa10f5637 (patch) | |
tree | 51fc4ee55105968ad4077bfa903f3c4d3354a827 | |
parent | 5bc762a70a35cb6aa5ca6547538736c38e7bc064 (diff) | |
download | ydb-f75f14f22fe3ff4f20a19495c6befcbfa10f5637.tar.gz |
KIKIMR-14294: implement stream lookup by pk prefix
fix(kqp): use pk prefix for stream lookup
ref:bc9674c5ad428b50426de121e0cb25ea57e1fe51
-rw-r--r-- | ydb/core/kqp/compile/kqp_compile.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_table_resolver.cpp | 18 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_tasks_graph.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/prepare/kqp_type_ann.cpp | 24 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 185 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_scan_ut.cpp | 44 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 4 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 7 |
8 files changed, 246 insertions, 65 deletions
diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp index 15f17500bad..6504ab68109 100644 --- a/ydb/core/kqp/compile/kqp_compile.cpp +++ b/ydb/core/kqp/compile/kqp_compile.cpp @@ -698,7 +698,6 @@ private: YQL_ENSURE(tableMeta); FillTable(streamLookup.Table(), *streamLookupProto.MutableTable()); - FillColumns(streamLookup.Columns(), *tableMeta, streamLookupProto, true); const auto lookupKeysType = streamLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); YQL_ENSURE(lookupKeysType, "Empty stream lookup keys type"); @@ -706,6 +705,18 @@ private: const auto lookupKeysItemType = lookupKeysType->Cast<TListExprType>()->GetItemType(); streamLookupProto.SetLookupKeysType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *lookupKeysItemType), TypeEnv)); + YQL_ENSURE(lookupKeysItemType->GetKind() == ETypeAnnotationKind::Struct); + const auto& lookupKeyColumns = lookupKeysItemType->Cast<TStructExprType>()->GetItems(); + for (const auto keyColumn : lookupKeyColumns) { + YQL_ENSURE(tableMeta->Columns.FindPtr(keyColumn->GetName()), "Unknown column: " << keyColumn->GetName()); + streamLookupProto.AddKeyColumns(TString(keyColumn->GetName())); + } + + for (const auto& column : streamLookup.Columns()) { + YQL_ENSURE(tableMeta->Columns.FindPtr(column), "Unknown column: " << TString(column)); + streamLookupProto.AddColumns(TString(column)); + } + const auto resultType = streamLookup.Ref().GetTypeAnn(); YQL_ENSURE(resultType, "Empty stream lookup result type"); YQL_ENSURE(resultType->GetKind() == ETypeAnnotationKind::Stream, "Unexpected stream lookup result type"); diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp index 3cb402c15bb..e8aba64987a 100644 --- a/ydb/core/kqp/executer/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp @@ -223,22 +223,22 @@ private: private: // TODO: Get rid of ResolveTables & TableKeys, get table information from phy tx proto. void ResolveTables() { - auto addTable = [](const auto& proto, auto& tables) { - auto& table = tables.GetOrAddTable(MakeTableId(proto.GetTable()), proto.GetTable().GetPath()); - for (auto& column : proto.GetColumns()) { - table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn()); - } - }; - for (auto& tx : Transactions) { for (auto& stage : tx.Body->GetStages()) { for (auto& op : stage.GetTableOps()) { - addTable(op, TableKeys); + auto& table = TableKeys.GetOrAddTable(MakeTableId(op.GetTable()), op.GetTable().GetPath()); + for (auto& column : op.GetColumns()) { + table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn()); + } } for (const auto& input : stage.GetInputs()) { if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { - addTable(input.GetStreamLookup(), TableKeys); + const auto& streamLookup = input.GetStreamLookup(); + auto& table = TableKeys.GetOrAddTable(MakeTableId(streamLookup.GetTable()), streamLookup.GetTable().GetPath()); + for (auto& column : input.GetStreamLookup().GetColumns()) { + table.Columns.emplace(column, TKqpTableKeys::TColumn()); + } } } } diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.cpp b/ydb/core/kqp/executer/kqp_tasks_graph.cpp index 8ba9b9fbf82..fd0efe74ec7 100644 --- a/ydb/core/kqp/executer/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer/kqp_tasks_graph.cpp @@ -208,18 +208,16 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf settings.MutableSnapshot()->SetTxId(snapshot.TxId); auto table = tableKeys.GetTable(MakeTableId(streamLookup.GetTable())); - for (const auto& keyColumnType : table.KeyColumnTypes) { - settings.AddKeyColumnTypes(keyColumnType); + for (const auto& keyColumn : streamLookup.GetKeyColumns()) { + auto columnIt = table.Columns.find(keyColumn); + YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn); + settings.AddKeyColumns(keyColumn); } for (const auto& column : streamLookup.GetColumns()) { - auto columnIt = table.Columns.find(column.GetName()); - YQL_ENSURE(columnIt != table.Columns.end()); - - auto newColumn = settings.AddColumns(); - newColumn->SetName(columnIt->first); - newColumn->SetId(columnIt->second.Id); - newColumn->SetTypeId(columnIt->second.Type); + auto columnIt = table.Columns.find(column); + YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << column); + settings.AddColumns(column); } TTransform streamLookupTransform; diff --git a/ydb/core/kqp/prepare/kqp_type_ann.cpp b/ydb/core/kqp/prepare/kqp_type_ann.cpp index a19a9bed8f6..37b3bd99bb6 100644 --- a/ydb/core/kqp/prepare/kqp_type_ann.cpp +++ b/ydb/core/kqp/prepare/kqp_type_ann.cpp @@ -1051,11 +1051,33 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext return TStatus::Error; } + auto lookupKeysTypeNode = node->Child(TKqpCnStreamLookup::idx_LookupKeysType); + if (!EnsureType(*lookupKeysTypeNode, ctx)) { + return TStatus::Error; + } + + auto lookupKeysType = lookupKeysTypeNode->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + if (!EnsureListType(node->Pos(), *lookupKeysType, ctx)) { + return TStatus::Error; + } + + auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType(); + if (!EnsureStructType(node->Pos(), *lookupKeyType, ctx)) { + return TStatus::Error; + } + + const auto& lookupKeyColumns = lookupKeyType->Cast<TStructExprType>()->GetItems(); + for (const auto& keyColumn : lookupKeyColumns) { + if (!table.second->GetKeyColumnIndex(TString(keyColumn->GetName()))) { + return TStatus::Error; + } + } + if (!EnsureTupleOfAtoms(*node->Child(TKqpCnStreamLookup::idx_Columns), ctx)) { return TStatus::Error; } - TCoAtomList columns{node->ChildPtr(TKqlLookupTableBase::idx_Columns)}; + TCoAtomList columns{node->ChildPtr(TKqpCnStreamLookup::idx_Columns)}; auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, columns, withSystemColumns); if (!rowType) { diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 3a06dd106df..3e208b9fb1f 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -16,7 +16,7 @@ namespace NKqp { namespace { -static constexpr TDuration RESOLVE_SHARDS_TIMEOUT = TDuration::Seconds(5); +static constexpr TDuration SCHEME_CACHE_REQUEST_TIMEOUT = TDuration::Seconds(5); static constexpr TDuration RETRY_READ_TIMEOUT = TDuration::Seconds(10); class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput { @@ -26,18 +26,15 @@ public: NKikimrKqp::TKqpStreamLookupSettings&& settings) : InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv) , HolderFactory(holderFactory), TableId(MakeTableId(settings.GetTable())) - , KeyColumnTypes(settings.GetKeyColumnTypes().begin(), settings.GetKeyColumnTypes().end()) , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) - , ResolveTableShardsTimeout(RESOLVE_SHARDS_TIMEOUT) + , KeyPrefixColumns(settings.GetKeyColumns().begin(), settings.GetKeyColumns().end()) + , Columns(settings.GetColumns().begin(), settings.GetColumns().end()) + , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) , RetryReadTimeout(RETRY_READ_TIMEOUT) { - - for (const auto& column : settings.GetColumns()) { - Columns.emplace_back(&column); - } }; void Bootstrap() { - ResolveTableShards(); + ResolveTable(); Become(&TKqpStreamLookupActor::StateFunc); } @@ -87,19 +84,27 @@ private: bool Retried; }; + enum EEvSchemeCacheRequestTag : ui64 { + TableSchemeResolving, + TableShardsResolving + }; + struct TEvPrivate { enum EEv { EvRetryReadTimeout = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), - EvResolveTableShardsTimeout, + EvSchemeCacheRequestTimeout, }; - struct TEvResolveTableShardsTimeout : public TEventLocal<TEvResolveTableShardsTimeout, EvResolveTableShardsTimeout> { + struct TEvSchemeCacheRequestTimeout : public TEventLocal<TEvSchemeCacheRequestTimeout, EvSchemeCacheRequestTimeout> { + TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag tag) : Tag(tag) {} + + const EEvSchemeCacheRequestTag Tag; }; struct TEvRetryReadTimeout : public TEventLocal<TEvRetryReadTimeout, EvRetryReadTimeout> { TEvRetryReadTimeout(ui64 readId) : ReadId(readId) {} - ui64 ReadId; + const ui64 ReadId; }; }; @@ -133,8 +138,11 @@ private: auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); for (ui32 colId = 0; colId < Columns.size(); ++colId) { + auto colIt = ColumnsByName.find(Columns[colId]); + YQL_ENSURE(colIt != ColumnsByName.end()); + rowItems[colId] = NMiniKQL::GetCellValue(result[colId], colIt->second.PType); + totalDataSize += result[colId].Size(); - rowItems[colId] = NMiniKQL::GetCellValue(result[colId], Columns[colId].TypeId); } batch.push_back(std::move(row)); @@ -143,8 +151,8 @@ private: NUdf::EFetchStatus status; NUdf::TUnboxedValue key; while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) { - std::vector<TCell> keyCells(KeyColumnTypes.size()); - for (ui32 colId = 0; colId < KeyColumnTypes.size(); ++colId) { + std::vector<TCell> keyCells(KeyPrefixColumns.size()); + for (ui32 colId = 0; colId < KeyPrefixColumns.size(); ++colId) { keyCells[colId] = MakeCell(KeyColumnTypes[colId], key.GetElement(colId), TypeEnv, /* copy */ true); } @@ -165,9 +173,10 @@ private: try { switch (ev->GetTypeRewrite()) { hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); hFunc(TEvDataShard::TEvReadResult, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); - hFunc(TEvPrivate::TEvResolveTableShardsTimeout, Handle); + hFunc(TEvPrivate::TEvSchemeCacheRequestTimeout, Handle); hFunc(TEvPrivate::TEvRetryReadTimeout, Handle); IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult); default: @@ -184,12 +193,38 @@ private: } auto& resultSet = ev->Get()->Request->ResultSet; - YQL_ENSURE(resultSet.size() == 1, "Expected one result for range (-inf, +inf)"); + YQL_ENSURE(resultSet.size() == 1, "Expected one result for range [NULL, +inf)"); Partitioning = resultSet[0].KeyDescription->Partitioning; ProcessLookupKeys(); } + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + auto& resultSet = ev->Get()->Request->ResultSet; + YQL_ENSURE(resultSet.size() == 1, "Expected one result for table: " << TableId); + auto& result = resultSet[0]; + + if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { + return RuntimeError(TStringBuilder() << "Failed to resolve table: " << ToString(result.Status)); + } + + std::map<ui32, NKikimr::NScheme::TTypeId> keyColumnTypesByKeyOrder; + for (const auto& [_, column] : result.Columns) { + if (column.KeyOrder >= 0) { + keyColumnTypesByKeyOrder[column.KeyOrder] = column.PType; + } + + ColumnsByName.emplace(column.Name, std::move(column)); + } + + KeyColumnTypes.resize(keyColumnTypesByKeyOrder.size()); + for (const auto& [keyOrder, keyColumnType] : keyColumnTypesByKeyOrder) { + KeyColumnTypes[keyOrder] = keyColumnType; + } + + ResolveTableShards(); + } + void Handle(TEvDataShard::TEvReadResult::TPtr& ev) { const auto& record = ev->Get()->Record; @@ -253,10 +288,22 @@ private: ResolveTableShards(); } - void Handle(TEvPrivate::TEvResolveTableShardsTimeout::TPtr&) { - if (!Partitioning) { - RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId - << " (request timeout exceeded)"); + void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr& ev) { + switch (ev->Get()->Tag) { + case EEvSchemeCacheRequestTag::TableSchemeResolving: + if (ColumnsByName.empty()) { + RuntimeError(TStringBuilder() << "Failed to resolve scheme for table: " << TableId + << " (request timeout exceeded)"); + } + break; + case EEvSchemeCacheRequestTag::TableShardsResolving: + if (!Partitioning) { + RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId + << " (request timeout exceeded)"); + } + break; + default: + RuntimeError(TStringBuilder() << "Unexpected tag for TEvSchemeCacheRequestTimeout: " << (ui64)ev->Get()->Tag); } } @@ -278,20 +325,22 @@ private: const auto& key = UnprocessedKeys.front(); YQL_ENSURE(key.Point); - auto partitionInfo = LowerBound( - Partitioning->begin(), Partitioning->end(), /* value */ true, - [&](const auto& partition, bool) { - const int result = CompareBorders<true, false>( - partition.Range->EndKeyPrefix.GetCells(), key.From, - partition.Range->IsInclusive || partition.Range->IsPoint, - key.InclusiveFrom || key.Point, KeyColumnTypes - ); - - return (result < 0); - } - ); + std::vector<ui64> shardIds; + if (KeyPrefixColumns.size() < KeyColumnTypes.size()) { + /* build range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) */ + std::vector<TCell> fromCells(KeyColumnTypes.size()); + fromCells.insert(fromCells.begin(), key.From.begin(), key.From.end()); + std::vector<TCell> toCells(key.From.begin(), key.From.end()); + + shardIds = GetRangePartitioning(TOwnedTableRange{std::move(fromCells), /* inclusiveFrom */ true, + std::move(toCells), /* inclusiveTo */ false}); + } else { + shardIds = GetRangePartitioning(key); + } - shardKeys[partitionInfo->ShardId].emplace_back(std::move(key)); + for (auto shardId : shardIds) { + shardKeys[shardId].emplace_back(key); + } } for (auto& [shardId, keys] : shardKeys) { @@ -299,6 +348,45 @@ private: } } + std::vector<ui64> GetRangePartitioning(const TOwnedTableRange& range) { + YQL_ENSURE(Partitioning); + + auto it = LowerBound(Partitioning->begin(), Partitioning->end(), /* value */ true, + [&](const auto& partition, bool) { + const int result = CompareBorders<true, false>( + partition.Range->EndKeyPrefix.GetCells(), range.From, + partition.Range->IsInclusive || partition.Range->IsPoint, + range.InclusiveFrom || range.Point, KeyColumnTypes + ); + + return (result < 0); + } + ); + + YQL_ENSURE(it != Partitioning->end()); + + std::vector<ui64> rangePartitions; + for (; it != Partitioning->end(); ++it) { + rangePartitions.push_back(it->ShardId); + + if (range.Point) { + break; + } + + auto cmp = CompareBorders<true, true>( + it->Range->EndKeyPrefix.GetCells(), range.To, + it->Range->IsInclusive || it->Range->IsPoint, + range.InclusiveTo || range.Point, KeyColumnTypes + ); + + if (cmp >= 0) { + break; + } + } + + return rangePartitions; + } + TReadState& StartTableRead(ui64 shardId, std::vector<TOwnedTableRange>&& keys) { const auto readId = GetNextReadId(); TReadState read(readId, shardId, std::move(keys)); @@ -317,7 +405,9 @@ private: record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion); for (const auto& column : Columns) { - record.AddColumns(column.Id); + auto colIt = ColumnsByName.find(column); + YQL_ENSURE(colIt != ColumnsByName.end()); + record.AddColumns(colIt->second.Id); } for (auto& key : read.Keys) { @@ -357,6 +447,19 @@ private: failedRead.SetFinished(TlsActivationContext->AsActorContext()); } + void ResolveTable() { + TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate()); + NSchemeCache::TSchemeCacheNavigate::TEntry entry; + entry.TableId = TableId; + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; + request->ResultSet.emplace_back(entry); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request)); + + SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout, + new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag::TableSchemeResolving))); + } + void ResolveTableShards() { Partitioning.reset(); @@ -372,8 +475,8 @@ private: Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); - ResolveTableShardsTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), ResolveTableShardsTimeout, - new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveTableShardsTimeout())); + SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout, + new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag::TableShardsResolving))); } bool AllReadsFinished() const { @@ -413,16 +516,18 @@ private: const NMiniKQL::TTypeEnvironment& TypeEnv; const NMiniKQL::THolderFactory& HolderFactory; const TTableId TableId; - const TVector<NKikimr::NScheme::TTypeId> KeyColumnTypes; - std::vector<NYql::TKikimrColumnMetadata> Columns; const IKqpGateway::TKqpSnapshot Snapshot; + const std::vector<TString> KeyPrefixColumns; + const std::vector<TString> Columns; + std::unordered_map<TString, TSysTables::TTableColumnInfo> ColumnsByName; + std::vector<NKikimr::NScheme::TTypeId> KeyColumnTypes; std::deque<TOwnedCellVec> Results; std::unordered_map<ui64, TReadState> Reads; std::unordered_map<ui64, std::set<ui64>> ReadsPerShard; std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning; std::deque<TOwnedTableRange> UnprocessedKeys; - const TDuration ResolveTableShardsTimeout; - NActors::TActorId ResolveTableShardsTimeoutTimer; + const TDuration SchemeCacheRequestTimeout; + NActors::TActorId SchemeCacheRequestTimeoutTimer; const TDuration RetryReadTimeout; }; diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp index d81eaa91996..86a948ade19 100644 --- a/ydb/core/kqp/ut/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp @@ -1968,6 +1968,50 @@ Y_UNIT_TEST_SUITE(KqpScan) { CompareYson(R"([[[1u];["One"]];[[2u];["Two"]]])", StreamResultToYson(result)); } } + + Y_UNIT_TEST_TWIN(StreamLookupByPkPrefix, UseSessionActor) { + auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor); + auto db = kikimr.GetTableClient(); + CreateSampleTables(kikimr); + + { + kikimr.GetTestClient().CreateTable("/Root", R"( + Name: "TestTable" + Columns { Name: "Key1", Type: "Uint64" } + Columns { Name: "Key2", Type: "Uint64" } + Columns { Name: "Value", Type: "String" } + KeyColumnNames: ["Key1", "Key2"] + SplitBoundary { + KeyPrefix { + Tuple { Optional { Uint64: 2 } } + Tuple { Optional { Uint64: 20 } } + } + } + )"); + + auto result = db.CreateSession().GetValueSync().GetSession().ExecuteDataQuery(R"( + REPLACE INTO `/Root/TestTable` (Key1, Key2, Value) VALUES + (1u, 10, "Value1"), + (2u, 19, "Value2"), + (2u, 21, "Value2"), + (3u, 30, "Value3"), + (4u, 40, "Value4"), + (5u, 50, "Value5"); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = db.StreamExecuteScanQuery(R"( + PRAGMA kikimr.UseNewEngine = "true"; + PRAGMA kikimr.OptEnablePredicateExtract = "false"; + $keys = SELECT Key FROM `/Root/KeyValue`; + SELECT * FROM `/Root/TestTable` WHERE Key1 IN $keys ORDER BY Key1, Key2; + )").GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[1u];[10u];["Value1"]];[[2u];[19u];["Value2"]];[[2u];[21u];["Value2"]]])", StreamResultToYson(result)); + } + } } } // namespace NKqp diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 2bd82e48e58..bc9db521ec6 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -577,7 +577,7 @@ message TEvKillScanTablet { message TKqpStreamLookupSettings { optional NKqpProto.TKqpPhyTable Table = 1; - repeated uint32 KeyColumnTypes = 2; - repeated TKqpColumnMetadataProto Columns = 3; + repeated string KeyColumns = 2; + repeated string Columns = 3; optional TKqpSnapshot Snapshot = 4; } diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index d0c652a324c..2bf8943a35d 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -210,9 +210,10 @@ message TKqpPhyCnMerge { message TKqpPhyCnStreamLookup { TKqpPhyTable Table = 1; - repeated TKqpPhyColumn Columns = 2; - bytes LookupKeysType = 3; - bytes ResultType = 4; + repeated string KeyColumns = 2; + repeated string Columns = 3; + bytes LookupKeysType = 4; + bytes ResultType = 5; } message TKqpPhyConnection { |