diff options
author | shmel1k <shmel1k@ydb.tech> | 2022-09-02 20:36:30 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2022-09-02 20:36:30 +0300 |
commit | 19d2cde32aa15c99a0673eb3752a1834fb5605bf (patch) | |
tree | 9428e9c42d7cefc2845f4a7a54ab81d4192f5cc4 | |
parent | 1be29b7cc0683062a9229d6e6c17ef77512736fd (diff) | |
download | ydb-19d2cde32aa15c99a0673eb3752a1834fb5605bf.tar.gz |
[] fix data absence in topic
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_read.cpp | 30 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_read.h | 9 |
2 files changed, 34 insertions, 5 deletions
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.cpp b/ydb/public/lib/ydb_cli/topic/topic_read.cpp index ecbb258efa7..1ee65694354 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_read.cpp +++ b/ydb/public/lib/ydb_cli/topic/topic_read.cpp @@ -167,6 +167,7 @@ namespace NYdb::NConsoleClient { } int TTopicReader::HandleDataReceivedEvent(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent* event, IOutputStream& output) { + event->GetPartitionSession()->RequestStatus(); HasFirstMessage_ = true; NTopic::TDeferredCommit defCommit; @@ -195,6 +196,11 @@ namespace NYdb::NConsoleClient { int TTopicReader::HandleStartPartitionSessionEvent(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent* event) { event->Confirm(); + if (event->GetCommittedOffset() == event->GetEndOffset()) { + ReadingStatus_ = EReadingStatus::PartitionWithoutData; + } else { + ReadingStatus_ = EReadingStatus::PartitionWithData; + } return EXIT_SUCCESS; } @@ -205,6 +211,16 @@ namespace NYdb::NConsoleClient { return EXIT_SUCCESS; } + int TTopicReader::HandlePartitionSessionStatusEvent(NTopic::TReadSessionEvent::TPartitionSessionStatusEvent* event) { + if (event->GetReadOffset() == event->GetCommittedOffset()) { + ReadingStatus_ = EReadingStatus::PartitionWithoutData; + } else { + ReadingStatus_ = EReadingStatus::PartitionWithData; + } + + return EXIT_SUCCESS; + } + int TTopicReader::HandleEvent(TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent>& event, IOutputStream& output) { if (!event) { return 0; @@ -216,6 +232,8 @@ namespace NYdb::NConsoleClient { return HandleStartPartitionSessionEvent(createPartitionStreamEvent); } else if (auto* commitEvent = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&*event)) { return HandleCommitOffsetAcknowledgementEvent(commitEvent); + } else if (auto* partitionStatusEvent = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&*event)) { + return HandlePartitionSessionStatusEvent(partitionStatusEvent); } else if (auto* sessionClosedEvent = std::get_if<NTopic::TSessionClosedEvent>(&*event)) { ThrowOnError(*sessionClosedEvent); return 1; @@ -233,16 +251,18 @@ namespace NYdb::NConsoleClient { NThreading::TFuture<void> future = ReadSession_->WaitEvent(); future.Wait(messageReceiveDeadline); if (!future.HasValue()) { - if (ReaderParams_.Wait() && !HasFirstMessage_) { - LastMessageReceivedTs_ = Now(); + if (waitForever) { + LastMessageReceivedTs_ = TInstant::Now(); continue; } - if (waitForever) { - continue; + bool isReading = ReadingStatus_ == EReadingStatus::PartitionWithData; + if (!isReading || (isReading && HasFirstMessage_)) { + return EXIT_SUCCESS; } - return EXIT_SUCCESS; + LastMessageReceivedTs_ = TInstant::Now(); + continue; } // TODO(shmel1k@): throttling? diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.h b/ydb/public/lib/ydb_cli/topic/topic_read.h index cc6278c77b2..1f42e6f677a 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_read.h +++ b/ydb/public/lib/ydb_cli/topic/topic_read.h @@ -68,6 +68,7 @@ namespace NYdb::NConsoleClient { void HandleReceivedMessage(const TReceivedMessage& message, IOutputStream& output); int HandleStartPartitionSessionEvent(NTopic::TReadSessionEvent::TStartPartitionSessionEvent*); + int HandlePartitionSessionStatusEvent(NTopic::TReadSessionEvent::TPartitionSessionStatusEvent*); int HandleDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent*, IOutputStream&); int HandleCommitOffsetAcknowledgementEvent(NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent*); int HandleEvent(TMaybe<NTopic::TReadSessionEvent::TEvent>&, IOutputStream&); @@ -76,6 +77,12 @@ namespace NYdb::NConsoleClient { void PrintMessagesInPrettyFormat(IOutputStream& output); void PrintMessagesInJsonArrayFormat(IOutputStream& output); + enum EReadingStatus { + NoPartitionTaken = 0, + PartitionWithoutData = 1, + PartitionWithData = 2, + }; + private: std::shared_ptr<NTopic::IReadSession> ReadSession_; const TTopicReaderSettings ReaderParams_; @@ -87,6 +94,8 @@ namespace NYdb::NConsoleClient { std::unique_ptr<TPrettyTable> OutputTable_; TVector<TReceivedMessage> ReceivedMessages_; + EReadingStatus ReadingStatus_ = EReadingStatus::NoPartitionTaken; + friend class TTopicReaderTests; }; } // namespace NYdb::NConsoleClient
\ No newline at end of file |