diff options
author | Alek5andr-Kotov <152866892+Alek5andr-Kotov@users.noreply.github.com> | 2024-02-01 13:40:03 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-01 13:40:03 +0300 |
commit | 922d72d7e2c1e2cad4e5896a7cdbe05e84e39d34 (patch) | |
tree | 5266b424335bb6c6e1087dbf8e0d9ad04f048795 | |
parent | ec89d222d3cefcd00e295599e04fcd04508e60bf (diff) | |
download | ydb-922d72d7e2c1e2cad4e5896a7cdbe05e84e39d34.tar.gz |
type TPartitionId (#1472)
36 files changed, 353 insertions, 204 deletions
diff --git a/ydb/core/persqueue/account_read_quoter.cpp b/ydb/core/persqueue/account_read_quoter.cpp index 583e76d6aa1..65ab6d5a8a8 100644 --- a/ydb/core/persqueue/account_read_quoter.cpp +++ b/ydb/core/persqueue/account_read_quoter.cpp @@ -29,7 +29,7 @@ TAccountReadQuoter::TAccountReadQuoter( TActorId recepient, ui64 tabletId, const NPersQueue::TTopicConverterPtr& topicConverter, - ui32 partition, + const TPartitionId& partition, const TString& user, const TTabletCountersBase& counters ) diff --git a/ydb/core/persqueue/account_read_quoter.h b/ydb/core/persqueue/account_read_quoter.h index 22b4e673763..d6c9e309d87 100644 --- a/ydb/core/persqueue/account_read_quoter.h +++ b/ydb/core/persqueue/account_read_quoter.h @@ -89,7 +89,7 @@ public: TActorId recepient, ui64 tabletId, const NPersQueue::TTopicConverterPtr& topicConverter, - ui32 partition, + const TPartitionId& partition, const TString& user, const TTabletCountersBase& counters ); @@ -112,7 +112,7 @@ private: const TActorId Recepient; const ui64 TabletId; const NPersQueue::TTopicConverterPtr TopicConverter; - const ui32 Partition; + const TPartitionId Partition; const TString User; const TString ConsumerPath; const ui64 ReadCreditBytes; diff --git a/ydb/core/persqueue/blob.cpp b/ydb/core/persqueue/blob.cpp index 4b6bff79e89..9a2c6d2f70c 100644 --- a/ydb/core/persqueue/blob.cpp +++ b/ydb/core/persqueue/blob.cpp @@ -736,7 +736,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionedBlob& x) , MaxBlobSize(x.MaxBlobSize) {} -TPartitionedBlob::TPartitionedBlob(const ui32 partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts, +TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize) : Partition(partition) , Offset(offset) diff --git a/ydb/core/persqueue/blob.h b/ydb/core/persqueue/blob.h index 0f2c79e11b2..7a4b836a899 100644 --- a/ydb/core/persqueue/blob.h +++ b/ydb/core/persqueue/blob.h @@ -264,7 +264,7 @@ public: TPartitionedBlob(const TPartitionedBlob& x); - TPartitionedBlob(const ui32 partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, + TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize); std::optional<std::pair<TKey, TString>> Add(TClientBlob&& blob); @@ -287,7 +287,7 @@ private: TString CompactHead(bool glueHead, THead& head, bool glueNewHead, THead& newHead, ui32 estimatedSize); private: - ui32 Partition; + TPartitionId Partition; ui64 Offset; ui16 InternalPartsCount; ui64 StartOffset; diff --git a/ydb/core/persqueue/cache_eviction.h b/ydb/core/persqueue/cache_eviction.h index 4707fc4fff7..e92f64f40b4 100644 --- a/ydb/core/persqueue/cache_eviction.h +++ b/ydb/core/persqueue/cache_eviction.h @@ -54,11 +54,11 @@ namespace NPQ { ERequestType Type; TActorId Sender; ui64 CookiePQ; - ui32 Partition; + TPartitionId Partition; ui32 MetadataWritesCount; TVector<TRequestedBlob> Blobs; - TKvRequest(ERequestType type, TActorId sender, ui64 cookie, ui32 partition) + TKvRequest(ERequestType type, TActorId sender, ui64 cookie, const TPartitionId& partition) : Type(type) , Sender(sender) , CookiePQ(cookie) @@ -66,7 +66,7 @@ namespace NPQ { , MetadataWritesCount(0) {} - TBlobId GetBlobId(ui32 pos) const { return TBlobId(Partition, Blobs[pos].Offset, Blobs[pos].PartNo, Blobs[pos].Count, Blobs[pos].InternalPartsCount); } + TBlobId GetBlobId(ui32 pos) const { return TBlobId(Partition.InternalPartitionId, Blobs[pos].Offset, Blobs[pos].PartNo, Blobs[pos].Count, Blobs[pos].InternalPartsCount); } THolder<TEvKeyValue::TEvRequest> MakeKvRequest() const { @@ -110,7 +110,7 @@ namespace NPQ { } void Verify(const TRequestedBlob& blob) const { - TKey key(TKeyPrefix::TypeData, 0, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false); + TKey key(TKeyPrefix::TypeData, TPartitionId(0), blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false); Y_ABORT_UNLESS(blob.Value.size() == blob.Size); TClientBlob::CheckBlob(key, blob.Value); } @@ -258,7 +258,7 @@ namespace NPQ { for (const auto& blob : kvReq.Blobs) { // Touching blobs in L2. We don't need data here - TCacheBlobL2 key = {kvReq.Partition, blob.Offset, blob.PartNo, nullptr}; + TCacheBlobL2 key = {kvReq.Partition.InternalPartitionId, blob.Offset, blob.PartNo, nullptr}; if (blob.Cached) reqData->RequestedBlobs.push_back(key); else @@ -275,10 +275,10 @@ namespace NPQ { THolder<TCacheL2Request> reqData = MakeHolder<TCacheL2Request>(TabletId); for (const TRequestedBlob& reqBlob : kvReq.Blobs) { - TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount); + TBlobId blob(kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount); { // there could be a new blob with same id (for big messages) if (RemoveExists(ctx, blob)) { - TCacheBlobL2 removed = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, nullptr}; + TCacheBlobL2 removed = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, nullptr}; reqData->RemovedBlobs.push_back(removed); } } @@ -290,7 +290,7 @@ namespace NPQ { if (L1Strategy) L1Strategy->SaveHeadBlob(blob); - TCacheBlobL2 blobL2 = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached}; + TCacheBlobL2 blobL2 = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, cached}; reqData->StoredBlobs.push_back(blobL2); LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Caching head blob in L1. Partition " @@ -314,7 +314,7 @@ namespace NPQ { continue; const TRequestedBlob& reqBlob = kvReq.Blobs[i]; - TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount); + TBlobId blob(kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount); { TValueL1 value; if (CheckExists(ctx, blob, value)) { @@ -328,7 +328,7 @@ namespace NPQ { Cache[blob] = valL1; // weak Counters.Inc(valL1); - TCacheBlobL2 blobL2 = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached}; + TCacheBlobL2 blobL2 = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, cached}; reqData->StoredBlobs.push_back(blobL2); LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Prefetched blob in L1. Partition " @@ -428,7 +428,7 @@ namespace NPQ { ++numCached; continue; } - TBlobId blobId(kvReq.Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount); + TBlobId blobId(kvReq.Partition.InternalPartitionId, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount); TCacheValue::TPtr cached = GetValue(ctx, blobId); if (cached) { ++numCached; diff --git a/ydb/core/persqueue/event_helpers.cpp b/ydb/core/persqueue/event_helpers.cpp index aea0fb89aa5..c8a2aa8182d 100644 --- a/ydb/core/persqueue/event_helpers.cpp +++ b/ydb/core/persqueue/event_helpers.cpp @@ -15,7 +15,7 @@ void ReplyPersQueueError( const TActorContext& ctx, ui64 tabletId, const TString& topicName, - TMaybe<ui32> partition, + TMaybe<TPartitionId> partition, NKikimr::TTabletCountersBase& counters, NKikimrServices::EServiceKikimr service, const ui64 responseCookie, diff --git a/ydb/core/persqueue/event_helpers.h b/ydb/core/persqueue/event_helpers.h index 1d676b3d0a9..ccc928a46f8 100644 --- a/ydb/core/persqueue/event_helpers.h +++ b/ydb/core/persqueue/event_helpers.h @@ -1,5 +1,7 @@ #pragma once +#include "partition_id.h" + #include <ydb/core/tablet/tablet_counters.h> #include <ydb/library/services/services.pb.h> #include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h> @@ -14,7 +16,7 @@ void ReplyPersQueueError( const TActorContext& ctx, ui64 tabletId, const TString& topicName, - TMaybe<ui32> partition, + TMaybe<TPartitionId> partition, NKikimr::TTabletCountersBase& counters, NKikimrServices::EServiceKikimr service, const ui64 responseCookie, diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 69a8858da8b..be6aac79799 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -170,8 +170,6 @@ struct TEvPQ { EvQuotaCountersUpdated, EvConsumerRemoved, EvFetchResponse, - EvSourceIdRequest, - EvSourceIdResponse, EvPublishRead, EvForgetRead, EvRegisterDirectReadSession, @@ -313,13 +311,18 @@ struct TEvPQ { }; struct TEvMonResponse : public TEventLocal<TEvMonResponse, EvMonResponse> { - TEvMonResponse(ui32 partition, const TVector<TString>& res, const TString& str) + TEvMonResponse(const NPQ::TPartitionId& partition, const TVector<TString>& res, const TString& str) : Partition(partition) , Res(res) , Str(str) {} - ui32 Partition; + TEvMonResponse(const TVector<TString>& res, const TString& str) + : Res(res) + , Str(str) + {} + + TMaybe<NPQ::TPartitionId> Partition; TVector<TString> Res; TString Str; }; @@ -398,11 +401,13 @@ struct TEvPQ { }; struct TEvPartitionOffsetsResponse : public TEventLocal<TEvPartitionOffsetsResponse, EvPartitionOffsetsResponse> { - explicit TEvPartitionOffsetsResponse(NKikimrPQ::TOffsetsResponse::TPartResult& partResult) + TEvPartitionOffsetsResponse(NKikimrPQ::TOffsetsResponse::TPartResult& partResult, const NPQ::TPartitionId& partition) : PartResult(partResult) + , Partition(partition) {} NKikimrPQ::TOffsetsResponse::TPartResult PartResult; + NPQ::TPartitionId Partition; }; struct TEvPartitionStatus : public TEventLocal<TEvPartitionStatus, EvPartitionStatus> { @@ -424,11 +429,13 @@ struct TEvPQ { }; struct TEvPartitionStatusResponse : public TEventLocal<TEvPartitionStatusResponse, EvPartitionStatusResponse> { - explicit TEvPartitionStatusResponse(NKikimrPQ::TStatusResponse::TPartResult& partResult) + TEvPartitionStatusResponse(NKikimrPQ::TStatusResponse::TPartResult& partResult, const NPQ::TPartitionId& partition) : PartResult(partResult) + , Partition(partition) {} NKikimrPQ::TStatusResponse::TPartResult PartResult; + NPQ::TPartitionId Partition; }; @@ -442,11 +449,11 @@ struct TEvPQ { }; struct TEvInitComplete : public TEventLocal<TEvInitComplete, EvInitComplete> { - explicit TEvInitComplete(const ui32 partition) + explicit TEvInitComplete(const NPQ::TPartitionId& partition) : Partition(partition) {} - ui32 Partition; + NPQ::TPartitionId Partition; }; struct TEvError : public TEventLocal<TEvError, EvError> { @@ -462,7 +469,7 @@ struct TEvPQ { }; struct TEvBlobRequest : public TEventLocal<TEvBlobRequest, EvBlobRequest> { - TEvBlobRequest(const TString& user, const ui64 cookie, const ui32 partition, const ui64 readOffset, + TEvBlobRequest(const TString& user, const ui64 cookie, const NPQ::TPartitionId& partition, const ui64 readOffset, TVector<NPQ::TRequestedBlob>&& blobs) : User(user) , Cookie(cookie) @@ -473,7 +480,7 @@ struct TEvPQ { TString User; ui64 Cookie; - ui32 Partition; + NPQ::TPartitionId Partition; ui64 ReadOffset; TVector<NPQ::TRequestedBlob> Blobs; }; @@ -565,12 +572,12 @@ struct TEvPQ { }; struct TEvPartitionConfigChanged : public TEventLocal<TEvPartitionConfigChanged, EvPartitionConfigChanged> { - explicit TEvPartitionConfigChanged(ui32 partition) : + explicit TEvPartitionConfigChanged(const NPQ::TPartitionId& partition) : Partition(partition) { } - ui32 Partition; + NPQ::TPartitionId Partition; }; struct TEvChangeCacheConfig : public TEventLocal<TEvChangeCacheConfig, EvChangeCacheConfig> { @@ -588,35 +595,35 @@ struct TEvPQ { }; struct TEvPartitionCounters : public TEventLocal<TEvPartitionCounters, EvPartitionCounters> { - TEvPartitionCounters(const ui32 partition, const TTabletCountersBase& counters) + TEvPartitionCounters(const NPQ::TPartitionId& partition, const TTabletCountersBase& counters) : Partition(partition) { Counters.Populate(counters); } - const ui32 Partition; + const NPQ::TPartitionId Partition; TTabletCountersBase Counters; }; struct TEvPartitionLabeledCounters : public TEventLocal<TEvPartitionLabeledCounters, EvPartitionLabeledCounters> { - TEvPartitionLabeledCounters(const ui32 partition, const TTabletLabeledCountersBase& labeledCounters) + TEvPartitionLabeledCounters(const NPQ::TPartitionId& partition, const TTabletLabeledCountersBase& labeledCounters) : Partition(partition) , LabeledCounters(labeledCounters) { } - const ui32 Partition; + const NPQ::TPartitionId Partition; TTabletLabeledCountersBase LabeledCounters; }; struct TEvPartitionLabeledCountersDrop : public TEventLocal<TEvPartitionLabeledCountersDrop, EvPartitionLabeledCountersDrop> { - TEvPartitionLabeledCountersDrop(const ui32 partition, const TString& group) + TEvPartitionLabeledCountersDrop(const NPQ::TPartitionId& partition, const TString& group) : Partition(partition) , Group(group) { } - const ui32 Partition; + const NPQ::TPartitionId Partition; TString Group; }; @@ -798,7 +805,7 @@ struct TEvPQ { }; struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> { - TEvTxCalcPredicateResult(ui64 step, ui64 txId, ui32 partition, bool predicate) : + TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, bool predicate) : Step(step), TxId(txId), Partition(partition), @@ -808,7 +815,7 @@ struct TEvPQ { ui64 Step; ui64 TxId; - ui32 Partition; + NPQ::TPartitionId Partition; bool Predicate = false; }; @@ -826,7 +833,7 @@ struct TEvPQ { }; struct TEvProposePartitionConfigResult : public TEventLocal<TEvProposePartitionConfigResult, EvProposePartitionConfigResult> { - TEvProposePartitionConfigResult(ui64 step, ui64 txId, ui32 partition) : + TEvProposePartitionConfigResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition) : Step(step), TxId(txId), Partition(partition) @@ -835,7 +842,7 @@ struct TEvPQ { ui64 Step; ui64 TxId; - ui32 Partition; + NPQ::TPartitionId Partition; }; struct TEvTxCommit : public TEventLocal<TEvTxCommit, EvTxCommit> { @@ -850,7 +857,7 @@ struct TEvPQ { }; struct TEvTxCommitDone : public TEventLocal<TEvTxCommitDone, EvTxCommitDone> { - TEvTxCommitDone(ui64 step, ui64 txId, ui32 partition) : + TEvTxCommitDone(ui64 step, ui64 txId, const NPQ::TPartitionId& partition) : Step(step), TxId(txId), Partition(partition) @@ -859,7 +866,7 @@ struct TEvPQ { ui64 Step; ui64 TxId; - ui32 Partition; + NPQ::TPartitionId Partition; }; struct TEvTxRollback : public TEventLocal<TEvTxRollback, EvTxRollback> { diff --git a/ydb/core/persqueue/key.h b/ydb/core/persqueue/key.h index 4be6be821a1..ecc6fd6d9ef 100644 --- a/ydb/core/persqueue/key.h +++ b/ydb/core/persqueue/key.h @@ -1,5 +1,8 @@ #pragma once +#include "partition_id.h" + +#include <util/digest/multi.h> #include <util/generic/buffer.h> #include <util/string/cast.h> #include <util/string/printf.h> @@ -28,15 +31,15 @@ public: MarkUserDeprecated = 'u' }; - TKeyPrefix(EType type, const ui32 partition) + TKeyPrefix(EType type, const TPartitionId& partition) : Partition(partition) { Resize(UnmarkedSize()); *PtrType() = type; - memcpy(PtrPartition(), Sprintf("%.10" PRIu32, partition).data(), 10); + memcpy(PtrPartition(), Sprintf("%.10" PRIu32, Partition.InternalPartitionId).data(), 10); } - TKeyPrefix(EType type, const ui32 partition, EMark mark) + TKeyPrefix(EType type, const TPartitionId& partition, EMark mark) : TKeyPrefix(type, partition) { Resize(MarkedSize()); @@ -44,7 +47,7 @@ public: } TKeyPrefix() - : TKeyPrefix(TypeNone, 0) + : TKeyPrefix(TypeNone, TPartitionId(0)) {} virtual ~TKeyPrefix() @@ -67,15 +70,16 @@ public: return EType(*PtrType()); } - ui32 GetPartition() const { return Partition; } + const TPartitionId& GetPartition() const { return Partition; } protected: static constexpr ui32 UnmarkedSize() { return 1 + 10; } void ParsePartition() { - Partition = FromString<ui32>(TStringBuf{PtrPartition(), 10}); + Partition.InternalPartitionId = FromString<ui32>(TStringBuf{PtrPartition(), 10}); } + private: char* PtrType() { return Data(); } char* PtrMark() { return Data() + UnmarkedSize(); } @@ -85,7 +89,7 @@ private: const char* PtrMark() const { return Data() + UnmarkedSize(); } const char* PtrPartition() const { return Data() + 1; } - ui32 Partition; + TPartitionId Partition; }; // {char type; ui32 partiton; ui64 offset; ui16 partNo; ui32 count, ui16 internalPartsCount} @@ -98,7 +102,7 @@ private: class TKey : public TKeyPrefix { public: - TKey(EType type, const ui32 partition, const ui64 offset, const ui16 partNo, const ui32 count, const ui16 internalPartsCount, const bool isHead = false) + TKey(EType type, const TPartitionId& partition, const ui64 offset, const ui16 partNo, const ui32 count, const ui16 internalPartsCount, const bool isHead = false) : TKeyPrefix(type, partition) , Offset(offset) , Count(count) @@ -136,7 +140,7 @@ public: } TKey() - : TKey(TypeNone, 0, 0, 0, 0, 0) + : TKey(TypeNone, TPartitionId(0), 0, 0, 0, 0) {} virtual ~TKey() diff --git a/ydb/core/persqueue/ownerinfo.cpp b/ydb/core/persqueue/ownerinfo.cpp index be110383504..294a1a8445c 100644 --- a/ydb/core/persqueue/ownerinfo.cpp +++ b/ydb/core/persqueue/ownerinfo.cpp @@ -6,7 +6,7 @@ namespace NKikimr { namespace NPQ { void TOwnerInfo::GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender, - const TString& topicName, const ui32 partition, const TActorContext& ctx) { + const TString& topicName, const TPartitionId& partition, const TActorContext& ctx) { TStringBuilder s; s << owner << "|" << CreateGuidAsString() << "_" << OwnerGeneration; ++OwnerGeneration; diff --git a/ydb/core/persqueue/ownerinfo.h b/ydb/core/persqueue/ownerinfo.h index 8d0e0742430..2d60464246c 100644 --- a/ydb/core/persqueue/ownerinfo.h +++ b/ydb/core/persqueue/ownerinfo.h @@ -32,7 +32,7 @@ namespace NPQ { static TStringBuf GetOwnerFromOwnerCookie(const TString& owner); void GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender, - const TString& topicName, const ui32 partition, const TActorContext& ctx); + const TString& topicName, const TPartitionId& partition, const TActorContext& ctx); void AddReserveRequest(ui64 size, bool lastRequest) { ReservedSize += size; diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index f29e9511f7c..cd1546fcbec 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -154,7 +154,7 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels) { } } -TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui32 tabletGeneration, const TActorId& blobCache, +TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActorId& tablet, ui32 tabletGeneration, const TActorId& blobCache, const NPersQueue::TTopicConverterPtr& topicConverter, TString dcId, bool isServerless, const NKikimrPQ::TPQTabletConfig& tabletConfig, const TTabletCountersBase& counters, bool subDomainOutOfSpace, ui32 numChannels, bool newPartition, @@ -602,7 +602,7 @@ void TPartition::Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContex void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) { NKikimrPQ::TStatusResponse::TPartResult result; - result.SetPartition(Partition); + result.SetPartition(Partition.InternalPartitionId); if (DiskIsFull || WaitingForSubDomainQuota(ctx)) { result.SetStatus(NKikimrPQ::TStatusResponse::STATUS_DISK_IS_FULL); @@ -757,23 +757,23 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext } } } - ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result)); + ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result, Partition)); } void TPartition::HandleOnInit(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) { NKikimrPQ::TStatusResponse::TPartResult result; - result.SetPartition(Partition); + result.SetPartition(Partition.InternalPartitionId); result.SetStatus(NKikimrPQ::TStatusResponse::STATUS_INITIALIZING); result.SetLastInitDurationSeconds((ctx.Now() - CreationTime).Seconds()); result.SetCreationTimestamp(CreationTime.Seconds()); - ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result)); + ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result, Partition)); } void TPartition::Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActorContext& ctx) { THolder<TEvPersQueue::TEvPartitionClientInfoResponse> response = MakeHolder<TEvPersQueue::TEvPartitionClientInfoResponse>(); NKikimrPQ::TClientInfoResponse& result(response->Record); - result.SetPartition(Partition); + result.SetPartition(Partition.InternalPartitionId); result.SetStartOffset(StartOffset); result.SetEndOffset(EndOffset); result.SetResponseTimestamp(ctx.Now().MilliSeconds()); @@ -1547,7 +1547,7 @@ bool TPartition::ProcessUserActionOrTransaction(TTransaction& t, } else if (t.ProposeConfig) { t.Predicate = BeginTransaction(*t.ProposeConfig); - PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config, Partition); + PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config); //Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found"); ctx.Send(Tablet, @@ -1560,7 +1560,7 @@ bool TPartition::ProcessUserActionOrTransaction(TTransaction& t, Y_ABORT_UNLESS(!ChangeConfig); ChangeConfig = t.ChangeConfig; - PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition); + PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); SendChangeConfigReply = t.SendReply; BeginChangePartitionConfig(ChangeConfig->Config, ctx); @@ -1640,7 +1640,7 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event) ChangeConfig = MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter, event.Config); - PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition); + PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); SendChangeConfigReply = false; return true; @@ -1846,7 +1846,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf const TActorContext& ctx) { Config = config; - PartitionConfig = GetPartitionConfig(Config, Partition); + PartitionConfig = GetPartitionConfig(Config); PartitionGraph = MakePartitionGraph(Config); TopicConverter = topicConverter; NewPartition = false; @@ -1880,7 +1880,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf TString TPartition::GetKeyConfig() const { - return Sprintf("_config_%u", Partition); + return Sprintf("_config_%u", Partition.OriginalPartitionId); } void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId) @@ -2552,7 +2552,7 @@ size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) { void TPartition::CreateMirrorerActor() { Mirrorer = MakeHolder<TMirrorerInfo>( - Register(new TMirrorer(Tablet, SelfId(), TopicConverter, Partition, IsLocalDC, EndOffset, Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)), + Register(new TMirrorer(Tablet, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, EndOffset, Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)), TabletCounters ); } @@ -2597,7 +2597,7 @@ void TPartition::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) { auto& record = ev->Get()->Record; - if (Partition != record.GetPartition()) { + if (Partition.InternalPartitionId != record.GetPartition()) { LOG_INFO_S( ctx, NKikimrServices::PERSQUEUE, "TEvCheckPartitionStatusRequest for wrong partition " << record.GetPartition() << "." << @@ -2620,4 +2620,9 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T Send(ev->Sender, response.Release()); } +const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config) +{ + return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId); +} + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 31cb40fd484..5337e15e6ef 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -353,7 +353,7 @@ public: return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR; } - TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui32 tabletGeneration, const TActorId& blobCache, + TPartition(ui64 tabletId, const TPartitionId& partition, const TActorId& tablet, ui32 tabletGeneration, const TActorId& blobCache, const NPersQueue::TTopicConverterPtr& topicConverter, TString dcId, bool isServerless, const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, bool SubDomainOutOfSpace, ui32 numChannels, bool newPartition = false, @@ -574,7 +574,7 @@ private: private: ui64 TabletID; ui32 TabletGeneration; - ui32 Partition; + TPartitionId Partition; NKikimrPQ::TPQTabletConfig Config; NKikimrPQ::TPQTabletConfig TabletConfig; const NKikimrPQ::TPQTabletConfig::TPartition* PartitionConfig = nullptr; @@ -764,6 +764,8 @@ private: TDeque<std::unique_ptr<IEventBase>> PendingEvents; TRowVersion LastEmittedHeartbeat; + + const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config); }; } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_id.h b/ydb/core/persqueue/partition_id.h new file mode 100644 index 00000000000..0e4e4cd6164 --- /dev/null +++ b/ydb/core/persqueue/partition_id.h @@ -0,0 +1,115 @@ +#pragma once + +#include <util/generic/maybe.h> +#include <util/stream/output.h> +#include <util/system/types.h> +#include <util/digest/multi.h> +#include <util/str_stl.h> + +#include <functional> + +namespace NKikimr::NPQ { + +class TPartitionId { +public: + TPartitionId() = default; + explicit TPartitionId(ui32 partition) : + TPartitionId(partition, Nothing(), partition) + { + } + + TPartitionId(ui32 partition, ui64 writeId) : + TPartitionId(partition, writeId, 0) + { + } + + size_t GetHash() const + { + return MultiHash(OriginalPartitionId, WriteId); + } + + bool IsEqual(const TPartitionId& rhs) const + { + return + (OriginalPartitionId == rhs.OriginalPartitionId) && + (WriteId == rhs.WriteId); + } + + void ToStream(IOutputStream& s) const + { + if (WriteId.Defined()) { + s << '{' << OriginalPartitionId << ", " << *WriteId << ", " << InternalPartitionId << '}'; + } else { + s << OriginalPartitionId; + } + } + + // + // FIXME: используется в RequestRange + // + TPartitionId NextInternalPartitionId() const + { + return {OriginalPartitionId, WriteId, InternalPartitionId + 1}; + } + + ui32 OriginalPartitionId = 0; + TMaybe<ui64> WriteId; + ui32 InternalPartitionId = 0; + +private: + TPartitionId(ui32 originalPartitionId, TMaybe<ui64> writeId, ui32 internalPartitionId) : + OriginalPartitionId(originalPartitionId), + WriteId(writeId), + InternalPartitionId(internalPartitionId) + { + } +}; + +inline +bool operator==(const TPartitionId& lhs, const TPartitionId& rhs) +{ + return lhs.IsEqual(rhs); +} + +inline +IOutputStream& operator<<(IOutputStream& s, const TPartitionId& v) +{ + v.ToStream(s); + return s; +} + +} + +template <> +struct THash<NKikimr::NPQ::TPartitionId> { + inline size_t operator()(const NKikimr::NPQ::TPartitionId& v) const + { + return v.GetHash(); + } +}; + +namespace std { + +template <> +struct less<NKikimr::NPQ::TPartitionId> { + inline bool operator()(const NKikimr::NPQ::TPartitionId& lhs, const NKikimr::NPQ::TPartitionId& rhs) const + { + if (lhs.OriginalPartitionId < rhs.OriginalPartitionId) { + return true; + } else if (rhs.OriginalPartitionId < lhs.OriginalPartitionId) { + return false; + } else { + return lhs.WriteId < rhs.WriteId; + } + } +}; + +template <> +struct hash<NKikimr::NPQ::TPartitionId> { + inline size_t operator()(const NKikimr::NPQ::TPartitionId& v) const + { + return THash<NKikimr::NPQ::TPartitionId>()(v); + } +}; + +} diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index fcab0f62f36..dd18acf6e45 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -15,8 +15,8 @@ void CalcTopicWriteQuotaParams(const NKikimrPQ::TPQConfig& pqConfig, TString& topicWriteQuoterPath, TString& topicWriteQuotaResourcePath); bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev); -void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key); -void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key); +void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key); +void RequestDataRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key); bool ValidateResponse(const TInitializerStep& step, TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx); // @@ -108,7 +108,7 @@ TPartition* TInitializerStep::Partition() const { return Initializer->Partition; } -ui32 TInitializerStep::PartitionId() const { +const TPartitionId& TInitializerStep::PartitionId() const { return Initializer->Partition->Partition; } @@ -179,7 +179,7 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon case NKikimrProto::NODATA: Partition()->Config = Partition()->TabletConfig; - Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition); + Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition.OriginalPartitionId); Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config); break; @@ -255,7 +255,7 @@ TInitMetaStep::TInitMetaStep(TInitializer* initializer) } void TInitMetaStep::Execute(const TActorContext& ctx) { - auto addKey = [](NKikimrClient::TKeyValueRequest& request, TKeyPrefix::EType type, ui32 partition) { + auto addKey = [](NKikimrClient::TKeyValueRequest& request, TKeyPrefix::EType type, const TPartitionId& partition) { auto read = request.AddCmdRead(); TKeyPrefix key{type, partition}; read->SetKey(key.Data(), key.Size()); @@ -700,7 +700,7 @@ void TPartition::Initialize(const TActorContext& ctx) { UsersInfoStorage.ConstructInPlace(DCId, TopicConverter, - Partition, + Partition.InternalPartitionId, Config, CloudId, DbId, @@ -711,11 +711,11 @@ void TPartition::Initialize(const TActorContext& ctx) { if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(EscapeBadChars(TopicName()), - Partition, + Partition.InternalPartitionId, Config.GetYdbDatabasePath())); } else { PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicName(), - Partition)); + Partition.InternalPartitionId)); } UsersInfoStorage->Init(Tablet, SelfId(), ctx); @@ -759,7 +759,7 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) { WriteBufferIsFullCounter.SetCounter( NPersQueue::GetCounters(counters, "writingTime", TopicConverter), {{"host", DCId}, - {"Partition", ToString<ui32>(Partition)}}, + {"Partition", ToString<ui32>(Partition.InternalPartitionId)}}, {"sensor", "BufferFullTime" + suffix, true}); auto subGroup = GetServiceCounters(counters, "pqproxy|writeTimeLag"); @@ -990,7 +990,7 @@ bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev) { return !diskIsOk; } -static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, +static void RequestRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, TKeyPrefix::EType c, bool includeData = false, const TString& key = "", bool dropTmp = false) { THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); auto read = request->Record.AddCmdReadRange(); @@ -1003,7 +1003,7 @@ static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 par } range->SetFrom(from.Data(), from.Size()); - TKeyPrefix to(c, partition + 1); + TKeyPrefix to(c, partition.NextInternalPartitionId()); range->SetTo(to.Data(), to.Size()); if(includeData) @@ -1015,18 +1015,18 @@ static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 par TKeyPrefix from(TKeyPrefix::TypeTmpData, partition); range->SetFrom(from.Data(), from.Size()); - TKeyPrefix to(TKeyPrefix::TypeTmpData, partition + 1); + TKeyPrefix to(TKeyPrefix::TypeTmpData, partition.NextInternalPartitionId()); range->SetTo(to.Data(), to.Size()); } ctx.Send(dst, request.Release()); } -void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) { +void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key) { RequestRange(ctx, dst, partition, TKeyPrefix::TypeInfo, true, key, key == ""); } -void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) { +void RequestDataRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key) { RequestRange(ctx, dst, partition, TKeyPrefix::TypeData, false, key); } diff --git a/ydb/core/persqueue/partition_init.h b/ydb/core/persqueue/partition_init.h index 5f02770f8de..e45f04a38c0 100644 --- a/ydb/core/persqueue/partition_init.h +++ b/ydb/core/persqueue/partition_init.h @@ -62,7 +62,7 @@ public: virtual bool Handle(STFUNC_SIG); TPartition* Partition() const; - ui32 PartitionId() const; + const TPartitionId& PartitionId() const; TString TopicName() const; const TString Name; diff --git a/ydb/core/persqueue/partition_monitoring.cpp b/ydb/core/persqueue/partition_monitoring.cpp index 4343adcbb6a..52353878da7 100644 --- a/ydb/core/persqueue/partition_monitoring.cpp +++ b/ydb/core/persqueue/partition_monitoring.cpp @@ -74,7 +74,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo Y_ABORT(""); } TStringStream out; - out << "Partition " << i32(Partition) << ": " << str; res.push_back(out.Str()); out.Clear(); + out << "Partition " << Partition << ": " << str; res.push_back(out.Str()); out.Clear(); if (DiskIsFull) { out << "DISK IS FULL"; res.push_back(out.Str()); @@ -111,7 +111,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo } out << Config.DebugString(); res.push_back(out.Str()); out.Clear(); HTML(out) { - DIV_CLASS_ID("tab-pane fade", Sprintf("partition_%u", ui32(Partition))) { + DIV_CLASS_ID("tab-pane fade", Sprintf("partition_%u", Partition.InternalPartitionId)) { TABLE_SORTABLE_CLASS("table") { TABLEHEAD() { TABLER() { diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 7173ebbf2ae..664e00aa504 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -208,7 +208,7 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx) { NKikimrPQ::TOffsetsResponse::TPartResult result; - result.SetPartition(Partition); + result.SetPartition(Partition.InternalPartitionId); result.SetStartOffset(StartOffset); result.SetEndOffset(EndOffset); result.SetErrorCode(NPersQueue::NErrorCode::OK); @@ -228,15 +228,15 @@ void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContex result.SetReadCreateTimestampMS(userInfo->GetReadCreateTimestamp().MilliSeconds()); } } - ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionOffsetsResponse(result)); + ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionOffsetsResponse(result, Partition)); } void TPartition::HandleOnInit(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx) { NKikimrPQ::TOffsetsResponse::TPartResult result; - result.SetPartition(Partition); + result.SetPartition(Partition.InternalPartitionId); result.SetErrorCode(NPersQueue::NErrorCode::INITIALIZING); result.SetErrorReason("partition is not ready yet"); - ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionOffsetsResponse(result)); + ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionOffsetsResponse(result, Partition)); } std::pair<TInstant, TInstant> TPartition::GetTime(const TUserInfo& userInfo, ui64 offset) const { @@ -314,7 +314,7 @@ TReadAnswer TReadInfo::FormAnswer( const TActorContext& ctx, const TEvPQ::TEvBlobResponse& blobResponse, const ui64 endOffset, - const ui32 partition, + const TPartitionId& partition, TUserInfo* userInfo, const ui64 destination, const ui64 sizeLag, @@ -409,7 +409,7 @@ TReadAnswer TReadInfo::FormAnswer( } Y_ABORT_UNLESS(offset <= Offset); Y_ABORT_UNLESS(offset < Offset || partNo <= PartNo); - TKey key(TKeyPrefix::TypeData, 0, offset, partNo, count, internalPartsCount, false); + TKey key(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount, false); for (TBlobIterator it(key, blobValue); it.IsValid() && !needStop; it.Next()) { TBatch batch = it.GetBatch(); auto& header = batch.Header; diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp index b0770f1f901..f73e21ec38a 100644 --- a/ydb/core/persqueue/partition_sourcemanager.cpp +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -41,7 +41,7 @@ TPartitionSourceManager::TModificationBatch TPartitionSourceManager::CreateModif } const TPartitionSourceManager::TPartitionNode* TPartitionSourceManager::GetPartitionNode() const { - return Partition.PartitionGraph.GetPartition(Partition.Partition); + return Partition.PartitionGraph.GetPartition(Partition.Partition.OriginalPartitionId); } TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const { @@ -49,7 +49,7 @@ TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const { } bool TPartitionSourceManager::HasParents() const { - auto node = Partition.PartitionGraph.GetPartition(Partition.Partition); + auto node = GetPartitionNode(); return node && !node->Parents.empty(); } diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h index fcf53a25cd1..e825eedbd77 100644 --- a/ydb/core/persqueue/partition_sourcemanager.h +++ b/ydb/core/persqueue/partition_sourcemanager.h @@ -106,8 +106,6 @@ private: TSourceIdStorage& GetSourceIdStorage() const; bool HasParents() const; - TActorId PartitionRequester(TPartitionId id, ui64 tabletId); - private: TPartition& Partition; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 871956a019b..786639ecba8 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -966,7 +966,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame auto range = del->MutableRange(); TKeyPrefix from(TKeyPrefix::TypeTmpData, Partition); range->SetFrom(from.Data(), from.Size()); - TKeyPrefix to(TKeyPrefix::TypeTmpData, Partition + 1); + TKeyPrefix to(TKeyPrefix::TypeTmpData, TPartitionId(Partition.InternalPartitionId + 1)); range->SetTo(to.Data(), to.Size()); } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index f52155f9430..af6459c15e4 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -597,10 +597,8 @@ private: void Handle(TEvPQ::TEvMonResponse::TPtr& ev, const TActorContext& ctx) { - if (ev->Get()->Partition != Max<ui32>()) { - Results[ev->Get()->Partition] = ev->Get()->Res; - } else { - Y_ABORT_UNLESS(ev->Get()->Partition == Max<ui32>()); + if (ev->Get()->Partition.Defined()) { + Results[ev->Get()->Partition->InternalPartitionId] = ev->Get()->Res; } Str.push_back(ev->Get()->Str); if(++TotalResponses == TotalRequests) { @@ -682,7 +680,7 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) ChangePartitionConfigInflight += Partitions.size(); for (const auto& partition : Config.GetPartitions()) { - const auto partitionId = partition.GetPartitionId(); + const TPartitionId partitionId(partition.GetPartitionId()); if (Partitions.find(partitionId) == Partitions.end()) { Partitions.emplace(partitionId, TPartitionInfo(ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, true, ctx)), @@ -897,7 +895,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& InitTransactions(readRange, partitionTxs); for (const auto& partition : Config.GetPartitions()) { // no partitions will be created with empty config - const auto partitionId = partition.GetPartitionId(); + const TPartitionId partitionId(partition.GetPartitionId()); Partitions.emplace(partitionId, TPartitionInfo( ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, false, ctx)), GetPartitionKeyRange(Config, partition), @@ -916,7 +914,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& UpdateConfigRequests.clear(); for (auto& req : HasDataRequests) { - auto it = Partitions.find(req->Record.GetPartition()); + const TPartitionId partitionId(req->Record.GetPartition()); + auto it = Partitions.find(partitionId); if (it != Partitions.end()) { if (ctx.Now().MilliSeconds() < req->Record.GetDeadline()) { //otherwise there is no need to send event - proxy will generate event itself ctx.Send(it->second.Actor, req.Release()); @@ -1505,7 +1504,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request, } for (const auto& partition : cfg.GetPartitions()) { - sourceIdWriter.FillRequest(request, partition.GetPartitionId()); + sourceIdWriter.FillRequest(request, TPartitionId(partition.GetPartitionId())); } } @@ -1571,7 +1570,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont if (!ConfigInited) { HasDataRequests.push_back(ev->Release()); } else { - auto it = Partitions.find(record.GetPartition()); + auto it = Partitions.find(TPartitionId(record.GetPartition())); if (it != Partitions.end()) { ctx.Send(it->second.Actor, ev->Release().Release()); } @@ -1581,7 +1580,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont void TPersQueue::Handle(TEvPersQueue::TEvPartitionClientInfo::TPtr& ev, const TActorContext& ctx) { for (auto partition : ev->Get()->Record.GetPartitions()) { - auto it = Partitions.find(partition); + auto it = Partitions.find(TPartitionId(partition)); if (it != Partitions.end()) { ctx.Send(it->second.Actor, new TEvPQ::TEvGetPartitionClientInfo(ev->Sender), 0, ev->Cookie); } else { @@ -2398,7 +2397,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& return; } - ui32 partition = req.GetPartition(); + TPartitionId partition(req.GetPartition()); auto it = Partitions.find(partition); LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '" @@ -2554,7 +2553,7 @@ bool TPersQueue::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvRemoteHttpInfo: " << ev->Get()->Query); TMap<ui32, TActorId> res; for (auto& p : Partitions) { - res.insert({p.first, p.second.Actor}); + res.emplace(p.first.InternalPartitionId, p.second.Actor); } ctx.Register(new TMonitoringProxy(ev->Sender, ev->Get()->Query, res, CacheActor, TopicName, TabletID(), ResponseProxy.size())); return true; @@ -2809,7 +2808,10 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact // Y_ABORT_UNLESS(txBody.OperationsSize() > 0); - auto i = Partitions.find(txBody.GetOperations(0).GetPartitionId()); + TPartitionId partitionId = + MakePartitionId(txBody.GetOperations(0).GetPartitionId(), + txBody.HasWriteId() ? TMaybe<ui64>(txBody.GetWriteId()) : Nothing()); + auto i = Partitions.find(partitionId); Y_ABORT_UNLESS(i != Partitions.end()); ctx.Send(i->second.Actor, ev.Release()); @@ -3286,13 +3288,20 @@ void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx, } } +TPartitionId TPersQueue::MakePartitionId(ui32 originalPartitionId, TMaybe<ui64> writeId) const +{ + Y_ABORT_UNLESS(!writeId.Defined()); + return TPartitionId{originalPartitionId}; +} + void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx, TDistributedTransaction& tx) { - THashMap<ui32, std::unique_ptr<TEvPQ::TEvTxCalcPredicate>> events; + THashMap<TPartitionId, std::unique_ptr<TEvPQ::TEvTxCalcPredicate>> events; for (auto& operation : tx.Operations) { - auto& event = events[operation.GetPartitionId()]; + TPartitionId partitionId = MakePartitionId(operation.GetPartitionId(), tx.WriteId); + auto& event = events[partitionId]; if (!event) { event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId); } @@ -3321,7 +3330,7 @@ void TPersQueue::SendEvTxCommitToPartitions(const TActorContext& ctx, for (ui32 partitionId : tx.Partitions) { auto event = std::make_unique<TEvPQ::TEvTxCommit>(tx.Step, tx.TxId); - auto p = Partitions.find(partitionId); + auto p = Partitions.find(MakePartitionId(partitionId, tx.WriteId)); Y_ABORT_UNLESS(p != Partitions.end()); ctx.Send(p->second.Actor, event.release()); @@ -3339,7 +3348,7 @@ void TPersQueue::SendEvTxRollbackToPartitions(const TActorContext& ctx, for (ui32 partitionId : tx.Partitions) { auto event = std::make_unique<TEvPQ::TEvTxRollback>(tx.Step, tx.TxId); - auto p = Partitions.find(partitionId); + auto p = Partitions.find(MakePartitionId(partitionId, tx.WriteId)); Y_ABORT_UNLESS(p != Partitions.end()); ctx.Send(p->second.Actor, event.release()); @@ -3712,7 +3721,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx, tx.PartitionRepliesExpected = Partitions.size(); } -TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId, +TPartition* TPersQueue::CreatePartitionActor(const TPartitionId& partitionId, const NPersQueue::TTopicConverterPtr topicConverter, const NKikimrPQ::TPQTabletConfig& config, bool newPartition, @@ -3751,7 +3760,7 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config, } for (const auto& partition : config.GetPartitions()) { - const auto partitionId = partition.GetPartitionId(); + const TPartitionId partitionId(partition.GetPartitionId()); if (Partitions.contains(partitionId)) { continue; } @@ -3888,7 +3897,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, con void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) { auto& record = ev->Get()->Record; - auto it = Partitions.find(record.GetPartition()); + auto it = Partitions.find(TPartitionId(TPartitionId(record.GetPartition()))); if (it == Partitions.end()) { LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition()); @@ -3906,14 +3915,16 @@ void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T } } -void TPersQueue::ProcessCheckPartitionStatusRequests(ui32 partitionId) { - auto sit = CheckPartitionStatusRequests.find(partitionId); +void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partitionId) { + Y_ABORT_UNLESS(!partitionId.WriteId.Defined()); + ui32 originalPartitionId = partitionId.OriginalPartitionId; + auto sit = CheckPartitionStatusRequests.find(originalPartitionId); if (sit != CheckPartitionStatusRequests.end()) { auto it = Partitions.find(partitionId); for (auto& r : sit->second) { Forward(r, it->second.Actor); } - CheckPartitionStatusRequests.erase(partitionId); + CheckPartitionStatusRequests.erase(originalPartitionId); } } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 1cd94ed6d79..1688ab13c34 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -166,7 +166,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { ui64 GetAllowedStep() const; void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx); - void ProcessCheckPartitionStatusRequests(ui32 partitionId); + void ProcessCheckPartitionStatusRequests(const TPartitionId& partitionId); TString LogPrefix() const; @@ -187,7 +187,7 @@ private: bool ConfigInited; ui32 PartitionsInited; bool InitCompleted = false; - THashMap<ui32, TPartitionInfo> Partitions; + THashMap<TPartitionId, TPartitionInfo> Partitions; THashMap<TString, TIntrusivePtr<TEvTabletCounters::TInFlightCookie>> CounterEventsInflight; TActorId CacheActor; @@ -335,7 +335,7 @@ private: void SendEvProposePartitionConfig(const TActorContext& ctx, TDistributedTransaction& tx); - TPartition* CreatePartitionActor(ui32 partitionId, + TPartition* CreatePartitionActor(const TPartitionId& partitionId, const NPersQueue::TTopicConverterPtr topicConverter, const NKikimrPQ::TPQTabletConfig& config, bool newPartition, @@ -406,6 +406,8 @@ private: THashMap<ui32, TVector<TEvPQ::TEvCheckPartitionStatusRequest::TPtr>> CheckPartitionStatusRequests; TMaybe<ui64> TabletGeneration; + + TPartitionId MakePartitionId(ui32 originalPartitionId, TMaybe<ui64> writeId) const; }; diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h index 838562c7acb..5f43a924e4b 100644 --- a/ydb/core/persqueue/read.h +++ b/ydb/core/persqueue/read.h @@ -48,7 +48,7 @@ namespace NPQ { void SaveInProgress(const TKvRequest& kvRequest) { for (const TRequestedBlob& reqBlob : kvRequest.Blobs) { - TBlobId blob(kvRequest.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount); + TBlobId blob(kvRequest.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount); ReadsInProgress.insert(blob); } } @@ -56,7 +56,7 @@ namespace NPQ { bool CheckInProgress(const TActorContext& ctx, TKvRequest& kvRequest) { for (const TRequestedBlob& reqBlob : kvRequest.Blobs) { - TBlobId blob(kvRequest.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount); + TBlobId blob(kvRequest.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount); auto it = ReadsInProgress.find(blob); if (it != ReadsInProgress.end()) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Read request is blocked. Partition " @@ -73,7 +73,7 @@ namespace NPQ { { TVector<TKvRequest> unblocked; for (const TRequestedBlob& reqBlob : blocker.Blobs) { - TBlobId blob(blocker.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount); + TBlobId blob(blocker.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount); ReadsInProgress.erase(blob); auto it = BlockedReads.find(blob); @@ -104,8 +104,8 @@ namespace NPQ { void Handle(TEvPQ::TEvBlobRequest::TPtr& ev, const TActorContext& ctx) { - ui32 partition = ev->Get()->Partition; - Cache.SetUserOffset(ctx, ev->Get()->User, partition, ev->Get()->ReadOffset); + const TPartitionId& partition = ev->Get()->Partition; + Cache.SetUserOffset(ctx, ev->Get()->User, partition.InternalPartitionId, ev->Get()->ReadOffset); TKvRequest kvReq(TKvRequest::TypeRead, ev->Sender, ev->Get()->Cookie, partition); kvReq.Blobs = std::move(ev->Get()->Blobs); @@ -263,7 +263,7 @@ namespace NPQ { auto srcRequest = ev->Get()->Record; - TKvRequest kvReq(TKvRequest::TypeWrite, ev->Sender, Max<ui64>(), Max<ui32>()); + TKvRequest kvReq(TKvRequest::TypeWrite, ev->Sender, Max<ui64>(), TPartitionId(Max<ui32>())); kvReq.Blobs.reserve(srcRequest.CmdWriteSize()); for (ui32 i = 0; i < srcRequest.CmdWriteSize(); ++i) { @@ -333,7 +333,7 @@ namespace NPQ { if (!data) continue; TABLER() { - TABLED() {out << int(c.first.Partition);} + TABLED() {out << c.first.Partition;} TABLED() {out << c.first.Offset;} TABLED() {out << c.first.Count;} TABLED() {out << data->GetValue().size();} @@ -345,7 +345,7 @@ namespace NPQ { } } } - ctx.Send(ev->Sender, new TEvPQ::TEvMonResponse(Max<ui32>(), TVector<TString>(), out.Str())); + ctx.Send(ev->Sender, new TEvPQ::TEvMonResponse(TVector<TString>(), out.Str())); } void UpdateCounters(const TActorContext& ctx) diff --git a/ydb/core/persqueue/read_quoter.h b/ydb/core/persqueue/read_quoter.h index ac2cdd4cb9a..3e37f4840e7 100644 --- a/ydb/core/persqueue/read_quoter.h +++ b/ydb/core/persqueue/read_quoter.h @@ -78,7 +78,7 @@ public: TActorId partitionActor, const NPersQueue::TTopicConverterPtr& topicConverter, const NKikimrPQ::TPQTabletConfig& config, - ui32 partition, + const TPartitionId& partition, TActorId tabletActor, ui64 tabletId, const TTabletCountersBase& counters @@ -159,7 +159,7 @@ private: NKikimrPQ::TPQTabletConfig PQTabletConfig; TQuotaTracker PartitionTotalQuotaTracker; NPersQueue::TTopicConverterPtr TopicConverter; - const ui32 Partition; + const TPartitionId Partition; ui64 TabletId; TTabletCountersBase Counters; ui32 RequestsInflight; diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp index e2176d8c337..b47dd70c794 100644 --- a/ydb/core/persqueue/sourceid.cpp +++ b/ydb/core/persqueue/sourceid.cpp @@ -70,13 +70,13 @@ void FillDelete(const TKeyPrefix& key, NKikimrClient::TKeyValueRequest::TCmdDele range.SetIncludeTo(true); } -void FillDelete(ui32 partition, const TString& sourceId, TKeyPrefix::EMark mark, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) { +void FillDelete(const TPartitionId& partition, const TString& sourceId, TKeyPrefix::EMark mark, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) { TKeyPrefix key(TKeyPrefix::TypeInfo, partition, mark); key.Append(sourceId.c_str(), sourceId.size()); FillDelete(key, cmd); } -void FillDelete(ui32 partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) { +void FillDelete(const TPartitionId& partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) { FillDelete(partition, sourceId, TKeyPrefix::MarkProtoSourceId, cmd); } @@ -263,7 +263,7 @@ void TSourceIdStorage::DeregisterSourceId(const TString& sourceId) { } } -bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, ui32 partition, const NKikimrPQ::TPartitionConfig& config) { +bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, const TPartitionId& partition, const NKikimrPQ::TPartitionConfig& config) { TVector<std::pair<ui64, TString>> toDelOffsets; ui64 maxDeleteSourceIds = 0; @@ -478,7 +478,7 @@ TKeyPrefix::EMark TSourceIdWriter::FormatToMark(ESourceIdFormat format) { } } -void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partition) { +void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, const TPartitionId& partition) { TKeyPrefix key(TKeyPrefix::TypeInfo, partition, FormatToMark(Format)); TBuffer data; diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index 897014363f2..8566810c40c 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -92,7 +92,7 @@ public: void DeregisterSourceId(const TString& sourceId); void LoadSourceIdInfo(const TString& key, const TString& data, TInstant now); - bool DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, ui32 partition, const NKikimrPQ::TPartitionConfig& config); + bool DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, const TPartitionId& partition, const NKikimrPQ::TPartitionConfig& config); void RegisterSourceIdOwner(const TString& sourceId, const TStringBuf& ownerCookie); void MarkOwnersForDeletedSourceId(THashMap<TString, TOwnerInfo>& owners); @@ -130,7 +130,7 @@ public: void DeregisterSourceId(const TString& sourceId); void Clear(); - void FillRequest(TEvKeyValue::TEvRequest* request, ui32 partition); + void FillRequest(TEvKeyValue::TEvRequest* request, const TPartitionId& partition); static void FillKeyAndData(ESourceIdFormat format, const TString& sourceId, const TSourceIdInfo& sourceIdInfo, TKeyPrefix& key, TBuffer& data); private: diff --git a/ydb/core/persqueue/subscriber.cpp b/ydb/core/persqueue/subscriber.cpp index 5347f3f4e13..9d2f9336b4c 100644 --- a/ydb/core/persqueue/subscriber.cpp +++ b/ydb/core/persqueue/subscriber.cpp @@ -48,7 +48,7 @@ TVector<std::pair<TReadInfo, ui64>> TSubscriberLogic::CompleteSubscriptions(cons } -TSubscriber::TSubscriber(const ui32 partition, TTabletCountersBase& counters, const TActorId& tablet) +TSubscriber::TSubscriber(const TPartitionId& partition, TTabletCountersBase& counters, const TActorId& tablet) : Subscriber() , Partition(partition) , Counters(counters) diff --git a/ydb/core/persqueue/subscriber.h b/ydb/core/persqueue/subscriber.h index 7d8890ebb9d..51761465248 100644 --- a/ydb/core/persqueue/subscriber.h +++ b/ydb/core/persqueue/subscriber.h @@ -1,7 +1,8 @@ #pragma once -#include "header.h" #include "blob.h" +#include "header.h" +#include "partition_id.h" #include <ydb/core/tablet/tablet_counters.h> #include <ydb/core/base/appdata.h> @@ -78,7 +79,7 @@ struct TReadInfo { const TActorContext& ctx, const TEvPQ::TEvBlobResponse& response, const ui64 endOffset, - const ui32 partition, + const TPartitionId& partition, TUserInfo* ui, const ui64 dst, const ui64 sizeLag, @@ -89,7 +90,7 @@ struct TReadInfo { TReadAnswer FormAnswer( const TActorContext& ctx, const ui64 endOffset, - const ui32 partition, + const TPartitionId& partition, TUserInfo* ui, const ui64 dst, const ui64 sizeLag, @@ -127,7 +128,7 @@ private: class TSubscriber : public TNonCopyable { public: - TSubscriber(const ui32 partition, TTabletCountersBase& counters, const TActorId& tablet); + TSubscriber(const TPartitionId& partition, TTabletCountersBase& counters, const TActorId& tablet); //will wait for new data or timeout for this read and set timer for timeout ms void AddSubscription(TReadInfo&& info, const ui32 timeout, const ui64 cookie, const TActorContext& ctx); @@ -140,7 +141,7 @@ public: private: TSubscriberLogic Subscriber; - const ui32 Partition; + const TPartitionId& Partition; TTabletCountersBase& Counters; TActorId Tablet; }; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index b613822c8e4..b17e0aacd2d 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -75,11 +75,11 @@ void TDistributedTransaction::InitPartitions() if (TabletConfig.PartitionsSize()) { for (const auto& partition : TabletConfig.GetPartitions()) { - Partitions.insert(partition.GetPartitionId()); + Partitions.emplace(partition.GetPartitionId()); } } else { for (auto partitionId : TabletConfig.GetPartitionIds()) { - Partitions.insert(partitionId); + Partitions.emplace(partitionId); } } } @@ -155,6 +155,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans // Old configuration format without AllPartitions. Split/Merge is not supported. continue; } + if (node->Children.empty()) { for (const auto* r : node->Parents) { if (extractTabletId != r->TabletId) { @@ -206,7 +207,7 @@ void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decisi Y_ABORT_UNLESS(Step == event.Step); Y_ABORT_UNLESS(TxId == event.TxId); - Y_ABORT_UNLESS(Partitions.contains(event.Partition)); + Y_ABORT_UNLESS(Partitions.contains(event.Partition.OriginalPartitionId)); SetDecision(SelfDecision, decision); @@ -246,7 +247,7 @@ void TDistributedTransaction::OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event Y_ABORT_UNLESS(Step == event.Step); Y_ABORT_UNLESS(TxId == event.TxId); - Y_ABORT_UNLESS(Partitions.contains(event.Partition)); + Y_ABORT_UNLESS(Partitions.contains(event.Partition.OriginalPartitionId)); ++PartitionRepliesCount; } diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index a3e69b08e7f..b964df24fd4 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -48,6 +48,7 @@ struct TDistributedTransaction { THashSet<ui64> Senders; // список отправителей TEvReadSet THashSet<ui64> Receivers; // список получателей TEvReadSet TVector<NKikimrPQ::TPartitionOperation> Operations; + TMaybe<ui64> WriteId; EDecision SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN; EDecision ParticipantsDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN; diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index ade9422864f..8d13437c557 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -848,10 +848,10 @@ TVector<TString> CmdSourceIdRead(TTestContext& tc) { sourceIds.clear(); auto read = request->Record.AddCmdReadRange(); auto range = read->MutableRange(); - NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkProtoSourceId); + NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, TPartitionId(0), NPQ::TKeyPrefix::MarkProtoSourceId); range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size()); range->SetIncludeFrom(true); - NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUserDeprecated); + NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, TPartitionId(0), NPQ::TKeyPrefix::MarkUserDeprecated); range->SetTo(ikeyTo.Data(), ikeyTo.Size()); range->SetIncludeTo(false); Cout << request.Get()->ToString() << Endl; @@ -1093,7 +1093,7 @@ void CmdForgetRead(const TCmdDirectReadSettings& settings, TTestContext& tc) { } void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) { - NPQ::TKeyPrefix ikey(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser); + NPQ::TKeyPrefix ikey(NPQ::TKeyPrefix::TypeInfo, TPartitionId(partition), NPQ::TKeyPrefix::MarkUser); ikey.Append(client.c_str(), client.size()); NKikimrPQ::TUserInfo userInfo; @@ -1117,7 +1117,7 @@ void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, co TString session = "test-session"; ui32 gen = 1; ui32 step = 2; - NPQ::TKeyPrefix ikeyDeprecated(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUserDeprecated); + NPQ::TKeyPrefix ikeyDeprecated(NPQ::TKeyPrefix::TypeInfo, TPartitionId(partition), NPQ::TKeyPrefix::MarkUserDeprecated); ikeyDeprecated.Append(client.c_str(), client.size()); TBuffer idataDeprecated = NPQ::NDeprecatedUserData::Serialize(offset, gen, step, session); diff --git a/ydb/core/persqueue/ut/internals_ut.cpp b/ydb/core/persqueue/ut/internals_ut.cpp index 4a00987d97b..8458e55a442 100644 --- a/ydb/core/persqueue/ut/internals_ut.cpp +++ b/ydb/core/persqueue/ut/internals_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST(TestPartitionedBlobSimpleTest) { THead head; THead newHead; - TPartitionedBlob blob(0, 0, "sourceId", 1, 1, 10, head, newHead, false, false, 8_MB); + TPartitionedBlob blob(TPartitionId(0), 0, "sourceId", 1, 1, 10, head, newHead, false, false, 8_MB); TClientBlob clientBlob("sourceId", 1, "valuevalue", TMaybe<TPartData>(), TInstant::MilliSeconds(1), TInstant::MilliSeconds(1), 0, "123", "123"); UNIT_ASSERT(blob.IsInited()); TString error; @@ -73,7 +73,7 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead) newHead.PackedSize = newHead.Batches.back().GetUnpackedSize(); TString value2(partSize, 'b'); ui32 maxBlobSize = 8 << 20; - TPartitionedBlob blob(0, newHead.GetNextOffset(), "sourceId3", 1, parts, parts * value2.size(), head, newHead, headCompacted, false, maxBlobSize); + TPartitionedBlob blob(TPartitionId(0), newHead.GetNextOffset(), "sourceId3", 1, parts, parts * value2.size(), head, newHead, headCompacted, false, maxBlobSize); TVector<std::pair<TKey, TString>> formed; diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index d6803f176b8..4d58647a2f7 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -91,18 +91,18 @@ protected: struct TCalcPredicateMatcher { TMaybe<ui64> Step; TMaybe<ui64> TxId; - TMaybe<ui32> Partition; + TMaybe<TPartitionId> Partition; TMaybe<bool> Predicate; }; struct TCommitTxDoneMatcher { TMaybe<ui64> Step; TMaybe<ui64> TxId; - TMaybe<ui32> Partition; + TMaybe<TPartitionId> Partition; }; struct TChangePartitionConfigMatcher { - TMaybe<ui32> Partition; + TMaybe<TPartitionId> Partition; }; struct TTxOperationMatcher { @@ -263,7 +263,7 @@ void TPartitionFixture::CreatePartitionActor(ui32 id, TopicConverter = factory.MakeTopicConverter(Config); auto actor = new NPQ::TPartition(Ctx->TabletId, - id, + TPartitionId(id), Ctx->Edge, 0, Ctx->Edge, @@ -680,7 +680,7 @@ void TPartitionFixture::SendInfoRangeResponse(ui32 partition, auto pair = read->AddPair(); pair->SetStatus(NKikimrProto::OK); - NPQ::TKeyPrefix key(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser); + NPQ::TKeyPrefix key(NPQ::TKeyPrefix::TypeInfo, TPartitionId(partition), NPQ::TKeyPrefix::MarkUser); key.Append(c.Consumer.data(), c.Consumer.size()); pair->SetKey(key.Data(), key.Size()); @@ -719,7 +719,7 @@ void TPartitionFixture::SendDataRangeResponse(ui64 begin, ui64 end) auto read = event->Record.AddReadRangeResult(); read->SetStatus(NKikimrProto::OK); auto pair = read->AddPair(); - NPQ::TKey key(NPQ::TKeyPrefix::TypeData, 1, begin, 0, end - begin, 0); + NPQ::TKey key(NPQ::TKeyPrefix::TypeData, TPartitionId(1), begin, 0, end - begin, 0); pair->SetStatus(NKikimrProto::OK); pair->SetKey(key.ToString()); //pair->SetValueSize(); @@ -1094,14 +1094,14 @@ Y_UNIT_TEST_F(CorrectRange_Commit, TPartitionFixture) CreateSession(client, session); SendCalcPredicate(step, txId, client, 0, 2); - WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=true}); + WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=true}); SendCommitTx(step, txId); WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId, .UserInfos={{1, {.Session=session, .Offset=2}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); - WaitCommitTxDone({.TxId=txId, .Partition=partition}); + WaitCommitTxDone({.TxId=txId, .Partition=TPartitionId(partition)}); } Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TPartitionFixture) @@ -1121,23 +1121,23 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TPartitionFixture) CreateSession(client, session); SendCalcPredicate(step, txId_1, client, 0, 1); - WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true}); + WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=TPartitionId(partition), .Predicate=true}); SendCalcPredicate(step, txId_2, client, 0, 2); SendCalcPredicate(step, txId_3, client, 0, 2); SendCommitTx(step, txId_1); - WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false}); + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId_2); - WaitCalcPredicateResult({.Step=step, .TxId=txId_3, .Partition=partition, .Predicate=false}); + WaitCalcPredicateResult({.Step=step, .TxId=txId_3, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId_3); WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId_3, .UserInfos={{1, {.Session=session, .Offset=1}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); - WaitCommitTxDone({.TxId=txId_1, .Partition=partition}); + WaitCommitTxDone({.TxId=txId_1, .Partition=TPartitionId(partition)}); } Y_UNIT_TEST_F(CorrectRange_Multiple_Consumers, TPartitionFixture) @@ -1162,7 +1162,7 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Consumers, TPartitionFixture) WaitProxyResponse({.Cookie=1, .Status=NMsgBusProxy::MSTATUS_OK}); - WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=true}); + WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=true}); SendCommitTx(step, txId); WaitCmdWrite({.Count=5, .UserInfos={ @@ -1183,7 +1183,7 @@ Y_UNIT_TEST_F(OldPlanStep, TPartitionFixture) CreatePartition({.Partition=partition, .Begin=begin, .End=end, .PlanStep=99999, .TxId=55555}); SendCommitTx(step, txId); - WaitCommitTxDone({.TxId=txId, .Partition=partition}); + WaitCommitTxDone({.TxId=txId, .Partition=TPartitionId(partition)}); } Y_UNIT_TEST_F(AfterRestart_1, TPartitionFixture) @@ -1211,7 +1211,7 @@ Y_UNIT_TEST_F(AfterRestart_1, TPartitionFixture) SendCommitTx(step, 11111); - WaitCalcPredicateResult({.Step=step, .TxId=22222, .Partition=partition, .Predicate=true}); + WaitCalcPredicateResult({.Step=step, .TxId=22222, .Partition=TPartitionId(partition), .Predicate=true}); SendCommitTx(step, 22222); WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=22222, .UserInfos={{1, {.Session=session, .Offset=4}}}}); @@ -1240,7 +1240,7 @@ Y_UNIT_TEST_F(AfterRestart_2, TPartitionFixture) .Config={.Consumers={{.Consumer=consumer, .Offset=0, .Session=session}}} }); - WaitCalcPredicateResult({.Step=step, .TxId=11111, .Partition=partition, .Predicate=true}); + WaitCalcPredicateResult({.Step=step, .TxId=11111, .Partition=TPartitionId(partition), .Predicate=true}); } Y_UNIT_TEST_F(IncorrectRange, TPartitionFixture) @@ -1258,7 +1258,7 @@ Y_UNIT_TEST_F(IncorrectRange, TPartitionFixture) CreateSession(client, session); SendCalcPredicate(step, txId, client, 4, 2); - WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false}); + WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId); WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId}); @@ -1267,7 +1267,7 @@ Y_UNIT_TEST_F(IncorrectRange, TPartitionFixture) ++txId; SendCalcPredicate(step, txId, client, 2, 4); - WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false}); + WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId); WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId}); @@ -1276,7 +1276,7 @@ Y_UNIT_TEST_F(IncorrectRange, TPartitionFixture) ++txId; SendCalcPredicate(step, txId, client, 0, 11); - WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false}); + WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=false}); } Y_UNIT_TEST_F(CorrectRange_Rollback, TPartitionFixture) @@ -1295,12 +1295,12 @@ Y_UNIT_TEST_F(CorrectRange_Rollback, TPartitionFixture) CreateSession(client, session); SendCalcPredicate(step, txId_1, client, 0, 2); - WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true}); + WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=TPartitionId(partition), .Predicate=true}); SendCalcPredicate(step, txId_2, client, 0, 5); SendRollbackTx(step, txId_1); - WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=true}); + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=true}); } Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) @@ -1333,7 +1333,7 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) // SendCalcPredicate(step, txId_2, "client-2", 0, 2); - WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true}); + WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=TPartitionId(partition), .Predicate=true}); SendCommitTx(step, txId_1); // @@ -1350,12 +1350,12 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) }}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); - WaitPartitionConfigChanged({.Partition=partition}); + WaitPartitionConfigChanged({.Partition=TPartitionId(partition)}); // // consumer 'client-2' was deleted // - WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false}); + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId_2); } diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index 2c89b9f95cd..7b89ee28ae4 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -416,10 +416,10 @@ Y_UNIT_TEST(TestReadRuleVersions) { THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); auto read = request->Record.AddCmdReadRange(); auto range = read->MutableRange(); - NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUser); + NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, TPartitionId(0), NPQ::TKeyPrefix::MarkUser); range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size()); range->SetIncludeFrom(true); - NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkUser); + NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, TPartitionId(1), NPQ::TKeyPrefix::MarkUser); range->SetTo(ikeyTo.Data(), ikeyTo.Size()); range->SetIncludeTo(true); @@ -441,10 +441,10 @@ Y_UNIT_TEST(TestReadRuleVersions) { THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); auto read = request->Record.AddCmdReadRange(); auto range = read->MutableRange(); - NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUser); + NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, TPartitionId(0), NPQ::TKeyPrefix::MarkUser); range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size()); range->SetIncludeFrom(true); - NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkUser); + NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, TPartitionId(1), NPQ::TKeyPrefix::MarkUser); range->SetTo(ikeyTo.Data(), ikeyTo.Size()); range->SetIncludeTo(true); @@ -2082,12 +2082,12 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) { Y_UNIT_TEST(TestOffsetEstimation) { std::deque<NPQ::TDataKey> container = { - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 1, 0, 0, 0), 0, TInstant::Seconds(1), 10}, - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 2, 0, 0, 0), 0, TInstant::Seconds(1), 10}, - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 3, 0, 0, 0), 0, TInstant::Seconds(2), 10}, - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 4, 0, 0, 0), 0, TInstant::Seconds(2), 10}, - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 5, 0, 0, 0), 0, TInstant::Seconds(3), 10}, - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 6, 0, 0, 0), 0, TInstant::Seconds(3), 10}, + {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 1, 0, 0, 0), 0, TInstant::Seconds(1), 10}, + {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 2, 0, 0, 0), 0, TInstant::Seconds(1), 10}, + {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 3, 0, 0, 0), 0, TInstant::Seconds(2), 10}, + {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 4, 0, 0, 0), 0, TInstant::Seconds(2), 10}, + {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 5, 0, 0, 0), 0, TInstant::Seconds(3), 10}, + {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 6, 0, 0, 0), 0, TInstant::Seconds(3), 10}, }; UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate({}, TInstant::MilliSeconds(0), 9999), 9999); UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(0), 9999), 1); diff --git a/ydb/core/persqueue/ut/sourceid_ut.cpp b/ydb/core/persqueue/ut/sourceid_ut.cpp index 43164c888c5..8cb27cdc2b8 100644 --- a/ydb/core/persqueue/ut/sourceid_ut.cpp +++ b/ydb/core/persqueue/ut/sourceid_ut.cpp @@ -64,7 +64,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { writer.RegisterSourceId(sourceId, sourceIdInfo); UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1); { - TKeyPrefix key(TKeyPrefix::TypeInfo, TestPartition, TKeyPrefix::MarkSourceId); + TKeyPrefix key(TKeyPrefix::TypeInfo, TPartitionId(TestPartition), TKeyPrefix::MarkSourceId); TBuffer data; TSourceIdWriter::FillKeyAndData(ESourceIdFormat::Raw, sourceId, sourceIdInfo, key, data); @@ -73,7 +73,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { write->SetValue(data.Data(), data.Size()); write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); - writer.FillRequest(actualRequest.Get(), TestPartition); + writer.FillRequest(actualRequest.Get(), TPartitionId(TestPartition)); UNIT_ASSERT_VALUES_EQUAL(actualRequest.Get()->ToString(), expectedRequest.Get()->ToString()); } @@ -82,7 +82,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { writer.RegisterSourceId(anotherSourceId, anotherSourceIdInfo); UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 2); { - TKeyPrefix key(TKeyPrefix::TypeInfo, TestPartition + 1, TKeyPrefix::MarkSourceId); + TKeyPrefix key(TKeyPrefix::TypeInfo, TPartitionId(TestPartition + 1), TKeyPrefix::MarkSourceId); TBuffer data; for (const auto& [sourceId, sourceIdInfo] : writer.GetSourceIdsToWrite()) { @@ -93,7 +93,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); } - writer.FillRequest(actualRequest.Get(), TestPartition + 1); + writer.FillRequest(actualRequest.Get(), TPartitionId(TestPartition + 1)); UNIT_ASSERT_VALUES_EQUAL(actualRequest.Get()->ToString(), expectedRequest.Get()->ToString()); } } @@ -132,7 +132,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { const auto sourceId = TestSourceId(); const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100)); - TKeyPrefix ikey(TKeyPrefix::TypeInfo, TestPartition, mark); + TKeyPrefix ikey(TKeyPrefix::TypeInfo, TPartitionId(TestPartition), mark); TBuffer idata; TSourceIdWriter::FillKeyAndData(format, sourceId, sourceIdInfo, ikey, idata); @@ -194,7 +194,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { // sources are dropped before startOffset = 20 { auto request = MakeHolder<TEvKeyValue::TEvRequest>(); - const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 20, TestPartition, config); + const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 20, TPartitionId(TestPartition), config); UNIT_ASSERT_EQUAL(dropped, true); UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 19); // first 19 sources are dropped } @@ -202,7 +202,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { // expired sources are dropped { auto request = MakeHolder<TEvKeyValue::TEvRequest>(); - const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TestPartition, config); + const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TPartitionId(TestPartition), config); UNIT_ASSERT_EQUAL(dropped, true); UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 341); // another 341 (360 - 19) sources are dropped } @@ -210,14 +210,14 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { // move to the past { auto request = MakeHolder<TEvKeyValue::TEvRequest>(); - const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TestPartition, config); + const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TPartitionId(TestPartition), config); UNIT_ASSERT_EQUAL(dropped, false); // nothing to drop (everything is dropped) } // move to the future { auto request = MakeHolder<TEvKeyValue::TEvRequest>(); - const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(3), 10000, TestPartition, config); + const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(3), 10000, TPartitionId(TestPartition), config); UNIT_ASSERT_EQUAL(dropped, true); UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 360); // more 360 sources are dropped } @@ -234,14 +234,14 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { config.SetSourceIdMaxCounts(10000); { auto request = MakeHolder<TEvKeyValue::TEvRequest>(); - const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TestPartition, config); + const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TPartitionId(TestPartition), config); UNIT_ASSERT_EQUAL(dropped, false); } config.SetSourceIdMaxCounts(9900); // decrease by 100 { auto request = MakeHolder<TEvKeyValue::TEvRequest>(); - const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TestPartition, config); + const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TPartitionId(TestPartition), config); UNIT_ASSERT_EQUAL(dropped, true); UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 100); // 100 sources are dropped } @@ -267,7 +267,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { config.SetSourceIdMaxCounts(10000); // limit to 10000 { auto request = MakeHolder<TEvKeyValue::TEvRequest>(); - const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TestPartition, config); + const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TPartitionId(TestPartition), config); UNIT_ASSERT_EQUAL(dropped, true); UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 360); // first 360 sources are dropped } @@ -276,7 +276,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { config.SetSourceIdMaxCounts(10000 - 360); { auto request = MakeHolder<TEvKeyValue::TEvRequest>(); - const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TestPartition, config); + const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TPartitionId(TestPartition), config); UNIT_ASSERT_EQUAL(dropped, true); UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 6); // another 6 sources are dropped by retention } @@ -284,7 +284,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { config.SetSourceIdMaxCounts(10000 - 370); { auto request = MakeHolder<TEvKeyValue::TEvRequest>(); - const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TestPartition, config); + const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TPartitionId(TestPartition), config); UNIT_ASSERT_EQUAL(dropped, true); UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 5); // more 5 sources are dropped } @@ -306,7 +306,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { NKikimrPQ::TPartitionConfig config; config.SetSourceIdMaxCounts(1); // limit to one - const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TestPartition, config); + const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TPartitionId(TestPartition), config); UNIT_ASSERT_EQUAL(dropped, true); UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2); // first source is dropped by retention |