aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@yandex-team.ru>2022-06-10 17:26:09 +0300
committeralexnick <alexnick@yandex-team.ru>2022-06-10 17:26:09 +0300
commit1bcf2dc16edab6ccb8d8f99967a89a639ce41a2a (patch)
treee689bfd57a207ae8692bfbb08e96f130d78492e3
parent6fb83ed5735ef8911491f47fbc05dc6c8369401c (diff)
downloadydb-1bcf2dc16edab6ccb8d8f99967a89a639ce41a2a.tar.gz
potential fix for LOGBROKER-7475
ref:f5bba25603aee8cffa9f6c91d4382076e6a91a8f
-rw-r--r--ydb/core/persqueue/partition.cpp8
-rw-r--r--ydb/core/persqueue/pq_impl.cpp15
2 files changed, 16 insertions, 7 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 9b26dc3080..00e46d4046 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -2727,7 +2727,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const
auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx);
ui64 offset = read->Offset;
- if (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1)) {
+ if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1))) {
TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero();
timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs));
timestamp = Max(timestamp, userInfo.ReadFromTimestamp);
@@ -2910,7 +2910,7 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
"Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
<< " user " << user << " readTimeStamp for offset " << userInfo.Offset << " initiated "
<< " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset
- << " ReadingTimestamp " << ReadingTimestamp
+ << " ReadingTimestamp " << ReadingTimestamp << " rrg " << userInfo.ReadRuleGeneration
);
if (ReadingTimestamp) {
@@ -2956,7 +2956,7 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
"Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
<< " user " << user << " send read request for offset " << userInfo.Offset << " initiated "
<< " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset
- << " ReadingTimestamp " << ReadingTimestamp
+ << " ReadingTimestamp " << ReadingTimestamp << " rrg " << ReadingForUserReadRuleGeneration
);
@@ -2987,7 +2987,7 @@ void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext&
ctx, NKikimrServices::PERSQUEUE,
"Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
<< " user " << ReadingForUser << " readTimeStamp for other generation or no client info at all"
- );
+ );
ProcessTimestampRead(ctx);
return;
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 6444258dfc..506a18c3e4 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -125,8 +125,6 @@ private:
const auto& res = record.GetPartitionResponse().GetCmdReadResult();
- ui64 readFromTimestampMs = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? res.GetReadFromTimestampMs() : 0;
-
Response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
Response->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
@@ -140,6 +138,11 @@ private:
readRes->SetBlobsFromCache(readRes->GetBlobsFromCache() + res.GetBlobsFromCache());
isStart = true;
}
+ ui64 readFromTimestampMs = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()
+ ? (isStart ? res.GetReadFromTimestampMs()
+ : Response->Record.GetPartitionResponse().GetCmdReadResult().GetReadFromTimestampMs())
+ : 0;
+
if (record.GetPartitionResponse().HasCookie())
Response->Record.MutablePartitionResponse()->SetCookie(record.GetPartitionResponse().GetCookie());
@@ -171,6 +174,12 @@ private:
isStart = false;
} else { //glue to last res
auto rr = partResp->MutableResult(partResp->ResultSize() - 1);
+ if (rr->GetSeqNo() != res.GetResult(i).GetSeqNo() || rr->GetPartNo() + 1 != res.GetResult(i).GetPartNo()) {
+ LOG_CRIT_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvRead tablet: " << Tablet
+ << " last read pos (seqno/parno): " << rr->GetSeqNo() << "," << rr->GetPartNo() << " readed now "
+ << res.GetResult(i).GetSeqNo() << ", " << res.GetResult(i).GetPartNo()
+ << " full request(now): " << Request);
+ }
Y_VERIFY(rr->GetSeqNo() == res.GetResult(i).GetSeqNo());
(*rr->MutableData()) += res.GetResult(i).GetData();
rr->SetPartitionKey(res.GetResult(i).GetPartitionKey());
@@ -194,7 +203,7 @@ private:
read->ClearBytes();
read->ClearTimeoutMs();
read->ClearMaxTimeLagMs();
- read->SetReadTimestampMs(res.GetReadFromTimestampMs());
+ read->SetReadTimestampMs(readFromTimestampMs);
THolder<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest);
req->Record = Request;