diff options
author | alexnick <alexnick@yandex-team.ru> | 2022-06-10 17:26:09 +0300 |
---|---|---|
committer | alexnick <alexnick@yandex-team.ru> | 2022-06-10 17:26:09 +0300 |
commit | 1bcf2dc16edab6ccb8d8f99967a89a639ce41a2a (patch) | |
tree | e689bfd57a207ae8692bfbb08e96f130d78492e3 | |
parent | 6fb83ed5735ef8911491f47fbc05dc6c8369401c (diff) | |
download | ydb-1bcf2dc16edab6ccb8d8f99967a89a639ce41a2a.tar.gz |
potential fix for LOGBROKER-7475
ref:f5bba25603aee8cffa9f6c91d4382076e6a91a8f
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 8 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 15 |
2 files changed, 16 insertions, 7 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 9b26dc3080..00e46d4046 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2727,7 +2727,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx); ui64 offset = read->Offset; - if (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1)) { + if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1))) { TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero(); timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs)); timestamp = Max(timestamp, userInfo.ReadFromTimestamp); @@ -2910,7 +2910,7 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user << " readTimeStamp for offset " << userInfo.Offset << " initiated " << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset - << " ReadingTimestamp " << ReadingTimestamp + << " ReadingTimestamp " << ReadingTimestamp << " rrg " << userInfo.ReadRuleGeneration ); if (ReadingTimestamp) { @@ -2956,7 +2956,7 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user << " send read request for offset " << userInfo.Offset << " initiated " << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset - << " ReadingTimestamp " << ReadingTimestamp + << " ReadingTimestamp " << ReadingTimestamp << " rrg " << ReadingForUserReadRuleGeneration ); @@ -2987,7 +2987,7 @@ void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << ReadingForUser << " readTimeStamp for other generation or no client info at all" - ); + ); ProcessTimestampRead(ctx); return; diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 6444258dfc..506a18c3e4 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -125,8 +125,6 @@ private: const auto& res = record.GetPartitionResponse().GetCmdReadResult(); - ui64 readFromTimestampMs = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? res.GetReadFromTimestampMs() : 0; - Response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); Response->Record.SetErrorCode(NPersQueue::NErrorCode::OK); @@ -140,6 +138,11 @@ private: readRes->SetBlobsFromCache(readRes->GetBlobsFromCache() + res.GetBlobsFromCache()); isStart = true; } + ui64 readFromTimestampMs = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() + ? (isStart ? res.GetReadFromTimestampMs() + : Response->Record.GetPartitionResponse().GetCmdReadResult().GetReadFromTimestampMs()) + : 0; + if (record.GetPartitionResponse().HasCookie()) Response->Record.MutablePartitionResponse()->SetCookie(record.GetPartitionResponse().GetCookie()); @@ -171,6 +174,12 @@ private: isStart = false; } else { //glue to last res auto rr = partResp->MutableResult(partResp->ResultSize() - 1); + if (rr->GetSeqNo() != res.GetResult(i).GetSeqNo() || rr->GetPartNo() + 1 != res.GetResult(i).GetPartNo()) { + LOG_CRIT_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvRead tablet: " << Tablet + << " last read pos (seqno/parno): " << rr->GetSeqNo() << "," << rr->GetPartNo() << " readed now " + << res.GetResult(i).GetSeqNo() << ", " << res.GetResult(i).GetPartNo() + << " full request(now): " << Request); + } Y_VERIFY(rr->GetSeqNo() == res.GetResult(i).GetSeqNo()); (*rr->MutableData()) += res.GetResult(i).GetData(); rr->SetPartitionKey(res.GetResult(i).GetPartitionKey()); @@ -194,7 +203,7 @@ private: read->ClearBytes(); read->ClearTimeoutMs(); read->ClearMaxTimeLagMs(); - read->SetReadTimestampMs(res.GetReadFromTimestampMs()); + read->SetReadTimestampMs(readFromTimestampMs); THolder<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest); req->Record = Request; |