summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <[email protected]>2025-03-12 11:32:37 +0300
committerGitHub <[email protected]>2025-03-12 08:32:37 +0000
commit3acfcb782ac01a2fa77f50b68f7995e3a5b44e17 (patch)
tree97a9e7adbe1f075747710c664500551b13733f0f
parent9059c4534bd1b0fb20ba2426eb59640415e24113 (diff)
Fix sink errors (#15579)
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp85
-rw-r--r--ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp140
2 files changed, 176 insertions, 49 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index 253f266dc7c..5225e2fddea 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -473,7 +473,7 @@ public:
SchemeEntry = resultSet[0];
CA_LOG_D("Resolved TableId=" << TableId << " ("
- << TableId.PathId.ToString() << " "
+ << TablePath << " "
<< TableId.SchemaVersion << ")");
if (TableId.SchemaVersion != SchemeEntry->TableId.SchemaVersion) {
@@ -551,7 +551,7 @@ public:
switch (ev->Get()->GetStatus()) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
CA_LOG_E("Got UNSPECIFIED for table `"
- << TableId.PathId.ToString() << "`."
+ << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
@@ -559,7 +559,7 @@ public:
RuntimeError(
NYql::NDqProto::StatusIds::UNSPECIFIED,
NYql::TIssuesIds::DEFAULT_ERROR,
- TStringBuilder() << "Unspecified error for table `"
+ TStringBuilder() << "Unspecified error. Table `"
<< TablePath << "`.",
getIssues());
return;
@@ -574,7 +574,7 @@ public:
}
case NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED: {
CA_LOG_E("Got ABORTED for table `"
- << TableId.PathId.ToString() << "`."
+ << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
@@ -582,14 +582,13 @@ public:
RuntimeError(
NYql::NDqProto::StatusIds::ABORTED,
NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED,
- TStringBuilder() << "Aborted for table `"
- << TablePath << "`.",
+ TStringBuilder() << "Operation aborted.",
getIssues());
return;
}
case NKikimrDataEvents::TEvWriteResult::STATUS_WRONG_SHARD_STATE:
CA_LOG_E("Got WRONG SHARD STATE for table `"
- << TableId.PathId.ToString() << "`."
+ << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
@@ -602,14 +601,14 @@ public:
RuntimeError(
NYql::NDqProto::StatusIds::UNAVAILABLE,
NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
- TStringBuilder() << "Wrong shard state for table `"
+ TStringBuilder() << "Wrong shard state. Table `"
<< TablePath << "`.",
getIssues());
}
return;
case NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR: {
CA_LOG_E("Got INTERNAL ERROR for table `"
- << TableId.PathId.ToString() << "`."
+ << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
@@ -617,14 +616,13 @@ public:
RuntimeError(
NYql::NDqProto::StatusIds::INTERNAL_ERROR,
NYql::TIssuesIds::KIKIMR_INTERNAL_ERROR,
- TStringBuilder() << "Internal error for table `"
- << TablePath << "`.",
+ TStringBuilder() << "Internal error while executing transaction.",
getIssues());
return;
}
case NKikimrDataEvents::TEvWriteResult::STATUS_DISK_SPACE_EXHAUSTED: {
CA_LOG_E("Got DISK_SPACE_EXHAUSTED for table `"
- << TableId.PathId.ToString() << "`."
+ << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
@@ -632,14 +630,14 @@ public:
RuntimeError(
NYql::NDqProto::StatusIds::UNAVAILABLE,
NYql::TIssuesIds::KIKIMR_DISK_SPACE_EXHAUSTED,
- TStringBuilder() << "Disk space exhausted for table `"
+ TStringBuilder() << "Disk space exhausted. Table `"
<< TablePath << "`.",
getIssues());
return;
}
case NKikimrDataEvents::TEvWriteResult::STATUS_OUT_OF_SPACE: {
CA_LOG_W("Got OUT_OF_SPACE for table `"
- << TableId.PathId.ToString() << "`."
+ << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< " Ignored this error."
@@ -658,7 +656,7 @@ public:
}
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
CA_LOG_W("Got OVERLOADED for table `"
- << TableId.PathId.ToString() << "`."
+ << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< " Ignored this error."
@@ -669,7 +667,8 @@ public:
RuntimeError(
NYql::NDqProto::StatusIds::OVERLOADED,
NYql::TIssuesIds::KIKIMR_OVERLOADED,
- TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded. Table `"
+ TStringBuilder() << "Kikimr cluster or one of its subsystems is overloaded."
+ << " Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded. Table `"
<< TablePath << "`.",
getIssues());
}
@@ -677,7 +676,7 @@ public:
}
case NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED: {
CA_LOG_E("Got CANCELLED for table `"
- << TableId.PathId.ToString() << "`."
+ << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
@@ -685,14 +684,13 @@ public:
RuntimeError(
NYql::NDqProto::StatusIds::CANCELLED,
NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED,
- TStringBuilder() << "Cancelled request to table `"
- << TablePath << "`.",
+ TStringBuilder() << "Operation cancelled.",
getIssues());
return;
}
case NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST: {
CA_LOG_E("Got BAD REQUEST for table `"
- << TableId.PathId.ToString() << "`."
+ << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
@@ -707,7 +705,7 @@ public:
}
case NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED: {
CA_LOG_E("Got SCHEME CHANGED for table `"
- << TableId.PathId.ToString() << "`."
+ << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
@@ -727,7 +725,7 @@ public:
}
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
CA_LOG_E("Got LOCKS BROKEN for table `"
- << TableId.PathId.ToString() << "`."
+ << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
@@ -744,7 +742,7 @@ public:
return;
}
case NKikimrDataEvents::TEvWriteResult::STATUS_CONSTRAINT_VIOLATION: {
- CA_LOG_E("Got CONSTRAINT VIOLATION for table."
+ CA_LOG_E("Got CONSTRAINT VIOLATION for table `" << TablePath << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
@@ -1022,7 +1020,8 @@ public:
NYql::NDqProto::StatusIds::UNDETERMINED,
NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN,
TStringBuilder()
- << "Error writing to table `" << TableId.PathId.ToString() << "`"
+ << "State of operation is unknown. "
+ << "Error writing to table `" << TablePath << "`"
<< ". Transaction state unknown for tablet " << ev->Get()->TabletId << ".");
} else {
TxManager->SetError(ev->Get()->TabletId);
@@ -1030,7 +1029,8 @@ public:
NYql::NDqProto::StatusIds::UNAVAILABLE,
NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
TStringBuilder()
- << "Error writing to table `" << TableId.PathId.ToString() << "`"
+ << "Kikimr cluster or one of its subsystems was unavailable. "
+ << "Error writing to table `" << TablePath << "`"
<< ": can't deliver message to tablet " << ev->Get()->TabletId << ".");
}
}
@@ -2237,7 +2237,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::UNAVAILABLE,
NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
- TStringBuilder() << "Failed to deviler message to coordinator.",
+ TStringBuilder() << "Kikimr cluster or one of its subsystems was unavailable. Failed to deviler message to coordinator.",
{});
return;
}
@@ -2250,7 +2250,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::UNDETERMINED,
NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN,
- TStringBuilder() << "Failed to deviler message to coordinator.",
+ TStringBuilder() << "State of operation is unknown. Failed to deviler message to coordinator.",
{});
return;
}
@@ -2259,13 +2259,13 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::UNDETERMINED,
NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN,
- TStringBuilder() << "Failed to deviler message.",
+ TStringBuilder() << "State of operation is unknown. Failed to deviler message.",
{});
} else {
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::UNAVAILABLE,
NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
- TStringBuilder() << "Failed to deviler message.",
+ TStringBuilder() << "Kikimr cluster or one of its subsystems was unavailable. Failed to deviler message.",
{});
}
}
@@ -2342,7 +2342,7 @@ public:
}
builder << "`" << path << "`";
}
- return builder;
+ return (tableInfo.Pathes.size() == 1 ? "Table: " : "Tables: ") + builder;
};
// TODO: get rid of copy-paste
@@ -2356,7 +2356,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::UNSPECIFIED,
NYql::TIssuesIds::DEFAULT_ERROR,
- TStringBuilder() << "Unspecified error for tables `" << getPathes() << "`. "
+ TStringBuilder() << "Unspecified error. " << getPathes() << ". "
<< getIssues().ToOneLineString(),
getIssues());
return;
@@ -2378,7 +2378,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::ABORTED,
NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED,
- TStringBuilder() << "Aborted for tables " << getPathes() << ". ",
+ TStringBuilder() << "Operation aborted.",
getIssues());
return;
}
@@ -2391,7 +2391,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::UNAVAILABLE,
NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
- TStringBuilder() << "Wrong shard state for tables " << getPathes() << ".",
+ TStringBuilder() << "Wrong shard state. " << getPathes() << ".",
getIssues());
return;
}
@@ -2404,7 +2404,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::INTERNAL_ERROR,
NYql::TIssuesIds::KIKIMR_INTERNAL_ERROR,
- TStringBuilder() << "Internal error for tables " << getPathes() << ".",
+ TStringBuilder() << "Internal error while executing transaction.",
getIssues());
return;
}
@@ -2417,7 +2417,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::UNAVAILABLE,
NYql::TIssuesIds::KIKIMR_DISK_SPACE_EXHAUSTED,
- TStringBuilder() << "Disk space exhausted for tables " << getPathes() << ".",
+ TStringBuilder() << "Disk space exhausted. " << getPathes() << ".",
getIssues());
return;
}
@@ -2431,7 +2431,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::OVERLOADED,
NYql::TIssuesIds::KIKIMR_OVERLOADED,
- TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << "(" << getPathes() << ")" << " is out of space.",
+ TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << " is out of space. " << getPathes() << ".",
getIssues());
return;
}
@@ -2445,7 +2445,9 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::OVERLOADED,
NYql::TIssuesIds::KIKIMR_OVERLOADED,
- TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << "(" << getPathes() << ")" << " is overloaded.",
+ TStringBuilder() << "Kikimr cluster or one of its subsystems is overloaded."
+ << " Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded."
+ << " " << getPathes() << ".",
getIssues());
return;
}
@@ -2458,7 +2460,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::CANCELLED,
NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED,
- TStringBuilder() << "Cancelled request to tables " << getPathes() << ".",
+ TStringBuilder() << "Operation cancelled.",
getIssues());
return;
}
@@ -2471,7 +2473,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::BAD_REQUEST,
NYql::TIssuesIds::KIKIMR_BAD_REQUEST,
- TStringBuilder() << "Bad request. Tables: " << getPathes() << ".",
+ TStringBuilder() << "Bad request. " << getPathes() << ".",
getIssues());
return;
}
@@ -2484,7 +2486,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::SCHEME_ERROR,
NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH,
- TStringBuilder() << "Scheme changed. Tables: " << getPathes() << ".",
+ TStringBuilder() << "Scheme changed. " << getPathes() << ".",
getIssues());
return;
}
@@ -2501,7 +2503,6 @@ public:
NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
TStringBuilder()
<< "Transaction locks invalidated. "
- << (TxManager->GetShardTableInfo(ev->Get()->Record.GetOrigin()).Pathes.size() == 1 ? "Table: " : "Tables: ")
<< getPathes() << ".",
getIssues());
return;
@@ -2515,7 +2516,7 @@ public:
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION,
- TStringBuilder() << "Constraint violated. Tables: " << getPathes() << ".",
+ TStringBuilder() << "Constraint violated. " << getPathes() << ".",
getIssues());
return;
}
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
index 58bda03fd2a..2bd35adb68e 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
@@ -16,12 +16,12 @@ using namespace NKikimrTxDataShard;
namespace {
-bool HasIssue(const TIssues& issues, ui32 code, TStringBuf message, std::function<bool(const TIssue& issue)> predicate = {}) {
+bool HasIssueImpl(const TIssues& issues, ui32 code, TStringBuf message, std::function<bool(const TIssue& issue)> predicate, bool contains) {
bool hasIssue = false;
for (auto& issue : issues) {
WalkThroughIssues(issue, false, [&] (const TIssue& issue, int) {
- if (!hasIssue && issue.GetCode() == code && (!message || message == issue.GetMessage())) {
+ if (!hasIssue && issue.GetCode() == code && (!message || message == issue.GetMessage() || (contains && issue.GetMessage().Contains(message)))) {
hasIssue = !predicate || predicate(issue);
}
});
@@ -30,14 +30,25 @@ bool HasIssue(const TIssues& issues, ui32 code, TStringBuf message, std::functio
return hasIssue;
}
+bool HasIssue(const TIssues& issues, ui32 code, TStringBuf message, std::function<bool(const TIssue& issue)> predicate = {}) {
+ return HasIssueImpl(issues, code, message, predicate, false);
+}
+
+bool HasIssueContains(const TIssues& issues, ui32 code, TStringBuf message, std::function<bool(const TIssue& issue)> predicate = {}) {
+ return HasIssueImpl(issues, code, message, predicate, true);
+}
+
} // anonymous namespace
class TLocalFixture {
public:
- TLocalFixture(bool enableResourcePools = true) {
+ TLocalFixture(bool enableResourcePools = true, std::optional<bool> enableOltpSink = std::nullopt) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableFeatureFlags()->SetEnableResourcePools(enableResourcePools);
+ if (enableOltpSink) {
+ app.MutableTableServiceConfig()->SetEnableOltpSink(*enableOltpSink);
+ }
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetNodeCount(2)
@@ -227,6 +238,107 @@ Y_UNIT_TEST(ProposeError) {
"Error executing transaction: transaction failed.");
}
+Y_UNIT_TEST(ProposeErrorEvWrite) {
+ TLocalFixture fixture(true, true);
+ THashSet<TActorId> knownExecuters;
+
+ using TMod = std::function<void(NKikimrDataEvents::TEvWriteResult&)>;
+
+ auto test = [&](auto proposeStatus, auto ydbStatus, auto issue, auto issueMessage, TMod mod = {}) {
+ auto client = fixture.Runtime->AllocateEdgeActor();
+
+ bool done = false;
+ auto mitm = [&](TAutoPtr<IEventHandle> &ev) {
+ if (!done && ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWriteResult::EventType &&
+ !knownExecuters.contains(ev->Recipient))
+ {
+ auto event = ev.Get()->Get<NKikimr::NEvents::TDataEvents::TEvWriteResult>();
+ event->Record.SetStatus(proposeStatus);
+ if (mod) {
+ mod(event->Record);
+ }
+ knownExecuters.insert(ev->Recipient);
+ done = true;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ fixture.Runtime->SetObserverFunc(mitm);
+
+ SendRequest(*fixture.Runtime, client, MakeSQLRequest(Q_("upsert into `/Root/table-1` (key, value) values (5, 5);")));
+
+ auto ev = fixture.Runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(client);
+ auto& record = ev->Get()->Record;
+ UNIT_ASSERT_VALUES_EQUAL_C(record.GetYdbStatus(), ydbStatus, record.DebugString());
+
+ // Cerr << record.DebugString() << Endl;
+
+ TIssues issues;
+ IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
+ UNIT_ASSERT_C(HasIssueContains(issues, issue, issueMessage), "issue not found, issue: " << (int) issue
+ << ", message: " << issueMessage << ", response: " << record.GetResponse().DebugString());
+ };
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, // propose error
+ Ydb::StatusIds::OVERLOADED, // ydb status
+ NYql::TIssuesIds::KIKIMR_OVERLOADED, // issue status
+ "Kikimr cluster or one of its subsystems is overloaded."); // main issue message (more detailed info can be in subissues)
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED,
+ Ydb::StatusIds::STATUS_CODE_UNSPECIFIED,
+ NYql::TIssuesIds::DEFAULT_ERROR,
+ "Unspecified error.");
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED,
+ Ydb::StatusIds::ABORTED,
+ NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED,
+ "Operation aborted.");
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR,
+ Ydb::StatusIds::INTERNAL_ERROR,
+ NYql::TIssuesIds::KIKIMR_INTERNAL_ERROR,
+ "Internal error while executing transaction.");
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED,
+ Ydb::StatusIds::CANCELLED,
+ NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED,
+ "Operation cancelled.");
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST,
+ Ydb::StatusIds::BAD_REQUEST,
+ NYql::TIssuesIds::KIKIMR_BAD_REQUEST,
+ "Bad request.");
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED,
+ Ydb::StatusIds::ABORTED,
+ NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH,
+ "Scheme changed.");
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN,
+ Ydb::StatusIds::ABORTED,
+ NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
+ "Transaction locks invalidated.");
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_DISK_SPACE_EXHAUSTED,
+ Ydb::StatusIds::UNAVAILABLE,
+ NYql::TIssuesIds::KIKIMR_DISK_SPACE_EXHAUSTED,
+ "Disk space exhausted.");
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_WRONG_SHARD_STATE,
+ Ydb::StatusIds::UNAVAILABLE,
+ NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
+ "Wrong shard state.");
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_CONSTRAINT_VIOLATION,
+ Ydb::StatusIds::PRECONDITION_FAILED,
+ NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION,
+ "Constraint violated.");
+
+ test(NKikimrDataEvents::TEvWriteResult::STATUS_OUT_OF_SPACE,
+ Ydb::StatusIds::OVERLOADED,
+ NYql::TIssuesIds::KIKIMR_OVERLOADED,
+ "out of space.");
+}
+
void TestProposeResultLost(TTestActorRuntime& runtime, TActorId client, const TString& query,
std::function<void(const NKikimrKqp::TEvQueryResponse& resp)> fn)
{
@@ -236,7 +348,8 @@ void TestProposeResultLost(TTestActorRuntime& runtime, TActorId client, const TS
auto prev = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvPipeCache::TEvForward::EventType) {
auto* fe = ev.Get()->Get<TEvPipeCache::TEvForward>();
- if (fe->Ev->Type() == TEvDataShard::TEvProposeTransaction::EventType) {
+ if (fe->Ev->Type() == TEvDataShard::TEvProposeTransaction::EventType
+ || fe->Ev->Type() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) {
executer = ev->Sender;
// Cerr << "-- executer: " << executer << Endl;
return TTestActorRuntime::EEventAction::PROCESS;
@@ -256,6 +369,19 @@ void TestProposeResultLost(TTestActorRuntime& runtime, TActorId client, const TS
}
}
+ if (ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWriteResult::EventType) {
+ auto* msg = ev.Get()->Get<NKikimr::NEvents::TDataEvents::TEvWriteResult>();
+ if (msg->Record.GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED) {
+ if (ev->Sender.NodeId() == executer.NodeId()) {
+ ++droppedEvents;
+ // Cerr << "-- send undelivery to " << ev->Recipient << ", executer: " << executer << Endl;
+ runtime.Send(new IEventHandle(executer, ev->Sender,
+ new TEvPipeCache::TEvDeliveryProblem(msg->Record.GetOrigin(), /* NotDelivered */ false)));
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ }
+
return TTestActorRuntime::EEventAction::PROCESS;
});
SendRequest(runtime, client, MakeSQLRequest(query));
@@ -270,8 +396,8 @@ void TestProposeResultLost(TTestActorRuntime& runtime, TActorId client, const TS
runtime.SetObserverFunc(prev);
}
-Y_UNIT_TEST(ProposeResultLost_RwTx) {
- TLocalFixture fixture;
+Y_UNIT_TEST_TWIN(ProposeResultLost_RwTx, UseSink) {
+ TLocalFixture fixture(true, UseSink);
TestProposeResultLost(*fixture.Runtime, fixture.Client,
Q_(R"(
upsert into `/Root/table-1` (key, value) VALUES
@@ -283,7 +409,7 @@ Y_UNIT_TEST(ProposeResultLost_RwTx) {
TIssues issues;
IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
UNIT_ASSERT_C(
- HasIssue(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
+ HasIssueContains(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
"Kikimr cluster or one of its subsystems was unavailable."),
record.GetResponse().DebugString());
});