aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2024-03-13 16:18:44 +0500
committerGitHub <noreply@github.com>2024-03-13 16:18:44 +0500
commitc0a403cc6b0cd7b86a4e0760eaf47af8493e7cfc (patch)
treef35c35b0eaaaf92b638694f8fe3cf6b81371eb45
parent7c77f0898e5fa635a1252426be1453364d4c737a (diff)
downloadydb-c0a403cc6b0cd7b86a4e0760eaf47af8493e7cfc.tar.gz
Fix Verify in WriteSessionActor (#2651)
-rw-r--r--ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp7
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp5
2 files changed, 9 insertions, 3 deletions
diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
index e29b4bc4f3..196b65ec8b 100644
--- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
+++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
@@ -310,7 +310,10 @@ void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& db
void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx) {
- Y_ABORT_UNLESS(State == ES_WAIT_SCHEME || State == ES_INITED);
+ if (State != ES_WAIT_SCHEME && State != ES_INITED) {
+ return CloseSession("erroneous internal state", NPersQueue::NErrorCode::ERROR, ctx);
+ }
+
auto& res = ev->Get()->Result;
Y_ABORT_UNLESS(res->ResultSet.size() == 1);
@@ -865,7 +868,7 @@ void TWriteSessionActor::LogSession(const TActorContext& ctx) {
void TWriteSessionActor::HandleWakeup(const TActorContext& ctx) {
if (State != ES_INITED) {
- return;
+ return CloseSession("erroneous internal state", NPersQueue::NErrorCode::ERROR, ctx);
}
auto now = ctx.Now();
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 09746e0426..175f584cb2 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -1505,7 +1505,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr&
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& ctx) {
- Y_ABORT_UNLESS(State == ES_INITED);
+ if (State != ES_INITED) {
+ LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "WriteSessionActor state is wrong. Actual state '" << (int)State << "'");
+ return CloseSession("erroneous internal state", PersQueue::ErrorCode::ERROR, ctx);
+ }
auto now = ctx.Now();