diff options
author | alexbogo <alexbogo@ydb.tech> | 2023-03-01 11:37:19 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2023-03-01 11:37:19 +0300 |
commit | c39563e1bd85b78ce23a98ceddbb3c59eb043590 (patch) | |
tree | f63cedb897e76bcac4b2da554d2a0e2879f665b0 | |
parent | c400b24651590ee3e66795b13cf40dcdc5fb50c9 (diff) | |
download | ydb-c39563e1bd85b78ce23a98ceddbb3c59eb043590.tar.gz |
persqueue partition: fix local quoter update & small refactoring ui64->TDuration/TInstant
init
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 91 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 8 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_types.h | 10 | ||||
-rw-r--r-- | ydb/core/persqueue/quota_tracker.cpp | 9 | ||||
-rw-r--r-- | ydb/core/persqueue/quota_tracker.h | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/quota_tracker_ut.cpp | 54 |
9 files changed, 118 insertions, 61 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index f47a432d6e..26b4451921 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -310,7 +310,7 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS void TPartition::ReplyWrite( const TActorContext& ctx, const ui64 dst, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts, const ui64 offset, const TInstant writeTimestamp, bool already, const ui64 maxSeqNo, - const ui64 partitionQuotedTime, const TDuration topicQuotedTime, const ui64 queueTime, const ui64 writeTime) { + const TDuration partitionQuotedTime, const TDuration topicQuotedTime, const TDuration queueTime, const TDuration writeTime) { Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset); Y_VERIFY(seqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, seqNo); @@ -329,10 +329,10 @@ void TPartition::ReplyWrite( write->SetMaxSeqNo(maxSeqNo); write->SetOffset(offset); - write->SetPartitionQuotedTimeMs(partitionQuotedTime); + write->SetPartitionQuotedTimeMs(partitionQuotedTime.MilliSeconds()); write->SetTopicQuotedTimeMs(topicQuotedTime.MilliSeconds()); - write->SetTotalTimeInPartitionQueueMs(queueTime); - write->SetWriteTimeMs(writeTime); + write->SetTotalTimeInPartitionQueueMs(queueTime.MilliSeconds()); + write->SetWriteTimeMs(writeTime.MilliSeconds()); ctx.Send(Tablet, response.Release()); } @@ -870,6 +870,15 @@ void TPartition::Initialize(const TActorContext& ctx) { } } +void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) { + Responses.emplace_back( + message.Body, + WriteQuota->GetQuotedTime(ctx.Now()) - message.QuotedTime, + (ctx.Now() - TInstant::Zero()) - message.QueueTime, + ctx.Now() + ); +} + void TPartition::SetupTopicCounters(const TActorContext& ctx) { auto counters = AppData(ctx)->Counters; auto labels = NPersQueue::GetLabels(TopicConverter); @@ -1926,14 +1935,13 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c Owners[owner]; it = Owners.find(owner); } - WriteQuota->Update(ctx.Now()); if (it->second.NeedResetOwner || ev->Force) { //change owner Y_VERIFY(ReservedSize >= it->second.ReservedSize); ReservedSize -= it->second.ReservedSize; it->second.GenerateCookie(owner, ev->PipeClient, ev->Sender, TopicConverter->GetClientsideName(), Partition, ctx);//will change OwnerCookie //cookie is generated. but answer will be sent when all inflight writes will be done - they in the same queue 'Requests' - Requests.emplace_back(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie}, WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now().MilliSeconds(), 0); + EmplaceRequest(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie}, ctx); TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); UpdateWriteBufferIsFullState(ctx.Now()); ProcessReserveRequests(ctx); @@ -2968,12 +2976,14 @@ void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) { void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { ui64 offset = EndOffset; while (!Responses.empty()) { - const ui64 quotedTime = Responses.front().QuotedTime; - const ui64 queueTime = Responses.front().QueueTime; - const ui64 writeTime = ctx.Now().MilliSeconds() - Responses.front().WriteTime; + const auto& response = Responses.front(); - if (Responses.front().IsWrite()) { - const auto& writeResponse = Responses.front().GetWrite(); + const TDuration quotedTime = response.QuotedTime; + const TDuration queueTime = response.QueueTime; + const TDuration writeTime = ctx.Now() - response.WriteTimeBaseline; + + if (response.IsWrite()) { + const auto& writeResponse = response.GetWrite(); const TString& s = writeResponse.Msg.SourceId; const ui64& seqNo = writeResponse.Msg.SeqNo; const ui16& partNo = writeResponse.Msg.PartNo; @@ -3026,21 +3036,21 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { ", Offset: " << offset << " is " << (already ? "already written" : "stored on disk") ); if (PartitionWriteQuotaWaitCounter) { - PartitionWriteQuotaWaitCounter->IncFor(quotedTime); + PartitionWriteQuotaWaitCounter->IncFor(quotedTime.MilliSeconds()); } if (!already && partNo + 1 == totalParts) ++offset; - } else if (Responses.front().IsOwnership()) { - const TString& ownerCookie = Responses.front().GetOwnership().OwnerCookie; + } else if (response.IsOwnership()) { + const TString& ownerCookie = response.GetOwnership().OwnerCookie; auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie)); if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) { - ReplyOwnerOk(ctx, Responses.front().GetCookie(), ownerCookie); + ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie); } else { - ReplyError(ctx, Responses.front().GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already"); + ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already"); } - } else if (Responses.front().IsRegisterMessageGroup()) { - const auto& body = Responses.front().GetRegisterMessageGroup().Body; + } else if (response.IsRegisterMessageGroup()) { + const auto& body = response.GetRegisterMessageGroup().Body; TMaybe<TPartitionKeyRange> keyRange; if (body.KeyRange) { @@ -3049,14 +3059,14 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { Y_VERIFY(body.AssignedOffset); SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange)); - ReplyOk(ctx, Responses.front().GetCookie()); - } else if (Responses.front().IsDeregisterMessageGroup()) { - const auto& body = Responses.front().GetDeregisterMessageGroup().Body; + ReplyOk(ctx, response.GetCookie()); + } else if (response.IsDeregisterMessageGroup()) { + const auto& body = response.GetDeregisterMessageGroup().Body; SourceIdStorage.DeregisterSourceId(body.SourceId); - ReplyOk(ctx, Responses.front().GetCookie()); - } else if (Responses.front().IsSplitMessageGroup()) { - const auto& split = Responses.front().GetSplitMessageGroup(); + ReplyOk(ctx, response.GetCookie()); + } else if (response.IsSplitMessageGroup()) { + const auto& split = response.GetSplitMessageGroup(); for (const auto& body : split.Deregistrations) { SourceIdStorage.DeregisterSourceId(body.SourceId); @@ -3072,7 +3082,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true); } - ReplyOk(ctx, Responses.front().GetCookie()); + ReplyOk(ctx, response.GetCookie()); } else { Y_FAIL("Unexpected message"); } @@ -4836,11 +4846,10 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c return; } ui64 size = 0; - WriteQuota->Update(ctx.Now()); for (auto& msg: ev->Get()->Msgs) { size += msg.Data.size(); bool needToChangeOffset = msg.PartNo + 1 == msg.TotalParts; - Requests.emplace_back(TWriteMsg{ev->Get()->Cookie, offset, std::move(msg)}, WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now().MilliSeconds(), 0); + EmplaceRequest(TWriteMsg{ev->Get()->Cookie, offset, std::move(msg)}, ctx); if (offset && needToChangeOffset) ++*offset; } @@ -4886,8 +4895,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const T "SourceId not found, registration cannot be completed"); } - WriteQuota->Update(ctx.Now()); - Requests.emplace_back(TRegisterMessageGroupMsg(*ev->Get()), WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now().MilliSeconds(), 0); + EmplaceRequest(TRegisterMessageGroupMsg(*ev->Get()), ctx); } void TPartition::HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx) { @@ -4903,9 +4911,8 @@ void TPartition::HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, "SourceId doesn't exist"); } - - WriteQuota->Update(ctx.Now()); - Requests.emplace_back(TDeregisterMessageGroupMsg(*ev->Get()), WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now().MilliSeconds(), 0); + + EmplaceRequest(TDeregisterMessageGroupMsg(*ev->Get()), ctx); } void TPartition::HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx) { @@ -4943,8 +4950,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TAct } } - WriteQuota->Update(ctx.Now()); - Requests.emplace_back(std::move(msg), WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now().MilliSeconds(), 0); + EmplaceRequest(std::move(msg), ctx); } std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool headCleared) { @@ -5045,8 +5051,6 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const //Process is following: if batch contains already written messages or only one client message part -> unpack it and process as several TClientBlobs //otherwise write this batch as is to head; - WriteQuota->Update(ctx.Now()); - while (!Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big auto pp = Requests.front(); Requests.pop_front(); @@ -5082,10 +5086,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const Y_VERIFY(pp.IsOwnership()); } - pp.QuotedTime = WriteQuota->GetQuotedTime(ctx.Now()) - pp.QuotedTime; //change to duration - pp.QueueTime = ctx.Now().MilliSeconds() - pp.QueueTime; - pp.WriteTime = ctx.Now().MilliSeconds(); - Responses.push_back(pp); + EmplaceResponse(std::move(pp), ctx); continue; } @@ -5124,10 +5125,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const } TString().swap(p.Msg.Data); - pp.QuotedTime = WriteQuota->GetQuotedTime(ctx.Now()) - pp.QuotedTime; //change to duration - pp.QueueTime = ctx.Now().MilliSeconds() - pp.QueueTime; - pp.WriteTime = ctx.Now().MilliSeconds(); - Responses.push_back(pp); + EmplaceResponse(std::move(pp), ctx); continue; } @@ -5338,10 +5336,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); } TString().swap(p.Msg.Data); - pp.QuotedTime = WriteQuota->GetQuotedTime(ctx.Now()) - pp.QuotedTime; //change to duration - pp.QueueTime = ctx.Now().MilliSeconds() - pp.QueueTime; - pp.WriteTime = ctx.Now().MilliSeconds(); - Responses.push_back(pp); + EmplaceResponse(std::move(pp), ctx); } UpdateWriteBufferIsFullState(ctx.Now()); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 6379b7e14b..e68beb9347 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -80,7 +80,7 @@ private: void ReplyOk(const TActorContext& ctx, const ui64 dst); void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie); - void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, ui64 partitionQuotedTime, TDuration topicQuotedTime, ui64 queueTime, ui64 writeTime); + void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime); void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx); void AnswerCurrentWrites(const TActorContext& ctx); @@ -292,6 +292,12 @@ private: void HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx); void Initialize(const TActorContext& ctx); + template <typename T> + void EmplaceRequest(T&& body, const TActorContext& ctx) { + Requests.emplace_back(body, WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now() - TInstant::Zero()); + } + void EmplaceResponse(TMessage&& message, const TActorContext& ctx); + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR; diff --git a/ydb/core/persqueue/partition_types.h b/ydb/core/persqueue/partition_types.h index 9b709c990c..0dbe4d8af7 100644 --- a/ydb/core/persqueue/partition_types.h +++ b/ydb/core/persqueue/partition_types.h @@ -68,16 +68,16 @@ struct TMessage { TSplitMessageGroupMsg > Body; - ui64 QuotedTime; - ui64 QueueTime; - ui64 WriteTime; + TDuration QuotedTime; // baseline for request and duration for response + TDuration QueueTime; // baseline for request and duration for response + TInstant WriteTimeBaseline; template <typename T> - explicit TMessage(T&& body, ui64 quotedTime, ui64 queueTime, ui64 writeTime) + explicit TMessage(T&& body, TDuration quotedTime, TDuration queueTime, TInstant writeTimeBaseline = TInstant::Zero()) : Body(std::forward<T>(body)) , QuotedTime(quotedTime) , QueueTime(queueTime) - , WriteTime(writeTime) + , WriteTimeBaseline(writeTimeBaseline) { } diff --git a/ydb/core/persqueue/quota_tracker.cpp b/ydb/core/persqueue/quota_tracker.cpp index 6fb005fd88..9d8d0722cf 100644 --- a/ydb/core/persqueue/quota_tracker.cpp +++ b/ydb/core/persqueue/quota_tracker.cpp @@ -7,7 +7,6 @@ namespace NKikimr::NPQ { , SpeedPerSecond(speedPerSecond)
, LastUpdateTime(timestamp)
, MaxBurst(maxBurst)
- , QuotedTime(0)
{}
void TQuotaTracker::UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond) {
@@ -17,14 +16,14 @@ namespace NKikimr::NPQ { }
void TQuotaTracker::Update(const TInstant timestamp) {
- ui64 ms = (timestamp - LastUpdateTime).MilliSeconds();
+ TDuration diff = timestamp - LastUpdateTime;
LastUpdateTime = timestamp;
if (AvailableSize < 0) {
- QuotedTime += ms;
+ QuotedTime += diff;
}
- AvailableSize = Min<i64>(AvailableSize + (ui64)SpeedPerSecond * ms / 1000, MaxBurst);
+ AvailableSize = Min<i64>(AvailableSize + (ui64)SpeedPerSecond * diff.MicroSeconds() / 1000'000, MaxBurst);
}
bool TQuotaTracker::CanExaust(const TInstant timestamp) {
@@ -38,7 +37,7 @@ namespace NKikimr::NPQ { Update(timestamp);
}
- ui64 TQuotaTracker::GetQuotedTime(const TInstant timestamp) {
+ TDuration TQuotaTracker::GetQuotedTime(const TInstant timestamp) {
Update(timestamp);
return QuotedTime;
}
diff --git a/ydb/core/persqueue/quota_tracker.h b/ydb/core/persqueue/quota_tracker.h index 877a45f9e5..82dcfba9ec 100644 --- a/ydb/core/persqueue/quota_tracker.h +++ b/ydb/core/persqueue/quota_tracker.h @@ -14,7 +14,7 @@ namespace NKikimr::NPQ { bool CanExaust(const TInstant timestamp) ;
void Exaust(const ui64 size, const TInstant timestamp);
- ui64 GetQuotedTime(const TInstant timestamp);
+ TDuration GetQuotedTime(const TInstant timestamp);
ui64 GetTotalSpeed() const;
private:
@@ -23,7 +23,7 @@ namespace NKikimr::NPQ { TInstant LastUpdateTime;
ui64 MaxBurst;
- ui64 QuotedTime;
+ TDuration QuotedTime;
};
} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/ut/CMakeLists.darwin.txt b/ydb/core/persqueue/ut/CMakeLists.darwin.txt index 3f2862a1b6..36255ac79e 100644 --- a/ydb/core/persqueue/ut/CMakeLists.darwin.txt +++ b/ydb/core/persqueue/ut/CMakeLists.darwin.txt @@ -48,6 +48,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt index a3b285dc6f..30d91415d8 100644 --- a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt @@ -50,6 +50,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp diff --git a/ydb/core/persqueue/ut/CMakeLists.linux.txt b/ydb/core/persqueue/ut/CMakeLists.linux.txt index 2fe115ddce..ce0f935969 100644 --- a/ydb/core/persqueue/ut/CMakeLists.linux.txt +++ b/ydb/core/persqueue/ut/CMakeLists.linux.txt @@ -52,6 +52,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp diff --git a/ydb/core/persqueue/ut/quota_tracker_ut.cpp b/ydb/core/persqueue/ut/quota_tracker_ut.cpp new file mode 100644 index 0000000000..48af429385 --- /dev/null +++ b/ydb/core/persqueue/ut/quota_tracker_ut.cpp @@ -0,0 +1,54 @@ +#include "quota_tracker.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/size_literals.h>
+
+namespace NKikimr::NPQ {
+
+Y_UNIT_TEST_SUITE(TQuotaTracker) {
+
+Y_UNIT_TEST(TestSmallMessages) {
+ TInstant ts = TInstant::MilliSeconds(123456789);
+ TQuotaTracker quota(2_MB, 2_MB, ts);
+
+ UNIT_ASSERT(quota.CanExaust(ts));
+
+ quota.Exaust(2_MB - 1, ts);
+ ui64 blobSize = 500;
+ ui64 processedBlobs = 0;
+
+ for (ui32 i = 0; i < 100'000; ++i) { // 10 sec total
+ if (quota.CanExaust(ts)) {
+ quota.Exaust(blobSize, ts);
+ ++processedBlobs;
+ }
+ ts += TDuration::MicroSeconds(100);
+ }
+ Cerr << "processed_blobs=" << processedBlobs << " quoted_time=" << quota.GetQuotedTime(ts) << Endl;
+ UNIT_ASSERT_EQUAL(processedBlobs, 41800);
+ UNIT_ASSERT_EQUAL(quota.GetQuotedTime(ts), TDuration::MilliSeconds(9980));
+}
+
+Y_UNIT_TEST(TestBigMessages) {
+ TInstant ts = TInstant::MilliSeconds(123456789);
+ TQuotaTracker quota(2_MB, 2_MB, ts);
+
+ UNIT_ASSERT(quota.CanExaust(ts));
+
+ auto CannotExaustAfter = [&](TDuration diff) {
+ ts += diff;
+ UNIT_ASSERT_C(!quota.CanExaust(ts), TStringBuilder() << "at " << ts);
+ };
+
+ quota.Exaust(10_MB, ts);
+ CannotExaustAfter(TDuration::Zero());
+ CannotExaustAfter(TDuration::Seconds(4));
+
+ ts += TDuration::MilliSeconds(1);
+ UNIT_ASSERT(quota.CanExaust(ts));
+}
+
+} //Y_UNIT_TEST_SUITE
+
+} // namespace NKikimr::NPQ
|