diff options
author | xifos <xifos@yandex-team.ru> | 2022-02-10 16:52:23 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:23 +0300 |
commit | ee987daedfc7e930134260d36705299cec22c994 (patch) | |
tree | 8a369b005c0314144482022cf8e4d15213783ade | |
parent | f075ef4e79295c591793b202bed63d5e6ae1f397 (diff) | |
download | ydb-ee987daedfc7e930134260d36705299cec22c994.tar.gz |
Restoring authorship annotation for <xifos@yandex-team.ru>. Commit 1 of 2.
24 files changed, 470 insertions, 470 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp index 237bba147b9..ded0ed3c66e 100644 --- a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp @@ -172,7 +172,7 @@ protected: config->SetMaxCountInPartition(20000000); config->SetMaxSizeInPartition(100 * 1024 * 1024); config->SetLifetimeSeconds(0); - config->SetSourceIdLifetimeSeconds(1*60*60); + config->SetSourceIdLifetimeSeconds(1*60*60); config->SetMaxWriteInflightSize(90000000); config->SetLowWatermark(6*1024*1024); diff --git a/ydb/core/persqueue/ownerinfo.cpp b/ydb/core/persqueue/ownerinfo.cpp index f2deb8fa14a..f08884f02c6 100644 --- a/ydb/core/persqueue/ownerinfo.cpp +++ b/ydb/core/persqueue/ownerinfo.cpp @@ -1,40 +1,40 @@ -#include "ownerinfo.h" -#include <util/generic/guid.h> -#include <util/string/escape.h> - -namespace NKikimr { -namespace NPQ { - - void TOwnerInfo::GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender, - const TString& topicName, const ui32 partition, const TActorContext& ctx) { - TStringBuilder s; - s << owner << "|" << CreateGuidAsString() << "_" << OwnerGeneration; - ++OwnerGeneration; - Y_VERIFY(OwnerCookie != s); - OwnerCookie = s; - NextMessageNo = 0; - NeedResetOwner = false; - PipeClient = pipeClient; - if (Sender) { - THolder<TEvPersQueue::TEvResponse> response = MakeHolder<TEvPersQueue::TEvResponse>(); - response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); - response->Record.SetErrorCode(NPersQueue::NErrorCode::BAD_REQUEST); - response->Record.SetErrorReason("ownership session is killed by another session with id " + OwnerCookie); - ctx.Send(Sender, response.Release()); - } - Sender = sender; - ReservedSize = 0; - Requests.clear(); - //WaitToChageOwner not touched - they will wait for this owner to be dropped - this new owner must have force flag - LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "new Cookie " << s << " generated for partition " << partition << " topic '" << topicName << "' owner " << EscapeC(owner)); - } - - TStringBuf TOwnerInfo::GetOwnerFromOwnerCookie(const TString& cookie) { - auto pos = cookie.rfind('|'); - if (pos == TString::npos) - pos = cookie.size(); - TStringBuf res = TStringBuf(cookie.c_str(), pos); - return res; - } -} // NPQ -} // NKikimr +#include "ownerinfo.h" +#include <util/generic/guid.h> +#include <util/string/escape.h> + +namespace NKikimr { +namespace NPQ { + + void TOwnerInfo::GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender, + const TString& topicName, const ui32 partition, const TActorContext& ctx) { + TStringBuilder s; + s << owner << "|" << CreateGuidAsString() << "_" << OwnerGeneration; + ++OwnerGeneration; + Y_VERIFY(OwnerCookie != s); + OwnerCookie = s; + NextMessageNo = 0; + NeedResetOwner = false; + PipeClient = pipeClient; + if (Sender) { + THolder<TEvPersQueue::TEvResponse> response = MakeHolder<TEvPersQueue::TEvResponse>(); + response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); + response->Record.SetErrorCode(NPersQueue::NErrorCode::BAD_REQUEST); + response->Record.SetErrorReason("ownership session is killed by another session with id " + OwnerCookie); + ctx.Send(Sender, response.Release()); + } + Sender = sender; + ReservedSize = 0; + Requests.clear(); + //WaitToChageOwner not touched - they will wait for this owner to be dropped - this new owner must have force flag + LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "new Cookie " << s << " generated for partition " << partition << " topic '" << topicName << "' owner " << EscapeC(owner)); + } + + TStringBuf TOwnerInfo::GetOwnerFromOwnerCookie(const TString& cookie) { + auto pos = cookie.rfind('|'); + if (pos == TString::npos) + pos = cookie.size(); + TStringBuf res = TStringBuf(cookie.c_str(), pos); + return res; + } +} // NPQ +} // NKikimr diff --git a/ydb/core/persqueue/ownerinfo.h b/ydb/core/persqueue/ownerinfo.h index a6f5aef7166..81ff37e5186 100644 --- a/ydb/core/persqueue/ownerinfo.h +++ b/ydb/core/persqueue/ownerinfo.h @@ -1,60 +1,60 @@ -#pragma once +#pragma once #include <ydb/core/keyvalue/keyvalue_events.h> #include <ydb/core/persqueue/events/internal.h> -#include <library/cpp/actors/core/actor.h> - - -namespace NKikimr { -namespace NPQ { - - struct TOwnerInfo { - bool NeedResetOwner; - bool SourceIdDeleted; - TString OwnerCookie; - ui64 NextMessageNo; - ui32 OwnerGeneration; - TActorId PipeClient; - TActorId Sender; - - ui64 ReservedSize; - std::deque<ui64> Requests; - - std::deque<THolder<TEvPQ::TEvChangeOwner>> WaitToChangeOwner; - - TOwnerInfo() - : NeedResetOwner(true) - , SourceIdDeleted(false) - , NextMessageNo(0) - , OwnerGeneration(0) - , ReservedSize(0) - {} - - static TStringBuf GetOwnerFromOwnerCookie(const TString& owner); - void GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender, - const TString& topicName, const ui32 partition, const TActorContext& ctx); - - void AddReserveRequest(ui64 size, bool lastRequest) { - ReservedSize += size; - if (!lastRequest) { - Requests.push_back(size); - } else { - Y_VERIFY(!Requests.empty()); - Requests.back() += size; - } - } - - ui64 DecReservedSize() { - //TODO: Y_VERIFY(!Requests.empty()); - if (Requests.empty()) - return 0; - ui64 size = Requests.front(); - Requests.pop_front(); - ReservedSize -= size; - return size; - } - - TOwnerInfo(const TOwnerInfo&) = delete; - }; -} //NPQ -} // NKikimr +#include <library/cpp/actors/core/actor.h> + + +namespace NKikimr { +namespace NPQ { + + struct TOwnerInfo { + bool NeedResetOwner; + bool SourceIdDeleted; + TString OwnerCookie; + ui64 NextMessageNo; + ui32 OwnerGeneration; + TActorId PipeClient; + TActorId Sender; + + ui64 ReservedSize; + std::deque<ui64> Requests; + + std::deque<THolder<TEvPQ::TEvChangeOwner>> WaitToChangeOwner; + + TOwnerInfo() + : NeedResetOwner(true) + , SourceIdDeleted(false) + , NextMessageNo(0) + , OwnerGeneration(0) + , ReservedSize(0) + {} + + static TStringBuf GetOwnerFromOwnerCookie(const TString& owner); + void GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender, + const TString& topicName, const ui32 partition, const TActorContext& ctx); + + void AddReserveRequest(ui64 size, bool lastRequest) { + ReservedSize += size; + if (!lastRequest) { + Requests.push_back(size); + } else { + Y_VERIFY(!Requests.empty()); + Requests.back() += size; + } + } + + ui64 DecReservedSize() { + //TODO: Y_VERIFY(!Requests.empty()); + if (Requests.empty()) + return 0; + ui64 size = Requests.front(); + Requests.pop_front(); + ReservedSize -= size; + return size; + } + + TOwnerInfo(const TOwnerInfo&) = delete; + }; +} //NPQ +} // NKikimr diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index cc8e2419cf2..573634b6e68 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1,8 +1,8 @@ #include "partition.h" #include "event_helpers.h" #include "read.h" -#include "sourceid.h" -#include "ownerinfo.h" +#include "sourceid.h" +#include "ownerinfo.h" #include "mirrorer.h" #include <ydb/core/base/appdata.h> @@ -512,7 +512,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co ManageWriteTimestampEstimate = LocalDC; } - + WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero(); CalcTopicWriteQuotaParams(); @@ -1075,13 +1075,13 @@ bool TPartition::DropOldStuff(TEvKeyValue::TEvRequest* request, bool hasWrites, bool haveChanges = false; if (DropOldData(request, hasWrites, ctx)) haveChanges = true; - LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete old stuff"); + LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete old stuff"); if (SourceIdStorage.DropOldSourceIds(request, ctx.Now(), StartOffset, Partition, Config.GetPartitionConfig())) { haveChanges = true; - SourceIdStorage.MarkOwnersForDeletedSourceId(Owners); - } - LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete all stuff"); - LOG_TRACE(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Delete command " << request->ToString()); + SourceIdStorage.MarkOwnersForDeletedSourceId(Owners); + } + LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete all stuff"); + LOG_TRACE(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Delete command " << request->ToString()); return haveChanges; } @@ -1646,7 +1646,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { TStringBuilder ss; ss << "SYNC INIT topic " << TopicName << " partitition " << Partition << " so " << StartOffset << " endOffset " << EndOffset << " Head " << Head << "\n"; - for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) { + for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) { ss << "SYNC INIT sourceId " << s.first << " seqNo " << s.second.SeqNo << " offset " << s.second.Offset << "\n"; } for (const auto& h : DataKeysBody) { @@ -1674,7 +1674,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { if (!NewPartition) { ctx.Send(Tablet, new TEvPQ::TEvInitComplete(Partition)); } - for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) { + for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Init complete for topic '" << TopicName << "' Partition: " << Partition << " SourceId: " << s.first << " SeqNo: " << s.second.SeqNo << " offset: " << s.second.Offset << " MaxOffset: " << EndOffset); } @@ -1737,7 +1737,7 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c } -THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator& it, const TActorContext& ctx) { +THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator& it, const TActorContext& ctx) { Y_VERIFY(ReservedSize >= it->second.ReservedSize); ReservedSize -= it->second.ReservedSize; UpdateWriteBufferIsFullState(ctx.Now()); @@ -2700,12 +2700,12 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) bool already = false; - auto it = SourceIdStorage.GetInMemorySourceIds().find(s); + auto it = SourceIdStorage.GetInMemorySourceIds().find(s); ui64 maxSeqNo = 0; ui64 maxOffset = 0; - if (it != SourceIdStorage.GetInMemorySourceIds().end()) { + if (it != SourceIdStorage.GetInMemorySourceIds().end()) { maxSeqNo = it->second.SeqNo; maxOffset = it->second.Offset; if (it->second.SeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) { @@ -2721,12 +2721,12 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) } if (!already && partNo + 1 == totalParts) { if (it == SourceIdStorage.GetInMemorySourceIds().end()) { - Counters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1); + Counters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1); SourceIdStorage.RegisterSourceId(s, writeResponse.Msg.SeqNo, offset, CurrentTimestamp); } else { SourceIdStorage.RegisterSourceId(s, it->second.Updated(writeResponse.Msg.SeqNo, offset, CurrentTimestamp)); } - + Counters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1); } ReplyWrite( @@ -3212,10 +3212,10 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx) TDuration lifetimeNow = ctx.Now() - SourceIdStorage.MinAvailableTimestamp(ctx.Now()); if (lifetimeNow.MilliSeconds() != PartitionLabeledCounters.GetCounters()[METRIC_MIN_SID_LIFETIME].Get()) { - haveChanges = true; + haveChanges = true; PartitionLabeledCounters.GetCounters()[METRIC_MIN_SID_LIFETIME].Set(lifetimeNow.MilliSeconds()); - } - + } + ui64 headGapSize = DataKeysBody.empty() ? 0 : (Head.Offset - (DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount())); ui64 gapSize = GapSize + headGapSize; ui32 gapsCount = GapOffsets.size() + (headGapSize ? 1 : 0); @@ -3563,10 +3563,10 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c } ui64 decReservedSize = 0; - TStringBuf owner; + TStringBuf owner; if (!mirroredPartition && !ev->Get()->IsDirectWrite) { - owner = TOwnerInfo::GetOwnerFromOwnerCookie(ev->Get()->OwnerCookie); + owner = TOwnerInfo::GetOwnerFromOwnerCookie(ev->Get()->OwnerCookie); auto it = Owners.find(owner); if (it == Owners.end() || it->second.NeedResetOwner) { @@ -3575,15 +3575,15 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c return; } - if (it->second.SourceIdDeleted) { - ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, - TStringBuilder() << "Yours maximum written sequence number for session was deleted, need to recreate session. " + if (it->second.SourceIdDeleted) { + ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, + TStringBuilder() << "Yours maximum written sequence number for session was deleted, need to recreate session. " << "Current count of sourceIds is " << SourceIdStorage.GetInMemorySourceIds().size() << " and limit is " << Config.GetPartitionConfig().GetSourceIdMaxCounts() << ", current minimum sourceid timestamp(Ms) is " << SourceIdStorage.MinAvailableTimestamp(ctx.Now()).MilliSeconds() << " and border timestamp(Ms) is " << ((ctx.Now() - TInstant::Seconds(Config.GetPartitionConfig().GetSourceIdLifetimeSeconds())).MilliSeconds())); - return; - } - + return; + } + if (it->second.OwnerCookie != ev->Get()->OwnerCookie) { ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::WRONG_COOKIE, TStringBuilder() << "incorrect ownerCookie " << ev->Get()->OwnerCookie << ", must be " << it->second.OwnerCookie); @@ -3633,10 +3633,10 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c TStringBuilder() << "too big message " << sz << " vs. maximum " << MAX_BLOB_PART_SIZE); return; } - - if (!mirroredPartition) { - SourceIdStorage.RegisterSourceIdOwner(msg.SourceId, owner); - } + + if (!mirroredPartition) { + SourceIdStorage.RegisterSourceIdOwner(msg.SourceId, owner); + } } if (EndOffset - StartOffset >= static_cast<ui64>(Config.GetPartitionConfig().GetMaxCountInPartition()) @@ -3951,7 +3951,7 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T FailBadClient(ctx); NewHead.Clear(); NewHead.Offset = EndOffset; - sourceIdWriter.Clear(); + sourceIdWriter.Clear(); request->Record.Clear(); PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); CompactedKeys.clear(); @@ -4038,17 +4038,17 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const ui64 poffset = p.Offset ? *p.Offset : curOffset; - auto it_inMemory = SourceIdStorage.GetInMemorySourceIds().find(p.Msg.SourceId); - auto it_toWrite = sourceIdWriter.GetSourceIdsToWrite().find(p.Msg.SourceId); - if (!p.Msg.DisableDeduplication && (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end() && it_inMemory->second.SeqNo >= p.Msg.SeqNo || (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end() && it_toWrite->second.SeqNo >= p.Msg.SeqNo))) { - bool isWriting = (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end()); - bool isCommitted = (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end()); + auto it_inMemory = SourceIdStorage.GetInMemorySourceIds().find(p.Msg.SourceId); + auto it_toWrite = sourceIdWriter.GetSourceIdsToWrite().find(p.Msg.SourceId); + if (!p.Msg.DisableDeduplication && (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end() && it_inMemory->second.SeqNo >= p.Msg.SeqNo || (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end() && it_toWrite->second.SeqNo >= p.Msg.SeqNo))) { + bool isWriting = (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end()); + bool isCommitted = (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end()); if (poffset >= curOffset) { LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, "Already written message. Topic: '" << TopicName << "' Partition: " << Partition << " SourceId: '" << EscapeC(p.Msg.SourceId) << "'. Message seqNo = " << p.Msg.SeqNo - << ". Committed seqNo = " << (isCommitted ? it_inMemory->second.SeqNo : 0) - << (isWriting ? ". Writing seqNo: " : ". ") << (isWriting ? it_toWrite->second.SeqNo : 0) << " EndOffset " << EndOffset + << ". Committed seqNo = " << (isCommitted ? it_inMemory->second.SeqNo : 0) + << (isWriting ? ". Writing seqNo: " : ". ") << (isWriting ? it_toWrite->second.SeqNo : 0) << " EndOffset " << EndOffset << " CurOffset " << curOffset << " offset " << poffset); Counters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1); @@ -4070,7 +4070,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const CancelAllWritesOnWrite(ctx, request, TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo << " partNo: " << p.Msg.PartNo << " has incorrect offset " << poffset << ", must be at least " << curOffset, - p, sourceIdWriter, NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET); + p, sourceIdWriter, NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET); return false; } @@ -4083,7 +4083,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo << " partNo: " << p.Msg.PartNo << " has gap inside partitioned message, incorrect offset " << poffset << ", must be " << curOffset, - p, sourceIdWriter); + p, sourceIdWriter); return false; } curOffset = poffset; @@ -4123,7 +4123,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TString s; if (!PartitionedBlob.IsNextPart(p.Msg.SourceId, p.Msg.SeqNo, p.Msg.PartNo, &s)) { //this must not be happen - client sends gaps, fail this client till the end - CancelAllWritesOnWrite(ctx, request, s, p, sourceIdWriter); + CancelAllWritesOnWrite(ctx, request, s, p, sourceIdWriter); //now no changes will leak return false; } @@ -4473,7 +4473,7 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorCon : ESourceIdFormat::Raw; TSourceIdWriter sourceIdWriter(format); - bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter); + bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter); if (headCleared) { Y_VERIFY(!CompactedKeys.empty() || Head.PackedSize == 0); @@ -4483,7 +4483,7 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorCon } if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed - Y_VERIFY(sourceIdWriter.GetSourceIdsToWrite().empty()); + Y_VERIFY(sourceIdWriter.GetSourceIdsToWrite().empty()); return request->Record.CmdWriteSize() > 0 || request->Record.CmdRenameSize() > 0 || request->Record.CmdDeleteRangeSize() > 0; } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 2eec26f2516..f11bdc168ae 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -17,8 +17,8 @@ #include "subscriber.h" #include "header.h" #include "user_info.h" -#include "sourceid.h" -#include "ownerinfo.h" +#include "sourceid.h" +#include "ownerinfo.h" #include <variant> @@ -162,7 +162,7 @@ private: //will fill sourceIds, request and NewHead //returns true if head is compacted - bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter); + bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter); std::pair<TKey, ui32> GetNewWriteKey(bool headCleared); void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx); @@ -200,7 +200,7 @@ private: void InitUserInfoForImportantClients(const TActorContext& ctx); - THashMap<TString, TOwnerInfo>::iterator DropOwner(THashMap<TString, TOwnerInfo>::iterator& it, const TActorContext& ctx); + THashMap<TString, TOwnerInfo>::iterator DropOwner(THashMap<TString, TOwnerInfo>::iterator& it, const TActorContext& ctx); void Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContext& ctx); @@ -586,10 +586,10 @@ private: bool InitDone; const bool NewPartition; - THashMap<TString, NKikimr::NPQ::TOwnerInfo> Owners; + THashMap<TString, NKikimr::NPQ::TOwnerInfo> Owners; THashSet<TActorId> OwnerPipes; - TSourceIdStorage SourceIdStorage; + TSourceIdStorage SourceIdStorage; std::deque<THolder<TEvPQ::TEvChangeOwner>> WaitToChangeOwner; diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp index 13a61229105..94c8b6cf3ad 100644 --- a/ydb/core/persqueue/pq_ut.cpp +++ b/ydb/core/persqueue/pq_ut.cpp @@ -1205,7 +1205,7 @@ Y_UNIT_TEST(TestWritePQ) { } -Y_UNIT_TEST(TestSourceIdDropByUserWrites) { +Y_UNIT_TEST(TestSourceIdDropByUserWrites) { TTestContext tc; RunTestWithReboots(tc.TabletIds, [&]() { return tc.InitialEventsFilter.Prepare(); @@ -1242,49 +1242,49 @@ Y_UNIT_TEST(TestSourceIdDropByUserWrites) { } -Y_UNIT_TEST(TestSourceIdDropBySourceIdCount) { - TTestContext tc; - RunTestWithReboots(tc.TabletIds, [&]() { - return tc.InitialEventsFilter.Prepare(); - }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { - TFinalizer finalizer(tc); - tc.Prepare(dispatchName, setup, activeZone); - tc.Runtime->SetScheduledLimit(200); - - PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 2, 6*1024*1024, true, 0, 3); //no important client, lifetimeseconds=0 - delete right now - - TVector<std::pair<ui64, TString>> data; - activeZone = true; - - TString ss{32, '_'}; - - data.push_back({1, ss}); - CmdWrite(0,"sourceid0", data, tc, false, {}, false, "", -1, 100); - Cout << "written sourceid0" << Endl; - +Y_UNIT_TEST(TestSourceIdDropBySourceIdCount) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + tc.Runtime->SetScheduledLimit(200); + + PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 2, 6*1024*1024, true, 0, 3); //no important client, lifetimeseconds=0 - delete right now + + TVector<std::pair<ui64, TString>> data; + activeZone = true; + + TString ss{32, '_'}; + + data.push_back({1, ss}); + CmdWrite(0,"sourceid0", data, tc, false, {}, false, "", -1, 100); + Cout << "written sourceid0" << Endl; + PQGetPartInfo(100, 101, tc); - - CmdWrite(0,"sourceidx", data, tc, false, {}, false, "", -1, 2000); - Cout << "written sourceidx" << Endl; - CmdWrite(0,"sourceid1", data, tc, false, {}, false, "", -1, 3000); - Cout << "written sourceid1" << Endl; + + CmdWrite(0,"sourceidx", data, tc, false, {}, false, "", -1, 2000); + Cout << "written sourceidx" << Endl; + CmdWrite(0,"sourceid1", data, tc, false, {}, false, "", -1, 3000); + Cout << "written sourceid1" << Endl; PQGetPartInfo(2000, 3001, tc); - //fail - already written - CmdWrite(0,"sourceid0", data, tc, false); - Cout << "written sourceid0" << Endl; + //fail - already written + CmdWrite(0,"sourceid0", data, tc, false); + Cout << "written sourceid0" << Endl; PQGetPartInfo(2000, 3001, tc); - - for (ui64 i=0; i < 5; ++i) { - CmdWrite(0, TStringBuilder() << "sourceid_" << i, data, tc, false, {}, false, "", -1, 3001 + i); - Cout << "written sourceid_" << i << Endl; - } - CmdWrite(0,"sourceid0", data, tc, false); - Cout << "written sourceid0" << Endl; + + for (ui64 i=0; i < 5; ++i) { + CmdWrite(0, TStringBuilder() << "sourceid_" << i, data, tc, false, {}, false, "", -1, 3001 + i); + Cout << "written sourceid_" << i << Endl; + } + CmdWrite(0,"sourceid0", data, tc, false); + Cout << "written sourceid0" << Endl; PQGetPartInfo(2000, 3007, tc); - }); -} - - + }); +} + + Y_UNIT_TEST(TestWriteOffsetWithBigMessage) { TTestContext tc; RunTestWithReboots(tc.TabletIds, [&]() { diff --git a/ydb/core/persqueue/pq_ut.h b/ydb/core/persqueue/pq_ut.h index 3db93bc1047..ec5eb0ef971 100644 --- a/ydb/core/persqueue/pq_ut.h +++ b/ydb/core/persqueue/pq_ut.h @@ -237,9 +237,9 @@ void PQTabletPrepare(ui32 mcip, ui64 msip, ui32 deleteTime, const TVector<std::p config->SetMaxCountInPartition(mcip); config->SetMaxSizeInPartition(msip); config->SetLifetimeSeconds(deleteTime); - config->SetSourceIdLifetimeSeconds(1*60*60); - if (sidMaxCount > 0) - config->SetSourceIdMaxCounts(sidMaxCount); + config->SetSourceIdLifetimeSeconds(1*60*60); + if (sidMaxCount > 0) + config->SetSourceIdMaxCounts(sidMaxCount); config->SetMaxWriteInflightSize(90000000); config->SetLowWatermark(lw); @@ -946,47 +946,47 @@ void CmdUpdateWriteTimestamp(const ui32 partition, ui64 timestamp, TTestContext& } -TVector<TString> CmdSourceIdRead(TTestContext& tc) { - TAutoPtr<IEventHandle> handle; - TVector<TString> sourceIds; - THolder<TEvKeyValue::TEvRequest> request; - TEvKeyValue::TEvResponse *result; - - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - request.Reset(new TEvKeyValue::TEvRequest); - sourceIds.clear(); - auto read = request->Record.AddCmdReadRange(); - auto range = read->MutableRange(); - NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkSourceId); - range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size()); - range->SetIncludeFrom(true); - NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkSourceId); - range->SetTo(ikeyTo.Data(), ikeyTo.Size()); - range->SetIncludeTo(true); - Cout << request.Get()->ToString() << Endl; - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); - for (ui64 idx = 0; idx < result->Record.ReadRangeResultSize(); ++idx) { - const auto &readResult = result->Record.GetReadRangeResult(idx); - UNIT_ASSERT(readResult.HasStatus()); - UNIT_ASSERT_EQUAL(readResult.GetStatus(), NKikimrProto::OK); - for (size_t j = 0; j < readResult.PairSize(); ++j) { - const auto& pair = readResult.GetPair(j); - TString s = pair.GetKey().substr(NPQ::TKeyPrefix::MarkedSize()); - sourceIds.push_back(s); - } - } - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); - } - } - return sourceIds; -} +TVector<TString> CmdSourceIdRead(TTestContext& tc) { + TAutoPtr<IEventHandle> handle; + TVector<TString> sourceIds; + THolder<TEvKeyValue::TEvRequest> request; + TEvKeyValue::TEvResponse *result; + + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + request.Reset(new TEvKeyValue::TEvRequest); + sourceIds.clear(); + auto read = request->Record.AddCmdReadRange(); + auto range = read->MutableRange(); + NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkSourceId); + range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size()); + range->SetIncludeFrom(true); + NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkSourceId); + range->SetTo(ikeyTo.Data(), ikeyTo.Size()); + range->SetIncludeTo(true); + Cout << request.Get()->ToString() << Endl; + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); + for (ui64 idx = 0; idx < result->Record.ReadRangeResultSize(); ++idx) { + const auto &readResult = result->Record.GetReadRangeResult(idx); + UNIT_ASSERT(readResult.HasStatus()); + UNIT_ASSERT_EQUAL(readResult.GetStatus(), NKikimrProto::OK); + for (size_t j = 0; j < readResult.PairSize(); ++j) { + const auto& pair = readResult.GetPair(j); + TString s = pair.GetKey().substr(NPQ::TKeyPrefix::MarkedSize()); + sourceIds.push_back(s); + } + } + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); + } + } + return sourceIds; +} void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui32 size, const ui32 resCount, bool timeouted, TTestContext& tc, TVector<i32> offsets = {}, const ui32 maxTimeLagMs = 0, const ui64 readTimestampMs = 0) { diff --git a/ydb/core/persqueue/pq_ut_slow.cpp b/ydb/core/persqueue/pq_ut_slow.cpp index 51041c10459..cff394bf4dd 100644 --- a/ydb/core/persqueue/pq_ut_slow.cpp +++ b/ydb/core/persqueue/pq_ut_slow.cpp @@ -57,106 +57,106 @@ Y_UNIT_TEST(TestWriteVeryBigMessage) { } -Y_UNIT_TEST(TestOnDiskStoredSourceIds) { - TTestContext tc; - RunTestWithReboots(tc.TabletIds, [&]() { - return tc.InitialEventsFilter.Prepare(); - }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { - TFinalizer finalizer(tc); - tc.Prepare(dispatchName, setup, activeZone); - tc.Runtime->SetScheduledLimit(200); +Y_UNIT_TEST(TestOnDiskStoredSourceIds) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + tc.Runtime->SetScheduledLimit(200); PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 2, 6*1024*1024, true, 0, 3); //no important client, lifetimeseconds=0 - delete right now - - TVector<TString> writtenSourceIds; - - TVector<std::pair<ui64, TString>> data; - activeZone = true; - - TString ss{32, '_'}; - ui32 NUM_SOURCEIDS = 100; - data.push_back({1, ss}); - CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 100); - ui32 offset = 1200; - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - TString cookie = CmdSetOwner(0, tc).first; - THolder<TEvPersQueue::TEvRequest> request; - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(0); - req->SetOwnerCookie(cookie); - req->SetMessageNo(0); - req->SetCmdWriteOffset(offset); - for (ui32 i=0; i < NUM_SOURCEIDS; ++i) { - auto write = req->AddCmdWrite(); - write->SetSourceId(TStringBuilder() << "sourceid_" << i); - write->SetSeqNo(0); - write->SetData(ss); - } - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - TAutoPtr<IEventHandle> handle; - TEvPersQueue::TEvResponse *result; - result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){ - if (!ev.Record.HasPartitionResponse() || !ev.Record.GetPartitionResponse().HasCmdReadResult()) - return true; - return false; - }); //there could be outgoing reads in TestReadSubscription test - - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 2; - continue; - } - - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_BAD_OFFSET) { - ++offset; - continue; - } - - Cout << "Error code is " << static_cast<int>(result->Record.GetErrorCode()) << Endl; - UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); - - UNIT_ASSERT(result->Record.GetPartitionResponse().CmdWriteResultSize() == NUM_SOURCEIDS); - for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) { - UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten()); - UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset()); - } - // for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) { - // auto res = result->Record.GetPartitionResponse().GetCmdWriteResult(i); - // UNIT_ASSERT(!result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten()); - // } - - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); - retriesLeft = 3; - } - } - for (ui64 i=100; i < 104; ++i) { - CmdWrite(0, TStringBuilder() << "sourceid_" << i, data, tc, false, {}, false, "", -1, offset + i); - Cout << TInstant::Now() << " written sourceid_" << i << Endl; - } - for (ui64 i=100; i < 104; ++i) { - writtenSourceIds.push_back(TStringBuilder() << "sourceid_" << i); - } - Cout << TInstant::Now() << " now check list of sourceIds" << Endl; - auto sourceIds = CmdSourceIdRead(tc); - UNIT_ASSERT(sourceIds.size() > 0); - for (auto& s: sourceIds) { - Cout << "try to find sourceId " << s << Endl; - auto findIt = std::find(writtenSourceIds.begin(), writtenSourceIds.end(), s); - UNIT_ASSERT_VALUES_UNEQUAL(findIt, writtenSourceIds.end()); - } - Cout << TInstant::Now() << "All Ok" << Endl; - }); -} - - - - + + TVector<TString> writtenSourceIds; + + TVector<std::pair<ui64, TString>> data; + activeZone = true; + + TString ss{32, '_'}; + ui32 NUM_SOURCEIDS = 100; + data.push_back({1, ss}); + CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 100); + ui32 offset = 1200; + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + TString cookie = CmdSetOwner(0, tc).first; + THolder<TEvPersQueue::TEvRequest> request; + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(0); + req->SetOwnerCookie(cookie); + req->SetMessageNo(0); + req->SetCmdWriteOffset(offset); + for (ui32 i=0; i < NUM_SOURCEIDS; ++i) { + auto write = req->AddCmdWrite(); + write->SetSourceId(TStringBuilder() << "sourceid_" << i); + write->SetSeqNo(0); + write->SetData(ss); + } + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + TEvPersQueue::TEvResponse *result; + result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){ + if (!ev.Record.HasPartitionResponse() || !ev.Record.GetPartitionResponse().HasCmdReadResult()) + return true; + return false; + }); //there could be outgoing reads in TestReadSubscription test + + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 2; + continue; + } + + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_BAD_OFFSET) { + ++offset; + continue; + } + + Cout << "Error code is " << static_cast<int>(result->Record.GetErrorCode()) << Endl; + UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); + + UNIT_ASSERT(result->Record.GetPartitionResponse().CmdWriteResultSize() == NUM_SOURCEIDS); + for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) { + UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten()); + UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset()); + } + // for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) { + // auto res = result->Record.GetPartitionResponse().GetCmdWriteResult(i); + // UNIT_ASSERT(!result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten()); + // } + + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); + retriesLeft = 3; + } + } + for (ui64 i=100; i < 104; ++i) { + CmdWrite(0, TStringBuilder() << "sourceid_" << i, data, tc, false, {}, false, "", -1, offset + i); + Cout << TInstant::Now() << " written sourceid_" << i << Endl; + } + for (ui64 i=100; i < 104; ++i) { + writtenSourceIds.push_back(TStringBuilder() << "sourceid_" << i); + } + Cout << TInstant::Now() << " now check list of sourceIds" << Endl; + auto sourceIds = CmdSourceIdRead(tc); + UNIT_ASSERT(sourceIds.size() > 0); + for (auto& s: sourceIds) { + Cout << "try to find sourceId " << s << Endl; + auto findIt = std::find(writtenSourceIds.begin(), writtenSourceIds.end(), s); + UNIT_ASSERT_VALUES_UNEQUAL(findIt, writtenSourceIds.end()); + } + Cout << TInstant::Now() << "All Ok" << Endl; + }); +} + + + + } // TKeyValueTest } // NKikimr diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp index a63b2f6f705..1854b66fdb9 100644 --- a/ydb/core/persqueue/sourceid.cpp +++ b/ydb/core/persqueue/sourceid.cpp @@ -1,25 +1,25 @@ -#include "sourceid.h" -#include "ownerinfo.h" +#include "sourceid.h" +#include "ownerinfo.h" #include <ydb/core/persqueue/partition.h> #include <util/generic/size_literals.h> #include <algorithm> - -namespace NKikimr { -namespace NPQ { - + +namespace NKikimr { +namespace NPQ { + static constexpr ui64 MAX_DELETE_COMMAND_SIZE = 10_MB; static constexpr ui64 MAX_DELETE_COMMAND_COUNT = 1000; - + template <typename T> T ReadAs(const TString& data, ui32& pos) { auto result = *((T*)(data.c_str() + pos)); pos += sizeof(T); return result; } - + template <typename T> T ReadAs(const TString& data, ui32& pos, const T& default_) { if (pos + sizeof(T) <= data.size()) { @@ -28,13 +28,13 @@ T ReadAs(const TString& data, ui32& pos, const T& default_) { return default_; } } - + template <typename T> void Write(T value, TBuffer& data, ui32& pos) { memcpy(data.Data() + pos, &value, sizeof(T)); pos += sizeof(T); } - + TSourceIdInfo::EState TSourceIdInfo::ConvertState(NKikimrPQ::TMessageGroupInfo::EState value) { switch (value) { case NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED: @@ -214,7 +214,7 @@ void TSourceIdStorage::DeregisterSourceId(const TString& sourceId) { bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, ui32 partition, const NKikimrPQ::TPartitionConfig& config) { TVector<std::pair<ui64, TString>> toDelOffsets; - + ui64 maxDeleteSourceIds = 0; if (InMemorySourceIds.size() > config.GetSourceIdMaxCounts()) { maxDeleteSourceIds = InMemorySourceIds.size() - config.GetSourceIdMaxCounts(); @@ -253,20 +253,20 @@ bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInsta reachedLimit = true; break; } - } - + } + if (reachedLimit) { break; // Check here size to prevent crashing in protobuf verify -max size is 25Mb's and no more then 100K operations // but even 10Mbs sounds too big here. // Rest of SourceIds will be deleted in next DropOldSourceIds calls, whitch will be made // right after complete of current deletion. - } + } } else { // there are no space for new sourcID to delete, stop check sourceIds break; - } - } - + } + } + for (auto& t : toDelOffsets) { // delete sourceId from memory size_t res = InMemorySourceIds.erase(t.second); @@ -298,7 +298,7 @@ void TSourceIdStorage::LoadSourceIdInfo(const TString& key, const TString& data, Y_FAIL_S("Unexpected mark: " << (char)mark); } } - + void TSourceIdStorage::LoadRawSourceIdInfo(const TString& key, const TString& data, TInstant now) { Y_VERIFY(key.size() >= TKeyPrefix::MarkedSize()); Y_VERIFY(key[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkSourceId); @@ -316,7 +316,7 @@ void TSourceIdStorage::LoadProtoSourceIdInfo(const TString& key, const TString& RegisterSourceIdInfo(key.substr(TKeyPrefix::MarkedSize()), TSourceIdInfo::Parse(proto), true); } - + void TSourceIdStorage::RegisterSourceIdInfo(const TString& sourceId, TSourceIdInfo&& sourceIdInfo, bool load) { auto it = InMemorySourceIds.find(sourceId); if (it != InMemorySourceIds.end()) { @@ -324,51 +324,51 @@ void TSourceIdStorage::RegisterSourceIdInfo(const TString& sourceId, TSourceIdIn const auto res = SourceIdsByOffset.erase(std::make_pair(it->second.Offset, sourceId)); Y_VERIFY(res == 1); } - + const auto offset = sourceIdInfo.Offset; InMemorySourceIds[sourceId] = std::move(sourceIdInfo); const bool res = SourceIdsByOffset.emplace(offset, sourceId).second; Y_VERIFY(res); } - + void TSourceIdStorage::RegisterSourceIdOwner(const TString& sourceId, const TStringBuf& ownerCookie) { if (ownerCookie == "default") { // cookie for legacy http protocol - skip it, we use one cookie for all write sessions return; - } + } // owner cookie could be deleted in main object - so we should copy it SourceIdOwners[sourceId] = ownerCookie; } - + void TSourceIdStorage::MarkOwnersForDeletedSourceId(THashMap<TString, TOwnerInfo>& owners) { for (auto& sourceid : OwnersToDrop) { auto it = owners.find(sourceid); if (it != owners.end()) { it->second.SourceIdDeleted = true; - } - } - + } + } + OwnersToDrop.clear(); } - + TInstant TSourceIdStorage::MinAvailableTimestamp(TInstant now) const { TInstant ds = now; if (!SourceIdsByOffset.empty()) { auto it = InMemorySourceIds.find(SourceIdsByOffset.begin()->second); Y_VERIFY(it != InMemorySourceIds.end()); ds = Min(ds, it->second.WriteTimestamp); - } + } return ds; } - + /// TSourceIdWriter TSourceIdWriter::TSourceIdWriter(ESourceIdFormat format) : Format(format) { } - + void TSourceIdWriter::DeregisterSourceId(const TString& sourceId) { Deregistrations.insert(sourceId); } @@ -418,12 +418,12 @@ void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partiti for (const auto& [sourceId, sourceIdInfo] : Registrations) { FillKeyAndData(Format, sourceId, sourceIdInfo, key, data); FillWrite(key, data, *request->Record.AddCmdWrite()); - } + } for (const auto& sourceId : Deregistrations) { FillDelete(partition, sourceId, *request->Record.AddCmdDeleteRange()); } } - -} // NPQ -} // NKikimr + +} // NPQ +} // NKikimr diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index 62b9c2834e2..3bd899b495b 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -1,4 +1,4 @@ -#pragma once +#pragma once #include <ydb/core/keyvalue/keyvalue_events.h> #include <ydb/core/persqueue/key.h> @@ -6,11 +6,11 @@ #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> #include <ydb/core/protos/pqconfig.pb.h> -#include <util/generic/set.h> - -namespace NKikimr { -namespace NPQ { - +#include <util/generic/set.h> + +namespace NKikimr { +namespace NPQ { + enum class ESourceIdFormat: ui8 { Raw = 0, Proto = 1, @@ -47,37 +47,37 @@ struct TSourceIdInfo { // Proto format static TSourceIdInfo Parse(const NKikimrPQ::TMessageGroupInfo& proto); void Serialize(NKikimrPQ::TMessageGroupInfo& proto) const; - + bool operator==(const TSourceIdInfo& rhs) const; void Out(IOutputStream& out) const; - + bool IsExpired(TDuration ttl, TInstant now) const; - + }; // TSourceIdInfo - + using TSourceIdMap = THashMap<TString, TSourceIdInfo>; - + class TSourceIdStorage { public: const TSourceIdMap& GetInMemorySourceIds() const { return InMemorySourceIds; } - + template <typename... Args> void RegisterSourceId(const TString& sourceId, Args&&... args) { RegisterSourceIdInfo(sourceId, TSourceIdInfo(std::forward<Args>(args)...), false); } - + void DeregisterSourceId(const TString& sourceId); void LoadSourceIdInfo(const TString& key, const TString& data, TInstant now); bool DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, ui32 partition, const NKikimrPQ::TPartitionConfig& config); - + void RegisterSourceIdOwner(const TString& sourceId, const TStringBuf& ownerCookie); void MarkOwnersForDeletedSourceId(THashMap<TString, TOwnerInfo>& owners); - + TInstant MinAvailableTimestamp(TInstant now) const; - + private: void LoadRawSourceIdInfo(const TString& key, const TString& data, TInstant now); void LoadProtoSourceIdInfo(const TString& key, const TString& data); @@ -88,28 +88,28 @@ private: THashMap<TString, TString> SourceIdOwners; TVector<TString> OwnersToDrop; TSet<std::pair<ui64, TString>> SourceIdsByOffset; - + }; // TSourceIdStorage class TSourceIdWriter { public: explicit TSourceIdWriter(ESourceIdFormat format); - + const TSourceIdMap& GetSourceIdsToWrite() const { return Registrations; } - + template <typename... Args> void RegisterSourceId(const TString& sourceId, Args&&... args) { Registrations[sourceId] = TSourceIdInfo(std::forward<Args>(args)...); } - + void DeregisterSourceId(const TString& sourceId); void Clear(); - + void FillRequest(TEvKeyValue::TEvRequest* request, ui32 partition); static void FillKeyAndData(ESourceIdFormat format, const TString& sourceId, const TSourceIdInfo& sourceIdInfo, TKeyPrefix& key, TBuffer& data); - + private: static void FillRawData(const TSourceIdInfo& sourceIdInfo, TBuffer& data); static void FillProtoData(const TSourceIdInfo& sourceIdInfo, TBuffer& data); @@ -119,11 +119,11 @@ private: const ESourceIdFormat Format; TSourceIdMap Registrations; THashSet<TString> Deregistrations; - + }; // TSourceIdWriter -} // NPQ -} // NKikimr +} // NPQ +} // NKikimr Y_DECLARE_OUT_SPEC(inline, NKikimr::NPQ::TSourceIdInfo, out, value) { return value.Out(out); diff --git a/ydb/core/persqueue/sourceid_ut.cpp b/ydb/core/persqueue/sourceid_ut.cpp index 46e03b2d3f2..eeef8f24a33 100644 --- a/ydb/core/persqueue/sourceid_ut.cpp +++ b/ydb/core/persqueue/sourceid_ut.cpp @@ -1,26 +1,26 @@ -#include "sourceid.h" +#include "sourceid.h" #include <ydb/core/keyvalue/keyvalue_events.h> #include <ydb/core/persqueue/key.h> -#include <library/cpp/testing/unittest/registar.h> - +#include <library/cpp/testing/unittest/registar.h> + namespace NKikimr { namespace NPQ { - + Y_UNIT_TEST_SUITE(TSourceIdTests) { inline static TString TestSourceId(ui64 idx = 0) { return TStringBuilder() << "testSourceId" << idx; } - + inline static TStringBuf TestOwner(TStringBuf sourceId) { UNIT_ASSERT(sourceId.SkipPrefix("test")); return sourceId; } - + static constexpr ui32 TestPartition = 1; - Y_UNIT_TEST(SourceIdWriterAddMessage) { + Y_UNIT_TEST(SourceIdWriterAddMessage) { TSourceIdWriter writer(ESourceIdFormat::Raw); const auto sourceId = TestSourceId(1); @@ -43,9 +43,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { auto it = writer.GetSourceIdsToWrite().find(anotherSourceId); UNIT_ASSERT_EQUAL(it, writer.GetSourceIdsToWrite().end()); } - } - - Y_UNIT_TEST(SourceIdWriterClean) { + } + + Y_UNIT_TEST(SourceIdWriterClean) { TSourceIdWriter writer(ESourceIdFormat::Raw); writer.RegisterSourceId(TestSourceId(), 1, 10, TInstant::Seconds(100)); @@ -53,13 +53,13 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { writer.Clear(); UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 0); - } - - Y_UNIT_TEST(SourceIdWriterFormCommand) { + } + + Y_UNIT_TEST(SourceIdWriterFormCommand) { TSourceIdWriter writer(ESourceIdFormat::Raw); auto actualRequest = MakeHolder<TEvKeyValue::TEvRequest>(); auto expectedRequest = MakeHolder<TEvKeyValue::TEvRequest>(); - + const auto sourceId = TestSourceId(1); const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100)); writer.RegisterSourceId(sourceId, sourceIdInfo); @@ -67,7 +67,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { { TKeyPrefix key(TKeyPrefix::TypeInfo, TestPartition, TKeyPrefix::MarkSourceId); TBuffer data; - + TSourceIdWriter::FillKeyAndData(ESourceIdFormat::Raw, sourceId, sourceIdInfo, key, data); auto write = expectedRequest.Get()->Record.AddCmdWrite(); write->SetKey(key.Data(), key.Size()); @@ -97,9 +97,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { writer.FillRequest(actualRequest.Get(), TestPartition + 1); UNIT_ASSERT_VALUES_EQUAL(actualRequest.Get()->ToString(), expectedRequest.Get()->ToString()); } - } - - Y_UNIT_TEST(SourceIdStorageAdd) { + } + + Y_UNIT_TEST(SourceIdStorageAdd) { TSourceIdStorage storage; const auto sourceId = TestSourceId(1); @@ -127,8 +127,8 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { UNIT_ASSERT_UNEQUAL(it, storage.GetInMemorySourceIds().end()); UNIT_ASSERT_VALUES_EQUAL(it->second, anotherSourceIdInfo); } - } - + } + void SourceIdStorageParseAndAdd(TKeyPrefix::EMark mark, ESourceIdFormat format) { const auto sourceId = TestSourceId(); const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100)); @@ -148,8 +148,8 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { auto it = storage.GetInMemorySourceIds().find(sourceId); UNIT_ASSERT_UNEQUAL(it, storage.GetInMemorySourceIds().end()); UNIT_ASSERT_VALUES_EQUAL(it->second, sourceIdInfo); - } - + } + Y_UNIT_TEST(SourceIdStorageParseAndAdd) { SourceIdStorageParseAndAdd(TKeyPrefix::MarkSourceId, ESourceIdFormat::Raw); } @@ -158,7 +158,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { SourceIdStorageParseAndAdd(TKeyPrefix::MarkProtoSourceId, ESourceIdFormat::Proto); } - Y_UNIT_TEST(SourceIdStorageMinDS) { + Y_UNIT_TEST(SourceIdStorageMinDS) { const auto now = TInstant::Now(); TSourceIdStorage storage; @@ -181,9 +181,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { auto ds = storage.MinAvailableTimestamp(now); UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(200)); } - } - - Y_UNIT_TEST(SourceIdStorageTestClean) { + } + + Y_UNIT_TEST(SourceIdStorageTestClean) { TSourceIdStorage storage; for (ui64 i = 1; i <= 10000; ++i) { storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i)); @@ -222,9 +222,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { UNIT_ASSERT_EQUAL(dropped, true); UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 360); // more 360 sources are dropped } - } - - Y_UNIT_TEST(SourceIdStorageDeleteByMaxCount) { + } + + Y_UNIT_TEST(SourceIdStorageDeleteByMaxCount) { TSourceIdStorage storage; for (ui64 i = 1; i <= 10000; ++i) { storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i)); @@ -254,9 +254,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { auto it = storage.GetInMemorySourceIds().find(TestSourceId(101)); // 101th source is alive UNIT_ASSERT_UNEQUAL(it, storage.GetInMemorySourceIds().end()); } - } - - Y_UNIT_TEST(SourceIdStorageComplexDelete) { + } + + Y_UNIT_TEST(SourceIdStorageComplexDelete) { TSourceIdStorage storage; for (ui64 i = 1; i <= 10000 + 1; ++i) { // add 10000 + one extra sources storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i)); @@ -289,9 +289,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { UNIT_ASSERT_EQUAL(dropped, true); UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 5); // more 5 sources are dropped } - } - - Y_UNIT_TEST(SourceIdStorageDeleteAndOwnersMark) { + } + + Y_UNIT_TEST(SourceIdStorageDeleteAndOwnersMark) { TSourceIdStorage storage; THashMap<TString, TOwnerInfo> owners; for (ui64 i = 1; i <= 2; ++i) { // add two sources @@ -322,9 +322,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { UNIT_ASSERT_UNEQUAL(it, owners.end()); UNIT_ASSERT_VALUES_EQUAL(it->second.SourceIdDeleted, false); } - } - + } + } // TSourceIdTests - + } // NPQ } // NKikimr diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make index 7cbb0b94960..2fc79953c5d 100644 --- a/ydb/core/persqueue/ut/ya.make +++ b/ydb/core/persqueue/ut/ya.make @@ -35,7 +35,7 @@ SRCS( pq_ut.cpp type_codecs_ut.cpp pq_ut.h - sourceid_ut.cpp + sourceid_ut.cpp user_info_ut.cpp ) diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make index 50e89676ef9..a8a27dbd7d8 100644 --- a/ydb/core/persqueue/ya.make +++ b/ydb/core/persqueue/ya.make @@ -15,10 +15,10 @@ SRCS( pq.cpp pq_database.cpp pq_impl.cpp - sourceid.cpp + sourceid.cpp mirrorer.cpp mirrorer.h - ownerinfo.cpp + ownerinfo.cpp partition.cpp pq_l2_cache.cpp read_balancer.cpp diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto index 72337581351..6cc0f12db86 100644 --- a/ydb/core/protos/counters_pq.proto +++ b/ydb/core/protos/counters_pq.proto @@ -230,7 +230,7 @@ enum EPartitionLabeledCounters { METRIC_WRITE_QUOTA_USAGE = 32 [(LabeledCounterOpts) = {Name: "PartitionMaxWriteQuotaUsage" AggrFunc : EAF_MAX}]; - METRIC_MIN_SID_LIFETIME = 33 [(LabeledCounterOpts) = {Name: "SourceIdMinLifetimeMs" AggrFunc : EAF_MIN}]; - + METRIC_MIN_SID_LIFETIME = 33 [(LabeledCounterOpts) = {Name: "SourceIdMinLifetimeMs" AggrFunc : EAF_MIN}]; + } diff --git a/ydb/core/protos/counters_pq_labeled.proto b/ydb/core/protos/counters_pq_labeled.proto index d67890bf276..cd57661bedf 100644 --- a/ydb/core/protos/counters_pq_labeled.proto +++ b/ydb/core/protos/counters_pq_labeled.proto @@ -45,7 +45,7 @@ enum EClientLabeledCounters { METRIC_WRITE_TIME_LAG = 24 [(LabeledCounterOpts) = {Name: "WriteTimeLagMsByLastRead" AggrFunc : EAF_MAX}]; METRIC_LAST_READ_TIME = 25 [(LabeledCounterOpts) = {Name: "TimeSinceLastReadMs" AggrFunc : EAF_MIN Type : CT_TIMELAG}]; - METRIC_READ_QUOTA_USAGE = 26 [(LabeledCounterOpts) = {Name: "PartitionMaxReadQuotaUsage" AggrFunc : EAF_MAX}]; + METRIC_READ_QUOTA_USAGE = 26 [(LabeledCounterOpts) = {Name: "PartitionMaxReadQuotaUsage" AggrFunc : EAF_MAX}]; } @@ -94,7 +94,7 @@ enum EPartitionLabeledCounters { METRIC_TOTAL_QUOTA_SPEED_4 = 30 [(LabeledCounterOpts) = {Name: "QuotaBytesPerDay" AggrFunc : EAF_SUM}]; METRIC_MAX_QUOTA_SPEED_4 = 31 [(LabeledCounterOpts) = {Name: "QuotaBytesMaxPerDay" AggrFunc : EAF_MAX}]; - METRIC_WRITE_QUOTA_USAGE = 32 [(LabeledCounterOpts) = {Name: "PartitionMaxWriteQuotaUsage" AggrFunc : EAF_MAX}]; + METRIC_WRITE_QUOTA_USAGE = 32 [(LabeledCounterOpts) = {Name: "PartitionMaxWriteQuotaUsage" AggrFunc : EAF_MAX}]; } diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 7c859274496..7eae280ff3d 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -184,9 +184,9 @@ message TPartitionConfig { // List of ClientIds, for which we don't delete data until they are read by these clients repeated string ImportantClientId = 4; //can be empty optional uint32 LowWatermark = 5 [default = 6291456]; //6Mb, compact blobs if they at least this big. - optional uint32 SourceIdLifetimeSeconds = 6 [ default = 1382400]; //16 days - optional uint32 SourceIdMaxCounts = 31 [default = 6000000]; // Maximum number of stored sourceId records in partition - // default - generate 5 new source id each second during 14 days + optional uint32 SourceIdLifetimeSeconds = 6 [ default = 1382400]; //16 days + optional uint32 SourceIdMaxCounts = 31 [default = 6000000]; // Maximum number of stored sourceId records in partition + // default - generate 5 new source id each second during 14 days optional uint64 WriteSpeedInBytesPerSecond = 7 [default = 50000000]; optional uint64 BurstSize = 8 [default = 50000000]; diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index f0599232508..a77dd534cff 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -122,9 +122,9 @@ struct TRequestCreatePQ { std::optional<NKikimrPQ::TMirrorPartitionConfig> MirrorFrom; - ui64 SourceIdMaxCount; - ui64 SourceIdLifetime; - + ui64 SourceIdMaxCount; + ui64 SourceIdLifetime; + THolder<NMsgBusProxy::TBusPersQueue> GetRequest() const { THolder<NMsgBusProxy::TBusPersQueue> request(new NMsgBusProxy::TBusPersQueue); auto req = request->Record.MutableMetaRequest()->MutableCmdCreateTopic(); @@ -134,8 +134,8 @@ struct TRequestCreatePQ { if (CacheSize) config->SetCacheSize(CacheSize); config->MutablePartitionConfig()->SetLifetimeSeconds(LifetimeS); - config->MutablePartitionConfig()->SetSourceIdLifetimeSeconds(SourceIdLifetime); - config->MutablePartitionConfig()->SetSourceIdMaxCounts(SourceIdMaxCount); + config->MutablePartitionConfig()->SetSourceIdLifetimeSeconds(SourceIdLifetime); + config->MutablePartitionConfig()->SetSourceIdMaxCounts(SourceIdMaxCount); config->MutablePartitionConfig()->SetLowWatermark(LowWatermark); config->SetLocalDC(true); @@ -925,9 +925,9 @@ public: ui64 readSpeed = 200000000, TVector<TString> rr = {}, TVector<TString> important = {}, - std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom = {}, - ui64 sourceIdMaxCount = 6000000, - ui64 sourceIdLifetime = 86400 + std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom = {}, + ui64 sourceIdMaxCount = 6000000, + ui64 sourceIdLifetime = 86400 ) { Y_VERIFY(name.StartsWith("rt3.")); diff --git a/ydb/public/api/protos/draft/persqueue_error_codes.proto b/ydb/public/api/protos/draft/persqueue_error_codes.proto index 8b2a0986314..d2d1bdab596 100644 --- a/ydb/public/api/protos/draft/persqueue_error_codes.proto +++ b/ydb/public/api/protos/draft/persqueue_error_codes.proto @@ -9,7 +9,7 @@ enum EErrorCode { OVERLOAD = 2; BAD_REQUEST = 3; WRONG_COOKIE = 4; - SOURCEID_DELETED = 24; + SOURCEID_DELETED = 24; WRITE_ERROR_PARTITION_IS_FULL = 5; WRITE_ERROR_DISK_IS_FULL = 15; diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto index c6658305c2e..06a96c37180 100644 --- a/ydb/public/api/protos/persqueue_error_codes_v1.proto +++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto @@ -12,7 +12,7 @@ enum ErrorCode { OVERLOAD = 500002; BAD_REQUEST = 500003; WRONG_COOKIE = 500004; - SOURCEID_DELETED = 500024; + SOURCEID_DELETED = 500024; WRITE_ERROR_PARTITION_IS_FULL = 500005; WRITE_ERROR_DISK_IS_FULL = 500015; diff --git a/ydb/public/api/protos/ydb_persqueue_v1.proto b/ydb/public/api/protos/ydb_persqueue_v1.proto index 93a7fb6c79d..2407388342a 100644 --- a/ydb/public/api/protos/ydb_persqueue_v1.proto +++ b/ydb/public/api/protos/ydb_persqueue_v1.proto @@ -1137,10 +1137,10 @@ message TopicSettings { int32 partitions_count = 1 [(value) = "> 0"]; // How long data in partition should be stored. Must be greater than 0 and less than limit for this database. Default limit - 36 hours. int64 retention_period_ms = 2 [(value) = "> 0"]; - // How long last written seqno for message group should be stored. Must be greater then retention_period_ms and less then limit for this database. Default limit - 16 days. - int64 message_group_seqno_retention_period_ms = 12 [(value) = ">= 0"]; - // How many last written seqno for various message groups should be stored per partition. Must be less than limit for this database. Default limit - 6*10^6 values. - int64 max_partition_message_groups_seqno_stored = 13 [(value) = ">= 0"]; + // How long last written seqno for message group should be stored. Must be greater then retention_period_ms and less then limit for this database. Default limit - 16 days. + int64 message_group_seqno_retention_period_ms = 12 [(value) = ">= 0"]; + // How many last written seqno for various message groups should be stored per partition. Must be less than limit for this database. Default limit - 6*10^6 values. + int64 max_partition_message_groups_seqno_stored = 13 [(value) = ">= 0"]; // Max format version that is allowed for writers. Must be value from enum FormatVersion. // Writes with greater format version are forbiden. Format supported_format = 3; diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index 654edfcfcd4..a438914f388 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -2996,7 +2996,7 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const PersQueue: case WRITE_ERROR_PARTITION_IS_FULL: case WRITE_ERROR_DISK_IS_FULL: case WRITE_ERROR_BAD_OFFSET: - case SOURCEID_DELETED: + case SOURCEID_DELETED: case READ_ERROR_IN_PROGRESS: case READ_ERROR_TOO_SMALL_OFFSET: case READ_ERROR_TOO_BIG_OFFSET: diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.h b/ydb/services/persqueue_v1/grpc_pq_schema.h index 2ef8c503e4d..f4c26ee8ca1 100644 --- a/ydb/services/persqueue_v1/grpc_pq_schema.h +++ b/ydb/services/persqueue_v1/grpc_pq_schema.h @@ -14,7 +14,7 @@ namespace NKikimr::NGRpcProxy::V1 { -static const i64 DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD = 16*24*60*60*1000; +static const i64 DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD = 16*24*60*60*1000; inline TActorId GetPQSchemaServiceActorID() { return TActorId(0, "PQSchmSvc"); diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp index 3038cc82a61..fa37956f6db 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp @@ -756,9 +756,9 @@ void TWriteSessionActor::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev const auto& error = result.GetError(); if (error.Response.HasErrorCode()) { return CloseSession("status is not ok: " + error.Response.GetErrorReason(), ConvertOldCode(error.Response.GetErrorCode()), ctx); - } else { + } else { return CloseSession("error at writer init: " + error.Reason, PersQueue::ErrorCode::ERROR, ctx); - } + } } OwnerCookie = result.GetResult().OwnerCookie; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 3bc9b217285..17906a8cfb1 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -375,20 +375,20 @@ namespace { } Y_UNIT_TEST(StoreNoMoreThanXSourceIDs) { - ui16 X = 4; - ui64 SOURCEID_COUNT_DELETE_BATCH_SIZE = 100; + ui16 X = 4; + ui64 SOURCEID_COUNT_DELETE_BATCH_SIZE = 100; NPersQueue::TTestServer server; server.EnableLogs({ NKikimrServices::PERSQUEUE, NKikimrServices::PQ_WRITE_PROXY }); server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8*1024*1024, 86400, 20000000, "", 200000000, {}, {}, {}, X, 86400); - + auto driver = server.AnnoyingClient->GetDriver(); - + auto writer1 = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "test source ID " << 0, {}, {}, true); writer1->GetInitSeqNo(); - + bool res = writer1->Write("x", 1); UNIT_ASSERT(res); - + Sleep(TDuration::Seconds(5)); auto writer2 = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "test source ID Del " << 0); @@ -404,7 +404,7 @@ namespace { Sleep(TDuration::Seconds(5)); - for (ui32 nProducer=1; nProducer < X + SOURCEID_COUNT_DELETE_BATCH_SIZE + 1; ++nProducer) { + for (ui32 nProducer=1; nProducer < X + SOURCEID_COUNT_DELETE_BATCH_SIZE + 1; ++nProducer) { auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "test source ID " << nProducer); res = writer->Write("x", 1); @@ -415,19 +415,19 @@ namespace { res = writer->Close(TDuration::Seconds(10)); UNIT_ASSERT(res); - } - + } + res = writer1->Write("x", 3); UNIT_ASSERT(res); res = writer1->Close(TDuration::Seconds(5)); UNIT_ASSERT(res); - + res = writer2->Write("x", 4); UNIT_ASSERT(res); - + UNIT_ASSERT(!writer2->Close()); - } - + } + Y_UNIT_TEST(EachMessageGetsExactlyOneAcknowledgementInCorrectOrder) { NPersQueue::TTestServer server; server.AnnoyingClient->CreateTopic("rt3.dc1--topic", 1); @@ -2494,7 +2494,7 @@ namespace { MaxSizeInPartition: 234 LifetimeSeconds: 86400 ImportantClientId: "consumer" - SourceIdLifetimeSeconds: 1382400 + SourceIdLifetimeSeconds: 1382400 WriteSpeedInBytesPerSecond: 123 BurstSize: 1234 NumChannels: 10 @@ -2534,7 +2534,7 @@ namespace { ExplicitChannelProfiles { PoolKind: "test" } - SourceIdMaxCounts: 6000000 + SourceIdMaxCounts: 6000000 } Version: 3 LocalDC: true |