aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@yandex-team.ru>2022-03-10 20:53:03 +0300
committeralexnick <alexnick@yandex-team.ru>2022-03-10 20:53:03 +0300
commit2e401e65e8c9c5824b56d25639af1ac71f9c6120 (patch)
tree19ec5e86442ab1e62fb6cccee079e8c82573ff09
parenteced39c380512422e0eb0d81cfabc966bc681d70 (diff)
downloadydb-2e401e65e8c9c5824b56d25639af1ac71f9c6120.tar.gz
fix for read from timestamp LOGBROKER-7158
ref:408014dc512a080000fda34630f9e8620aacfd15
-rw-r--r--ydb/core/persqueue/partition.cpp59
-rw-r--r--ydb/core/persqueue/pq_impl.cpp72
-rw-r--r--ydb/core/persqueue/pq_ut.cpp1
-rw-r--r--ydb/core/persqueue/user_info.cpp5
-rw-r--r--ydb/core/persqueue/user_info.h3
-rw-r--r--ydb/core/protos/msgbus_pq.proto1
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp13
-rw-r--r--ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp165
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp2
9 files changed, 264 insertions, 57 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 4da1293a836..06f9161843f 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -419,7 +419,7 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
}
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
- auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx);
+ auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx, 0);
userInfo.HasReadRule = true;
ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
if (userInfo.ReadRuleGeneration != rrGen) {
@@ -2317,6 +2317,7 @@ TReadAnswer TReadInfo::FormAnswer(
readResult->SetWaitQuotaTimeMs(WaitQuotaTime.MilliSeconds());
readResult->SetMaxOffset(endOffset);
readResult->SetRealReadOffset(Offset);
+ readResult->SetReadFromTimestampMs(ReadTimestampMs);
Y_VERIFY(endOffset <= (ui64)Max<i64>(), "Max offset is too big: %" PRIu64, endOffset);
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "FormAnswer " << Blobs.size());
@@ -2340,7 +2341,7 @@ TReadAnswer TReadInfo::FormAnswer(
ui16 internalPartsCount = blobs[pos].InternalPartsCount;
const TString& blobValue = blobs[pos].Value;
- if (blobValue.empty()) { // this is ok. Means that someone requested too much data
+ if (blobValue.empty()) { // this is ok. Means that someone requested too much data or retention race
LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, "Not full answer here!");
ui64 answerSize = answer->Response.ByteSize();
if (userInfo && Destination != 0) {
@@ -2385,24 +2386,20 @@ TReadAnswer TReadInfo::FormAnswer(
psize = size;
TClientBlob &res = batch.Blobs[i];
VERIFY_RESULT_BLOB(res, i);
- bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() &&
- ReadTimestampMs > res.WriteTimestamp.MilliSeconds();
- if (!messageSkippingBehaviour) {
- size += res.GetBlobSize();
- Y_VERIFY(PartNo == res.GetPartNo(), "pos %" PRIu32 " i %" PRIu32 " Offset %" PRIu64 " PartNo %" PRIu16 " offset %" PRIu64 " partNo %" PRIu16,
- pos, i, Offset, PartNo, offset, res.GetPartNo());
-
- if (userInfo) {
- userInfo->AddTimestampToCache(
- Offset, res.WriteTimestamp, res.CreateTimestamp,
- Destination != 0, ctx.Now()
- );
- }
+ size += res.GetBlobSize();
+ Y_VERIFY(PartNo == res.GetPartNo(), "pos %" PRIu32 " i %" PRIu32 " Offset %" PRIu64 " PartNo %" PRIu16 " offset %" PRIu64 " partNo %" PRIu16,
+ pos, i, Offset, PartNo, offset, res.GetPartNo());
+
+ if (userInfo) {
+ userInfo->AddTimestampToCache(
+ Offset, res.WriteTimestamp, res.CreateTimestamp,
+ Destination != 0, ctx.Now()
+ );
+ }
- AddResultBlob(readResult, res, Offset);
- if (res.IsLastPart()) {
- ++cnt;
- }
+ AddResultBlob(readResult, res, Offset);
+ if (res.IsLastPart()) {
+ ++cnt;
}
if (res.IsLastPart()) {
@@ -2532,6 +2529,7 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffse
TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset)
{
+ Y_UNUSED(readTimestampMs);
ui32& count = *rcount;
ui32& size = *rsize;
TVector<TClientBlob> res;
@@ -2551,11 +2549,11 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset,
ui16 pno = Head.Batches[pos].GetPartNo();
for (; i < blobs.size(); ++i) {
+ ui64 curOffset = offset;
+
Y_VERIFY(pno == blobs[i].GetPartNo());
- bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() &&
- readTimestampMs > blobs[i].WriteTimestamp.MilliSeconds();
bool skip = offset < startOffset || offset == startOffset &&
- blobs[i].GetPartNo() < partNo || messageSkippingBehaviour;
+ blobs[i].GetPartNo() < partNo;
if (blobs[i].IsLastPart()) {
++offset;
pno = 0;
@@ -2572,8 +2570,8 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset,
break;
size += blobs[i].GetBlobSize();
res.push_back(blobs[i]);
- if (!firstAddedBlobOffset && AppData()->PQConfig.GetTopicsAreFirstClassCitizen())
- firstAddedBlobOffset = offset > 0 ? offset - 1 : 0;
+ if (!firstAddedBlobOffset)
+ firstAddedBlobOffset = curOffset;
}
if (i < blobs.size()) // already got limit
break;
@@ -2852,7 +2850,6 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
for (const auto& user : UpdateUserInfoTimestamp) {
Y_VERIFY(user != ReadingForUser);
}
-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
<< " user " << user << " send read request for offset " << userInfo.Offset << " initiated " << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset << " ReadingTimestamp " << ReadingTimestamp);
@@ -2882,13 +2879,14 @@ void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext&
ProcessTimestampRead(ctx);
return;
}
- Y_VERIFY(userInfo->ReadScheduled);
- userInfo->ReadScheduled = false;
- Y_VERIFY(ReadingForUser != "");
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
<< " user " << ReadingForUser << " readTimeStamp done, result " << userInfo->WriteTimestamp.MilliSeconds()
<< " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset);
+ Y_VERIFY(userInfo->ReadScheduled);
+ userInfo->ReadScheduled = false;
+ Y_VERIFY(ReadingForUser != "");
+
if (!userInfo->ActualTimestamps) {
LOG_INFO_S(
ctx,
@@ -3407,7 +3405,7 @@ void TPartition::HandleSetOffsetResponse(NKikimrClient::TResponse& response, con
userInfo->Session = "";
userInfo->Generation = userInfo->Step = 0;
userInfo->Offset = 0;
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
+ LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
<< " user " << user << " drop done; " << (userInfo->UserActs.empty() ? "dropping state" : "preserve state for existed init"));
while (!userInfo->UserActs.empty() && userInfo->UserActs.front()->Type != TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE) {
@@ -3889,7 +3887,6 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T
session = "";
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
<< " user " << ev->ClientId << " reinit request with generation " << readRuleGeneration);
-
}
Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset);
@@ -4598,7 +4595,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
ui64 insideHeadOffset{0};
info.Cached = GetReadRequestFromHead(info.Offset, info.PartNo, info.Count, info.Size, info.ReadTimestampMs, &count, &size, &insideHeadOffset);
- info.CachedOffset = Head.Offset > 0 ? Head.Offset : insideHeadOffset;
+ info.CachedOffset = insideHeadOffset;
if (info.Destination != 0) {
++userInfo.ActiveReads;
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index eb9d3d525a5..f1b63977245 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -121,10 +121,13 @@ private:
return;
}
+
Y_VERIFY(record.HasPartitionResponse() && record.GetPartitionResponse().HasCmdReadResult());
const auto& res = record.GetPartitionResponse().GetCmdReadResult();
+ ui64 readFromTimestampMs = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? res.GetReadFromTimestampMs() : 0;
+
Response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
Response->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
@@ -148,14 +151,26 @@ private:
partResp->SetWaitQuotaTimeMs(partResp->GetWaitQuotaTimeMs() + res.GetWaitQuotaTimeMs());
for (ui32 i = 0; i < res.ResultSize(); ++i) {
- if (!res.GetResult(i).HasPartNo() || res.GetResult(i).GetPartNo() == 0) {
+ bool isNewMsg = !res.GetResult(i).HasPartNo() || res.GetResult(i).GetPartNo() == 0;
+ if (!isStart) {
+ Y_VERIFY(partResp->ResultSize() > 0);
+ auto& back = partResp->GetResult(partResp->ResultSize() - 1);
+ bool lastMsgIsNotFull = back.GetPartNo() + 1 < back.GetTotalParts();
+ bool trancate = lastMsgIsNotFull && isNewMsg;
+ if (trancate) {
+ partResp->MutableResult()->RemoveLast();
+ if (partResp->GetResult().empty()) isStart = false;
+ }
+ }
+
+ if (isNewMsg) {
if (!isStart && res.GetResult(i).HasTotalParts() && res.GetResult(i).GetTotalParts() + i > res.ResultSize()) //last blob is not full
break;
partResp->AddResult()->CopyFrom(res.GetResult(i));
isStart = false;
} else { //glue to last res
- Y_VERIFY(partResp->GetResult(partResp->ResultSize() - 1).GetSeqNo() == res.GetResult(i).GetSeqNo());
auto rr = partResp->MutableResult(partResp->ResultSize() - 1);
+ Y_VERIFY(rr->GetSeqNo() == res.GetResult(i).GetSeqNo());
(*rr->MutableData()) += res.GetResult(i).GetData();
rr->SetPartitionKey(res.GetResult(i).GetPartitionKey());
rr->SetExplicitHash(res.GetResult(i).GetExplicitHash());
@@ -166,27 +181,40 @@ private:
}
}
}
- const auto& lastRes = partResp->GetResult(partResp->ResultSize() - 1);
- if (!lastRes.HasPartNo() || lastRes.GetPartNo() + 1 == lastRes.GetTotalParts()) { //last res is full, can answer
- ctx.Send(Sender, Response.Release());
- Die(ctx);
- return;
+ if (!partResp->GetResult().empty()) {
+ const auto& lastRes = partResp->GetResult(partResp->GetResult().size() - 1);
+ if (lastRes.HasPartNo() && lastRes.GetPartNo() + 1 < lastRes.GetTotalParts()) { //last res is not full
+ Request.SetRequestId(TMP_REQUEST_MARKER);
+
+ auto read = Request.MutablePartitionRequest()->MutableCmdRead();
+ read->SetOffset(lastRes.GetOffset());
+ read->SetPartNo(lastRes.GetPartNo() + 1);
+ read->SetCount(1);
+ read->ClearBytes();
+ read->ClearTimeoutMs();
+ read->ClearMaxTimeLagMs();
+ read->SetReadTimestampMs(res.GetReadFromTimestampMs());
+
+ THolder<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest);
+ req->Record = Request;
+ ctx.Send(Tablet, req.Release());
+ return;
+ }
+ }
+
+ //filter old messages
+ ::google::protobuf::RepeatedPtrField<NKikimrClient::TCmdReadResult::TResult> records;
+ records.Swap(partResp->MutableResult());
+ partResp->ClearResult();
+ for (auto & rec : records) {
+ if (rec.GetWriteTimestampMS() >= readFromTimestampMs) {
+ auto result = partResp->AddResult();
+ result->CopyFrom(rec);
+ }
}
- //not full answer - need uprequest
-
- Request.SetRequestId(TMP_REQUEST_MARKER);
-
- auto read = Request.MutablePartitionRequest()->MutableCmdRead();
- read->SetOffset(lastRes.GetOffset());
- read->SetPartNo(lastRes.GetPartNo() + 1);
- read->SetCount(1);
- read->ClearBytes();
- read->ClearTimeoutMs();
- read->ClearMaxTimeLagMs();
- read->ClearReadTimestampMs();
- THolder<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest);
- req->Record = Request;
- ctx.Send(Tablet, req.Release());
+
+ ctx.Send(Sender, Response.Release());
+ Die(ctx);
}
STFUNC(StateFunc) {
diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp
index f62f28f18a0..fd621bc8ecf 100644
--- a/ydb/core/persqueue/pq_ut.cpp
+++ b/ydb/core/persqueue/pq_ut.cpp
@@ -225,6 +225,7 @@ Y_UNIT_TEST(TestReadRuleVersions) {
CmdGetOffset(0, client, 1, tc);
CmdGetOffset(1, client, 2, tc);
+ CmdGetOffset(0, "user", 0, tc);
{
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp
index bdf1d51a05f..6a3a7409050 100644
--- a/ydb/core/persqueue/user_info.cpp
+++ b/ydb/core/persqueue/user_info.cpp
@@ -49,6 +49,7 @@ TUsersInfoStorage::TUsersInfoStorage(
, DbId(dbId)
, FolderId(folderId)
, StreamName(streamName)
+ , CurReadRuleGeneration(0)
{
Counters.Populate(counters);
}
@@ -133,11 +134,11 @@ void TUsersInfoStorage::Remove(const TString& user, const TActorContext& ctx) {
UsersInfo.erase(it);
}
-TUserInfo& TUsersInfoStorage::GetOrCreate(const TString& user, const TActorContext& ctx) {
+TUserInfo& TUsersInfoStorage::GetOrCreate(const TString& user, const TActorContext& ctx, TMaybe<ui64> readRuleGeneration) {
Y_VERIFY(!user.empty());
auto it = UsersInfo.find(user);
if (it == UsersInfo.end()) {
- return Create(ctx, user, 0, false, "", 0, 0, 0, 0, TInstant::Zero());
+ return Create(ctx, user, readRuleGeneration ? *readRuleGeneration : ++CurReadRuleGeneration, false, "", 0, 0, 0, 0, TInstant::Zero());
}
return it->second;
}
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 1752ea0b3b8..c3aa5b0a90c 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -517,7 +517,7 @@ public:
void ParseDeprecated(const TString& key, const TString& data, const TActorContext& ctx);
void Parse(const TString& key, const TString& data, const TActorContext& ctx);
- TUserInfo& GetOrCreate(const TString& user, const TActorContext& ctx);
+ TUserInfo& GetOrCreate(const TString& user, const TActorContext& ctx, TMaybe<ui64> readRuleGeneration = {});
TUserInfo* GetIfExists(const TString& user);
void UpdateConfig(const NKikimrPQ::TPQTabletConfig& config) {
@@ -555,6 +555,7 @@ private:
TString DbId;
TString FolderId;
TString StreamName;
+ ui64 CurReadRuleGeneration;
};
} //NPQ
diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto
index 849b292383a..3431d4bd6ff 100644
--- a/ydb/core/protos/msgbus_pq.proto
+++ b/ydb/core/protos/msgbus_pq.proto
@@ -379,6 +379,7 @@ message TCmdReadResult {
optional uint64 SizeLag = 9;
optional uint64 RealReadOffset = 10;
optional uint64 WaitQuotaTimeMs = 11;
+ optional uint64 ReadFromTimestampMs = 12;
}
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 2d91b29b9dd..87591c75434 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -2074,8 +2074,19 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
return;
}
+ auto MaskResult = [](const NKikimrClient::TPersQueuePartitionResponse& resp) {
+ if (resp.HasCmdReadResult()) {
+ auto res = resp;
+ for (auto& rr : *res.MutableCmdReadResult()->MutableResult()) {
+ rr.SetData(TStringBuilder() << "... " << rr.GetData().size() << " bytes ...");
+ }
+ return res;
+ }
+ return resp;
+ };
+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " " << Partition
- << " initDone " << InitDone << " event " << result);
+ << " initDone " << InitDone << " event " << MaskResult(result));
if (!InitDone) {
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
index 257648f9c2c..f5cccddafcf 100644
--- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
@@ -18,6 +18,7 @@
#include <library/cpp/json/json_reader.h>
#include <util/string/join.h>
+#include <util/generic/overloaded.h>
#include <grpc++/client_context.h>
@@ -43,6 +44,7 @@ namespace NKikimr::NPersQueueTests {
using namespace NYdb::NPersQueue;
using namespace NPersQueue;
+
Y_UNIT_TEST_SUITE(TPersQueueNewSchemeCacheTest) {
void PrepareForGrpcNoDC(TFlatMsgBusPQClient& annoyingClient) {
@@ -139,6 +141,166 @@ namespace NKikimr::NPersQueueTests {
testReadFromTopic("account2/topic2");
}
+ void TestReadAtTimestampImpl(ui32 maxMessagesCount, std::function<TString(ui32)> generateMessage) {
+ TTestServer server(false);
+ server.ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+ server.StartServer();
+ server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::TX_PROXY_SCHEME_CACHE});
+ PrepareForGrpcNoDC(*server.AnnoyingClient);
+ NYdb::TDriverConfig driverCfg;
+
+ driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort).SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)).SetDatabase("/Root");
+
+ auto ydbDriver = MakeHolder<NYdb::TDriver>(driverCfg);
+ auto persqueueClient = MakeHolder<NYdb::NPersQueue::TPersQueueClient>(*ydbDriver);
+
+ TString topic = "account2/topic2";
+ server.EnableLogs({ NKikimrServices::PQ_READ_PROXY});
+
+ NYdb::NPersQueue::TWriteSessionSettings writeSessionSettings;
+ writeSessionSettings.ClusterDiscoveryMode(NYdb::NPersQueue::EClusterDiscoveryMode::Off)
+ .Path(topic)
+ .MessageGroupId(topic)
+ .Codec(NYdb::NPersQueue::ECodec::RAW);
+
+ {
+ auto res = persqueueClient->AddReadRule("/Root/" + topic,
+ NYdb::NPersQueue::TAddReadRuleSettings().ReadRule(NYdb::NPersQueue::TReadRuleSettings().ConsumerName("userx")));
+ res.Wait();
+ UNIT_ASSERT(res.GetValue().IsSuccess());
+ }
+
+ TVector<TInstant> ts;
+ TVector<ui32> firstOffset;
+
+ auto writeSession = persqueueClient->CreateWriteSession(writeSessionSettings);
+ TMaybe<TContinuationToken> continuationToken = Nothing();
+ ui32 messagesAcked = 0;
+ auto processEvent = [&](TWriteSessionEvent::TEvent& event) {
+ std::visit(TOverloaded {
+ [&](const TWriteSessionEvent::TAcksEvent& event) {
+ //! Acks just confirm that message was received and saved by server successfully.
+ //! Here we just count acked messages to check, that everything written is confirmed.
+ Cerr << "GOT ACK " << TInstant::Now() << "\n";
+ Sleep(TDuration::MilliSeconds(3));
+ for (const auto& ack : event.Acks) {
+ Y_UNUSED(ack);
+ messagesAcked++;
+ }
+ },
+ [&](TWriteSessionEvent::TReadyToAcceptEvent& event) {
+ continuationToken = std::move(event.ContinuationToken);
+ },
+ [&](const TSessionClosedEvent&) {
+ UNIT_ASSERT(false);
+ }
+ }, event);
+ };
+
+ for (auto& event: writeSession->GetEvents(true)) {
+ processEvent(event);
+ }
+ UNIT_ASSERT(continuationToken.Defined());
+
+ for (ui32 i = 0; i < maxMessagesCount; ++i) {
+ TString message = generateMessage(i);
+ Cerr << "WRITTEN message " << i << "\n";
+ writeSession->Write(std::move(*continuationToken), std::move(message));
+ //! Continue token is no longer valid once used.
+ continuationToken = Nothing();
+ while (messagesAcked <= i || !continuationToken.Defined()) {
+ for (auto& event: writeSession->GetEvents(true)) {
+ processEvent(event);
+ }
+ }
+ }
+
+ //TODO check skip inside big blob
+ ui32 tsIt = 0;
+ while (true) {
+ std::shared_ptr<NYdb::NPersQueue::IReadSession> reader;
+ TInstant curTs = tsIt == 0 ? TInstant::Zero() : (ts[tsIt]);
+ auto settings = NYdb::NPersQueue::TReadSessionSettings()
+ .AppendTopics(topic)
+ .ConsumerName("userx")
+ .StartingMessageTimestamp(curTs)
+ .ReadOnlyOriginal(true);
+
+ TMap<TString, ui32> map;
+ ui32 messagesReceived = 0;
+ settings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable {
+ for (const auto& msg : event.GetMessages()) {
+ Cerr << "TS: " << curTs << " Got message: " << msg.DebugString(false) << Endl;
+ Cout.Flush();
+ auto count = ++map[msg.GetData()];
+ UNIT_ASSERT(count == 1);
+ if (tsIt == 0) {
+ if (ts.empty()) {
+ ts.push_back(TInstant::Zero());
+ firstOffset.push_back(0);
+ }
+
+ ts.push_back(msg.GetWriteTime() - TDuration::MilliSeconds(1));
+ ts.push_back(msg.GetWriteTime());
+ ts.push_back(msg.GetWriteTime() + TDuration::MilliSeconds(1));
+ firstOffset.push_back(msg.GetOffset());
+ firstOffset.push_back(msg.GetOffset());
+ firstOffset.push_back(msg.GetOffset() + 1);
+
+ Cerr << "GOT MESSAGE TIMESTAMP " << ts.back() << "\n";
+ } else {
+ Cerr << "WAITING FIRST MESSAGE " << firstOffset[tsIt] << " got " << msg.GetOffset() << "\n";
+
+ UNIT_ASSERT(messagesReceived > 0 || msg.GetOffset() == firstOffset[tsIt]);
+ }
+ messagesReceived = msg.GetOffset() + 1;
+ }
+ if (messagesReceived >= maxMessagesCount) {
+ Cerr << "Closing session. Got " << messagesReceived << " messages" << Endl;
+ reader->Close(TDuration::Seconds(0));
+ Cerr << "Session closed" << Endl;
+ }
+ }, false);
+ reader = CreateReader(*ydbDriver, settings);
+
+ Cout << "Created reader\n";
+
+ Cout.Flush();
+ while (messagesReceived < maxMessagesCount) Sleep(TDuration::MilliSeconds(10));
+
+ if (tsIt == 0) {
+ for (ui32 i = 0; i < ts.size(); ++i) {
+ Cout << "TS " << ts[i] << " OFFSET " << firstOffset[i] << "\n";
+ }
+ }
+
+
+ tsIt++;
+ if (tsIt == ts.size()) break;
+ if (firstOffset[tsIt] >= messagesReceived) break;
+ }
+ }
+
+
+ Y_UNIT_TEST(TestReadAtTimestamp) {
+ auto generate1 = [](ui32 messageId) {
+ Y_UNUSED(messageId);
+ TString message = "Hello___" + CreateGuidAsString() + TString(1024*1024, 'a');
+ return message;
+ };
+
+ TestReadAtTimestampImpl(10, generate1);
+
+ auto generate2 = [](ui32 messageId) {
+ Y_UNUSED(messageId);
+ TString message = "Hello___" + CreateGuidAsString() + TString(1024*10240, 'a');
+ return message;
+ };
+
+ TestReadAtTimestampImpl(3, generate2);
+
+ }
+
Y_UNIT_TEST(TestWriteStat1stClass) {
auto testWriteStat1stClass = [](const TString& consumerName) {
TTestServer server(false);
@@ -544,5 +706,8 @@ namespace NKikimr::NPersQueueTests {
}
}
+
+
+
}
}
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 1b8da1967bf..7b3cfcd6a97 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -3742,6 +3742,7 @@ namespace {
.ReadOnlyOriginal(true)
);
+
{
TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1);
auto createStream = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*event);
@@ -3758,5 +3759,6 @@ namespace {
Cerr << "partition status: " << partitionStatus->DebugString() << Endl;
}
}
+
}
}