diff options
author | Aleksei Nikolaevskii <alexnikolaevsky@gmail.com> | 2022-06-15 13:44:04 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2022-06-15 13:44:04 +0300 |
commit | d9fa62401a336c0c1743b42a6775112fdb860487 (patch) | |
tree | 2a08c5712e999f0fd348a408c30bc893e2985120 | |
parent | 4230edd531b7ad8794a2f0b2fbd3ff3b5cdd8f9f (diff) | |
download | ydb-d9fa62401a336c0c1743b42a6775112fdb860487.tar.gz |
potential fix for LOGBROKER-7475
potential fix for LOGBROKER-7475
REVIEW: 2581092
REVIEW: 2632504
x-ydb-stable-ref: d9e7541ee7d0b730ea8f0401173f8049bfe823e4
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 20 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 15 |
2 files changed, 27 insertions, 8 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index b4708f3cc3..4f9c4bb785 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2699,7 +2699,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx); ui64 offset = read->Offset; - if (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1)) { + if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1))) { TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero(); timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs)); timestamp = Max(timestamp, userInfo.ReadFromTimestamp); @@ -2867,8 +2867,13 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo if (userInfo.ReadScheduled) return; userInfo.ReadScheduled = true; - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition - << " user " << user << " readTimeStamp for offset " << userInfo.Offset << " initiated " << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset << " ReadingTimestamp " << ReadingTimestamp); + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "Topic '" << TopicName << "' partition " << Partition + << " user " << user << " readTimeStamp for offset " << userInfo.Offset << " initiated " + << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset + << " ReadingTimestamp " << ReadingTimestamp << " rrg " << userInfo.ReadRuleGeneration + ); if (ReadingTimestamp) { UpdateUserInfoTimestamp.push_back(std::make_pair(user, userInfo.ReadRuleGeneration)); @@ -2907,8 +2912,13 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo for (const auto& user : UpdateUserInfoTimestamp) { Y_VERIFY(user.first != ReadingForUser || user.second != ReadingForUserReadRuleGeneration); } - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition - << " user " << user << " send read request for offset " << userInfo.Offset << " initiated " << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset << " ReadingTimestamp " << ReadingTimestamp); + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "Topic '" << TopicName << "' partition " << Partition + << " user " << user << " send read request for offset " << userInfo.Offset << " initiated " + << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset + << " ReadingTimestamp " << ReadingTimestamp << " rrg " << ReadingForUserReadRuleGeneration + ); THolder<TEvPQ::TEvRead> event = MakeHolder<TEvPQ::TEvRead>(0, userInfo.Offset, 0, 1, "", diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 113f910762..d79820c138 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -125,8 +125,6 @@ private: const auto& res = record.GetPartitionResponse().GetCmdReadResult(); - ui64 readFromTimestampMs = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? res.GetReadFromTimestampMs() : 0; - Response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); Response->Record.SetErrorCode(NPersQueue::NErrorCode::OK); @@ -140,6 +138,11 @@ private: readRes->SetBlobsFromCache(readRes->GetBlobsFromCache() + res.GetBlobsFromCache()); isStart = true; } + ui64 readFromTimestampMs = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() + ? (isStart ? res.GetReadFromTimestampMs() + : Response->Record.GetPartitionResponse().GetCmdReadResult().GetReadFromTimestampMs()) + : 0; + if (record.GetPartitionResponse().HasCookie()) Response->Record.MutablePartitionResponse()->SetCookie(record.GetPartitionResponse().GetCookie()); @@ -171,6 +174,12 @@ private: isStart = false; } else { //glue to last res auto rr = partResp->MutableResult(partResp->ResultSize() - 1); + if (rr->GetSeqNo() != res.GetResult(i).GetSeqNo() || rr->GetPartNo() + 1 != res.GetResult(i).GetPartNo()) { + LOG_CRIT_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvRead tablet: " << Tablet + << " last read pos (seqno/parno): " << rr->GetSeqNo() << "," << rr->GetPartNo() << " readed now " + << res.GetResult(i).GetSeqNo() << ", " << res.GetResult(i).GetPartNo() + << " full request(now): " << Request); + } Y_VERIFY(rr->GetSeqNo() == res.GetResult(i).GetSeqNo()); (*rr->MutableData()) += res.GetResult(i).GetData(); rr->SetPartitionKey(res.GetResult(i).GetPartitionKey()); @@ -194,7 +203,7 @@ private: read->ClearBytes(); read->ClearTimeoutMs(); read->ClearMaxTimeLagMs(); - read->SetReadTimestampMs(res.GetReadFromTimestampMs()); + read->SetReadTimestampMs(readFromTimestampMs); THolder<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest); req->Record = Request; |