aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Nikolaevskii <alexnikolaevsky@gmail.com>2022-06-15 13:44:04 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2022-06-15 13:44:04 +0300
commitd9fa62401a336c0c1743b42a6775112fdb860487 (patch)
tree2a08c5712e999f0fd348a408c30bc893e2985120
parent4230edd531b7ad8794a2f0b2fbd3ff3b5cdd8f9f (diff)
downloadydb-d9fa62401a336c0c1743b42a6775112fdb860487.tar.gz
potential fix for LOGBROKER-7475
potential fix for LOGBROKER-7475 REVIEW: 2581092 REVIEW: 2632504 x-ydb-stable-ref: d9e7541ee7d0b730ea8f0401173f8049bfe823e4
-rw-r--r--ydb/core/persqueue/partition.cpp20
-rw-r--r--ydb/core/persqueue/pq_impl.cpp15
2 files changed, 27 insertions, 8 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index b4708f3cc3..4f9c4bb785 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -2699,7 +2699,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const
auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx);
ui64 offset = read->Offset;
- if (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1)) {
+ if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1))) {
TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero();
timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs));
timestamp = Max(timestamp, userInfo.ReadFromTimestamp);
@@ -2867,8 +2867,13 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
if (userInfo.ReadScheduled)
return;
userInfo.ReadScheduled = true;
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
- << " user " << user << " readTimeStamp for offset " << userInfo.Offset << " initiated " << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset << " ReadingTimestamp " << ReadingTimestamp);
+ LOG_DEBUG_S(
+ ctx, NKikimrServices::PERSQUEUE,
+ "Topic '" << TopicName << "' partition " << Partition
+ << " user " << user << " readTimeStamp for offset " << userInfo.Offset << " initiated "
+ << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset
+ << " ReadingTimestamp " << ReadingTimestamp << " rrg " << userInfo.ReadRuleGeneration
+ );
if (ReadingTimestamp) {
UpdateUserInfoTimestamp.push_back(std::make_pair(user, userInfo.ReadRuleGeneration));
@@ -2907,8 +2912,13 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
for (const auto& user : UpdateUserInfoTimestamp) {
Y_VERIFY(user.first != ReadingForUser || user.second != ReadingForUserReadRuleGeneration);
}
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
- << " user " << user << " send read request for offset " << userInfo.Offset << " initiated " << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset << " ReadingTimestamp " << ReadingTimestamp);
+ LOG_DEBUG_S(
+ ctx, NKikimrServices::PERSQUEUE,
+ "Topic '" << TopicName << "' partition " << Partition
+ << " user " << user << " send read request for offset " << userInfo.Offset << " initiated "
+ << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset
+ << " ReadingTimestamp " << ReadingTimestamp << " rrg " << ReadingForUserReadRuleGeneration
+ );
THolder<TEvPQ::TEvRead> event = MakeHolder<TEvPQ::TEvRead>(0, userInfo.Offset, 0, 1, "",
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 113f910762..d79820c138 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -125,8 +125,6 @@ private:
const auto& res = record.GetPartitionResponse().GetCmdReadResult();
- ui64 readFromTimestampMs = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? res.GetReadFromTimestampMs() : 0;
-
Response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
Response->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
@@ -140,6 +138,11 @@ private:
readRes->SetBlobsFromCache(readRes->GetBlobsFromCache() + res.GetBlobsFromCache());
isStart = true;
}
+ ui64 readFromTimestampMs = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()
+ ? (isStart ? res.GetReadFromTimestampMs()
+ : Response->Record.GetPartitionResponse().GetCmdReadResult().GetReadFromTimestampMs())
+ : 0;
+
if (record.GetPartitionResponse().HasCookie())
Response->Record.MutablePartitionResponse()->SetCookie(record.GetPartitionResponse().GetCookie());
@@ -171,6 +174,12 @@ private:
isStart = false;
} else { //glue to last res
auto rr = partResp->MutableResult(partResp->ResultSize() - 1);
+ if (rr->GetSeqNo() != res.GetResult(i).GetSeqNo() || rr->GetPartNo() + 1 != res.GetResult(i).GetPartNo()) {
+ LOG_CRIT_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvRead tablet: " << Tablet
+ << " last read pos (seqno/parno): " << rr->GetSeqNo() << "," << rr->GetPartNo() << " readed now "
+ << res.GetResult(i).GetSeqNo() << ", " << res.GetResult(i).GetPartNo()
+ << " full request(now): " << Request);
+ }
Y_VERIFY(rr->GetSeqNo() == res.GetResult(i).GetSeqNo());
(*rr->MutableData()) += res.GetResult(i).GetData();
rr->SetPartitionKey(res.GetResult(i).GetPartitionKey());
@@ -194,7 +203,7 @@ private:
read->ClearBytes();
read->ClearTimeoutMs();
read->ClearMaxTimeLagMs();
- read->SetReadTimestampMs(res.GetReadFromTimestampMs());
+ read->SetReadTimestampMs(readFromTimestampMs);
THolder<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest);
req->Record = Request;