diff options
author | spuchin <spuchin@ydb.tech> | 2022-09-02 18:14:47 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2022-09-02 18:14:47 +0300 |
commit | cddf9ad736065f4d89cf6cd3765d0f1200cebc7e (patch) | |
tree | fbd85f0aa45f40433a52ba4dccc5405e31d2d0fd | |
parent | cec129ee50c9e30e6b0525bc555fbf9b3c413181 (diff) | |
download | ydb-cddf9ad736065f4d89cf6cd3765d0f1200cebc7e.tar.gz |
Fix query statuses/issues for exec-time scheme errors. ()
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_data_executer.cpp | 17 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_table_resolver.cpp | 28 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 35 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_acl_ut.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_scan_ut.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_sys_view_ut.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_order.cpp | 2 | ||||
-rw-r--r-- | ydb/tests/functional/rename/test.py | 3 |
10 files changed, 88 insertions, 41 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 1351c6a35b7..3caf55df4c2 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -103,7 +103,7 @@ public: TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters, + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId)) , ComputeCtx(settings.StatsMode) @@ -442,6 +442,7 @@ private: YQL_ENSURE(state->Generation == msg.GetGeneration()); if (state->State == EShardState::Starting) { + // TODO: Do not parse issues here, use status code. if (FindSchemeErrorInIssues(status, issues)) { return EnqueueResolveShard(state); } @@ -515,17 +516,22 @@ private: switch (x.Status) { case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist: statusCode = NDqProto::StatusIds::SCHEME_ERROR; - issueCode = TIssuesIds::KIKIMR_SCHEME_ERROR; + issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH; error = TStringBuilder() << "Table '" << ScanData->TablePath << "' not exists."; break; case NSchemeCache::TSchemeCacheRequest::EStatus::TypeCheckError: - statusCode = NDqProto::StatusIds::ABORTED; + statusCode = NDqProto::StatusIds::SCHEME_ERROR; issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH; error = TStringBuilder() << "Table '" << ScanData->TablePath << "' scheme changed."; break; + case NSchemeCache::TSchemeCacheRequest::EStatus::LookupError: + statusCode = NDqProto::StatusIds::UNAVAILABLE; + issueCode = TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE; + error = TStringBuilder() << "Failed to resolve table '" << ScanData->TablePath << "'."; + break; default: statusCode = NDqProto::StatusIds::SCHEME_ERROR; - issueCode = TIssuesIds::KIKIMR_SCHEME_ERROR; + issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH; error = TStringBuilder() << "Unresolved table '" << ScanData->TablePath << "'. Status: " << x.Status; break; } @@ -540,7 +546,7 @@ private: if (keyDesc->GetPartitions().empty()) { TString error = TStringBuilder() << "No partitions to read from '" << ScanData->TablePath << "'"; CA_LOG_E(error); - InternalError(NDqProto::StatusIds::SCHEME_ERROR, TIssuesIds::KIKIMR_SCHEME_ERROR, error); + InternalError(NDqProto::StatusIds::SCHEME_ERROR, TIssuesIds::KIKIMR_SCHEME_MISMATCH, error); return; } diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index 3e09e81ada4..07135d24a18 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -476,11 +476,14 @@ private: case NKikimrTxDataShard::TEvProposeTransactionResult::ERROR: { Counters->TxProxyMon->TxResultError->Inc(); for (auto& er : result.GetError()) { - if (er.GetKind() == NKikimrTxDataShard::TError::SCHEME_CHANGED) { - return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, YqlIssue({}, TIssuesIds::KIKIMR_SCHEME_MISMATCH, er.GetReason())); - } - if (er.GetKind() == NKikimrTxDataShard::TError::SCHEME_ERROR) { - return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, YqlIssue({}, TIssuesIds::KIKIMR_SCHEME_ERROR, er.GetReason())); + switch (er.GetKind()) { + case NKikimrTxDataShard::TError::SCHEME_CHANGED: + case NKikimrTxDataShard::TError::SCHEME_ERROR: + return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, YqlIssue({}, + TIssuesIds::KIKIMR_SCHEME_MISMATCH, er.GetReason())); + + default: + break; } } auto issue = YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE); @@ -1825,12 +1828,12 @@ private: } LWTRACK(KqpDataExecuterFinalize, ResponseEv->Orbit, TxId, LastShard, response.GetResult().ResultsSize(), response.ByteSize()); - + if (ExecuterStateSpan) { ExecuterStateSpan.End(); ExecuterStateSpan = {}; } - + if (ExecuterSpan) { ExecuterSpan.EndOk(); } diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp index 65345d3dc6d..c0f3f327b2c 100644 --- a/ydb/core/kqp/executer/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp @@ -58,18 +58,24 @@ private: YQL_ENSURE(entries.size() == TableKeys.Size()); for (auto& entry : entries) { - if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { - timer.reset(); - if (entry.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown) { + switch (entry.Status) { + case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: + break; + + case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown: + case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable: + case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete: + ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, + YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder() + << "Table scheme error `" << JoinPath(entry.Path) << "`: " << entry.Status << '.')); + return; + + default: ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder() << "Failed to resolve table `" << JoinPath(entry.Path) << "`: " << entry.Status << '.')); - } else { - ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, - YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_ERROR, TStringBuilder() - << "Failed to resolve table `" << JoinPath(entry.Path) << "`: " << entry.Status << '.')); - } - return; + return; + } auto* table = TableKeys.FindTablePtr(entry.TableId); @@ -125,7 +131,7 @@ private: if (!columnId) { timer.reset(); ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, - YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_ERROR, TStringBuilder() + YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder() << "Unknown column `" << columnName << "` at table `" << JoinPath(entry.Path) << "`.")); return; } @@ -191,7 +197,7 @@ private: timer.reset(); ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, - YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_ERROR, TStringBuilder() + YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder() << "Failed to resolve table " << path << " keys: " << entry.Status << '.')); return; } diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 9aac53c984c..afb8c28f328 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -1121,6 +1121,13 @@ public: return true; } + void InvalidateQuery() { + auto invalidateEv = MakeHolder<TEvKqp::TEvCompileInvalidateRequest>( + QueryState->CompileResult->Uid, Settings.DbCounters); + + Send(MakeKqpCompileServiceID(SelfId().NodeId()), invalidateEv.Release()); + } + void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { QueryState->Orbit = std::move(ev->Get()->Orbit); @@ -1140,15 +1147,31 @@ public: TransactionsToBeAborted.emplace_back(txCtx); RemoveTransaction(QueryState->TxId); + auto status = response->GetStatus(); TIssues issues; - issues.AddIssue(YqlIssue({}, TIssuesIds::CORE_EXEC, "Execution")); - TIssues subIssues; - IssuesFromMessage(response->GetIssues(), subIssues); - for (auto& i : subIssues) { - issues.back().AddSubIssue(MakeIntrusive<TIssue>(i)); + IssuesFromMessage(response->GetIssues(), issues); + + // Invalidate query cache on scheme/internal errors + switch (status) { + case Ydb::StatusIds::SCHEME_ERROR: + case Ydb::StatusIds::INTERNAL_ERROR: + InvalidateQuery(); + issues.AddIssue(YqlIssue(TPosition(), TIssuesIds::KIKIMR_QUERY_INVALIDATED, + TStringBuilder() << "Query invalidated on scheme/internal error.")); + + // SCHEME_ERROR during execution is a soft (retriable) error, we abort query execution, + // invalidate query cache, and return ABORTED as retriable status. + if (status == Ydb::StatusIds::SCHEME_ERROR) { + status = Ydb::StatusIds::ABORTED; + } + + break; + + default: + break; } - ReplyQueryError(requestInfo, response->GetStatus(), "", MessageFromIssues(issues)); + ReplyQueryError(requestInfo, status, "", MessageFromIssues(issues)); return; } diff --git a/ydb/core/kqp/ut/kqp_acl_ut.cpp b/ydb/core/kqp/ut/kqp_acl_ut.cpp index 203c8ba6188..6d774d54a6e 100644 --- a/ydb/core/kqp/ut/kqp_acl_ut.cpp +++ b/ydb/core/kqp/ut/kqp_acl_ut.cpp @@ -49,7 +49,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngineAcl) { PRAGMA kikimr.UseNewEngine = "true"; SELECT * FROM `/Root/TwoShard`; )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR); + // TODO: Should be UNAUTHORIZED + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED); } { auto result = session.ExecuteDataQuery(R"( @@ -57,7 +58,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngineAcl) { UPSERT INTO `/Root/TwoShard` (Key, Value1, Value2) VALUES (10u, "One", -10); )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR); + // TODO: Should be UNAUTHORIZED + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED); } driver.Stop(true); diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp index 1e6e8b844ac..3e8e86876a8 100644 --- a/ydb/core/kqp/ut/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp @@ -1234,7 +1234,11 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto part = it.ReadNext().GetValueSync(); UNIT_ASSERT_EQUAL_C(part.GetStatus(), EStatus::PRECONDITION_FAILED, part.GetStatus()); - UNIT_ASSERT_STRINGS_EQUAL(part.GetIssues().back().GetSubIssues().back()->Message, "Requested too many execution units: 32"); + part.GetIssues().PrintTo(Cerr); + UNIT_ASSERT(HasIssue(part.GetIssues(), NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, + [](const NYql::TIssue& issue) { + return issue.Message.Contains("Requested too many execution units"); + })); part = it.ReadNext().GetValueSync(); UNIT_ASSERT(part.EOS()); diff --git a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp index 3ba730c872b..8b1e5545671 100644 --- a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp +++ b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp @@ -97,7 +97,7 @@ Y_UNIT_TEST_SUITE(KqpSystemView) { auto it = client.StreamExecuteScanQuery(R"( PRAGMA Kikimr.OptEnablePredicateExtract = "true"; SELECT OwnerId, PartIdx, Path, PathId - FROM `/Root/.sys/partition_stats` + FROM `/Root/.sys/partition_stats` WHERE OwnerId = 72057594046644480ul AND PathId = 5u @@ -138,7 +138,7 @@ Y_UNIT_TEST_SUITE(KqpSystemView) { PRAGMA Kikimr.OptEnablePredicateExtract = "true"; SELECT OwnerId, PartIdx, Path, PathId - FROM `/Root/.sys/partition_stats` + FROM `/Root/.sys/partition_stats` WHERE OwnerId = 72057594046644480ul AND PathId = 5u @@ -464,7 +464,8 @@ Y_UNIT_TEST_SUITE(KqpSystemView) { UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); auto streamPart = it.ReadNext().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(streamPart.GetStatus(), EStatus::SCHEME_ERROR); + // TODO: Should be UNAUTHORIZED + UNIT_ASSERT_VALUES_EQUAL(streamPart.GetStatus(), EStatus::ABORTED); driver.Stop(true); } diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp index 8e14a1ce09a..8318b5e9dcb 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp @@ -90,7 +90,7 @@ Y_UNIT_TEST(ResolveTableError) { auto event = ev.Get()->Get<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(); event->Request->ErrorCount = 1; auto& entries = event->Request->ResultSet; - entries[0].Status = NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown; + entries[0].Status = NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError; } } return TTestActorRuntime::EEventAction::PROCESS; @@ -108,8 +108,7 @@ Y_UNIT_TEST(ResolveTableError) { TIssues issues; IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues); - UNIT_ASSERT_C(HasIssue(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, - "Failed to resolve table `Root/table-1`: PathErrorUnknown."), record.GetResponse().DebugString()); + UNIT_ASSERT(HasIssue(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE)); } Y_UNIT_TEST_NEW_ENGINE(ProposeError) { @@ -199,8 +198,8 @@ Y_UNIT_TEST_NEW_ENGINE(ProposeError) { if (UseNewEngine) { test(TEvProposeTransactionResult::ERROR, - Ydb::StatusIds::SCHEME_ERROR, - NYql::TIssuesIds::KIKIMR_SCHEME_ERROR, + Ydb::StatusIds::ABORTED, + NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, "blah-blah-blah", [](NKikimrTxDataShard::TEvProposeTransactionResult& x) { auto* error = x.MutableError()->Add(); diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index f1b90a0cd1a..4df58ff07a4 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -3750,7 +3750,7 @@ void TestLateKqpQueryAfterColumnDrop(bool dataQuery, const TString& query, bool auto& response = ev->Get()->Record.GetRef(); Cerr << response.DebugString() << Endl; UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::ABORTED); - auto& issue = response.GetResponse().GetQueryIssues(0).Getissues(0); + auto& issue = response.GetResponse().GetQueryIssues(0); UNIT_ASSERT_VALUES_EQUAL(issue.issue_code(), (int) NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH); UNIT_ASSERT_STRINGS_EQUAL(issue.message(), "Table \'/Root/table-1\' scheme changed."); } diff --git a/ydb/tests/functional/rename/test.py b/ydb/tests/functional/rename/test.py index 01f5e421b8a..751d1382ad3 100644 --- a/ydb/tests/functional/rename/test.py +++ b/ydb/tests/functional/rename/test.py @@ -141,6 +141,7 @@ class Simple: upsert_table_template = ( """ + PRAGMA Kikimr.UseNewEngine="True"; DECLARE $key AS Uint64; DECLARE $value AS Utf8; @@ -152,6 +153,7 @@ class Simple: select_table_template = ( """ + PRAGMA Kikimr.UseNewEngine="True"; DECLARE $key AS Uint64; SELECT value FROM `{table}` WHERE id = $key; """ @@ -159,6 +161,7 @@ class Simple: select_index_table_template = ( """ + PRAGMA Kikimr.UseNewEngine="True"; DECLARE $value AS Utf8; SELECT id FROM `{table}` VIEW `value_index` WHERE value = $value; """ |