diff options
author | ulya-sidorina <yulia@ydb.tech> | 2022-09-26 16:42:37 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2022-09-26 16:42:37 +0300 |
commit | 91cdfb6d765fc121f80bf6b84b6fad2a9e11e4e7 (patch) | |
tree | 39b26d7a81b3eed01ace43449a3e292fc010fa02 | |
parent | 6094735641fc7e3e5e900eab9e3e25017b518355 (diff) | |
download | ydb-91cdfb6d765fc121f80bf6b84b6fad2a9e11e4e7.tar.gz |
support stream lookup for data query
feature(data_query): support stream lookup for data query
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_data_executer.cpp | 36 | ||||
-rw-r--r-- | ydb/core/kqp/node/kqp_node.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp | 19 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp | 18 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 91 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_ut.cpp | 87 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 2 | ||||
-rw-r--r-- | ydb/core/testlib/basics/feature_flags.h | 1 |
11 files changed, 235 insertions, 43 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index c75b507035..9fe5852f38 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -3,6 +3,7 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/kqp/runtime/kqp_compute.h> #include <ydb/core/kqp/runtime/kqp_read_table.h> +#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h> namespace NKikimr { namespace NMiniKQL { @@ -44,6 +45,12 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput namespace NKqp { +NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory() { + auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); + RegisterStreamLookupActorFactory(*factory); + return factory; +} + void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, ui32& maxInFlight, bool& isAggregationRequest) const { const bool isSorted = (meta.HasSorted() ? meta.GetSorted() : true); diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 466953906d..8030b56d4e 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -2,6 +2,7 @@ #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/kqp_compute.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> #include <ydb/core/scheme/scheme_tabledefs.h> namespace NKikimr { @@ -51,5 +52,7 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId); +NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(); + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index d7659d13b6..584524bdba 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -1226,7 +1226,8 @@ private: break; } - case NKqpProto::TKqpPhyConnection::kMap: { + case NKqpProto::TKqpPhyConnection::kMap: + case NKqpProto::TKqpPhyConnection::kStreamLookup: { partitionsCount = originStageInfo.Tasks.size(); break; } @@ -1344,7 +1345,7 @@ private: return false; }; - auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, settings, limits, ExecuterSpan.GetTraceId()); + auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(), nullptr, settings, limits); auto computeActorId = Register(computeActor); task.ComputeActorId = computeActorId; @@ -1793,6 +1794,7 @@ private: TVector<ui64> computeTaskIds{Reserve(computeTasks.size())}; for (auto&& taskDesc : computeTasks) { computeTaskIds.emplace_back(taskDesc.GetId()); + FillInputSettings(taskDesc, lockTxId); ExecuteDataComputeTask(std::move(taskDesc)); } @@ -2047,6 +2049,36 @@ private: } } + void FillInputSettings(NYql::NDqProto::TDqTask& task, const TMaybe<ui64> lockTxId) { + for (auto& input : *task.MutableInputs()) { + if (input.HasTransform()) { + auto transform = input.MutableTransform(); + YQL_ENSURE(transform->GetType() == "StreamLookupInputTransformer", + "Unexpected input transform type: " << transform->GetType()); + + const google::protobuf::Any& settingsAny = transform->GetSettings(); + YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: " + << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name() + << " , but got: " << settingsAny.type_url()); + + NKikimrKqp::TKqpStreamLookupSettings settings; + YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); + + if (Snapshot.IsValid()) { + settings.MutableSnapshot()->SetStep(Snapshot.Step); + settings.MutableSnapshot()->SetTxId(Snapshot.TxId); + } + + if (lockTxId.Defined()) { + settings.SetLockTxId(*lockTxId); + } + + settings.SetImmediateTx(ImmediateTx); + transform->MutableSettings()->PackFrom(settings); + } + } + } + static void AddDataShardErrors(const NKikimrTxDataShard::TEvProposeTransactionResult& result, TIssue& issue) { for (const auto &err : result.GetError()) { issue.AddSubIssue(new TIssue(TStringBuilder() diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp index 0ff2d5c3f4..0da904d3bb 100644 --- a/ydb/core/kqp/node/kqp_node.cpp +++ b/ydb/core/kqp/node/kqp_node.cpp @@ -12,12 +12,9 @@ #include <ydb/core/kqp/rm/kqp_resource_estimation.h> #include <ydb/core/kqp/rm/kqp_rm.h> #include <ydb/core/kqp/common/kqp_resolve.h> -#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h> #include <ydb/core/base/wilson.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> - #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/monlib/service/pages/templates.h> #include <library/cpp/actors/wilson/wilson_span.h> @@ -307,11 +304,11 @@ private: IActor* computeActor; if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask), - CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId)); + CreateKqpAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { if (Y_LIKELY(!CaFactory)) { - computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateAsyncIoFactory(), + computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { @@ -541,12 +538,6 @@ private: return ResourceManager_; } - NYql::NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() { - auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); - RegisterStreamLookupActorFactory(*factory); - return factory; - } - private: NKikimrConfig::TTableServiceConfig::TResourceManager Config; TIntrusivePtr<TKqpCounters> Counters; diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index f4f55af0cf..edb292ac07 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -121,6 +121,19 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, const T .Done(); } + if (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup()) { + return Build<TKqlStreamLookupTable>(ctx, pos) + .Table(read.Table()) + .LookupKeys<TCoSkipNullMembers>() + .Input(keysToLookup) + .Members() + .Add(lookupNames) + .Build() + .Build() + .Columns(read.Columns()) + .Done(); + } + return Build<TKqlLookupTable>(ctx, pos) .Table(read.Table()) .LookupKeys<TCoSkipNullMembers>() @@ -362,8 +375,10 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext return {}; } - bool needPrecomputeLeft = kqpCtx.IsDataQuery() && !join.LeftInput().Maybe<TCoParameter>() && - !IsParameterToListOfStructsRepack(join.LeftInput()); + bool needPrecomputeLeft = kqpCtx.IsDataQuery() + && !kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup() + && !join.LeftInput().Maybe<TCoParameter>() + && !IsParameterToListOfStructsRepack(join.LeftInput()); TExprBase leftData = needPrecomputeLeft ? Build<TDqPrecompute>(ctx, join.Pos()) diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp index 88e9a831fa..51b3ae9817 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp @@ -245,11 +245,19 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T .Index(indexName.Cast()) .Done(); } else { - readInput = Build<TKqlLookupTable>(ctx, read.Pos()) - .Table(read.Table()) - .LookupKeys(lookupKeys) - .Columns(read.Columns()) - .Done(); + if (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup()) { + readInput = Build<TKqlStreamLookupTable>(ctx, read.Pos()) + .Table(read.Table()) + .LookupKeys(lookupKeys) + .Columns(read.Columns()) + .Done(); + } else { + readInput = Build<TKqlLookupTable>(ctx, read.Pos()) + .Table(read.Table()) + .LookupKeys(lookupKeys) + .Columns(read.Columns()) + .Done(); + } } } else { auto keyRangeExpr = BuildKeyRangeExpr(keyRange, tableDesc, node.Pos(), ctx); diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 291a547b87..103c72098b 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -27,6 +27,8 @@ public: : InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv) , HolderFactory(holderFactory), TableId(MakeTableId(settings.GetTable())) , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) + , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>()) + , ImmediateTx(settings.GetImmediateTx()) , KeyPrefixColumns(settings.GetKeyColumns().begin(), settings.GetKeyColumns().end()) , Columns(settings.GetColumns().begin(), settings.GetColumns().end()) , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) @@ -152,18 +154,21 @@ private: TActorBootstrapped<TKqpStreamLookupActor>::PassAway(); } - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, TMaybe<TInstant>&, bool& finished, i64) final { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { i64 totalDataSize = 0; if (TableScheme) { - totalDataSize = PackResults(batch); + totalDataSize = PackResults(batch, freeSpace); auto status = FetchLookupKeys(); if (Partitioning) { ProcessLookupKeys(); } - finished = (status == NUdf::EFetchStatus::Finish) && UnprocessedKeys.empty() && AllReadsFinished(); + finished = (status == NUdf::EFetchStatus::Finish) + && UnprocessedKeys.empty() + && AllReadsFinished() + && Results.empty(); } else { Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } @@ -184,16 +189,18 @@ private: hFunc(TEvPrivate::TEvRetryReadTimeout, Handle); IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult); default: - RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite()); + RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite(), + NYql::NDqProto::StatusIds::INTERNAL_ERROR); } } catch (const yexception& e) { - RuntimeError(e.what()); + RuntimeError(e.what(), NYql::NDqProto::StatusIds::INTERNAL_ERROR); } } void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { if (ev->Get()->Request->ErrorCount > 0) { - return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " << TableId); + return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " << TableId, + NYql::NDqProto::StatusIds::SCHEME_ERROR); } auto& resultSet = ev->Get()->Request->ResultSet; @@ -209,7 +216,8 @@ private: auto& result = resultSet[0]; if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { - return RuntimeError(TStringBuilder() << "Failed to resolve table: " << ToString(result.Status)); + return RuntimeError(TStringBuilder() << "Failed to resolve table: " << ToString(result.Status), + NYql::NDqProto::StatusIds::SCHEME_ERROR); } TableScheme = std::make_unique<TTableScheme>(result.Columns); @@ -227,6 +235,18 @@ private: return; } + if (record.BrokenTxLocksSize()) { + return RuntimeError("Transaction locks invalidated.", NYql::NDqProto::StatusIds::ABORTED); + } + + if (!Snapshot.IsValid() && !record.GetFinished()) { + // HEAD read was converted to repeatable read + Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId()); + } else if (Snapshot.IsValid()) { + YQL_ENSURE(record.GetSnapshot().GetStep() == Snapshot.Step && record.GetSnapshot().GetTxId() == Snapshot.TxId, + "Snapshot version mismatch"); + } + // TODO: refactor after KIKIMR-15102 if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { NKikimrTxDataShard::TReadContinuationToken continuationToken; @@ -284,17 +304,18 @@ private: case EEvSchemeCacheRequestTag::TableSchemeResolving: if (!TableScheme) { RuntimeError(TStringBuilder() << "Failed to resolve scheme for table: " << TableId - << " (request timeout exceeded)"); + << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT); } break; case EEvSchemeCacheRequestTag::TableShardsResolving: if (!Partitioning) { RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId - << " (request timeout exceeded)"); + << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT); } break; default: - RuntimeError(TStringBuilder() << "Unexpected tag for TEvSchemeCacheRequestTimeout: " << (ui64)ev->Get()->Tag); + RuntimeError(TStringBuilder() << "Unexpected tag for TEvSchemeCacheRequestTimeout: " << (ui64)ev->Get()->Tag, + NYql::NDqProto::StatusIds::INTERNAL_ERROR); } } @@ -304,14 +325,26 @@ private: auto& read = readIt->second; if (read.Retried) { - RuntimeError(TStringBuilder() << "Retry timeout exceeded for read: " << ev->Get()->ReadId); + RuntimeError(TStringBuilder() << "Retry timeout exceeded for read: " << ev->Get()->ReadId, + NYql::NDqProto::StatusIds::TIMEOUT); } } - ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueVector& batch) { + ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueVector& batch, i64 freeSpace) { YQL_ENSURE(TableScheme); - ui64 totalSize = 0; + i64 totalSize = 0; + batch.clear(); + batch.reserve(Results.size()); + + std::vector<NKikimr::NScheme::TTypeId> columnTypes; + columnTypes.reserve(Columns.size()); + for (const auto& column : Columns) { + auto colIt = TableScheme->ColumnsByName.find(column); + YQL_ENSURE(colIt != TableScheme->ColumnsByName.end()); + columnTypes.push_back(colIt->second.PType); + } + for (; !Results.empty(); Results.pop_front()) { const auto& result = Results.front(); YQL_ENSURE(result.size() == Columns.size(), "Result columns mismatch"); @@ -319,15 +352,19 @@ private: NUdf::TUnboxedValue* rowItems = nullptr; auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); + i64 rowSize = 0; for (ui32 colId = 0; colId < Columns.size(); ++colId) { - auto colIt = TableScheme->ColumnsByName.find(Columns[colId]); - YQL_ENSURE(colIt != TableScheme->ColumnsByName.end()); - rowItems[colId] = NMiniKQL::GetCellValue(result[colId], colIt->second.PType); + rowItems[colId] = NMiniKQL::GetCellValue(result[colId], columnTypes[colId]); + rowSize += result[colId].Size(); + } - totalSize += result[colId].Size(); + if (totalSize + rowSize > freeSpace) { + row.DeleteUnreferenced(); + break; } batch.push_back(std::move(row)); + totalSize += rowSize; } return totalSize; @@ -429,10 +466,16 @@ private: THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); auto& record = request->Record; - YQL_ENSURE(Snapshot.IsValid()); + if (Snapshot.IsValid()) { + record.MutableSnapshot()->SetStep(Snapshot.Step); + record.MutableSnapshot()->SetTxId(Snapshot.TxId); + } else { + YQL_ENSURE(ImmediateTx, "HEAD reading is only available for immediate txs"); + } - record.MutableSnapshot()->SetStep(Snapshot.Step); - record.MutableSnapshot()->SetTxId(Snapshot.TxId); + if (LockTxId) { + record.SetLockTxId(*LockTxId); + } record.SetReadId(read.Id); record.SetResultFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); @@ -537,7 +580,7 @@ private: return TypeEnv.BindAllocator(); } - void RuntimeError(const TString& message, const NYql::TIssues& subIssues = {}) { + void RuntimeError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues = {}) { NYql::TIssue issue(message); for (const auto& i : subIssues) { issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i)); @@ -545,7 +588,7 @@ private: NYql::TIssues issues; issues.AddIssue(std::move(issue)); - Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode)); } private: @@ -555,7 +598,9 @@ private: const NMiniKQL::TTypeEnvironment& TypeEnv; const NMiniKQL::THolderFactory& HolderFactory; const TTableId TableId; - const IKqpGateway::TKqpSnapshot Snapshot; + IKqpGateway::TKqpSnapshot Snapshot; + const TMaybe<ui64> LockTxId; + const bool ImmediateTx; const std::vector<TString> KeyPrefixColumns; const std::vector<TString> Columns; std::unique_ptr<const TTableScheme> TableScheme; diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp index e3a748ba97..89f1e313af 100644 --- a/ydb/core/kqp/ut/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp @@ -3484,6 +3484,93 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { )", TTxControl::BeginTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } + + Y_UNIT_TEST(StreamLookupForDataQuery) { + auto settings = TKikimrSettings() + .SetEnableKqpDataQueryStreamLookup(true) + .SetEnablePredicateExtractForDataQueries(false); + TKikimrRunner kikimr{settings}; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto result = db.CreateSession().GetValueSync().GetSession().ExecuteDataQuery(R"( + REPLACE INTO `/Root/EightShard` (Key, Text, Data) VALUES + (1u, "Value1", 1), + (2u, "Value2", 1), + (3u, "Value3", 1), + (4u, "Value4", 1), + (5u, "Value5", 1); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + TExecDataQuerySettings querySettings; + querySettings.CollectQueryStats(ECollectQueryStatsMode::Full); + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + PRAGMA kikimr.UseNewEngine = "true"; + $subquery = SELECT Key FROM `/Root/EightShard`; + + SELECT * FROM `/Root/KeyValue` + WHERE Key IN $subquery ORDER BY Key; + )", TTxControl::BeginTx(), querySettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]]])", FormatResultSetYson(result.GetResultSet(0))); + + NJson::TJsonValue plan; + NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true); + auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); + UNIT_ASSERT(streamLookup.IsDefined()); + } + + { + auto params = kikimr.GetTableClient().GetParamsBuilder() + .AddParam("$keys").BeginList() + .AddListItem() + .Uint64(1) + .AddListItem() + .Uint64(2) + .EndList() + .Build() + .Build(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + PRAGMA kikimr.UseNewEngine = "true"; + DECLARE $keys AS List<Uint64>; + + SELECT * FROM `/Root/KeyValue` + WHERE Key IN $keys ORDER BY Key; + )", TTxControl::BeginTx(), params, querySettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]]])", FormatResultSetYson(result.GetResultSet(0))); + + NJson::TJsonValue plan; + NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true); + auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); + UNIT_ASSERT(streamLookup.IsDefined()); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + PRAGMA kikimr.UseNewEngine = "true"; + + SELECT * FROM `/Root/KeyValue` + WHERE Key = 1; + )", TTxControl::BeginTx(), querySettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0))); + + NJson::TJsonValue plan; + NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true); + auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); + UNIT_ASSERT(streamLookup.IsDefined()); + } + } } } // namespace NKikimr::NKqp diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 57b064c0c9..6e7444f8fb 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -716,6 +716,7 @@ message TFeatureFlags { optional bool EnableChunkLocking = 72 [default = false]; optional bool EnableNotNullDataColumns = 73 [default = false]; optional bool EnableGrpcAudit = 74 [default = false]; + optional bool EnableKqpDataQueryStreamLookup = 75 [default = false]; } diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 836057d074..56e65aa696 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -607,4 +607,6 @@ message TKqpStreamLookupSettings { repeated string KeyColumns = 2; repeated string Columns = 3; optional TKqpSnapshot Snapshot = 4; + optional uint64 LockTxId = 5; + optional bool ImmediateTx = 6; } diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index c1b2dc06b9..b17b56b6d4 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -35,6 +35,7 @@ public: FEATURE_FLAG_SETTER(EnableBulkUpsertToAsyncIndexedTables) FEATURE_FLAG_SETTER(EnableChangefeeds) FEATURE_FLAG_SETTER(EnableKqpScanQueryStreamLookup) + FEATURE_FLAG_SETTER(EnableKqpDataQueryStreamLookup) FEATURE_FLAG_SETTER(EnableMoveIndex) FEATURE_FLAG_SETTER(EnablePredicateExtractForDataQueries) FEATURE_FLAG_SETTER(EnableNotNullDataColumns) |