aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqyryq <qyryq@ydb.tech>2024-11-20 14:40:08 +0300
committerGitHub <noreply@github.com>2024-11-20 11:40:08 +0000
commit0cc149bcbf3defc46d8df66e99c500ad2f636880 (patch)
tree1035d48a25a3cd5fb318b497f1eeaccd17f522fc
parent8bc06e6062040f2b3508daa346edd5d6a4478543 (diff)
downloadydb-0cc149bcbf3defc46d8df66e99c500ad2f636880.tar.gz
SQS: Fix TimeoutCookie_ leak (#11762)
-rw-r--r--ydb/core/ymq/actor/action.h15
-rw-r--r--ydb/core/ymq/actor/proxy_actor.cpp3
-rw-r--r--ydb/core/ymq/actor/proxy_actor.h2
3 files changed, 11 insertions, 9 deletions
diff --git a/ydb/core/ymq/actor/action.h b/ydb/core/ymq/actor/action.h
index 2c81d80fe0..8097389b9f 100644
--- a/ydb/core/ymq/actor/action.h
+++ b/ydb/core/ymq/actor/action.h
@@ -111,15 +111,15 @@ public:
DoBootstrap();
}
- void Bootstrap(const NActors::TActorContext&) {
+ void Bootstrap(const NActors::TActorContext&) {
#define SQS_REQUEST_CASE(action) \
const auto& request = SourceSqsRequest_.Y_CAT(Get, action)(); \
auto response = Response_.Y_CAT(Mutable, action)(); \
FillAuthInformation(request); \
response->SetRequestId(RequestId_);
-
+
SQS_SWITCH_REQUEST_CUSTOM(SourceSqsRequest_, ENUMERATE_ALL_ACTIONS, Y_ABORT_UNLESS(false));
- #undef SQS_REQUEST_CASE
+ #undef SQS_REQUEST_CASE
RLOG_SQS_DEBUG("Request started. Actor: " << this->SelfId()); // log new request id
StartTs_ = TActivationContext::Now();
@@ -130,6 +130,7 @@ public:
// Set timeout
if (cfg.GetRequestTimeoutMs()) {
+ TimeoutCookie_.Reset(ISchedulerCookie::Make2Way());
this->Schedule(TDuration::MilliSeconds(cfg.GetRequestTimeoutMs()), new TEvWakeup(REQUEST_TIMEOUT_WAKEUP_TAG), TimeoutCookie_.Get());
}
@@ -349,7 +350,7 @@ protected:
RESPONSE_BATCH_CASE(SendMessageBatch)
RESPONSE_CASE(SetQueueAttributes)
RESPONSE_CASE(ListDeadLetterSourceQueues)
- RESPONSE_CASE(CountQueues)
+ RESPONSE_CASE(CountQueues)
case NKikimrClient::TSqsResponse::kDeleteQueueBatch:
case NKikimrClient::TSqsResponse::kGetQueueAttributesBatch:
case NKikimrClient::TSqsResponse::kPurgeQueueBatch:
@@ -382,7 +383,7 @@ private:
);
}
-protected:
+protected:
template <class TResponse>
void AuditLogEntry(const TResponse& response, const TString& requestId, const TError* error = nullptr) {
if (!error && response.HasError()) {
@@ -555,7 +556,7 @@ private:
UserName_ = request.GetAuth().GetUserName();
FolderId_ = request.GetAuth().GetFolderId();
UserSID_ = request.GetAuth().GetUserSID();
-
+
if (IsCloud() && !FolderId_) {
auto items = ParseCloudSecurityToken(SecurityToken_);
UserName_ = std::get<0>(items);
@@ -885,7 +886,7 @@ protected:
TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> QuoterResources_;
bool NeedReportSqsActionInflyCounter = false;
bool NeedReportYmqActionInflyCounter = false;
- TSchedulerCookieHolder TimeoutCookie_ = ISchedulerCookie::Make2Way();
+ TSchedulerCookieHolder TimeoutCookie_;
NKikimrClient::TSqsRequest SourceSqsRequest_;
};
diff --git a/ydb/core/ymq/actor/proxy_actor.cpp b/ydb/core/ymq/actor/proxy_actor.cpp
index 750e4f297c..72c953c3a7 100644
--- a/ydb/core/ymq/actor/proxy_actor.cpp
+++ b/ydb/core/ymq/actor/proxy_actor.cpp
@@ -48,7 +48,7 @@ TString SecurityPrint(const NKikimrClient::TSqsResponse& resp) {
case NKikimrClient::TSqsResponse::kReceiveMessage: {
NKikimrClient::TSqsResponse respCopy = resp;
for (auto& msg : *respCopy.MutableReceiveMessage()->MutableMessages()) {
- msg.SetData(TStringBuilder() << "[...user_data_" << msg.GetData().size() << "bytes" << "...]");
+ msg.SetData(TStringBuilder() << "[...user_data_" << msg.GetData().size() << "bytes" << "...]");
}
return TStringBuilder() << respCopy;
}
@@ -82,6 +82,7 @@ void TProxyActor::Bootstrap() {
const auto& cfg = Cfg();
if (cfg.GetRequestTimeoutMs()) {
+ TimeoutCookie_.Reset(ISchedulerCookie::Make2Way());
this->Schedule(TDuration::MilliSeconds(cfg.GetRequestTimeoutMs()), new TEvWakeup(), TimeoutCookie_.Get());
}
diff --git a/ydb/core/ymq/actor/proxy_actor.h b/ydb/core/ymq/actor/proxy_actor.h
index eed87c4f53..8244a76272 100644
--- a/ydb/core/ymq/actor/proxy_actor.h
+++ b/ydb/core/ymq/actor/proxy_actor.h
@@ -62,7 +62,7 @@ private:
THolder<IReplyCallback> Cb_;
bool ErrorResponse_ = false;
TInstant StartTs_;
- TSchedulerCookieHolder TimeoutCookie_ = ISchedulerCookie::Make2Way();
+ TSchedulerCookieHolder TimeoutCookie_;
TIntrusivePtr<TUserCounters> UserCounters_;
TIntrusivePtr<TQueueCounters> QueueCounters_;