aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2024-12-04 16:53:36 +0300
committerGitHub <noreply@github.com>2024-12-04 16:53:36 +0300
commit06b7c083814dd8352eabfe11a1501a6d97c31cf3 (patch)
tree08c5e9c5ac840b739a1b12319fd86d05855f7dbb
parent76b2fb965e3deddfc311bc2519faf6aa0eac2b33 (diff)
downloadydb-06b7c083814dd8352eabfe11a1501a6d97c31cf3.tar.gz
fix various problems in stream lookup (#12218)
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp2
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp14
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp94
3 files changed, 100 insertions, 10 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 30f7d86d50..d43222283b 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -268,6 +268,8 @@ private:
RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite(),
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
}
+ } catch (const NKikimr::TMemoryLimitExceededException& e) {
+ RuntimeError("Memory limit exceeded at stream lookup", NYql::NDqProto::StatusIds::PRECONDITION_FAILED);
} catch (const yexception& e) {
RuntimeError(e.what(), NYql::NDqProto::StatusIds::INTERNAL_ERROR);
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
index 54b88c2f97..c523314384 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -82,7 +82,7 @@ struct THashableKey {
struct TKeyHash {
using is_transparent = void;
- bool operator()(TConstArrayRef<TCell> key) const {
+ size_t operator()(TConstArrayRef<TCell> key) const {
return absl::Hash<THashableKey>()(THashableKey{ key });
}
};
@@ -364,14 +364,16 @@ public:
}
}
- if (rowSize > freeSpace - (i64)resultStats.ResultBytesCount) {
- row.DeleteUnreferenced();
+ if (rowSize + (i64)resultStats.ResultBytesCount > freeSpace) {
sizeLimitExceeded = true;
+ }
+
+ if (resultStats.ResultRowsCount && sizeLimitExceeded) {
+ row.DeleteUnreferenced();
break;
}
batch.push_back(std::move(row));
-
storageRowSize = std::max(storageRowSize, (i64)8);
resultStats.ReadRowsCount += 1;
@@ -680,7 +682,7 @@ public:
for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
const auto& row = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
- // result can contain fewer columns because of system columns
+ // result can contain fewer columns because of system columns
YQL_ENSURE(row.size() <= ReadColumns.size(), "Result columns mismatch");
std::vector<TCell> joinKeyCells(LookupKeyColumns.size());
@@ -805,7 +807,7 @@ public:
for (; result.FirstUnprocessedRow < result.Rows.size(); ++result.FirstUnprocessedRow) {
auto& row = result.Rows[result.FirstUnprocessedRow];
- if (resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
+ if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
sizeLimitExceeded = true;
break;
}
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index 3c6b4259de..8693c77452 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -3830,7 +3830,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/TestTable1");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 3);
-
+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 2);
for (const auto& ta : stats.query_phases(1).table_access()) {
if (ta.name() == "/Root/TestTable2") {
@@ -4338,6 +4338,92 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
SelectFromAsyncIndexedTable();
}
+ Y_UNIT_TEST(SelectFromIndexesAndFreeSpaceLogicDoesntTimeout) {
+ auto setting = NKikimrKqp::TKqpSetting();
+ setting.SetName("_KqpYqlSyntaxVersion");
+ setting.SetValue("1");
+ auto serverSettings = TKikimrSettings()
+ .SetKqpSettings({setting});
+
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(true);
+ // setting channel buffer size so small to make sure that we will be able to transfer at least
+ // one row in stream lookup.
+ appConfig.MutableTableServiceConfig()->MutableResourceManager()->SetChannelBufferSize(1_KB);
+ // setting string a bit larger than size of the channel buffer.
+ const int payloadSize = 5000;
+
+ serverSettings.SetAppConfig(appConfig);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ CreateSampleTablesWithIndex(session);
+
+ NYdb::NTable::TExecDataQuerySettings execSettings;
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+
+ {
+ const TString query(Q_(R"(
+ DECLARE $Payload AS String;
+ REPLACE INTO `/Root/SecondaryComplexKeys` (Key, Fk1, Fk2, Value) VALUES
+ (1, 1, "Fk1", $Payload);
+ )"));
+
+ TString largeString(payloadSize, 'a');
+
+ auto params = TParamsBuilder()
+ .AddParam("$Payload")
+ .String(largeString)
+ .Build()
+ .Build();
+
+ auto result = session.ExecuteDataQuery(
+ query,
+ TTxControl::BeginTx().CommitTx(),
+ params,
+ execSettings).ExtractValueSync();
+
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ {
+ const TString query1(Q_(R"(
+ SELECT *
+ FROM `/Root/SecondaryComplexKeys` VIEW Index
+ WHERE Fk1 = 1
+ LIMIT 10;
+ )"));
+
+ auto result2 = session.ExecuteDataQuery(
+ query1,
+ TTxControl::BeginTx().CommitTx(),
+ execSettings).ExtractValueSync();
+
+ UNIT_ASSERT_C(result2.IsSuccess(), result2.GetIssues().ToString());
+ // UNIT_ASSERT(result2.GetIssues().Empty());
+ }
+
+ {
+ const TString query1(Q_(R"(
+ SELECT q.Value as V1, t.Value as V2
+ FROM `/Root/SecondaryComplexKeys` VIEW Index as t
+ LEFT JOIN `/Root/SecondaryComplexKeys` as q
+ ON q.Key = t.Key
+ WHERE t.Key = 1
+ LIMIT 10;
+ )"));
+
+ auto result2 = session.ExecuteDataQuery(
+ query1,
+ TTxControl::BeginTx().CommitTx(),
+ execSettings).ExtractValueSync();
+
+ UNIT_ASSERT_C(result2.IsSuccess(), result2.GetIssues().ToString());
+ // UNIT_ASSERT(result2.GetIssues().Empty());
+ }
+ }
+
Y_UNIT_TEST(InnerJoinWithNonIndexWherePredicate) {
auto setting = NKikimrKqp::TKqpSetting();
setting.SetName("_KqpYqlSyntaxVersion");
@@ -5344,7 +5430,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
auto result = session.ExecuteSchemeQuery(createTableSql).GetValueSync();
UNIT_ASSERT_C(result.GetIssues().Empty(), result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- }
+ }
{
const auto& yson = ReadTablePartToYson(session, "/Root/table");
@@ -5360,9 +5446,9 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
auto result = session.ExecuteDataQuery(selectSql, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.GetIssues().Empty(), result.GetIssues().ToString());
UNIT_ASSERT(result.IsSuccess());
- UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)), R"([[[100u];[101u]];[[200u];[201u]]])");
+ UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)), R"([[[100u];[101u]];[[200u];[201u]]])");
}
- }
+ }
}
}