aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2022-09-02 18:14:47 +0300
committerspuchin <spuchin@ydb.tech>2022-09-02 18:14:47 +0300
commitcddf9ad736065f4d89cf6cd3765d0f1200cebc7e (patch)
treefbd85f0aa45f40433a52ba4dccc5405e31d2d0fd
parentcec129ee50c9e30e6b0525bc555fbf9b3c413181 (diff)
downloadydb-cddf9ad736065f4d89cf6cd3765d0f1200cebc7e.tar.gz
Fix query statuses/issues for exec-time scheme errors. ()
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp16
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp17
-rw-r--r--ydb/core/kqp/executer/kqp_table_resolver.cpp28
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp35
-rw-r--r--ydb/core/kqp/ut/kqp_acl_ut.cpp6
-rw-r--r--ydb/core/kqp/ut/kqp_scan_ut.cpp6
-rw-r--r--ydb/core/kqp/ut/kqp_sys_view_ut.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp9
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp2
-rw-r--r--ydb/tests/functional/rename/test.py3
10 files changed, 88 insertions, 41 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 1351c6a35b7..3caf55df4c2 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -103,7 +103,7 @@ public:
TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters,
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters,
NWilson::TTraceId traceId)
: TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId))
, ComputeCtx(settings.StatsMode)
@@ -442,6 +442,7 @@ private:
YQL_ENSURE(state->Generation == msg.GetGeneration());
if (state->State == EShardState::Starting) {
+ // TODO: Do not parse issues here, use status code.
if (FindSchemeErrorInIssues(status, issues)) {
return EnqueueResolveShard(state);
}
@@ -515,17 +516,22 @@ private:
switch (x.Status) {
case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist:
statusCode = NDqProto::StatusIds::SCHEME_ERROR;
- issueCode = TIssuesIds::KIKIMR_SCHEME_ERROR;
+ issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH;
error = TStringBuilder() << "Table '" << ScanData->TablePath << "' not exists.";
break;
case NSchemeCache::TSchemeCacheRequest::EStatus::TypeCheckError:
- statusCode = NDqProto::StatusIds::ABORTED;
+ statusCode = NDqProto::StatusIds::SCHEME_ERROR;
issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH;
error = TStringBuilder() << "Table '" << ScanData->TablePath << "' scheme changed.";
break;
+ case NSchemeCache::TSchemeCacheRequest::EStatus::LookupError:
+ statusCode = NDqProto::StatusIds::UNAVAILABLE;
+ issueCode = TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE;
+ error = TStringBuilder() << "Failed to resolve table '" << ScanData->TablePath << "'.";
+ break;
default:
statusCode = NDqProto::StatusIds::SCHEME_ERROR;
- issueCode = TIssuesIds::KIKIMR_SCHEME_ERROR;
+ issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH;
error = TStringBuilder() << "Unresolved table '" << ScanData->TablePath << "'. Status: " << x.Status;
break;
}
@@ -540,7 +546,7 @@ private:
if (keyDesc->GetPartitions().empty()) {
TString error = TStringBuilder() << "No partitions to read from '" << ScanData->TablePath << "'";
CA_LOG_E(error);
- InternalError(NDqProto::StatusIds::SCHEME_ERROR, TIssuesIds::KIKIMR_SCHEME_ERROR, error);
+ InternalError(NDqProto::StatusIds::SCHEME_ERROR, TIssuesIds::KIKIMR_SCHEME_MISMATCH, error);
return;
}
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 3e09e81ada4..07135d24a18 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -476,11 +476,14 @@ private:
case NKikimrTxDataShard::TEvProposeTransactionResult::ERROR: {
Counters->TxProxyMon->TxResultError->Inc();
for (auto& er : result.GetError()) {
- if (er.GetKind() == NKikimrTxDataShard::TError::SCHEME_CHANGED) {
- return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, YqlIssue({}, TIssuesIds::KIKIMR_SCHEME_MISMATCH, er.GetReason()));
- }
- if (er.GetKind() == NKikimrTxDataShard::TError::SCHEME_ERROR) {
- return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, YqlIssue({}, TIssuesIds::KIKIMR_SCHEME_ERROR, er.GetReason()));
+ switch (er.GetKind()) {
+ case NKikimrTxDataShard::TError::SCHEME_CHANGED:
+ case NKikimrTxDataShard::TError::SCHEME_ERROR:
+ return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, YqlIssue({},
+ TIssuesIds::KIKIMR_SCHEME_MISMATCH, er.GetReason()));
+
+ default:
+ break;
}
}
auto issue = YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE);
@@ -1825,12 +1828,12 @@ private:
}
LWTRACK(KqpDataExecuterFinalize, ResponseEv->Orbit, TxId, LastShard, response.GetResult().ResultsSize(), response.ByteSize());
-
+
if (ExecuterStateSpan) {
ExecuterStateSpan.End();
ExecuterStateSpan = {};
}
-
+
if (ExecuterSpan) {
ExecuterSpan.EndOk();
}
diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp
index 65345d3dc6d..c0f3f327b2c 100644
--- a/ydb/core/kqp/executer/kqp_table_resolver.cpp
+++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp
@@ -58,18 +58,24 @@ private:
YQL_ENSURE(entries.size() == TableKeys.Size());
for (auto& entry : entries) {
- if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
- timer.reset();
- if (entry.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown) {
+ switch (entry.Status) {
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok:
+ break;
+
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete:
+ ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
+ YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder()
+ << "Table scheme error `" << JoinPath(entry.Path) << "`: " << entry.Status << '.'));
+ return;
+
+ default:
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE,
YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder()
<< "Failed to resolve table `" << JoinPath(entry.Path) << "`: " << entry.Status << '.'));
- } else {
- ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
- YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_ERROR, TStringBuilder()
- << "Failed to resolve table `" << JoinPath(entry.Path) << "`: " << entry.Status << '.'));
- }
- return;
+ return;
+
}
auto* table = TableKeys.FindTablePtr(entry.TableId);
@@ -125,7 +131,7 @@ private:
if (!columnId) {
timer.reset();
ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
- YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_ERROR, TStringBuilder()
+ YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder()
<< "Unknown column `" << columnName << "` at table `" << JoinPath(entry.Path) << "`."));
return;
}
@@ -191,7 +197,7 @@ private:
timer.reset();
ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
- YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_ERROR, TStringBuilder()
+ YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder()
<< "Failed to resolve table " << path << " keys: " << entry.Status << '.'));
return;
}
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 9aac53c984c..afb8c28f328 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -1121,6 +1121,13 @@ public:
return true;
}
+ void InvalidateQuery() {
+ auto invalidateEv = MakeHolder<TEvKqp::TEvCompileInvalidateRequest>(
+ QueryState->CompileResult->Uid, Settings.DbCounters);
+
+ Send(MakeKqpCompileServiceID(SelfId().NodeId()), invalidateEv.Release());
+ }
+
void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
QueryState->Orbit = std::move(ev->Get()->Orbit);
@@ -1140,15 +1147,31 @@ public:
TransactionsToBeAborted.emplace_back(txCtx);
RemoveTransaction(QueryState->TxId);
+ auto status = response->GetStatus();
TIssues issues;
- issues.AddIssue(YqlIssue({}, TIssuesIds::CORE_EXEC, "Execution"));
- TIssues subIssues;
- IssuesFromMessage(response->GetIssues(), subIssues);
- for (auto& i : subIssues) {
- issues.back().AddSubIssue(MakeIntrusive<TIssue>(i));
+ IssuesFromMessage(response->GetIssues(), issues);
+
+ // Invalidate query cache on scheme/internal errors
+ switch (status) {
+ case Ydb::StatusIds::SCHEME_ERROR:
+ case Ydb::StatusIds::INTERNAL_ERROR:
+ InvalidateQuery();
+ issues.AddIssue(YqlIssue(TPosition(), TIssuesIds::KIKIMR_QUERY_INVALIDATED,
+ TStringBuilder() << "Query invalidated on scheme/internal error."));
+
+ // SCHEME_ERROR during execution is a soft (retriable) error, we abort query execution,
+ // invalidate query cache, and return ABORTED as retriable status.
+ if (status == Ydb::StatusIds::SCHEME_ERROR) {
+ status = Ydb::StatusIds::ABORTED;
+ }
+
+ break;
+
+ default:
+ break;
}
- ReplyQueryError(requestInfo, response->GetStatus(), "", MessageFromIssues(issues));
+ ReplyQueryError(requestInfo, status, "", MessageFromIssues(issues));
return;
}
diff --git a/ydb/core/kqp/ut/kqp_acl_ut.cpp b/ydb/core/kqp/ut/kqp_acl_ut.cpp
index 203c8ba6188..6d774d54a6e 100644
--- a/ydb/core/kqp/ut/kqp_acl_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_acl_ut.cpp
@@ -49,7 +49,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngineAcl) {
PRAGMA kikimr.UseNewEngine = "true";
SELECT * FROM `/Root/TwoShard`;
)", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
+ // TODO: Should be UNAUTHORIZED
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED);
}
{
auto result = session.ExecuteDataQuery(R"(
@@ -57,7 +58,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngineAcl) {
UPSERT INTO `/Root/TwoShard` (Key, Value1, Value2) VALUES
(10u, "One", -10);
)", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
+ // TODO: Should be UNAUTHORIZED
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED);
}
driver.Stop(true);
diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp
index 1e6e8b844ac..3e8e86876a8 100644
--- a/ydb/core/kqp/ut/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp
@@ -1234,7 +1234,11 @@ Y_UNIT_TEST_SUITE(KqpScan) {
auto part = it.ReadNext().GetValueSync();
UNIT_ASSERT_EQUAL_C(part.GetStatus(), EStatus::PRECONDITION_FAILED, part.GetStatus());
- UNIT_ASSERT_STRINGS_EQUAL(part.GetIssues().back().GetSubIssues().back()->Message, "Requested too many execution units: 32");
+ part.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT(HasIssue(part.GetIssues(), NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED,
+ [](const NYql::TIssue& issue) {
+ return issue.Message.Contains("Requested too many execution units");
+ }));
part = it.ReadNext().GetValueSync();
UNIT_ASSERT(part.EOS());
diff --git a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp
index 3ba730c872b..8b1e5545671 100644
--- a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp
@@ -97,7 +97,7 @@ Y_UNIT_TEST_SUITE(KqpSystemView) {
auto it = client.StreamExecuteScanQuery(R"(
PRAGMA Kikimr.OptEnablePredicateExtract = "true";
SELECT OwnerId, PartIdx, Path, PathId
- FROM `/Root/.sys/partition_stats`
+ FROM `/Root/.sys/partition_stats`
WHERE
OwnerId = 72057594046644480ul
AND PathId = 5u
@@ -138,7 +138,7 @@ Y_UNIT_TEST_SUITE(KqpSystemView) {
PRAGMA Kikimr.OptEnablePredicateExtract = "true";
SELECT OwnerId, PartIdx, Path, PathId
- FROM `/Root/.sys/partition_stats`
+ FROM `/Root/.sys/partition_stats`
WHERE
OwnerId = 72057594046644480ul
AND PathId = 5u
@@ -464,7 +464,8 @@ Y_UNIT_TEST_SUITE(KqpSystemView) {
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
auto streamPart = it.ReadNext().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(streamPart.GetStatus(), EStatus::SCHEME_ERROR);
+ // TODO: Should be UNAUTHORIZED
+ UNIT_ASSERT_VALUES_EQUAL(streamPart.GetStatus(), EStatus::ABORTED);
driver.Stop(true);
}
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
index 8e14a1ce09a..8318b5e9dcb 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
@@ -90,7 +90,7 @@ Y_UNIT_TEST(ResolveTableError) {
auto event = ev.Get()->Get<TEvTxProxySchemeCache::TEvNavigateKeySetResult>();
event->Request->ErrorCount = 1;
auto& entries = event->Request->ResultSet;
- entries[0].Status = NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown;
+ entries[0].Status = NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError;
}
}
return TTestActorRuntime::EEventAction::PROCESS;
@@ -108,8 +108,7 @@ Y_UNIT_TEST(ResolveTableError) {
TIssues issues;
IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
- UNIT_ASSERT_C(HasIssue(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
- "Failed to resolve table `Root/table-1`: PathErrorUnknown."), record.GetResponse().DebugString());
+ UNIT_ASSERT(HasIssue(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE));
}
Y_UNIT_TEST_NEW_ENGINE(ProposeError) {
@@ -199,8 +198,8 @@ Y_UNIT_TEST_NEW_ENGINE(ProposeError) {
if (UseNewEngine) {
test(TEvProposeTransactionResult::ERROR,
- Ydb::StatusIds::SCHEME_ERROR,
- NYql::TIssuesIds::KIKIMR_SCHEME_ERROR,
+ Ydb::StatusIds::ABORTED,
+ NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH,
"blah-blah-blah",
[](NKikimrTxDataShard::TEvProposeTransactionResult& x) {
auto* error = x.MutableError()->Add();
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index f1b90a0cd1a..4df58ff07a4 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -3750,7 +3750,7 @@ void TestLateKqpQueryAfterColumnDrop(bool dataQuery, const TString& query, bool
auto& response = ev->Get()->Record.GetRef();
Cerr << response.DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::ABORTED);
- auto& issue = response.GetResponse().GetQueryIssues(0).Getissues(0);
+ auto& issue = response.GetResponse().GetQueryIssues(0);
UNIT_ASSERT_VALUES_EQUAL(issue.issue_code(), (int) NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH);
UNIT_ASSERT_STRINGS_EQUAL(issue.message(), "Table \'/Root/table-1\' scheme changed.");
}
diff --git a/ydb/tests/functional/rename/test.py b/ydb/tests/functional/rename/test.py
index 01f5e421b8a..751d1382ad3 100644
--- a/ydb/tests/functional/rename/test.py
+++ b/ydb/tests/functional/rename/test.py
@@ -141,6 +141,7 @@ class Simple:
upsert_table_template = (
"""
+ PRAGMA Kikimr.UseNewEngine="True";
DECLARE $key AS Uint64;
DECLARE $value AS Utf8;
@@ -152,6 +153,7 @@ class Simple:
select_table_template = (
"""
+ PRAGMA Kikimr.UseNewEngine="True";
DECLARE $key AS Uint64;
SELECT value FROM `{table}` WHERE id = $key;
"""
@@ -159,6 +161,7 @@ class Simple:
select_index_table_template = (
"""
+ PRAGMA Kikimr.UseNewEngine="True";
DECLARE $value AS Utf8;
SELECT id FROM `{table}` VIEW `value_index` WHERE value = $value;
"""