diff options
author | Nikita Vasilev <[email protected]> | 2025-03-12 11:32:37 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2025-03-12 08:32:37 +0000 |
commit | 3acfcb782ac01a2fa77f50b68f7995e3a5b44e17 (patch) | |
tree | 97a9e7adbe1f075747710c664500551b13733f0f | |
parent | 9059c4534bd1b0fb20ba2426eb59640415e24113 (diff) |
Fix sink errors (#15579)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 85 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp | 140 |
2 files changed, 176 insertions, 49 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 253f266dc7c..5225e2fddea 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -473,7 +473,7 @@ public: SchemeEntry = resultSet[0]; CA_LOG_D("Resolved TableId=" << TableId << " (" - << TableId.PathId.ToString() << " " + << TablePath << " " << TableId.SchemaVersion << ")"); if (TableId.SchemaVersion != SchemeEntry->TableId.SchemaVersion) { @@ -551,7 +551,7 @@ public: switch (ev->Get()->GetStatus()) { case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: { CA_LOG_E("Got UNSPECIFIED for table `" - << TableId.PathId.ToString() << "`." + << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); @@ -559,7 +559,7 @@ public: RuntimeError( NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssuesIds::DEFAULT_ERROR, - TStringBuilder() << "Unspecified error for table `" + TStringBuilder() << "Unspecified error. Table `" << TablePath << "`.", getIssues()); return; @@ -574,7 +574,7 @@ public: } case NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED: { CA_LOG_E("Got ABORTED for table `" - << TableId.PathId.ToString() << "`." + << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); @@ -582,14 +582,13 @@ public: RuntimeError( NYql::NDqProto::StatusIds::ABORTED, NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED, - TStringBuilder() << "Aborted for table `" - << TablePath << "`.", + TStringBuilder() << "Operation aborted.", getIssues()); return; } case NKikimrDataEvents::TEvWriteResult::STATUS_WRONG_SHARD_STATE: CA_LOG_E("Got WRONG SHARD STATE for table `" - << TableId.PathId.ToString() << "`." + << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); @@ -602,14 +601,14 @@ public: RuntimeError( NYql::NDqProto::StatusIds::UNAVAILABLE, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, - TStringBuilder() << "Wrong shard state for table `" + TStringBuilder() << "Wrong shard state. Table `" << TablePath << "`.", getIssues()); } return; case NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR: { CA_LOG_E("Got INTERNAL ERROR for table `" - << TableId.PathId.ToString() << "`." + << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); @@ -617,14 +616,13 @@ public: RuntimeError( NYql::NDqProto::StatusIds::INTERNAL_ERROR, NYql::TIssuesIds::KIKIMR_INTERNAL_ERROR, - TStringBuilder() << "Internal error for table `" - << TablePath << "`.", + TStringBuilder() << "Internal error while executing transaction.", getIssues()); return; } case NKikimrDataEvents::TEvWriteResult::STATUS_DISK_SPACE_EXHAUSTED: { CA_LOG_E("Got DISK_SPACE_EXHAUSTED for table `" - << TableId.PathId.ToString() << "`." + << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); @@ -632,14 +630,14 @@ public: RuntimeError( NYql::NDqProto::StatusIds::UNAVAILABLE, NYql::TIssuesIds::KIKIMR_DISK_SPACE_EXHAUSTED, - TStringBuilder() << "Disk space exhausted for table `" + TStringBuilder() << "Disk space exhausted. Table `" << TablePath << "`.", getIssues()); return; } case NKikimrDataEvents::TEvWriteResult::STATUS_OUT_OF_SPACE: { CA_LOG_W("Got OUT_OF_SPACE for table `" - << TableId.PathId.ToString() << "`." + << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << " Ignored this error." @@ -658,7 +656,7 @@ public: } case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: { CA_LOG_W("Got OVERLOADED for table `" - << TableId.PathId.ToString() << "`." + << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << " Ignored this error." @@ -669,7 +667,8 @@ public: RuntimeError( NYql::NDqProto::StatusIds::OVERLOADED, NYql::TIssuesIds::KIKIMR_OVERLOADED, - TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded. Table `" + TStringBuilder() << "Kikimr cluster or one of its subsystems is overloaded." + << " Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded. Table `" << TablePath << "`.", getIssues()); } @@ -677,7 +676,7 @@ public: } case NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED: { CA_LOG_E("Got CANCELLED for table `" - << TableId.PathId.ToString() << "`." + << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); @@ -685,14 +684,13 @@ public: RuntimeError( NYql::NDqProto::StatusIds::CANCELLED, NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED, - TStringBuilder() << "Cancelled request to table `" - << TablePath << "`.", + TStringBuilder() << "Operation cancelled.", getIssues()); return; } case NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST: { CA_LOG_E("Got BAD REQUEST for table `" - << TableId.PathId.ToString() << "`." + << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); @@ -707,7 +705,7 @@ public: } case NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED: { CA_LOG_E("Got SCHEME CHANGED for table `" - << TableId.PathId.ToString() << "`." + << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); @@ -727,7 +725,7 @@ public: } case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: { CA_LOG_E("Got LOCKS BROKEN for table `" - << TableId.PathId.ToString() << "`." + << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); @@ -744,7 +742,7 @@ public: return; } case NKikimrDataEvents::TEvWriteResult::STATUS_CONSTRAINT_VIOLATION: { - CA_LOG_E("Got CONSTRAINT VIOLATION for table." + CA_LOG_E("Got CONSTRAINT VIOLATION for table `" << TablePath << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); @@ -1022,7 +1020,8 @@ public: NYql::NDqProto::StatusIds::UNDETERMINED, NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN, TStringBuilder() - << "Error writing to table `" << TableId.PathId.ToString() << "`" + << "State of operation is unknown. " + << "Error writing to table `" << TablePath << "`" << ". Transaction state unknown for tablet " << ev->Get()->TabletId << "."); } else { TxManager->SetError(ev->Get()->TabletId); @@ -1030,7 +1029,8 @@ public: NYql::NDqProto::StatusIds::UNAVAILABLE, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder() - << "Error writing to table `" << TableId.PathId.ToString() << "`" + << "Kikimr cluster or one of its subsystems was unavailable. " + << "Error writing to table `" << TablePath << "`" << ": can't deliver message to tablet " << ev->Get()->TabletId << "."); } } @@ -2237,7 +2237,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::UNAVAILABLE, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, - TStringBuilder() << "Failed to deviler message to coordinator.", + TStringBuilder() << "Kikimr cluster or one of its subsystems was unavailable. Failed to deviler message to coordinator.", {}); return; } @@ -2250,7 +2250,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::UNDETERMINED, NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN, - TStringBuilder() << "Failed to deviler message to coordinator.", + TStringBuilder() << "State of operation is unknown. Failed to deviler message to coordinator.", {}); return; } @@ -2259,13 +2259,13 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::UNDETERMINED, NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN, - TStringBuilder() << "Failed to deviler message.", + TStringBuilder() << "State of operation is unknown. Failed to deviler message.", {}); } else { ReplyErrorAndDie( NYql::NDqProto::StatusIds::UNAVAILABLE, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, - TStringBuilder() << "Failed to deviler message.", + TStringBuilder() << "Kikimr cluster or one of its subsystems was unavailable. Failed to deviler message.", {}); } } @@ -2342,7 +2342,7 @@ public: } builder << "`" << path << "`"; } - return builder; + return (tableInfo.Pathes.size() == 1 ? "Table: " : "Tables: ") + builder; }; // TODO: get rid of copy-paste @@ -2356,7 +2356,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssuesIds::DEFAULT_ERROR, - TStringBuilder() << "Unspecified error for tables `" << getPathes() << "`. " + TStringBuilder() << "Unspecified error. " << getPathes() << ". " << getIssues().ToOneLineString(), getIssues()); return; @@ -2378,7 +2378,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::ABORTED, NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED, - TStringBuilder() << "Aborted for tables " << getPathes() << ". ", + TStringBuilder() << "Operation aborted.", getIssues()); return; } @@ -2391,7 +2391,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::UNAVAILABLE, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, - TStringBuilder() << "Wrong shard state for tables " << getPathes() << ".", + TStringBuilder() << "Wrong shard state. " << getPathes() << ".", getIssues()); return; } @@ -2404,7 +2404,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::INTERNAL_ERROR, NYql::TIssuesIds::KIKIMR_INTERNAL_ERROR, - TStringBuilder() << "Internal error for tables " << getPathes() << ".", + TStringBuilder() << "Internal error while executing transaction.", getIssues()); return; } @@ -2417,7 +2417,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::UNAVAILABLE, NYql::TIssuesIds::KIKIMR_DISK_SPACE_EXHAUSTED, - TStringBuilder() << "Disk space exhausted for tables " << getPathes() << ".", + TStringBuilder() << "Disk space exhausted. " << getPathes() << ".", getIssues()); return; } @@ -2431,7 +2431,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::OVERLOADED, NYql::TIssuesIds::KIKIMR_OVERLOADED, - TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << "(" << getPathes() << ")" << " is out of space.", + TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << " is out of space. " << getPathes() << ".", getIssues()); return; } @@ -2445,7 +2445,9 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::OVERLOADED, NYql::TIssuesIds::KIKIMR_OVERLOADED, - TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << "(" << getPathes() << ")" << " is overloaded.", + TStringBuilder() << "Kikimr cluster or one of its subsystems is overloaded." + << " Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded." + << " " << getPathes() << ".", getIssues()); return; } @@ -2458,7 +2460,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::CANCELLED, NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED, - TStringBuilder() << "Cancelled request to tables " << getPathes() << ".", + TStringBuilder() << "Operation cancelled.", getIssues()); return; } @@ -2471,7 +2473,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::BAD_REQUEST, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, - TStringBuilder() << "Bad request. Tables: " << getPathes() << ".", + TStringBuilder() << "Bad request. " << getPathes() << ".", getIssues()); return; } @@ -2484,7 +2486,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::SCHEME_ERROR, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, - TStringBuilder() << "Scheme changed. Tables: " << getPathes() << ".", + TStringBuilder() << "Scheme changed. " << getPathes() << ".", getIssues()); return; } @@ -2501,7 +2503,6 @@ public: NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, TStringBuilder() << "Transaction locks invalidated. " - << (TxManager->GetShardTableInfo(ev->Get()->Record.GetOrigin()).Pathes.size() == 1 ? "Table: " : "Tables: ") << getPathes() << ".", getIssues()); return; @@ -2515,7 +2516,7 @@ public: ReplyErrorAndDie( NYql::NDqProto::StatusIds::PRECONDITION_FAILED, NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION, - TStringBuilder() << "Constraint violated. Tables: " << getPathes() << ".", + TStringBuilder() << "Constraint violated. " << getPathes() << ".", getIssues()); return; } diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp index 58bda03fd2a..2bd35adb68e 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp @@ -16,12 +16,12 @@ using namespace NKikimrTxDataShard; namespace { -bool HasIssue(const TIssues& issues, ui32 code, TStringBuf message, std::function<bool(const TIssue& issue)> predicate = {}) { +bool HasIssueImpl(const TIssues& issues, ui32 code, TStringBuf message, std::function<bool(const TIssue& issue)> predicate, bool contains) { bool hasIssue = false; for (auto& issue : issues) { WalkThroughIssues(issue, false, [&] (const TIssue& issue, int) { - if (!hasIssue && issue.GetCode() == code && (!message || message == issue.GetMessage())) { + if (!hasIssue && issue.GetCode() == code && (!message || message == issue.GetMessage() || (contains && issue.GetMessage().Contains(message)))) { hasIssue = !predicate || predicate(issue); } }); @@ -30,14 +30,25 @@ bool HasIssue(const TIssues& issues, ui32 code, TStringBuf message, std::functio return hasIssue; } +bool HasIssue(const TIssues& issues, ui32 code, TStringBuf message, std::function<bool(const TIssue& issue)> predicate = {}) { + return HasIssueImpl(issues, code, message, predicate, false); +} + +bool HasIssueContains(const TIssues& issues, ui32 code, TStringBuf message, std::function<bool(const TIssue& issue)> predicate = {}) { + return HasIssueImpl(issues, code, message, predicate, true); +} + } // anonymous namespace class TLocalFixture { public: - TLocalFixture(bool enableResourcePools = true) { + TLocalFixture(bool enableResourcePools = true, std::optional<bool> enableOltpSink = std::nullopt) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableFeatureFlags()->SetEnableResourcePools(enableResourcePools); + if (enableOltpSink) { + app.MutableTableServiceConfig()->SetEnableOltpSink(*enableOltpSink); + } TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetNodeCount(2) @@ -227,6 +238,107 @@ Y_UNIT_TEST(ProposeError) { "Error executing transaction: transaction failed."); } +Y_UNIT_TEST(ProposeErrorEvWrite) { + TLocalFixture fixture(true, true); + THashSet<TActorId> knownExecuters; + + using TMod = std::function<void(NKikimrDataEvents::TEvWriteResult&)>; + + auto test = [&](auto proposeStatus, auto ydbStatus, auto issue, auto issueMessage, TMod mod = {}) { + auto client = fixture.Runtime->AllocateEdgeActor(); + + bool done = false; + auto mitm = [&](TAutoPtr<IEventHandle> &ev) { + if (!done && ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWriteResult::EventType && + !knownExecuters.contains(ev->Recipient)) + { + auto event = ev.Get()->Get<NKikimr::NEvents::TDataEvents::TEvWriteResult>(); + event->Record.SetStatus(proposeStatus); + if (mod) { + mod(event->Record); + } + knownExecuters.insert(ev->Recipient); + done = true; + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + fixture.Runtime->SetObserverFunc(mitm); + + SendRequest(*fixture.Runtime, client, MakeSQLRequest(Q_("upsert into `/Root/table-1` (key, value) values (5, 5);"))); + + auto ev = fixture.Runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(client); + auto& record = ev->Get()->Record; + UNIT_ASSERT_VALUES_EQUAL_C(record.GetYdbStatus(), ydbStatus, record.DebugString()); + + // Cerr << record.DebugString() << Endl; + + TIssues issues; + IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues); + UNIT_ASSERT_C(HasIssueContains(issues, issue, issueMessage), "issue not found, issue: " << (int) issue + << ", message: " << issueMessage << ", response: " << record.GetResponse().DebugString()); + }; + + test(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, // propose error + Ydb::StatusIds::OVERLOADED, // ydb status + NYql::TIssuesIds::KIKIMR_OVERLOADED, // issue status + "Kikimr cluster or one of its subsystems is overloaded."); // main issue message (more detailed info can be in subissues) + + test(NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, + Ydb::StatusIds::STATUS_CODE_UNSPECIFIED, + NYql::TIssuesIds::DEFAULT_ERROR, + "Unspecified error."); + + test(NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED, + Ydb::StatusIds::ABORTED, + NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED, + "Operation aborted."); + + test(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, + Ydb::StatusIds::INTERNAL_ERROR, + NYql::TIssuesIds::KIKIMR_INTERNAL_ERROR, + "Internal error while executing transaction."); + + test(NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED, + Ydb::StatusIds::CANCELLED, + NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED, + "Operation cancelled."); + + test(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, + Ydb::StatusIds::BAD_REQUEST, + NYql::TIssuesIds::KIKIMR_BAD_REQUEST, + "Bad request."); + + test(NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED, + Ydb::StatusIds::ABORTED, + NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, + "Scheme changed."); + + test(NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN, + Ydb::StatusIds::ABORTED, + NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + "Transaction locks invalidated."); + + test(NKikimrDataEvents::TEvWriteResult::STATUS_DISK_SPACE_EXHAUSTED, + Ydb::StatusIds::UNAVAILABLE, + NYql::TIssuesIds::KIKIMR_DISK_SPACE_EXHAUSTED, + "Disk space exhausted."); + + test(NKikimrDataEvents::TEvWriteResult::STATUS_WRONG_SHARD_STATE, + Ydb::StatusIds::UNAVAILABLE, + NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, + "Wrong shard state."); + + test(NKikimrDataEvents::TEvWriteResult::STATUS_CONSTRAINT_VIOLATION, + Ydb::StatusIds::PRECONDITION_FAILED, + NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION, + "Constraint violated."); + + test(NKikimrDataEvents::TEvWriteResult::STATUS_OUT_OF_SPACE, + Ydb::StatusIds::OVERLOADED, + NYql::TIssuesIds::KIKIMR_OVERLOADED, + "out of space."); +} + void TestProposeResultLost(TTestActorRuntime& runtime, TActorId client, const TString& query, std::function<void(const NKikimrKqp::TEvQueryResponse& resp)> fn) { @@ -236,7 +348,8 @@ void TestProposeResultLost(TTestActorRuntime& runtime, TActorId client, const TS auto prev = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() == TEvPipeCache::TEvForward::EventType) { auto* fe = ev.Get()->Get<TEvPipeCache::TEvForward>(); - if (fe->Ev->Type() == TEvDataShard::TEvProposeTransaction::EventType) { + if (fe->Ev->Type() == TEvDataShard::TEvProposeTransaction::EventType + || fe->Ev->Type() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) { executer = ev->Sender; // Cerr << "-- executer: " << executer << Endl; return TTestActorRuntime::EEventAction::PROCESS; @@ -256,6 +369,19 @@ void TestProposeResultLost(TTestActorRuntime& runtime, TActorId client, const TS } } + if (ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWriteResult::EventType) { + auto* msg = ev.Get()->Get<NKikimr::NEvents::TDataEvents::TEvWriteResult>(); + if (msg->Record.GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED) { + if (ev->Sender.NodeId() == executer.NodeId()) { + ++droppedEvents; + // Cerr << "-- send undelivery to " << ev->Recipient << ", executer: " << executer << Endl; + runtime.Send(new IEventHandle(executer, ev->Sender, + new TEvPipeCache::TEvDeliveryProblem(msg->Record.GetOrigin(), /* NotDelivered */ false))); + return TTestActorRuntime::EEventAction::DROP; + } + } + } + return TTestActorRuntime::EEventAction::PROCESS; }); SendRequest(runtime, client, MakeSQLRequest(query)); @@ -270,8 +396,8 @@ void TestProposeResultLost(TTestActorRuntime& runtime, TActorId client, const TS runtime.SetObserverFunc(prev); } -Y_UNIT_TEST(ProposeResultLost_RwTx) { - TLocalFixture fixture; +Y_UNIT_TEST_TWIN(ProposeResultLost_RwTx, UseSink) { + TLocalFixture fixture(true, UseSink); TestProposeResultLost(*fixture.Runtime, fixture.Client, Q_(R"( upsert into `/Root/table-1` (key, value) VALUES @@ -283,7 +409,7 @@ Y_UNIT_TEST(ProposeResultLost_RwTx) { TIssues issues; IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues); UNIT_ASSERT_C( - HasIssue(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, + HasIssueContains(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, "Kikimr cluster or one of its subsystems was unavailable."), record.GetResponse().DebugString()); }); |