diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-02-25 13:10:26 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-25 13:10:26 +0300 |
commit | a4b20bc56d8c4132e109fa1750c46bfb013b2956 (patch) | |
tree | 03afc5385ab18de9c7f51d68d8e599b16b614f7f | |
parent | 1f915f645c751b2c2ea22e11caeb98d676d3fb86 (diff) | |
download | ydb-a4b20bc56d8c4132e109fa1750c46bfb013b2956.tar.gz |
Fix #14903 (#14957)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 52 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 41 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_query_ut.cpp | 46 |
3 files changed, 119 insertions, 20 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index fe16693416..243e81d3fc 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -723,6 +723,20 @@ public: } } + bool CheckTotalRetriesExeeded() { + const auto limit = MaxTotalRetries(); + return limit && TotalRetries + 1 > *limit; + } + + bool CheckShardRetriesExeeded(ui64 id) { + if (!Reads[id] || Reads[id].Finished) { + return false; + } + + const auto& state = Reads[id].Shard; + return state->RetryAttempt + 1 > MaxShardRetries(); + } + void RetryRead(ui64 id, bool allowInstantRetry = true) { if (!Reads[id] || Reads[id].Finished) { return; @@ -730,18 +744,17 @@ public: auto state = Reads[id].Shard; - TotalRetries += 1; - auto limit = MaxTotalRetries(); - if (limit && TotalRetries > *limit) { + if (CheckTotalRetriesExeeded()) { return RuntimeError(TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded", NDqProto::StatusIds::UNAVAILABLE); } + ++TotalRetries; - state->RetryAttempt += 1; - if (state->RetryAttempt > MaxShardRetries()) { + if (CheckShardRetriesExeeded(id)) { ResetRead(id); return ResolveShard(state); } + ++state->RetryAttempt; auto delay = CalcDelay(state->RetryAttempt, allowInstantRetry); if (delay == TDuration::Zero()) { @@ -954,12 +967,16 @@ public: Reads[id].Shard->Issues.push_back(issue); } + auto replyError = [&](auto message, auto status) { + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues); + return RuntimeError(message, status, issues); + }; + if (UseFollowers && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS && Reads[id].Shard->SuccessBatches > 0) { // read from follower is interrupted with error after several successful responses. // in this case read is not safe because we can return inconsistent data. - NYql::TIssues issues; - NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues); - return RuntimeError("Failed to read from follower", NYql::NDqProto::StatusIds::UNAVAILABLE, issues); + return replyError("Failed to read from follower", NYql::NDqProto::StatusIds::UNAVAILABLE); } switch (record.GetStatus().GetCode()) { @@ -968,20 +985,33 @@ public: break; } case Ydb::StatusIds::OVERLOADED: { + if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) { + return replyError( + TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.", + NYql::NDqProto::StatusIds::OVERLOADED); + } return RetryRead(id, false); } case Ydb::StatusIds::INTERNAL_ERROR: { + if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) { + return replyError( + TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.", + NYql::NDqProto::StatusIds::INTERNAL_ERROR); + } return RetryRead(id); } case Ydb::StatusIds::NOT_FOUND: { + if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) { + return replyError( + TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.", + NYql::NDqProto::StatusIds::UNAVAILABLE); + } auto shard = Reads[id].Shard; ResetRead(id); return ResolveShard(shard); } default: { - NYql::TIssues issues; - NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues); - return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues); + return replyError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED); } } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index d52e826fd3..ade6644ab8 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -345,6 +345,12 @@ private: Counters->DataShardIteratorFails->Inc(); } + auto replyError = [&](auto message, auto status) { + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues); + return RuntimeError(message, status, issues); + }; + switch (record.GetStatus().GetCode()) { case Ydb::StatusIds::SUCCESS: break; @@ -354,15 +360,23 @@ private: return ResolveTableShards(); } case Ydb::StatusIds::OVERLOADED: { + if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) { + return replyError( + TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.", + NYql::NDqProto::StatusIds::OVERLOADED); + } return RetryTableRead(read, /*allowInstantRetry = */false); } case Ydb::StatusIds::INTERNAL_ERROR: { + if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) { + return replyError( + TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.", + NYql::NDqProto::StatusIds::INTERNAL_ERROR); + } return RetryTableRead(read); } default: { - NYql::TIssues issues; - NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues); - return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues); + return replyError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED); } } @@ -538,24 +552,33 @@ private: } } + bool CheckTotalRetriesExeeded() { + const auto limit = MaxTotalRetries(); + return limit && TotalRetryAttempts + 1 > *limit; + } + + bool CheckShardRetriesExeeded(TReadState& failedRead) { + const auto& shardState = ReadsPerShard[failedRead.ShardId]; + return shardState.RetryAttempts + 1 > MaxShardRetries(); + } + void RetryTableRead(TReadState& failedRead, bool allowInstantRetry = true) { CA_LOG_D("Retry reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << failedRead.Id << ", shardId: " << failedRead.ShardId); - ++TotalRetryAttempts; - auto totalRetriesLimit = MaxTotalRetries(); - if (totalRetriesLimit && TotalRetryAttempts > *totalRetriesLimit) { + if (CheckTotalRetriesExeeded()) { return RuntimeError(TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded", NYql::NDqProto::StatusIds::UNAVAILABLE); } + ++TotalRetryAttempts; - auto& shardState = ReadsPerShard[failedRead.ShardId]; - ++shardState.RetryAttempts; - if (shardState.RetryAttempts > MaxShardRetries()) { + if (CheckShardRetriesExeeded(failedRead)) { StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey); failedRead.SetFinished(); return ResolveTableShards(); } + auto& shardState = ReadsPerShard[failedRead.ShardId]; + ++shardState.RetryAttempts; auto delay = CalcDelay(shardState.RetryAttempts, allowInstantRetry); if (delay == TDuration::Zero()) { diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index 1dbcb57fcb..be70002b51 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -1,6 +1,7 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/core/tx/datashard/datashard_failpoints.h> +#include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/testlib/common_helper.h> #include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h> #include <ydb/core/kqp/counters/kqp_counters.h> @@ -2142,6 +2143,51 @@ Y_UNIT_TEST_SUITE(KqpQuery) { CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(0))); } } + + Y_UNIT_TEST_TWIN(ReadOverloaded, StreamLookup) { + NKikimrConfig::TAppConfig appConfig; + auto setting = NKikimrKqp::TKqpSetting(); + TKikimrSettings settings; + settings.SetAppConfig(appConfig); + settings.SetUseRealThreads(false); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); }); + auto writeSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); }); + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + + kikimr.RunCall([&]{ CreateSampleTablesWithIndex(session, false /* no need in table data */); return true; }); + + { + const TString query(StreamLookup + ? Q1_(R"( + SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1 + )") + : Q1_(R"( + SELECT COUNT(a.Key) FROM `/Root/SecondaryKeys` as a; + )")); + + auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto { + if (ev->GetTypeRewrite() == TEvDataShard::TEvReadResult::EventType) { + auto* msg = ev->Get<TEvDataShard::TEvReadResult>(); + msg->Record.MutableStatus()->SetCode(::Ydb::StatusIds::OVERLOADED); + } + + return TTestActorRuntime::EEventAction::PROCESS; + }; + + runtime.SetObserverFunc(grab); + auto future = kikimr.RunInThreadPool([&]{ + auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); + return session.ExecuteDataQuery(query, txc).ExtractValueSync(); + }); + + auto result = runtime.WaitFuture(future); + UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT(result.GetStatus() == NYdb::EStatus::OVERLOADED); + } + } } } // namespace NKqp |