aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIuliia Sidorina <ulya.sidorina@gmail.com>2022-06-29 17:08:27 +0300
committerIuliia Sidorina <ulya.sidorina@gmail.com>2022-06-29 17:08:27 +0300
commitf75f14f22fe3ff4f20a19495c6befcbfa10f5637 (patch)
tree51fc4ee55105968ad4077bfa903f3c4d3354a827
parent5bc762a70a35cb6aa5ca6547538736c38e7bc064 (diff)
downloadydb-f75f14f22fe3ff4f20a19495c6befcbfa10f5637.tar.gz
KIKIMR-14294: implement stream lookup by pk prefix
fix(kqp): use pk prefix for stream lookup ref:bc9674c5ad428b50426de121e0cb25ea57e1fe51
-rw-r--r--ydb/core/kqp/compile/kqp_compile.cpp13
-rw-r--r--ydb/core/kqp/executer/kqp_table_resolver.cpp18
-rw-r--r--ydb/core/kqp/executer/kqp_tasks_graph.cpp16
-rw-r--r--ydb/core/kqp/prepare/kqp_type_ann.cpp24
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp185
-rw-r--r--ydb/core/kqp/ut/kqp_scan_ut.cpp44
-rw-r--r--ydb/core/protos/kqp.proto4
-rw-r--r--ydb/core/protos/kqp_physical.proto7
8 files changed, 246 insertions, 65 deletions
diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp
index 15f17500bad..6504ab68109 100644
--- a/ydb/core/kqp/compile/kqp_compile.cpp
+++ b/ydb/core/kqp/compile/kqp_compile.cpp
@@ -698,7 +698,6 @@ private:
YQL_ENSURE(tableMeta);
FillTable(streamLookup.Table(), *streamLookupProto.MutableTable());
- FillColumns(streamLookup.Columns(), *tableMeta, streamLookupProto, true);
const auto lookupKeysType = streamLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
YQL_ENSURE(lookupKeysType, "Empty stream lookup keys type");
@@ -706,6 +705,18 @@ private:
const auto lookupKeysItemType = lookupKeysType->Cast<TListExprType>()->GetItemType();
streamLookupProto.SetLookupKeysType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *lookupKeysItemType), TypeEnv));
+ YQL_ENSURE(lookupKeysItemType->GetKind() == ETypeAnnotationKind::Struct);
+ const auto& lookupKeyColumns = lookupKeysItemType->Cast<TStructExprType>()->GetItems();
+ for (const auto keyColumn : lookupKeyColumns) {
+ YQL_ENSURE(tableMeta->Columns.FindPtr(keyColumn->GetName()), "Unknown column: " << keyColumn->GetName());
+ streamLookupProto.AddKeyColumns(TString(keyColumn->GetName()));
+ }
+
+ for (const auto& column : streamLookup.Columns()) {
+ YQL_ENSURE(tableMeta->Columns.FindPtr(column), "Unknown column: " << TString(column));
+ streamLookupProto.AddColumns(TString(column));
+ }
+
const auto resultType = streamLookup.Ref().GetTypeAnn();
YQL_ENSURE(resultType, "Empty stream lookup result type");
YQL_ENSURE(resultType->GetKind() == ETypeAnnotationKind::Stream, "Unexpected stream lookup result type");
diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp
index 3cb402c15bb..e8aba64987a 100644
--- a/ydb/core/kqp/executer/kqp_table_resolver.cpp
+++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp
@@ -223,22 +223,22 @@ private:
private:
// TODO: Get rid of ResolveTables & TableKeys, get table information from phy tx proto.
void ResolveTables() {
- auto addTable = [](const auto& proto, auto& tables) {
- auto& table = tables.GetOrAddTable(MakeTableId(proto.GetTable()), proto.GetTable().GetPath());
- for (auto& column : proto.GetColumns()) {
- table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn());
- }
- };
-
for (auto& tx : Transactions) {
for (auto& stage : tx.Body->GetStages()) {
for (auto& op : stage.GetTableOps()) {
- addTable(op, TableKeys);
+ auto& table = TableKeys.GetOrAddTable(MakeTableId(op.GetTable()), op.GetTable().GetPath());
+ for (auto& column : op.GetColumns()) {
+ table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn());
+ }
}
for (const auto& input : stage.GetInputs()) {
if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
- addTable(input.GetStreamLookup(), TableKeys);
+ const auto& streamLookup = input.GetStreamLookup();
+ auto& table = TableKeys.GetOrAddTable(MakeTableId(streamLookup.GetTable()), streamLookup.GetTable().GetPath());
+ for (auto& column : input.GetStreamLookup().GetColumns()) {
+ table.Columns.emplace(column, TKqpTableKeys::TColumn());
+ }
}
}
}
diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.cpp b/ydb/core/kqp/executer/kqp_tasks_graph.cpp
index 8ba9b9fbf82..fd0efe74ec7 100644
--- a/ydb/core/kqp/executer/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer/kqp_tasks_graph.cpp
@@ -208,18 +208,16 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
settings.MutableSnapshot()->SetTxId(snapshot.TxId);
auto table = tableKeys.GetTable(MakeTableId(streamLookup.GetTable()));
- for (const auto& keyColumnType : table.KeyColumnTypes) {
- settings.AddKeyColumnTypes(keyColumnType);
+ for (const auto& keyColumn : streamLookup.GetKeyColumns()) {
+ auto columnIt = table.Columns.find(keyColumn);
+ YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn);
+ settings.AddKeyColumns(keyColumn);
}
for (const auto& column : streamLookup.GetColumns()) {
- auto columnIt = table.Columns.find(column.GetName());
- YQL_ENSURE(columnIt != table.Columns.end());
-
- auto newColumn = settings.AddColumns();
- newColumn->SetName(columnIt->first);
- newColumn->SetId(columnIt->second.Id);
- newColumn->SetTypeId(columnIt->second.Type);
+ auto columnIt = table.Columns.find(column);
+ YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << column);
+ settings.AddColumns(column);
}
TTransform streamLookupTransform;
diff --git a/ydb/core/kqp/prepare/kqp_type_ann.cpp b/ydb/core/kqp/prepare/kqp_type_ann.cpp
index a19a9bed8f6..37b3bd99bb6 100644
--- a/ydb/core/kqp/prepare/kqp_type_ann.cpp
+++ b/ydb/core/kqp/prepare/kqp_type_ann.cpp
@@ -1051,11 +1051,33 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
return TStatus::Error;
}
+ auto lookupKeysTypeNode = node->Child(TKqpCnStreamLookup::idx_LookupKeysType);
+ if (!EnsureType(*lookupKeysTypeNode, ctx)) {
+ return TStatus::Error;
+ }
+
+ auto lookupKeysType = lookupKeysTypeNode->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ if (!EnsureListType(node->Pos(), *lookupKeysType, ctx)) {
+ return TStatus::Error;
+ }
+
+ auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType();
+ if (!EnsureStructType(node->Pos(), *lookupKeyType, ctx)) {
+ return TStatus::Error;
+ }
+
+ const auto& lookupKeyColumns = lookupKeyType->Cast<TStructExprType>()->GetItems();
+ for (const auto& keyColumn : lookupKeyColumns) {
+ if (!table.second->GetKeyColumnIndex(TString(keyColumn->GetName()))) {
+ return TStatus::Error;
+ }
+ }
+
if (!EnsureTupleOfAtoms(*node->Child(TKqpCnStreamLookup::idx_Columns), ctx)) {
return TStatus::Error;
}
- TCoAtomList columns{node->ChildPtr(TKqlLookupTableBase::idx_Columns)};
+ TCoAtomList columns{node->ChildPtr(TKqpCnStreamLookup::idx_Columns)};
auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, columns, withSystemColumns);
if (!rowType) {
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 3a06dd106df..3e208b9fb1f 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -16,7 +16,7 @@ namespace NKqp {
namespace {
-static constexpr TDuration RESOLVE_SHARDS_TIMEOUT = TDuration::Seconds(5);
+static constexpr TDuration SCHEME_CACHE_REQUEST_TIMEOUT = TDuration::Seconds(5);
static constexpr TDuration RETRY_READ_TIMEOUT = TDuration::Seconds(10);
class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput {
@@ -26,18 +26,15 @@ public:
NKikimrKqp::TKqpStreamLookupSettings&& settings)
: InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv)
, HolderFactory(holderFactory), TableId(MakeTableId(settings.GetTable()))
- , KeyColumnTypes(settings.GetKeyColumnTypes().begin(), settings.GetKeyColumnTypes().end())
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
- , ResolveTableShardsTimeout(RESOLVE_SHARDS_TIMEOUT)
+ , KeyPrefixColumns(settings.GetKeyColumns().begin(), settings.GetKeyColumns().end())
+ , Columns(settings.GetColumns().begin(), settings.GetColumns().end())
+ , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
, RetryReadTimeout(RETRY_READ_TIMEOUT) {
-
- for (const auto& column : settings.GetColumns()) {
- Columns.emplace_back(&column);
- }
};
void Bootstrap() {
- ResolveTableShards();
+ ResolveTable();
Become(&TKqpStreamLookupActor::StateFunc);
}
@@ -87,19 +84,27 @@ private:
bool Retried;
};
+ enum EEvSchemeCacheRequestTag : ui64 {
+ TableSchemeResolving,
+ TableShardsResolving
+ };
+
struct TEvPrivate {
enum EEv {
EvRetryReadTimeout = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
- EvResolveTableShardsTimeout,
+ EvSchemeCacheRequestTimeout,
};
- struct TEvResolveTableShardsTimeout : public TEventLocal<TEvResolveTableShardsTimeout, EvResolveTableShardsTimeout> {
+ struct TEvSchemeCacheRequestTimeout : public TEventLocal<TEvSchemeCacheRequestTimeout, EvSchemeCacheRequestTimeout> {
+ TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag tag) : Tag(tag) {}
+
+ const EEvSchemeCacheRequestTag Tag;
};
struct TEvRetryReadTimeout : public TEventLocal<TEvRetryReadTimeout, EvRetryReadTimeout> {
TEvRetryReadTimeout(ui64 readId) : ReadId(readId) {}
- ui64 ReadId;
+ const ui64 ReadId;
};
};
@@ -133,8 +138,11 @@ private:
auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
for (ui32 colId = 0; colId < Columns.size(); ++colId) {
+ auto colIt = ColumnsByName.find(Columns[colId]);
+ YQL_ENSURE(colIt != ColumnsByName.end());
+ rowItems[colId] = NMiniKQL::GetCellValue(result[colId], colIt->second.PType);
+
totalDataSize += result[colId].Size();
- rowItems[colId] = NMiniKQL::GetCellValue(result[colId], Columns[colId].TypeId);
}
batch.push_back(std::move(row));
@@ -143,8 +151,8 @@ private:
NUdf::EFetchStatus status;
NUdf::TUnboxedValue key;
while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) {
- std::vector<TCell> keyCells(KeyColumnTypes.size());
- for (ui32 colId = 0; colId < KeyColumnTypes.size(); ++colId) {
+ std::vector<TCell> keyCells(KeyPrefixColumns.size());
+ for (ui32 colId = 0; colId < KeyPrefixColumns.size(); ++colId) {
keyCells[colId] = MakeCell(KeyColumnTypes[colId], key.GetElement(colId), TypeEnv, /* copy */ true);
}
@@ -165,9 +173,10 @@ private:
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
hFunc(TEvDataShard::TEvReadResult, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
- hFunc(TEvPrivate::TEvResolveTableShardsTimeout, Handle);
+ hFunc(TEvPrivate::TEvSchemeCacheRequestTimeout, Handle);
hFunc(TEvPrivate::TEvRetryReadTimeout, Handle);
IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
default:
@@ -184,12 +193,38 @@ private:
}
auto& resultSet = ev->Get()->Request->ResultSet;
- YQL_ENSURE(resultSet.size() == 1, "Expected one result for range (-inf, +inf)");
+ YQL_ENSURE(resultSet.size() == 1, "Expected one result for range [NULL, +inf)");
Partitioning = resultSet[0].KeyDescription->Partitioning;
ProcessLookupKeys();
}
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ auto& resultSet = ev->Get()->Request->ResultSet;
+ YQL_ENSURE(resultSet.size() == 1, "Expected one result for table: " << TableId);
+ auto& result = resultSet[0];
+
+ if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ return RuntimeError(TStringBuilder() << "Failed to resolve table: " << ToString(result.Status));
+ }
+
+ std::map<ui32, NKikimr::NScheme::TTypeId> keyColumnTypesByKeyOrder;
+ for (const auto& [_, column] : result.Columns) {
+ if (column.KeyOrder >= 0) {
+ keyColumnTypesByKeyOrder[column.KeyOrder] = column.PType;
+ }
+
+ ColumnsByName.emplace(column.Name, std::move(column));
+ }
+
+ KeyColumnTypes.resize(keyColumnTypesByKeyOrder.size());
+ for (const auto& [keyOrder, keyColumnType] : keyColumnTypesByKeyOrder) {
+ KeyColumnTypes[keyOrder] = keyColumnType;
+ }
+
+ ResolveTableShards();
+ }
+
void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
const auto& record = ev->Get()->Record;
@@ -253,10 +288,22 @@ private:
ResolveTableShards();
}
- void Handle(TEvPrivate::TEvResolveTableShardsTimeout::TPtr&) {
- if (!Partitioning) {
- RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId
- << " (request timeout exceeded)");
+ void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr& ev) {
+ switch (ev->Get()->Tag) {
+ case EEvSchemeCacheRequestTag::TableSchemeResolving:
+ if (ColumnsByName.empty()) {
+ RuntimeError(TStringBuilder() << "Failed to resolve scheme for table: " << TableId
+ << " (request timeout exceeded)");
+ }
+ break;
+ case EEvSchemeCacheRequestTag::TableShardsResolving:
+ if (!Partitioning) {
+ RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId
+ << " (request timeout exceeded)");
+ }
+ break;
+ default:
+ RuntimeError(TStringBuilder() << "Unexpected tag for TEvSchemeCacheRequestTimeout: " << (ui64)ev->Get()->Tag);
}
}
@@ -278,20 +325,22 @@ private:
const auto& key = UnprocessedKeys.front();
YQL_ENSURE(key.Point);
- auto partitionInfo = LowerBound(
- Partitioning->begin(), Partitioning->end(), /* value */ true,
- [&](const auto& partition, bool) {
- const int result = CompareBorders<true, false>(
- partition.Range->EndKeyPrefix.GetCells(), key.From,
- partition.Range->IsInclusive || partition.Range->IsPoint,
- key.InclusiveFrom || key.Point, KeyColumnTypes
- );
-
- return (result < 0);
- }
- );
+ std::vector<ui64> shardIds;
+ if (KeyPrefixColumns.size() < KeyColumnTypes.size()) {
+ /* build range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) */
+ std::vector<TCell> fromCells(KeyColumnTypes.size());
+ fromCells.insert(fromCells.begin(), key.From.begin(), key.From.end());
+ std::vector<TCell> toCells(key.From.begin(), key.From.end());
+
+ shardIds = GetRangePartitioning(TOwnedTableRange{std::move(fromCells), /* inclusiveFrom */ true,
+ std::move(toCells), /* inclusiveTo */ false});
+ } else {
+ shardIds = GetRangePartitioning(key);
+ }
- shardKeys[partitionInfo->ShardId].emplace_back(std::move(key));
+ for (auto shardId : shardIds) {
+ shardKeys[shardId].emplace_back(key);
+ }
}
for (auto& [shardId, keys] : shardKeys) {
@@ -299,6 +348,45 @@ private:
}
}
+ std::vector<ui64> GetRangePartitioning(const TOwnedTableRange& range) {
+ YQL_ENSURE(Partitioning);
+
+ auto it = LowerBound(Partitioning->begin(), Partitioning->end(), /* value */ true,
+ [&](const auto& partition, bool) {
+ const int result = CompareBorders<true, false>(
+ partition.Range->EndKeyPrefix.GetCells(), range.From,
+ partition.Range->IsInclusive || partition.Range->IsPoint,
+ range.InclusiveFrom || range.Point, KeyColumnTypes
+ );
+
+ return (result < 0);
+ }
+ );
+
+ YQL_ENSURE(it != Partitioning->end());
+
+ std::vector<ui64> rangePartitions;
+ for (; it != Partitioning->end(); ++it) {
+ rangePartitions.push_back(it->ShardId);
+
+ if (range.Point) {
+ break;
+ }
+
+ auto cmp = CompareBorders<true, true>(
+ it->Range->EndKeyPrefix.GetCells(), range.To,
+ it->Range->IsInclusive || it->Range->IsPoint,
+ range.InclusiveTo || range.Point, KeyColumnTypes
+ );
+
+ if (cmp >= 0) {
+ break;
+ }
+ }
+
+ return rangePartitions;
+ }
+
TReadState& StartTableRead(ui64 shardId, std::vector<TOwnedTableRange>&& keys) {
const auto readId = GetNextReadId();
TReadState read(readId, shardId, std::move(keys));
@@ -317,7 +405,9 @@ private:
record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion);
for (const auto& column : Columns) {
- record.AddColumns(column.Id);
+ auto colIt = ColumnsByName.find(column);
+ YQL_ENSURE(colIt != ColumnsByName.end());
+ record.AddColumns(colIt->second.Id);
}
for (auto& key : read.Keys) {
@@ -357,6 +447,19 @@ private:
failedRead.SetFinished(TlsActivationContext->AsActorContext());
}
+ void ResolveTable() {
+ TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
+ NSchemeCache::TSchemeCacheNavigate::TEntry entry;
+ entry.TableId = TableId;
+ entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
+ request->ResultSet.emplace_back(entry);
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
+
+ SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout,
+ new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag::TableSchemeResolving)));
+ }
+
void ResolveTableShards() {
Partitioning.reset();
@@ -372,8 +475,8 @@ private:
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
- ResolveTableShardsTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), ResolveTableShardsTimeout,
- new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveTableShardsTimeout()));
+ SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout,
+ new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag::TableShardsResolving)));
}
bool AllReadsFinished() const {
@@ -413,16 +516,18 @@ private:
const NMiniKQL::TTypeEnvironment& TypeEnv;
const NMiniKQL::THolderFactory& HolderFactory;
const TTableId TableId;
- const TVector<NKikimr::NScheme::TTypeId> KeyColumnTypes;
- std::vector<NYql::TKikimrColumnMetadata> Columns;
const IKqpGateway::TKqpSnapshot Snapshot;
+ const std::vector<TString> KeyPrefixColumns;
+ const std::vector<TString> Columns;
+ std::unordered_map<TString, TSysTables::TTableColumnInfo> ColumnsByName;
+ std::vector<NKikimr::NScheme::TTypeId> KeyColumnTypes;
std::deque<TOwnedCellVec> Results;
std::unordered_map<ui64, TReadState> Reads;
std::unordered_map<ui64, std::set<ui64>> ReadsPerShard;
std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
std::deque<TOwnedTableRange> UnprocessedKeys;
- const TDuration ResolveTableShardsTimeout;
- NActors::TActorId ResolveTableShardsTimeoutTimer;
+ const TDuration SchemeCacheRequestTimeout;
+ NActors::TActorId SchemeCacheRequestTimeoutTimer;
const TDuration RetryReadTimeout;
};
diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp
index d81eaa91996..86a948ade19 100644
--- a/ydb/core/kqp/ut/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp
@@ -1968,6 +1968,50 @@ Y_UNIT_TEST_SUITE(KqpScan) {
CompareYson(R"([[[1u];["One"]];[[2u];["Two"]]])", StreamResultToYson(result));
}
}
+
+ Y_UNIT_TEST_TWIN(StreamLookupByPkPrefix, UseSessionActor) {
+ auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
+ auto db = kikimr.GetTableClient();
+ CreateSampleTables(kikimr);
+
+ {
+ kikimr.GetTestClient().CreateTable("/Root", R"(
+ Name: "TestTable"
+ Columns { Name: "Key1", Type: "Uint64" }
+ Columns { Name: "Key2", Type: "Uint64" }
+ Columns { Name: "Value", Type: "String" }
+ KeyColumnNames: ["Key1", "Key2"]
+ SplitBoundary {
+ KeyPrefix {
+ Tuple { Optional { Uint64: 2 } }
+ Tuple { Optional { Uint64: 20 } }
+ }
+ }
+ )");
+
+ auto result = db.CreateSession().GetValueSync().GetSession().ExecuteDataQuery(R"(
+ REPLACE INTO `/Root/TestTable` (Key1, Key2, Value) VALUES
+ (1u, 10, "Value1"),
+ (2u, 19, "Value2"),
+ (2u, 21, "Value2"),
+ (3u, 30, "Value3"),
+ (4u, 40, "Value4"),
+ (5u, 50, "Value5");
+ )", TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ {
+ auto result = db.StreamExecuteScanQuery(R"(
+ PRAGMA kikimr.UseNewEngine = "true";
+ PRAGMA kikimr.OptEnablePredicateExtract = "false";
+ $keys = SELECT Key FROM `/Root/KeyValue`;
+ SELECT * FROM `/Root/TestTable` WHERE Key1 IN $keys ORDER BY Key1, Key2;
+ )").GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[1u];[10u];["Value1"]];[[2u];[19u];["Value2"]];[[2u];[21u];["Value2"]]])", StreamResultToYson(result));
+ }
+ }
}
} // namespace NKqp
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 2bd82e48e58..bc9db521ec6 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -577,7 +577,7 @@ message TEvKillScanTablet {
message TKqpStreamLookupSettings {
optional NKqpProto.TKqpPhyTable Table = 1;
- repeated uint32 KeyColumnTypes = 2;
- repeated TKqpColumnMetadataProto Columns = 3;
+ repeated string KeyColumns = 2;
+ repeated string Columns = 3;
optional TKqpSnapshot Snapshot = 4;
}
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index d0c652a324c..2bf8943a35d 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -210,9 +210,10 @@ message TKqpPhyCnMerge {
message TKqpPhyCnStreamLookup {
TKqpPhyTable Table = 1;
- repeated TKqpPhyColumn Columns = 2;
- bytes LookupKeysType = 3;
- bytes ResultType = 4;
+ repeated string KeyColumns = 2;
+ repeated string Columns = 3;
+ bytes LookupKeysType = 4;
+ bytes ResultType = 5;
}
message TKqpPhyConnection {