aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-04-05 19:17:46 +0300
committerilnaz <ilnaz@ydb.tech>2023-04-05 19:17:46 +0300
commitc7142e467a411f8a729e9912d5f2b568091cf31c (patch)
tree3e54631dd8b387f4b53382f8503793fb094fde99
parentfd86d9a7713285bdfb8f09300f04bac908f451e5 (diff)
downloadydb-c7142e467a411f8a729e9912d5f2b568091cf31c.tar.gz
Fix possible race at init, reject pending requests at zombie state
-rw-r--r--ydb/core/persqueue/writer/writer.cpp50
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp13
2 files changed, 55 insertions, 8 deletions
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index 12a88372fbe..447ba4a9474 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -84,22 +84,52 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
return true;
}
- void InitResult(const TString& reason, NKikimrClient::TResponse&& response) {
- Send(Client, new TEvPartitionWriter::TEvInitResult(reason, std::move(response)));
+ static NKikimrClient::TResponse MakeResponse(ui64 cookie) {
+ NKikimrClient::TResponse response;
+ response.MutablePartitionResponse()->SetCookie(cookie);
+ return response;
+ }
+
+ void BecomeZombie(const TString& error) {
+ for (auto cookie : std::exchange(PendingWrite, {})) {
+ SendWriteResult(error, MakeResponse(cookie));
+ }
+ for (const auto& [cookie, _] : std::exchange(PendingReserve, {})) {
+ SendWriteResult(error, MakeResponse(cookie));
+ }
+ for (const auto& [cookie, _] : std::exchange(Pending, {})) {
+ SendWriteResult(error, MakeResponse(cookie));
+ }
+
Become(&TThis::StateZombie);
}
+ template <typename... Args>
+ void SendInitResult(Args&&... args) {
+ Send(Client, new TEvPartitionWriter::TEvInitResult(std::forward<Args>(args)...));
+ }
+
+ void InitResult(const TString& reason, NKikimrClient::TResponse&& response) {
+ SendInitResult(reason, std::move(response));
+ BecomeZombie("Init error");
+ }
+
void InitResult(const TString& ownerCookie, const TEvPartitionWriter::TEvInitResult::TSourceIdInfo& sourceIdInfo) {
- Send(Client, new TEvPartitionWriter::TEvInitResult(ownerCookie, sourceIdInfo));
+ SendInitResult(ownerCookie, sourceIdInfo);
+ }
+
+ template <typename... Args>
+ void SendWriteResult(Args&&... args) {
+ Send(Client, new TEvPartitionWriter::TEvWriteResponse(std::forward<Args>(args)...));
}
void WriteResult(const TString& reason, NKikimrClient::TResponse&& response) {
- Send(Client, new TEvPartitionWriter::TEvWriteResponse(reason, std::move(response)));
- Become(&TThis::StateZombie);
+ SendWriteResult(reason, std::move(response));
+ BecomeZombie("Write error");
}
void WriteResult(NKikimrClient::TResponse&& response) {
- Send(Client, new TEvPartitionWriter::TEvWriteResponse(std::move(response)));
+ SendWriteResult(std::move(response));
PendingWrite.pop_front();
}
@@ -109,7 +139,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
void Disconnected() {
Send(Client, new TEvPartitionWriter::TEvDisconnected());
- Become(&TThis::StateZombie);
+ BecomeZombie("Disconnected");
}
/// GetOwnership
@@ -266,6 +296,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
}
}
+ void Reject(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) {
+ const auto cookie = ev->Get()->Record.GetPartitionRequest().GetCookie();
+ return WriteResult("Rejected by writer", MakeResponse(cookie));
+ }
+
void HoldPending(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) {
auto& record = ev->Get()->Record;
const auto cookie = record.GetPartitionRequest().GetCookie();
@@ -427,6 +462,7 @@ public:
STATEFN(StateZombie) {
switch (ev->GetTypeRewrite()) {
+ hFunc(TEvPartitionWriter::TEvWriteRequest, Reject);
sFunc(TEvents::TEvPoison, PassAway);
}
}
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 098d26bf58d..89441acda6a 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -73,6 +73,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
STATEFN(StateWaitingRecords) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvChangeExchange::TEvRecords, Handle);
+ sFunc(TEvPartitionWriter::TEvWriteResponse, Lost);
default:
return StateBase(ev, TlsActivationContext->AsActorContext());
}
@@ -228,6 +229,16 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str()));
}
+ void Disconnected() {
+ LOG_D("Disconnected");
+ Leave();
+ }
+
+ void Lost() {
+ LOG_W("Lost");
+ Leave();
+ }
+
void Leave() {
Send(Parent, new TEvChangeExchangePrivate::TEvGone(PartitionId));
PassAway();
@@ -271,7 +282,7 @@ public:
STATEFN(StateBase) {
switch (ev->GetTypeRewrite()) {
- sFunc(TEvPartitionWriter::TEvDisconnected, Leave);
+ sFunc(TEvPartitionWriter::TEvDisconnected, Disconnected);
hFunc(NMon::TEvRemoteHttpInfo, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}