aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-09-11 19:57:15 +0300
committerilnaz <ilnaz@ydb.tech>2022-09-11 19:57:15 +0300
commiteb1b0261b7cff8093f5d63c6fd0f9b12f94196cf (patch)
tree343aa0abab35f7c769b27272d2c3df5804466040
parent98a6390ee5c37d5c326956bc8494581364d8c11b (diff)
downloadydb-eb1b0261b7cff8093f5d63c6fd0f9b12f94196cf.tar.gz
Ignore quota deadline for some writes
-rw-r--r--ydb/core/persqueue/events/internal.h1
-rw-r--r--ydb/core/persqueue/partition.cpp11
-rw-r--r--ydb/core/persqueue/pq_impl.cpp5
-rw-r--r--ydb/core/persqueue/user_info.h10
-rw-r--r--ydb/core/protos/msgbus_pq.proto3
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp1
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp2
7 files changed, 21 insertions, 12 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index ac4e6be45e..afc8bf6651 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -142,6 +142,7 @@ struct TEvPQ {
TString PartitionKey;
TString ExplicitHashKey;
bool External;
+ bool IgnoreQuotaDeadline;
};
TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite)
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index f715e73c9e..43676d9c5c 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -4678,16 +4678,23 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) {
if (QuotaDeadline == TInstant::Zero() || QuotaDeadline > ctx.Now())
return;
+ std::deque<TMessage> newRequests;
for (auto& w : Requests) {
- ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded");
if (w.IsWrite()) {
const auto& msg = w.GetWrite().Msg;
+ if (msg.IgnoreQuotaDeadline) {
+ newRequests.emplace_back(std::move(w));
+ continue;
+ }
+
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size());
WriteInflightSize -= msg.Data.size();
}
+
+ ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded");
}
- Requests.clear();
+ Requests = std::move(newRequests);
QuotaDeadline = TInstant::Zero();
UpdateWriteBufferIsFullState(ctx.Now());
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 58f5a77196..d242264ffc 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1646,7 +1646,8 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
msgs.push_back({cmd.GetSourceId(), static_cast<ui64>(cmd.GetSeqNo()), partNo,
totalParts, totalSize, createTimestampMs, receiveTimestampMs,
disableDeduplication, writeTimestampMs, data, uncompressedSize,
- cmd.GetPartitionKey(), cmd.GetExplicitHash(), cmd.GetExternalOperation()
+ cmd.GetPartitionKey(), cmd.GetExplicitHash(), cmd.GetExternalOperation(),
+ cmd.GetIgnoreQuotaDeadline()
});
partNo++;
uncompressedSize = 0;
@@ -1665,7 +1666,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
static_cast<ui32>(cmd.HasTotalSize() ? cmd.GetTotalSize() : cmd.GetData().Size()),
createTimestampMs, receiveTimestampMs, disableDeduplication, writeTimestampMs, cmd.GetData(),
cmd.HasUncompressedSize() ? cmd.GetUncompressedSize() : 0u, cmd.GetPartitionKey(), cmd.GetExplicitHash(),
- cmd.GetExternalOperation()
+ cmd.GetExternalOperation(), cmd.GetIgnoreQuotaDeadline()
});
}
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "got client message topic: " << TopicConverter->GetClientsideName() <<
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 0a2957ab9c..49d7a7f774 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -102,18 +102,12 @@ public:
void Update(const TInstant& timestamp);
bool CanExaust() const {
- if (SpeedPerSecond) {
- return AvailableSize > 0;
- } else {
- return true;
- }
+ return AvailableSize > 0;
}
void Exaust(const ui64 size, const TInstant& timestamp) {
Update(timestamp);
- if (SpeedPerSecond) {
- AvailableSize -= (i64)size;
- }
+ AvailableSize -= (i64)size;
Update(timestamp);
}
diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto
index 3431d4bd6f..6cec1d738b 100644
--- a/ydb/core/protos/msgbus_pq.proto
+++ b/ydb/core/protos/msgbus_pq.proto
@@ -70,6 +70,9 @@ message TPersQueuePartitionRequest {
optional bytes ExplicitHash = 14;
optional bool ExternalOperation = 15 [ default = false ];
+
+ // Do not reject request when quota exceeded
+ optional bool IgnoreQuotaDeadline = 16 [ default = false ];
}
message TCmdUpdateWriteTimestamp {
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index d5dccdc45c..145778306e 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -93,6 +93,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
cmd.SetSeqNo(record.GetSeqNo());
cmd.SetSourceId(NSourceIdEncoding::EncodeSimple(SourceId));
cmd.SetCreateTimeMS(createdAt.MilliSeconds());
+ cmd.SetIgnoreQuotaDeadline(true);
NKikimrPQClient::TDataChunk data;
data.SetCodec(0 /* CODEC_RAW */);
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 43af563e85..1d5369bf6b 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -1209,6 +1209,7 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
w->SetCreateTimeMS(writeRequest.created_at_ms(messageIndex));
w->SetUncompressedSize(writeRequest.blocks_uncompressed_sizes(messageIndex));
w->SetClientDC(ClientDC);
+ w->SetIgnoreQuotaDeadline(IsQuotaRequired());
payloadSize += w->GetData().size() + w->GetSourceId().size();
};
@@ -1220,6 +1221,7 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
w->SetCreateTimeMS(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(writeRequest.messages(messageIndex).created_at()));
w->SetUncompressedSize(writeRequest.messages(messageIndex).uncompressed_size());
w->SetClientDC(ClientDC);
+ w->SetIgnoreQuotaDeadline(IsQuotaRequired());
payloadSize += w->GetData().size() + w->GetSourceId().size();
};