about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Nikita Vasilev <ns-vasilev@ydb.tech> 2025-02-25 13:10:26 +0300
committer: GitHub <noreply@github.com> 2025-02-25 13:10:26 +0300
commit: a4b20bc56d8c4132e109fa1750c46bfb013b2956 (patch)
tree: 03afc5385ab18de9c7f51d68d8e599b16b614f7f
parent: 1f915f645c751b2c2ea22e11caeb98d676d3fb86 (diff)
download: ydb-a4b20bc56d8c4132e109fa1750c46bfb013b2956.tar.gz
Fix #14903 (#14957)
-rw-r--r-- ydb/core/kqp/runtime/kqp_read_actor.cpp | 52
-rw-r--r-- ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 41
-rw-r--r-- ydb/core/kqp/ut/query/kqp_query_ut.cpp | 46
3 files changed, 119 insertions, 20 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index fe16693416..243e81d3fc 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -723,6 +723,20 @@ public:
}
}
+ bool CheckTotalRetriesExeeded() { // Returns true when one more retry would go past the total retry limit.
+ const auto limit = MaxTotalRetries(); // tested for truthiness and then dereferenced below; a falsy limit means this check never reports "exceeded"
+ return limit && TotalRetries + 1 > *limit; // "+ 1": TotalRetries is only read here; RetryRead() increments it after this check passes
+ }
+
+ bool CheckShardRetriesExeeded(ui64 id) { // Returns true when one more retry of read `id` would go past MaxShardRetries().
+ if (!Reads[id] || Reads[id].Finished) { // same guard as RetryRead(): a read that tests false or is marked Finished is never reported as exceeded
+ return false;
+ }
+
+ const auto& state = Reads[id].Shard; // shard state attached to this read; only read, never modified here
+ return state->RetryAttempt + 1 > MaxShardRetries(); // "+ 1": RetryAttempt is not incremented here; RetryRead() does that after the check passes
+ }
+
void RetryRead(ui64 id, bool allowInstantRetry = true) {
if (!Reads[id] || Reads[id].Finished) {
return;
@@ -730,18 +744,17 @@ public:
auto state = Reads[id].Shard;
- TotalRetries += 1;
- auto limit = MaxTotalRetries();
- if (limit && TotalRetries > *limit) {
+ if (CheckTotalRetriesExeeded()) {
return RuntimeError(TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded",
NDqProto::StatusIds::UNAVAILABLE);
}
+ ++TotalRetries;
- state->RetryAttempt += 1;
- if (state->RetryAttempt > MaxShardRetries()) {
+ if (CheckShardRetriesExeeded(id)) {
ResetRead(id);
return ResolveShard(state);
}
+ ++state->RetryAttempt;
auto delay = CalcDelay(state->RetryAttempt, allowInstantRetry);
if (delay == TDuration::Zero()) {
@@ -954,12 +967,16 @@ public:
Reads[id].Shard->Issues.push_back(issue);
}
+ auto replyError = [&](auto message, auto status) { // Local helper: fail through RuntimeError() with the given message/status plus the issues carried by `record`.
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues); // fills `issues` from the status of the captured `record`
+ return RuntimeError(message, status, issues); // result of RuntimeError() is returned so call sites can write `return replyError(...)`
+ };
+
if (UseFollowers && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS && Reads[id].Shard->SuccessBatches > 0) {
// read from follower is interrupted with error after several successful responses.
// in this case read is not safe because we can return inconsistent data.
- NYql::TIssues issues;
- NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
- return RuntimeError("Failed to read from follower", NYql::NDqProto::StatusIds::UNAVAILABLE, issues);
+ return replyError("Failed to read from follower", NYql::NDqProto::StatusIds::UNAVAILABLE);
}
switch (record.GetStatus().GetCode()) {
@@ -968,20 +985,33 @@ public:
break;
}
case Ydb::StatusIds::OVERLOADED: {
+ if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) {
+ return replyError(
+ TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.",
+ NYql::NDqProto::StatusIds::OVERLOADED);
+ }
return RetryRead(id, false);
}
case Ydb::StatusIds::INTERNAL_ERROR: {
+ if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) {
+ return replyError(
+ TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.",
+ NYql::NDqProto::StatusIds::INTERNAL_ERROR);
+ }
return RetryRead(id);
}
case Ydb::StatusIds::NOT_FOUND: {
+ if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) {
+ return replyError(
+ TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.",
+ NYql::NDqProto::StatusIds::UNAVAILABLE);
+ }
auto shard = Reads[id].Shard;
ResetRead(id);
return ResolveShard(shard);
}
default: {
- NYql::TIssues issues;
- NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
- return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues);
+ return replyError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED);
}
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index d52e826fd3..ade6644ab8 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -345,6 +345,12 @@ private:
Counters->DataShardIteratorFails->Inc();
}
+ auto replyError = [&](auto message, auto status) { // Local helper: fail through RuntimeError() with the given message/status plus the issues carried by `record`.
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues); // fills `issues` from the status of the captured `record`
+ return RuntimeError(message, status, issues); // result of RuntimeError() is returned so call sites can write `return replyError(...)`
+ };
+
switch (record.GetStatus().GetCode()) {
case Ydb::StatusIds::SUCCESS:
break;
@@ -354,15 +360,23 @@ private:
return ResolveTableShards();
}
case Ydb::StatusIds::OVERLOADED: {
+ if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
+ return replyError(
+ TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
+ NYql::NDqProto::StatusIds::OVERLOADED);
+ }
return RetryTableRead(read, /*allowInstantRetry = */false);
}
case Ydb::StatusIds::INTERNAL_ERROR: {
+ if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
+ return replyError(
+ TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
+ NYql::NDqProto::StatusIds::INTERNAL_ERROR);
+ }
return RetryTableRead(read);
}
default: {
- NYql::TIssues issues;
- NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
- return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues);
+ return replyError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED);
}
}
@@ -538,24 +552,33 @@ private:
}
}
+ bool CheckTotalRetriesExeeded() { // Returns true when one more retry would go past the total retry limit.
+ const auto limit = MaxTotalRetries(); // tested for truthiness and then dereferenced below; a falsy limit means this check never reports "exceeded"
+ return limit && TotalRetryAttempts + 1 > *limit; // "+ 1": TotalRetryAttempts is only read here; RetryTableRead() increments it after this check passes
+ }
+
+ bool CheckShardRetriesExeeded(TReadState& failedRead) { // Returns true when one more retry on failedRead's shard would go past MaxShardRetries().
+ const auto& shardState = ReadsPerShard[failedRead.ShardId]; // per-shard state looked up by the failed read's ShardId; NOTE(review): if ReadsPerShard is a map, operator[] may insert a default entry - confirm
+ return shardState.RetryAttempts + 1 > MaxShardRetries(); // "+ 1": RetryAttempts is only read here; RetryTableRead() increments it after the check passes
+ }
+
void RetryTableRead(TReadState& failedRead, bool allowInstantRetry = true) {
CA_LOG_D("Retry reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << failedRead.Id
<< ", shardId: " << failedRead.ShardId);
- ++TotalRetryAttempts;
- auto totalRetriesLimit = MaxTotalRetries();
- if (totalRetriesLimit && TotalRetryAttempts > *totalRetriesLimit) {
+ if (CheckTotalRetriesExeeded()) {
return RuntimeError(TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded",
NYql::NDqProto::StatusIds::UNAVAILABLE);
}
+ ++TotalRetryAttempts;
- auto& shardState = ReadsPerShard[failedRead.ShardId];
- ++shardState.RetryAttempts;
- if (shardState.RetryAttempts > MaxShardRetries()) {
+ if (CheckShardRetriesExeeded(failedRead)) {
StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey);
failedRead.SetFinished();
return ResolveTableShards();
}
+ auto& shardState = ReadsPerShard[failedRead.ShardId];
+ ++shardState.RetryAttempts;
auto delay = CalcDelay(shardState.RetryAttempts, allowInstantRetry);
if (delay == TDuration::Zero()) {
diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
index 1dbcb57fcb..be70002b51 100644
--- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/tx/datashard/datashard_failpoints.h>
+#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/testlib/common_helper.h>
#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
@@ -2142,6 +2143,51 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(0)));
}
}
+
+ Y_UNIT_TEST_TWIN(ReadOverloaded, StreamLookup) { // Rewrites every TEvReadResult to OVERLOADED and expects the query to fail with EStatus::OVERLOADED; StreamLookup picks which query is run.
+ NKikimrConfig::TAppConfig appConfig;
+ auto setting = NKikimrKqp::TKqpSetting(); // NOTE(review): `setting` is not referenced again in this test body
+ TKikimrSettings settings;
+ settings.SetAppConfig(appConfig);
+ settings.SetUseRealThreads(false);
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
+ auto writeSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); }); // NOTE(review): `writeSession` is not referenced again in this test body
+
+ auto& runtime = *kikimr.GetTestServer().GetRuntime(); // test runtime used below to install the event observer and to wait for the query future
+
+ kikimr.RunCall([&]{ CreateSampleTablesWithIndex(session, false /* no need in table data */); return true; });
+
+ {
+ const TString query(StreamLookup
+ ? Q1_(R"(
+ SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1
+ )")
+ : Q1_(R"(
+ SELECT COUNT(a.Key) FROM `/Root/SecondaryKeys` as a;
+ )"));
+
+ auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto { // observer: overwrites the status code of each TEvReadResult, leaves other events untouched
+ if (ev->GetTypeRewrite() == TEvDataShard::TEvReadResult::EventType) {
+ auto* msg = ev->Get<TEvDataShard::TEvReadResult>();
+ msg->Record.MutableStatus()->SetCode(::Ydb::StatusIds::OVERLOADED); // every read result now reports OVERLOADED, whatever the original code was
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS; // the (possibly modified) event is still processed, never dropped
+ };
+
+ runtime.SetObserverFunc(grab);
+ auto future = kikimr.RunInThreadPool([&]{ // the query is issued from the thread pool; its future is awaited through the runtime below
+ auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
+ return session.ExecuteDataQuery(query, txc).ExtractValueSync();
+ });
+
+ auto result = runtime.WaitFuture(future);
+ UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString()); // the query must fail; issues are printed if it unexpectedly succeeds
+ UNIT_ASSERT(result.GetStatus() == NYdb::EStatus::OVERLOADED); // and the failure must carry the OVERLOADED status
+ }
+ }
}
} // namespace NKqp