diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-02-07 20:04:36 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-02-07 20:04:36 +0300 |
commit | f6b76976b4f69e9e547b44df4722a700dd98dbec (patch) | |
tree | ee59514f19a3f695c61771052b4107361a64bdfa | |
parent | a3f17a0c71ec03358e0fc9a2f094991491a3f831 (diff) | |
download | ydb-f6b76976b4f69e9e547b44df4722a700dd98dbec.tar.gz |
enable StreamLookup
feature(kqp): enable StreamLookup
32 files changed, 381 insertions, 178 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 6dd5714796..34378f304c 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -340,6 +340,8 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.EnableKqpDataQuerySourceRead = serviceConfig.GetEnableKqpDataQuerySourceRead(); kqpConfig.EnableKqpScanQuerySourceRead = serviceConfig.GetEnableKqpScanQuerySourceRead(); + kqpConfig.EnableKqpDataQueryStreamLookup = serviceConfig.GetEnableKqpDataQueryStreamLookup(); + kqpConfig.EnableKqpScanQueryStreamLookup = serviceConfig.GetEnableKqpScanQueryStreamLookup(); } IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index c84cb4a4f8..957255b671 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -571,7 +571,7 @@ TExprNode::TPtr HandleReadTable(const TKiReadTable& read, TExprContext& ctx, con return nullptr; } - if (kqpCtx->IsScanQuery() && !kqpCtx->Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()) { + if (kqpCtx->IsScanQuery() && !kqpCtx->Config->EnableKqpScanQueryStreamLookup) { const TString err = "Secondary index is not supported for ScanQuery"; ctx.AddError(YqlIssue(ctx.GetPosition(read.Pos()), TIssuesIds::KIKIMR_BAD_REQUEST, err)); return nullptr; diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp index 5a074509b2..7dcf855ce0 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp @@ -15,14 +15,14 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK auto deleteRows = node.Cast<TKqlDeleteRows>(); - TMaybeNode<TKqlLookupTable> lookup; + TMaybeNode<TKqlLookupTableBase> lookup; TMaybeNode<TCoSkipNullMembers> skipNulMembers; - if (deleteRows.Input().Maybe<TKqlLookupTable>()) { - lookup = deleteRows.Input().Cast<TKqlLookupTable>(); - } else if (deleteRows.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKqlLookupTable>()) { + if (deleteRows.Input().Maybe<TKqlLookupTableBase>()) { + lookup = deleteRows.Input().Cast<TKqlLookupTableBase>(); + } else if (deleteRows.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKqlLookupTableBase>()) { skipNulMembers = deleteRows.Input().Cast<TCoSkipNullMembers>(); - lookup = skipNulMembers.Input().Cast<TKqlLookupTable>(); + lookup = skipNulMembers.Input().Cast<TKqlLookupTableBase>(); } else { return node; } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index 7ea72278c3..96ac7409fa 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -168,7 +168,7 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos, const T const TKqpOptimizeContext& kqpCtx) { if (kqpCtx.IsScanQuery()) { - YQL_ENSURE(kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup(), "Stream lookup is not enabled"); + YQL_ENSURE(kqpCtx.Config->EnableKqpScanQueryStreamLookup, "Stream lookup is not enabled"); return Build<TKqlStreamLookupIndex>(ctx, pos) .Table(read.Table()) .LookupKeys<TCoSkipNullMembers>() @@ -201,7 +201,7 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, const T const TExprBase& keysToLookup, const TVector<TCoAtom>& lookupNames, const TKqpOptimizeContext& kqpCtx) { if (kqpCtx.IsScanQuery()) { - YQL_ENSURE(kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup(), "Stream lookup is not enabled"); + YQL_ENSURE(kqpCtx.Config->EnableKqpScanQueryStreamLookup, "Stream lookup is not enabled"); return Build<TKqlStreamLookupTable>(ctx, pos) .Table(read.Table()) .LookupKeys<TCoSkipNullMembers>() @@ -214,7 +214,7 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, const T .Done(); } - if (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup()) { + if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) { return Build<TKqlStreamLookupTable>(ctx, pos) .Table(read.Table()) .LookupKeys<TCoSkipNullMembers>() @@ -471,7 +471,6 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext } bool needPrecomputeLeft = kqpCtx.IsDataQuery() - && !kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup() && !join.LeftInput().Maybe<TCoParameter>() && !IsParameterToListOfStructsRepack(join.LeftInput()); @@ -589,7 +588,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, const NYql::TKikimrConfiguration::TPtr& config) { - if ((kqpCtx.IsScanQuery() && !kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()) || !node.Maybe<TDqJoin>()) { + if ((kqpCtx.IsScanQuery() && !kqpCtx.Config->EnableKqpScanQueryStreamLookup) || !node.Maybe<TDqJoin>()) { return node; } auto join = node.Cast<TDqJoin>(); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp index f443c9dacc..f60b698388 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp @@ -232,7 +232,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T // is available, currently it can introduce redundant compute stage. useDataQueryLookup = kqpCtx.IsDataQuery() && isFullKey; useScanQueryLookup = kqpCtx.IsScanQuery() && isFullKey - && kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup(); + && kqpCtx.Config->EnableKqpScanQueryStreamLookup; } TMaybeNode<TExprBase> readInput; @@ -247,7 +247,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T .Index(indexName.Cast()) .Done(); } else { - if (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQueryStreamLookup()) { + if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) { readInput = Build<TKqlStreamLookupTable>(ctx, read.Pos()) .Table(read.Table()) .LookupKeys(lookupKeys) @@ -262,7 +262,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T } } } else if (useScanQueryLookup) { - YQL_ENSURE(kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()); + YQL_ENSURE(kqpCtx.Config->EnableKqpScanQueryStreamLookup); auto lookupKeys = BuildEquiRangeLookup(keyRange, tableDesc, read.Pos(), ctx); if (indexName) { diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp index 0155d45134..0644b7ba6a 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp @@ -48,7 +48,7 @@ bool CanRewriteSqlInToEquiJoin(const TTypeAnnotationNode* lookupType, const TTyp TExprBase KqpRewriteSqlInToEquiJoin(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, const TKikimrConfiguration::TPtr& config) { - if (kqpCtx.IsScanQuery() && !kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()) { + if (kqpCtx.IsScanQuery() && !kqpCtx.Config->EnableKqpScanQueryStreamLookup) { return node; } diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index dd83e5e766..3ec772576b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -131,6 +131,8 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableKqpScanQuerySourceRead = false; bool EnableKqpDataQuerySourceRead = false; + bool EnableKqpScanQueryStreamLookup = false; + bool EnableKqpDataQueryStreamLookup = false; }; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 82fc2334fc..e1386cff6f 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -126,6 +126,7 @@ private: struct TResult { const ui64 ShardId; THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult; + size_t UnprocessedResultRow = 0; }; struct TEvPrivate { @@ -314,25 +315,24 @@ private: } batch.reserve(rowsCount); - for (; !Results.empty() && !sizeLimitExceeded; Results.pop_front()) { - const auto& readResult = Results.front().ReadResult; - const auto shardId = Results.front().ShardId; - - for (size_t rowId = 0; rowId < readResult->Get()->GetRowsCount(); ++rowId) { - const auto& result = readResult->Get()->GetCells(rowId); - YQL_ENSURE(result.size() <= Columns.size(), "Result columns mismatch"); + while (!Results.empty() && !sizeLimitExceeded) { + auto& result = Results.front(); + for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { + const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow); + YQL_ENSURE(resultRow.size() <= Columns.size(), "Result columns mismatch"); NUdf::TUnboxedValue* rowItems = nullptr; auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); i64 rowSize = 0; - for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size() && resultColIndex < result.size(); ++colIndex) { + for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) { const auto& column = Columns[colIndex]; if (IsSystemColumn(column.Name)) { - NMiniKQL::FillSystemColumn(rowItems[colIndex], shardId, column.Id, column.PType); + NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType); rowSize += sizeof(NUdf::TUnboxedValue); } else { - rowItems[colIndex] = NMiniKQL::GetCellValue(result[resultColIndex], column.PType); + YQL_ENSURE(resultColIndex < resultRow.size()); + rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType); rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes; ++resultColIndex; } @@ -347,6 +347,10 @@ private: batch.push_back(std::move(row)); totalSize += rowSize; } + + if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) { + Results.pop_front(); + } } return totalSize; diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 558cdc6491..cd15942a43 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -81,7 +81,6 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> { TKikimrSettings() { - this->SetEnableKqpScanQueryStreamLookup(true); } TKikimrSettings& SetAppConfig(const NKikimrConfig::TAppConfig& value) { AppConfig = value; return *this; } diff --git a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp index 3a1bde4868..b97507e0ff 100644 --- a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp +++ b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp @@ -40,7 +40,15 @@ void Test(bool enableInplaceUpdate, const TString& query, TParams&& params, cons setting.SetName("_KqpAllowUnsafeCommit"); setting.SetValue("true"); - TKikimrRunner kikimr({setting}); + // stream lookup use iterator interface, that doesn't use datashard transactions + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -362,7 +370,15 @@ Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) { unsafeCommitSetting.SetName("_KqpAllowUnsafeCommit"); unsafeCommitSetting.SetValue("true"); - TKikimrRunner kikimr({keysLimitSetting, unsafeCommitSetting}); + // stream lookup use iterator interface, that doesn't use datashard transactions + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({keysLimitSetting, unsafeCommitSetting}); + + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp index 80a65e920f..0b0164ce03 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp @@ -383,18 +383,17 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) { } Y_UNIT_TEST_TWIN(DataColumnUpsertMixedSemantic, WithMvcc) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc); + auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() .SetEnableMvcc(WithMvcc) .SetEnableMvccSnapshotReads(WithMvcc) + .SetAppConfig(appConfig) .SetKqpSettings({setting}); - if (!WithMvcc) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - serverSettings.SetAppConfig(appConfig); - } - TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -435,16 +434,17 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) { } Y_UNIT_TEST_TWIN(DataColumnWriteNull, WithMvcc) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc); + auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() .SetEnableMvcc(WithMvcc) .SetEnableMvccSnapshotReads(WithMvcc) + .SetAppConfig(appConfig) .SetKqpSettings({setting}); - if (!WithMvcc) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - serverSettings.SetAppConfig(appConfig); - } + TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -519,16 +519,17 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) { } Y_UNIT_TEST_TWIN(DataColumnWrite, WithMvcc) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc); + auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() .SetEnableMvcc(WithMvcc) .SetEnableMvccSnapshotReads(WithMvcc) + .SetAppConfig(appConfig) .SetKqpSettings({setting}); - if (!WithMvcc) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - serverSettings.SetAppConfig(appConfig); - } + TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -921,16 +922,17 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) { } Y_UNIT_TEST_TWIN(DataColumnSelect, WithMvcc) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc); + auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() .SetEnableMvcc(WithMvcc) .SetEnableMvccSnapshotReads(WithMvcc) + .SetAppConfig(appConfig) .SetKqpSettings({setting}); - if (!WithMvcc) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - serverSettings.SetAppConfig(appConfig); - } + TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1020,16 +1022,16 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) { } Y_UNIT_TEST_TWIN(DuplicateUpsert, WithMvcc) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc); + auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() .SetEnableMvcc(WithMvcc) .SetEnableMvccSnapshotReads(WithMvcc) + .SetAppConfig(appConfig) .SetKqpSettings({setting}); - if (!WithMvcc) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - serverSettings.SetAppConfig(appConfig); - } TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1059,16 +1061,15 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) { } Y_UNIT_TEST_TWIN(SortByPk, WithMvcc) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(WithMvcc); + auto serverSettings = TKikimrSettings() .SetEnableMvcc(WithMvcc) - .SetEnableMvccSnapshotReads(WithMvcc); - if (!WithMvcc) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - serverSettings.SetAppConfig(appConfig); - } - TKikimrRunner kikimr(serverSettings); + .SetAppConfig(appConfig); + TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); CreateTableWithMultishardIndex(kikimr.GetTestClient(), false); diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index 6b06041bd0..36dcc7713c 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -83,7 +83,8 @@ void PrepareTables(TSession session) { Y_UNIT_TEST_SUITE(KqpIndexLookupJoin) { void Test(const TString& query, const TString& answer, size_t rightTableReads) { - TKikimrRunner kikimr; + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -98,15 +99,21 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads) { CompareYson(answer, FormatResultSetYson(result.GetResultSet(0))); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + } else { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5); ui32 index = 1; - UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups - index = 2; + if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups + index = 2; + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Right"); diff --git a/ydb/core/kqp/ut/join/kqp_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_join_ut.cpp index e30d820629..b832fbb171 100644 --- a/ydb/core/kqp/ut/join/kqp_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_join_ut.cpp @@ -179,7 +179,8 @@ static TParams NoParams = TParamsBuilder().Build(); Y_UNIT_TEST_SUITE(KqpJoin) { Y_UNIT_TEST(IdxLookupLeftPredicate) { - TKikimrRunner kikimr; + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -202,15 +203,21 @@ Y_UNIT_TEST_SUITE(KqpJoin) { auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + } else { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1_1"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 8); ui32 index = 1; - UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups - index = 2; + if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups + index = 2; + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join1_2"); @@ -218,7 +225,8 @@ Y_UNIT_TEST_SUITE(KqpJoin) { } Y_UNIT_TEST(IdxLookupPartialLeftPredicate) { - TKikimrRunner kikimr; + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -245,15 +253,21 @@ Y_UNIT_TEST_SUITE(KqpJoin) { auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); Cerr << stats.DebugString() << Endl; - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + } else { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1_1"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 8); ui32 index = 1; - UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups - index = 2; + if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups + index = 2; + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join1_2"); diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 5ccc439b1e..fd01b00699 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -66,7 +66,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST(PkSelect1) { - auto kikimr = DefaultKikimrRunner(); + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -80,7 +81,13 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto explainResult = session.ExplainDataQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString()); - UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst()); + + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst()); + } else { + UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst()); + } + UNIT_ASSERT_C(!explainResult.GetAst().Contains("Take"), explainResult.GetAst()); auto params = kikimr.GetTableClient().GetParamsBuilder() @@ -98,9 +105,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases().size()); // no LiteralExecuter phase UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access().size()); - UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].affected_shards()); UNIT_ASSERT_VALUES_EQUAL("/Root/EightShard", stats.query_phases()[0].table_access()[0].name()); - UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access()[0].partitions_count()); + if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].affected_shards()); + UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access()[0].partitions_count()); + } params = kikimr.GetTableClient().GetParamsBuilder() .AddParam("$key").Uint64(330).Build() @@ -114,13 +123,16 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases().size()); // no LiteralExecuter phase UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access().size()); - UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].affected_shards()); UNIT_ASSERT_VALUES_EQUAL("/Root/EightShard", stats.query_phases()[0].table_access()[0].name()); - UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access()[0].partitions_count()); + if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].affected_shards()); + UNIT_ASSERT_VALUES_EQUAL(1, stats.query_phases()[0].table_access()[0].partitions_count()); + } } Y_UNIT_TEST(PkSelect2) { - auto kikimr = DefaultKikimrRunner(); + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -135,7 +147,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto explainResult = session.ExplainDataQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString()); - UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst()); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst()); + } else { + UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst()); + } auto params = kikimr.GetTableClient().GetParamsBuilder() .AddParam("$group").OptionalUint32(1).Build() @@ -1105,7 +1121,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST(PrunePartitionsByLiteral) { - auto kikimr = DefaultKikimrRunner(); + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1125,8 +1142,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); // no literal phase UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 2); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 2); + if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 2); + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/EightShard"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 2); UNIT_ASSERT(stats.query_phases(0).table_access(0).reads().bytes() > 0); @@ -1134,7 +1153,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST(PrunePartitionsByExpr) { - auto kikimr = DefaultKikimrRunner(); + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1155,19 +1175,24 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { CompareYson(R"([[[3];[301u];["Value1"]]])", FormatResultSetYson(result.GetResultSet(0))); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 0); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 0); - UNIT_ASSERT(stats.query_phases(0).table_access().size() == 0); + ui32 index = 0; + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); + } else { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 0); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 0); + UNIT_ASSERT(stats.query_phases(0).table_access().size() == 0); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).partitions_count(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/EightShard"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 1); - UNIT_ASSERT(stats.query_phases(1).table_access(0).reads().bytes() > 0); - UNIT_ASSERT(stats.query_phases(1).duration_us() > 0); + index = 1; + } + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/EightShard"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 1); + UNIT_ASSERT(stats.query_phases(index).table_access(0).reads().bytes() > 0); + UNIT_ASSERT(stats.query_phases(index).duration_us() > 0); } Y_UNIT_TEST(PruneWritePartitions) { @@ -1343,7 +1368,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST(JoinIdxLookup) { - auto kikimr = DefaultKikimrRunner(); + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1376,16 +1402,24 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { ])", FormatResultSetYson(result.GetResultSet(0))); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + } else { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 0); + ui32 index = 1; + if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 0); + index = 2; + } - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access(0).name(), "/Root/Join2"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access(0).reads().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join2"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 4); } Y_UNIT_TEST(LeftSemiJoin) { @@ -1724,7 +1758,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST(PruneEffectPartitions) { - auto kikimr = DefaultKikimrRunner(); + TKikimrSettings serverSettings; + TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1752,8 +1787,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1); + if (!serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1); + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/EightShard"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).updates().rows(), 0); @@ -3280,10 +3317,9 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST(StreamLookupForDataQuery) { - auto settings = TKikimrSettings() - .SetEnableKqpDataQueryStreamLookup(true) - .SetEnablePredicateExtractForDataQueries(false); - TKikimrRunner kikimr{settings}; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); + TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -3319,11 +3355,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { UNIT_ASSERT(streamLookup.IsDefined()); auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/EightShard"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).name(), "/Root/KeyValue"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).reads().rows(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/KeyValue"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 2); } { diff --git a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp index 558dab8172..2034ab7476 100644 --- a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp @@ -631,7 +631,8 @@ Y_UNIT_TEST_SUITE(KqpRanges) { } Y_UNIT_TEST(UpdateWhereInNoFullScan) { - TKikimrRunner kikimr; + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -649,11 +650,14 @@ Y_UNIT_TEST_SUITE(KqpRanges) { auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/MultiShardTable"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1); + + if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1); + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); @@ -837,7 +841,8 @@ Y_UNIT_TEST_SUITE(KqpRanges) { } Y_UNIT_TEST(DateKeyPredicate) { - TKikimrRunner kikimr; + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -856,10 +861,13 @@ Y_UNIT_TEST_SUITE(KqpRanges) { auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1); + + if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1); + } } Y_UNIT_TEST(DuplicateKeyPredicateLiteral) { @@ -1022,7 +1030,8 @@ Y_UNIT_TEST_SUITE(KqpRanges) { } Y_UNIT_TEST(DeleteNotFullScan) { - TKikimrRunner kikimr; + TKikimrSettings serverSettings; + TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1042,12 +1051,15 @@ Y_UNIT_TEST_SUITE(KqpRanges) { auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 3); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).updates().rows(), 0); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).deletes().rows(), 0); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1); + + if (!serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1); + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); @@ -1058,7 +1070,8 @@ Y_UNIT_TEST_SUITE(KqpRanges) { } Y_UNIT_TEST(LiteralOr) { - TKikimrRunner kikimr; + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1080,10 +1093,13 @@ Y_UNIT_TEST_SUITE(KqpRanges) { auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 4); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 3); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 4); + + if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 4); + } } Y_UNIT_TEST(LiteralOrCompisite) { diff --git a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp index ae1d952db7..f7e87cfe04 100644 --- a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp @@ -1012,7 +1012,8 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) { } Y_UNIT_TEST(PhasesCount) { - TKikimrRunner kikimr; + TKikimrSettings serverSettings; + TKikimrRunner kikimr(serverSettings); auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); // simple key @@ -1040,7 +1041,11 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) { CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0))); const Ydb::TableStats::QueryStats stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString()); + if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_EQUAL_C(1, stats.query_phases_size(), stats.DebugString()); + } else { + UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString()); + } } // complex (tuple) key @@ -1078,7 +1083,11 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) { CompareYson(R"([[[3500u];["None"];[1u];["Anna"]]])", FormatResultSetYson(result.GetResultSet(0))); const Ydb::TableStats::QueryStats stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString()); + if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_EQUAL_C(1, stats.query_phases_size(), stats.DebugString()); + } else { + UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString()); + } } } } diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp index 6631bee275..7f8d5c3bf9 100644 --- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp +++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp @@ -472,7 +472,8 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { } Y_UNIT_TEST(IdxLookupJoin) { - auto kikimr = DefaultKikimrRunner(); + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -494,11 +495,16 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + } else { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + } } Y_UNIT_TEST(IdxLookupJoinThreeWay) { - auto kikimr = DefaultKikimrRunner(); + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -521,7 +527,11 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + } else { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); + } } Y_UNIT_TEST(ComputeLength) { diff --git a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp index d1bb25b355..cd638d2ce8 100644 --- a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp @@ -84,9 +84,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) { UNIT_ASSERT(join.IsDefined()); auto left = FindPlanNodeByKv(join, "Table", "EightShard"); UNIT_ASSERT(left.IsDefined()); - auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup"); - UNIT_ASSERT(lookup.IsDefined()); - auto right = FindPlanNodeByKv(lookup, "Table", "KeyValue"); + auto right = FindPlanNodeByKv(join, "Table", "KeyValue"); UNIT_ASSERT(right.IsDefined()); } @@ -113,9 +111,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) { UNIT_ASSERT(join.IsDefined()); auto left = FindPlanNodeByKv(join, "Table", "EightShard"); UNIT_ASSERT(left.IsDefined()); - auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup"); - UNIT_ASSERT(lookup.IsDefined()); - auto right = FindPlanNodeByKv(lookup, "Table", "KeyValue"); + auto right = FindPlanNodeByKv(join, "Table", "KeyValue"); UNIT_ASSERT(right.IsDefined()); } @@ -205,9 +201,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) { UNIT_ASSERT(join.IsDefined()); auto left = FindPlanNodeByKv(join, "Table", "EightShard"); UNIT_ASSERT(left.IsDefined()); - auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup"); - UNIT_ASSERT(lookup.IsDefined()); - auto right = FindPlanNodeByKv(lookup, "Table", "FourShard"); + auto right = FindPlanNodeByKv(join, "Table", "FourShard"); UNIT_ASSERT(right.IsDefined()); } @@ -452,7 +446,8 @@ Y_UNIT_TEST_SUITE(KqpExplain) { } Y_UNIT_TEST(ExplainDataQuery) { - auto kikimr = DefaultKikimrRunner(); + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -473,12 +468,19 @@ Y_UNIT_TEST_SUITE(KqpExplain) { UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue"); node = FindPlanNodeByKv(plan, "Name", "TableFullScan"); UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue"); - node = FindPlanNodeByKv(plan, "Name", "TablePointLookup"); - UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue"); + + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + node = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); + UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue"); + } else { + node = FindPlanNodeByKv(plan, "Name", "TablePointLookup"); + UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue"); + } } Y_UNIT_TEST(FewEffects) { - auto kikimr = DefaultKikimrRunner(); + TKikimrSettings settings; + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -509,7 +511,12 @@ Y_UNIT_TEST_SUITE(KqpExplain) { CountPlanNodesByKv(plan, "Name", "TableRangeScan"); UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1); - auto lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr"); + ui32 lookupsCount = 0; + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TableLookup"); + } else { + lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr"); + } UNIT_ASSERT_VALUES_EQUAL(lookupsCount, 3); /* check tables section */ diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index e753b81d4e..b91d2ec163 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -273,7 +273,11 @@ Y_UNIT_TEST_SUITE(KqpQuery) { } Y_UNIT_TEST(QueryTimeoutImmediate) { - TKikimrRunner kikimr; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig); + TKikimrRunner kikimr{settings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -411,7 +415,11 @@ Y_UNIT_TEST_SUITE(KqpQuery) { } Y_UNIT_TEST(QueryCancelImmediate) { - TKikimrRunner kikimr; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig); + TKikimrRunner kikimr{settings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp index 12498103be..04a58539bb 100644 --- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp @@ -62,8 +62,10 @@ Y_UNIT_TEST(JoinNoStats) { } Y_UNIT_TEST(JoinStatsBasic) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false); auto settings = TKikimrSettings() - .SetEnableKqpScanQueryStreamLookup(false); // TODO: enable stream lookup KIKIMR-14294 + .SetAppConfig(appConfig); // TODO: enable stream lookup KIKIMR-14294 TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); diff --git a/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp b/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp index 376e55e152..bcf8e7a03e 100644 --- a/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp @@ -53,12 +53,12 @@ void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity) { appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMinChannelBufferSize(limit); appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlHeavyProgramMemoryLimit(200ul << 20); appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(20ul << 30); + appCfg.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false); // TODO: KIKIMR-14294 auto kikimrSettings = TKikimrSettings() .SetAppConfig(appCfg) - .SetKqpSettings({}) - .SetEnableKqpScanQueryStreamLookup(false); + .SetKqpSettings({}); TKikimrRunner kikimr{kikimrSettings}; // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG); diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index 6085d75481..f3dc2fcf3e 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -1693,7 +1693,9 @@ Y_UNIT_TEST_SUITE(KqpScan) { } Y_UNIT_TEST(SecondaryIndex) { - auto kikimr = DefaultKikimrRunner(); + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(true); + TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -2125,7 +2127,9 @@ Y_UNIT_TEST_SUITE(KqpScan) { } Y_UNIT_TEST(StreamLookupByFullPk) { - TKikimrRunner kikimr; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(true); + TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig)); auto db = kikimr.GetTableClient(); CreateSampleTables(kikimr); @@ -2178,13 +2182,15 @@ Y_UNIT_TEST_SUITE(KqpScan) { } Y_UNIT_TEST(StreamLookupTryGetDataBeforeSchemeInitialization) { - TPortManager tp; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); + TPortManager tp; ui16 mbusport = tp.GetPort(2134); auto settings = Tests::TServerSettings(mbusport) .SetDomainName("Root") .SetUseRealThreads(false) - .SetEnableKqpScanQueryStreamLookup(true); + .SetAppConfig(appConfig); Tests::TServer::TPtr server = new Tests::TServer(settings); @@ -2274,7 +2280,9 @@ Y_UNIT_TEST_SUITE(KqpScan) { } Y_UNIT_TEST(LimitOverSecondaryIndexRead) { - auto kikimr = DefaultKikimrRunner(); + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(true); + TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -2312,7 +2320,9 @@ Y_UNIT_TEST_SUITE(KqpScan) { } Y_UNIT_TEST(TopSortOverSecondaryIndexRead) { - auto kikimr = DefaultKikimrRunner(); + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(true); + TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp index f2ed5f7bdc..8bd4ee2f75 100644 --- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp @@ -10,10 +10,12 @@ using namespace NYdb::NTable; Y_UNIT_TEST_SUITE(KqpSnapshotRead) { Y_UNIT_TEST(TestSnapshotExpiration) { - TKikimrRunner kikimr(TKikimrSettings() + auto settings = TKikimrSettings() .SetEnableMvcc(true) .SetEnableMvccSnapshotReads(true) - .SetKeepSnapshotTimeout(TDuration::Seconds(1))); + .SetKeepSnapshotTimeout(TDuration::Seconds(1)); + + TKikimrRunner kikimr(settings); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); @@ -50,12 +52,21 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { if (result.GetStatus() == EStatus::SUCCESS) continue; - UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, - [](const NYql::TIssue& issue){ - return issue.GetMessage().Contains("stale snapshot"); - }), result.GetIssues().ToString()); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, + [](const NYql::TIssue& issue){ + return issue.GetMessage().Contains("bellow low watermark"); + }), result.GetIssues().ToString()); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED); + } else { + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, + [](const NYql::TIssue& issue){ + return issue.GetMessage().Contains("stale snapshot"); + }), result.GetIssues().ToString()); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED); + } caught = true; break; @@ -64,11 +75,12 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { } Y_UNIT_TEST(ReadOnlyTxCommitsOnConcurrentWrite) { - TKikimrRunner kikimr( - TKikimrSettings() - .SetEnableMvcc(true) - .SetEnableMvccSnapshotReads(true) - .SetEnableKqpDataQueryStreamLookup(true) + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); + TKikimrRunner kikimr(TKikimrSettings() + .SetAppConfig(appConfig) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) ); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); @@ -268,11 +280,13 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { } Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite3) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); TKikimrRunner kikimr( TKikimrSettings() .SetEnableMvcc(true) .SetEnableMvccSnapshotReads(true) - .SetEnableKqpDataQueryStreamLookup(true) + .SetAppConfig(appConfig) ); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); diff --git a/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp b/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp index 75e7b526f6..8cd9b54a68 100644 --- a/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp +++ b/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp @@ -36,11 +36,11 @@ Y_UNIT_TEST_SUITE(KqpPragma) { UNIT_ASSERT(result.IsSuccess()); CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(0))); - /*result = session.ExecuteDataQuery(R"( + result = session.ExecuteDataQuery(R"( SELECT COUNT(_yql_partition_id) FROM `/Root/KeyValue` WHERE Key = 1; )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR); - UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::CORE_TYPE_ANN));*/ + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::CORE_TYPE_ANN)); } Y_UNIT_TEST(OrderedColumns) { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 3b2d9d33b1..5dc17ea367 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -743,7 +743,7 @@ message TFeatureFlags { optional bool AllowVDiskDefrag = 63 [default = true]; optional bool EnableAsyncHttpMon = 64 [default = true]; optional bool EnableChangefeeds = 65 [default = true]; - optional bool EnableKqpScanQueryStreamLookup = 66 [default = false]; + reserved 66; // EnableKqpScanQueryStreamLookup optional bool EnableKqpScanQueryMultipleOlapShardsReads = 67 [default = false]; optional bool EnablePredicateExtractForDataQueries = 68 [default = true]; reserved 69; // optional bool EnableKqpPatternCacheLiteral = 69 [default = false]; @@ -753,7 +753,7 @@ message TFeatureFlags { optional bool EnableChunkLocking = 72 [default = false]; optional bool EnableNotNullDataColumns = 73 [default = false]; optional bool EnableGrpcAudit = 74 [default = false]; - optional bool EnableKqpDataQueryStreamLookup = 75 [default = false]; + reserved 75; // EnableKqpDataQueryStreamLookup optional bool EnableBorrowedSplitCompaction = 76 [default = true]; optional bool EnableChangefeedInitialScan = 77 [default = false]; reserved 78; // EnableKqpScanQuerySourceRead @@ -1230,6 +1230,8 @@ message TTableServiceConfig { optional bool EnableKqpDataQuerySourceRead = 27 [default = true]; optional uint64 SessionIdleDurationSeconds = 28 [default = 600]; optional TAggregationConfig AggregationConfig = 29; + optional bool EnableKqpScanQueryStreamLookup = 30 [default = false]; + optional bool EnableKqpDataQueryStreamLookup = 31 [default = false]; }; // Config describes immediate controls and allows diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index c4c47f2957..3c7ba2b2ed 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -33,8 +33,6 @@ public: FEATURE_FLAG_SETTER(EnableNotNullColumns) FEATURE_FLAG_SETTER(EnableBulkUpsertToAsyncIndexedTables) FEATURE_FLAG_SETTER(EnableChangefeeds) - FEATURE_FLAG_SETTER(EnableKqpScanQueryStreamLookup) - FEATURE_FLAG_SETTER(EnableKqpDataQueryStreamLookup) FEATURE_FLAG_SETTER(EnableMoveIndex) FEATURE_FLAG_SETTER(EnablePredicateExtractForDataQueries) FEATURE_FLAG_SETTER(EnableNotNullDataColumns) diff --git a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp index a659a0e87a..70173a4ae7 100644 --- a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp +++ b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp @@ -665,9 +665,12 @@ key = 4, value = (empty maybe) using TEvResponse = TEvDataShard::TEvConditionalEraseRowsResponse; TPortManager pm; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings .SetEnableMvcc(WithMvcc) + .SetAppConfig(appConfig) .SetDomainName("Root") .SetUseRealThreads(false); diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index 176294bd0f..06aebbe858 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -1240,6 +1240,7 @@ Y_UNIT_TEST_TWIN(TestDelayedTxWaitsForWriteActiveTxOnly, UseMvcc) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(WithMvcc) @@ -1423,6 +1424,7 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderLockLost, UseMvcc) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(WithMvcc) @@ -1547,9 +1549,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderLockLost, UseMvcc) { Y_UNIT_TEST(TestMvccReadDoesntBlockWrites) { TPortManager pm; + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(true) + .SetAppConfig(app) .SetUseRealThreads(false); Tests::TServer::TPtr server = new TServer(serverSettings); @@ -1679,6 +1684,7 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, UseMvcc) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(WithMvcc) @@ -1777,9 +1783,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, UseMvcc) { Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, UseMvcc) { TPortManager pm; + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(WithMvcc) + .SetAppConfig(app) .SetUseRealThreads(false); Tests::TServer::TPtr server = new TServer(serverSettings); @@ -1881,9 +1890,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, UseMvcc) { Y_UNIT_TEST(TestOutOfOrderRestartLocksSingleWithoutBarrier) { TPortManager pm; + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(false) // intentionally, because we test non-mvcc locks logic + .SetAppConfig(app) .SetUseRealThreads(false); Tests::TServer::TPtr server = new TServer(serverSettings); @@ -2129,9 +2141,12 @@ Y_UNIT_TEST(MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) { Y_UNIT_TEST_TWIN(TestOutOfOrderRestartLocksReorderedWithoutBarrier, UseMvcc) { TPortManager pm; + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(WithMvcc) + .SetAppConfig(app) .SetUseRealThreads(false); Tests::TServer::TPtr server = new TServer(serverSettings); @@ -2259,9 +2274,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderRestartLocksReorderedWithoutBarrier, UseMvcc) { Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) { TPortManager pm; + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(WithMvcc) + .SetAppConfig(app) .SetUseRealThreads(false); Tests::TServer::TPtr server = new TServer(serverSettings); @@ -3655,9 +3673,12 @@ Y_UNIT_TEST_WITH_MVCC(TestLateKqpDataReadAfterColumnDrop) { Y_UNIT_TEST(MvccTestSnapshotRead) { TPortManager pm; + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(true) + .SetAppConfig(app) .SetUseRealThreads(false); Tests::TServer::TPtr server = new TServer(serverSettings); @@ -3863,6 +3884,7 @@ Y_UNIT_TEST_TWIN(TestShardRestartNoUndeterminedImmediate, UseMvcc) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(WithMvcc) @@ -3969,6 +3991,7 @@ Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, UseMvcc) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(WithMvcc) diff --git a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp index d770cf0308..97772e08ee 100644 --- a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp +++ b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp @@ -208,10 +208,14 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) { } Y_UNIT_TEST_WITH_MVCC(TestUploadRowsLocks) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(WithMvcc); + TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(WithMvcc) + .SetAppConfig(appConfig) .SetUseRealThreads(false); Tests::TServer::TPtr server = new TServer(serverSettings); diff --git a/ydb/tests/functional/canonical/test_sql.py b/ydb/tests/functional/canonical/test_sql.py index 38ea534395..c749d78601 100644 --- a/ydb/tests/functional/canonical/test_sql.py +++ b/ydb/tests/functional/canonical/test_sql.py @@ -109,7 +109,15 @@ class BaseCanonicalTest(object): set_canondata_root('ydb/tests/functional/canonical/canondata') cls.database = '/local' - cls.cluster = kikimr_cluster_factory(KikimrConfigGenerator(load_udfs=True, domain_name='local', use_in_memory_pdisks=True, disable_iterator_reads=True)) + cls.cluster = kikimr_cluster_factory( + KikimrConfigGenerator( + load_udfs=True, + domain_name='local', + use_in_memory_pdisks=True, + disable_iterator_reads=True, + disable_iterator_lookups=True + ) + ) cls.cluster.start() cls.driver = ydb.Driver( ydb.DriverConfig("%s:%s" % (cls.cluster.nodes[1].host, cls.cluster.nodes[1].port), cls.database)) diff --git a/ydb/tests/functional/suite_tests/test_base.py b/ydb/tests/functional/suite_tests/test_base.py index 4389029a65..f1e2a120e9 100644 --- a/ydb/tests/functional/suite_tests/test_base.py +++ b/ydb/tests/functional/suite_tests/test_base.py @@ -245,6 +245,7 @@ class BaseSuiteRunner(object): load_udfs=True, use_in_memory_pdisks=True, disable_iterator_reads=True, + disable_iterator_lookups=True, # additional_log_configs={'KQP_YQL': 7} ) ) diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py index 58aaa90d3e..99a1cf80de 100644 --- a/ydb/tests/library/harness/kikimr_config.py +++ b/ydb/tests/library/harness/kikimr_config.py @@ -140,7 +140,8 @@ class KikimrConfigGenerator(object): use_legacy_pq=False, dc_mapping={}, enable_alter_database_create_hive_first=False, - disable_iterator_reads=False + disable_iterator_reads=False, + disable_iterator_lookups=False, ): self._version = version self.use_log_files = use_log_files @@ -217,6 +218,12 @@ class KikimrConfigGenerator(object): self.yaml_config["table_service_config"]["enable_kqp_scan_query_source_read"] = False self.yaml_config["table_service_config"]["enable_kqp_data_query_source_read"] = False + if disable_iterator_lookups: + if "table_service_config" not in self.yaml_config: + self.yaml_config["table_service_config"] = {} + self.yaml_config["table_service_config"]["enable_kqp_scan_query_stream_lookup"] = False + self.yaml_config["table_service_config"]["enable_kqp_data_query_stream_lookup"] = False + self.yaml_config["feature_flags"]["enable_public_api_external_blobs"] = enable_public_api_external_blobs self.yaml_config["feature_flags"]["enable_mvcc"] = "VALUE_FALSE" if disable_mvcc else "VALUE_TRUE" if enable_alter_database_create_hive_first: |