diff options
| author | Alek5andr-Kotov <[email protected]> | 2025-04-22 09:55:22 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-04-22 09:55:22 +0300 |
| commit | 37c19de3f39ce3f5b1be0192317ad6129a2c7b91 (patch) | |
| tree | 1734a41e8a67bf94787d826db785e25000d6fe63 | |
| parent | 52cd0e15ab5b65d821df60f3a1e4c2450c99fd9a (diff) | |
The PQ tablet does not receive a TEvTxCalcPredicateResult (#17497)
| -rw-r--r-- | ydb/core/persqueue/partition.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/persqueue/ut/partition_ut.cpp | 64 |
2 files changed, 65 insertions, 1 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index cd384de2734..08e28652ffe 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2256,7 +2256,7 @@ void TPartition::AnswerCurrentReplies(const TActorContext& ctx) TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t) { auto result = EProcessResult::Continue; - if (t->SupportivePartitionActor && !t->WriteInfo) { // Pending for write info + if (t->SupportivePartitionActor && !t->WriteInfo && !t->WriteInfoApplied) { // Pending for write info return EProcessResult::NotReady; } if (t->WriteInfo && !t->WriteInfoApplied) { //Recieved write info but not applied diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index a7d5385d657..95bea8c420f 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -315,6 +315,15 @@ protected: void WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode); void WaitForDeletePartitionDone(); + void SendCalcPredicate(ui64 step, + ui64 txId, + const TActorId& suppPartitionId); + void WaitForGetWriteInfoRequest(); + void SendGetWriteInfoError(ui32 internalPartitionId, + TString message, + const TActorId& suppPartitionId); + void WaitForCalcPredicateResult(ui64 txId, bool predicate); + TMaybe<TTestContext> Ctx; TMaybe<TFinalizer> Finalizer; @@ -2512,6 +2521,47 @@ void TPartitionFixture::CmdChangeOwner(ui64 cookie, const TString& sourceId, TDu ownerCookie = event->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); } +void TPartitionFixture::SendCalcPredicate(ui64 step, + ui64 txId, + const TActorId& suppPartitionId) +{ + SendCalcPredicate(step, txId, "", 0, 0, suppPartitionId); +} + +void TPartitionFixture::WaitForGetWriteInfoRequest() +{ + auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvGetWriteInfoRequest>(); + UNIT_ASSERT(event != nullptr); + //UNIT_ASSERT_VALUES_EQUAL(event->OriginalPartition, ActorId); +} + +void TPartitionFixture::SendGetWriteInfoError(ui32 internalPartitionId, + TString message, + const TActorId& suppPartitionId) +{ + auto event = MakeHolder<TEvPQ::TEvGetWriteInfoError>(internalPartitionId, + std::move(message)); + //event->SupportivePartition = suppPartitionId; + + Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, suppPartitionId, event.Release())); +} + +void TPartitionFixture::WaitForCalcPredicateResult(ui64 txId, bool predicate) +{ + while (true) { + TAutoPtr<IEventHandle> handle; + auto events = + Ctx->Runtime->GrabEdgeEvents<TEvPQ::TEvTxCalcPredicateResult, TEvKeyValue::TEvRequest>(handle, + TDuration::Seconds(1)); + if (std::get<TEvKeyValue::TEvRequest*>(events)) { + SendDiskStatusResponse(nullptr); + } else if (auto* event = std::get<TEvPQ::TEvTxCalcPredicateResult*>(events)) { + UNIT_ASSERT_VALUES_EQUAL(event->TxId, txId); + UNIT_ASSERT_VALUES_EQUAL(event->Predicate, predicate); + break; + } + } +} Y_UNIT_TEST_F(ReserveSubDomainOutOfSpace, TPartitionFixture) { @@ -3614,6 +3664,20 @@ Y_UNIT_TEST_F(The_DeletePartition_Message_Arrives_Before_The_ApproveWriteQuota_M WaitForWriteError(2, NPersQueue::NErrorCode::ERROR); } +Y_UNIT_TEST_F(After_TEvGetWriteInfoError_Comes_TEvTxCalcPredicateResult, TPartitionFixture) +{ + const TPartitionId partitionId{1}; + const ui64 step = 12345; + const ui64 txId = 67890; + + CreatePartition({.Partition=partitionId}); + + SendCalcPredicate(step, txId, Ctx->Edge); + WaitForGetWriteInfoRequest(); + SendGetWriteInfoError(31415, "error", Ctx->Edge); + WaitForCalcPredicateResult(txId, false); +} + } // End of suite } // namespace |
