aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxifos <xifos@yandex-team.ru>2022-02-10 16:52:23 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:52:23 +0300
commitee987daedfc7e930134260d36705299cec22c994 (patch)
tree8a369b005c0314144482022cf8e4d15213783ade
parentf075ef4e79295c591793b202bed63d5e6ae1f397 (diff)
downloadydb-ee987daedfc7e930134260d36705299cec22c994.tar.gz
Restoring authorship annotation for <xifos@yandex-team.ru>. Commit 1 of 2.
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp2
-rw-r--r--ydb/core/persqueue/ownerinfo.cpp80
-rw-r--r--ydb/core/persqueue/ownerinfo.h114
-rw-r--r--ydb/core/persqueue/partition.cpp86
-rw-r--r--ydb/core/persqueue/partition.h12
-rw-r--r--ydb/core/persqueue/pq_ut.cpp80
-rw-r--r--ydb/core/persqueue/pq_ut.h88
-rw-r--r--ydb/core/persqueue/pq_ut_slow.cpp198
-rw-r--r--ydb/core/persqueue/sourceid.cpp68
-rw-r--r--ydb/core/persqueue/sourceid.h50
-rw-r--r--ydb/core/persqueue/sourceid_ut.cpp76
-rw-r--r--ydb/core/persqueue/ut/ya.make2
-rw-r--r--ydb/core/persqueue/ya.make4
-rw-r--r--ydb/core/protos/counters_pq.proto4
-rw-r--r--ydb/core/protos/counters_pq_labeled.proto4
-rw-r--r--ydb/core/protos/pqconfig.proto6
-rw-r--r--ydb/core/testlib/test_pq_client.h16
-rw-r--r--ydb/public/api/protos/draft/persqueue_error_codes.proto2
-rw-r--r--ydb/public/api/protos/persqueue_error_codes_v1.proto2
-rw-r--r--ydb/public/api/protos/ydb_persqueue_v1.proto8
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp2
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_schema.h2
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write_actor.cpp4
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp30
24 files changed, 470 insertions, 470 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
index 237bba147b9..ded0ed3c66e 100644
--- a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
@@ -172,7 +172,7 @@ protected:
config->SetMaxCountInPartition(20000000);
config->SetMaxSizeInPartition(100 * 1024 * 1024);
config->SetLifetimeSeconds(0);
- config->SetSourceIdLifetimeSeconds(1*60*60);
+ config->SetSourceIdLifetimeSeconds(1*60*60);
config->SetMaxWriteInflightSize(90000000);
config->SetLowWatermark(6*1024*1024);
diff --git a/ydb/core/persqueue/ownerinfo.cpp b/ydb/core/persqueue/ownerinfo.cpp
index f2deb8fa14a..f08884f02c6 100644
--- a/ydb/core/persqueue/ownerinfo.cpp
+++ b/ydb/core/persqueue/ownerinfo.cpp
@@ -1,40 +1,40 @@
-#include "ownerinfo.h"
-#include <util/generic/guid.h>
-#include <util/string/escape.h>
-
-namespace NKikimr {
-namespace NPQ {
-
- void TOwnerInfo::GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender,
- const TString& topicName, const ui32 partition, const TActorContext& ctx) {
- TStringBuilder s;
- s << owner << "|" << CreateGuidAsString() << "_" << OwnerGeneration;
- ++OwnerGeneration;
- Y_VERIFY(OwnerCookie != s);
- OwnerCookie = s;
- NextMessageNo = 0;
- NeedResetOwner = false;
- PipeClient = pipeClient;
- if (Sender) {
- THolder<TEvPersQueue::TEvResponse> response = MakeHolder<TEvPersQueue::TEvResponse>();
- response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
- response->Record.SetErrorCode(NPersQueue::NErrorCode::BAD_REQUEST);
- response->Record.SetErrorReason("ownership session is killed by another session with id " + OwnerCookie);
- ctx.Send(Sender, response.Release());
- }
- Sender = sender;
- ReservedSize = 0;
- Requests.clear();
- //WaitToChageOwner not touched - they will wait for this owner to be dropped - this new owner must have force flag
- LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "new Cookie " << s << " generated for partition " << partition << " topic '" << topicName << "' owner " << EscapeC(owner));
- }
-
- TStringBuf TOwnerInfo::GetOwnerFromOwnerCookie(const TString& cookie) {
- auto pos = cookie.rfind('|');
- if (pos == TString::npos)
- pos = cookie.size();
- TStringBuf res = TStringBuf(cookie.c_str(), pos);
- return res;
- }
-} // NPQ
-} // NKikimr
+#include "ownerinfo.h"
+#include <util/generic/guid.h>
+#include <util/string/escape.h>
+
+namespace NKikimr {
+namespace NPQ {
+
+ void TOwnerInfo::GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender,
+ const TString& topicName, const ui32 partition, const TActorContext& ctx) {
+ TStringBuilder s;
+ s << owner << "|" << CreateGuidAsString() << "_" << OwnerGeneration;
+ ++OwnerGeneration;
+ Y_VERIFY(OwnerCookie != s);
+ OwnerCookie = s;
+ NextMessageNo = 0;
+ NeedResetOwner = false;
+ PipeClient = pipeClient;
+ if (Sender) {
+ THolder<TEvPersQueue::TEvResponse> response = MakeHolder<TEvPersQueue::TEvResponse>();
+ response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
+ response->Record.SetErrorCode(NPersQueue::NErrorCode::BAD_REQUEST);
+ response->Record.SetErrorReason("ownership session is killed by another session with id " + OwnerCookie);
+ ctx.Send(Sender, response.Release());
+ }
+ Sender = sender;
+ ReservedSize = 0;
+ Requests.clear();
+ //WaitToChageOwner not touched - they will wait for this owner to be dropped - this new owner must have force flag
+ LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "new Cookie " << s << " generated for partition " << partition << " topic '" << topicName << "' owner " << EscapeC(owner));
+ }
+
+ TStringBuf TOwnerInfo::GetOwnerFromOwnerCookie(const TString& cookie) {
+ auto pos = cookie.rfind('|');
+ if (pos == TString::npos)
+ pos = cookie.size();
+ TStringBuf res = TStringBuf(cookie.c_str(), pos);
+ return res;
+ }
+} // NPQ
+} // NKikimr
diff --git a/ydb/core/persqueue/ownerinfo.h b/ydb/core/persqueue/ownerinfo.h
index a6f5aef7166..81ff37e5186 100644
--- a/ydb/core/persqueue/ownerinfo.h
+++ b/ydb/core/persqueue/ownerinfo.h
@@ -1,60 +1,60 @@
-#pragma once
+#pragma once
#include <ydb/core/keyvalue/keyvalue_events.h>
#include <ydb/core/persqueue/events/internal.h>
-#include <library/cpp/actors/core/actor.h>
-
-
-namespace NKikimr {
-namespace NPQ {
-
- struct TOwnerInfo {
- bool NeedResetOwner;
- bool SourceIdDeleted;
- TString OwnerCookie;
- ui64 NextMessageNo;
- ui32 OwnerGeneration;
- TActorId PipeClient;
- TActorId Sender;
-
- ui64 ReservedSize;
- std::deque<ui64> Requests;
-
- std::deque<THolder<TEvPQ::TEvChangeOwner>> WaitToChangeOwner;
-
- TOwnerInfo()
- : NeedResetOwner(true)
- , SourceIdDeleted(false)
- , NextMessageNo(0)
- , OwnerGeneration(0)
- , ReservedSize(0)
- {}
-
- static TStringBuf GetOwnerFromOwnerCookie(const TString& owner);
- void GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender,
- const TString& topicName, const ui32 partition, const TActorContext& ctx);
-
- void AddReserveRequest(ui64 size, bool lastRequest) {
- ReservedSize += size;
- if (!lastRequest) {
- Requests.push_back(size);
- } else {
- Y_VERIFY(!Requests.empty());
- Requests.back() += size;
- }
- }
-
- ui64 DecReservedSize() {
- //TODO: Y_VERIFY(!Requests.empty());
- if (Requests.empty())
- return 0;
- ui64 size = Requests.front();
- Requests.pop_front();
- ReservedSize -= size;
- return size;
- }
-
- TOwnerInfo(const TOwnerInfo&) = delete;
- };
-} //NPQ
-} // NKikimr
+#include <library/cpp/actors/core/actor.h>
+
+
+namespace NKikimr {
+namespace NPQ {
+
+ struct TOwnerInfo {
+ bool NeedResetOwner;
+ bool SourceIdDeleted;
+ TString OwnerCookie;
+ ui64 NextMessageNo;
+ ui32 OwnerGeneration;
+ TActorId PipeClient;
+ TActorId Sender;
+
+ ui64 ReservedSize;
+ std::deque<ui64> Requests;
+
+ std::deque<THolder<TEvPQ::TEvChangeOwner>> WaitToChangeOwner;
+
+ TOwnerInfo()
+ : NeedResetOwner(true)
+ , SourceIdDeleted(false)
+ , NextMessageNo(0)
+ , OwnerGeneration(0)
+ , ReservedSize(0)
+ {}
+
+ static TStringBuf GetOwnerFromOwnerCookie(const TString& owner);
+ void GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender,
+ const TString& topicName, const ui32 partition, const TActorContext& ctx);
+
+ void AddReserveRequest(ui64 size, bool lastRequest) {
+ ReservedSize += size;
+ if (!lastRequest) {
+ Requests.push_back(size);
+ } else {
+ Y_VERIFY(!Requests.empty());
+ Requests.back() += size;
+ }
+ }
+
+ ui64 DecReservedSize() {
+ //TODO: Y_VERIFY(!Requests.empty());
+ if (Requests.empty())
+ return 0;
+ ui64 size = Requests.front();
+ Requests.pop_front();
+ ReservedSize -= size;
+ return size;
+ }
+
+ TOwnerInfo(const TOwnerInfo&) = delete;
+ };
+} //NPQ
+} // NKikimr
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index cc8e2419cf2..573634b6e68 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1,8 +1,8 @@
#include "partition.h"
#include "event_helpers.h"
#include "read.h"
-#include "sourceid.h"
-#include "ownerinfo.h"
+#include "sourceid.h"
+#include "ownerinfo.h"
#include "mirrorer.h"
#include <ydb/core/base/appdata.h>
@@ -512,7 +512,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
ManageWriteTimestampEstimate = LocalDC;
}
-
+
WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero();
CalcTopicWriteQuotaParams();
@@ -1075,13 +1075,13 @@ bool TPartition::DropOldStuff(TEvKeyValue::TEvRequest* request, bool hasWrites,
bool haveChanges = false;
if (DropOldData(request, hasWrites, ctx))
haveChanges = true;
- LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete old stuff");
+ LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete old stuff");
if (SourceIdStorage.DropOldSourceIds(request, ctx.Now(), StartOffset, Partition, Config.GetPartitionConfig())) {
haveChanges = true;
- SourceIdStorage.MarkOwnersForDeletedSourceId(Owners);
- }
- LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete all stuff");
- LOG_TRACE(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Delete command " << request->ToString());
+ SourceIdStorage.MarkOwnersForDeletedSourceId(Owners);
+ }
+ LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete all stuff");
+ LOG_TRACE(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Delete command " << request->ToString());
return haveChanges;
}
@@ -1646,7 +1646,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
TStringBuilder ss;
ss << "SYNC INIT topic " << TopicName << " partitition " << Partition << " so " << StartOffset << " endOffset " << EndOffset << " Head " << Head << "\n";
- for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) {
+ for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) {
ss << "SYNC INIT sourceId " << s.first << " seqNo " << s.second.SeqNo << " offset " << s.second.Offset << "\n";
}
for (const auto& h : DataKeysBody) {
@@ -1674,7 +1674,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
if (!NewPartition) {
ctx.Send(Tablet, new TEvPQ::TEvInitComplete(Partition));
}
- for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) {
+ for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Init complete for topic '" << TopicName << "' Partition: " << Partition << " SourceId: " <<
s.first << " SeqNo: " << s.second.SeqNo << " offset: " << s.second.Offset << " MaxOffset: " << EndOffset);
}
@@ -1737,7 +1737,7 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
}
-THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator& it, const TActorContext& ctx) {
+THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator& it, const TActorContext& ctx) {
Y_VERIFY(ReservedSize >= it->second.ReservedSize);
ReservedSize -= it->second.ReservedSize;
UpdateWriteBufferIsFullState(ctx.Now());
@@ -2700,12 +2700,12 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx)
bool already = false;
- auto it = SourceIdStorage.GetInMemorySourceIds().find(s);
+ auto it = SourceIdStorage.GetInMemorySourceIds().find(s);
ui64 maxSeqNo = 0;
ui64 maxOffset = 0;
- if (it != SourceIdStorage.GetInMemorySourceIds().end()) {
+ if (it != SourceIdStorage.GetInMemorySourceIds().end()) {
maxSeqNo = it->second.SeqNo;
maxOffset = it->second.Offset;
if (it->second.SeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) {
@@ -2721,12 +2721,12 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx)
}
if (!already && partNo + 1 == totalParts) {
if (it == SourceIdStorage.GetInMemorySourceIds().end()) {
- Counters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1);
+ Counters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1);
SourceIdStorage.RegisterSourceId(s, writeResponse.Msg.SeqNo, offset, CurrentTimestamp);
} else {
SourceIdStorage.RegisterSourceId(s, it->second.Updated(writeResponse.Msg.SeqNo, offset, CurrentTimestamp));
}
-
+
Counters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1);
}
ReplyWrite(
@@ -3212,10 +3212,10 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx)
TDuration lifetimeNow = ctx.Now() - SourceIdStorage.MinAvailableTimestamp(ctx.Now());
if (lifetimeNow.MilliSeconds() != PartitionLabeledCounters.GetCounters()[METRIC_MIN_SID_LIFETIME].Get()) {
- haveChanges = true;
+ haveChanges = true;
PartitionLabeledCounters.GetCounters()[METRIC_MIN_SID_LIFETIME].Set(lifetimeNow.MilliSeconds());
- }
-
+ }
+
ui64 headGapSize = DataKeysBody.empty() ? 0 : (Head.Offset - (DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount()));
ui64 gapSize = GapSize + headGapSize;
ui32 gapsCount = GapOffsets.size() + (headGapSize ? 1 : 0);
@@ -3563,10 +3563,10 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
}
ui64 decReservedSize = 0;
- TStringBuf owner;
+ TStringBuf owner;
if (!mirroredPartition && !ev->Get()->IsDirectWrite) {
- owner = TOwnerInfo::GetOwnerFromOwnerCookie(ev->Get()->OwnerCookie);
+ owner = TOwnerInfo::GetOwnerFromOwnerCookie(ev->Get()->OwnerCookie);
auto it = Owners.find(owner);
if (it == Owners.end() || it->second.NeedResetOwner) {
@@ -3575,15 +3575,15 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
return;
}
- if (it->second.SourceIdDeleted) {
- ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED,
- TStringBuilder() << "Yours maximum written sequence number for session was deleted, need to recreate session. "
+ if (it->second.SourceIdDeleted) {
+ ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED,
+ TStringBuilder() << "Yours maximum written sequence number for session was deleted, need to recreate session. "
<< "Current count of sourceIds is " << SourceIdStorage.GetInMemorySourceIds().size() << " and limit is " << Config.GetPartitionConfig().GetSourceIdMaxCounts()
<< ", current minimum sourceid timestamp(Ms) is " << SourceIdStorage.MinAvailableTimestamp(ctx.Now()).MilliSeconds()
<< " and border timestamp(Ms) is " << ((ctx.Now() - TInstant::Seconds(Config.GetPartitionConfig().GetSourceIdLifetimeSeconds())).MilliSeconds()));
- return;
- }
-
+ return;
+ }
+
if (it->second.OwnerCookie != ev->Get()->OwnerCookie) {
ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::WRONG_COOKIE,
TStringBuilder() << "incorrect ownerCookie " << ev->Get()->OwnerCookie << ", must be " << it->second.OwnerCookie);
@@ -3633,10 +3633,10 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
TStringBuilder() << "too big message " << sz << " vs. maximum " << MAX_BLOB_PART_SIZE);
return;
}
-
- if (!mirroredPartition) {
- SourceIdStorage.RegisterSourceIdOwner(msg.SourceId, owner);
- }
+
+ if (!mirroredPartition) {
+ SourceIdStorage.RegisterSourceIdOwner(msg.SourceId, owner);
+ }
}
if (EndOffset - StartOffset >= static_cast<ui64>(Config.GetPartitionConfig().GetMaxCountInPartition())
@@ -3951,7 +3951,7 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T
FailBadClient(ctx);
NewHead.Clear();
NewHead.Offset = EndOffset;
- sourceIdWriter.Clear();
+ sourceIdWriter.Clear();
request->Record.Clear();
PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);
CompactedKeys.clear();
@@ -4038,17 +4038,17 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
ui64 poffset = p.Offset ? *p.Offset : curOffset;
- auto it_inMemory = SourceIdStorage.GetInMemorySourceIds().find(p.Msg.SourceId);
- auto it_toWrite = sourceIdWriter.GetSourceIdsToWrite().find(p.Msg.SourceId);
- if (!p.Msg.DisableDeduplication && (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end() && it_inMemory->second.SeqNo >= p.Msg.SeqNo || (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end() && it_toWrite->second.SeqNo >= p.Msg.SeqNo))) {
- bool isWriting = (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end());
- bool isCommitted = (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end());
+ auto it_inMemory = SourceIdStorage.GetInMemorySourceIds().find(p.Msg.SourceId);
+ auto it_toWrite = sourceIdWriter.GetSourceIdsToWrite().find(p.Msg.SourceId);
+ if (!p.Msg.DisableDeduplication && (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end() && it_inMemory->second.SeqNo >= p.Msg.SeqNo || (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end() && it_toWrite->second.SeqNo >= p.Msg.SeqNo))) {
+ bool isWriting = (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end());
+ bool isCommitted = (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end());
if (poffset >= curOffset) {
LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, "Already written message. Topic: '" << TopicName << "' Partition: " << Partition
<< " SourceId: '" << EscapeC(p.Msg.SourceId) << "'. Message seqNo = " << p.Msg.SeqNo
- << ". Committed seqNo = " << (isCommitted ? it_inMemory->second.SeqNo : 0)
- << (isWriting ? ". Writing seqNo: " : ". ") << (isWriting ? it_toWrite->second.SeqNo : 0) << " EndOffset " << EndOffset
+ << ". Committed seqNo = " << (isCommitted ? it_inMemory->second.SeqNo : 0)
+ << (isWriting ? ". Writing seqNo: " : ". ") << (isWriting ? it_toWrite->second.SeqNo : 0) << " EndOffset " << EndOffset
<< " CurOffset " << curOffset << " offset " << poffset);
Counters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
@@ -4070,7 +4070,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
CancelAllWritesOnWrite(ctx, request,
TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo
<< " partNo: " << p.Msg.PartNo << " has incorrect offset " << poffset << ", must be at least " << curOffset,
- p, sourceIdWriter, NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET);
+ p, sourceIdWriter, NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET);
return false;
}
@@ -4083,7 +4083,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo
<< " partNo: " << p.Msg.PartNo << " has gap inside partitioned message, incorrect offset "
<< poffset << ", must be " << curOffset,
- p, sourceIdWriter);
+ p, sourceIdWriter);
return false;
}
curOffset = poffset;
@@ -4123,7 +4123,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
TString s;
if (!PartitionedBlob.IsNextPart(p.Msg.SourceId, p.Msg.SeqNo, p.Msg.PartNo, &s)) {
//this must not be happen - client sends gaps, fail this client till the end
- CancelAllWritesOnWrite(ctx, request, s, p, sourceIdWriter);
+ CancelAllWritesOnWrite(ctx, request, s, p, sourceIdWriter);
//now no changes will leak
return false;
}
@@ -4473,7 +4473,7 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorCon
: ESourceIdFormat::Raw;
TSourceIdWriter sourceIdWriter(format);
- bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter);
+ bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter);
if (headCleared) {
Y_VERIFY(!CompactedKeys.empty() || Head.PackedSize == 0);
@@ -4483,7 +4483,7 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorCon
}
if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed
- Y_VERIFY(sourceIdWriter.GetSourceIdsToWrite().empty());
+ Y_VERIFY(sourceIdWriter.GetSourceIdsToWrite().empty());
return request->Record.CmdWriteSize() > 0 || request->Record.CmdRenameSize() > 0 || request->Record.CmdDeleteRangeSize() > 0;
}
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 2eec26f2516..f11bdc168ae 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -17,8 +17,8 @@
#include "subscriber.h"
#include "header.h"
#include "user_info.h"
-#include "sourceid.h"
-#include "ownerinfo.h"
+#include "sourceid.h"
+#include "ownerinfo.h"
#include <variant>
@@ -162,7 +162,7 @@ private:
//will fill sourceIds, request and NewHead
//returns true if head is compacted
- bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter);
+ bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter);
std::pair<TKey, ui32> GetNewWriteKey(bool headCleared);
void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx);
@@ -200,7 +200,7 @@ private:
void InitUserInfoForImportantClients(const TActorContext& ctx);
- THashMap<TString, TOwnerInfo>::iterator DropOwner(THashMap<TString, TOwnerInfo>::iterator& it, const TActorContext& ctx);
+ THashMap<TString, TOwnerInfo>::iterator DropOwner(THashMap<TString, TOwnerInfo>::iterator& it, const TActorContext& ctx);
void Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContext& ctx);
@@ -586,10 +586,10 @@ private:
bool InitDone;
const bool NewPartition;
- THashMap<TString, NKikimr::NPQ::TOwnerInfo> Owners;
+ THashMap<TString, NKikimr::NPQ::TOwnerInfo> Owners;
THashSet<TActorId> OwnerPipes;
- TSourceIdStorage SourceIdStorage;
+ TSourceIdStorage SourceIdStorage;
std::deque<THolder<TEvPQ::TEvChangeOwner>> WaitToChangeOwner;
diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp
index 13a61229105..94c8b6cf3ad 100644
--- a/ydb/core/persqueue/pq_ut.cpp
+++ b/ydb/core/persqueue/pq_ut.cpp
@@ -1205,7 +1205,7 @@ Y_UNIT_TEST(TestWritePQ) {
}
-Y_UNIT_TEST(TestSourceIdDropByUserWrites) {
+Y_UNIT_TEST(TestSourceIdDropByUserWrites) {
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {
return tc.InitialEventsFilter.Prepare();
@@ -1242,49 +1242,49 @@ Y_UNIT_TEST(TestSourceIdDropByUserWrites) {
}
-Y_UNIT_TEST(TestSourceIdDropBySourceIdCount) {
- TTestContext tc;
- RunTestWithReboots(tc.TabletIds, [&]() {
- return tc.InitialEventsFilter.Prepare();
- }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
- TFinalizer finalizer(tc);
- tc.Prepare(dispatchName, setup, activeZone);
- tc.Runtime->SetScheduledLimit(200);
-
- PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 2, 6*1024*1024, true, 0, 3); //no important client, lifetimeseconds=0 - delete right now
-
- TVector<std::pair<ui64, TString>> data;
- activeZone = true;
-
- TString ss{32, '_'};
-
- data.push_back({1, ss});
- CmdWrite(0,"sourceid0", data, tc, false, {}, false, "", -1, 100);
- Cout << "written sourceid0" << Endl;
-
+Y_UNIT_TEST(TestSourceIdDropBySourceIdCount) {
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ tc.Runtime->SetScheduledLimit(200);
+
+ PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 2, 6*1024*1024, true, 0, 3); //no important client, lifetimeseconds=0 - delete right now
+
+ TVector<std::pair<ui64, TString>> data;
+ activeZone = true;
+
+ TString ss{32, '_'};
+
+ data.push_back({1, ss});
+ CmdWrite(0,"sourceid0", data, tc, false, {}, false, "", -1, 100);
+ Cout << "written sourceid0" << Endl;
+
PQGetPartInfo(100, 101, tc);
-
- CmdWrite(0,"sourceidx", data, tc, false, {}, false, "", -1, 2000);
- Cout << "written sourceidx" << Endl;
- CmdWrite(0,"sourceid1", data, tc, false, {}, false, "", -1, 3000);
- Cout << "written sourceid1" << Endl;
+
+ CmdWrite(0,"sourceidx", data, tc, false, {}, false, "", -1, 2000);
+ Cout << "written sourceidx" << Endl;
+ CmdWrite(0,"sourceid1", data, tc, false, {}, false, "", -1, 3000);
+ Cout << "written sourceid1" << Endl;
PQGetPartInfo(2000, 3001, tc);
- //fail - already written
- CmdWrite(0,"sourceid0", data, tc, false);
- Cout << "written sourceid0" << Endl;
+ //fail - already written
+ CmdWrite(0,"sourceid0", data, tc, false);
+ Cout << "written sourceid0" << Endl;
PQGetPartInfo(2000, 3001, tc);
-
- for (ui64 i=0; i < 5; ++i) {
- CmdWrite(0, TStringBuilder() << "sourceid_" << i, data, tc, false, {}, false, "", -1, 3001 + i);
- Cout << "written sourceid_" << i << Endl;
- }
- CmdWrite(0,"sourceid0", data, tc, false);
- Cout << "written sourceid0" << Endl;
+
+ for (ui64 i=0; i < 5; ++i) {
+ CmdWrite(0, TStringBuilder() << "sourceid_" << i, data, tc, false, {}, false, "", -1, 3001 + i);
+ Cout << "written sourceid_" << i << Endl;
+ }
+ CmdWrite(0,"sourceid0", data, tc, false);
+ Cout << "written sourceid0" << Endl;
PQGetPartInfo(2000, 3007, tc);
- });
-}
-
-
+ });
+}
+
+
Y_UNIT_TEST(TestWriteOffsetWithBigMessage) {
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {
diff --git a/ydb/core/persqueue/pq_ut.h b/ydb/core/persqueue/pq_ut.h
index 3db93bc1047..ec5eb0ef971 100644
--- a/ydb/core/persqueue/pq_ut.h
+++ b/ydb/core/persqueue/pq_ut.h
@@ -237,9 +237,9 @@ void PQTabletPrepare(ui32 mcip, ui64 msip, ui32 deleteTime, const TVector<std::p
config->SetMaxCountInPartition(mcip);
config->SetMaxSizeInPartition(msip);
config->SetLifetimeSeconds(deleteTime);
- config->SetSourceIdLifetimeSeconds(1*60*60);
- if (sidMaxCount > 0)
- config->SetSourceIdMaxCounts(sidMaxCount);
+ config->SetSourceIdLifetimeSeconds(1*60*60);
+ if (sidMaxCount > 0)
+ config->SetSourceIdMaxCounts(sidMaxCount);
config->SetMaxWriteInflightSize(90000000);
config->SetLowWatermark(lw);
@@ -946,47 +946,47 @@ void CmdUpdateWriteTimestamp(const ui32 partition, ui64 timestamp, TTestContext&
}
-TVector<TString> CmdSourceIdRead(TTestContext& tc) {
- TAutoPtr<IEventHandle> handle;
- TVector<TString> sourceIds;
- THolder<TEvKeyValue::TEvRequest> request;
- TEvKeyValue::TEvResponse *result;
-
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- request.Reset(new TEvKeyValue::TEvRequest);
- sourceIds.clear();
- auto read = request->Record.AddCmdReadRange();
- auto range = read->MutableRange();
- NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkSourceId);
- range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size());
- range->SetIncludeFrom(true);
- NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkSourceId);
- range->SetTo(ikeyTo.Data(), ikeyTo.Size());
- range->SetIncludeTo(true);
- Cout << request.Get()->ToString() << Endl;
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
- UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
- for (ui64 idx = 0; idx < result->Record.ReadRangeResultSize(); ++idx) {
- const auto &readResult = result->Record.GetReadRangeResult(idx);
- UNIT_ASSERT(readResult.HasStatus());
- UNIT_ASSERT_EQUAL(readResult.GetStatus(), NKikimrProto::OK);
- for (size_t j = 0; j < readResult.PairSize(); ++j) {
- const auto& pair = readResult.GetPair(j);
- TString s = pair.GetKey().substr(NPQ::TKeyPrefix::MarkedSize());
- sourceIds.push_back(s);
- }
- }
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
- }
- }
- return sourceIds;
-}
+TVector<TString> CmdSourceIdRead(TTestContext& tc) {
+ TAutoPtr<IEventHandle> handle;
+ TVector<TString> sourceIds;
+ THolder<TEvKeyValue::TEvRequest> request;
+ TEvKeyValue::TEvResponse *result;
+
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ request.Reset(new TEvKeyValue::TEvRequest);
+ sourceIds.clear();
+ auto read = request->Record.AddCmdReadRange();
+ auto range = read->MutableRange();
+ NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkSourceId);
+ range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size());
+ range->SetIncludeFrom(true);
+ NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkSourceId);
+ range->SetTo(ikeyTo.Data(), ikeyTo.Size());
+ range->SetIncludeTo(true);
+ Cout << request.Get()->ToString() << Endl;
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+ UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
+ for (ui64 idx = 0; idx < result->Record.ReadRangeResultSize(); ++idx) {
+ const auto &readResult = result->Record.GetReadRangeResult(idx);
+ UNIT_ASSERT(readResult.HasStatus());
+ UNIT_ASSERT_EQUAL(readResult.GetStatus(), NKikimrProto::OK);
+ for (size_t j = 0; j < readResult.PairSize(); ++j) {
+ const auto& pair = readResult.GetPair(j);
+ TString s = pair.GetKey().substr(NPQ::TKeyPrefix::MarkedSize());
+ sourceIds.push_back(s);
+ }
+ }
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
+ }
+ }
+ return sourceIds;
+}
void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui32 size, const ui32 resCount, bool timeouted, TTestContext& tc, TVector<i32> offsets = {}, const ui32 maxTimeLagMs = 0, const ui64 readTimestampMs = 0) {
diff --git a/ydb/core/persqueue/pq_ut_slow.cpp b/ydb/core/persqueue/pq_ut_slow.cpp
index 51041c10459..cff394bf4dd 100644
--- a/ydb/core/persqueue/pq_ut_slow.cpp
+++ b/ydb/core/persqueue/pq_ut_slow.cpp
@@ -57,106 +57,106 @@ Y_UNIT_TEST(TestWriteVeryBigMessage) {
}
-Y_UNIT_TEST(TestOnDiskStoredSourceIds) {
- TTestContext tc;
- RunTestWithReboots(tc.TabletIds, [&]() {
- return tc.InitialEventsFilter.Prepare();
- }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
- TFinalizer finalizer(tc);
- tc.Prepare(dispatchName, setup, activeZone);
- tc.Runtime->SetScheduledLimit(200);
+Y_UNIT_TEST(TestOnDiskStoredSourceIds) {
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ tc.Runtime->SetScheduledLimit(200);
PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 2, 6*1024*1024, true, 0, 3); //no important client, lifetimeseconds=0 - delete right now
-
- TVector<TString> writtenSourceIds;
-
- TVector<std::pair<ui64, TString>> data;
- activeZone = true;
-
- TString ss{32, '_'};
- ui32 NUM_SOURCEIDS = 100;
- data.push_back({1, ss});
- CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 100);
- ui32 offset = 1200;
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- TString cookie = CmdSetOwner(0, tc).first;
- THolder<TEvPersQueue::TEvRequest> request;
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(0);
- req->SetOwnerCookie(cookie);
- req->SetMessageNo(0);
- req->SetCmdWriteOffset(offset);
- for (ui32 i=0; i < NUM_SOURCEIDS; ++i) {
- auto write = req->AddCmdWrite();
- write->SetSourceId(TStringBuilder() << "sourceid_" << i);
- write->SetSeqNo(0);
- write->SetData(ss);
- }
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- TAutoPtr<IEventHandle> handle;
- TEvPersQueue::TEvResponse *result;
- result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){
- if (!ev.Record.HasPartitionResponse() || !ev.Record.GetPartitionResponse().HasCmdReadResult())
- return true;
- return false;
- }); //there could be outgoing reads in TestReadSubscription test
-
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 2;
- continue;
- }
-
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_BAD_OFFSET) {
- ++offset;
- continue;
- }
-
- Cout << "Error code is " << static_cast<int>(result->Record.GetErrorCode()) << Endl;
- UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
-
- UNIT_ASSERT(result->Record.GetPartitionResponse().CmdWriteResultSize() == NUM_SOURCEIDS);
- for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) {
- UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten());
- UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset());
- }
- // for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) {
- // auto res = result->Record.GetPartitionResponse().GetCmdWriteResult(i);
- // UNIT_ASSERT(!result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten());
- // }
-
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
- retriesLeft = 3;
- }
- }
- for (ui64 i=100; i < 104; ++i) {
- CmdWrite(0, TStringBuilder() << "sourceid_" << i, data, tc, false, {}, false, "", -1, offset + i);
- Cout << TInstant::Now() << " written sourceid_" << i << Endl;
- }
- for (ui64 i=100; i < 104; ++i) {
- writtenSourceIds.push_back(TStringBuilder() << "sourceid_" << i);
- }
- Cout << TInstant::Now() << " now check list of sourceIds" << Endl;
- auto sourceIds = CmdSourceIdRead(tc);
- UNIT_ASSERT(sourceIds.size() > 0);
- for (auto& s: sourceIds) {
- Cout << "try to find sourceId " << s << Endl;
- auto findIt = std::find(writtenSourceIds.begin(), writtenSourceIds.end(), s);
- UNIT_ASSERT_VALUES_UNEQUAL(findIt, writtenSourceIds.end());
- }
- Cout << TInstant::Now() << "All Ok" << Endl;
- });
-}
-
-
-
-
+
+ TVector<TString> writtenSourceIds;
+
+ TVector<std::pair<ui64, TString>> data;
+ activeZone = true;
+
+ TString ss{32, '_'};
+ ui32 NUM_SOURCEIDS = 100;
+ data.push_back({1, ss});
+ CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 100);
+ ui32 offset = 1200;
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ TString cookie = CmdSetOwner(0, tc).first;
+ THolder<TEvPersQueue::TEvRequest> request;
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(0);
+ req->SetOwnerCookie(cookie);
+ req->SetMessageNo(0);
+ req->SetCmdWriteOffset(offset);
+ for (ui32 i=0; i < NUM_SOURCEIDS; ++i) {
+ auto write = req->AddCmdWrite();
+ write->SetSourceId(TStringBuilder() << "sourceid_" << i);
+ write->SetSeqNo(0);
+ write->SetData(ss);
+ }
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ TAutoPtr<IEventHandle> handle;
+ TEvPersQueue::TEvResponse *result;
+ result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){
+ if (!ev.Record.HasPartitionResponse() || !ev.Record.GetPartitionResponse().HasCmdReadResult())
+ return true;
+ return false;
+ }); //there could be outgoing reads in TestReadSubscription test
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 2;
+ continue;
+ }
+
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_BAD_OFFSET) {
+ ++offset;
+ continue;
+ }
+
+ Cout << "Error code is " << static_cast<int>(result->Record.GetErrorCode()) << Endl;
+ UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
+
+ UNIT_ASSERT(result->Record.GetPartitionResponse().CmdWriteResultSize() == NUM_SOURCEIDS);
+ for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) {
+ UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten());
+ UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset());
+ }
+ // for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) {
+ // auto res = result->Record.GetPartitionResponse().GetCmdWriteResult(i);
+ // UNIT_ASSERT(!result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten());
+ // }
+
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
+ retriesLeft = 3;
+ }
+ }
+ for (ui64 i=100; i < 104; ++i) {
+ CmdWrite(0, TStringBuilder() << "sourceid_" << i, data, tc, false, {}, false, "", -1, offset + i);
+ Cout << TInstant::Now() << " written sourceid_" << i << Endl;
+ }
+ for (ui64 i=100; i < 104; ++i) {
+ writtenSourceIds.push_back(TStringBuilder() << "sourceid_" << i);
+ }
+ Cout << TInstant::Now() << " now check list of sourceIds" << Endl;
+ auto sourceIds = CmdSourceIdRead(tc);
+ UNIT_ASSERT(sourceIds.size() > 0);
+ for (auto& s: sourceIds) {
+ Cout << "try to find sourceId " << s << Endl;
+ auto findIt = std::find(writtenSourceIds.begin(), writtenSourceIds.end(), s);
+ UNIT_ASSERT_VALUES_UNEQUAL(findIt, writtenSourceIds.end());
+ }
+ Cout << TInstant::Now() << "All Ok" << Endl;
+ });
+}
+
+
+
+
} // TKeyValueTest
} // NKikimr
diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp
index a63b2f6f705..1854b66fdb9 100644
--- a/ydb/core/persqueue/sourceid.cpp
+++ b/ydb/core/persqueue/sourceid.cpp
@@ -1,25 +1,25 @@
-#include "sourceid.h"
-#include "ownerinfo.h"
+#include "sourceid.h"
+#include "ownerinfo.h"
#include <ydb/core/persqueue/partition.h>
#include <util/generic/size_literals.h>
#include <algorithm>
-
-namespace NKikimr {
-namespace NPQ {
-
+
+namespace NKikimr {
+namespace NPQ {
+
static constexpr ui64 MAX_DELETE_COMMAND_SIZE = 10_MB;
static constexpr ui64 MAX_DELETE_COMMAND_COUNT = 1000;
-
+
template <typename T>
T ReadAs(const TString& data, ui32& pos) {
auto result = *((T*)(data.c_str() + pos));
pos += sizeof(T);
return result;
}
-
+
template <typename T>
T ReadAs(const TString& data, ui32& pos, const T& default_) {
if (pos + sizeof(T) <= data.size()) {
@@ -28,13 +28,13 @@ T ReadAs(const TString& data, ui32& pos, const T& default_) {
return default_;
}
}
-
+
template <typename T>
void Write(T value, TBuffer& data, ui32& pos) {
memcpy(data.Data() + pos, &value, sizeof(T));
pos += sizeof(T);
}
-
+
TSourceIdInfo::EState TSourceIdInfo::ConvertState(NKikimrPQ::TMessageGroupInfo::EState value) {
switch (value) {
case NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED:
@@ -214,7 +214,7 @@ void TSourceIdStorage::DeregisterSourceId(const TString& sourceId) {
bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, ui32 partition, const NKikimrPQ::TPartitionConfig& config) {
TVector<std::pair<ui64, TString>> toDelOffsets;
-
+
ui64 maxDeleteSourceIds = 0;
if (InMemorySourceIds.size() > config.GetSourceIdMaxCounts()) {
maxDeleteSourceIds = InMemorySourceIds.size() - config.GetSourceIdMaxCounts();
@@ -253,20 +253,20 @@ bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInsta
reachedLimit = true;
break;
}
- }
-
+ }
+
if (reachedLimit) {
break; // Check here size to prevent crashing in protobuf verify -max size is 25Mb's and no more then 100K operations
// but even 10Mbs sounds too big here.
// Rest of SourceIds will be deleted in next DropOldSourceIds calls, whitch will be made
// right after complete of current deletion.
- }
+ }
} else {
// there are no space for new sourcID to delete, stop check sourceIds
break;
- }
- }
-
+ }
+ }
+
for (auto& t : toDelOffsets) {
// delete sourceId from memory
size_t res = InMemorySourceIds.erase(t.second);
@@ -298,7 +298,7 @@ void TSourceIdStorage::LoadSourceIdInfo(const TString& key, const TString& data,
Y_FAIL_S("Unexpected mark: " << (char)mark);
}
}
-
+
void TSourceIdStorage::LoadRawSourceIdInfo(const TString& key, const TString& data, TInstant now) {
Y_VERIFY(key.size() >= TKeyPrefix::MarkedSize());
Y_VERIFY(key[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkSourceId);
@@ -316,7 +316,7 @@ void TSourceIdStorage::LoadProtoSourceIdInfo(const TString& key, const TString&
RegisterSourceIdInfo(key.substr(TKeyPrefix::MarkedSize()), TSourceIdInfo::Parse(proto), true);
}
-
+
void TSourceIdStorage::RegisterSourceIdInfo(const TString& sourceId, TSourceIdInfo&& sourceIdInfo, bool load) {
auto it = InMemorySourceIds.find(sourceId);
if (it != InMemorySourceIds.end()) {
@@ -324,51 +324,51 @@ void TSourceIdStorage::RegisterSourceIdInfo(const TString& sourceId, TSourceIdIn
const auto res = SourceIdsByOffset.erase(std::make_pair(it->second.Offset, sourceId));
Y_VERIFY(res == 1);
}
-
+
const auto offset = sourceIdInfo.Offset;
InMemorySourceIds[sourceId] = std::move(sourceIdInfo);
const bool res = SourceIdsByOffset.emplace(offset, sourceId).second;
Y_VERIFY(res);
}
-
+
void TSourceIdStorage::RegisterSourceIdOwner(const TString& sourceId, const TStringBuf& ownerCookie) {
if (ownerCookie == "default") {
// cookie for legacy http protocol - skip it, we use one cookie for all write sessions
return;
- }
+ }
// owner cookie could be deleted in main object - so we should copy it
SourceIdOwners[sourceId] = ownerCookie;
}
-
+
void TSourceIdStorage::MarkOwnersForDeletedSourceId(THashMap<TString, TOwnerInfo>& owners) {
for (auto& sourceid : OwnersToDrop) {
auto it = owners.find(sourceid);
if (it != owners.end()) {
it->second.SourceIdDeleted = true;
- }
- }
-
+ }
+ }
+
OwnersToDrop.clear();
}
-
+
TInstant TSourceIdStorage::MinAvailableTimestamp(TInstant now) const {
TInstant ds = now;
if (!SourceIdsByOffset.empty()) {
auto it = InMemorySourceIds.find(SourceIdsByOffset.begin()->second);
Y_VERIFY(it != InMemorySourceIds.end());
ds = Min(ds, it->second.WriteTimestamp);
- }
+ }
return ds;
}
-
+
/// TSourceIdWriter
TSourceIdWriter::TSourceIdWriter(ESourceIdFormat format)
: Format(format)
{
}
-
+
void TSourceIdWriter::DeregisterSourceId(const TString& sourceId) {
Deregistrations.insert(sourceId);
}
@@ -418,12 +418,12 @@ void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partiti
for (const auto& [sourceId, sourceIdInfo] : Registrations) {
FillKeyAndData(Format, sourceId, sourceIdInfo, key, data);
FillWrite(key, data, *request->Record.AddCmdWrite());
- }
+ }
for (const auto& sourceId : Deregistrations) {
FillDelete(partition, sourceId, *request->Record.AddCmdDeleteRange());
}
}
-
-} // NPQ
-} // NKikimr
+
+} // NPQ
+} // NKikimr
diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h
index 62b9c2834e2..3bd899b495b 100644
--- a/ydb/core/persqueue/sourceid.h
+++ b/ydb/core/persqueue/sourceid.h
@@ -1,4 +1,4 @@
-#pragma once
+#pragma once
#include <ydb/core/keyvalue/keyvalue_events.h>
#include <ydb/core/persqueue/key.h>
@@ -6,11 +6,11 @@
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/protos/pqconfig.pb.h>
-#include <util/generic/set.h>
-
-namespace NKikimr {
-namespace NPQ {
-
+#include <util/generic/set.h>
+
+namespace NKikimr {
+namespace NPQ {
+
enum class ESourceIdFormat: ui8 {
Raw = 0,
Proto = 1,
@@ -47,37 +47,37 @@ struct TSourceIdInfo {
// Proto format
static TSourceIdInfo Parse(const NKikimrPQ::TMessageGroupInfo& proto);
void Serialize(NKikimrPQ::TMessageGroupInfo& proto) const;
-
+
bool operator==(const TSourceIdInfo& rhs) const;
void Out(IOutputStream& out) const;
-
+
bool IsExpired(TDuration ttl, TInstant now) const;
-
+
}; // TSourceIdInfo
-
+
using TSourceIdMap = THashMap<TString, TSourceIdInfo>;
-
+
class TSourceIdStorage {
public:
const TSourceIdMap& GetInMemorySourceIds() const {
return InMemorySourceIds;
}
-
+
template <typename... Args>
void RegisterSourceId(const TString& sourceId, Args&&... args) {
RegisterSourceIdInfo(sourceId, TSourceIdInfo(std::forward<Args>(args)...), false);
}
-
+
void DeregisterSourceId(const TString& sourceId);
void LoadSourceIdInfo(const TString& key, const TString& data, TInstant now);
bool DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, ui32 partition, const NKikimrPQ::TPartitionConfig& config);
-
+
void RegisterSourceIdOwner(const TString& sourceId, const TStringBuf& ownerCookie);
void MarkOwnersForDeletedSourceId(THashMap<TString, TOwnerInfo>& owners);
-
+
TInstant MinAvailableTimestamp(TInstant now) const;
-
+
private:
void LoadRawSourceIdInfo(const TString& key, const TString& data, TInstant now);
void LoadProtoSourceIdInfo(const TString& key, const TString& data);
@@ -88,28 +88,28 @@ private:
THashMap<TString, TString> SourceIdOwners;
TVector<TString> OwnersToDrop;
TSet<std::pair<ui64, TString>> SourceIdsByOffset;
-
+
}; // TSourceIdStorage
class TSourceIdWriter {
public:
explicit TSourceIdWriter(ESourceIdFormat format);
-
+
const TSourceIdMap& GetSourceIdsToWrite() const {
return Registrations;
}
-
+
template <typename... Args>
void RegisterSourceId(const TString& sourceId, Args&&... args) {
Registrations[sourceId] = TSourceIdInfo(std::forward<Args>(args)...);
}
-
+
void DeregisterSourceId(const TString& sourceId);
void Clear();
-
+
void FillRequest(TEvKeyValue::TEvRequest* request, ui32 partition);
static void FillKeyAndData(ESourceIdFormat format, const TString& sourceId, const TSourceIdInfo& sourceIdInfo, TKeyPrefix& key, TBuffer& data);
-
+
private:
static void FillRawData(const TSourceIdInfo& sourceIdInfo, TBuffer& data);
static void FillProtoData(const TSourceIdInfo& sourceIdInfo, TBuffer& data);
@@ -119,11 +119,11 @@ private:
const ESourceIdFormat Format;
TSourceIdMap Registrations;
THashSet<TString> Deregistrations;
-
+
}; // TSourceIdWriter
-} // NPQ
-} // NKikimr
+} // NPQ
+} // NKikimr
Y_DECLARE_OUT_SPEC(inline, NKikimr::NPQ::TSourceIdInfo, out, value) {
return value.Out(out);
diff --git a/ydb/core/persqueue/sourceid_ut.cpp b/ydb/core/persqueue/sourceid_ut.cpp
index 46e03b2d3f2..eeef8f24a33 100644
--- a/ydb/core/persqueue/sourceid_ut.cpp
+++ b/ydb/core/persqueue/sourceid_ut.cpp
@@ -1,26 +1,26 @@
-#include "sourceid.h"
+#include "sourceid.h"
#include <ydb/core/keyvalue/keyvalue_events.h>
#include <ydb/core/persqueue/key.h>
-#include <library/cpp/testing/unittest/registar.h>
-
+#include <library/cpp/testing/unittest/registar.h>
+
namespace NKikimr {
namespace NPQ {
-
+
Y_UNIT_TEST_SUITE(TSourceIdTests) {
inline static TString TestSourceId(ui64 idx = 0) {
return TStringBuilder() << "testSourceId" << idx;
}
-
+
inline static TStringBuf TestOwner(TStringBuf sourceId) {
UNIT_ASSERT(sourceId.SkipPrefix("test"));
return sourceId;
}
-
+
static constexpr ui32 TestPartition = 1;
- Y_UNIT_TEST(SourceIdWriterAddMessage) {
+ Y_UNIT_TEST(SourceIdWriterAddMessage) {
TSourceIdWriter writer(ESourceIdFormat::Raw);
const auto sourceId = TestSourceId(1);
@@ -43,9 +43,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
auto it = writer.GetSourceIdsToWrite().find(anotherSourceId);
UNIT_ASSERT_EQUAL(it, writer.GetSourceIdsToWrite().end());
}
- }
-
- Y_UNIT_TEST(SourceIdWriterClean) {
+ }
+
+ Y_UNIT_TEST(SourceIdWriterClean) {
TSourceIdWriter writer(ESourceIdFormat::Raw);
writer.RegisterSourceId(TestSourceId(), 1, 10, TInstant::Seconds(100));
@@ -53,13 +53,13 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
writer.Clear();
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 0);
- }
-
- Y_UNIT_TEST(SourceIdWriterFormCommand) {
+ }
+
+ Y_UNIT_TEST(SourceIdWriterFormCommand) {
TSourceIdWriter writer(ESourceIdFormat::Raw);
auto actualRequest = MakeHolder<TEvKeyValue::TEvRequest>();
auto expectedRequest = MakeHolder<TEvKeyValue::TEvRequest>();
-
+
const auto sourceId = TestSourceId(1);
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
writer.RegisterSourceId(sourceId, sourceIdInfo);
@@ -67,7 +67,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
{
TKeyPrefix key(TKeyPrefix::TypeInfo, TestPartition, TKeyPrefix::MarkSourceId);
TBuffer data;
-
+
TSourceIdWriter::FillKeyAndData(ESourceIdFormat::Raw, sourceId, sourceIdInfo, key, data);
auto write = expectedRequest.Get()->Record.AddCmdWrite();
write->SetKey(key.Data(), key.Size());
@@ -97,9 +97,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
writer.FillRequest(actualRequest.Get(), TestPartition + 1);
UNIT_ASSERT_VALUES_EQUAL(actualRequest.Get()->ToString(), expectedRequest.Get()->ToString());
}
- }
-
- Y_UNIT_TEST(SourceIdStorageAdd) {
+ }
+
+ Y_UNIT_TEST(SourceIdStorageAdd) {
TSourceIdStorage storage;
const auto sourceId = TestSourceId(1);
@@ -127,8 +127,8 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
UNIT_ASSERT_UNEQUAL(it, storage.GetInMemorySourceIds().end());
UNIT_ASSERT_VALUES_EQUAL(it->second, anotherSourceIdInfo);
}
- }
-
+ }
+
void SourceIdStorageParseAndAdd(TKeyPrefix::EMark mark, ESourceIdFormat format) {
const auto sourceId = TestSourceId();
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
@@ -148,8 +148,8 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
auto it = storage.GetInMemorySourceIds().find(sourceId);
UNIT_ASSERT_UNEQUAL(it, storage.GetInMemorySourceIds().end());
UNIT_ASSERT_VALUES_EQUAL(it->second, sourceIdInfo);
- }
-
+ }
+
Y_UNIT_TEST(SourceIdStorageParseAndAdd) {
SourceIdStorageParseAndAdd(TKeyPrefix::MarkSourceId, ESourceIdFormat::Raw);
}
@@ -158,7 +158,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
SourceIdStorageParseAndAdd(TKeyPrefix::MarkProtoSourceId, ESourceIdFormat::Proto);
}
- Y_UNIT_TEST(SourceIdStorageMinDS) {
+ Y_UNIT_TEST(SourceIdStorageMinDS) {
const auto now = TInstant::Now();
TSourceIdStorage storage;
@@ -181,9 +181,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
auto ds = storage.MinAvailableTimestamp(now);
UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(200));
}
- }
-
- Y_UNIT_TEST(SourceIdStorageTestClean) {
+ }
+
+ Y_UNIT_TEST(SourceIdStorageTestClean) {
TSourceIdStorage storage;
for (ui64 i = 1; i <= 10000; ++i) {
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
@@ -222,9 +222,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
UNIT_ASSERT_EQUAL(dropped, true);
UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 360); // more 360 sources are dropped
}
- }
-
- Y_UNIT_TEST(SourceIdStorageDeleteByMaxCount) {
+ }
+
+ Y_UNIT_TEST(SourceIdStorageDeleteByMaxCount) {
TSourceIdStorage storage;
for (ui64 i = 1; i <= 10000; ++i) {
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
@@ -254,9 +254,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
auto it = storage.GetInMemorySourceIds().find(TestSourceId(101)); // 101th source is alive
UNIT_ASSERT_UNEQUAL(it, storage.GetInMemorySourceIds().end());
}
- }
-
- Y_UNIT_TEST(SourceIdStorageComplexDelete) {
+ }
+
+ Y_UNIT_TEST(SourceIdStorageComplexDelete) {
TSourceIdStorage storage;
for (ui64 i = 1; i <= 10000 + 1; ++i) { // add 10000 + one extra sources
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
@@ -289,9 +289,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
UNIT_ASSERT_EQUAL(dropped, true);
UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 5); // more 5 sources are dropped
}
- }
-
- Y_UNIT_TEST(SourceIdStorageDeleteAndOwnersMark) {
+ }
+
+ Y_UNIT_TEST(SourceIdStorageDeleteAndOwnersMark) {
TSourceIdStorage storage;
THashMap<TString, TOwnerInfo> owners;
for (ui64 i = 1; i <= 2; ++i) { // add two sources
@@ -322,9 +322,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
UNIT_ASSERT_UNEQUAL(it, owners.end());
UNIT_ASSERT_VALUES_EQUAL(it->second.SourceIdDeleted, false);
}
- }
-
+ }
+
} // TSourceIdTests
-
+
} // NPQ
} // NKikimr
diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make
index 7cbb0b94960..2fc79953c5d 100644
--- a/ydb/core/persqueue/ut/ya.make
+++ b/ydb/core/persqueue/ut/ya.make
@@ -35,7 +35,7 @@ SRCS(
pq_ut.cpp
type_codecs_ut.cpp
pq_ut.h
- sourceid_ut.cpp
+ sourceid_ut.cpp
user_info_ut.cpp
)
diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make
index 50e89676ef9..a8a27dbd7d8 100644
--- a/ydb/core/persqueue/ya.make
+++ b/ydb/core/persqueue/ya.make
@@ -15,10 +15,10 @@ SRCS(
pq.cpp
pq_database.cpp
pq_impl.cpp
- sourceid.cpp
+ sourceid.cpp
mirrorer.cpp
mirrorer.h
- ownerinfo.cpp
+ ownerinfo.cpp
partition.cpp
pq_l2_cache.cpp
read_balancer.cpp
diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto
index 72337581351..6cc0f12db86 100644
--- a/ydb/core/protos/counters_pq.proto
+++ b/ydb/core/protos/counters_pq.proto
@@ -230,7 +230,7 @@ enum EPartitionLabeledCounters {
METRIC_WRITE_QUOTA_USAGE = 32 [(LabeledCounterOpts) = {Name: "PartitionMaxWriteQuotaUsage" AggrFunc : EAF_MAX}];
- METRIC_MIN_SID_LIFETIME = 33 [(LabeledCounterOpts) = {Name: "SourceIdMinLifetimeMs" AggrFunc : EAF_MIN}];
-
+ METRIC_MIN_SID_LIFETIME = 33 [(LabeledCounterOpts) = {Name: "SourceIdMinLifetimeMs" AggrFunc : EAF_MIN}];
+
}
diff --git a/ydb/core/protos/counters_pq_labeled.proto b/ydb/core/protos/counters_pq_labeled.proto
index d67890bf276..cd57661bedf 100644
--- a/ydb/core/protos/counters_pq_labeled.proto
+++ b/ydb/core/protos/counters_pq_labeled.proto
@@ -45,7 +45,7 @@ enum EClientLabeledCounters {
METRIC_WRITE_TIME_LAG = 24 [(LabeledCounterOpts) = {Name: "WriteTimeLagMsByLastRead" AggrFunc : EAF_MAX}];
METRIC_LAST_READ_TIME = 25 [(LabeledCounterOpts) = {Name: "TimeSinceLastReadMs" AggrFunc : EAF_MIN Type : CT_TIMELAG}];
- METRIC_READ_QUOTA_USAGE = 26 [(LabeledCounterOpts) = {Name: "PartitionMaxReadQuotaUsage" AggrFunc : EAF_MAX}];
+ METRIC_READ_QUOTA_USAGE = 26 [(LabeledCounterOpts) = {Name: "PartitionMaxReadQuotaUsage" AggrFunc : EAF_MAX}];
}
@@ -94,7 +94,7 @@ enum EPartitionLabeledCounters {
METRIC_TOTAL_QUOTA_SPEED_4 = 30 [(LabeledCounterOpts) = {Name: "QuotaBytesPerDay" AggrFunc : EAF_SUM}];
METRIC_MAX_QUOTA_SPEED_4 = 31 [(LabeledCounterOpts) = {Name: "QuotaBytesMaxPerDay" AggrFunc : EAF_MAX}];
- METRIC_WRITE_QUOTA_USAGE = 32 [(LabeledCounterOpts) = {Name: "PartitionMaxWriteQuotaUsage" AggrFunc : EAF_MAX}];
+ METRIC_WRITE_QUOTA_USAGE = 32 [(LabeledCounterOpts) = {Name: "PartitionMaxWriteQuotaUsage" AggrFunc : EAF_MAX}];
}
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 7c859274496..7eae280ff3d 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -184,9 +184,9 @@ message TPartitionConfig {
// List of ClientIds, for which we don't delete data until they are read by these clients
repeated string ImportantClientId = 4; //can be empty
optional uint32 LowWatermark = 5 [default = 6291456]; //6Mb, compact blobs if they at least this big.
- optional uint32 SourceIdLifetimeSeconds = 6 [ default = 1382400]; //16 days
- optional uint32 SourceIdMaxCounts = 31 [default = 6000000]; // Maximum number of stored sourceId records in partition
- // default - generate 5 new source id each second during 14 days
+ optional uint32 SourceIdLifetimeSeconds = 6 [ default = 1382400]; //16 days
+ optional uint32 SourceIdMaxCounts = 31 [default = 6000000]; // Maximum number of stored sourceId records in partition
+ // default - generate 5 new source id each second during 14 days
optional uint64 WriteSpeedInBytesPerSecond = 7 [default = 50000000];
optional uint64 BurstSize = 8 [default = 50000000];
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index f0599232508..a77dd534cff 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -122,9 +122,9 @@ struct TRequestCreatePQ {
std::optional<NKikimrPQ::TMirrorPartitionConfig> MirrorFrom;
- ui64 SourceIdMaxCount;
- ui64 SourceIdLifetime;
-
+ ui64 SourceIdMaxCount;
+ ui64 SourceIdLifetime;
+
THolder<NMsgBusProxy::TBusPersQueue> GetRequest() const {
THolder<NMsgBusProxy::TBusPersQueue> request(new NMsgBusProxy::TBusPersQueue);
auto req = request->Record.MutableMetaRequest()->MutableCmdCreateTopic();
@@ -134,8 +134,8 @@ struct TRequestCreatePQ {
if (CacheSize)
config->SetCacheSize(CacheSize);
config->MutablePartitionConfig()->SetLifetimeSeconds(LifetimeS);
- config->MutablePartitionConfig()->SetSourceIdLifetimeSeconds(SourceIdLifetime);
- config->MutablePartitionConfig()->SetSourceIdMaxCounts(SourceIdMaxCount);
+ config->MutablePartitionConfig()->SetSourceIdLifetimeSeconds(SourceIdLifetime);
+ config->MutablePartitionConfig()->SetSourceIdMaxCounts(SourceIdMaxCount);
config->MutablePartitionConfig()->SetLowWatermark(LowWatermark);
config->SetLocalDC(true);
@@ -925,9 +925,9 @@ public:
ui64 readSpeed = 200000000,
TVector<TString> rr = {},
TVector<TString> important = {},
- std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom = {},
- ui64 sourceIdMaxCount = 6000000,
- ui64 sourceIdLifetime = 86400
+ std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom = {},
+ ui64 sourceIdMaxCount = 6000000,
+ ui64 sourceIdLifetime = 86400
) {
Y_VERIFY(name.StartsWith("rt3."));
diff --git a/ydb/public/api/protos/draft/persqueue_error_codes.proto b/ydb/public/api/protos/draft/persqueue_error_codes.proto
index 8b2a0986314..d2d1bdab596 100644
--- a/ydb/public/api/protos/draft/persqueue_error_codes.proto
+++ b/ydb/public/api/protos/draft/persqueue_error_codes.proto
@@ -9,7 +9,7 @@ enum EErrorCode {
OVERLOAD = 2;
BAD_REQUEST = 3;
WRONG_COOKIE = 4;
- SOURCEID_DELETED = 24;
+ SOURCEID_DELETED = 24;
WRITE_ERROR_PARTITION_IS_FULL = 5;
WRITE_ERROR_DISK_IS_FULL = 15;
diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto
index c6658305c2e..06a96c37180 100644
--- a/ydb/public/api/protos/persqueue_error_codes_v1.proto
+++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto
@@ -12,7 +12,7 @@ enum ErrorCode {
OVERLOAD = 500002;
BAD_REQUEST = 500003;
WRONG_COOKIE = 500004;
- SOURCEID_DELETED = 500024;
+ SOURCEID_DELETED = 500024;
WRITE_ERROR_PARTITION_IS_FULL = 500005;
WRITE_ERROR_DISK_IS_FULL = 500015;
diff --git a/ydb/public/api/protos/ydb_persqueue_v1.proto b/ydb/public/api/protos/ydb_persqueue_v1.proto
index 93a7fb6c79d..2407388342a 100644
--- a/ydb/public/api/protos/ydb_persqueue_v1.proto
+++ b/ydb/public/api/protos/ydb_persqueue_v1.proto
@@ -1137,10 +1137,10 @@ message TopicSettings {
int32 partitions_count = 1 [(value) = "> 0"];
// How long data in partition should be stored. Must be greater than 0 and less than limit for this database. Default limit - 36 hours.
int64 retention_period_ms = 2 [(value) = "> 0"];
- // How long last written seqno for message group should be stored. Must be greater then retention_period_ms and less then limit for this database. Default limit - 16 days.
- int64 message_group_seqno_retention_period_ms = 12 [(value) = ">= 0"];
- // How many last written seqno for various message groups should be stored per partition. Must be less than limit for this database. Default limit - 6*10^6 values.
- int64 max_partition_message_groups_seqno_stored = 13 [(value) = ">= 0"];
+ // How long last written seqno for message group should be stored. Must be greater then retention_period_ms and less then limit for this database. Default limit - 16 days.
+ int64 message_group_seqno_retention_period_ms = 12 [(value) = ">= 0"];
+ // How many last written seqno for various message groups should be stored per partition. Must be less than limit for this database. Default limit - 6*10^6 values.
+ int64 max_partition_message_groups_seqno_stored = 13 [(value) = ">= 0"];
// Max format version that is allowed for writers. Must be value from enum FormatVersion.
// Writes with greater format version are forbiden.
Format supported_format = 3;
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 654edfcfcd4..a438914f388 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -2996,7 +2996,7 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const PersQueue:
case WRITE_ERROR_PARTITION_IS_FULL:
case WRITE_ERROR_DISK_IS_FULL:
case WRITE_ERROR_BAD_OFFSET:
- case SOURCEID_DELETED:
+ case SOURCEID_DELETED:
case READ_ERROR_IN_PROGRESS:
case READ_ERROR_TOO_SMALL_OFFSET:
case READ_ERROR_TOO_BIG_OFFSET:
diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.h b/ydb/services/persqueue_v1/grpc_pq_schema.h
index 2ef8c503e4d..f4c26ee8ca1 100644
--- a/ydb/services/persqueue_v1/grpc_pq_schema.h
+++ b/ydb/services/persqueue_v1/grpc_pq_schema.h
@@ -14,7 +14,7 @@
namespace NKikimr::NGRpcProxy::V1 {
-static const i64 DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD = 16*24*60*60*1000;
+static const i64 DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD = 16*24*60*60*1000;
inline TActorId GetPQSchemaServiceActorID() {
return TActorId(0, "PQSchmSvc");
diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
index 3038cc82a61..fa37956f6db 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
@@ -756,9 +756,9 @@ void TWriteSessionActor::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev
const auto& error = result.GetError();
if (error.Response.HasErrorCode()) {
return CloseSession("status is not ok: " + error.Response.GetErrorReason(), ConvertOldCode(error.Response.GetErrorCode()), ctx);
- } else {
+ } else {
return CloseSession("error at writer init: " + error.Reason, PersQueue::ErrorCode::ERROR, ctx);
- }
+ }
}
OwnerCookie = result.GetResult().OwnerCookie;
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 3bc9b217285..17906a8cfb1 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -375,20 +375,20 @@ namespace {
}
Y_UNIT_TEST(StoreNoMoreThanXSourceIDs) {
- ui16 X = 4;
- ui64 SOURCEID_COUNT_DELETE_BATCH_SIZE = 100;
+ ui16 X = 4;
+ ui64 SOURCEID_COUNT_DELETE_BATCH_SIZE = 100;
NPersQueue::TTestServer server;
server.EnableLogs({ NKikimrServices::PERSQUEUE, NKikimrServices::PQ_WRITE_PROXY });
server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8*1024*1024, 86400, 20000000, "", 200000000, {}, {}, {}, X, 86400);
-
+
auto driver = server.AnnoyingClient->GetDriver();
-
+
auto writer1 = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "test source ID " << 0, {}, {}, true);
writer1->GetInitSeqNo();
-
+
bool res = writer1->Write("x", 1);
UNIT_ASSERT(res);
-
+
Sleep(TDuration::Seconds(5));
auto writer2 = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "test source ID Del " << 0);
@@ -404,7 +404,7 @@ namespace {
Sleep(TDuration::Seconds(5));
- for (ui32 nProducer=1; nProducer < X + SOURCEID_COUNT_DELETE_BATCH_SIZE + 1; ++nProducer) {
+ for (ui32 nProducer=1; nProducer < X + SOURCEID_COUNT_DELETE_BATCH_SIZE + 1; ++nProducer) {
auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "test source ID " << nProducer);
res = writer->Write("x", 1);
@@ -415,19 +415,19 @@ namespace {
res = writer->Close(TDuration::Seconds(10));
UNIT_ASSERT(res);
- }
-
+ }
+
res = writer1->Write("x", 3);
UNIT_ASSERT(res);
res = writer1->Close(TDuration::Seconds(5));
UNIT_ASSERT(res);
-
+
res = writer2->Write("x", 4);
UNIT_ASSERT(res);
-
+
UNIT_ASSERT(!writer2->Close());
- }
-
+ }
+
Y_UNIT_TEST(EachMessageGetsExactlyOneAcknowledgementInCorrectOrder) {
NPersQueue::TTestServer server;
server.AnnoyingClient->CreateTopic("rt3.dc1--topic", 1);
@@ -2494,7 +2494,7 @@ namespace {
MaxSizeInPartition: 234
LifetimeSeconds: 86400
ImportantClientId: "consumer"
- SourceIdLifetimeSeconds: 1382400
+ SourceIdLifetimeSeconds: 1382400
WriteSpeedInBytesPerSecond: 123
BurstSize: 1234
NumChannels: 10
@@ -2534,7 +2534,7 @@ namespace {
ExplicitChannelProfiles {
PoolKind: "test"
}
- SourceIdMaxCounts: 6000000
+ SourceIdMaxCounts: 6000000
}
Version: 3
LocalDC: true