aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIuliia Sidorina <ulya.sidorina@gmail.com>2022-07-06 20:57:17 +0300
committerIuliia Sidorina <ulya.sidorina@gmail.com>2022-07-06 20:57:17 +0300
commit8bfc750efbcb32aa186f6e9d71727c2eb9943aa0 (patch)
tree101db68ae557979aa422055d28972d8e24ed75b8
parentb2ec03de03af07aefcce3360eb3a2995d4fac12f (diff)
downloadydb-8bfc750efbcb32aa186f6e9d71727c2eb9943aa0.tar.gz
KIKIMR-14294: fix stream lookup actor
ref:a635c4db8605e809efd973dbd7a21e4f81b945d2
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_ut.cpp46
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp143
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp44
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h2
-rw-r--r--ydb/core/kqp/ut/kqp_olap_ut.cpp45
-rw-r--r--ydb/core/kqp/ut/kqp_scan_ut.cpp102
-rw-r--r--ydb/core/protos/services.proto1
7 files changed, 237 insertions, 146 deletions
diff --git a/ydb/core/kqp/proxy/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy/kqp_proxy_ut.cpp
index 49224bf2f4..715a6f59cd 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_ut.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_ut.cpp
@@ -45,52 +45,6 @@ TVector<NKikimrKqp::TKqpProxyNodeResources> Transform(TVector<TSimpleResource> d
return result;
}
-void InitRoot(Tests::TServer::TPtr server,
- TActorId sender)
-{
- if (server->GetSettings().StoragePoolTypes.empty()) {
- return;
- }
-
- auto &runtime = *server->GetRuntime();
- auto &settings = server->GetSettings();
-
- auto tid = ChangeStateStorage(SchemeRoot, settings.Domain);
- const TDomainsInfo::TDomain& domain = runtime.GetAppData().DomainsInfo->GetDomain(settings.Domain);
-
- auto evTx = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(1, tid);
- auto transaction = evTx->Record.AddTransaction();
- transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain);
- transaction->SetWorkingDir("/");
- auto op = transaction->MutableSubDomain();
- op->SetName(domain.Name);
-
- for (const auto& [kind, pool] : settings.StoragePoolTypes) {
- auto* p = op->AddStoragePools();
- p->SetKind(kind);
- p->SetName(pool.GetName());
- }
-
- runtime.SendToPipe(tid, sender, evTx.Release(), 0, GetPipeConfigWithRetries());
-
- {
- TAutoPtr<IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvModifySchemeTransactionResult>(handle);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetSchemeshardId(), tid);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetStatus(), NKikimrScheme::EStatus::StatusAccepted);
- }
-
- auto evSubscribe = MakeHolder<TEvSchemeShard::TEvNotifyTxCompletion>(1);
- runtime.SendToPipe(tid, sender, evSubscribe.Release(), 0, GetPipeConfigWithRetries());
-
- {
- TAutoPtr<IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvNotifyTxCompletionResult>(handle);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), 1);
- }
-}
-
-
Y_UNIT_TEST_SUITE(KqpProxy) {
Y_UNIT_TEST(CalcPeerStats) {
auto getActiveWorkers = [](const NKikimrKqp::TKqpProxyNodeResources& entry) {
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 3e208b9fb1..4777fd4d71 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -39,6 +39,10 @@ public:
Become(&TKqpStreamLookupActor::StateFunc);
}
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR;
+ }
+
private:
enum class EReadState {
Initial,
@@ -108,6 +112,27 @@ private:
};
};
+ struct TTableScheme {
+ TTableScheme(const THashMap<ui32, TSysTables::TTableColumnInfo>& columns) {
+ std::map<ui32, NKikimr::NScheme::TTypeId> keyColumnTypesByKeyOrder;
+ for (const auto& [_, column] : columns) {
+ if (column.KeyOrder >= 0) {
+ keyColumnTypesByKeyOrder[column.KeyOrder] = column.PType;
+ }
+
+ ColumnsByName.emplace(column.Name, std::move(column));
+ }
+
+ KeyColumnTypes.resize(keyColumnTypesByKeyOrder.size());
+ for (const auto& [keyOrder, keyColumnType] : keyColumnTypesByKeyOrder) {
+ KeyColumnTypes[keyOrder] = keyColumnType;
+ }
+ }
+
+ std::unordered_map<TString, TSysTables::TTableColumnInfo> ColumnsByName;
+ std::vector<NKikimr::NScheme::TTypeId> KeyColumnTypes;
+ };
+
private:
void SaveState(const NYql::NDqProto::TCheckpoint&, NYql::NDqProto::TSourceState&) final {}
void LoadState(const NYql::NDqProto::TSourceState&) final {}
@@ -130,40 +155,19 @@ private:
i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64) final {
i64 totalDataSize = 0;
- for (; !Results.empty(); Results.pop_front()) {
- const auto& result = Results.front();
- YQL_ENSURE(result.size() == Columns.size(), "Result columns mismatch");
-
- NUdf::TUnboxedValue* rowItems = nullptr;
- auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
-
- for (ui32 colId = 0; colId < Columns.size(); ++colId) {
- auto colIt = ColumnsByName.find(Columns[colId]);
- YQL_ENSURE(colIt != ColumnsByName.end());
- rowItems[colId] = NMiniKQL::GetCellValue(result[colId], colIt->second.PType);
+ if (TableScheme) {
+ totalDataSize = PackResults(batch);
+ auto status = FetchLookupKeys();
- totalDataSize += result[colId].Size();
- }
-
- batch.push_back(std::move(row));
- }
-
- NUdf::EFetchStatus status;
- NUdf::TUnboxedValue key;
- while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) {
- std::vector<TCell> keyCells(KeyPrefixColumns.size());
- for (ui32 colId = 0; colId < KeyPrefixColumns.size(); ++colId) {
- keyCells[colId] = MakeCell(KeyColumnTypes[colId], key.GetElement(colId), TypeEnv, /* copy */ true);
+ if (Partitioning) {
+ ProcessLookupKeys();
}
- UnprocessedKeys.emplace_back(std::move(keyCells));
- }
-
- if (Partitioning) {
- ProcessLookupKeys();
+ finished = (status == NUdf::EFetchStatus::Finish) && UnprocessedKeys.empty() && AllReadsFinished();
+ } else {
+ Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
- finished = (status == NUdf::EFetchStatus::Finish) && UnprocessedKeys.empty() && AllReadsFinished();
return totalDataSize;
}
@@ -208,20 +212,7 @@ private:
return RuntimeError(TStringBuilder() << "Failed to resolve table: " << ToString(result.Status));
}
- std::map<ui32, NKikimr::NScheme::TTypeId> keyColumnTypesByKeyOrder;
- for (const auto& [_, column] : result.Columns) {
- if (column.KeyOrder >= 0) {
- keyColumnTypesByKeyOrder[column.KeyOrder] = column.PType;
- }
-
- ColumnsByName.emplace(column.Name, std::move(column));
- }
-
- KeyColumnTypes.resize(keyColumnTypesByKeyOrder.size());
- for (const auto& [keyOrder, keyColumnType] : keyColumnTypesByKeyOrder) {
- KeyColumnTypes[keyOrder] = keyColumnType;
- }
-
+ TableScheme = std::make_unique<TTableScheme>(result.Columns);
ResolveTableShards();
}
@@ -291,7 +282,7 @@ private:
void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr& ev) {
switch (ev->Get()->Tag) {
case EEvSchemeCacheRequestTag::TableSchemeResolving:
- if (ColumnsByName.empty()) {
+ if (!TableScheme) {
RuntimeError(TStringBuilder() << "Failed to resolve scheme for table: " << TableId
<< " (request timeout exceeded)");
}
@@ -317,6 +308,49 @@ private:
}
}
+ ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueVector& batch) {
+ YQL_ENSURE(TableScheme);
+
+ ui64 totalSize = 0;
+ for (; !Results.empty(); Results.pop_front()) {
+ const auto& result = Results.front();
+ YQL_ENSURE(result.size() == Columns.size(), "Result columns mismatch");
+
+ NUdf::TUnboxedValue* rowItems = nullptr;
+ auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
+
+ for (ui32 colId = 0; colId < Columns.size(); ++colId) {
+ auto colIt = TableScheme->ColumnsByName.find(Columns[colId]);
+ YQL_ENSURE(colIt != TableScheme->ColumnsByName.end());
+ rowItems[colId] = NMiniKQL::GetCellValue(result[colId], colIt->second.PType);
+
+ totalSize += result[colId].Size();
+ }
+
+ batch.push_back(std::move(row));
+ }
+
+ return totalSize;
+ }
+
+ NUdf::EFetchStatus FetchLookupKeys() {
+ YQL_ENSURE(TableScheme);
+ YQL_ENSURE(KeyPrefixColumns.size() <= TableScheme->KeyColumnTypes.size());
+
+ NUdf::EFetchStatus status;
+ NUdf::TUnboxedValue key;
+ while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) {
+ std::vector<TCell> keyCells(KeyPrefixColumns.size());
+ for (ui32 colId = 0; colId < KeyPrefixColumns.size(); ++colId) {
+ keyCells[colId] = MakeCell(TableScheme->KeyColumnTypes[colId], key.GetElement(colId), TypeEnv, /* copy */ true);
+ }
+
+ UnprocessedKeys.emplace_back(std::move(keyCells));
+ }
+
+ return status;
+ }
+
void ProcessLookupKeys() {
YQL_ENSURE(Partitioning, "Table partitioning should be initialized before lookup keys processing");
@@ -326,9 +360,9 @@ private:
YQL_ENSURE(key.Point);
std::vector<ui64> shardIds;
- if (KeyPrefixColumns.size() < KeyColumnTypes.size()) {
+ if (KeyPrefixColumns.size() < TableScheme->KeyColumnTypes.size()) {
/* build range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) */
- std::vector<TCell> fromCells(KeyColumnTypes.size());
+ std::vector<TCell> fromCells(TableScheme->KeyColumnTypes.size());
fromCells.insert(fromCells.begin(), key.From.begin(), key.From.end());
std::vector<TCell> toCells(key.From.begin(), key.From.end());
@@ -349,6 +383,7 @@ private:
}
std::vector<ui64> GetRangePartitioning(const TOwnedTableRange& range) {
+ YQL_ENSURE(TableScheme);
YQL_ENSURE(Partitioning);
auto it = LowerBound(Partitioning->begin(), Partitioning->end(), /* value */ true,
@@ -356,7 +391,7 @@ private:
const int result = CompareBorders<true, false>(
partition.Range->EndKeyPrefix.GetCells(), range.From,
partition.Range->IsInclusive || partition.Range->IsPoint,
- range.InclusiveFrom || range.Point, KeyColumnTypes
+ range.InclusiveFrom || range.Point, TableScheme->KeyColumnTypes
);
return (result < 0);
@@ -376,7 +411,7 @@ private:
auto cmp = CompareBorders<true, true>(
it->Range->EndKeyPrefix.GetCells(), range.To,
it->Range->IsInclusive || it->Range->IsPoint,
- range.InclusiveTo || range.Point, KeyColumnTypes
+ range.InclusiveTo || range.Point, TableScheme->KeyColumnTypes
);
if (cmp >= 0) {
@@ -405,8 +440,8 @@ private:
record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion);
for (const auto& column : Columns) {
- auto colIt = ColumnsByName.find(column);
- YQL_ENSURE(colIt != ColumnsByName.end());
+ auto colIt = TableScheme->ColumnsByName.find(column);
+ YQL_ENSURE(colIt != TableScheme->ColumnsByName.end());
record.AddColumns(colIt->second.Id);
}
@@ -461,16 +496,17 @@ private:
}
void ResolveTableShards() {
+ YQL_ENSURE(TableScheme);
Partitioning.reset();
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
- TVector<TCell> minusInf(KeyColumnTypes.size());
+ TVector<TCell> minusInf(TableScheme->KeyColumnTypes.size());
TVector<TCell> plusInf;
TTableRange range(minusInf, true, plusInf, true, false);
request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read,
- KeyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
+ TableScheme->KeyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
@@ -519,8 +555,7 @@ private:
const IKqpGateway::TKqpSnapshot Snapshot;
const std::vector<TString> KeyPrefixColumns;
const std::vector<TString> Columns;
- std::unordered_map<TString, TSysTables::TTableColumnInfo> ColumnsByName;
- std::vector<NKikimr::NScheme::TTypeId> KeyColumnTypes;
+ std::unique_ptr<const TTableScheme> TableScheme;
std::deque<TOwnedCellVec> Results;
std::unordered_map<ui64, TReadState> Reads;
std::unordered_map<ui64, std::set<ui64>> ReadsPerShard;
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 39dfd130e3..1453ba58d3 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -1,5 +1,6 @@
#include "kqp_ut_common.h"
+#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/kqp/provider/yql_kikimr_results.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
@@ -942,5 +943,48 @@ void WaitForKqpProxyInit(const NYdb::TDriver& driver) {
}
}
+void InitRoot(Tests::TServer::TPtr server, TActorId sender) {
+ if (server->GetSettings().StoragePoolTypes.empty()) {
+ return;
+ }
+
+ auto &runtime = *server->GetRuntime();
+ auto &settings = server->GetSettings();
+
+ auto tid = Tests::ChangeStateStorage(Tests::SchemeRoot, settings.Domain);
+ const TDomainsInfo::TDomain& domain = runtime.GetAppData().DomainsInfo->GetDomain(settings.Domain);
+
+ auto evTx = MakeHolder<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransaction>(1, tid);
+ auto transaction = evTx->Record.AddTransaction();
+ transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain);
+ transaction->SetWorkingDir("/");
+ auto op = transaction->MutableSubDomain();
+ op->SetName(domain.Name);
+
+ for (const auto& [kind, pool] : settings.StoragePoolTypes) {
+ auto* p = op->AddStoragePools();
+ p->SetKind(kind);
+ p->SetName(pool.GetName());
+ }
+
+ runtime.SendToPipe(tid, sender, evTx.Release(), 0, GetPipeConfigWithRetries());
+
+ {
+ TAutoPtr<IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransactionResult>(handle);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.GetSchemeshardId(), tid);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.GetStatus(), NKikimrScheme::EStatus::StatusAccepted);
+ }
+
+ auto evSubscribe = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(1);
+ runtime.SendToPipe(tid, sender, evSubscribe.Release(), 0, GetPipeConfigWithRetries());
+
+ {
+ TAutoPtr<IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(handle);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), 1);
+ }
+}
+
} // namspace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 07d94ee25a..2f13a31353 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -260,5 +260,7 @@ void CreateSampleTablesWithIndex(NYdb::NTable::TSession& session);
// This method retries a simple query until it succeeds.
void WaitForKqpProxyInit(const NYdb::TDriver& driver);
+void InitRoot(Tests::TServer::TPtr server, TActorId sender);
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index 4b87a9273a..b251552a45 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -31,51 +31,6 @@ using namespace NYdb::NScheme;
using TEvBulkUpsertRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::BulkUpsertRequest,
Ydb::Table::BulkUpsertResponse>;
-void InitRoot(Tests::TServer::TPtr server,
- TActorId sender)
-{
- if (server->GetSettings().StoragePoolTypes.empty()) {
- return;
- }
-
- auto &runtime = *server->GetRuntime();
- auto &settings = server->GetSettings();
-
- auto tid = Tests::ChangeStateStorage(Tests::SchemeRoot, settings.Domain);
- const TDomainsInfo::TDomain& domain = runtime.GetAppData().DomainsInfo->GetDomain(settings.Domain);
-
- auto evTx = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(1, tid);
- auto transaction = evTx->Record.AddTransaction();
- transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain);
- transaction->SetWorkingDir("/");
- auto op = transaction->MutableSubDomain();
- op->SetName(domain.Name);
-
- for (const auto& [kind, pool] : settings.StoragePoolTypes) {
- auto* p = op->AddStoragePools();
- p->SetKind(kind);
- p->SetName(pool.GetName());
- }
-
- runtime.SendToPipe(tid, sender, evTx.Release(), 0, GetPipeConfigWithRetries());
-
- {
- TAutoPtr<IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvModifySchemeTransactionResult>(handle);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetSchemeshardId(), tid);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetStatus(), NKikimrScheme::EStatus::StatusAccepted);
- }
-
- auto evSubscribe = MakeHolder<TEvSchemeShard::TEvNotifyTxCompletion>(1);
- runtime.SendToPipe(tid, sender, evSubscribe.Release(), 0, GetPipeConfigWithRetries());
-
- {
- TAutoPtr<IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvNotifyTxCompletionResult>(handle);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), 1);
- }
-}
-
Y_UNIT_TEST_SUITE(KqpOlap) {
void EnableDebugLogging(NActors::TTestActorRuntime* runtime) {
//runtime->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG);
diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp
index bc430ba986..cc9af0f917 100644
--- a/ydb/core/kqp/ut/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp
@@ -1,11 +1,14 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/public/lib/experimental/ydb_experimental.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <util/generic/size_literals.h>
+#include <ydb/core/kqp/kqp.h>
+#include <ydb/core/kqp/executer/kqp_executer.h>
namespace NKikimr {
namespace NKqp {
@@ -13,7 +16,6 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NTable;
-
namespace {
NKikimrConfig::TAppConfig AppCfg() {
@@ -2020,6 +2022,104 @@ Y_UNIT_TEST_SUITE(KqpScan) {
CompareYson(R"([[[1u];[10u];["Value1"]];[[2u];[19u];["Value2"]];[[2u];[21u];["Value2"]]])", StreamResultToYson(result));
}
}
+
+ Y_UNIT_TEST_TWIN(StreamLookupTryGetDataBeforeSchemeInitialization, UseSessionActor) {
+ TPortManager tp;
+
+ ui16 mbusport = tp.GetPort(2134);
+ auto settings = Tests::TServerSettings(mbusport)
+ .SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableKqpScanQueryStreamLookup(true);
+
+ Tests::TServer::TPtr server = new Tests::TServer(settings);
+
+ auto runtime = server->GetRuntime();
+ auto sender = runtime->AllocateEdgeActor();
+ auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
+
+ InitRoot(server, sender);
+
+ std::vector<TAutoPtr<IEventHandle>> captured;
+ bool firstAttemptToGetData = false;
+
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) {
+ if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType) {
+ IActor* actor = runtime->FindActor(ev->GetRecipientRewrite());
+ if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {
+ if (!firstAttemptToGetData) {
+ // capture response from scheme cache until CA calls GetAsyncInputData()
+ captured.push_back(ev.Release());
+ return true;
+ }
+
+ for (auto ev : captured) {
+ runtime->Send(ev.Release());
+ }
+ }
+ } else if (ev->GetTypeRewrite() == NYql::NDq::IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::EventType) {
+ firstAttemptToGetData = true;
+ } else if (ev->GetTypeRewrite() == NKqp::TEvKqpExecuter::TEvStreamData::EventType) {
+ auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
+ Y_ASSERT(record.GetResultSet().rows().size() == 0);
+
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetEnough(false);
+ resp->Record.SetSeqNo(record.GetSeqNo());
+ runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
+ return true;
+ }
+
+ return false;
+ };
+
+ auto createSession = [&]() {
+ runtime->Send(new IEventHandle(kqpProxy, sender, new TEvKqp::TEvCreateSessionRequest()));
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvCreateSessionResponse>(sender);
+ auto record = reply->Get()->Record;
+ UNIT_ASSERT_VALUES_EQUAL(record.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ return record.GetResponse().GetSessionId();
+ };
+
+ auto createTable = [&](const TString& sessionId, const TString& queryText) {
+ auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
+ ev->Record.MutableRequest()->SetSessionId(sessionId);
+ ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL);
+ ev->Record.MutableRequest()->SetQuery(queryText);
+
+ runtime->Send(new IEventHandle(kqpProxy, sender, ev.release()));
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ };
+
+ auto sendQuery = [&](const TString& queryText) {
+ auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
+ ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCAN);
+ ev->Record.MutableRequest()->SetQuery(queryText);
+ ev->Record.MutableRequest()->SetKeepSession(false);
+ ActorIdToProto(sender, ev->Record.MutableRequestActorId());
+
+ runtime->Send(new IEventHandle(kqpProxy, sender, ev.release()));
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ };
+
+ createTable(createSession(), R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/Table` (Key int32, Value int32, PRIMARY KEY(Key));
+ )");
+
+ server->GetRuntime()->SetEventFilter(captureEvents);
+
+ sendQuery(R"(
+ PRAGMA kikimr.UseNewEngine = "true";
+ PRAGMA kikimr.OptEnablePredicateExtract = "false";
+ SELECT Value FROM `/Root/Table` WHERE Key IN AsList(1, 2, 3);
+ )");
+
+ }
}
} // namespace NKqp
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index f59945ce01..5fc0856fc5 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -894,5 +894,6 @@ message TActivity {
YQ_HEALTH_ACTOR = 569;
BLOB_DEPOT_ACTOR = 570;
BLOB_DEPOT_AGENT = 571;
+ KQP_STREAM_LOOKUP_ACTOR = 572;
};
};