diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-04-05 19:17:46 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-04-05 19:17:46 +0300 |
commit | c7142e467a411f8a729e9912d5f2b568091cf31c (patch) | |
tree | 3e54631dd8b387f4b53382f8503793fb094fde99 | |
parent | fd86d9a7713285bdfb8f09300f04bac908f451e5 (diff) | |
download | ydb-c7142e467a411f8a729e9912d5f2b568091cf31c.tar.gz |
Fix possible race at init, reject pending requests at zombie state
-rw-r--r-- | ydb/core/persqueue/writer/writer.cpp | 50 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 13 |
2 files changed, 55 insertions, 8 deletions
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 12a88372fbe..447ba4a9474 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -84,22 +84,52 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { return true; } - void InitResult(const TString& reason, NKikimrClient::TResponse&& response) { - Send(Client, new TEvPartitionWriter::TEvInitResult(reason, std::move(response))); + static NKikimrClient::TResponse MakeResponse(ui64 cookie) { + NKikimrClient::TResponse response; + response.MutablePartitionResponse()->SetCookie(cookie); + return response; + } + + void BecomeZombie(const TString& error) { + for (auto cookie : std::exchange(PendingWrite, {})) { + SendWriteResult(error, MakeResponse(cookie)); + } + for (const auto& [cookie, _] : std::exchange(PendingReserve, {})) { + SendWriteResult(error, MakeResponse(cookie)); + } + for (const auto& [cookie, _] : std::exchange(Pending, {})) { + SendWriteResult(error, MakeResponse(cookie)); + } + Become(&TThis::StateZombie); } + template <typename... Args> + void SendInitResult(Args&&... args) { + Send(Client, new TEvPartitionWriter::TEvInitResult(std::forward<Args>(args)...)); + } + + void InitResult(const TString& reason, NKikimrClient::TResponse&& response) { + SendInitResult(reason, std::move(response)); + BecomeZombie("Init error"); + } + void InitResult(const TString& ownerCookie, const TEvPartitionWriter::TEvInitResult::TSourceIdInfo& sourceIdInfo) { - Send(Client, new TEvPartitionWriter::TEvInitResult(ownerCookie, sourceIdInfo)); + SendInitResult(ownerCookie, sourceIdInfo); + } + + template <typename... Args> + void SendWriteResult(Args&&... args) { + Send(Client, new TEvPartitionWriter::TEvWriteResponse(std::forward<Args>(args)...)); } void WriteResult(const TString& reason, NKikimrClient::TResponse&& response) { - Send(Client, new TEvPartitionWriter::TEvWriteResponse(reason, std::move(response))); - Become(&TThis::StateZombie); + SendWriteResult(reason, std::move(response)); + BecomeZombie("Write error"); } void WriteResult(NKikimrClient::TResponse&& response) { - Send(Client, new TEvPartitionWriter::TEvWriteResponse(std::move(response))); + SendWriteResult(std::move(response)); PendingWrite.pop_front(); } @@ -109,7 +139,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { void Disconnected() { Send(Client, new TEvPartitionWriter::TEvDisconnected()); - Become(&TThis::StateZombie); + BecomeZombie("Disconnected"); } /// GetOwnership @@ -266,6 +296,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { } } + void Reject(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) { + const auto cookie = ev->Get()->Record.GetPartitionRequest().GetCookie(); + return WriteResult("Rejected by writer", MakeResponse(cookie)); + } + void HoldPending(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) { auto& record = ev->Get()->Record; const auto cookie = record.GetPartitionRequest().GetCookie(); @@ -427,6 +462,7 @@ public: STATEFN(StateZombie) { switch (ev->GetTypeRewrite()) { + hFunc(TEvPartitionWriter::TEvWriteRequest, Reject); sFunc(TEvents::TEvPoison, PassAway); } } diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 098d26bf58d..89441acda6a 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -73,6 +73,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti STATEFN(StateWaitingRecords) { switch (ev->GetTypeRewrite()) { hFunc(TEvChangeExchange::TEvRecords, Handle); + sFunc(TEvPartitionWriter::TEvWriteResponse, Lost); default: return StateBase(ev, TlsActivationContext->AsActorContext()); } @@ -228,6 +229,16 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str())); } + void Disconnected() { + LOG_D("Disconnected"); + Leave(); + } + + void Lost() { + LOG_W("Lost"); + Leave(); + } + void Leave() { Send(Parent, new TEvChangeExchangePrivate::TEvGone(PartitionId)); PassAway(); @@ -271,7 +282,7 @@ public: STATEFN(StateBase) { switch (ev->GetTypeRewrite()) { - sFunc(TEvPartitionWriter::TEvDisconnected, Leave); + sFunc(TEvPartitionWriter::TEvDisconnected, Disconnected); hFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); } |