diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2024-12-04 16:53:36 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-04 16:53:36 +0300 |
commit | 06b7c083814dd8352eabfe11a1501a6d97c31cf3 (patch) | |
tree | 08c5e9c5ac840b739a1b12319fd86d05855f7dbb | |
parent | 76b2fb965e3deddfc311bc2519faf6aa0eac2b33 (diff) | |
download | ydb-06b7c083814dd8352eabfe11a1501a6d97c31cf3.tar.gz |
fix various problems in stream lookup (#12218)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 94 |
3 files changed, 100 insertions, 10 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 30f7d86d50..d43222283b 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -268,6 +268,8 @@ private: RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite(), NYql::NDqProto::StatusIds::INTERNAL_ERROR); } + } catch (const NKikimr::TMemoryLimitExceededException& e) { + RuntimeError("Memory limit exceeded at stream lookup", NYql::NDqProto::StatusIds::PRECONDITION_FAILED); } catch (const yexception& e) { RuntimeError(e.what(), NYql::NDqProto::StatusIds::INTERNAL_ERROR); } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index 54b88c2f97..c523314384 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -82,7 +82,7 @@ struct THashableKey { struct TKeyHash { using is_transparent = void; - bool operator()(TConstArrayRef<TCell> key) const { + size_t operator()(TConstArrayRef<TCell> key) const { return absl::Hash<THashableKey>()(THashableKey{ key }); } }; @@ -364,14 +364,16 @@ public: } } - if (rowSize > freeSpace - (i64)resultStats.ResultBytesCount) { - row.DeleteUnreferenced(); + if (rowSize + (i64)resultStats.ResultBytesCount > freeSpace) { sizeLimitExceeded = true; + } + + if (resultStats.ResultRowsCount && sizeLimitExceeded) { + row.DeleteUnreferenced(); break; } batch.push_back(std::move(row)); - storageRowSize = std::max(storageRowSize, (i64)8); resultStats.ReadRowsCount += 1; @@ -680,7 +682,7 @@ public: for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { const auto& row = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow); - // result can contain fewer columns because of system columns + // result can contain fewer columns because of system columns YQL_ENSURE(row.size() <= ReadColumns.size(), "Result columns mismatch"); std::vector<TCell> joinKeyCells(LookupKeyColumns.size()); @@ -805,7 +807,7 @@ public: for (; result.FirstUnprocessedRow < result.Rows.size(); ++result.FirstUnprocessedRow) { auto& row = result.Rows[result.FirstUnprocessedRow]; - if (resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) { + if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) { sizeLimitExceeded = true; break; } diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 3c6b4259de..8693c77452 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -3830,7 +3830,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/TestTable1"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 3); - + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 2); for (const auto& ta : stats.query_phases(1).table_access()) { if (ta.name() == "/Root/TestTable2") { @@ -4338,6 +4338,92 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda SelectFromAsyncIndexedTable(); } + Y_UNIT_TEST(SelectFromIndexesAndFreeSpaceLogicDoesntTimeout) { + auto setting = NKikimrKqp::TKqpSetting(); + setting.SetName("_KqpYqlSyntaxVersion"); + setting.SetValue("1"); + auto serverSettings = TKikimrSettings() + .SetKqpSettings({setting}); + + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(true); + // setting channel buffer size so small to make sure that we will be able to transfer at least + // one row in stream lookup. + appConfig.MutableTableServiceConfig()->MutableResourceManager()->SetChannelBufferSize(1_KB); + // setting string a bit larger than size of the channel buffer. + const int payloadSize = 5000; + + serverSettings.SetAppConfig(appConfig); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + CreateSampleTablesWithIndex(session); + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + { + const TString query(Q_(R"( + DECLARE $Payload AS String; + REPLACE INTO `/Root/SecondaryComplexKeys` (Key, Fk1, Fk2, Value) VALUES + (1, 1, "Fk1", $Payload); + )")); + + TString largeString(payloadSize, 'a'); + + auto params = TParamsBuilder() + .AddParam("$Payload") + .String(largeString) + .Build() + .Build(); + + auto result = session.ExecuteDataQuery( + query, + TTxControl::BeginTx().CommitTx(), + params, + execSettings).ExtractValueSync(); + + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const TString query1(Q_(R"( + SELECT * + FROM `/Root/SecondaryComplexKeys` VIEW Index + WHERE Fk1 = 1 + LIMIT 10; + )")); + + auto result2 = session.ExecuteDataQuery( + query1, + TTxControl::BeginTx().CommitTx(), + execSettings).ExtractValueSync(); + + UNIT_ASSERT_C(result2.IsSuccess(), result2.GetIssues().ToString()); + // UNIT_ASSERT(result2.GetIssues().Empty()); + } + + { + const TString query1(Q_(R"( + SELECT q.Value as V1, t.Value as V2 + FROM `/Root/SecondaryComplexKeys` VIEW Index as t + LEFT JOIN `/Root/SecondaryComplexKeys` as q + ON q.Key = t.Key + WHERE t.Key = 1 + LIMIT 10; + )")); + + auto result2 = session.ExecuteDataQuery( + query1, + TTxControl::BeginTx().CommitTx(), + execSettings).ExtractValueSync(); + + UNIT_ASSERT_C(result2.IsSuccess(), result2.GetIssues().ToString()); + // UNIT_ASSERT(result2.GetIssues().Empty()); + } + } + Y_UNIT_TEST(InnerJoinWithNonIndexWherePredicate) { auto setting = NKikimrKqp::TKqpSetting(); setting.SetName("_KqpYqlSyntaxVersion"); @@ -5344,7 +5430,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda auto result = session.ExecuteSchemeQuery(createTableSql).GetValueSync(); UNIT_ASSERT_C(result.GetIssues().Empty(), result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + } { const auto& yson = ReadTablePartToYson(session, "/Root/table"); @@ -5360,9 +5446,9 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda auto result = session.ExecuteDataQuery(selectSql, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(result.GetIssues().Empty(), result.GetIssues().ToString()); UNIT_ASSERT(result.IsSuccess()); - UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)), R"([[[100u];[101u]];[[200u];[201u]]])"); + UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)), R"([[[100u];[101u]];[[200u];[201u]]])"); } - } + } } } |