diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-28 23:24:22 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-28 23:24:22 +0300 |
commit | 373e1375391a8eb3b2547faa6c2a8d26aa600b5a (patch) | |
tree | c4bdd036ad2134d6284c5bf10d8684714ca45ef2 | |
parent | e479ed951562bbc29537a54830768f0b0784c504 (diff) | |
download | ydb-373e1375391a8eb3b2547faa6c2a8d26aa600b5a.tar.gz |
Fix ranges mode
Fix ranges mode
ref:9ff2a29baa79c24d46223ad92d65f5cb45c1261a
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_read_actor.cpp | 10 |
1 files changed, 6 insertions, 4 deletions
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index 784b004fd8..2d91b29b9d 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -107,7 +107,7 @@ bool RemoveEmptyMessages(MigrationStreamingReadServerMessage::DataBatch& data) { class TPartitionActor : public NActors::TActorBootstrapped<TPartitionActor> { public: TPartitionActor(const TActorId& parentId, const TString& clientId, const TString& clientPath, const ui64 cookie, const TString& session, const TPartitionId& partition, ui32 generation, ui32 step, - const ui64 tabletID, const TReadSessionActor::TTopicCounters& counters, const bool commitsDisabled, const TString& clientDC); + const ui64 tabletID, const TReadSessionActor::TTopicCounters& counters, const bool commitsDisabled, const TString& clientDC, bool rangesMode); ~TPartitionActor(); void Bootstrap(const NActors::TActorContext& ctx); @@ -195,6 +195,7 @@ private: TSet<ui64> NextCommits; TDisjointIntervalTree<ui64> NextRanges; + bool RangesMode; std::deque<TOffsetInfo> Offsets; ui64 WTime; @@ -1036,7 +1037,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T TPartitionId partitionId{converterIter->second, record.GetPartition(), assignId}; IActor* partitionActor = new TPartitionActor(ctx.SelfID, ClientId, ClientPath, Cookie, Session, partitionId, record.GetGeneration(), - record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC); + record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC, RangesMode); TActorId actorId = ctx.Register(partitionActor); if (SessionsActive) { @@ -1706,7 +1707,7 @@ void TReadSessionActor::HandleWakeup(const TActorContext& ctx) { TPartitionActor::TPartitionActor(const TActorId& parentId, const TString& clientId, const TString& clientPath, const ui64 cookie, const TString& session, const TPartitionId& partition, const ui32 generation, const ui32 step, const ui64 tabletID, - const TReadSessionActor::TTopicCounters& counters, bool commitsDisabled, const TString& clientDC) + const TReadSessionActor::TTopicCounters& counters, bool commitsDisabled, const TString& clientDC, bool rangesMode) : ParentId(parentId) , ClientId(clientId) , ClientPath(clientPath) @@ -1725,6 +1726,7 @@ TPartitionActor::TPartitionActor(const TActorId& parentId, const TString& client , WriteTimestampEstimateMs(0) , ReadIdToResponse(1) , ReadIdCommitted(0) + , RangesMode(rangesMode) , WTime(0) , InitDone(false) , StartReading(false) @@ -2171,7 +2173,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo bool hasData = FillBatchedData(data, res, Partition, ReadIdToResponse, ReadOffset, WTime, EndOffset, ctx); WriteTimestampEstimateMs = Max(WriteTimestampEstimateMs, WTime); - if (!CommitsDisabled) { + if (!CommitsDisabled && !RangesMode) { Offsets.push_back({ReadIdToResponse, ReadOffset}); } |