author    | alexnick <alexnick@yandex-team.ru> | 2022-03-10 20:53:03 +0300
committer | alexnick <alexnick@yandex-team.ru> | 2022-03-10 20:53:03 +0300
commit    | 2e401e65e8c9c5824b56d25639af1ac71f9c6120 (patch)
tree      | 19ec5e86442ab1e62fb6cccee079e8c82573ff09
parent    | eced39c380512422e0eb0d81cfabc966bc681d70 (diff)
download  | ydb-2e401e65e8c9c5824b56d25639af1ac71f9c6120.tar.gz
fix for read from timestamp LOGBROKER-7158
ref:408014dc512a080000fda34630f9e8620aacfd15
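In essence, the partition stops silently skipping messages that are older than the requested read timestamp (the old messageSkippingBehaviour branches in partition.cpp). Instead it reports ReadFromTimestampMs in TCmdReadResult, and the read proxy in pq_impl.cpp re-requests any missing parts of the last multi-part message, glues the parts together, and only then drops whole messages whose write timestamp is below the requested one. The sketch below illustrates that glue-then-filter idea only; TMsg and GlueAndFilter are hypothetical names for this illustration, not the protobuf types or functions the tablet actually uses.

// Minimal, self-contained sketch of the new proxy-side behaviour (not the real
// tablet code): TMsg is a hypothetical stand-in for NKikimrClient::TCmdReadResult::TResult.
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

struct TMsg {
    uint64_t Offset = 0;
    uint16_t PartNo = 0;            // index of this part within a multi-part message
    uint16_t TotalParts = 1;        // how many parts the full message has
    uint64_t WriteTimestampMS = 0;  // server-side write timestamp, milliseconds
    std::string Data;
};

// Glue message parts together first, then drop whole messages written before
// readFromTimestampMs (0 disables filtering, as for non-first-class topics).
std::vector<TMsg> GlueAndFilter(const std::vector<TMsg>& parts, uint64_t readFromTimestampMs) {
    std::vector<TMsg> glued;
    uint16_t partsInLast = 0;
    for (const auto& p : parts) {
        if (p.PartNo == 0) {                 // a new message starts
            glued.push_back(p);
            partsInLast = 1;
        } else if (!glued.empty()) {         // continuation: append its data to the last message
            glued.back().Data += p.Data;
            ++partsInLast;
        }
    }
    // An unfinished trailing message is not answered from here; the real proxy
    // issues another CmdRead for the missing parts before replying.
    if (!glued.empty() && partsInLast < glued.back().TotalParts) {
        glued.pop_back();
    }
    std::vector<TMsg> result;
    for (auto& m : glued) {
        if (m.WriteTimestampMS >= readFromTimestampMs) {   // keep only messages at or after the timestamp
            result.push_back(std::move(m));
        }
    }
    return result;
}

Filtering after gluing means a multi-part message that spans several blobs is delivered or skipped as a whole, which appears to be what the per-blob skipping inside FormAnswer/GetReadRequestFromHead could not guarantee.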
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 59
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 72
-rw-r--r-- | ydb/core/persqueue/pq_ut.cpp | 1
-rw-r--r-- | ydb/core/persqueue/user_info.cpp | 5
-rw-r--r-- | ydb/core/persqueue/user_info.h | 3
-rw-r--r-- | ydb/core/protos/msgbus_pq.proto | 1
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_read_actor.cpp | 13
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp | 165
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 2
9 files changed, 264 insertions, 57 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 4da1293a836..06f9161843f 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -419,7 +419,7 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
     }
     for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
         const auto& consumer = config.GetReadRules(i);
-        auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx);
+        auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx, 0);
         userInfo.HasReadRule = true;
         ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
         if (userInfo.ReadRuleGeneration != rrGen) {
@@ -2317,6 +2317,7 @@ TReadAnswer TReadInfo::FormAnswer(
     readResult->SetWaitQuotaTimeMs(WaitQuotaTime.MilliSeconds());
     readResult->SetMaxOffset(endOffset);
     readResult->SetRealReadOffset(Offset);
+    readResult->SetReadFromTimestampMs(ReadTimestampMs);
     Y_VERIFY(endOffset <= (ui64)Max<i64>(), "Max offset is too big: %" PRIu64, endOffset);

     LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "FormAnswer " << Blobs.size());
@@ -2340,7 +2341,7 @@ TReadAnswer TReadInfo::FormAnswer(
         ui16 internalPartsCount = blobs[pos].InternalPartsCount;
         const TString& blobValue = blobs[pos].Value;

-        if (blobValue.empty()) { // this is ok. Means that someone requested too much data
+        if (blobValue.empty()) { // this is ok. Means that someone requested too much data or retention race
            LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, "Not full answer here!");
            ui64 answerSize = answer->Response.ByteSize();
            if (userInfo && Destination != 0) {
@@ -2385,24 +2386,20 @@ TReadAnswer TReadInfo::FormAnswer(
            psize = size;
            TClientBlob &res = batch.Blobs[i];
            VERIFY_RESULT_BLOB(res, i);
-            bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() &&
-                ReadTimestampMs > res.WriteTimestamp.MilliSeconds();
-            if (!messageSkippingBehaviour) {
-                size += res.GetBlobSize();
-                Y_VERIFY(PartNo == res.GetPartNo(), "pos %" PRIu32 " i %" PRIu32 " Offset %" PRIu64 " PartNo %" PRIu16 " offset %" PRIu64 " partNo %" PRIu16,
-                    pos, i, Offset, PartNo, offset, res.GetPartNo());
-
-                if (userInfo) {
-                    userInfo->AddTimestampToCache(
-                        Offset, res.WriteTimestamp, res.CreateTimestamp,
-                        Destination != 0, ctx.Now()
-                    );
-                }
+            size += res.GetBlobSize();
+            Y_VERIFY(PartNo == res.GetPartNo(), "pos %" PRIu32 " i %" PRIu32 " Offset %" PRIu64 " PartNo %" PRIu16 " offset %" PRIu64 " partNo %" PRIu16,
+                pos, i, Offset, PartNo, offset, res.GetPartNo());
+
+            if (userInfo) {
+                userInfo->AddTimestampToCache(
+                    Offset, res.WriteTimestamp, res.CreateTimestamp,
+                    Destination != 0, ctx.Now()
+                );
+            }

-                AddResultBlob(readResult, res, Offset);
-                if (res.IsLastPart()) {
-                    ++cnt;
-                }
+            AddResultBlob(readResult, res, Offset);
+            if (res.IsLastPart()) {
+                ++cnt;
            }

            if (res.IsLastPart()) {
@@ -2532,6 +2529,7 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffse

 TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize,
                                                         const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset) {
+    Y_UNUSED(readTimestampMs);
     ui32& count = *rcount;
     ui32& size = *rsize;
     TVector<TClientBlob> res;
@@ -2551,11 +2549,11 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset,
        ui16 pno = Head.Batches[pos].GetPartNo();
        for (; i < blobs.size(); ++i) {
+            ui64 curOffset = offset;
+
            Y_VERIFY(pno == blobs[i].GetPartNo());
-            bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() &&
-                readTimestampMs > blobs[i].WriteTimestamp.MilliSeconds();
            bool skip = offset < startOffset || offset == startOffset &&
-                blobs[i].GetPartNo() < partNo || messageSkippingBehaviour;
+                blobs[i].GetPartNo() < partNo;
            if (blobs[i].IsLastPart()) {
                ++offset;
                pno = 0;
@@ -2572,8 +2570,8 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset,
                break;
            size += blobs[i].GetBlobSize();
            res.push_back(blobs[i]);
-            if (!firstAddedBlobOffset && AppData()->PQConfig.GetTopicsAreFirstClassCitizen())
-                firstAddedBlobOffset = offset > 0 ? offset - 1 : 0;
+            if (!firstAddedBlobOffset)
+                firstAddedBlobOffset = curOffset;
        }
        if (i < blobs.size()) // already got limit
            break;
@@ -2852,7 +2850,6 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
    for (const auto& user : UpdateUserInfoTimestamp) {
        Y_VERIFY(user != ReadingForUser);
    }
-
    LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
            << " user " << user << " send read request for offset " << userInfo.Offset << " initiated "
            << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset << " ReadingTimestamp " << ReadingTimestamp);
@@ -2882,13 +2879,14 @@ void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext&
        ProcessTimestampRead(ctx);
        return;
    }
-    Y_VERIFY(userInfo->ReadScheduled);
-    userInfo->ReadScheduled = false;
-    Y_VERIFY(ReadingForUser != "");
    LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
            << " user " << ReadingForUser << " readTimeStamp done, result " << userInfo->WriteTimestamp.MilliSeconds()
            << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset);
+    Y_VERIFY(userInfo->ReadScheduled);
+    userInfo->ReadScheduled = false;
+    Y_VERIFY(ReadingForUser != "");
+
    if (!userInfo->ActualTimestamps) {
        LOG_INFO_S(
            ctx,
@@ -3407,7 +3405,7 @@ void TPartition::HandleSetOffsetResponse(NKikimrClient::TResponse& response, con
            userInfo->Session = "";
            userInfo->Generation = userInfo->Step = 0;
            userInfo->Offset = 0;
-            LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
+            LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
                    << " user " << user << " drop done; " << (userInfo->UserActs.empty() ? "dropping state" : "preserve state for existed init"));

            while (!userInfo->UserActs.empty() && userInfo->UserActs.front()->Type != TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE) {
@@ -3889,7 +3887,6 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T
            session = "";
            LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
                    << " user " << ev->ClientId << " reinit request with generation " << readRuleGeneration);
-
        }

        Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset);
@@ -4598,7 +4595,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
        ui64 insideHeadOffset{0};
        info.Cached = GetReadRequestFromHead(info.Offset, info.PartNo, info.Count, info.Size, info.ReadTimestampMs, &count, &size,
                                             &insideHeadOffset);
-        info.CachedOffset = Head.Offset > 0 ? Head.Offset : insideHeadOffset;
+        info.CachedOffset = insideHeadOffset;

        if (info.Destination != 0) {
            ++userInfo.ActiveReads;
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index eb9d3d525a5..f1b63977245 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -121,10 +121,13 @@ private:
            return;
        }

+        Y_VERIFY(record.HasPartitionResponse() && record.GetPartitionResponse().HasCmdReadResult());
        const auto& res = record.GetPartitionResponse().GetCmdReadResult();

+        ui64 readFromTimestampMs = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? res.GetReadFromTimestampMs() : 0;
+
        Response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
        Response->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
@@ -148,14 +151,26 @@ private:
        partResp->SetWaitQuotaTimeMs(partResp->GetWaitQuotaTimeMs() + res.GetWaitQuotaTimeMs());

        for (ui32 i = 0; i < res.ResultSize(); ++i) {
-            if (!res.GetResult(i).HasPartNo() || res.GetResult(i).GetPartNo() == 0) {
+            bool isNewMsg = !res.GetResult(i).HasPartNo() || res.GetResult(i).GetPartNo() == 0;
+            if (!isStart) {
+                Y_VERIFY(partResp->ResultSize() > 0);
+                auto& back = partResp->GetResult(partResp->ResultSize() - 1);
+                bool lastMsgIsNotFull = back.GetPartNo() + 1 < back.GetTotalParts();
+                bool trancate = lastMsgIsNotFull && isNewMsg;
+                if (trancate) {
+                    partResp->MutableResult()->RemoveLast();
+                    if (partResp->GetResult().empty()) isStart = false;
+                }
+            }
+
+            if (isNewMsg) {
                if (!isStart && res.GetResult(i).HasTotalParts() && res.GetResult(i).GetTotalParts() + i > res.ResultSize()) //last blob is not full
                    break;
                partResp->AddResult()->CopyFrom(res.GetResult(i));
                isStart = false;
            } else { //glue to last res
-                Y_VERIFY(partResp->GetResult(partResp->ResultSize() - 1).GetSeqNo() == res.GetResult(i).GetSeqNo());
                auto rr = partResp->MutableResult(partResp->ResultSize() - 1);
+                Y_VERIFY(rr->GetSeqNo() == res.GetResult(i).GetSeqNo());
                (*rr->MutableData()) += res.GetResult(i).GetData();
                rr->SetPartitionKey(res.GetResult(i).GetPartitionKey());
                rr->SetExplicitHash(res.GetResult(i).GetExplicitHash());
@@ -166,27 +181,40 @@ private:
                }
            }
        }
-        const auto& lastRes = partResp->GetResult(partResp->ResultSize() - 1);
-        if (!lastRes.HasPartNo() || lastRes.GetPartNo() + 1 == lastRes.GetTotalParts()) { //last res is full, can answer
-            ctx.Send(Sender, Response.Release());
-            Die(ctx);
-            return;
+        if (!partResp->GetResult().empty()) {
+            const auto& lastRes = partResp->GetResult(partResp->GetResult().size() - 1);
+            if (lastRes.HasPartNo() && lastRes.GetPartNo() + 1 < lastRes.GetTotalParts()) { //last res is not full
+                Request.SetRequestId(TMP_REQUEST_MARKER);
+
+                auto read = Request.MutablePartitionRequest()->MutableCmdRead();
+                read->SetOffset(lastRes.GetOffset());
+                read->SetPartNo(lastRes.GetPartNo() + 1);
+                read->SetCount(1);
+                read->ClearBytes();
+                read->ClearTimeoutMs();
+                read->ClearMaxTimeLagMs();
+                read->SetReadTimestampMs(res.GetReadFromTimestampMs());
+
+                THolder<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest);
+                req->Record = Request;
+                ctx.Send(Tablet, req.Release());
+                return;
+            }
+        }
+
+        //filter old messages
+        ::google::protobuf::RepeatedPtrField<NKikimrClient::TCmdReadResult::TResult> records;
+        records.Swap(partResp->MutableResult());
+        partResp->ClearResult();
+        for (auto & rec : records) {
+            if (rec.GetWriteTimestampMS() >= readFromTimestampMs) {
+                auto result = partResp->AddResult();
+                result->CopyFrom(rec);
+            }
        }
-        //not full answer - need uprequest
-
-        Request.SetRequestId(TMP_REQUEST_MARKER);
-
-        auto read = Request.MutablePartitionRequest()->MutableCmdRead();
-        read->SetOffset(lastRes.GetOffset());
-        read->SetPartNo(lastRes.GetPartNo() + 1);
-        read->SetCount(1);
-        read->ClearBytes();
-        read->ClearTimeoutMs();
-        read->ClearMaxTimeLagMs();
-        read->ClearReadTimestampMs();
-        THolder<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest);
-        req->Record = Request;
-        ctx.Send(Tablet, req.Release());
+
+        ctx.Send(Sender, Response.Release());
+        Die(ctx);
    }

    STFUNC(StateFunc) {
diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp
index f62f28f18a0..fd621bc8ecf 100644
--- a/ydb/core/persqueue/pq_ut.cpp
+++ b/ydb/core/persqueue/pq_ut.cpp
@@ -225,6 +225,7 @@ Y_UNIT_TEST(TestReadRuleVersions) {
        CmdGetOffset(0, client, 1, tc);
        CmdGetOffset(1, client, 2, tc);
+        CmdGetOffset(0, "user", 0, tc);

        {
            THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp
index bdf1d51a05f..6a3a7409050 100644
--- a/ydb/core/persqueue/user_info.cpp
+++ b/ydb/core/persqueue/user_info.cpp
@@ -49,6 +49,7 @@ TUsersInfoStorage::TUsersInfoStorage(
    , DbId(dbId)
    , FolderId(folderId)
    , StreamName(streamName)
+    , CurReadRuleGeneration(0)
 {
    Counters.Populate(counters);
 }
@@ -133,11 +134,11 @@ void TUsersInfoStorage::Remove(const TString& user, const TActorContext& ctx) {
    UsersInfo.erase(it);
 }

-TUserInfo& TUsersInfoStorage::GetOrCreate(const TString& user, const TActorContext& ctx) {
+TUserInfo& TUsersInfoStorage::GetOrCreate(const TString& user, const TActorContext& ctx, TMaybe<ui64> readRuleGeneration) {
    Y_VERIFY(!user.empty());
    auto it = UsersInfo.find(user);
    if (it == UsersInfo.end()) {
-        return Create(ctx, user, 0, false, "", 0, 0, 0, 0, TInstant::Zero());
+        return Create(ctx, user, readRuleGeneration ? *readRuleGeneration : ++CurReadRuleGeneration, false, "", 0, 0, 0, 0, TInstant::Zero());
    }
    return it->second;
 }
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 1752ea0b3b8..c3aa5b0a90c 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -517,7 +517,7 @@ public:
    void ParseDeprecated(const TString& key, const TString& data, const TActorContext& ctx);
    void Parse(const TString& key, const TString& data, const TActorContext& ctx);

-    TUserInfo& GetOrCreate(const TString& user, const TActorContext& ctx);
+    TUserInfo& GetOrCreate(const TString& user, const TActorContext& ctx, TMaybe<ui64> readRuleGeneration = {});
    TUserInfo* GetIfExists(const TString& user);

    void UpdateConfig(const NKikimrPQ::TPQTabletConfig& config) {
@@ -555,6 +555,7 @@ private:
    TString DbId;
    TString FolderId;
    TString StreamName;
+    ui64 CurReadRuleGeneration;
 };

 } //NPQ
diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto
index 849b292383a..3431d4bd6ff 100644
--- a/ydb/core/protos/msgbus_pq.proto
+++ b/ydb/core/protos/msgbus_pq.proto
@@ -379,6 +379,7 @@ message TCmdReadResult {
    optional uint64 SizeLag = 9;
    optional uint64 RealReadOffset = 10;
    optional uint64 WaitQuotaTimeMs = 11;
+    optional uint64 ReadFromTimestampMs = 12;
 }
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 2d91b29b9dd..87591c75434 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -2074,8 +2074,19 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
        return;
    }

+    auto MaskResult = [](const NKikimrClient::TPersQueuePartitionResponse& resp) {
+        if (resp.HasCmdReadResult()) {
+            auto res = resp;
+            for (auto& rr : *res.MutableCmdReadResult()->MutableResult()) {
+                rr.SetData(TStringBuilder() << "... " << rr.GetData().size() << " bytes ...");
+            }
+            return res;
+        }
+        return resp;
+    };
+
    LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " " << Partition
-                << " initDone " << InitDone << " event " << result);
+                << " initDone " << InitDone << " event " << MaskResult(result));

    if (!InitDone) {
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
index 257648f9c2c..f5cccddafcf 100644
--- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
@@ -18,6 +18,7 @@
 #include <library/cpp/json/json_reader.h>

 #include <util/string/join.h>
+#include <util/generic/overloaded.h>

 #include <grpc++/client_context.h>

@@ -43,6 +44,7 @@ namespace NKikimr::NPersQueueTests {
    using namespace NYdb::NPersQueue;
    using namespace NPersQueue;

+
    Y_UNIT_TEST_SUITE(TPersQueueNewSchemeCacheTest) {

        void PrepareForGrpcNoDC(TFlatMsgBusPQClient& annoyingClient) {
@@ -139,6 +141,166 @@ namespace NKikimr::NPersQueueTests {
            testReadFromTopic("account2/topic2");
        }

+        void TestReadAtTimestampImpl(ui32 maxMessagesCount, std::function<TString(ui32)> generateMessage) {
+            TTestServer server(false);
+            server.ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+            server.StartServer();
+            server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::TX_PROXY_SCHEME_CACHE});
+            PrepareForGrpcNoDC(*server.AnnoyingClient);
+            NYdb::TDriverConfig driverCfg;
+
+            driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort).SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)).SetDatabase("/Root");
+
+            auto ydbDriver = MakeHolder<NYdb::TDriver>(driverCfg);
+            auto persqueueClient = MakeHolder<NYdb::NPersQueue::TPersQueueClient>(*ydbDriver);
+
+            TString topic = "account2/topic2";
+            server.EnableLogs({ NKikimrServices::PQ_READ_PROXY});
+
+            NYdb::NPersQueue::TWriteSessionSettings writeSessionSettings;
+            writeSessionSettings.ClusterDiscoveryMode(NYdb::NPersQueue::EClusterDiscoveryMode::Off)
+                .Path(topic)
+                .MessageGroupId(topic)
+                .Codec(NYdb::NPersQueue::ECodec::RAW);
+
+            {
+                auto res = persqueueClient->AddReadRule("/Root/" + topic,
+                    NYdb::NPersQueue::TAddReadRuleSettings().ReadRule(NYdb::NPersQueue::TReadRuleSettings().ConsumerName("userx")));
+                res.Wait();
+                UNIT_ASSERT(res.GetValue().IsSuccess());
+            }
+
+            TVector<TInstant> ts;
+            TVector<ui32> firstOffset;
+
+            auto writeSession = persqueueClient->CreateWriteSession(writeSessionSettings);
+            TMaybe<TContinuationToken> continuationToken = Nothing();
+            ui32 messagesAcked = 0;
+            auto processEvent = [&](TWriteSessionEvent::TEvent& event) {
+                std::visit(TOverloaded {
+                    [&](const TWriteSessionEvent::TAcksEvent& event) {
+                        //! Acks just confirm that message was received and saved by server successfully.
+                        //! Here we just count acked messages to check, that everything written is confirmed.
+                        Cerr << "GOT ACK " << TInstant::Now() << "\n";
+                        Sleep(TDuration::MilliSeconds(3));
+                        for (const auto& ack : event.Acks) {
+                            Y_UNUSED(ack);
+                            messagesAcked++;
+                        }
+                    },
+                    [&](TWriteSessionEvent::TReadyToAcceptEvent& event) {
+                        continuationToken = std::move(event.ContinuationToken);
+                    },
+                    [&](const TSessionClosedEvent&) {
+                        UNIT_ASSERT(false);
+                    }
+                }, event);
+            };
+
+            for (auto& event: writeSession->GetEvents(true)) {
+                processEvent(event);
+            }
+            UNIT_ASSERT(continuationToken.Defined());
+
+            for (ui32 i = 0; i < maxMessagesCount; ++i) {
+                TString message = generateMessage(i);
+                Cerr << "WRITTEN message " << i << "\n";
+                writeSession->Write(std::move(*continuationToken), std::move(message));
+                //! Continue token is no longer valid once used.
+                continuationToken = Nothing();
+                while (messagesAcked <= i || !continuationToken.Defined()) {
+                    for (auto& event: writeSession->GetEvents(true)) {
+                        processEvent(event);
+                    }
+                }
+            }
+
+            //TODO check skip inside big blob
+            ui32 tsIt = 0;
+            while (true) {
+                std::shared_ptr<NYdb::NPersQueue::IReadSession> reader;
+                TInstant curTs = tsIt == 0 ? TInstant::Zero() : (ts[tsIt]);
+                auto settings = NYdb::NPersQueue::TReadSessionSettings()
+                    .AppendTopics(topic)
+                    .ConsumerName("userx")
+                    .StartingMessageTimestamp(curTs)
+                    .ReadOnlyOriginal(true);
+
+                TMap<TString, ui32> map;
+                ui32 messagesReceived = 0;
+                settings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable {
+                    for (const auto& msg : event.GetMessages()) {
+                        Cerr << "TS: " << curTs << " Got message: " << msg.DebugString(false) << Endl;
+                        Cout.Flush();
+                        auto count = ++map[msg.GetData()];
+                        UNIT_ASSERT(count == 1);
+                        if (tsIt == 0) {
+                            if (ts.empty()) {
+                                ts.push_back(TInstant::Zero());
+                                firstOffset.push_back(0);
+                            }
+
+                            ts.push_back(msg.GetWriteTime() - TDuration::MilliSeconds(1));
+                            ts.push_back(msg.GetWriteTime());
+                            ts.push_back(msg.GetWriteTime() + TDuration::MilliSeconds(1));
+                            firstOffset.push_back(msg.GetOffset());
+                            firstOffset.push_back(msg.GetOffset());
+                            firstOffset.push_back(msg.GetOffset() + 1);
+
+                            Cerr << "GOT MESSAGE TIMESTAMP " << ts.back() << "\n";
+                        } else {
+                            Cerr << "WAITING FIRST MESSAGE " << firstOffset[tsIt] << " got " << msg.GetOffset() << "\n";
+
+                            UNIT_ASSERT(messagesReceived > 0 || msg.GetOffset() == firstOffset[tsIt]);
+                        }
+                        messagesReceived = msg.GetOffset() + 1;
+                    }
+                    if (messagesReceived >= maxMessagesCount) {
+                        Cerr << "Closing session. Got " << messagesReceived << " messages" << Endl;
+                        reader->Close(TDuration::Seconds(0));
+                        Cerr << "Session closed" << Endl;
+                    }
+                }, false);
+                reader = CreateReader(*ydbDriver, settings);
+
+                Cout << "Created reader\n";
+
+                Cout.Flush();
+                while (messagesReceived < maxMessagesCount) Sleep(TDuration::MilliSeconds(10));
+
+                if (tsIt == 0) {
+                    for (ui32 i = 0; i < ts.size(); ++i) {
+                        Cout << "TS " << ts[i] << " OFFSET " << firstOffset[i] << "\n";
+                    }
+                }
+
+
+                tsIt++;
+                if (tsIt == ts.size()) break;
+                if (firstOffset[tsIt] >= messagesReceived) break;
+            }
+        }
+
+
+        Y_UNIT_TEST(TestReadAtTimestamp) {
+            auto generate1 = [](ui32 messageId) {
+                Y_UNUSED(messageId);
+                TString message = "Hello___" + CreateGuidAsString() + TString(1024*1024, 'a');
+                return message;
+            };
+
+            TestReadAtTimestampImpl(10, generate1);
+
+            auto generate2 = [](ui32 messageId) {
+                Y_UNUSED(messageId);
+                TString message = "Hello___" + CreateGuidAsString() + TString(1024*10240, 'a');
+                return message;
+            };
+
+            TestReadAtTimestampImpl(3, generate2);
+
+        }
+
        Y_UNIT_TEST(TestWriteStat1stClass) {
            auto testWriteStat1stClass = [](const TString& consumerName) {
                TTestServer server(false);
@@ -544,5 +706,8 @@ namespace NKikimr::NPersQueueTests {

            }
        }
+
+
+
    }
 }
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 1b8da1967bf..7b3cfcd6a97 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -3742,6 +3742,7 @@ namespace {
                .ReadOnlyOriginal(true)
        );

+        {
        TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1);

        auto createStream = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*event);
@@ -3758,5 +3759,6 @@ namespace {
            Cerr << "partition status: " << partitionStatus->DebugString() << Endl;
        }
    }
+    }

 }