diff options
author | Iuliia Sidorina <ulya.sidorina@gmail.com> | 2022-07-06 20:57:17 +0300 |
---|---|---|
committer | Iuliia Sidorina <ulya.sidorina@gmail.com> | 2022-07-06 20:57:17 +0300 |
commit | 8bfc750efbcb32aa186f6e9d71727c2eb9943aa0 (patch) | |
tree | 101db68ae557979aa422055d28972d8e24ed75b8 | |
parent | b2ec03de03af07aefcce3360eb3a2995d4fac12f (diff) | |
download | ydb-8bfc750efbcb32aa186f6e9d71727c2eb9943aa0.tar.gz |
KIKIMR-14294: fix stream lookup actor
ref:a635c4db8605e809efd973dbd7a21e4f81b945d2
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_ut.cpp | 46 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 143 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 44 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_olap_ut.cpp | 45 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_scan_ut.cpp | 102 | ||||
-rw-r--r-- | ydb/core/protos/services.proto | 1 |
7 files changed, 237 insertions, 146 deletions
diff --git a/ydb/core/kqp/proxy/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy/kqp_proxy_ut.cpp index 49224bf2f4..715a6f59cd 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy/kqp_proxy_ut.cpp @@ -45,52 +45,6 @@ TVector<NKikimrKqp::TKqpProxyNodeResources> Transform(TVector<TSimpleResource> d return result; } -void InitRoot(Tests::TServer::TPtr server, - TActorId sender) -{ - if (server->GetSettings().StoragePoolTypes.empty()) { - return; - } - - auto &runtime = *server->GetRuntime(); - auto &settings = server->GetSettings(); - - auto tid = ChangeStateStorage(SchemeRoot, settings.Domain); - const TDomainsInfo::TDomain& domain = runtime.GetAppData().DomainsInfo->GetDomain(settings.Domain); - - auto evTx = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(1, tid); - auto transaction = evTx->Record.AddTransaction(); - transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain); - transaction->SetWorkingDir("/"); - auto op = transaction->MutableSubDomain(); - op->SetName(domain.Name); - - for (const auto& [kind, pool] : settings.StoragePoolTypes) { - auto* p = op->AddStoragePools(); - p->SetKind(kind); - p->SetName(pool.GetName()); - } - - runtime.SendToPipe(tid, sender, evTx.Release(), 0, GetPipeConfigWithRetries()); - - { - TAutoPtr<IEventHandle> handle; - auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvModifySchemeTransactionResult>(handle); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetSchemeshardId(), tid); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetStatus(), NKikimrScheme::EStatus::StatusAccepted); - } - - auto evSubscribe = MakeHolder<TEvSchemeShard::TEvNotifyTxCompletion>(1); - runtime.SendToPipe(tid, sender, evSubscribe.Release(), 0, GetPipeConfigWithRetries()); - - { - TAutoPtr<IEventHandle> handle; - auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvNotifyTxCompletionResult>(handle); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), 1); - } -} - - Y_UNIT_TEST_SUITE(KqpProxy) { Y_UNIT_TEST(CalcPeerStats) { auto getActiveWorkers = [](const NKikimrKqp::TKqpProxyNodeResources& entry) { diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 3e208b9fb1..4777fd4d71 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -39,6 +39,10 @@ public: Become(&TKqpStreamLookupActor::StateFunc); } + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR; + } + private: enum class EReadState { Initial, @@ -108,6 +112,27 @@ private: }; }; + struct TTableScheme { + TTableScheme(const THashMap<ui32, TSysTables::TTableColumnInfo>& columns) { + std::map<ui32, NKikimr::NScheme::TTypeId> keyColumnTypesByKeyOrder; + for (const auto& [_, column] : columns) { + if (column.KeyOrder >= 0) { + keyColumnTypesByKeyOrder[column.KeyOrder] = column.PType; + } + + ColumnsByName.emplace(column.Name, std::move(column)); + } + + KeyColumnTypes.resize(keyColumnTypesByKeyOrder.size()); + for (const auto& [keyOrder, keyColumnType] : keyColumnTypesByKeyOrder) { + KeyColumnTypes[keyOrder] = keyColumnType; + } + } + + std::unordered_map<TString, TSysTables::TTableColumnInfo> ColumnsByName; + std::vector<NKikimr::NScheme::TTypeId> KeyColumnTypes; + }; + private: void SaveState(const NYql::NDqProto::TCheckpoint&, NYql::NDqProto::TSourceState&) final {} void LoadState(const NYql::NDqProto::TSourceState&) final {} @@ -130,40 +155,19 @@ private: i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64) final { i64 totalDataSize = 0; - for (; !Results.empty(); Results.pop_front()) { - const auto& result = Results.front(); - YQL_ENSURE(result.size() == Columns.size(), "Result columns mismatch"); - - NUdf::TUnboxedValue* rowItems = nullptr; - auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); - - for (ui32 colId = 0; colId < Columns.size(); ++colId) { - auto colIt = ColumnsByName.find(Columns[colId]); - YQL_ENSURE(colIt != ColumnsByName.end()); - rowItems[colId] = NMiniKQL::GetCellValue(result[colId], colIt->second.PType); + if (TableScheme) { + totalDataSize = PackResults(batch); + auto status = FetchLookupKeys(); - totalDataSize += result[colId].Size(); - } - - batch.push_back(std::move(row)); - } - - NUdf::EFetchStatus status; - NUdf::TUnboxedValue key; - while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) { - std::vector<TCell> keyCells(KeyPrefixColumns.size()); - for (ui32 colId = 0; colId < KeyPrefixColumns.size(); ++colId) { - keyCells[colId] = MakeCell(KeyColumnTypes[colId], key.GetElement(colId), TypeEnv, /* copy */ true); + if (Partitioning) { + ProcessLookupKeys(); } - UnprocessedKeys.emplace_back(std::move(keyCells)); - } - - if (Partitioning) { - ProcessLookupKeys(); + finished = (status == NUdf::EFetchStatus::Finish) && UnprocessedKeys.empty() && AllReadsFinished(); + } else { + Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } - finished = (status == NUdf::EFetchStatus::Finish) && UnprocessedKeys.empty() && AllReadsFinished(); return totalDataSize; } @@ -208,20 +212,7 @@ private: return RuntimeError(TStringBuilder() << "Failed to resolve table: " << ToString(result.Status)); } - std::map<ui32, NKikimr::NScheme::TTypeId> keyColumnTypesByKeyOrder; - for (const auto& [_, column] : result.Columns) { - if (column.KeyOrder >= 0) { - keyColumnTypesByKeyOrder[column.KeyOrder] = column.PType; - } - - ColumnsByName.emplace(column.Name, std::move(column)); - } - - KeyColumnTypes.resize(keyColumnTypesByKeyOrder.size()); - for (const auto& [keyOrder, keyColumnType] : keyColumnTypesByKeyOrder) { - KeyColumnTypes[keyOrder] = keyColumnType; - } - + TableScheme = std::make_unique<TTableScheme>(result.Columns); ResolveTableShards(); } @@ -291,7 +282,7 @@ private: void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr& ev) { switch (ev->Get()->Tag) { case EEvSchemeCacheRequestTag::TableSchemeResolving: - if (ColumnsByName.empty()) { + if (!TableScheme) { RuntimeError(TStringBuilder() << "Failed to resolve scheme for table: " << TableId << " (request timeout exceeded)"); } @@ -317,6 +308,49 @@ private: } } + ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueVector& batch) { + YQL_ENSURE(TableScheme); + + ui64 totalSize = 0; + for (; !Results.empty(); Results.pop_front()) { + const auto& result = Results.front(); + YQL_ENSURE(result.size() == Columns.size(), "Result columns mismatch"); + + NUdf::TUnboxedValue* rowItems = nullptr; + auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); + + for (ui32 colId = 0; colId < Columns.size(); ++colId) { + auto colIt = TableScheme->ColumnsByName.find(Columns[colId]); + YQL_ENSURE(colIt != TableScheme->ColumnsByName.end()); + rowItems[colId] = NMiniKQL::GetCellValue(result[colId], colIt->second.PType); + + totalSize += result[colId].Size(); + } + + batch.push_back(std::move(row)); + } + + return totalSize; + } + + NUdf::EFetchStatus FetchLookupKeys() { + YQL_ENSURE(TableScheme); + YQL_ENSURE(KeyPrefixColumns.size() <= TableScheme->KeyColumnTypes.size()); + + NUdf::EFetchStatus status; + NUdf::TUnboxedValue key; + while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) { + std::vector<TCell> keyCells(KeyPrefixColumns.size()); + for (ui32 colId = 0; colId < KeyPrefixColumns.size(); ++colId) { + keyCells[colId] = MakeCell(TableScheme->KeyColumnTypes[colId], key.GetElement(colId), TypeEnv, /* copy */ true); + } + + UnprocessedKeys.emplace_back(std::move(keyCells)); + } + + return status; + } + void ProcessLookupKeys() { YQL_ENSURE(Partitioning, "Table partitioning should be initialized before lookup keys processing"); @@ -326,9 +360,9 @@ private: YQL_ENSURE(key.Point); std::vector<ui64> shardIds; - if (KeyPrefixColumns.size() < KeyColumnTypes.size()) { + if (KeyPrefixColumns.size() < TableScheme->KeyColumnTypes.size()) { /* build range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) */ - std::vector<TCell> fromCells(KeyColumnTypes.size()); + std::vector<TCell> fromCells(TableScheme->KeyColumnTypes.size()); fromCells.insert(fromCells.begin(), key.From.begin(), key.From.end()); std::vector<TCell> toCells(key.From.begin(), key.From.end()); @@ -349,6 +383,7 @@ private: } std::vector<ui64> GetRangePartitioning(const TOwnedTableRange& range) { + YQL_ENSURE(TableScheme); YQL_ENSURE(Partitioning); auto it = LowerBound(Partitioning->begin(), Partitioning->end(), /* value */ true, @@ -356,7 +391,7 @@ private: const int result = CompareBorders<true, false>( partition.Range->EndKeyPrefix.GetCells(), range.From, partition.Range->IsInclusive || partition.Range->IsPoint, - range.InclusiveFrom || range.Point, KeyColumnTypes + range.InclusiveFrom || range.Point, TableScheme->KeyColumnTypes ); return (result < 0); @@ -376,7 +411,7 @@ private: auto cmp = CompareBorders<true, true>( it->Range->EndKeyPrefix.GetCells(), range.To, it->Range->IsInclusive || it->Range->IsPoint, - range.InclusiveTo || range.Point, KeyColumnTypes + range.InclusiveTo || range.Point, TableScheme->KeyColumnTypes ); if (cmp >= 0) { @@ -405,8 +440,8 @@ private: record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion); for (const auto& column : Columns) { - auto colIt = ColumnsByName.find(column); - YQL_ENSURE(colIt != ColumnsByName.end()); + auto colIt = TableScheme->ColumnsByName.find(column); + YQL_ENSURE(colIt != TableScheme->ColumnsByName.end()); record.AddColumns(colIt->second.Id); } @@ -461,16 +496,17 @@ private: } void ResolveTableShards() { + YQL_ENSURE(TableScheme); Partitioning.reset(); auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>(); - TVector<TCell> minusInf(KeyColumnTypes.size()); + TVector<TCell> minusInf(TableScheme->KeyColumnTypes.size()); TVector<TCell> plusInf; TTableRange range(minusInf, true, plusInf, true, false); request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read, - KeyColumnTypes, TVector<TKeyDesc::TColumnOp>{})); + TableScheme->KeyColumnTypes, TVector<TKeyDesc::TColumnOp>{})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); @@ -519,8 +555,7 @@ private: const IKqpGateway::TKqpSnapshot Snapshot; const std::vector<TString> KeyPrefixColumns; const std::vector<TString> Columns; - std::unordered_map<TString, TSysTables::TTableColumnInfo> ColumnsByName; - std::vector<NKikimr::NScheme::TTypeId> KeyColumnTypes; + std::unique_ptr<const TTableScheme> TableScheme; std::deque<TOwnedCellVec> Results; std::unordered_map<ui64, TReadState> Reads; std::unordered_map<ui64, std::set<ui64>> ReadsPerShard; diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 39dfd130e3..1453ba58d3 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -1,5 +1,6 @@ #include "kqp_ut_common.h" +#include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/kqp/provider/yql_kikimr_results.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> @@ -942,5 +943,48 @@ void WaitForKqpProxyInit(const NYdb::TDriver& driver) { } } +void InitRoot(Tests::TServer::TPtr server, TActorId sender) { + if (server->GetSettings().StoragePoolTypes.empty()) { + return; + } + + auto &runtime = *server->GetRuntime(); + auto &settings = server->GetSettings(); + + auto tid = Tests::ChangeStateStorage(Tests::SchemeRoot, settings.Domain); + const TDomainsInfo::TDomain& domain = runtime.GetAppData().DomainsInfo->GetDomain(settings.Domain); + + auto evTx = MakeHolder<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransaction>(1, tid); + auto transaction = evTx->Record.AddTransaction(); + transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain); + transaction->SetWorkingDir("/"); + auto op = transaction->MutableSubDomain(); + op->SetName(domain.Name); + + for (const auto& [kind, pool] : settings.StoragePoolTypes) { + auto* p = op->AddStoragePools(); + p->SetKind(kind); + p->SetName(pool.GetName()); + } + + runtime.SendToPipe(tid, sender, evTx.Release(), 0, GetPipeConfigWithRetries()); + + { + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransactionResult>(handle); + UNIT_ASSERT_VALUES_EQUAL(event->Record.GetSchemeshardId(), tid); + UNIT_ASSERT_VALUES_EQUAL(event->Record.GetStatus(), NKikimrScheme::EStatus::StatusAccepted); + } + + auto evSubscribe = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(1); + runtime.SendToPipe(tid, sender, evSubscribe.Release(), 0, GetPipeConfigWithRetries()); + + { + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(handle); + UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), 1); + } +} + } // namspace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 07d94ee25a..2f13a31353 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -260,5 +260,7 @@ void CreateSampleTablesWithIndex(NYdb::NTable::TSession& session); // This method retries a simple query until it succeeds. void WaitForKqpProxyInit(const NYdb::TDriver& driver); +void InitRoot(Tests::TServer::TPtr server, TActorId sender); + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index 4b87a9273a..b251552a45 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -31,51 +31,6 @@ using namespace NYdb::NScheme; using TEvBulkUpsertRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>; -void InitRoot(Tests::TServer::TPtr server, - TActorId sender) -{ - if (server->GetSettings().StoragePoolTypes.empty()) { - return; - } - - auto &runtime = *server->GetRuntime(); - auto &settings = server->GetSettings(); - - auto tid = Tests::ChangeStateStorage(Tests::SchemeRoot, settings.Domain); - const TDomainsInfo::TDomain& domain = runtime.GetAppData().DomainsInfo->GetDomain(settings.Domain); - - auto evTx = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(1, tid); - auto transaction = evTx->Record.AddTransaction(); - transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain); - transaction->SetWorkingDir("/"); - auto op = transaction->MutableSubDomain(); - op->SetName(domain.Name); - - for (const auto& [kind, pool] : settings.StoragePoolTypes) { - auto* p = op->AddStoragePools(); - p->SetKind(kind); - p->SetName(pool.GetName()); - } - - runtime.SendToPipe(tid, sender, evTx.Release(), 0, GetPipeConfigWithRetries()); - - { - TAutoPtr<IEventHandle> handle; - auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvModifySchemeTransactionResult>(handle); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetSchemeshardId(), tid); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetStatus(), NKikimrScheme::EStatus::StatusAccepted); - } - - auto evSubscribe = MakeHolder<TEvSchemeShard::TEvNotifyTxCompletion>(1); - runtime.SendToPipe(tid, sender, evSubscribe.Release(), 0, GetPipeConfigWithRetries()); - - { - TAutoPtr<IEventHandle> handle; - auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvNotifyTxCompletionResult>(handle); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), 1); - } -} - Y_UNIT_TEST_SUITE(KqpOlap) { void EnableDebugLogging(NActors::TTestActorRuntime* runtime) { //runtime->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG); diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp index bc430ba986..cc9af0f917 100644 --- a/ydb/core/kqp/ut/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp @@ -1,11 +1,14 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/public/lib/experimental/ydb_experimental.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <util/generic/size_literals.h> +#include <ydb/core/kqp/kqp.h> +#include <ydb/core/kqp/executer/kqp_executer.h> namespace NKikimr { namespace NKqp { @@ -13,7 +16,6 @@ namespace NKqp { using namespace NYdb; using namespace NYdb::NTable; - namespace { NKikimrConfig::TAppConfig AppCfg() { @@ -2020,6 +2022,104 @@ Y_UNIT_TEST_SUITE(KqpScan) { CompareYson(R"([[[1u];[10u];["Value1"]];[[2u];[19u];["Value2"]];[[2u];[21u];["Value2"]]])", StreamResultToYson(result)); } } + + Y_UNIT_TEST_TWIN(StreamLookupTryGetDataBeforeSchemeInitialization, UseSessionActor) { + TPortManager tp; + + ui16 mbusport = tp.GetPort(2134); + auto settings = Tests::TServerSettings(mbusport) + .SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableKqpScanQueryStreamLookup(true); + + Tests::TServer::TPtr server = new Tests::TServer(settings); + + auto runtime = server->GetRuntime(); + auto sender = runtime->AllocateEdgeActor(); + auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0)); + + InitRoot(server, sender); + + std::vector<TAutoPtr<IEventHandle>> captured; + bool firstAttemptToGetData = false; + + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) { + if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType) { + IActor* actor = runtime->FindActor(ev->GetRecipientRewrite()); + if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) { + if (!firstAttemptToGetData) { + // capture response from scheme cache until CA calls GetAsyncInputData() + captured.push_back(ev.Release()); + return true; + } + + for (auto ev : captured) { + runtime->Send(ev.Release()); + } + } + } else if (ev->GetTypeRewrite() == NYql::NDq::IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::EventType) { + firstAttemptToGetData = true; + } else if (ev->GetTypeRewrite() == NKqp::TEvKqpExecuter::TEvStreamData::EventType) { + auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record; + Y_ASSERT(record.GetResultSet().rows().size() == 0); + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetEnough(false); + resp->Record.SetSeqNo(record.GetSeqNo()); + runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); + return true; + } + + return false; + }; + + auto createSession = [&]() { + runtime->Send(new IEventHandle(kqpProxy, sender, new TEvKqp::TEvCreateSessionRequest())); + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvCreateSessionResponse>(sender); + auto record = reply->Get()->Record; + UNIT_ASSERT_VALUES_EQUAL(record.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + return record.GetResponse().GetSessionId(); + }; + + auto createTable = [&](const TString& sessionId, const TString& queryText) { + auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); + ev->Record.MutableRequest()->SetSessionId(sessionId); + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL); + ev->Record.MutableRequest()->SetQuery(queryText); + + runtime->Send(new IEventHandle(kqpProxy, sender, ev.release())); + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); + }; + + auto sendQuery = [&](const TString& queryText) { + auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCAN); + ev->Record.MutableRequest()->SetQuery(queryText); + ev->Record.MutableRequest()->SetKeepSession(false); + ActorIdToProto(sender, ev->Record.MutableRequestActorId()); + + runtime->Send(new IEventHandle(kqpProxy, sender, ev.release())); + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); + }; + + createTable(createSession(), R"( + --!syntax_v1 + CREATE TABLE `/Root/Table` (Key int32, Value int32, PRIMARY KEY(Key)); + )"); + + server->GetRuntime()->SetEventFilter(captureEvents); + + sendQuery(R"( + PRAGMA kikimr.UseNewEngine = "true"; + PRAGMA kikimr.OptEnablePredicateExtract = "false"; + SELECT Value FROM `/Root/Table` WHERE Key IN AsList(1, 2, 3); + )"); + + } } } // namespace NKqp diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index f59945ce01..5fc0856fc5 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -894,5 +894,6 @@ message TActivity { YQ_HEALTH_ACTOR = 569; BLOB_DEPOT_ACTOR = 570; BLOB_DEPOT_AGENT = 571; + KQP_STREAM_LOOKUP_ACTOR = 572; }; }; |