diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-09-11 19:57:15 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-09-11 19:57:15 +0300 |
commit | eb1b0261b7cff8093f5d63c6fd0f9b12f94196cf (patch) | |
tree | 343aa0abab35f7c769b27272d2c3df5804466040 | |
parent | 98a6390ee5c37d5c326956bc8494581364d8c11b (diff) | |
download | ydb-eb1b0261b7cff8093f5d63c6fd0f9b12f94196cf.tar.gz |
Ignore quota deadline for some writes
-rw-r--r-- | ydb/core/persqueue/events/internal.h | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 11 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 5 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.h | 10 | ||||
-rw-r--r-- | ydb/core/protos/msgbus_pq.proto | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 1 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.ipp | 2 |
7 files changed, 21 insertions, 12 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index ac4e6be45e..afc8bf6651 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -142,6 +142,7 @@ struct TEvPQ { TString PartitionKey; TString ExplicitHashKey; bool External; + bool IgnoreQuotaDeadline; }; TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index f715e73c9e..43676d9c5c 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -4678,16 +4678,23 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) { if (QuotaDeadline == TInstant::Zero() || QuotaDeadline > ctx.Now()) return; + std::deque<TMessage> newRequests; for (auto& w : Requests) { - ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded"); if (w.IsWrite()) { const auto& msg = w.GetWrite().Msg; + if (msg.IgnoreQuotaDeadline) { + newRequests.emplace_back(std::move(w)); + continue; + } + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); WriteInflightSize -= msg.Data.size(); } + + ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded"); } - Requests.clear(); + Requests = std::move(newRequests); QuotaDeadline = TInstant::Zero(); UpdateWriteBufferIsFullState(ctx.Now()); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 58f5a77196..d242264ffc 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1646,7 +1646,8 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p msgs.push_back({cmd.GetSourceId(), static_cast<ui64>(cmd.GetSeqNo()), partNo, totalParts, totalSize, createTimestampMs, receiveTimestampMs, disableDeduplication, writeTimestampMs, data, uncompressedSize, - cmd.GetPartitionKey(), cmd.GetExplicitHash(), cmd.GetExternalOperation() + cmd.GetPartitionKey(), cmd.GetExplicitHash(), cmd.GetExternalOperation(), + cmd.GetIgnoreQuotaDeadline() }); partNo++; uncompressedSize = 0; @@ -1665,7 +1666,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p static_cast<ui32>(cmd.HasTotalSize() ? cmd.GetTotalSize() : cmd.GetData().Size()), createTimestampMs, receiveTimestampMs, disableDeduplication, writeTimestampMs, cmd.GetData(), cmd.HasUncompressedSize() ? cmd.GetUncompressedSize() : 0u, cmd.GetPartitionKey(), cmd.GetExplicitHash(), - cmd.GetExternalOperation() + cmd.GetExternalOperation(), cmd.GetIgnoreQuotaDeadline() }); } LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "got client message topic: " << TopicConverter->GetClientsideName() << diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 0a2957ab9c..49d7a7f774 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -102,18 +102,12 @@ public: void Update(const TInstant& timestamp); bool CanExaust() const { - if (SpeedPerSecond) { - return AvailableSize > 0; - } else { - return true; - } + return AvailableSize > 0; } void Exaust(const ui64 size, const TInstant& timestamp) { Update(timestamp); - if (SpeedPerSecond) { - AvailableSize -= (i64)size; - } + AvailableSize -= (i64)size; Update(timestamp); } diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index 3431d4bd6f..6cec1d738b 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -70,6 +70,9 @@ message TPersQueuePartitionRequest { optional bytes ExplicitHash = 14; optional bool ExternalOperation = 15 [ default = false ]; + + // Do not reject request when quota exceeded + optional bool IgnoreQuotaDeadline = 16 [ default = false ]; } message TCmdUpdateWriteTimestamp { diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index d5dccdc45c..145778306e 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -93,6 +93,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti cmd.SetSeqNo(record.GetSeqNo()); cmd.SetSourceId(NSourceIdEncoding::EncodeSimple(SourceId)); cmd.SetCreateTimeMS(createdAt.MilliSeconds()); + cmd.SetIgnoreQuotaDeadline(true); NKikimrPQClient::TDataChunk data; data.SetCodec(0 /* CODEC_RAW */); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 43af563e85..1d5369bf6b 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -1209,6 +1209,7 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& w->SetCreateTimeMS(writeRequest.created_at_ms(messageIndex)); w->SetUncompressedSize(writeRequest.blocks_uncompressed_sizes(messageIndex)); w->SetClientDC(ClientDC); + w->SetIgnoreQuotaDeadline(IsQuotaRequired()); payloadSize += w->GetData().size() + w->GetSourceId().size(); }; @@ -1220,6 +1221,7 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& w->SetCreateTimeMS(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(writeRequest.messages(messageIndex).created_at())); w->SetUncompressedSize(writeRequest.messages(messageIndex).uncompressed_size()); w->SetClientDC(ClientDC); + w->SetIgnoreQuotaDeadline(IsQuotaRequired()); payloadSize += w->GetData().size() + w->GetSourceId().size(); }; |