aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-28 23:24:22 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-28 23:24:22 +0300
commit373e1375391a8eb3b2547faa6c2a8d26aa600b5a (patch)
treec4bdd036ad2134d6284c5bf10d8684714ca45ef2
parente479ed951562bbc29537a54830768f0b0784c504 (diff)
downloadydb-373e1375391a8eb3b2547faa6c2a8d26aa600b5a.tar.gz
Fix ranges mode
Fix ranges mode ref:9ff2a29baa79c24d46223ad92d65f5cb45c1261a
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp10
1 files changed, 6 insertions, 4 deletions
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 784b004fd8..2d91b29b9d 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -107,7 +107,7 @@ bool RemoveEmptyMessages(MigrationStreamingReadServerMessage::DataBatch& data) {
class TPartitionActor : public NActors::TActorBootstrapped<TPartitionActor> {
public:
TPartitionActor(const TActorId& parentId, const TString& clientId, const TString& clientPath, const ui64 cookie, const TString& session, const TPartitionId& partition, ui32 generation, ui32 step,
- const ui64 tabletID, const TReadSessionActor::TTopicCounters& counters, const bool commitsDisabled, const TString& clientDC);
+ const ui64 tabletID, const TReadSessionActor::TTopicCounters& counters, const bool commitsDisabled, const TString& clientDC, bool rangesMode);
~TPartitionActor();
void Bootstrap(const NActors::TActorContext& ctx);
@@ -195,6 +195,7 @@ private:
TSet<ui64> NextCommits;
TDisjointIntervalTree<ui64> NextRanges;
+ bool RangesMode;
std::deque<TOffsetInfo> Offsets;
ui64 WTime;
@@ -1036,7 +1037,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
TPartitionId partitionId{converterIter->second, record.GetPartition(), assignId};
IActor* partitionActor = new TPartitionActor(ctx.SelfID, ClientId, ClientPath, Cookie, Session, partitionId, record.GetGeneration(),
- record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC);
+ record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC, RangesMode);
TActorId actorId = ctx.Register(partitionActor);
if (SessionsActive) {
@@ -1706,7 +1707,7 @@ void TReadSessionActor::HandleWakeup(const TActorContext& ctx) {
TPartitionActor::TPartitionActor(const TActorId& parentId, const TString& clientId, const TString& clientPath, const ui64 cookie, const TString& session,
const TPartitionId& partition, const ui32 generation, const ui32 step, const ui64 tabletID,
- const TReadSessionActor::TTopicCounters& counters, bool commitsDisabled, const TString& clientDC)
+ const TReadSessionActor::TTopicCounters& counters, bool commitsDisabled, const TString& clientDC, bool rangesMode)
: ParentId(parentId)
, ClientId(clientId)
, ClientPath(clientPath)
@@ -1725,6 +1726,7 @@ TPartitionActor::TPartitionActor(const TActorId& parentId, const TString& client
, WriteTimestampEstimateMs(0)
, ReadIdToResponse(1)
, ReadIdCommitted(0)
+ , RangesMode(rangesMode)
, WTime(0)
, InitDone(false)
, StartReading(false)
@@ -2171,7 +2173,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
bool hasData = FillBatchedData(data, res, Partition, ReadIdToResponse, ReadOffset, WTime, EndOffset, ctx);
WriteTimestampEstimateMs = Max(WriteTimestampEstimateMs, WTime);
- if (!CommitsDisabled) {
+ if (!CommitsDisabled && !RangesMode) {
Offsets.push_back({ReadIdToResponse, ReadOffset});
}