aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2023-02-07 20:04:36 +0300
committerulya-sidorina <yulia@ydb.tech>2023-02-07 20:04:36 +0300
commitf6b76976b4f69e9e547b44df4722a700dd98dbec (patch)
treeee59514f19a3f695c61771052b4107361a64bdfa
parenta3f17a0c71ec03358e0fc9a2f094991491a3f831 (diff)
downloadydb-f6b76976b4f69e9e547b44df4722a700dd98dbec.tar.gz
enable StreamLookup
feature(kqp): enable StreamLookup
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp2
-rw-r--r--ydb/core/kqp/opt/kqp_opt_kql.cpp2
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp10
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp9
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp6
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.h2
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp24
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h1
-rw-r--r--ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp20
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp67
-rw-r--r--ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp15
-rw-r--r--ydb/core/kqp/ut/join/kqp_join_ut.cpp30
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp117
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp40
-rw-r--r--ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp15
-rw-r--r--ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp18
-rw-r--r--ydb/core/kqp/ut/query/kqp_explain_ut.cpp35
-rw-r--r--ydb/core/kqp/ut/query/kqp_query_ut.cpp12
-rw-r--r--ydb/core/kqp/ut/query/kqp_stats_ut.cpp4
-rw-r--r--ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp4
-rw-r--r--ydb/core/kqp/ut/scan/kqp_scan_ut.cpp22
-rw-r--r--ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp40
-rw-r--r--ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp4
-rw-r--r--ydb/core/protos/config.proto6
-rw-r--r--ydb/core/testlib/basics/feature_flags.h2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_erase_rows.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp23
-rw-r--r--ydb/core/tx/datashard/datashard_ut_upload_rows.cpp4
-rw-r--r--ydb/tests/functional/canonical/test_sql.py10
-rw-r--r--ydb/tests/functional/suite_tests/test_base.py1
-rw-r--r--ydb/tests/library/harness/kikimr_config.py9
32 files changed, 381 insertions, 178 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 6dd5714796..34378f304c 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -340,6 +340,8 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableKqpDataQuerySourceRead = serviceConfig.GetEnableKqpDataQuerySourceRead();
kqpConfig.EnableKqpScanQuerySourceRead = serviceConfig.GetEnableKqpScanQuerySourceRead();
+ kqpConfig.EnableKqpDataQueryStreamLookup = serviceConfig.GetEnableKqpDataQueryStreamLookup();
+ kqpConfig.EnableKqpScanQueryStreamLookup = serviceConfig.GetEnableKqpScanQueryStreamLookup();
}
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp
index c84cb4a4f8..957255b671 100644
--- a/ydb/core/kqp/opt/kqp_opt_kql.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp
@@ -571,7 +571,7 @@ TExprNode::TPtr HandleReadTable(const TKiReadTable& read, TExprContext& ctx, con
return nullptr;
}
- if (kqpCtx->IsScanQuery() && !kqpCtx->Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()) {
+ if (kqpCtx->IsScanQuery() && !kqpCtx->Config->EnableKqpScanQueryStreamLookup) {
const TString err = "Secondary index is not supported for ScanQuery";
ctx.AddError(YqlIssue(ctx.GetPosition(read.Pos()), TIssuesIds::KIKIMR_BAD_REQUEST, err));
return nullptr;
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp
index 5a074509b2..7dcf855ce0 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp
@@ -15,14 +15,14 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK
auto deleteRows = node.Cast<TKqlDeleteRows>();
- TMaybeNode<TKqlLookupTable> lookup;
+ TMaybeNode<TKqlLookupTableBase> lookup;
TMaybeNode<TCoSkipNullMembers> skipNulMembers;
- if (deleteRows.Input().Maybe<TKqlLookupTable>()) {
- lookup = deleteRows.Input().Cast<TKqlLookupTable>();
- } else if (deleteRows.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKqlLookupTable>()) {
+ if (deleteRows.Input().Maybe<TKqlLookupTableBase>()) {
+ lookup = deleteRows.Input().Cast<TKqlLookupTableBase>();
+ } else if (deleteRows.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKqlLookupTableBase>()) {
skipNulMembers = deleteRows.Input().Cast<TCoSkipNullMembers>();
- lookup = skipNulMembers.Input().Cast<TKqlLookupTable>();
+ lookup = skipNulMembers.Input().Cast<TKqlLookupTableBase>();
} else {
return node;
}
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
index 7ea72278c3..96ac7409fa 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
@@ -168,7 +168,7 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos, const T
const TKqpOptimizeContext& kqpCtx)
{
if (kqpCtx.IsScanQuery()) {
- YQL_ENSURE(kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup(), "Stream lookup is not enabled");
+ YQL_ENSURE(kqpCtx.Config->EnableKqpScanQueryStreamLookup, "Stream lookup is not enabled");
return Build<TKqlStreamLookupIndex>(ctx, pos)
.Table(read.Table())
.LookupKeys<TCoSkipNullMembers>()
@@ -201,7 +201,7 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, const T
const TExprBase& keysToLookup, const TVector<TCoAtom>& lookupNames, const TKqpOptimizeContext& kqpCtx)
{
if (kqpCtx.IsScanQuery()) {
- YQL_ENSURE(kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup(), "Stream lookup is not enabled");
+ YQL_ENSURE(kqpCtx.Config->EnableKqpScanQueryStreamLookup, "Stream lookup is not enabled");
return Build<TKqlStreamLookupTable>(ctx, pos)
.Table(read.Table())
.LookupKeys<TCoSkipNullMembers>()
@@ -214,7 +214,7 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, const T
.Done();
}
- if (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup()) {
+ if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
return Build<TKqlStreamLookupTable>(ctx, pos)
.Table(read.Table())
.LookupKeys<TCoSkipNullMembers>()
@@ -471,7 +471,6 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
}
bool needPrecomputeLeft = kqpCtx.IsDataQuery()
- && !kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup()
&& !join.LeftInput().Maybe<TCoParameter>()
&& !IsParameterToListOfStructsRepack(join.LeftInput());
@@ -589,7 +588,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
const NYql::TKikimrConfiguration::TPtr& config)
{
- if ((kqpCtx.IsScanQuery() && !kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()) || !node.Maybe<TDqJoin>()) {
+ if ((kqpCtx.IsScanQuery() && !kqpCtx.Config->EnableKqpScanQueryStreamLookup) || !node.Maybe<TDqJoin>()) {
return node;
}
auto join = node.Cast<TDqJoin>();
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
index f443c9dacc..f60b698388 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
@@ -232,7 +232,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
// is available, currently it can introduce redundant compute stage.
useDataQueryLookup = kqpCtx.IsDataQuery() && isFullKey;
useScanQueryLookup = kqpCtx.IsScanQuery() && isFullKey
- && kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup();
+ && kqpCtx.Config->EnableKqpScanQueryStreamLookup;
}
TMaybeNode<TExprBase> readInput;
@@ -247,7 +247,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
.Index(indexName.Cast())
.Done();
} else {
- if (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup()) {
+ if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
readInput = Build<TKqlStreamLookupTable>(ctx, read.Pos())
.Table(read.Table())
.LookupKeys(lookupKeys)
@@ -262,7 +262,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
}
}
} else if (useScanQueryLookup) {
- YQL_ENSURE(kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup());
+ YQL_ENSURE(kqpCtx.Config->EnableKqpScanQueryStreamLookup);
auto lookupKeys = BuildEquiRangeLookup(keyRange, tableDesc, read.Pos(), ctx);
if (indexName) {
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp
index 0155d45134..0644b7ba6a 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp
@@ -48,7 +48,7 @@ bool CanRewriteSqlInToEquiJoin(const TTypeAnnotationNode* lookupType, const TTyp
TExprBase KqpRewriteSqlInToEquiJoin(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
const TKikimrConfiguration::TPtr& config)
{
- if (kqpCtx.IsScanQuery() && !kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()) {
+ if (kqpCtx.IsScanQuery() && !kqpCtx.Config->EnableKqpScanQueryStreamLookup) {
return node;
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h
index dd83e5e766..3ec772576b 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.h
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.h
@@ -131,6 +131,8 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableKqpScanQuerySourceRead = false;
bool EnableKqpDataQuerySourceRead = false;
+ bool EnableKqpScanQueryStreamLookup = false;
+ bool EnableKqpDataQueryStreamLookup = false;
};
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 82fc2334fc..e1386cff6f 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -126,6 +126,7 @@ private:
struct TResult {
const ui64 ShardId;
THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult;
+ size_t UnprocessedResultRow = 0;
};
struct TEvPrivate {
@@ -314,25 +315,24 @@ private:
}
batch.reserve(rowsCount);
- for (; !Results.empty() && !sizeLimitExceeded; Results.pop_front()) {
- const auto& readResult = Results.front().ReadResult;
- const auto shardId = Results.front().ShardId;
-
- for (size_t rowId = 0; rowId < readResult->Get()->GetRowsCount(); ++rowId) {
- const auto& result = readResult->Get()->GetCells(rowId);
- YQL_ENSURE(result.size() <= Columns.size(), "Result columns mismatch");
+ while (!Results.empty() && !sizeLimitExceeded) {
+ auto& result = Results.front();
+ for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
+ const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
+ YQL_ENSURE(resultRow.size() <= Columns.size(), "Result columns mismatch");
NUdf::TUnboxedValue* rowItems = nullptr;
auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
i64 rowSize = 0;
- for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size() && resultColIndex < result.size(); ++colIndex) {
+ for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
const auto& column = Columns[colIndex];
if (IsSystemColumn(column.Name)) {
- NMiniKQL::FillSystemColumn(rowItems[colIndex], shardId, column.Id, column.PType);
+ NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
rowSize += sizeof(NUdf::TUnboxedValue);
} else {
- rowItems[colIndex] = NMiniKQL::GetCellValue(result[resultColIndex], column.PType);
+ YQL_ENSURE(resultColIndex < resultRow.size());
+ rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType);
rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
++resultColIndex;
}
@@ -347,6 +347,10 @@ private:
batch.push_back(std::move(row));
totalSize += rowSize;
}
+
+ if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
+ Results.pop_front();
+ }
}
return totalSize;
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 558cdc6491..cd15942a43 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -81,7 +81,6 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
TKikimrSettings()
{
- this->SetEnableKqpScanQueryStreamLookup(true);
}
TKikimrSettings& SetAppConfig(const NKikimrConfig::TAppConfig& value) { AppConfig = value; return *this; }
diff --git a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp
index 3a1bde4868..b97507e0ff 100644
--- a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp
+++ b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp
@@ -40,7 +40,15 @@ void Test(bool enableInplaceUpdate, const TString& query, TParams&& params, cons
setting.SetName("_KqpAllowUnsafeCommit");
setting.SetValue("true");
- TKikimrRunner kikimr({setting});
+ // stream lookup use iterator interface, that doesn't use datashard transactions
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
+
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting});
+
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -362,7 +370,15 @@ Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) {
unsafeCommitSetting.SetName("_KqpAllowUnsafeCommit");
unsafeCommitSetting.SetValue("true");
- TKikimrRunner kikimr({keysLimitSetting, unsafeCommitSetting});
+ // stream lookup use iterator interface, that doesn't use datashard transactions
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
+
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({keysLimitSetting, unsafeCommitSetting});
+
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
index 80a65e920f..0b0164ce03 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
@@ -383,18 +383,17 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {
}
Y_UNIT_TEST_TWIN(DataColumnUpsertMixedSemantic, WithMvcc) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc);
+
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetEnableMvcc(WithMvcc)
.SetEnableMvccSnapshotReads(WithMvcc)
+ .SetAppConfig(appConfig)
.SetKqpSettings({setting});
- if (!WithMvcc) {
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
- serverSettings.SetAppConfig(appConfig);
- }
-
TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -435,16 +434,17 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {
}
Y_UNIT_TEST_TWIN(DataColumnWriteNull, WithMvcc) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc);
+
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetEnableMvcc(WithMvcc)
.SetEnableMvccSnapshotReads(WithMvcc)
+ .SetAppConfig(appConfig)
.SetKqpSettings({setting});
- if (!WithMvcc) {
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
- serverSettings.SetAppConfig(appConfig);
- }
+
TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -519,16 +519,17 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {
}
Y_UNIT_TEST_TWIN(DataColumnWrite, WithMvcc) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc);
+
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetEnableMvcc(WithMvcc)
.SetEnableMvccSnapshotReads(WithMvcc)
+ .SetAppConfig(appConfig)
.SetKqpSettings({setting});
- if (!WithMvcc) {
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
- serverSettings.SetAppConfig(appConfig);
- }
+
TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -921,16 +922,17 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {
}
Y_UNIT_TEST_TWIN(DataColumnSelect, WithMvcc) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc);
+
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetEnableMvcc(WithMvcc)
.SetEnableMvccSnapshotReads(WithMvcc)
+ .SetAppConfig(appConfig)
.SetKqpSettings({setting});
- if (!WithMvcc) {
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
- serverSettings.SetAppConfig(appConfig);
- }
+
TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1020,16 +1022,16 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {
}
Y_UNIT_TEST_TWIN(DuplicateUpsert, WithMvcc) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc);
+
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetEnableMvcc(WithMvcc)
.SetEnableMvccSnapshotReads(WithMvcc)
+ .SetAppConfig(appConfig)
.SetKqpSettings({setting});
- if (!WithMvcc) {
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
- serverSettings.SetAppConfig(appConfig);
- }
TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
@@ -1059,16 +1061,15 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {
}
Y_UNIT_TEST_TWIN(SortByPk, WithMvcc) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc);
+
auto serverSettings = TKikimrSettings()
.SetEnableMvcc(WithMvcc)
- .SetEnableMvccSnapshotReads(WithMvcc);
- if (!WithMvcc) {
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
- serverSettings.SetAppConfig(appConfig);
- }
- TKikimrRunner kikimr(serverSettings);
+ .SetAppConfig(appConfig);
+ TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
CreateTableWithMultishardIndex(kikimr.GetTestClient(), false);
diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
index 6b06041bd0..36dcc7713c 100644
--- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
@@ -83,7 +83,8 @@ void PrepareTables(TSession session) {
Y_UNIT_TEST_SUITE(KqpIndexLookupJoin) {
void Test(const TString& query, const TString& answer, size_t rightTableReads) {
- TKikimrRunner kikimr;
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -98,15 +99,21 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads) {
CompareYson(answer, FormatResultSetYson(result.GetResultSet(0)));
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ }
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5);
ui32 index = 1;
- UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
- index = 2;
+ if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
+ index = 2;
+ }
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Right");
diff --git a/ydb/core/kqp/ut/join/kqp_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_join_ut.cpp
index e30d820629..b832fbb171 100644
--- a/ydb/core/kqp/ut/join/kqp_join_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_join_ut.cpp
@@ -179,7 +179,8 @@ static TParams NoParams = TParamsBuilder().Build();
Y_UNIT_TEST_SUITE(KqpJoin) {
Y_UNIT_TEST(IdxLookupLeftPredicate) {
- TKikimrRunner kikimr;
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -202,15 +203,21 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ }
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1_1");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 8);
ui32 index = 1;
- UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
- index = 2;
+ if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
+ index = 2;
+ }
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join1_2");
@@ -218,7 +225,8 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
}
Y_UNIT_TEST(IdxLookupPartialLeftPredicate) {
- TKikimrRunner kikimr;
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -245,15 +253,21 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
Cerr << stats.DebugString() << Endl;
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ }
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1_1");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 8);
ui32 index = 1;
- UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
- index = 2;
+ if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
+ index = 2;
+ }
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join1_2");
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 5ccc439b1e..fd01b00699 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -66,7 +66,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST(PkSelect1) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -80,7 +81,13 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto explainResult = session.ExplainDataQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString());
- UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
+
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst());
+ } else {
+ UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
+ }
+
UNIT_ASSERT_C(!explainResult.GetAst().Contains("Take"), explainResult.GetAst());
auto params = kikimr.GetTableClient().GetParamsBuilder()
@@ -98,9 +105,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases().size()); // no LiteralExecuter phase
UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access().size());
- UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].affected_shards());
UNIT_ASSERT_VALUES_EQUAL("/Root/EightShard", stats.query_phases()[0].table_access()[0].name());
- UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access()[0].partitions_count());
+ if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].affected_shards());
+ UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access()[0].partitions_count());
+ }
params = kikimr.GetTableClient().GetParamsBuilder()
.AddParam("$key").Uint64(330).Build()
@@ -114,13 +123,16 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases().size()); // no LiteralExecuter phase
UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access().size());
- UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].affected_shards());
UNIT_ASSERT_VALUES_EQUAL("/Root/EightShard", stats.query_phases()[0].table_access()[0].name());
- UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access()[0].partitions_count());
+ if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].affected_shards());
+ UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access()[0].partitions_count());
+ }
}
Y_UNIT_TEST(PkSelect2) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -135,7 +147,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto explainResult = session.ExplainDataQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString());
- UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst());
+ } else {
+ UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
+ }
auto params = kikimr.GetTableClient().GetParamsBuilder()
.AddParam("$group").OptionalUint32(1).Build()
@@ -1105,7 +1121,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST(PrunePartitionsByLiteral) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1125,8 +1142,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); // no literal phase
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 2);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 2);
+ if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 2);
+ }
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/EightShard");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 2);
UNIT_ASSERT(stats.query_phases(0).table_access(0).reads().bytes() > 0);
@@ -1134,7 +1153,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST(PrunePartitionsByExpr) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1155,19 +1175,24 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
CompareYson(R"([[[3];[301u];["Value1"]]])", FormatResultSetYson(result.GetResultSet(0)));
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 0);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 0);
- UNIT_ASSERT(stats.query_phases(0).table_access().size() == 0);
+ ui32 index = 0;
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 0);
+ UNIT_ASSERT(stats.query_phases(0).table_access().size() == 0);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).partitions_count(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/EightShard");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 1);
- UNIT_ASSERT(stats.query_phases(1).table_access(0).reads().bytes() > 0);
- UNIT_ASSERT(stats.query_phases(1).duration_us() > 0);
+ index = 1;
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/EightShard");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 1);
+ UNIT_ASSERT(stats.query_phases(index).table_access(0).reads().bytes() > 0);
+ UNIT_ASSERT(stats.query_phases(index).duration_us() > 0);
}
Y_UNIT_TEST(PruneWritePartitions) {
@@ -1343,7 +1368,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST(JoinIdxLookup) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1376,16 +1402,24 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
])", FormatResultSetYson(result.GetResultSet(0)));
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ }
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 0);
+ ui32 index = 1;
+ if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 0);
+ index = 2;
+ }
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access(0).name(), "/Root/Join2");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access(0).reads().rows(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join2");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 4);
}
Y_UNIT_TEST(LeftSemiJoin) {
@@ -1724,7 +1758,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST(PruneEffectPartitions) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrSettings serverSettings;
+ TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1752,8 +1787,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1);
+ if (!serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1);
+ }
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/EightShard");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).updates().rows(), 0);
@@ -3280,10 +3317,9 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST(StreamLookupForDataQuery) {
- auto settings = TKikimrSettings()
- .SetEnableKqpDataQueryStreamLookup(true)
- .SetEnablePredicateExtractForDataQueries(false);
- TKikimrRunner kikimr{settings};
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
+ TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -3319,11 +3355,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT(streamLookup.IsDefined());
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/EightShard");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).name(), "/Root/KeyValue");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).reads().rows(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/KeyValue");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 2);
}
{
diff --git a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp
index 558dab8172..2034ab7476 100644
--- a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp
@@ -631,7 +631,8 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
}
Y_UNIT_TEST(UpdateWhereInNoFullScan) {
- TKikimrRunner kikimr;
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -649,11 +650,14 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/MultiShardTable");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1);
+
+ if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1);
+ }
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
@@ -837,7 +841,8 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
}
Y_UNIT_TEST(DateKeyPredicate) {
- TKikimrRunner kikimr;
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -856,10 +861,13 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1);
+
+ if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1);
+ }
}
Y_UNIT_TEST(DuplicateKeyPredicateLiteral) {
@@ -1022,7 +1030,8 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
}
Y_UNIT_TEST(DeleteNotFullScan) {
- TKikimrRunner kikimr;
+ TKikimrSettings serverSettings;
+ TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1042,12 +1051,15 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 3);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).updates().rows(), 0);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).deletes().rows(), 0);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1);
+
+ if (!serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1);
+ }
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
@@ -1058,7 +1070,8 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
}
Y_UNIT_TEST(LiteralOr) {
- TKikimrRunner kikimr;
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1080,10 +1093,13 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 4);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 3);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 4);
+
+ if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 4);
+ }
}
Y_UNIT_TEST(LiteralOrCompisite) {
diff --git a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp
index ae1d952db7..f7e87cfe04 100644
--- a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp
@@ -1012,7 +1012,8 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) {
}
Y_UNIT_TEST(PhasesCount) {
- TKikimrRunner kikimr;
+ TKikimrSettings serverSettings;
+ TKikimrRunner kikimr(serverSettings);
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
// simple key
@@ -1040,7 +1041,11 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) {
CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0)));
const Ydb::TableStats::QueryStats stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString());
+ if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_EQUAL_C(1, stats.query_phases_size(), stats.DebugString());
+ } else {
+ UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString());
+ }
}
// complex (tuple) key
@@ -1078,7 +1083,11 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) {
CompareYson(R"([[[3500u];["None"];[1u];["Anna"]]])", FormatResultSetYson(result.GetResultSet(0)));
const Ydb::TableStats::QueryStats stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString());
+ if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_EQUAL_C(1, stats.query_phases_size(), stats.DebugString());
+ } else {
+ UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString());
+ }
}
}
}
diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
index 6631bee275..7f8d5c3bf9 100644
--- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
+++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
@@ -472,7 +472,8 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
}
Y_UNIT_TEST(IdxLookupJoin) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -494,11 +495,16 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ }
}
Y_UNIT_TEST(IdxLookupJoinThreeWay) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -521,7 +527,11 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5);
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5);
+ }
}
Y_UNIT_TEST(ComputeLength) {
diff --git a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
index d1bb25b355..cd638d2ce8 100644
--- a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
@@ -84,9 +84,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT(join.IsDefined());
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
UNIT_ASSERT(left.IsDefined());
- auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup");
- UNIT_ASSERT(lookup.IsDefined());
- auto right = FindPlanNodeByKv(lookup, "Table", "KeyValue");
+ auto right = FindPlanNodeByKv(join, "Table", "KeyValue");
UNIT_ASSERT(right.IsDefined());
}
@@ -113,9 +111,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT(join.IsDefined());
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
UNIT_ASSERT(left.IsDefined());
- auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup");
- UNIT_ASSERT(lookup.IsDefined());
- auto right = FindPlanNodeByKv(lookup, "Table", "KeyValue");
+ auto right = FindPlanNodeByKv(join, "Table", "KeyValue");
UNIT_ASSERT(right.IsDefined());
}
@@ -205,9 +201,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT(join.IsDefined());
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
UNIT_ASSERT(left.IsDefined());
- auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup");
- UNIT_ASSERT(lookup.IsDefined());
- auto right = FindPlanNodeByKv(lookup, "Table", "FourShard");
+ auto right = FindPlanNodeByKv(join, "Table", "FourShard");
UNIT_ASSERT(right.IsDefined());
}
@@ -452,7 +446,8 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
}
Y_UNIT_TEST(ExplainDataQuery) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -473,12 +468,19 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
node = FindPlanNodeByKv(plan, "Name", "TableFullScan");
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
- node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");
- UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
+
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ node = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
+ UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
+ } else {
+ node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");
+ UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
+ }
}
Y_UNIT_TEST(FewEffects) {
- auto kikimr = DefaultKikimrRunner();
+ TKikimrSettings settings;
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -509,7 +511,12 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
CountPlanNodesByKv(plan, "Name", "TableRangeScan");
UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1);
- auto lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
+ ui32 lookupsCount = 0;
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TableLookup");
+ } else {
+ lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
+ }
UNIT_ASSERT_VALUES_EQUAL(lookupsCount, 3);
/* check tables section */
diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
index e753b81d4e..b91d2ec163 100644
--- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
@@ -273,7 +273,11 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
}
Y_UNIT_TEST(QueryTimeoutImmediate) {
- TKikimrRunner kikimr;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig);
+ TKikimrRunner kikimr{settings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -411,7 +415,11 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
}
Y_UNIT_TEST(QueryCancelImmediate) {
- TKikimrRunner kikimr;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig);
+ TKikimrRunner kikimr{settings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
index 12498103be..04a58539bb 100644
--- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
@@ -62,8 +62,10 @@ Y_UNIT_TEST(JoinNoStats) {
}
Y_UNIT_TEST(JoinStatsBasic) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
auto settings = TKikimrSettings()
- .SetEnableKqpScanQueryStreamLookup(false); // TODO: enable stream lookup KIKIMR-14294
+ .SetAppConfig(appConfig); // TODO: enable stream lookup KIKIMR-14294
TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
diff --git a/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp b/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp
index 376e55e152..bcf8e7a03e 100644
--- a/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp
@@ -53,12 +53,12 @@ void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity) {
appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMinChannelBufferSize(limit);
appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlHeavyProgramMemoryLimit(200ul << 20);
appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(20ul << 30);
+ appCfg.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
// TODO: KIKIMR-14294
auto kikimrSettings = TKikimrSettings()
.SetAppConfig(appCfg)
- .SetKqpSettings({})
- .SetEnableKqpScanQueryStreamLookup(false);
+ .SetKqpSettings({});
TKikimrRunner kikimr{kikimrSettings};
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG);
diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
index 6085d75481..f3dc2fcf3e 100644
--- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
@@ -1693,7 +1693,9 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
Y_UNIT_TEST(SecondaryIndex) {
- auto kikimr = DefaultKikimrRunner();
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(true);
+ TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -2125,7 +2127,9 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
Y_UNIT_TEST(StreamLookupByFullPk) {
- TKikimrRunner kikimr;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(true);
+ TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig));
auto db = kikimr.GetTableClient();
CreateSampleTables(kikimr);
@@ -2178,13 +2182,15 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
Y_UNIT_TEST(StreamLookupTryGetDataBeforeSchemeInitialization) {
- TPortManager tp;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
+ TPortManager tp;
ui16 mbusport = tp.GetPort(2134);
auto settings = Tests::TServerSettings(mbusport)
.SetDomainName("Root")
.SetUseRealThreads(false)
- .SetEnableKqpScanQueryStreamLookup(true);
+ .SetAppConfig(appConfig);
Tests::TServer::TPtr server = new Tests::TServer(settings);
@@ -2274,7 +2280,9 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
Y_UNIT_TEST(LimitOverSecondaryIndexRead) {
- auto kikimr = DefaultKikimrRunner();
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(true);
+ TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -2312,7 +2320,9 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
Y_UNIT_TEST(TopSortOverSecondaryIndexRead) {
- auto kikimr = DefaultKikimrRunner();
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(true);
+ TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
index f2ed5f7bdc..8bd4ee2f75 100644
--- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
+++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
@@ -10,10 +10,12 @@ using namespace NYdb::NTable;
Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
Y_UNIT_TEST(TestSnapshotExpiration) {
- TKikimrRunner kikimr(TKikimrSettings()
+ auto settings = TKikimrSettings()
.SetEnableMvcc(true)
.SetEnableMvccSnapshotReads(true)
- .SetKeepSnapshotTimeout(TDuration::Seconds(1)));
+ .SetKeepSnapshotTimeout(TDuration::Seconds(1));
+
+ TKikimrRunner kikimr(settings);
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG);
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
@@ -50,12 +52,21 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
if (result.GetStatus() == EStatus::SUCCESS)
continue;
- UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
- [](const NYql::TIssue& issue){
- return issue.GetMessage().Contains("stale snapshot");
- }), result.GetIssues().ToString());
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
+ [](const NYql::TIssue& issue){
+ return issue.GetMessage().Contains("bellow low watermark");
+ }), result.GetIssues().ToString());
+
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED);
+ } else {
+ UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
+ [](const NYql::TIssue& issue){
+ return issue.GetMessage().Contains("stale snapshot");
+ }), result.GetIssues().ToString());
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
+ }
caught = true;
break;
@@ -64,11 +75,12 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
}
Y_UNIT_TEST(ReadOnlyTxCommitsOnConcurrentWrite) {
- TKikimrRunner kikimr(
- TKikimrSettings()
- .SetEnableMvcc(true)
- .SetEnableMvccSnapshotReads(true)
- .SetEnableKqpDataQueryStreamLookup(true)
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
+ TKikimrRunner kikimr(TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
);
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
@@ -268,11 +280,13 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
}
Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite3) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
TKikimrRunner kikimr(
TKikimrSettings()
.SetEnableMvcc(true)
.SetEnableMvccSnapshotReads(true)
- .SetEnableKqpDataQueryStreamLookup(true)
+ .SetAppConfig(appConfig)
);
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
diff --git a/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp b/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp
index 75e7b526f6..8cd9b54a68 100644
--- a/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp
+++ b/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp
@@ -36,11 +36,11 @@ Y_UNIT_TEST_SUITE(KqpPragma) {
UNIT_ASSERT(result.IsSuccess());
CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(0)));
- /*result = session.ExecuteDataQuery(R"(
+ result = session.ExecuteDataQuery(R"(
SELECT COUNT(_yql_partition_id) FROM `/Root/KeyValue` WHERE Key = 1;
)", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR);
- UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::CORE_TYPE_ANN));*/
+ UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::CORE_TYPE_ANN));
}
Y_UNIT_TEST(OrderedColumns) {
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 3b2d9d33b1..5dc17ea367 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -743,7 +743,7 @@ message TFeatureFlags {
optional bool AllowVDiskDefrag = 63 [default = true];
optional bool EnableAsyncHttpMon = 64 [default = true];
optional bool EnableChangefeeds = 65 [default = true];
- optional bool EnableKqpScanQueryStreamLookup = 66 [default = false];
+ reserved 66; // EnableKqpScanQueryStreamLookup
optional bool EnableKqpScanQueryMultipleOlapShardsReads = 67 [default = false];
optional bool EnablePredicateExtractForDataQueries = 68 [default = true];
reserved 69; // optional bool EnableKqpPatternCacheLiteral = 69 [default = false];
@@ -753,7 +753,7 @@ message TFeatureFlags {
optional bool EnableChunkLocking = 72 [default = false];
optional bool EnableNotNullDataColumns = 73 [default = false];
optional bool EnableGrpcAudit = 74 [default = false];
- optional bool EnableKqpDataQueryStreamLookup = 75 [default = false];
+ reserved 75; // EnableKqpDataQueryStreamLookup
optional bool EnableBorrowedSplitCompaction = 76 [default = true];
optional bool EnableChangefeedInitialScan = 77 [default = false];
reserved 78; // EnableKqpScanQuerySourceRead
@@ -1230,6 +1230,8 @@ message TTableServiceConfig {
optional bool EnableKqpDataQuerySourceRead = 27 [default = true];
optional uint64 SessionIdleDurationSeconds = 28 [default = 600];
optional TAggregationConfig AggregationConfig = 29;
+ optional bool EnableKqpScanQueryStreamLookup = 30 [default = false];
+ optional bool EnableKqpDataQueryStreamLookup = 31 [default = false];
};
// Config describes immediate controls and allows
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index c4c47f2957..3c7ba2b2ed 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -33,8 +33,6 @@ public:
FEATURE_FLAG_SETTER(EnableNotNullColumns)
FEATURE_FLAG_SETTER(EnableBulkUpsertToAsyncIndexedTables)
FEATURE_FLAG_SETTER(EnableChangefeeds)
- FEATURE_FLAG_SETTER(EnableKqpScanQueryStreamLookup)
- FEATURE_FLAG_SETTER(EnableKqpDataQueryStreamLookup)
FEATURE_FLAG_SETTER(EnableMoveIndex)
FEATURE_FLAG_SETTER(EnablePredicateExtractForDataQueries)
FEATURE_FLAG_SETTER(EnableNotNullDataColumns)
diff --git a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp
index a659a0e87a..70173a4ae7 100644
--- a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp
@@ -665,9 +665,12 @@ key = 4, value = (empty maybe)
using TEvResponse = TEvDataShard::TEvConditionalEraseRowsResponse;
TPortManager pm;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings
.SetEnableMvcc(WithMvcc)
+ .SetAppConfig(appConfig)
.SetDomainName("Root")
.SetUseRealThreads(false);
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 176294bd0f..06aebbe858 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -1240,6 +1240,7 @@ Y_UNIT_TEST_TWIN(TestDelayedTxWaitsForWriteActiveTxOnly, UseMvcc) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(WithMvcc)
@@ -1423,6 +1424,7 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderLockLost, UseMvcc) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(WithMvcc)
@@ -1547,9 +1549,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderLockLost, UseMvcc) {
Y_UNIT_TEST(TestMvccReadDoesntBlockWrites) {
TPortManager pm;
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(true)
+ .SetAppConfig(app)
.SetUseRealThreads(false);
Tests::TServer::TPtr server = new TServer(serverSettings);
@@ -1679,6 +1684,7 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, UseMvcc) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(WithMvcc)
@@ -1777,9 +1783,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, UseMvcc) {
Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, UseMvcc) {
TPortManager pm;
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(WithMvcc)
+ .SetAppConfig(app)
.SetUseRealThreads(false);
Tests::TServer::TPtr server = new TServer(serverSettings);
@@ -1881,9 +1890,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, UseMvcc) {
Y_UNIT_TEST(TestOutOfOrderRestartLocksSingleWithoutBarrier) {
TPortManager pm;
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(false) // intentionally, because we test non-mvcc locks logic
+ .SetAppConfig(app)
.SetUseRealThreads(false);
Tests::TServer::TPtr server = new TServer(serverSettings);
@@ -2129,9 +2141,12 @@ Y_UNIT_TEST(MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) {
Y_UNIT_TEST_TWIN(TestOutOfOrderRestartLocksReorderedWithoutBarrier, UseMvcc) {
TPortManager pm;
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(WithMvcc)
+ .SetAppConfig(app)
.SetUseRealThreads(false);
Tests::TServer::TPtr server = new TServer(serverSettings);
@@ -2259,9 +2274,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderRestartLocksReorderedWithoutBarrier, UseMvcc) {
Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) {
TPortManager pm;
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(WithMvcc)
+ .SetAppConfig(app)
.SetUseRealThreads(false);
Tests::TServer::TPtr server = new TServer(serverSettings);
@@ -3655,9 +3673,12 @@ Y_UNIT_TEST_WITH_MVCC(TestLateKqpDataReadAfterColumnDrop) {
Y_UNIT_TEST(MvccTestSnapshotRead) {
TPortManager pm;
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(true)
+ .SetAppConfig(app)
.SetUseRealThreads(false);
Tests::TServer::TPtr server = new TServer(serverSettings);
@@ -3863,6 +3884,7 @@ Y_UNIT_TEST_TWIN(TestShardRestartNoUndeterminedImmediate, UseMvcc) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(WithMvcc)
@@ -3969,6 +3991,7 @@ Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, UseMvcc) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(WithMvcc)
diff --git a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
index d770cf0308..97772e08ee 100644
--- a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
@@ -208,10 +208,14 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) {
}
Y_UNIT_TEST_WITH_MVCC(TestUploadRowsLocks) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc);
+
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(WithMvcc)
+ .SetAppConfig(appConfig)
.SetUseRealThreads(false);
Tests::TServer::TPtr server = new TServer(serverSettings);
diff --git a/ydb/tests/functional/canonical/test_sql.py b/ydb/tests/functional/canonical/test_sql.py
index 38ea534395..c749d78601 100644
--- a/ydb/tests/functional/canonical/test_sql.py
+++ b/ydb/tests/functional/canonical/test_sql.py
@@ -109,7 +109,15 @@ class BaseCanonicalTest(object):
set_canondata_root('ydb/tests/functional/canonical/canondata')
cls.database = '/local'
- cls.cluster = kikimr_cluster_factory(KikimrConfigGenerator(load_udfs=True, domain_name='local', use_in_memory_pdisks=True, disable_iterator_reads=True))
+ cls.cluster = kikimr_cluster_factory(
+ KikimrConfigGenerator(
+ load_udfs=True,
+ domain_name='local',
+ use_in_memory_pdisks=True,
+ disable_iterator_reads=True,
+ disable_iterator_lookups=True
+ )
+ )
cls.cluster.start()
cls.driver = ydb.Driver(
ydb.DriverConfig("%s:%s" % (cls.cluster.nodes[1].host, cls.cluster.nodes[1].port), cls.database))
diff --git a/ydb/tests/functional/suite_tests/test_base.py b/ydb/tests/functional/suite_tests/test_base.py
index 4389029a65..f1e2a120e9 100644
--- a/ydb/tests/functional/suite_tests/test_base.py
+++ b/ydb/tests/functional/suite_tests/test_base.py
@@ -245,6 +245,7 @@ class BaseSuiteRunner(object):
load_udfs=True,
use_in_memory_pdisks=True,
disable_iterator_reads=True,
+ disable_iterator_lookups=True,
# additional_log_configs={'KQP_YQL': 7}
)
)
diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py
index 58aaa90d3e..99a1cf80de 100644
--- a/ydb/tests/library/harness/kikimr_config.py
+++ b/ydb/tests/library/harness/kikimr_config.py
@@ -140,7 +140,8 @@ class KikimrConfigGenerator(object):
use_legacy_pq=False,
dc_mapping={},
enable_alter_database_create_hive_first=False,
- disable_iterator_reads=False
+ disable_iterator_reads=False,
+ disable_iterator_lookups=False,
):
self._version = version
self.use_log_files = use_log_files
@@ -217,6 +218,12 @@ class KikimrConfigGenerator(object):
self.yaml_config["table_service_config"]["enable_kqp_scan_query_source_read"] = False
self.yaml_config["table_service_config"]["enable_kqp_data_query_source_read"] = False
+ if disable_iterator_lookups:
+ if "table_service_config" not in self.yaml_config:
+ self.yaml_config["table_service_config"] = {}
+ self.yaml_config["table_service_config"]["enable_kqp_scan_query_stream_lookup"] = False
+ self.yaml_config["table_service_config"]["enable_kqp_data_query_stream_lookup"] = False
+
self.yaml_config["feature_flags"]["enable_public_api_external_blobs"] = enable_public_api_external_blobs
self.yaml_config["feature_flags"]["enable_mvcc"] = "VALUE_FALSE" if disable_mvcc else "VALUE_TRUE"
if enable_alter_database_create_hive_first: