aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexbogo <alexbogo@ydb.tech>2023-03-01 11:37:19 +0300
committeralexbogo <alexbogo@ydb.tech>2023-03-01 11:37:19 +0300
commitc39563e1bd85b78ce23a98ceddbb3c59eb043590 (patch)
treef63cedb897e76bcac4b2da554d2a0e2879f665b0
parentc400b24651590ee3e66795b13cf40dcdc5fb50c9 (diff)
downloadydb-c39563e1bd85b78ce23a98ceddbb3c59eb043590.tar.gz
persqueue partition: fix local quoter update & small refactoring ui64->TDuration/TInstant
init
-rw-r--r--ydb/core/persqueue/partition.cpp91
-rw-r--r--ydb/core/persqueue/partition.h8
-rw-r--r--ydb/core/persqueue/partition_types.h10
-rw-r--r--ydb/core/persqueue/quota_tracker.cpp9
-rw-r--r--ydb/core/persqueue/quota_tracker.h4
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/core/persqueue/ut/quota_tracker_ut.cpp54
9 files changed, 118 insertions, 61 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index f47a432d6e..26b4451921 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -310,7 +310,7 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS
void TPartition::ReplyWrite(
const TActorContext& ctx, const ui64 dst, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts,
const ui64 offset, const TInstant writeTimestamp, bool already, const ui64 maxSeqNo,
- const ui64 partitionQuotedTime, const TDuration topicQuotedTime, const ui64 queueTime, const ui64 writeTime) {
+ const TDuration partitionQuotedTime, const TDuration topicQuotedTime, const TDuration queueTime, const TDuration writeTime) {
Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset);
Y_VERIFY(seqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, seqNo);
@@ -329,10 +329,10 @@ void TPartition::ReplyWrite(
write->SetMaxSeqNo(maxSeqNo);
write->SetOffset(offset);
- write->SetPartitionQuotedTimeMs(partitionQuotedTime);
+ write->SetPartitionQuotedTimeMs(partitionQuotedTime.MilliSeconds());
write->SetTopicQuotedTimeMs(topicQuotedTime.MilliSeconds());
- write->SetTotalTimeInPartitionQueueMs(queueTime);
- write->SetWriteTimeMs(writeTime);
+ write->SetTotalTimeInPartitionQueueMs(queueTime.MilliSeconds());
+ write->SetWriteTimeMs(writeTime.MilliSeconds());
ctx.Send(Tablet, response.Release());
}
@@ -870,6 +870,15 @@ void TPartition::Initialize(const TActorContext& ctx) {
}
}
+void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) {
+ Responses.emplace_back(
+ message.Body,
+ WriteQuota->GetQuotedTime(ctx.Now()) - message.QuotedTime,
+ (ctx.Now() - TInstant::Zero()) - message.QueueTime,
+ ctx.Now()
+ );
+}
+
void TPartition::SetupTopicCounters(const TActorContext& ctx) {
auto counters = AppData(ctx)->Counters;
auto labels = NPersQueue::GetLabels(TopicConverter);
@@ -1926,14 +1935,13 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
Owners[owner];
it = Owners.find(owner);
}
- WriteQuota->Update(ctx.Now());
if (it->second.NeedResetOwner || ev->Force) { //change owner
Y_VERIFY(ReservedSize >= it->second.ReservedSize);
ReservedSize -= it->second.ReservedSize;
it->second.GenerateCookie(owner, ev->PipeClient, ev->Sender, TopicConverter->GetClientsideName(), Partition, ctx);//will change OwnerCookie
//cookie is generated. but answer will be sent when all inflight writes will be done - they in the same queue 'Requests'
- Requests.emplace_back(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie}, WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now().MilliSeconds(), 0);
+ EmplaceRequest(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie}, ctx);
TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize);
UpdateWriteBufferIsFullState(ctx.Now());
ProcessReserveRequests(ctx);
@@ -2968,12 +2976,14 @@ void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) {
void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
ui64 offset = EndOffset;
while (!Responses.empty()) {
- const ui64 quotedTime = Responses.front().QuotedTime;
- const ui64 queueTime = Responses.front().QueueTime;
- const ui64 writeTime = ctx.Now().MilliSeconds() - Responses.front().WriteTime;
+ const auto& response = Responses.front();
- if (Responses.front().IsWrite()) {
- const auto& writeResponse = Responses.front().GetWrite();
+ const TDuration quotedTime = response.QuotedTime;
+ const TDuration queueTime = response.QueueTime;
+ const TDuration writeTime = ctx.Now() - response.WriteTimeBaseline;
+
+ if (response.IsWrite()) {
+ const auto& writeResponse = response.GetWrite();
const TString& s = writeResponse.Msg.SourceId;
const ui64& seqNo = writeResponse.Msg.SeqNo;
const ui16& partNo = writeResponse.Msg.PartNo;
@@ -3026,21 +3036,21 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
", Offset: " << offset << " is " << (already ? "already written" : "stored on disk")
);
if (PartitionWriteQuotaWaitCounter) {
- PartitionWriteQuotaWaitCounter->IncFor(quotedTime);
+ PartitionWriteQuotaWaitCounter->IncFor(quotedTime.MilliSeconds());
}
if (!already && partNo + 1 == totalParts)
++offset;
- } else if (Responses.front().IsOwnership()) {
- const TString& ownerCookie = Responses.front().GetOwnership().OwnerCookie;
+ } else if (response.IsOwnership()) {
+ const TString& ownerCookie = response.GetOwnership().OwnerCookie;
auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie));
if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) {
- ReplyOwnerOk(ctx, Responses.front().GetCookie(), ownerCookie);
+ ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie);
} else {
- ReplyError(ctx, Responses.front().GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already");
+ ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already");
}
- } else if (Responses.front().IsRegisterMessageGroup()) {
- const auto& body = Responses.front().GetRegisterMessageGroup().Body;
+ } else if (response.IsRegisterMessageGroup()) {
+ const auto& body = response.GetRegisterMessageGroup().Body;
TMaybe<TPartitionKeyRange> keyRange;
if (body.KeyRange) {
@@ -3049,14 +3059,14 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
Y_VERIFY(body.AssignedOffset);
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange));
- ReplyOk(ctx, Responses.front().GetCookie());
- } else if (Responses.front().IsDeregisterMessageGroup()) {
- const auto& body = Responses.front().GetDeregisterMessageGroup().Body;
+ ReplyOk(ctx, response.GetCookie());
+ } else if (response.IsDeregisterMessageGroup()) {
+ const auto& body = response.GetDeregisterMessageGroup().Body;
SourceIdStorage.DeregisterSourceId(body.SourceId);
- ReplyOk(ctx, Responses.front().GetCookie());
- } else if (Responses.front().IsSplitMessageGroup()) {
- const auto& split = Responses.front().GetSplitMessageGroup();
+ ReplyOk(ctx, response.GetCookie());
+ } else if (response.IsSplitMessageGroup()) {
+ const auto& split = response.GetSplitMessageGroup();
for (const auto& body : split.Deregistrations) {
SourceIdStorage.DeregisterSourceId(body.SourceId);
@@ -3072,7 +3082,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true);
}
- ReplyOk(ctx, Responses.front().GetCookie());
+ ReplyOk(ctx, response.GetCookie());
} else {
Y_FAIL("Unexpected message");
}
@@ -4836,11 +4846,10 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
return;
}
ui64 size = 0;
- WriteQuota->Update(ctx.Now());
for (auto& msg: ev->Get()->Msgs) {
size += msg.Data.size();
bool needToChangeOffset = msg.PartNo + 1 == msg.TotalParts;
- Requests.emplace_back(TWriteMsg{ev->Get()->Cookie, offset, std::move(msg)}, WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now().MilliSeconds(), 0);
+ EmplaceRequest(TWriteMsg{ev->Get()->Cookie, offset, std::move(msg)}, ctx);
if (offset && needToChangeOffset)
++*offset;
}
@@ -4886,8 +4895,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const T
"SourceId not found, registration cannot be completed");
}
- WriteQuota->Update(ctx.Now());
- Requests.emplace_back(TRegisterMessageGroupMsg(*ev->Get()), WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now().MilliSeconds(), 0);
+ EmplaceRequest(TRegisterMessageGroupMsg(*ev->Get()), ctx);
}
void TPartition::HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx) {
@@ -4903,9 +4911,8 @@ void TPartition::HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const
return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED,
"SourceId doesn't exist");
}
-
- WriteQuota->Update(ctx.Now());
- Requests.emplace_back(TDeregisterMessageGroupMsg(*ev->Get()), WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now().MilliSeconds(), 0);
+
+ EmplaceRequest(TDeregisterMessageGroupMsg(*ev->Get()), ctx);
}
void TPartition::HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx) {
@@ -4943,8 +4950,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TAct
}
}
- WriteQuota->Update(ctx.Now());
- Requests.emplace_back(std::move(msg), WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now().MilliSeconds(), 0);
+ EmplaceRequest(std::move(msg), ctx);
}
std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool headCleared) {
@@ -5045,8 +5051,6 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
//Process is following: if batch contains already written messages or only one client message part -> unpack it and process as several TClientBlobs
//otherwise write this batch as is to head;
- WriteQuota->Update(ctx.Now());
-
while (!Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big
auto pp = Requests.front();
Requests.pop_front();
@@ -5082,10 +5086,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
Y_VERIFY(pp.IsOwnership());
}
- pp.QuotedTime = WriteQuota->GetQuotedTime(ctx.Now()) - pp.QuotedTime; //change to duration
- pp.QueueTime = ctx.Now().MilliSeconds() - pp.QueueTime;
- pp.WriteTime = ctx.Now().MilliSeconds();
- Responses.push_back(pp);
+ EmplaceResponse(std::move(pp), ctx);
continue;
}
@@ -5124,10 +5125,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
}
TString().swap(p.Msg.Data);
- pp.QuotedTime = WriteQuota->GetQuotedTime(ctx.Now()) - pp.QuotedTime; //change to duration
- pp.QueueTime = ctx.Now().MilliSeconds() - pp.QueueTime;
- pp.WriteTime = ctx.Now().MilliSeconds();
- Responses.push_back(pp);
+ EmplaceResponse(std::move(pp), ctx);
continue;
}
@@ -5338,10 +5336,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);
}
TString().swap(p.Msg.Data);
- pp.QuotedTime = WriteQuota->GetQuotedTime(ctx.Now()) - pp.QuotedTime; //change to duration
- pp.QueueTime = ctx.Now().MilliSeconds() - pp.QueueTime;
- pp.WriteTime = ctx.Now().MilliSeconds();
- Responses.push_back(pp);
+ EmplaceResponse(std::move(pp), ctx);
}
UpdateWriteBufferIsFullState(ctx.Now());
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 6379b7e14b..e68beb9347 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -80,7 +80,7 @@ private:
void ReplyOk(const TActorContext& ctx, const ui64 dst);
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie);
- void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, ui64 partitionQuotedTime, TDuration topicQuotedTime, ui64 queueTime, ui64 writeTime);
+ void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime);
void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx);
void AnswerCurrentWrites(const TActorContext& ctx);
@@ -292,6 +292,12 @@ private:
void HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx);
void Initialize(const TActorContext& ctx);
+ template <typename T>
+ void EmplaceRequest(T&& body, const TActorContext& ctx) {
+ Requests.emplace_back(body, WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now() - TInstant::Zero());
+ }
+ void EmplaceResponse(TMessage&& message, const TActorContext& ctx);
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
diff --git a/ydb/core/persqueue/partition_types.h b/ydb/core/persqueue/partition_types.h
index 9b709c990c..0dbe4d8af7 100644
--- a/ydb/core/persqueue/partition_types.h
+++ b/ydb/core/persqueue/partition_types.h
@@ -68,16 +68,16 @@ struct TMessage {
TSplitMessageGroupMsg
> Body;
- ui64 QuotedTime;
- ui64 QueueTime;
- ui64 WriteTime;
+ TDuration QuotedTime; // baseline for request and duration for response
+ TDuration QueueTime; // baseline for request and duration for response
+ TInstant WriteTimeBaseline;
template <typename T>
- explicit TMessage(T&& body, ui64 quotedTime, ui64 queueTime, ui64 writeTime)
+ explicit TMessage(T&& body, TDuration quotedTime, TDuration queueTime, TInstant writeTimeBaseline = TInstant::Zero())
: Body(std::forward<T>(body))
, QuotedTime(quotedTime)
, QueueTime(queueTime)
- , WriteTime(writeTime)
+ , WriteTimeBaseline(writeTimeBaseline)
{
}
diff --git a/ydb/core/persqueue/quota_tracker.cpp b/ydb/core/persqueue/quota_tracker.cpp
index 6fb005fd88..9d8d0722cf 100644
--- a/ydb/core/persqueue/quota_tracker.cpp
+++ b/ydb/core/persqueue/quota_tracker.cpp
@@ -7,7 +7,6 @@ namespace NKikimr::NPQ {
, SpeedPerSecond(speedPerSecond)
, LastUpdateTime(timestamp)
, MaxBurst(maxBurst)
- , QuotedTime(0)
{}
void TQuotaTracker::UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond) {
@@ -17,14 +16,14 @@ namespace NKikimr::NPQ {
}
void TQuotaTracker::Update(const TInstant timestamp) {
- ui64 ms = (timestamp - LastUpdateTime).MilliSeconds();
+ TDuration diff = timestamp - LastUpdateTime;
LastUpdateTime = timestamp;
if (AvailableSize < 0) {
- QuotedTime += ms;
+ QuotedTime += diff;
}
- AvailableSize = Min<i64>(AvailableSize + (ui64)SpeedPerSecond * ms / 1000, MaxBurst);
+ AvailableSize = Min<i64>(AvailableSize + (ui64)SpeedPerSecond * diff.MicroSeconds() / 1000'000, MaxBurst);
}
bool TQuotaTracker::CanExaust(const TInstant timestamp) {
@@ -38,7 +37,7 @@ namespace NKikimr::NPQ {
Update(timestamp);
}
- ui64 TQuotaTracker::GetQuotedTime(const TInstant timestamp) {
+ TDuration TQuotaTracker::GetQuotedTime(const TInstant timestamp) {
Update(timestamp);
return QuotedTime;
}
diff --git a/ydb/core/persqueue/quota_tracker.h b/ydb/core/persqueue/quota_tracker.h
index 877a45f9e5..82dcfba9ec 100644
--- a/ydb/core/persqueue/quota_tracker.h
+++ b/ydb/core/persqueue/quota_tracker.h
@@ -14,7 +14,7 @@ namespace NKikimr::NPQ {
bool CanExaust(const TInstant timestamp) ;
void Exaust(const ui64 size, const TInstant timestamp);
- ui64 GetQuotedTime(const TInstant timestamp);
+ TDuration GetQuotedTime(const TInstant timestamp);
ui64 GetTotalSpeed() const;
private:
@@ -23,7 +23,7 @@ namespace NKikimr::NPQ {
TInstant LastUpdateTime;
ui64 MaxBurst;
- ui64 QuotedTime;
+ TDuration QuotedTime;
};
} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/ut/CMakeLists.darwin.txt b/ydb/core/persqueue/ut/CMakeLists.darwin.txt
index 3f2862a1b6..36255ac79e 100644
--- a/ydb/core/persqueue/ut/CMakeLists.darwin.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.darwin.txt
@@ -48,6 +48,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp
diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
index a3b285dc6f..30d91415d8 100644
--- a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
@@ -50,6 +50,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp
diff --git a/ydb/core/persqueue/ut/CMakeLists.linux.txt b/ydb/core/persqueue/ut/CMakeLists.linux.txt
index 2fe115ddce..ce0f935969 100644
--- a/ydb/core/persqueue/ut/CMakeLists.linux.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.linux.txt
@@ -52,6 +52,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp
diff --git a/ydb/core/persqueue/ut/quota_tracker_ut.cpp b/ydb/core/persqueue/ut/quota_tracker_ut.cpp
new file mode 100644
index 0000000000..48af429385
--- /dev/null
+++ b/ydb/core/persqueue/ut/quota_tracker_ut.cpp
@@ -0,0 +1,54 @@
+#include "quota_tracker.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/size_literals.h>
+
+namespace NKikimr::NPQ {
+
+Y_UNIT_TEST_SUITE(TQuotaTracker) {
+
+Y_UNIT_TEST(TestSmallMessages) {
+ TInstant ts = TInstant::MilliSeconds(123456789);
+ TQuotaTracker quota(2_MB, 2_MB, ts);
+
+ UNIT_ASSERT(quota.CanExaust(ts));
+
+ quota.Exaust(2_MB - 1, ts);
+ ui64 blobSize = 500;
+ ui64 processedBlobs = 0;
+
+ for (ui32 i = 0; i < 100'000; ++i) { // 10 sec total
+ if (quota.CanExaust(ts)) {
+ quota.Exaust(blobSize, ts);
+ ++processedBlobs;
+ }
+ ts += TDuration::MicroSeconds(100);
+ }
+ Cerr << "processed_blobs=" << processedBlobs << " quoted_time=" << quota.GetQuotedTime(ts) << Endl;
+ UNIT_ASSERT_EQUAL(processedBlobs, 41800);
+ UNIT_ASSERT_EQUAL(quota.GetQuotedTime(ts), TDuration::MilliSeconds(9980));
+}
+
+Y_UNIT_TEST(TestBigMessages) {
+ TInstant ts = TInstant::MilliSeconds(123456789);
+ TQuotaTracker quota(2_MB, 2_MB, ts);
+
+ UNIT_ASSERT(quota.CanExaust(ts));
+
+ auto CannotExaustAfter = [&](TDuration diff) {
+ ts += diff;
+ UNIT_ASSERT_C(!quota.CanExaust(ts), TStringBuilder() << "at " << ts);
+ };
+
+ quota.Exaust(10_MB, ts);
+ CannotExaustAfter(TDuration::Zero());
+ CannotExaustAfter(TDuration::Seconds(4));
+
+ ts += TDuration::MilliSeconds(1);
+ UNIT_ASSERT(quota.CanExaust(ts));
+}
+
+} //Y_UNIT_TEST_SUITE
+
+} // namespace NKikimr::NPQ