about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author ulya-sidorina <yulia@ydb.tech> 2022-09-26 16:42:37 +0300
committer ulya-sidorina <yulia@ydb.tech> 2022-09-26 16:42:37 +0300
commit 91cdfb6d765fc121f80bf6b84b6fad2a9e11e4e7 (patch)
tree 39b26d7a81b3eed01ace43449a3e292fc010fa02
parent 6094735641fc7e3e5e900eab9e3e25017b518355 (diff)
download ydb-91cdfb6d765fc121f80bf6b84b6fad2a9e11e4e7.tar.gz
support stream lookup for data query
feature(data_query): support stream lookup for data query
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_compute_actor.cpp 7
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_compute_actor.h 3
-rw-r--r-- ydb/core/kqp/executer/kqp_data_executer.cpp 36
-rw-r--r-- ydb/core/kqp/node/kqp_node.cpp 13
-rw-r--r-- ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp 19
-rw-r--r-- ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp 18
-rw-r--r-- ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp 91
-rw-r--r-- ydb/core/kqp/ut/kqp_ne_ut.cpp 87
-rw-r--r-- ydb/core/protos/config.proto 1
-rw-r--r-- ydb/core/protos/kqp.proto 2
-rw-r--r-- ydb/core/testlib/basics/feature_flags.h 1
11 files changed, 235 insertions, 43 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index c75b507035..9fe5852f38 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/kqp/runtime/kqp_compute.h>
#include <ydb/core/kqp/runtime/kqp_read_table.h>
+#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
namespace NKikimr {
namespace NMiniKQL {
@@ -44,6 +45,12 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
namespace NKqp {
+NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory() {
+ auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
+ RegisterStreamLookupActorFactory(*factory);
+ return factory;
+}
+
void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta,
ui32& maxInFlight, bool& isAggregationRequest) const {
const bool isSorted = (meta.HasSorted() ? meta.GetSorted() : true);
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index 466953906d..8030b56d4e 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -2,6 +2,7 @@
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/kqp_compute.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
namespace NKikimr {
@@ -51,5 +52,7 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
+NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory();
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index d7659d13b6..584524bdba 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -1226,7 +1226,8 @@ private:
break;
}
- case NKqpProto::TKqpPhyConnection::kMap: {
+ case NKqpProto::TKqpPhyConnection::kMap:
+ case NKqpProto::TKqpPhyConnection::kStreamLookup: {
partitionsCount = originStageInfo.Tasks.size();
break;
}
@@ -1344,7 +1345,7 @@ private:
return false;
};
- auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, settings, limits, ExecuterSpan.GetTraceId());
+ auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(), nullptr, settings, limits);
auto computeActorId = Register(computeActor);
task.ComputeActorId = computeActorId;
@@ -1793,6 +1794,7 @@ private:
TVector<ui64> computeTaskIds{Reserve(computeTasks.size())};
for (auto&& taskDesc : computeTasks) {
computeTaskIds.emplace_back(taskDesc.GetId());
+ FillInputSettings(taskDesc, lockTxId);
ExecuteDataComputeTask(std::move(taskDesc));
}
@@ -2047,6 +2049,36 @@ private:
}
}
+ void FillInputSettings(NYql::NDqProto::TDqTask& task, const TMaybe<ui64> lockTxId) {
+ for (auto& input : *task.MutableInputs()) {
+ if (input.HasTransform()) {
+ auto transform = input.MutableTransform();
+ YQL_ENSURE(transform->GetType() == "StreamLookupInputTransformer",
+ "Unexpected input transform type: " << transform->GetType());
+
+ const google::protobuf::Any& settingsAny = transform->GetSettings();
+ YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: "
+ << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name()
+ << " , but got: " << settingsAny.type_url());
+
+ NKikimrKqp::TKqpStreamLookupSettings settings;
+ YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
+
+ if (Snapshot.IsValid()) {
+ settings.MutableSnapshot()->SetStep(Snapshot.Step);
+ settings.MutableSnapshot()->SetTxId(Snapshot.TxId);
+ }
+
+ if (lockTxId.Defined()) {
+ settings.SetLockTxId(*lockTxId);
+ }
+
+ settings.SetImmediateTx(ImmediateTx);
+ transform->MutableSettings()->PackFrom(settings);
+ }
+ }
+ }
+
static void AddDataShardErrors(const NKikimrTxDataShard::TEvProposeTransactionResult& result, TIssue& issue) {
for (const auto &err : result.GetError()) {
issue.AddSubIssue(new TIssue(TStringBuilder()
diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp
index 0ff2d5c3f4..0da904d3bb 100644
--- a/ydb/core/kqp/node/kqp_node.cpp
+++ b/ydb/core/kqp/node/kqp_node.cpp
@@ -12,12 +12,9 @@
#include <ydb/core/kqp/rm/kqp_resource_estimation.h>
#include <ydb/core/kqp/rm/kqp_rm.h>
#include <ydb/core/kqp/common/kqp_resolve.h>
-#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
#include <ydb/core/base/wilson.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
-
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/monlib/service/pages/templates.h>
#include <library/cpp/actors/wilson/wilson_span.h>
@@ -307,11 +304,11 @@ private:
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask),
- CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId));
+ CreateKqpAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
if (Y_LIKELY(!CaFactory)) {
- computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateAsyncIoFactory(),
+ computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(),
nullptr, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
@@ -541,12 +538,6 @@ private:
return ResourceManager_;
}
- NYql::NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() {
- auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
- RegisterStreamLookupActorFactory(*factory);
- return factory;
- }
-
private:
NKikimrConfig::TTableServiceConfig::TResourceManager Config;
TIntrusivePtr<TKqpCounters> Counters;
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
index f4f55af0cf..edb292ac07 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
@@ -121,6 +121,19 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, const T
.Done();
}
+ if (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup()) {
+ return Build<TKqlStreamLookupTable>(ctx, pos)
+ .Table(read.Table())
+ .LookupKeys<TCoSkipNullMembers>()
+ .Input(keysToLookup)
+ .Members()
+ .Add(lookupNames)
+ .Build()
+ .Build()
+ .Columns(read.Columns())
+ .Done();
+ }
+
return Build<TKqlLookupTable>(ctx, pos)
.Table(read.Table())
.LookupKeys<TCoSkipNullMembers>()
@@ -362,8 +375,10 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
return {};
}
- bool needPrecomputeLeft = kqpCtx.IsDataQuery() && !join.LeftInput().Maybe<TCoParameter>() &&
- !IsParameterToListOfStructsRepack(join.LeftInput());
+ bool needPrecomputeLeft = kqpCtx.IsDataQuery()
+ && !kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup()
+ && !join.LeftInput().Maybe<TCoParameter>()
+ && !IsParameterToListOfStructsRepack(join.LeftInput());
TExprBase leftData = needPrecomputeLeft
? Build<TDqPrecompute>(ctx, join.Pos())
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
index 88e9a831fa..51b3ae9817 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
@@ -245,11 +245,19 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
.Index(indexName.Cast())
.Done();
} else {
- readInput = Build<TKqlLookupTable>(ctx, read.Pos())
- .Table(read.Table())
- .LookupKeys(lookupKeys)
- .Columns(read.Columns())
- .Done();
+ if (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup()) {
+ readInput = Build<TKqlStreamLookupTable>(ctx, read.Pos())
+ .Table(read.Table())
+ .LookupKeys(lookupKeys)
+ .Columns(read.Columns())
+ .Done();
+ } else {
+ readInput = Build<TKqlLookupTable>(ctx, read.Pos())
+ .Table(read.Table())
+ .LookupKeys(lookupKeys)
+ .Columns(read.Columns())
+ .Done();
+ }
}
} else {
auto keyRangeExpr = BuildKeyRangeExpr(keyRange, tableDesc, node.Pos(), ctx);
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 291a547b87..103c72098b 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -27,6 +27,8 @@ public:
: InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv)
, HolderFactory(holderFactory), TableId(MakeTableId(settings.GetTable()))
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
+ , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
+ , ImmediateTx(settings.GetImmediateTx())
, KeyPrefixColumns(settings.GetKeyColumns().begin(), settings.GetKeyColumns().end())
, Columns(settings.GetColumns().begin(), settings.GetColumns().end())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
@@ -152,18 +154,21 @@ private:
TActorBootstrapped<TKqpStreamLookupActor>::PassAway();
}
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, TMaybe<TInstant>&, bool& finished, i64) final {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
i64 totalDataSize = 0;
if (TableScheme) {
- totalDataSize = PackResults(batch);
+ totalDataSize = PackResults(batch, freeSpace);
auto status = FetchLookupKeys();
if (Partitioning) {
ProcessLookupKeys();
}
- finished = (status == NUdf::EFetchStatus::Finish) && UnprocessedKeys.empty() && AllReadsFinished();
+ finished = (status == NUdf::EFetchStatus::Finish)
+ && UnprocessedKeys.empty()
+ && AllReadsFinished()
+ && Results.empty();
} else {
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
@@ -184,16 +189,18 @@ private:
hFunc(TEvPrivate::TEvRetryReadTimeout, Handle);
IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
default:
- RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite());
+ RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite(),
+ NYql::NDqProto::StatusIds::INTERNAL_ERROR);
}
} catch (const yexception& e) {
- RuntimeError(e.what());
+ RuntimeError(e.what(), NYql::NDqProto::StatusIds::INTERNAL_ERROR);
}
}
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
if (ev->Get()->Request->ErrorCount > 0) {
- return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " << TableId);
+ return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " << TableId,
+ NYql::NDqProto::StatusIds::SCHEME_ERROR);
}
auto& resultSet = ev->Get()->Request->ResultSet;
@@ -209,7 +216,8 @@ private:
auto& result = resultSet[0];
if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
- return RuntimeError(TStringBuilder() << "Failed to resolve table: " << ToString(result.Status));
+ return RuntimeError(TStringBuilder() << "Failed to resolve table: " << ToString(result.Status),
+ NYql::NDqProto::StatusIds::SCHEME_ERROR);
}
TableScheme = std::make_unique<TTableScheme>(result.Columns);
@@ -227,6 +235,18 @@ private:
return;
}
+ if (record.BrokenTxLocksSize()) {
+ return RuntimeError("Transaction locks invalidated.", NYql::NDqProto::StatusIds::ABORTED);
+ }
+
+ if (!Snapshot.IsValid() && !record.GetFinished()) {
+ // HEAD read was converted to repeatable read
+ Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId());
+ } else if (Snapshot.IsValid()) {
+ YQL_ENSURE(record.GetSnapshot().GetStep() == Snapshot.Step && record.GetSnapshot().GetTxId() == Snapshot.TxId,
+ "Snapshot version mismatch");
+ }
+
// TODO: refactor after KIKIMR-15102
if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
NKikimrTxDataShard::TReadContinuationToken continuationToken;
@@ -284,17 +304,18 @@ private:
case EEvSchemeCacheRequestTag::TableSchemeResolving:
if (!TableScheme) {
RuntimeError(TStringBuilder() << "Failed to resolve scheme for table: " << TableId
- << " (request timeout exceeded)");
+ << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
}
break;
case EEvSchemeCacheRequestTag::TableShardsResolving:
if (!Partitioning) {
RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId
- << " (request timeout exceeded)");
+ << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
}
break;
default:
- RuntimeError(TStringBuilder() << "Unexpected tag for TEvSchemeCacheRequestTimeout: " << (ui64)ev->Get()->Tag);
+ RuntimeError(TStringBuilder() << "Unexpected tag for TEvSchemeCacheRequestTimeout: " << (ui64)ev->Get()->Tag,
+ NYql::NDqProto::StatusIds::INTERNAL_ERROR);
}
}
@@ -304,14 +325,26 @@ private:
auto& read = readIt->second;
if (read.Retried) {
- RuntimeError(TStringBuilder() << "Retry timeout exceeded for read: " << ev->Get()->ReadId);
+ RuntimeError(TStringBuilder() << "Retry timeout exceeded for read: " << ev->Get()->ReadId,
+ NYql::NDqProto::StatusIds::TIMEOUT);
}
}
- ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueVector& batch) {
+ ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueVector& batch, i64 freeSpace) {
YQL_ENSURE(TableScheme);
- ui64 totalSize = 0;
+ i64 totalSize = 0;
+ batch.clear();
+ batch.reserve(Results.size());
+
+ std::vector<NKikimr::NScheme::TTypeId> columnTypes;
+ columnTypes.reserve(Columns.size());
+ for (const auto& column : Columns) {
+ auto colIt = TableScheme->ColumnsByName.find(column);
+ YQL_ENSURE(colIt != TableScheme->ColumnsByName.end());
+ columnTypes.push_back(colIt->second.PType);
+ }
+
for (; !Results.empty(); Results.pop_front()) {
const auto& result = Results.front();
YQL_ENSURE(result.size() == Columns.size(), "Result columns mismatch");
@@ -319,15 +352,19 @@ private:
NUdf::TUnboxedValue* rowItems = nullptr;
auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
+ i64 rowSize = 0;
for (ui32 colId = 0; colId < Columns.size(); ++colId) {
- auto colIt = TableScheme->ColumnsByName.find(Columns[colId]);
- YQL_ENSURE(colIt != TableScheme->ColumnsByName.end());
- rowItems[colId] = NMiniKQL::GetCellValue(result[colId], colIt->second.PType);
+ rowItems[colId] = NMiniKQL::GetCellValue(result[colId], columnTypes[colId]);
+ rowSize += result[colId].Size();
+ }
- totalSize += result[colId].Size();
+ if (totalSize + rowSize > freeSpace) {
+ row.DeleteUnreferenced();
+ break;
}
batch.push_back(std::move(row));
+ totalSize += rowSize;
}
return totalSize;
@@ -429,10 +466,16 @@ private:
THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
auto& record = request->Record;
- YQL_ENSURE(Snapshot.IsValid());
+ if (Snapshot.IsValid()) {
+ record.MutableSnapshot()->SetStep(Snapshot.Step);
+ record.MutableSnapshot()->SetTxId(Snapshot.TxId);
+ } else {
+ YQL_ENSURE(ImmediateTx, "HEAD reading is only available for immediate txs");
+ }
- record.MutableSnapshot()->SetStep(Snapshot.Step);
- record.MutableSnapshot()->SetTxId(Snapshot.TxId);
+ if (LockTxId) {
+ record.SetLockTxId(*LockTxId);
+ }
record.SetReadId(read.Id);
record.SetResultFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
@@ -537,7 +580,7 @@ private:
return TypeEnv.BindAllocator();
}
- void RuntimeError(const TString& message, const NYql::TIssues& subIssues = {}) {
+ void RuntimeError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues = {}) {
NYql::TIssue issue(message);
for (const auto& i : subIssues) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
@@ -545,7 +588,7 @@ private:
NYql::TIssues issues;
issues.AddIssue(std::move(issue));
- Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode));
}
private:
@@ -555,7 +598,9 @@ private:
const NMiniKQL::TTypeEnvironment& TypeEnv;
const NMiniKQL::THolderFactory& HolderFactory;
const TTableId TableId;
- const IKqpGateway::TKqpSnapshot Snapshot;
+ IKqpGateway::TKqpSnapshot Snapshot;
+ const TMaybe<ui64> LockTxId;
+ const bool ImmediateTx;
const std::vector<TString> KeyPrefixColumns;
const std::vector<TString> Columns;
std::unique_ptr<const TTableScheme> TableScheme;
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp
index e3a748ba97..89f1e313af 100644
--- a/ydb/core/kqp/ut/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp
@@ -3484,6 +3484,93 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
)", TTxControl::BeginTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
+
+ Y_UNIT_TEST(StreamLookupForDataQuery) {
+ auto settings = TKikimrSettings()
+ .SetEnableKqpDataQueryStreamLookup(true)
+ .SetEnablePredicateExtractForDataQueries(false);
+ TKikimrRunner kikimr{settings};
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ auto result = db.CreateSession().GetValueSync().GetSession().ExecuteDataQuery(R"(
+ REPLACE INTO `/Root/EightShard` (Key, Text, Data) VALUES
+ (1u, "Value1", 1),
+ (2u, "Value2", 1),
+ (3u, "Value3", 1),
+ (4u, "Value4", 1),
+ (5u, "Value5", 1);
+ )", TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ TExecDataQuerySettings querySettings;
+ querySettings.CollectQueryStats(ECollectQueryStatsMode::Full);
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ PRAGMA kikimr.UseNewEngine = "true";
+ $subquery = SELECT Key FROM `/Root/EightShard`;
+
+ SELECT * FROM `/Root/KeyValue`
+ WHERE Key IN $subquery ORDER BY Key;
+ )", TTxControl::BeginTx(), querySettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[1u];["One"]];[[2u];["Two"]]])", FormatResultSetYson(result.GetResultSet(0)));
+
+ NJson::TJsonValue plan;
+ NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
+ auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
+ UNIT_ASSERT(streamLookup.IsDefined());
+ }
+
+ {
+ auto params = kikimr.GetTableClient().GetParamsBuilder()
+ .AddParam("$keys").BeginList()
+ .AddListItem()
+ .Uint64(1)
+ .AddListItem()
+ .Uint64(2)
+ .EndList()
+ .Build()
+ .Build();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ PRAGMA kikimr.UseNewEngine = "true";
+ DECLARE $keys AS List<Uint64>;
+
+ SELECT * FROM `/Root/KeyValue`
+ WHERE Key IN $keys ORDER BY Key;
+ )", TTxControl::BeginTx(), params, querySettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[1u];["One"]];[[2u];["Two"]]])", FormatResultSetYson(result.GetResultSet(0)));
+
+ NJson::TJsonValue plan;
+ NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
+ auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
+ UNIT_ASSERT(streamLookup.IsDefined());
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ PRAGMA kikimr.UseNewEngine = "true";
+
+ SELECT * FROM `/Root/KeyValue`
+ WHERE Key = 1;
+ )", TTxControl::BeginTx(), querySettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0)));
+
+ NJson::TJsonValue plan;
+ NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
+ auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
+ UNIT_ASSERT(streamLookup.IsDefined());
+ }
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 57b064c0c9..6e7444f8fb 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -716,6 +716,7 @@ message TFeatureFlags {
optional bool EnableChunkLocking = 72 [default = false];
optional bool EnableNotNullDataColumns = 73 [default = false];
optional bool EnableGrpcAudit = 74 [default = false];
+ optional bool EnableKqpDataQueryStreamLookup = 75 [default = false];
}
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 836057d074..56e65aa696 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -607,4 +607,6 @@ message TKqpStreamLookupSettings {
repeated string KeyColumns = 2;
repeated string Columns = 3;
optional TKqpSnapshot Snapshot = 4;
+ optional uint64 LockTxId = 5;
+ optional bool ImmediateTx = 6;
}
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index c1b2dc06b9..b17b56b6d4 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -35,6 +35,7 @@ public:
FEATURE_FLAG_SETTER(EnableBulkUpsertToAsyncIndexedTables)
FEATURE_FLAG_SETTER(EnableChangefeeds)
FEATURE_FLAG_SETTER(EnableKqpScanQueryStreamLookup)
+ FEATURE_FLAG_SETTER(EnableKqpDataQueryStreamLookup)
FEATURE_FLAG_SETTER(EnableMoveIndex)
FEATURE_FLAG_SETTER(EnablePredicateExtractForDataQueries)
FEATURE_FLAG_SETTER(EnableNotNullDataColumns)