summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <[email protected]>2025-04-22 09:55:22 +0300
committerGitHub <[email protected]>2025-04-22 09:55:22 +0300
commit37c19de3f39ce3f5b1be0192317ad6129a2c7b91 (patch)
tree1734a41e8a67bf94787d826db785e25000d6fe63
parent52cd0e15ab5b65d821df60f3a1e4c2450c99fd9a (diff)
The PQ tablet does not receive a TEvTxCalcPredicateResult (#17497)
-rw-r--r--ydb/core/persqueue/partition.cpp2
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp64
2 files changed, 65 insertions, 1 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index cd384de2734..08e28652ffe 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -2256,7 +2256,7 @@ void TPartition::AnswerCurrentReplies(const TActorContext& ctx)
TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t)
{
auto result = EProcessResult::Continue;
- if (t->SupportivePartitionActor && !t->WriteInfo) { // Pending for write info
+ if (t->SupportivePartitionActor && !t->WriteInfo && !t->WriteInfoApplied) { // Pending for write info
return EProcessResult::NotReady;
}
if (t->WriteInfo && !t->WriteInfoApplied) { //Recieved write info but not applied
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index a7d5385d657..95bea8c420f 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -315,6 +315,15 @@ protected:
void WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode);
void WaitForDeletePartitionDone();
+ void SendCalcPredicate(ui64 step,
+ ui64 txId,
+ const TActorId& suppPartitionId);
+ void WaitForGetWriteInfoRequest();
+ void SendGetWriteInfoError(ui32 internalPartitionId,
+ TString message,
+ const TActorId& suppPartitionId);
+ void WaitForCalcPredicateResult(ui64 txId, bool predicate);
+
TMaybe<TTestContext> Ctx;
TMaybe<TFinalizer> Finalizer;
@@ -2512,6 +2521,47 @@ void TPartitionFixture::CmdChangeOwner(ui64 cookie, const TString& sourceId, TDu
ownerCookie = event->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
}
+void TPartitionFixture::SendCalcPredicate(ui64 step,
+ ui64 txId,
+ const TActorId& suppPartitionId)
+{
+ SendCalcPredicate(step, txId, "", 0, 0, suppPartitionId);
+}
+
+void TPartitionFixture::WaitForGetWriteInfoRequest()
+{
+ auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvGetWriteInfoRequest>();
+ UNIT_ASSERT(event != nullptr);
+ //UNIT_ASSERT_VALUES_EQUAL(event->OriginalPartition, ActorId);
+}
+
+void TPartitionFixture::SendGetWriteInfoError(ui32 internalPartitionId,
+ TString message,
+ const TActorId& suppPartitionId)
+{
+ auto event = MakeHolder<TEvPQ::TEvGetWriteInfoError>(internalPartitionId,
+ std::move(message));
+ //event->SupportivePartition = suppPartitionId;
+
+ Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, suppPartitionId, event.Release()));
+}
+
+void TPartitionFixture::WaitForCalcPredicateResult(ui64 txId, bool predicate)
+{
+ while (true) {
+ TAutoPtr<IEventHandle> handle;
+ auto events =
+ Ctx->Runtime->GrabEdgeEvents<TEvPQ::TEvTxCalcPredicateResult, TEvKeyValue::TEvRequest>(handle,
+ TDuration::Seconds(1));
+ if (std::get<TEvKeyValue::TEvRequest*>(events)) {
+ SendDiskStatusResponse(nullptr);
+ } else if (auto* event = std::get<TEvPQ::TEvTxCalcPredicateResult*>(events)) {
+ UNIT_ASSERT_VALUES_EQUAL(event->TxId, txId);
+ UNIT_ASSERT_VALUES_EQUAL(event->Predicate, predicate);
+ break;
+ }
+ }
+}
Y_UNIT_TEST_F(ReserveSubDomainOutOfSpace, TPartitionFixture)
{
@@ -3614,6 +3664,20 @@ Y_UNIT_TEST_F(The_DeletePartition_Message_Arrives_Before_The_ApproveWriteQuota_M
WaitForWriteError(2, NPersQueue::NErrorCode::ERROR);
}
+Y_UNIT_TEST_F(After_TEvGetWriteInfoError_Comes_TEvTxCalcPredicateResult, TPartitionFixture)
+{
+ const TPartitionId partitionId{1};
+ const ui64 step = 12345;
+ const ui64 txId = 67890;
+
+ CreatePartition({.Partition=partitionId});
+
+ SendCalcPredicate(step, txId, Ctx->Edge);
+ WaitForGetWriteInfoRequest();
+ SendGetWriteInfoError(31415, "error", Ctx->Edge);
+ WaitForCalcPredicateResult(txId, false);
+}
+
} // End of suite
} // namespace