aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <152866892+Alek5andr-Kotov@users.noreply.github.com>2024-02-01 13:40:03 +0300
committerGitHub <noreply@github.com>2024-02-01 13:40:03 +0300
commit922d72d7e2c1e2cad4e5896a7cdbe05e84e39d34 (patch)
tree5266b424335bb6c6e1087dbf8e0d9ad04f048795
parentec89d222d3cefcd00e295599e04fcd04508e60bf (diff)
downloadydb-922d72d7e2c1e2cad4e5896a7cdbe05e84e39d34.tar.gz
type TPartitionId (#1472)
-rw-r--r--ydb/core/persqueue/account_read_quoter.cpp2
-rw-r--r--ydb/core/persqueue/account_read_quoter.h4
-rw-r--r--ydb/core/persqueue/blob.cpp2
-rw-r--r--ydb/core/persqueue/blob.h4
-rw-r--r--ydb/core/persqueue/cache_eviction.h22
-rw-r--r--ydb/core/persqueue/event_helpers.cpp2
-rw-r--r--ydb/core/persqueue/event_helpers.h4
-rw-r--r--ydb/core/persqueue/events/internal.h55
-rw-r--r--ydb/core/persqueue/key.h22
-rw-r--r--ydb/core/persqueue/ownerinfo.cpp2
-rw-r--r--ydb/core/persqueue/ownerinfo.h2
-rw-r--r--ydb/core/persqueue/partition.cpp31
-rw-r--r--ydb/core/persqueue/partition.h6
-rw-r--r--ydb/core/persqueue/partition_id.h115
-rw-r--r--ydb/core/persqueue/partition_init.cpp28
-rw-r--r--ydb/core/persqueue/partition_init.h2
-rw-r--r--ydb/core/persqueue/partition_monitoring.cpp4
-rw-r--r--ydb/core/persqueue/partition_read.cpp12
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.cpp4
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.h2
-rw-r--r--ydb/core/persqueue/partition_write.cpp2
-rw-r--r--ydb/core/persqueue/pq_impl.cpp57
-rw-r--r--ydb/core/persqueue/pq_impl.h8
-rw-r--r--ydb/core/persqueue/read.h16
-rw-r--r--ydb/core/persqueue/read_quoter.h4
-rw-r--r--ydb/core/persqueue/sourceid.cpp8
-rw-r--r--ydb/core/persqueue/sourceid.h4
-rw-r--r--ydb/core/persqueue/subscriber.cpp2
-rw-r--r--ydb/core/persqueue/subscriber.h11
-rw-r--r--ydb/core/persqueue/transaction.cpp9
-rw-r--r--ydb/core/persqueue/transaction.h1
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp8
-rw-r--r--ydb/core/persqueue/ut/internals_ut.cpp4
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp48
-rw-r--r--ydb/core/persqueue/ut/pq_ut.cpp20
-rw-r--r--ydb/core/persqueue/ut/sourceid_ut.cpp30
36 files changed, 353 insertions, 204 deletions
diff --git a/ydb/core/persqueue/account_read_quoter.cpp b/ydb/core/persqueue/account_read_quoter.cpp
index 583e76d6aa1..65ab6d5a8a8 100644
--- a/ydb/core/persqueue/account_read_quoter.cpp
+++ b/ydb/core/persqueue/account_read_quoter.cpp
@@ -29,7 +29,7 @@ TAccountReadQuoter::TAccountReadQuoter(
TActorId recepient,
ui64 tabletId,
const NPersQueue::TTopicConverterPtr& topicConverter,
- ui32 partition,
+ const TPartitionId& partition,
const TString& user,
const TTabletCountersBase& counters
)
diff --git a/ydb/core/persqueue/account_read_quoter.h b/ydb/core/persqueue/account_read_quoter.h
index 22b4e673763..d6c9e309d87 100644
--- a/ydb/core/persqueue/account_read_quoter.h
+++ b/ydb/core/persqueue/account_read_quoter.h
@@ -89,7 +89,7 @@ public:
TActorId recepient,
ui64 tabletId,
const NPersQueue::TTopicConverterPtr& topicConverter,
- ui32 partition,
+ const TPartitionId& partition,
const TString& user,
const TTabletCountersBase& counters
);
@@ -112,7 +112,7 @@ private:
const TActorId Recepient;
const ui64 TabletId;
const NPersQueue::TTopicConverterPtr TopicConverter;
- const ui32 Partition;
+ const TPartitionId Partition;
const TString User;
const TString ConsumerPath;
const ui64 ReadCreditBytes;
diff --git a/ydb/core/persqueue/blob.cpp b/ydb/core/persqueue/blob.cpp
index 4b6bff79e89..9a2c6d2f70c 100644
--- a/ydb/core/persqueue/blob.cpp
+++ b/ydb/core/persqueue/blob.cpp
@@ -736,7 +736,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionedBlob& x)
, MaxBlobSize(x.MaxBlobSize)
{}
-TPartitionedBlob::TPartitionedBlob(const ui32 partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts,
+TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts,
const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize)
: Partition(partition)
, Offset(offset)
diff --git a/ydb/core/persqueue/blob.h b/ydb/core/persqueue/blob.h
index 0f2c79e11b2..7a4b836a899 100644
--- a/ydb/core/persqueue/blob.h
+++ b/ydb/core/persqueue/blob.h
@@ -264,7 +264,7 @@ public:
TPartitionedBlob(const TPartitionedBlob& x);
- TPartitionedBlob(const ui32 partition, const ui64 offset, const TString& sourceId, const ui64 seqNo,
+ TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo,
const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize);
std::optional<std::pair<TKey, TString>> Add(TClientBlob&& blob);
@@ -287,7 +287,7 @@ private:
TString CompactHead(bool glueHead, THead& head, bool glueNewHead, THead& newHead, ui32 estimatedSize);
private:
- ui32 Partition;
+ TPartitionId Partition;
ui64 Offset;
ui16 InternalPartsCount;
ui64 StartOffset;
diff --git a/ydb/core/persqueue/cache_eviction.h b/ydb/core/persqueue/cache_eviction.h
index 4707fc4fff7..e92f64f40b4 100644
--- a/ydb/core/persqueue/cache_eviction.h
+++ b/ydb/core/persqueue/cache_eviction.h
@@ -54,11 +54,11 @@ namespace NPQ {
ERequestType Type;
TActorId Sender;
ui64 CookiePQ;
- ui32 Partition;
+ TPartitionId Partition;
ui32 MetadataWritesCount;
TVector<TRequestedBlob> Blobs;
- TKvRequest(ERequestType type, TActorId sender, ui64 cookie, ui32 partition)
+ TKvRequest(ERequestType type, TActorId sender, ui64 cookie, const TPartitionId& partition)
: Type(type)
, Sender(sender)
, CookiePQ(cookie)
@@ -66,7 +66,7 @@ namespace NPQ {
, MetadataWritesCount(0)
{}
- TBlobId GetBlobId(ui32 pos) const { return TBlobId(Partition, Blobs[pos].Offset, Blobs[pos].PartNo, Blobs[pos].Count, Blobs[pos].InternalPartsCount); }
+ TBlobId GetBlobId(ui32 pos) const { return TBlobId(Partition.InternalPartitionId, Blobs[pos].Offset, Blobs[pos].PartNo, Blobs[pos].Count, Blobs[pos].InternalPartsCount); }
THolder<TEvKeyValue::TEvRequest> MakeKvRequest() const
{
@@ -110,7 +110,7 @@ namespace NPQ {
}
void Verify(const TRequestedBlob& blob) const {
- TKey key(TKeyPrefix::TypeData, 0, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false);
+ TKey key(TKeyPrefix::TypeData, TPartitionId(0), blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false);
Y_ABORT_UNLESS(blob.Value.size() == blob.Size);
TClientBlob::CheckBlob(key, blob.Value);
}
@@ -258,7 +258,7 @@ namespace NPQ {
for (const auto& blob : kvReq.Blobs) {
// Touching blobs in L2. We don't need data here
- TCacheBlobL2 key = {kvReq.Partition, blob.Offset, blob.PartNo, nullptr};
+ TCacheBlobL2 key = {kvReq.Partition.InternalPartitionId, blob.Offset, blob.PartNo, nullptr};
if (blob.Cached)
reqData->RequestedBlobs.push_back(key);
else
@@ -275,10 +275,10 @@ namespace NPQ {
THolder<TCacheL2Request> reqData = MakeHolder<TCacheL2Request>(TabletId);
for (const TRequestedBlob& reqBlob : kvReq.Blobs) {
- TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
+ TBlobId blob(kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
{ // there could be a new blob with same id (for big messages)
if (RemoveExists(ctx, blob)) {
- TCacheBlobL2 removed = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, nullptr};
+ TCacheBlobL2 removed = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, nullptr};
reqData->RemovedBlobs.push_back(removed);
}
}
@@ -290,7 +290,7 @@ namespace NPQ {
if (L1Strategy)
L1Strategy->SaveHeadBlob(blob);
- TCacheBlobL2 blobL2 = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached};
+ TCacheBlobL2 blobL2 = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, cached};
reqData->StoredBlobs.push_back(blobL2);
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Caching head blob in L1. Partition "
@@ -314,7 +314,7 @@ namespace NPQ {
continue;
const TRequestedBlob& reqBlob = kvReq.Blobs[i];
- TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
+ TBlobId blob(kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
{
TValueL1 value;
if (CheckExists(ctx, blob, value)) {
@@ -328,7 +328,7 @@ namespace NPQ {
Cache[blob] = valL1; // weak
Counters.Inc(valL1);
- TCacheBlobL2 blobL2 = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached};
+ TCacheBlobL2 blobL2 = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, cached};
reqData->StoredBlobs.push_back(blobL2);
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Prefetched blob in L1. Partition "
@@ -428,7 +428,7 @@ namespace NPQ {
++numCached;
continue;
}
- TBlobId blobId(kvReq.Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
+ TBlobId blobId(kvReq.Partition.InternalPartitionId, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
TCacheValue::TPtr cached = GetValue(ctx, blobId);
if (cached) {
++numCached;
diff --git a/ydb/core/persqueue/event_helpers.cpp b/ydb/core/persqueue/event_helpers.cpp
index aea0fb89aa5..c8a2aa8182d 100644
--- a/ydb/core/persqueue/event_helpers.cpp
+++ b/ydb/core/persqueue/event_helpers.cpp
@@ -15,7 +15,7 @@ void ReplyPersQueueError(
const TActorContext& ctx,
ui64 tabletId,
const TString& topicName,
- TMaybe<ui32> partition,
+ TMaybe<TPartitionId> partition,
NKikimr::TTabletCountersBase& counters,
NKikimrServices::EServiceKikimr service,
const ui64 responseCookie,
diff --git a/ydb/core/persqueue/event_helpers.h b/ydb/core/persqueue/event_helpers.h
index 1d676b3d0a9..ccc928a46f8 100644
--- a/ydb/core/persqueue/event_helpers.h
+++ b/ydb/core/persqueue/event_helpers.h
@@ -1,5 +1,7 @@
#pragma once
+#include "partition_id.h"
+
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h>
@@ -14,7 +16,7 @@ void ReplyPersQueueError(
const TActorContext& ctx,
ui64 tabletId,
const TString& topicName,
- TMaybe<ui32> partition,
+ TMaybe<TPartitionId> partition,
NKikimr::TTabletCountersBase& counters,
NKikimrServices::EServiceKikimr service,
const ui64 responseCookie,
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index 69a8858da8b..be6aac79799 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -170,8 +170,6 @@ struct TEvPQ {
EvQuotaCountersUpdated,
EvConsumerRemoved,
EvFetchResponse,
- EvSourceIdRequest,
- EvSourceIdResponse,
EvPublishRead,
EvForgetRead,
EvRegisterDirectReadSession,
@@ -313,13 +311,18 @@ struct TEvPQ {
};
struct TEvMonResponse : public TEventLocal<TEvMonResponse, EvMonResponse> {
- TEvMonResponse(ui32 partition, const TVector<TString>& res, const TString& str)
+ TEvMonResponse(const NPQ::TPartitionId& partition, const TVector<TString>& res, const TString& str)
: Partition(partition)
, Res(res)
, Str(str)
{}
- ui32 Partition;
+ TEvMonResponse(const TVector<TString>& res, const TString& str)
+ : Res(res)
+ , Str(str)
+ {}
+
+ TMaybe<NPQ::TPartitionId> Partition;
TVector<TString> Res;
TString Str;
};
@@ -398,11 +401,13 @@ struct TEvPQ {
};
struct TEvPartitionOffsetsResponse : public TEventLocal<TEvPartitionOffsetsResponse, EvPartitionOffsetsResponse> {
- explicit TEvPartitionOffsetsResponse(NKikimrPQ::TOffsetsResponse::TPartResult& partResult)
+ TEvPartitionOffsetsResponse(NKikimrPQ::TOffsetsResponse::TPartResult& partResult, const NPQ::TPartitionId& partition)
: PartResult(partResult)
+ , Partition(partition)
{}
NKikimrPQ::TOffsetsResponse::TPartResult PartResult;
+ NPQ::TPartitionId Partition;
};
struct TEvPartitionStatus : public TEventLocal<TEvPartitionStatus, EvPartitionStatus> {
@@ -424,11 +429,13 @@ struct TEvPQ {
};
struct TEvPartitionStatusResponse : public TEventLocal<TEvPartitionStatusResponse, EvPartitionStatusResponse> {
- explicit TEvPartitionStatusResponse(NKikimrPQ::TStatusResponse::TPartResult& partResult)
+ TEvPartitionStatusResponse(NKikimrPQ::TStatusResponse::TPartResult& partResult, const NPQ::TPartitionId& partition)
: PartResult(partResult)
+ , Partition(partition)
{}
NKikimrPQ::TStatusResponse::TPartResult PartResult;
+ NPQ::TPartitionId Partition;
};
@@ -442,11 +449,11 @@ struct TEvPQ {
};
struct TEvInitComplete : public TEventLocal<TEvInitComplete, EvInitComplete> {
- explicit TEvInitComplete(const ui32 partition)
+ explicit TEvInitComplete(const NPQ::TPartitionId& partition)
: Partition(partition)
{}
- ui32 Partition;
+ NPQ::TPartitionId Partition;
};
struct TEvError : public TEventLocal<TEvError, EvError> {
@@ -462,7 +469,7 @@ struct TEvPQ {
};
struct TEvBlobRequest : public TEventLocal<TEvBlobRequest, EvBlobRequest> {
- TEvBlobRequest(const TString& user, const ui64 cookie, const ui32 partition, const ui64 readOffset,
+ TEvBlobRequest(const TString& user, const ui64 cookie, const NPQ::TPartitionId& partition, const ui64 readOffset,
TVector<NPQ::TRequestedBlob>&& blobs)
: User(user)
, Cookie(cookie)
@@ -473,7 +480,7 @@ struct TEvPQ {
TString User;
ui64 Cookie;
- ui32 Partition;
+ NPQ::TPartitionId Partition;
ui64 ReadOffset;
TVector<NPQ::TRequestedBlob> Blobs;
};
@@ -565,12 +572,12 @@ struct TEvPQ {
};
struct TEvPartitionConfigChanged : public TEventLocal<TEvPartitionConfigChanged, EvPartitionConfigChanged> {
- explicit TEvPartitionConfigChanged(ui32 partition) :
+ explicit TEvPartitionConfigChanged(const NPQ::TPartitionId& partition) :
Partition(partition)
{
}
- ui32 Partition;
+ NPQ::TPartitionId Partition;
};
struct TEvChangeCacheConfig : public TEventLocal<TEvChangeCacheConfig, EvChangeCacheConfig> {
@@ -588,35 +595,35 @@ struct TEvPQ {
};
struct TEvPartitionCounters : public TEventLocal<TEvPartitionCounters, EvPartitionCounters> {
- TEvPartitionCounters(const ui32 partition, const TTabletCountersBase& counters)
+ TEvPartitionCounters(const NPQ::TPartitionId& partition, const TTabletCountersBase& counters)
: Partition(partition)
{
Counters.Populate(counters);
}
- const ui32 Partition;
+ const NPQ::TPartitionId Partition;
TTabletCountersBase Counters;
};
struct TEvPartitionLabeledCounters : public TEventLocal<TEvPartitionLabeledCounters, EvPartitionLabeledCounters> {
- TEvPartitionLabeledCounters(const ui32 partition, const TTabletLabeledCountersBase& labeledCounters)
+ TEvPartitionLabeledCounters(const NPQ::TPartitionId& partition, const TTabletLabeledCountersBase& labeledCounters)
: Partition(partition)
, LabeledCounters(labeledCounters)
{
}
- const ui32 Partition;
+ const NPQ::TPartitionId Partition;
TTabletLabeledCountersBase LabeledCounters;
};
struct TEvPartitionLabeledCountersDrop : public TEventLocal<TEvPartitionLabeledCountersDrop, EvPartitionLabeledCountersDrop> {
- TEvPartitionLabeledCountersDrop(const ui32 partition, const TString& group)
+ TEvPartitionLabeledCountersDrop(const NPQ::TPartitionId& partition, const TString& group)
: Partition(partition)
, Group(group)
{
}
- const ui32 Partition;
+ const NPQ::TPartitionId Partition;
TString Group;
};
@@ -798,7 +805,7 @@ struct TEvPQ {
};
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
- TEvTxCalcPredicateResult(ui64 step, ui64 txId, ui32 partition, bool predicate) :
+ TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, bool predicate) :
Step(step),
TxId(txId),
Partition(partition),
@@ -808,7 +815,7 @@ struct TEvPQ {
ui64 Step;
ui64 TxId;
- ui32 Partition;
+ NPQ::TPartitionId Partition;
bool Predicate = false;
};
@@ -826,7 +833,7 @@ struct TEvPQ {
};
struct TEvProposePartitionConfigResult : public TEventLocal<TEvProposePartitionConfigResult, EvProposePartitionConfigResult> {
- TEvProposePartitionConfigResult(ui64 step, ui64 txId, ui32 partition) :
+ TEvProposePartitionConfigResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition) :
Step(step),
TxId(txId),
Partition(partition)
@@ -835,7 +842,7 @@ struct TEvPQ {
ui64 Step;
ui64 TxId;
- ui32 Partition;
+ NPQ::TPartitionId Partition;
};
struct TEvTxCommit : public TEventLocal<TEvTxCommit, EvTxCommit> {
@@ -850,7 +857,7 @@ struct TEvPQ {
};
struct TEvTxCommitDone : public TEventLocal<TEvTxCommitDone, EvTxCommitDone> {
- TEvTxCommitDone(ui64 step, ui64 txId, ui32 partition) :
+ TEvTxCommitDone(ui64 step, ui64 txId, const NPQ::TPartitionId& partition) :
Step(step),
TxId(txId),
Partition(partition)
@@ -859,7 +866,7 @@ struct TEvPQ {
ui64 Step;
ui64 TxId;
- ui32 Partition;
+ NPQ::TPartitionId Partition;
};
struct TEvTxRollback : public TEventLocal<TEvTxRollback, EvTxRollback> {
diff --git a/ydb/core/persqueue/key.h b/ydb/core/persqueue/key.h
index 4be6be821a1..ecc6fd6d9ef 100644
--- a/ydb/core/persqueue/key.h
+++ b/ydb/core/persqueue/key.h
@@ -1,5 +1,8 @@
#pragma once
+#include "partition_id.h"
+
+#include <util/digest/multi.h>
#include <util/generic/buffer.h>
#include <util/string/cast.h>
#include <util/string/printf.h>
@@ -28,15 +31,15 @@ public:
MarkUserDeprecated = 'u'
};
- TKeyPrefix(EType type, const ui32 partition)
+ TKeyPrefix(EType type, const TPartitionId& partition)
: Partition(partition)
{
Resize(UnmarkedSize());
*PtrType() = type;
- memcpy(PtrPartition(), Sprintf("%.10" PRIu32, partition).data(), 10);
+ memcpy(PtrPartition(), Sprintf("%.10" PRIu32, Partition.InternalPartitionId).data(), 10);
}
- TKeyPrefix(EType type, const ui32 partition, EMark mark)
+ TKeyPrefix(EType type, const TPartitionId& partition, EMark mark)
: TKeyPrefix(type, partition)
{
Resize(MarkedSize());
@@ -44,7 +47,7 @@ public:
}
TKeyPrefix()
- : TKeyPrefix(TypeNone, 0)
+ : TKeyPrefix(TypeNone, TPartitionId(0))
{}
virtual ~TKeyPrefix()
@@ -67,15 +70,16 @@ public:
return EType(*PtrType());
}
- ui32 GetPartition() const { return Partition; }
+ const TPartitionId& GetPartition() const { return Partition; }
protected:
static constexpr ui32 UnmarkedSize() { return 1 + 10; }
void ParsePartition()
{
- Partition = FromString<ui32>(TStringBuf{PtrPartition(), 10});
+ Partition.InternalPartitionId = FromString<ui32>(TStringBuf{PtrPartition(), 10});
}
+
private:
char* PtrType() { return Data(); }
char* PtrMark() { return Data() + UnmarkedSize(); }
@@ -85,7 +89,7 @@ private:
const char* PtrMark() const { return Data() + UnmarkedSize(); }
const char* PtrPartition() const { return Data() + 1; }
- ui32 Partition;
+ TPartitionId Partition;
};
// {char type; ui32 partiton; ui64 offset; ui16 partNo; ui32 count, ui16 internalPartsCount}
@@ -98,7 +102,7 @@ private:
class TKey : public TKeyPrefix
{
public:
- TKey(EType type, const ui32 partition, const ui64 offset, const ui16 partNo, const ui32 count, const ui16 internalPartsCount, const bool isHead = false)
+ TKey(EType type, const TPartitionId& partition, const ui64 offset, const ui16 partNo, const ui32 count, const ui16 internalPartsCount, const bool isHead = false)
: TKeyPrefix(type, partition)
, Offset(offset)
, Count(count)
@@ -136,7 +140,7 @@ public:
}
TKey()
- : TKey(TypeNone, 0, 0, 0, 0, 0)
+ : TKey(TypeNone, TPartitionId(0), 0, 0, 0, 0)
{}
virtual ~TKey()
diff --git a/ydb/core/persqueue/ownerinfo.cpp b/ydb/core/persqueue/ownerinfo.cpp
index be110383504..294a1a8445c 100644
--- a/ydb/core/persqueue/ownerinfo.cpp
+++ b/ydb/core/persqueue/ownerinfo.cpp
@@ -6,7 +6,7 @@ namespace NKikimr {
namespace NPQ {
void TOwnerInfo::GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender,
- const TString& topicName, const ui32 partition, const TActorContext& ctx) {
+ const TString& topicName, const TPartitionId& partition, const TActorContext& ctx) {
TStringBuilder s;
s << owner << "|" << CreateGuidAsString() << "_" << OwnerGeneration;
++OwnerGeneration;
diff --git a/ydb/core/persqueue/ownerinfo.h b/ydb/core/persqueue/ownerinfo.h
index 8d0e0742430..2d60464246c 100644
--- a/ydb/core/persqueue/ownerinfo.h
+++ b/ydb/core/persqueue/ownerinfo.h
@@ -32,7 +32,7 @@ namespace NPQ {
static TStringBuf GetOwnerFromOwnerCookie(const TString& owner);
void GenerateCookie(const TString& owner, const TActorId& pipeClient, const TActorId& sender,
- const TString& topicName, const ui32 partition, const TActorContext& ctx);
+ const TString& topicName, const TPartitionId& partition, const TActorContext& ctx);
void AddReserveRequest(ui64 size, bool lastRequest) {
ReservedSize += size;
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index f29e9511f7c..cd1546fcbec 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -154,7 +154,7 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels) {
}
}
-TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui32 tabletGeneration, const TActorId& blobCache,
+TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActorId& tablet, ui32 tabletGeneration, const TActorId& blobCache,
const NPersQueue::TTopicConverterPtr& topicConverter, TString dcId, bool isServerless,
const NKikimrPQ::TPQTabletConfig& tabletConfig, const TTabletCountersBase& counters, bool subDomainOutOfSpace, ui32 numChannels,
bool newPartition,
@@ -602,7 +602,7 @@ void TPartition::Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContex
void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) {
NKikimrPQ::TStatusResponse::TPartResult result;
- result.SetPartition(Partition);
+ result.SetPartition(Partition.InternalPartitionId);
if (DiskIsFull || WaitingForSubDomainQuota(ctx)) {
result.SetStatus(NKikimrPQ::TStatusResponse::STATUS_DISK_IS_FULL);
@@ -757,23 +757,23 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
}
}
}
- ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result));
+ ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result, Partition));
}
void TPartition::HandleOnInit(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) {
NKikimrPQ::TStatusResponse::TPartResult result;
- result.SetPartition(Partition);
+ result.SetPartition(Partition.InternalPartitionId);
result.SetStatus(NKikimrPQ::TStatusResponse::STATUS_INITIALIZING);
result.SetLastInitDurationSeconds((ctx.Now() - CreationTime).Seconds());
result.SetCreationTimestamp(CreationTime.Seconds());
- ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result));
+ ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result, Partition));
}
void TPartition::Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActorContext& ctx) {
THolder<TEvPersQueue::TEvPartitionClientInfoResponse> response = MakeHolder<TEvPersQueue::TEvPartitionClientInfoResponse>();
NKikimrPQ::TClientInfoResponse& result(response->Record);
- result.SetPartition(Partition);
+ result.SetPartition(Partition.InternalPartitionId);
result.SetStartOffset(StartOffset);
result.SetEndOffset(EndOffset);
result.SetResponseTimestamp(ctx.Now().MilliSeconds());
@@ -1547,7 +1547,7 @@ bool TPartition::ProcessUserActionOrTransaction(TTransaction& t,
} else if (t.ProposeConfig) {
t.Predicate = BeginTransaction(*t.ProposeConfig);
- PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config, Partition);
+ PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config);
//Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found");
ctx.Send(Tablet,
@@ -1560,7 +1560,7 @@ bool TPartition::ProcessUserActionOrTransaction(TTransaction& t,
Y_ABORT_UNLESS(!ChangeConfig);
ChangeConfig = t.ChangeConfig;
- PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition);
+ PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config);
SendChangeConfigReply = t.SendReply;
BeginChangePartitionConfig(ChangeConfig->Config, ctx);
@@ -1640,7 +1640,7 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
ChangeConfig =
MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
event.Config);
- PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition);
+ PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config);
SendChangeConfigReply = false;
return true;
@@ -1846,7 +1846,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf
const TActorContext& ctx)
{
Config = config;
- PartitionConfig = GetPartitionConfig(Config, Partition);
+ PartitionConfig = GetPartitionConfig(Config);
PartitionGraph = MakePartitionGraph(Config);
TopicConverter = topicConverter;
NewPartition = false;
@@ -1880,7 +1880,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf
TString TPartition::GetKeyConfig() const
{
- return Sprintf("_config_%u", Partition);
+ return Sprintf("_config_%u", Partition.OriginalPartitionId);
}
void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)
@@ -2552,7 +2552,7 @@ size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) {
void TPartition::CreateMirrorerActor() {
Mirrorer = MakeHolder<TMirrorerInfo>(
- Register(new TMirrorer(Tablet, SelfId(), TopicConverter, Partition, IsLocalDC, EndOffset, Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)),
+ Register(new TMirrorer(Tablet, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, EndOffset, Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)),
TabletCounters
);
}
@@ -2597,7 +2597,7 @@ void TPartition::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext
void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;
- if (Partition != record.GetPartition()) {
+ if (Partition.InternalPartitionId != record.GetPartition()) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
"TEvCheckPartitionStatusRequest for wrong partition " << record.GetPartition() << "." <<
@@ -2620,4 +2620,9 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
Send(ev->Sender, response.Release());
}
+const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config)
+{
+ return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId);
+}
+
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 31cb40fd484..5337e15e6ef 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -353,7 +353,7 @@ public:
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
}
- TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui32 tabletGeneration, const TActorId& blobCache,
+ TPartition(ui64 tabletId, const TPartitionId& partition, const TActorId& tablet, ui32 tabletGeneration, const TActorId& blobCache,
const NPersQueue::TTopicConverterPtr& topicConverter, TString dcId, bool isServerless,
const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, bool SubDomainOutOfSpace, ui32 numChannels,
bool newPartition = false,
@@ -574,7 +574,7 @@ private:
private:
ui64 TabletID;
ui32 TabletGeneration;
- ui32 Partition;
+ TPartitionId Partition;
NKikimrPQ::TPQTabletConfig Config;
NKikimrPQ::TPQTabletConfig TabletConfig;
const NKikimrPQ::TPQTabletConfig::TPartition* PartitionConfig = nullptr;
@@ -764,6 +764,8 @@ private:
TDeque<std::unique_ptr<IEventBase>> PendingEvents;
TRowVersion LastEmittedHeartbeat;
+
+ const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config);
};
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_id.h b/ydb/core/persqueue/partition_id.h
new file mode 100644
index 00000000000..0e4e4cd6164
--- /dev/null
+++ b/ydb/core/persqueue/partition_id.h
@@ -0,0 +1,115 @@
+#pragma once
+
+#include <util/generic/maybe.h>
+#include <util/stream/output.h>
+#include <util/system/types.h>
+#include <util/digest/multi.h>
+#include <util/str_stl.h>
+
+#include <functional>
+
+namespace NKikimr::NPQ {
+
+class TPartitionId {
+public:
+ TPartitionId() = default;
+ explicit TPartitionId(ui32 partition) :
+ TPartitionId(partition, Nothing(), partition)
+ {
+ }
+
+ TPartitionId(ui32 partition, ui64 writeId) :
+ TPartitionId(partition, writeId, 0)
+ {
+ }
+
+ size_t GetHash() const
+ {
+ return MultiHash(OriginalPartitionId, WriteId);
+ }
+
+ bool IsEqual(const TPartitionId& rhs) const
+ {
+ return
+ (OriginalPartitionId == rhs.OriginalPartitionId) &&
+ (WriteId == rhs.WriteId);
+ }
+
+ void ToStream(IOutputStream& s) const
+ {
+ if (WriteId.Defined()) {
+ s << '{' << OriginalPartitionId << ", " << *WriteId << ", " << InternalPartitionId << '}';
+ } else {
+ s << OriginalPartitionId;
+ }
+ }
+
+ //
+ // FIXME: используется в RequestRange
+ //
+ TPartitionId NextInternalPartitionId() const
+ {
+ return {OriginalPartitionId, WriteId, InternalPartitionId + 1};
+ }
+
+ ui32 OriginalPartitionId = 0;
+ TMaybe<ui64> WriteId;
+ ui32 InternalPartitionId = 0;
+
+private:
+ TPartitionId(ui32 originalPartitionId, TMaybe<ui64> writeId, ui32 internalPartitionId) :
+ OriginalPartitionId(originalPartitionId),
+ WriteId(writeId),
+ InternalPartitionId(internalPartitionId)
+ {
+ }
+};
+
+inline
+bool operator==(const TPartitionId& lhs, const TPartitionId& rhs)
+{
+ return lhs.IsEqual(rhs);
+}
+
+inline
+IOutputStream& operator<<(IOutputStream& s, const TPartitionId& v)
+{
+ v.ToStream(s);
+ return s;
+}
+
+}
+
+template <>
+struct THash<NKikimr::NPQ::TPartitionId> {
+ inline size_t operator()(const NKikimr::NPQ::TPartitionId& v) const
+ {
+ return v.GetHash();
+ }
+};
+
+namespace std {
+
+template <>
+struct less<NKikimr::NPQ::TPartitionId> {
+ inline bool operator()(const NKikimr::NPQ::TPartitionId& lhs, const NKikimr::NPQ::TPartitionId& rhs) const
+ {
+ if (lhs.OriginalPartitionId < rhs.OriginalPartitionId) {
+ return true;
+ } else if (rhs.OriginalPartitionId < lhs.OriginalPartitionId) {
+ return false;
+ } else {
+ return lhs.WriteId < rhs.WriteId;
+ }
+ }
+};
+
+template <>
+struct hash<NKikimr::NPQ::TPartitionId> {
+ inline size_t operator()(const NKikimr::NPQ::TPartitionId& v) const
+ {
+ return THash<NKikimr::NPQ::TPartitionId>()(v);
+ }
+};
+
+}
diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp
index fcab0f62f36..dd18acf6e45 100644
--- a/ydb/core/persqueue/partition_init.cpp
+++ b/ydb/core/persqueue/partition_init.cpp
@@ -15,8 +15,8 @@ void CalcTopicWriteQuotaParams(const NKikimrPQ::TPQConfig& pqConfig,
TString& topicWriteQuoterPath,
TString& topicWriteQuotaResourcePath);
bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev);
-void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key);
-void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key);
+void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key);
+void RequestDataRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key);
bool ValidateResponse(const TInitializerStep& step, TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx);
//
@@ -108,7 +108,7 @@ TPartition* TInitializerStep::Partition() const {
return Initializer->Partition;
}
-ui32 TInitializerStep::PartitionId() const {
+const TPartitionId& TInitializerStep::PartitionId() const {
return Initializer->Partition->Partition;
}
@@ -179,7 +179,7 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
case NKikimrProto::NODATA:
Partition()->Config = Partition()->TabletConfig;
- Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition);
+ Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition.OriginalPartitionId);
Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config);
break;
@@ -255,7 +255,7 @@ TInitMetaStep::TInitMetaStep(TInitializer* initializer)
}
void TInitMetaStep::Execute(const TActorContext& ctx) {
- auto addKey = [](NKikimrClient::TKeyValueRequest& request, TKeyPrefix::EType type, ui32 partition) {
+ auto addKey = [](NKikimrClient::TKeyValueRequest& request, TKeyPrefix::EType type, const TPartitionId& partition) {
auto read = request.AddCmdRead();
TKeyPrefix key{type, partition};
read->SetKey(key.Data(), key.Size());
@@ -700,7 +700,7 @@ void TPartition::Initialize(const TActorContext& ctx) {
UsersInfoStorage.ConstructInPlace(DCId,
TopicConverter,
- Partition,
+ Partition.InternalPartitionId,
Config,
CloudId,
DbId,
@@ -711,11 +711,11 @@ void TPartition::Initialize(const TActorContext& ctx) {
if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(EscapeBadChars(TopicName()),
- Partition,
+ Partition.InternalPartitionId,
Config.GetYdbDatabasePath()));
} else {
PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicName(),
- Partition));
+ Partition.InternalPartitionId));
}
UsersInfoStorage->Init(Tablet, SelfId(), ctx);
@@ -759,7 +759,7 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
WriteBufferIsFullCounter.SetCounter(
NPersQueue::GetCounters(counters, "writingTime", TopicConverter),
{{"host", DCId},
- {"Partition", ToString<ui32>(Partition)}},
+ {"Partition", ToString<ui32>(Partition.InternalPartitionId)}},
{"sensor", "BufferFullTime" + suffix, true});
auto subGroup = GetServiceCounters(counters, "pqproxy|writeTimeLag");
@@ -990,7 +990,7 @@ bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev) {
return !diskIsOk;
}
-static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 partition,
+static void RequestRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition,
TKeyPrefix::EType c, bool includeData = false, const TString& key = "", bool dropTmp = false) {
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
auto read = request->Record.AddCmdReadRange();
@@ -1003,7 +1003,7 @@ static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 par
}
range->SetFrom(from.Data(), from.Size());
- TKeyPrefix to(c, partition + 1);
+ TKeyPrefix to(c, partition.NextInternalPartitionId());
range->SetTo(to.Data(), to.Size());
if(includeData)
@@ -1015,18 +1015,18 @@ static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 par
TKeyPrefix from(TKeyPrefix::TypeTmpData, partition);
range->SetFrom(from.Data(), from.Size());
- TKeyPrefix to(TKeyPrefix::TypeTmpData, partition + 1);
+ TKeyPrefix to(TKeyPrefix::TypeTmpData, partition.NextInternalPartitionId());
range->SetTo(to.Data(), to.Size());
}
ctx.Send(dst, request.Release());
}
-void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) {
+void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key) {
RequestRange(ctx, dst, partition, TKeyPrefix::TypeInfo, true, key, key == "");
}
-void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) {
+void RequestDataRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key) {
RequestRange(ctx, dst, partition, TKeyPrefix::TypeData, false, key);
}
diff --git a/ydb/core/persqueue/partition_init.h b/ydb/core/persqueue/partition_init.h
index 5f02770f8de..e45f04a38c0 100644
--- a/ydb/core/persqueue/partition_init.h
+++ b/ydb/core/persqueue/partition_init.h
@@ -62,7 +62,7 @@ public:
virtual bool Handle(STFUNC_SIG);
TPartition* Partition() const;
- ui32 PartitionId() const;
+ const TPartitionId& PartitionId() const;
TString TopicName() const;
const TString Name;
diff --git a/ydb/core/persqueue/partition_monitoring.cpp b/ydb/core/persqueue/partition_monitoring.cpp
index 4343adcbb6a..52353878da7 100644
--- a/ydb/core/persqueue/partition_monitoring.cpp
+++ b/ydb/core/persqueue/partition_monitoring.cpp
@@ -74,7 +74,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
Y_ABORT("");
}
TStringStream out;
- out << "Partition " << i32(Partition) << ": " << str; res.push_back(out.Str()); out.Clear();
+ out << "Partition " << Partition << ": " << str; res.push_back(out.Str()); out.Clear();
if (DiskIsFull) {
out << "DISK IS FULL";
res.push_back(out.Str());
@@ -111,7 +111,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
}
out << Config.DebugString(); res.push_back(out.Str()); out.Clear();
HTML(out) {
- DIV_CLASS_ID("tab-pane fade", Sprintf("partition_%u", ui32(Partition))) {
+ DIV_CLASS_ID("tab-pane fade", Sprintf("partition_%u", Partition.InternalPartitionId)) {
TABLE_SORTABLE_CLASS("table") {
TABLEHEAD() {
TABLER() {
diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp
index 7173ebbf2ae..664e00aa504 100644
--- a/ydb/core/persqueue/partition_read.cpp
+++ b/ydb/core/persqueue/partition_read.cpp
@@ -208,7 +208,7 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx) {
NKikimrPQ::TOffsetsResponse::TPartResult result;
- result.SetPartition(Partition);
+ result.SetPartition(Partition.InternalPartitionId);
result.SetStartOffset(StartOffset);
result.SetEndOffset(EndOffset);
result.SetErrorCode(NPersQueue::NErrorCode::OK);
@@ -228,15 +228,15 @@ void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContex
result.SetReadCreateTimestampMS(userInfo->GetReadCreateTimestamp().MilliSeconds());
}
}
- ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionOffsetsResponse(result));
+ ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionOffsetsResponse(result, Partition));
}
void TPartition::HandleOnInit(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx) {
NKikimrPQ::TOffsetsResponse::TPartResult result;
- result.SetPartition(Partition);
+ result.SetPartition(Partition.InternalPartitionId);
result.SetErrorCode(NPersQueue::NErrorCode::INITIALIZING);
result.SetErrorReason("partition is not ready yet");
- ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionOffsetsResponse(result));
+ ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionOffsetsResponse(result, Partition));
}
std::pair<TInstant, TInstant> TPartition::GetTime(const TUserInfo& userInfo, ui64 offset) const {
@@ -314,7 +314,7 @@ TReadAnswer TReadInfo::FormAnswer(
const TActorContext& ctx,
const TEvPQ::TEvBlobResponse& blobResponse,
const ui64 endOffset,
- const ui32 partition,
+ const TPartitionId& partition,
TUserInfo* userInfo,
const ui64 destination,
const ui64 sizeLag,
@@ -409,7 +409,7 @@ TReadAnswer TReadInfo::FormAnswer(
}
Y_ABORT_UNLESS(offset <= Offset);
Y_ABORT_UNLESS(offset < Offset || partNo <= PartNo);
- TKey key(TKeyPrefix::TypeData, 0, offset, partNo, count, internalPartsCount, false);
+ TKey key(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount, false);
for (TBlobIterator it(key, blobValue); it.IsValid() && !needStop; it.Next()) {
TBatch batch = it.GetBatch();
auto& header = batch.Header;
diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp
index b0770f1f901..f73e21ec38a 100644
--- a/ydb/core/persqueue/partition_sourcemanager.cpp
+++ b/ydb/core/persqueue/partition_sourcemanager.cpp
@@ -41,7 +41,7 @@ TPartitionSourceManager::TModificationBatch TPartitionSourceManager::CreateModif
}
const TPartitionSourceManager::TPartitionNode* TPartitionSourceManager::GetPartitionNode() const {
- return Partition.PartitionGraph.GetPartition(Partition.Partition);
+ return Partition.PartitionGraph.GetPartition(Partition.Partition.OriginalPartitionId);
}
TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {
@@ -49,7 +49,7 @@ TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {
}
bool TPartitionSourceManager::HasParents() const {
- auto node = Partition.PartitionGraph.GetPartition(Partition.Partition);
+ auto node = GetPartitionNode();
return node && !node->Parents.empty();
}
diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h
index fcf53a25cd1..e825eedbd77 100644
--- a/ydb/core/persqueue/partition_sourcemanager.h
+++ b/ydb/core/persqueue/partition_sourcemanager.h
@@ -106,8 +106,6 @@ private:
TSourceIdStorage& GetSourceIdStorage() const;
bool HasParents() const;
- TActorId PartitionRequester(TPartitionId id, ui64 tabletId);
-
private:
TPartition& Partition;
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 871956a019b..786639ecba8 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -966,7 +966,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
auto range = del->MutableRange();
TKeyPrefix from(TKeyPrefix::TypeTmpData, Partition);
range->SetFrom(from.Data(), from.Size());
- TKeyPrefix to(TKeyPrefix::TypeTmpData, Partition + 1);
+ TKeyPrefix to(TKeyPrefix::TypeTmpData, TPartitionId(Partition.InternalPartitionId + 1));
range->SetTo(to.Data(), to.Size());
}
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index f52155f9430..af6459c15e4 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -597,10 +597,8 @@ private:
void Handle(TEvPQ::TEvMonResponse::TPtr& ev, const TActorContext& ctx)
{
- if (ev->Get()->Partition != Max<ui32>()) {
- Results[ev->Get()->Partition] = ev->Get()->Res;
- } else {
- Y_ABORT_UNLESS(ev->Get()->Partition == Max<ui32>());
+ if (ev->Get()->Partition.Defined()) {
+ Results[ev->Get()->Partition->InternalPartitionId] = ev->Get()->Res;
}
Str.push_back(ev->Get()->Str);
if(++TotalResponses == TotalRequests) {
@@ -682,7 +680,7 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
ChangePartitionConfigInflight += Partitions.size();
for (const auto& partition : Config.GetPartitions()) {
- const auto partitionId = partition.GetPartitionId();
+ const TPartitionId partitionId(partition.GetPartitionId());
if (Partitions.find(partitionId) == Partitions.end()) {
Partitions.emplace(partitionId,
TPartitionInfo(ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, true, ctx)),
@@ -897,7 +895,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
InitTransactions(readRange, partitionTxs);
for (const auto& partition : Config.GetPartitions()) { // no partitions will be created with empty config
- const auto partitionId = partition.GetPartitionId();
+ const TPartitionId partitionId(partition.GetPartitionId());
Partitions.emplace(partitionId, TPartitionInfo(
ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, false, ctx)),
GetPartitionKeyRange(Config, partition),
@@ -916,7 +914,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
UpdateConfigRequests.clear();
for (auto& req : HasDataRequests) {
- auto it = Partitions.find(req->Record.GetPartition());
+ const TPartitionId partitionId(req->Record.GetPartition());
+ auto it = Partitions.find(partitionId);
if (it != Partitions.end()) {
if (ctx.Now().MilliSeconds() < req->Record.GetDeadline()) { //otherwise there is no need to send event - proxy will generate event itself
ctx.Send(it->second.Actor, req.Release());
@@ -1505,7 +1504,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request,
}
for (const auto& partition : cfg.GetPartitions()) {
- sourceIdWriter.FillRequest(request, partition.GetPartitionId());
+ sourceIdWriter.FillRequest(request, TPartitionId(partition.GetPartitionId()));
}
}
@@ -1571,7 +1570,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
if (!ConfigInited) {
HasDataRequests.push_back(ev->Release());
} else {
- auto it = Partitions.find(record.GetPartition());
+ auto it = Partitions.find(TPartitionId(record.GetPartition()));
if (it != Partitions.end()) {
ctx.Send(it->second.Actor, ev->Release().Release());
}
@@ -1581,7 +1580,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
void TPersQueue::Handle(TEvPersQueue::TEvPartitionClientInfo::TPtr& ev, const TActorContext& ctx) {
for (auto partition : ev->Get()->Record.GetPartitions()) {
- auto it = Partitions.find(partition);
+ auto it = Partitions.find(TPartitionId(partition));
if (it != Partitions.end()) {
ctx.Send(it->second.Actor, new TEvPQ::TEvGetPartitionClientInfo(ev->Sender), 0, ev->Cookie);
} else {
@@ -2398,7 +2397,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext&
return;
}
- ui32 partition = req.GetPartition();
+ TPartitionId partition(req.GetPartition());
auto it = Partitions.find(partition);
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '"
@@ -2554,7 +2553,7 @@ bool TPersQueue::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvRemoteHttpInfo: " << ev->Get()->Query);
TMap<ui32, TActorId> res;
for (auto& p : Partitions) {
- res.insert({p.first, p.second.Actor});
+ res.emplace(p.first.InternalPartitionId, p.second.Actor);
}
ctx.Register(new TMonitoringProxy(ev->Sender, ev->Get()->Query, res, CacheActor, TopicName, TabletID(), ResponseProxy.size()));
return true;
@@ -2809,7 +2808,10 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
//
Y_ABORT_UNLESS(txBody.OperationsSize() > 0);
- auto i = Partitions.find(txBody.GetOperations(0).GetPartitionId());
+ TPartitionId partitionId =
+ MakePartitionId(txBody.GetOperations(0).GetPartitionId(),
+ txBody.HasWriteId() ? TMaybe<ui64>(txBody.GetWriteId()) : Nothing());
+ auto i = Partitions.find(partitionId);
Y_ABORT_UNLESS(i != Partitions.end());
ctx.Send(i->second.Actor, ev.Release());
@@ -3286,13 +3288,20 @@ void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx,
}
}
+TPartitionId TPersQueue::MakePartitionId(ui32 originalPartitionId, TMaybe<ui64> writeId) const
+{
+ Y_ABORT_UNLESS(!writeId.Defined());
+ return TPartitionId{originalPartitionId};
+}
+
void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
TDistributedTransaction& tx)
{
- THashMap<ui32, std::unique_ptr<TEvPQ::TEvTxCalcPredicate>> events;
+ THashMap<TPartitionId, std::unique_ptr<TEvPQ::TEvTxCalcPredicate>> events;
for (auto& operation : tx.Operations) {
- auto& event = events[operation.GetPartitionId()];
+ TPartitionId partitionId = MakePartitionId(operation.GetPartitionId(), tx.WriteId);
+ auto& event = events[partitionId];
if (!event) {
event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId);
}
@@ -3321,7 +3330,7 @@ void TPersQueue::SendEvTxCommitToPartitions(const TActorContext& ctx,
for (ui32 partitionId : tx.Partitions) {
auto event = std::make_unique<TEvPQ::TEvTxCommit>(tx.Step, tx.TxId);
- auto p = Partitions.find(partitionId);
+ auto p = Partitions.find(MakePartitionId(partitionId, tx.WriteId));
Y_ABORT_UNLESS(p != Partitions.end());
ctx.Send(p->second.Actor, event.release());
@@ -3339,7 +3348,7 @@ void TPersQueue::SendEvTxRollbackToPartitions(const TActorContext& ctx,
for (ui32 partitionId : tx.Partitions) {
auto event = std::make_unique<TEvPQ::TEvTxRollback>(tx.Step, tx.TxId);
- auto p = Partitions.find(partitionId);
+ auto p = Partitions.find(MakePartitionId(partitionId, tx.WriteId));
Y_ABORT_UNLESS(p != Partitions.end());
ctx.Send(p->second.Actor, event.release());
@@ -3712,7 +3721,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
tx.PartitionRepliesExpected = Partitions.size();
}
-TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId,
+TPartition* TPersQueue::CreatePartitionActor(const TPartitionId& partitionId,
const NPersQueue::TTopicConverterPtr topicConverter,
const NKikimrPQ::TPQTabletConfig& config,
bool newPartition,
@@ -3751,7 +3760,7 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
}
for (const auto& partition : config.GetPartitions()) {
- const auto partitionId = partition.GetPartitionId();
+ const TPartitionId partitionId(partition.GetPartitionId());
if (Partitions.contains(partitionId)) {
continue;
}
@@ -3888,7 +3897,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, con
void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;
- auto it = Partitions.find(record.GetPartition());
+ auto it = Partitions.find(TPartitionId(TPartitionId(record.GetPartition())));
if (it == Partitions.end()) {
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition());
@@ -3906,14 +3915,16 @@ void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
}
}
-void TPersQueue::ProcessCheckPartitionStatusRequests(ui32 partitionId) {
- auto sit = CheckPartitionStatusRequests.find(partitionId);
+void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partitionId) {
+ Y_ABORT_UNLESS(!partitionId.WriteId.Defined());
+ ui32 originalPartitionId = partitionId.OriginalPartitionId;
+ auto sit = CheckPartitionStatusRequests.find(originalPartitionId);
if (sit != CheckPartitionStatusRequests.end()) {
auto it = Partitions.find(partitionId);
for (auto& r : sit->second) {
Forward(r, it->second.Actor);
}
- CheckPartitionStatusRequests.erase(partitionId);
+ CheckPartitionStatusRequests.erase(originalPartitionId);
}
}
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 1cd94ed6d79..1688ab13c34 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -166,7 +166,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
ui64 GetAllowedStep() const;
void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);
- void ProcessCheckPartitionStatusRequests(ui32 partitionId);
+ void ProcessCheckPartitionStatusRequests(const TPartitionId& partitionId);
TString LogPrefix() const;
@@ -187,7 +187,7 @@ private:
bool ConfigInited;
ui32 PartitionsInited;
bool InitCompleted = false;
- THashMap<ui32, TPartitionInfo> Partitions;
+ THashMap<TPartitionId, TPartitionInfo> Partitions;
THashMap<TString, TIntrusivePtr<TEvTabletCounters::TInFlightCookie>> CounterEventsInflight;
TActorId CacheActor;
@@ -335,7 +335,7 @@ private:
void SendEvProposePartitionConfig(const TActorContext& ctx,
TDistributedTransaction& tx);
- TPartition* CreatePartitionActor(ui32 partitionId,
+ TPartition* CreatePartitionActor(const TPartitionId& partitionId,
const NPersQueue::TTopicConverterPtr topicConverter,
const NKikimrPQ::TPQTabletConfig& config,
bool newPartition,
@@ -406,6 +406,8 @@ private:
THashMap<ui32, TVector<TEvPQ::TEvCheckPartitionStatusRequest::TPtr>> CheckPartitionStatusRequests;
TMaybe<ui64> TabletGeneration;
+
+ TPartitionId MakePartitionId(ui32 originalPartitionId, TMaybe<ui64> writeId) const;
};
diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h
index 838562c7acb..5f43a924e4b 100644
--- a/ydb/core/persqueue/read.h
+++ b/ydb/core/persqueue/read.h
@@ -48,7 +48,7 @@ namespace NPQ {
void SaveInProgress(const TKvRequest& kvRequest)
{
for (const TRequestedBlob& reqBlob : kvRequest.Blobs) {
- TBlobId blob(kvRequest.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
+ TBlobId blob(kvRequest.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
ReadsInProgress.insert(blob);
}
}
@@ -56,7 +56,7 @@ namespace NPQ {
bool CheckInProgress(const TActorContext& ctx, TKvRequest& kvRequest)
{
for (const TRequestedBlob& reqBlob : kvRequest.Blobs) {
- TBlobId blob(kvRequest.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
+ TBlobId blob(kvRequest.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
auto it = ReadsInProgress.find(blob);
if (it != ReadsInProgress.end()) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Read request is blocked. Partition "
@@ -73,7 +73,7 @@ namespace NPQ {
{
TVector<TKvRequest> unblocked;
for (const TRequestedBlob& reqBlob : blocker.Blobs) {
- TBlobId blob(blocker.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
+ TBlobId blob(blocker.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
ReadsInProgress.erase(blob);
auto it = BlockedReads.find(blob);
@@ -104,8 +104,8 @@ namespace NPQ {
void Handle(TEvPQ::TEvBlobRequest::TPtr& ev, const TActorContext& ctx)
{
- ui32 partition = ev->Get()->Partition;
- Cache.SetUserOffset(ctx, ev->Get()->User, partition, ev->Get()->ReadOffset);
+ const TPartitionId& partition = ev->Get()->Partition;
+ Cache.SetUserOffset(ctx, ev->Get()->User, partition.InternalPartitionId, ev->Get()->ReadOffset);
TKvRequest kvReq(TKvRequest::TypeRead, ev->Sender, ev->Get()->Cookie, partition);
kvReq.Blobs = std::move(ev->Get()->Blobs);
@@ -263,7 +263,7 @@ namespace NPQ {
auto srcRequest = ev->Get()->Record;
- TKvRequest kvReq(TKvRequest::TypeWrite, ev->Sender, Max<ui64>(), Max<ui32>());
+ TKvRequest kvReq(TKvRequest::TypeWrite, ev->Sender, Max<ui64>(), TPartitionId(Max<ui32>()));
kvReq.Blobs.reserve(srcRequest.CmdWriteSize());
for (ui32 i = 0; i < srcRequest.CmdWriteSize(); ++i) {
@@ -333,7 +333,7 @@ namespace NPQ {
if (!data)
continue;
TABLER() {
- TABLED() {out << int(c.first.Partition);}
+ TABLED() {out << c.first.Partition;}
TABLED() {out << c.first.Offset;}
TABLED() {out << c.first.Count;}
TABLED() {out << data->GetValue().size();}
@@ -345,7 +345,7 @@ namespace NPQ {
}
}
}
- ctx.Send(ev->Sender, new TEvPQ::TEvMonResponse(Max<ui32>(), TVector<TString>(), out.Str()));
+ ctx.Send(ev->Sender, new TEvPQ::TEvMonResponse(TVector<TString>(), out.Str()));
}
void UpdateCounters(const TActorContext& ctx)
diff --git a/ydb/core/persqueue/read_quoter.h b/ydb/core/persqueue/read_quoter.h
index ac2cdd4cb9a..3e37f4840e7 100644
--- a/ydb/core/persqueue/read_quoter.h
+++ b/ydb/core/persqueue/read_quoter.h
@@ -78,7 +78,7 @@ public:
TActorId partitionActor,
const NPersQueue::TTopicConverterPtr& topicConverter,
const NKikimrPQ::TPQTabletConfig& config,
- ui32 partition,
+ const TPartitionId& partition,
TActorId tabletActor,
ui64 tabletId,
const TTabletCountersBase& counters
@@ -159,7 +159,7 @@ private:
NKikimrPQ::TPQTabletConfig PQTabletConfig;
TQuotaTracker PartitionTotalQuotaTracker;
NPersQueue::TTopicConverterPtr TopicConverter;
- const ui32 Partition;
+ const TPartitionId Partition;
ui64 TabletId;
TTabletCountersBase Counters;
ui32 RequestsInflight;
diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp
index e2176d8c337..b47dd70c794 100644
--- a/ydb/core/persqueue/sourceid.cpp
+++ b/ydb/core/persqueue/sourceid.cpp
@@ -70,13 +70,13 @@ void FillDelete(const TKeyPrefix& key, NKikimrClient::TKeyValueRequest::TCmdDele
range.SetIncludeTo(true);
}
-void FillDelete(ui32 partition, const TString& sourceId, TKeyPrefix::EMark mark, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) {
+void FillDelete(const TPartitionId& partition, const TString& sourceId, TKeyPrefix::EMark mark, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) {
TKeyPrefix key(TKeyPrefix::TypeInfo, partition, mark);
key.Append(sourceId.c_str(), sourceId.size());
FillDelete(key, cmd);
}
-void FillDelete(ui32 partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) {
+void FillDelete(const TPartitionId& partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) {
FillDelete(partition, sourceId, TKeyPrefix::MarkProtoSourceId, cmd);
}
@@ -263,7 +263,7 @@ void TSourceIdStorage::DeregisterSourceId(const TString& sourceId) {
}
}
-bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, ui32 partition, const NKikimrPQ::TPartitionConfig& config) {
+bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, const TPartitionId& partition, const NKikimrPQ::TPartitionConfig& config) {
TVector<std::pair<ui64, TString>> toDelOffsets;
ui64 maxDeleteSourceIds = 0;
@@ -478,7 +478,7 @@ TKeyPrefix::EMark TSourceIdWriter::FormatToMark(ESourceIdFormat format) {
}
}
-void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partition) {
+void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, const TPartitionId& partition) {
TKeyPrefix key(TKeyPrefix::TypeInfo, partition, FormatToMark(Format));
TBuffer data;
diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h
index 897014363f2..8566810c40c 100644
--- a/ydb/core/persqueue/sourceid.h
+++ b/ydb/core/persqueue/sourceid.h
@@ -92,7 +92,7 @@ public:
void DeregisterSourceId(const TString& sourceId);
void LoadSourceIdInfo(const TString& key, const TString& data, TInstant now);
- bool DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, ui32 partition, const NKikimrPQ::TPartitionConfig& config);
+ bool DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInstant now, ui64 startOffset, const TPartitionId& partition, const NKikimrPQ::TPartitionConfig& config);
void RegisterSourceIdOwner(const TString& sourceId, const TStringBuf& ownerCookie);
void MarkOwnersForDeletedSourceId(THashMap<TString, TOwnerInfo>& owners);
@@ -130,7 +130,7 @@ public:
void DeregisterSourceId(const TString& sourceId);
void Clear();
- void FillRequest(TEvKeyValue::TEvRequest* request, ui32 partition);
+ void FillRequest(TEvKeyValue::TEvRequest* request, const TPartitionId& partition);
static void FillKeyAndData(ESourceIdFormat format, const TString& sourceId, const TSourceIdInfo& sourceIdInfo, TKeyPrefix& key, TBuffer& data);
private:
diff --git a/ydb/core/persqueue/subscriber.cpp b/ydb/core/persqueue/subscriber.cpp
index 5347f3f4e13..9d2f9336b4c 100644
--- a/ydb/core/persqueue/subscriber.cpp
+++ b/ydb/core/persqueue/subscriber.cpp
@@ -48,7 +48,7 @@ TVector<std::pair<TReadInfo, ui64>> TSubscriberLogic::CompleteSubscriptions(cons
}
-TSubscriber::TSubscriber(const ui32 partition, TTabletCountersBase& counters, const TActorId& tablet)
+TSubscriber::TSubscriber(const TPartitionId& partition, TTabletCountersBase& counters, const TActorId& tablet)
: Subscriber()
, Partition(partition)
, Counters(counters)
diff --git a/ydb/core/persqueue/subscriber.h b/ydb/core/persqueue/subscriber.h
index 7d8890ebb9d..51761465248 100644
--- a/ydb/core/persqueue/subscriber.h
+++ b/ydb/core/persqueue/subscriber.h
@@ -1,7 +1,8 @@
#pragma once
-#include "header.h"
#include "blob.h"
+#include "header.h"
+#include "partition_id.h"
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/base/appdata.h>
@@ -78,7 +79,7 @@ struct TReadInfo {
const TActorContext& ctx,
const TEvPQ::TEvBlobResponse& response,
const ui64 endOffset,
- const ui32 partition,
+ const TPartitionId& partition,
TUserInfo* ui,
const ui64 dst,
const ui64 sizeLag,
@@ -89,7 +90,7 @@ struct TReadInfo {
TReadAnswer FormAnswer(
const TActorContext& ctx,
const ui64 endOffset,
- const ui32 partition,
+ const TPartitionId& partition,
TUserInfo* ui,
const ui64 dst,
const ui64 sizeLag,
@@ -127,7 +128,7 @@ private:
class TSubscriber : public TNonCopyable {
public:
- TSubscriber(const ui32 partition, TTabletCountersBase& counters, const TActorId& tablet);
+ TSubscriber(const TPartitionId& partition, TTabletCountersBase& counters, const TActorId& tablet);
//will wait for new data or timeout for this read and set timer for timeout ms
void AddSubscription(TReadInfo&& info, const ui32 timeout, const ui64 cookie, const TActorContext& ctx);
@@ -140,7 +141,7 @@ public:
private:
TSubscriberLogic Subscriber;
- const ui32 Partition;
+ const TPartitionId& Partition;
TTabletCountersBase& Counters;
TActorId Tablet;
};
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp
index b613822c8e4..b17e0aacd2d 100644
--- a/ydb/core/persqueue/transaction.cpp
+++ b/ydb/core/persqueue/transaction.cpp
@@ -75,11 +75,11 @@ void TDistributedTransaction::InitPartitions()
if (TabletConfig.PartitionsSize()) {
for (const auto& partition : TabletConfig.GetPartitions()) {
- Partitions.insert(partition.GetPartitionId());
+ Partitions.emplace(partition.GetPartitionId());
}
} else {
for (auto partitionId : TabletConfig.GetPartitionIds()) {
- Partitions.insert(partitionId);
+ Partitions.emplace(partitionId);
}
}
}
@@ -155,6 +155,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
// Old configuration format without AllPartitions. Split/Merge is not supported.
continue;
}
+
if (node->Children.empty()) {
for (const auto* r : node->Parents) {
if (extractTabletId != r->TabletId) {
@@ -206,7 +207,7 @@ void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decisi
Y_ABORT_UNLESS(Step == event.Step);
Y_ABORT_UNLESS(TxId == event.TxId);
- Y_ABORT_UNLESS(Partitions.contains(event.Partition));
+ Y_ABORT_UNLESS(Partitions.contains(event.Partition.OriginalPartitionId));
SetDecision(SelfDecision, decision);
@@ -246,7 +247,7 @@ void TDistributedTransaction::OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event
Y_ABORT_UNLESS(Step == event.Step);
Y_ABORT_UNLESS(TxId == event.TxId);
- Y_ABORT_UNLESS(Partitions.contains(event.Partition));
+ Y_ABORT_UNLESS(Partitions.contains(event.Partition.OriginalPartitionId));
++PartitionRepliesCount;
}
diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h
index a3e69b08e7f..b964df24fd4 100644
--- a/ydb/core/persqueue/transaction.h
+++ b/ydb/core/persqueue/transaction.h
@@ -48,6 +48,7 @@ struct TDistributedTransaction {
THashSet<ui64> Senders; // список отправителей TEvReadSet
THashSet<ui64> Receivers; // список получателей TEvReadSet
TVector<NKikimrPQ::TPartitionOperation> Operations;
+ TMaybe<ui64> WriteId;
EDecision SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
EDecision ParticipantsDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index ade9422864f..8d13437c557 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -848,10 +848,10 @@ TVector<TString> CmdSourceIdRead(TTestContext& tc) {
sourceIds.clear();
auto read = request->Record.AddCmdReadRange();
auto range = read->MutableRange();
- NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkProtoSourceId);
+ NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, TPartitionId(0), NPQ::TKeyPrefix::MarkProtoSourceId);
range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size());
range->SetIncludeFrom(true);
- NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUserDeprecated);
+ NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, TPartitionId(0), NPQ::TKeyPrefix::MarkUserDeprecated);
range->SetTo(ikeyTo.Data(), ikeyTo.Size());
range->SetIncludeTo(false);
Cout << request.Get()->ToString() << Endl;
@@ -1093,7 +1093,7 @@ void CmdForgetRead(const TCmdDirectReadSettings& settings, TTestContext& tc) {
}
void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) {
- NPQ::TKeyPrefix ikey(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser);
+ NPQ::TKeyPrefix ikey(NPQ::TKeyPrefix::TypeInfo, TPartitionId(partition), NPQ::TKeyPrefix::MarkUser);
ikey.Append(client.c_str(), client.size());
NKikimrPQ::TUserInfo userInfo;
@@ -1117,7 +1117,7 @@ void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, co
TString session = "test-session";
ui32 gen = 1;
ui32 step = 2;
- NPQ::TKeyPrefix ikeyDeprecated(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUserDeprecated);
+ NPQ::TKeyPrefix ikeyDeprecated(NPQ::TKeyPrefix::TypeInfo, TPartitionId(partition), NPQ::TKeyPrefix::MarkUserDeprecated);
ikeyDeprecated.Append(client.c_str(), client.size());
TBuffer idataDeprecated = NPQ::NDeprecatedUserData::Serialize(offset, gen, step, session);
diff --git a/ydb/core/persqueue/ut/internals_ut.cpp b/ydb/core/persqueue/ut/internals_ut.cpp
index 4a00987d97b..8458e55a442 100644
--- a/ydb/core/persqueue/ut/internals_ut.cpp
+++ b/ydb/core/persqueue/ut/internals_ut.cpp
@@ -20,7 +20,7 @@ Y_UNIT_TEST(TestPartitionedBlobSimpleTest) {
THead head;
THead newHead;
- TPartitionedBlob blob(0, 0, "sourceId", 1, 1, 10, head, newHead, false, false, 8_MB);
+ TPartitionedBlob blob(TPartitionId(0), 0, "sourceId", 1, 1, 10, head, newHead, false, false, 8_MB);
TClientBlob clientBlob("sourceId", 1, "valuevalue", TMaybe<TPartData>(), TInstant::MilliSeconds(1), TInstant::MilliSeconds(1), 0, "123", "123");
UNIT_ASSERT(blob.IsInited());
TString error;
@@ -73,7 +73,7 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead)
newHead.PackedSize = newHead.Batches.back().GetUnpackedSize();
TString value2(partSize, 'b');
ui32 maxBlobSize = 8 << 20;
- TPartitionedBlob blob(0, newHead.GetNextOffset(), "sourceId3", 1, parts, parts * value2.size(), head, newHead, headCompacted, false, maxBlobSize);
+ TPartitionedBlob blob(TPartitionId(0), newHead.GetNextOffset(), "sourceId3", 1, parts, parts * value2.size(), head, newHead, headCompacted, false, maxBlobSize);
TVector<std::pair<TKey, TString>> formed;
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index d6803f176b8..4d58647a2f7 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -91,18 +91,18 @@ protected:
struct TCalcPredicateMatcher {
TMaybe<ui64> Step;
TMaybe<ui64> TxId;
- TMaybe<ui32> Partition;
+ TMaybe<TPartitionId> Partition;
TMaybe<bool> Predicate;
};
struct TCommitTxDoneMatcher {
TMaybe<ui64> Step;
TMaybe<ui64> TxId;
- TMaybe<ui32> Partition;
+ TMaybe<TPartitionId> Partition;
};
struct TChangePartitionConfigMatcher {
- TMaybe<ui32> Partition;
+ TMaybe<TPartitionId> Partition;
};
struct TTxOperationMatcher {
@@ -263,7 +263,7 @@ void TPartitionFixture::CreatePartitionActor(ui32 id,
TopicConverter = factory.MakeTopicConverter(Config);
auto actor = new NPQ::TPartition(Ctx->TabletId,
- id,
+ TPartitionId(id),
Ctx->Edge,
0,
Ctx->Edge,
@@ -680,7 +680,7 @@ void TPartitionFixture::SendInfoRangeResponse(ui32 partition,
auto pair = read->AddPair();
pair->SetStatus(NKikimrProto::OK);
- NPQ::TKeyPrefix key(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser);
+ NPQ::TKeyPrefix key(NPQ::TKeyPrefix::TypeInfo, TPartitionId(partition), NPQ::TKeyPrefix::MarkUser);
key.Append(c.Consumer.data(), c.Consumer.size());
pair->SetKey(key.Data(), key.Size());
@@ -719,7 +719,7 @@ void TPartitionFixture::SendDataRangeResponse(ui64 begin, ui64 end)
auto read = event->Record.AddReadRangeResult();
read->SetStatus(NKikimrProto::OK);
auto pair = read->AddPair();
- NPQ::TKey key(NPQ::TKeyPrefix::TypeData, 1, begin, 0, end - begin, 0);
+ NPQ::TKey key(NPQ::TKeyPrefix::TypeData, TPartitionId(1), begin, 0, end - begin, 0);
pair->SetStatus(NKikimrProto::OK);
pair->SetKey(key.ToString());
//pair->SetValueSize();
@@ -1094,14 +1094,14 @@ Y_UNIT_TEST_F(CorrectRange_Commit, TPartitionFixture)
CreateSession(client, session);
SendCalcPredicate(step, txId, client, 0, 2);
- WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=true});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=true});
SendCommitTx(step, txId);
WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId, .UserInfos={{1, {.Session=session, .Offset=2}}}});
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
- WaitCommitTxDone({.TxId=txId, .Partition=partition});
+ WaitCommitTxDone({.TxId=txId, .Partition=TPartitionId(partition)});
}
Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TPartitionFixture)
@@ -1121,23 +1121,23 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TPartitionFixture)
CreateSession(client, session);
SendCalcPredicate(step, txId_1, client, 0, 1);
- WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=TPartitionId(partition), .Predicate=true});
SendCalcPredicate(step, txId_2, client, 0, 2);
SendCalcPredicate(step, txId_3, client, 0, 2);
SendCommitTx(step, txId_1);
- WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=false});
SendRollbackTx(step, txId_2);
- WaitCalcPredicateResult({.Step=step, .TxId=txId_3, .Partition=partition, .Predicate=false});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_3, .Partition=TPartitionId(partition), .Predicate=false});
SendRollbackTx(step, txId_3);
WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId_3, .UserInfos={{1, {.Session=session, .Offset=1}}}});
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
- WaitCommitTxDone({.TxId=txId_1, .Partition=partition});
+ WaitCommitTxDone({.TxId=txId_1, .Partition=TPartitionId(partition)});
}
Y_UNIT_TEST_F(CorrectRange_Multiple_Consumers, TPartitionFixture)
@@ -1162,7 +1162,7 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Consumers, TPartitionFixture)
WaitProxyResponse({.Cookie=1, .Status=NMsgBusProxy::MSTATUS_OK});
- WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=true});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=true});
SendCommitTx(step, txId);
WaitCmdWrite({.Count=5, .UserInfos={
@@ -1183,7 +1183,7 @@ Y_UNIT_TEST_F(OldPlanStep, TPartitionFixture)
CreatePartition({.Partition=partition, .Begin=begin, .End=end, .PlanStep=99999, .TxId=55555});
SendCommitTx(step, txId);
- WaitCommitTxDone({.TxId=txId, .Partition=partition});
+ WaitCommitTxDone({.TxId=txId, .Partition=TPartitionId(partition)});
}
Y_UNIT_TEST_F(AfterRestart_1, TPartitionFixture)
@@ -1211,7 +1211,7 @@ Y_UNIT_TEST_F(AfterRestart_1, TPartitionFixture)
SendCommitTx(step, 11111);
- WaitCalcPredicateResult({.Step=step, .TxId=22222, .Partition=partition, .Predicate=true});
+ WaitCalcPredicateResult({.Step=step, .TxId=22222, .Partition=TPartitionId(partition), .Predicate=true});
SendCommitTx(step, 22222);
WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=22222, .UserInfos={{1, {.Session=session, .Offset=4}}}});
@@ -1240,7 +1240,7 @@ Y_UNIT_TEST_F(AfterRestart_2, TPartitionFixture)
.Config={.Consumers={{.Consumer=consumer, .Offset=0, .Session=session}}}
});
- WaitCalcPredicateResult({.Step=step, .TxId=11111, .Partition=partition, .Predicate=true});
+ WaitCalcPredicateResult({.Step=step, .TxId=11111, .Partition=TPartitionId(partition), .Predicate=true});
}
Y_UNIT_TEST_F(IncorrectRange, TPartitionFixture)
@@ -1258,7 +1258,7 @@ Y_UNIT_TEST_F(IncorrectRange, TPartitionFixture)
CreateSession(client, session);
SendCalcPredicate(step, txId, client, 4, 2);
- WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=false});
SendRollbackTx(step, txId);
WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId});
@@ -1267,7 +1267,7 @@ Y_UNIT_TEST_F(IncorrectRange, TPartitionFixture)
++txId;
SendCalcPredicate(step, txId, client, 2, 4);
- WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=false});
SendRollbackTx(step, txId);
WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId});
@@ -1276,7 +1276,7 @@ Y_UNIT_TEST_F(IncorrectRange, TPartitionFixture)
++txId;
SendCalcPredicate(step, txId, client, 0, 11);
- WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=false});
}
Y_UNIT_TEST_F(CorrectRange_Rollback, TPartitionFixture)
@@ -1295,12 +1295,12 @@ Y_UNIT_TEST_F(CorrectRange_Rollback, TPartitionFixture)
CreateSession(client, session);
SendCalcPredicate(step, txId_1, client, 0, 2);
- WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=TPartitionId(partition), .Predicate=true});
SendCalcPredicate(step, txId_2, client, 0, 5);
SendRollbackTx(step, txId_1);
- WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=true});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=true});
}
Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture)
@@ -1333,7 +1333,7 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture)
//
SendCalcPredicate(step, txId_2, "client-2", 0, 2);
- WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=TPartitionId(partition), .Predicate=true});
SendCommitTx(step, txId_1);
//
@@ -1350,12 +1350,12 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture)
}});
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
- WaitPartitionConfigChanged({.Partition=partition});
+ WaitPartitionConfigChanged({.Partition=TPartitionId(partition)});
//
// consumer 'client-2' was deleted
//
- WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false});
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=false});
SendRollbackTx(step, txId_2);
}
diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp
index 2c89b9f95cd..7b89ee28ae4 100644
--- a/ydb/core/persqueue/ut/pq_ut.cpp
+++ b/ydb/core/persqueue/ut/pq_ut.cpp
@@ -416,10 +416,10 @@ Y_UNIT_TEST(TestReadRuleVersions) {
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
auto read = request->Record.AddCmdReadRange();
auto range = read->MutableRange();
- NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUser);
+ NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, TPartitionId(0), NPQ::TKeyPrefix::MarkUser);
range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size());
range->SetIncludeFrom(true);
- NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkUser);
+ NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, TPartitionId(1), NPQ::TKeyPrefix::MarkUser);
range->SetTo(ikeyTo.Data(), ikeyTo.Size());
range->SetIncludeTo(true);
@@ -441,10 +441,10 @@ Y_UNIT_TEST(TestReadRuleVersions) {
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
auto read = request->Record.AddCmdReadRange();
auto range = read->MutableRange();
- NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUser);
+ NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, TPartitionId(0), NPQ::TKeyPrefix::MarkUser);
range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size());
range->SetIncludeFrom(true);
- NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkUser);
+ NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, TPartitionId(1), NPQ::TKeyPrefix::MarkUser);
range->SetTo(ikeyTo.Data(), ikeyTo.Size());
range->SetIncludeTo(true);
@@ -2082,12 +2082,12 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) {
Y_UNIT_TEST(TestOffsetEstimation) {
std::deque<NPQ::TDataKey> container = {
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 1, 0, 0, 0), 0, TInstant::Seconds(1), 10},
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 2, 0, 0, 0), 0, TInstant::Seconds(1), 10},
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 3, 0, 0, 0), 0, TInstant::Seconds(2), 10},
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 4, 0, 0, 0), 0, TInstant::Seconds(2), 10},
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 5, 0, 0, 0), 0, TInstant::Seconds(3), 10},
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 6, 0, 0, 0), 0, TInstant::Seconds(3), 10},
+ {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 1, 0, 0, 0), 0, TInstant::Seconds(1), 10},
+ {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 2, 0, 0, 0), 0, TInstant::Seconds(1), 10},
+ {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 3, 0, 0, 0), 0, TInstant::Seconds(2), 10},
+ {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 4, 0, 0, 0), 0, TInstant::Seconds(2), 10},
+ {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 5, 0, 0, 0), 0, TInstant::Seconds(3), 10},
+ {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 6, 0, 0, 0), 0, TInstant::Seconds(3), 10},
};
UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate({}, TInstant::MilliSeconds(0), 9999), 9999);
UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(0), 9999), 1);
diff --git a/ydb/core/persqueue/ut/sourceid_ut.cpp b/ydb/core/persqueue/ut/sourceid_ut.cpp
index 43164c888c5..8cb27cdc2b8 100644
--- a/ydb/core/persqueue/ut/sourceid_ut.cpp
+++ b/ydb/core/persqueue/ut/sourceid_ut.cpp
@@ -64,7 +64,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
writer.RegisterSourceId(sourceId, sourceIdInfo);
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1);
{
- TKeyPrefix key(TKeyPrefix::TypeInfo, TestPartition, TKeyPrefix::MarkSourceId);
+ TKeyPrefix key(TKeyPrefix::TypeInfo, TPartitionId(TestPartition), TKeyPrefix::MarkSourceId);
TBuffer data;
TSourceIdWriter::FillKeyAndData(ESourceIdFormat::Raw, sourceId, sourceIdInfo, key, data);
@@ -73,7 +73,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
write->SetValue(data.Data(), data.Size());
write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
- writer.FillRequest(actualRequest.Get(), TestPartition);
+ writer.FillRequest(actualRequest.Get(), TPartitionId(TestPartition));
UNIT_ASSERT_VALUES_EQUAL(actualRequest.Get()->ToString(), expectedRequest.Get()->ToString());
}
@@ -82,7 +82,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
writer.RegisterSourceId(anotherSourceId, anotherSourceIdInfo);
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 2);
{
- TKeyPrefix key(TKeyPrefix::TypeInfo, TestPartition + 1, TKeyPrefix::MarkSourceId);
+ TKeyPrefix key(TKeyPrefix::TypeInfo, TPartitionId(TestPartition + 1), TKeyPrefix::MarkSourceId);
TBuffer data;
for (const auto& [sourceId, sourceIdInfo] : writer.GetSourceIdsToWrite()) {
@@ -93,7 +93,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
}
- writer.FillRequest(actualRequest.Get(), TestPartition + 1);
+ writer.FillRequest(actualRequest.Get(), TPartitionId(TestPartition + 1));
UNIT_ASSERT_VALUES_EQUAL(actualRequest.Get()->ToString(), expectedRequest.Get()->ToString());
}
}
@@ -132,7 +132,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
const auto sourceId = TestSourceId();
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
- TKeyPrefix ikey(TKeyPrefix::TypeInfo, TestPartition, mark);
+ TKeyPrefix ikey(TKeyPrefix::TypeInfo, TPartitionId(TestPartition), mark);
TBuffer idata;
TSourceIdWriter::FillKeyAndData(format, sourceId, sourceIdInfo, ikey, idata);
@@ -194,7 +194,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
// sources are dropped before startOffset = 20
{
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
- const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 20, TestPartition, config);
+ const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 20, TPartitionId(TestPartition), config);
UNIT_ASSERT_EQUAL(dropped, true);
UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 19); // first 19 sources are dropped
}
@@ -202,7 +202,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
// expired sources are dropped
{
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
- const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TestPartition, config);
+ const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TPartitionId(TestPartition), config);
UNIT_ASSERT_EQUAL(dropped, true);
UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 341); // another 341 (360 - 19) sources are dropped
}
@@ -210,14 +210,14 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
// move to the past
{
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
- const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TestPartition, config);
+ const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TPartitionId(TestPartition), config);
UNIT_ASSERT_EQUAL(dropped, false); // nothing to drop (everything is dropped)
}
// move to the future
{
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
- const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(3), 10000, TestPartition, config);
+ const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(3), 10000, TPartitionId(TestPartition), config);
UNIT_ASSERT_EQUAL(dropped, true);
UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 360); // more 360 sources are dropped
}
@@ -234,14 +234,14 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
config.SetSourceIdMaxCounts(10000);
{
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
- const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TestPartition, config);
+ const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TPartitionId(TestPartition), config);
UNIT_ASSERT_EQUAL(dropped, false);
}
config.SetSourceIdMaxCounts(9900); // decrease by 100
{
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
- const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TestPartition, config);
+ const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TPartitionId(TestPartition), config);
UNIT_ASSERT_EQUAL(dropped, true);
UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 100); // 100 sources are dropped
}
@@ -267,7 +267,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
config.SetSourceIdMaxCounts(10000); // limit to 10000
{
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
- const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TestPartition, config);
+ const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TPartitionId(TestPartition), config);
UNIT_ASSERT_EQUAL(dropped, true);
UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 360); // first 360 sources are dropped
}
@@ -276,7 +276,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
config.SetSourceIdMaxCounts(10000 - 360);
{
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
- const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TestPartition, config);
+ const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TPartitionId(TestPartition), config);
UNIT_ASSERT_EQUAL(dropped, true);
UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 6); // another 6 sources are dropped by retention
}
@@ -284,7 +284,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
config.SetSourceIdMaxCounts(10000 - 370);
{
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
- const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TestPartition, config);
+ const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 10000, TPartitionId(TestPartition), config);
UNIT_ASSERT_EQUAL(dropped, true);
UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2 * 5); // more 5 sources are dropped
}
@@ -306,7 +306,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
NKikimrPQ::TPartitionConfig config;
config.SetSourceIdMaxCounts(1); // limit to one
- const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TestPartition, config);
+ const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(1), 10000, TPartitionId(TestPartition), config);
UNIT_ASSERT_EQUAL(dropped, true);
UNIT_ASSERT_VALUES_EQUAL(request.Get()->Record.CmdDeleteRangeSize(), 2); // first source is dropped by retention