diff options
author | shmel1k <shmel1k@ydb.tech> | 2022-09-09 12:55:57 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2022-09-09 12:55:57 +0300 |
commit | 50ad0f80e9c732af38309a9c9e03353c888bb323 (patch) | |
tree | fc9c82c019a50f91d2acf4ad2fc886d877307724 | |
parent | 422aefd03b121a1966b80756d8a06a86d9787e04 (diff) | |
download | ydb-50ad0f80e9c732af38309a9c9e03353c888bb323.tar.gz |
[] fix partition status for multiple partitions
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_read.cpp | 126 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_read.h | 14 |
2 files changed, 102 insertions, 38 deletions
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.cpp b/ydb/public/lib/ydb_cli/topic/topic_read.cpp index 1ee65694354..d898e3cd9fe 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_read.cpp +++ b/ydb/public/lib/ydb_cli/topic/topic_read.cpp @@ -114,7 +114,7 @@ namespace NYdb::NConsoleClient { } } // namespace - void TTopicReader::PrintMessagesInPrettyFormat(IOutputStream& output) { + void TTopicReader::PrintMessagesInPrettyFormat(IOutputStream& output) const { for (const auto& message : ReceivedMessages_) { TPrettyTable::TRow& row = OutputTable_->AddRow(); for (size_t i = 0; i < ReaderParams_.MetadataFields().size(); ++i) { @@ -126,7 +126,7 @@ namespace NYdb::NConsoleClient { OutputTable_->Print(output); } - void TTopicReader::PrintMessagesInJsonArrayFormat(IOutputStream& output) { + void TTopicReader::PrintMessagesInJsonArrayFormat(IOutputStream& output) const { // TODO(shmel1k@): not implemented yet. Y_UNUSED(output); } @@ -167,7 +167,16 @@ namespace NYdb::NConsoleClient { } int TTopicReader::HandleDataReceivedEvent(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent* event, IOutputStream& output) { - event->GetPartitionSession()->RequestStatus(); + ui64 sessionId = event->GetPartitionSession()->GetPartitionSessionId(); + if (!HasSession(sessionId)) { + return EXIT_SUCCESS; + } + + if (ActivePartitionSessions_[sessionId].second == EReadingStatus::PartitionWithoutData) { + ActivePartitionSessions_[sessionId].second = EReadingStatus::PartitionWithData; + ++PartitionsBeingRead_; + } + HasFirstMessage_ = true; NTopic::TDeferredCommit defCommit; @@ -194,14 +203,23 @@ namespace NYdb::NConsoleClient { return EXIT_SUCCESS; } + bool TTopicReader::HasSession(ui64 sessionId) const { + auto f = ActivePartitionSessions_.find(sessionId); + return !(f == ActivePartitionSessions_.end()); + } + int TTopicReader::HandleStartPartitionSessionEvent(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent* event) { event->Confirm(); + + EReadingStatus readingStatus = EReadingStatus::PartitionWithData; if (event->GetCommittedOffset() == event->GetEndOffset()) { - ReadingStatus_ = EReadingStatus::PartitionWithoutData; + readingStatus = EReadingStatus::PartitionWithoutData; } else { - ReadingStatus_ = EReadingStatus::PartitionWithData; + ++PartitionsBeingRead_; } + ActivePartitionSessions_.insert({event->GetPartitionSession()->GetPartitionSessionId(), {event->GetPartitionSession(), readingStatus}}); + return EXIT_SUCCESS; } @@ -212,29 +230,71 @@ namespace NYdb::NConsoleClient { } int TTopicReader::HandlePartitionSessionStatusEvent(NTopic::TReadSessionEvent::TPartitionSessionStatusEvent* event) { - if (event->GetReadOffset() == event->GetCommittedOffset()) { - ReadingStatus_ = EReadingStatus::PartitionWithoutData; + ui64 sessionId = event->GetPartitionSession()->GetPartitionSessionId(); + if (!HasSession(sessionId)) { + return EXIT_SUCCESS; + } + + auto status = ActivePartitionSessions_.find(sessionId); + EReadingStatus currentPartitionStatus = status->second.second; + if (event->GetEndOffset() == event->GetCommittedOffset()) { + if (currentPartitionStatus == EReadingStatus::PartitionWithData) { + --PartitionsBeingRead_; + } + ActivePartitionSessions_[sessionId].second = EReadingStatus::PartitionWithoutData; } else { - ReadingStatus_ = EReadingStatus::PartitionWithData; + if (currentPartitionStatus == EReadingStatus::PartitionWithoutData || currentPartitionStatus == EReadingStatus::NoPartitionTaken) { + ++PartitionsBeingRead_; + } + ActivePartitionSessions_[sessionId].second = EReadingStatus::PartitionWithData; } return EXIT_SUCCESS; } - int TTopicReader::HandleEvent(TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent>& event, IOutputStream& output) { - if (!event) { - return 0; + int TTopicReader::HandleStopPartitionSessionEvent(NTopic::TReadSessionEvent::TStopPartitionSessionEvent* event) { + if (!HasSession(event->GetPartitionSession()->GetPartitionSessionId())) { + return EXIT_SUCCESS; + } + + event->Confirm(); + + auto f = ActivePartitionSessions_.find(event->GetPartitionSession()->GetPartitionSessionId()); + if (f->second.second == EReadingStatus::PartitionWithData) { + --PartitionsBeingRead_; } + ActivePartitionSessions_.erase(event->GetPartitionSession()->GetPartitionSessionId()); - if (auto* dataEvent = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) { + return EXIT_SUCCESS; + } + + int TTopicReader::HandlePartitionSessionClosedEvent(NTopic::TReadSessionEvent::TPartitionSessionClosedEvent *event) { + if (!HasSession(event->GetPartitionSession()->GetPartitionSessionId())) { + return EXIT_SUCCESS; + } + + if (ActivePartitionSessions_[event->GetPartitionSession()->GetPartitionSessionId()].second == EReadingStatus::PartitionWithData) { + --PartitionsBeingRead_; + } + ActivePartitionSessions_.erase(event->GetPartitionSession()->GetPartitionSessionId()); + + return EXIT_SUCCESS; + } + + int TTopicReader::HandleEvent(NYdb::NTopic::TReadSessionEvent::TEvent& event, IOutputStream& output) { + if (auto* dataEvent = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) { return HandleDataReceivedEvent(dataEvent, output); - } else if (auto* createPartitionStreamEvent = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) { + } else if (auto* createPartitionStreamEvent = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) { return HandleStartPartitionSessionEvent(createPartitionStreamEvent); - } else if (auto* commitEvent = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&*event)) { + } else if (auto* commitEvent = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) { return HandleCommitOffsetAcknowledgementEvent(commitEvent); - } else if (auto* partitionStatusEvent = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&*event)) { + } else if (auto* partitionStatusEvent = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) { return HandlePartitionSessionStatusEvent(partitionStatusEvent); - } else if (auto* sessionClosedEvent = std::get_if<NTopic::TSessionClosedEvent>(&*event)) { + } else if (auto* stopPartitionSessionEvent = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) { + return HandleStopPartitionSessionEvent(stopPartitionSessionEvent); + } else if (auto* partitionSessionClosedEvent = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) { + return HandlePartitionSessionClosedEvent(partitionSessionClosedEvent); + } else if (auto* sessionClosedEvent = std::get_if<NTopic::TSessionClosedEvent>(&event)) { ThrowOnError(*sessionClosedEvent); return 1; } @@ -244,37 +304,35 @@ namespace NYdb::NConsoleClient { int TTopicReader::Run(IOutputStream& output) { LastMessageReceivedTs_ = TInstant::Now(); - bool waitForever = ReaderParams_.Wait() && (ReaderParams_.MessagingFormat() == EMessagingFormat::NewlineDelimited || ReaderParams_.MessagingFormat() == EMessagingFormat::Concatenated); + bool waitForever = (ReaderParams_.Wait() || (ReaderParams_.Limit().Defined() && *ReaderParams_.Limit() == 0)) && + (ReaderParams_.MessagingFormat() == EMessagingFormat::NewlineDelimited || + ReaderParams_.MessagingFormat() == EMessagingFormat::Concatenated); while ((MessagesLeft_ > 0 || MessagesLeft_ == -1) && !IsInterrupted()) { TInstant messageReceiveDeadline = LastMessageReceivedTs_ + ReaderParams_.IdleTimeout(); NThreading::TFuture<void> future = ReadSession_->WaitEvent(); future.Wait(messageReceiveDeadline); - if (!future.HasValue()) { - if (waitForever) { - LastMessageReceivedTs_ = TInstant::Now(); - continue; - } - - bool isReading = ReadingStatus_ == EReadingStatus::PartitionWithData; - if (!isReading || (isReading && HasFirstMessage_)) { - return EXIT_SUCCESS; + if (future.HasValue()) { + // TODO(shmel1k@): throttling? + // TODO(shmel1k@): think about limiting size of events + TVector<NTopic::TReadSessionEvent::TEvent> events = ReadSession_->GetEvents(true); + for (auto& event : events) { + if (int status = HandleEvent(event, output); status) { + return status; + } } - LastMessageReceivedTs_ = TInstant::Now(); continue; } - // TODO(shmel1k@): throttling? - // TODO(shmel1k@): think about limiting size of events - TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = ReadSession_->GetEvent(true); - if (!event) { - // TODO(shmel1k@): does it work properly? + if (waitForever) { + LastMessageReceivedTs_ = TInstant::Now(); continue; } - if (int status = HandleEvent(event, output); status) { - return status; + bool isReading = PartitionsBeingRead_ > 0; + if (!isReading || (isReading && HasFirstMessage_)) { + return EXIT_SUCCESS; } } return EXIT_SUCCESS; diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.h b/ydb/public/lib/ydb_cli/topic/topic_read.h index 1f42e6f677a..b74e1d88852 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_read.h +++ b/ydb/public/lib/ydb_cli/topic/topic_read.h @@ -69,13 +69,15 @@ namespace NYdb::NConsoleClient { int HandleStartPartitionSessionEvent(NTopic::TReadSessionEvent::TStartPartitionSessionEvent*); int HandlePartitionSessionStatusEvent(NTopic::TReadSessionEvent::TPartitionSessionStatusEvent*); + int HandleStopPartitionSessionEvent(NTopic::TReadSessionEvent::TStopPartitionSessionEvent*); + int HandlePartitionSessionClosedEvent(NTopic::TReadSessionEvent::TPartitionSessionClosedEvent*); int HandleDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent*, IOutputStream&); int HandleCommitOffsetAcknowledgementEvent(NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent*); - int HandleEvent(TMaybe<NTopic::TReadSessionEvent::TEvent>&, IOutputStream&); + int HandleEvent(NTopic::TReadSessionEvent::TEvent&, IOutputStream&); private: - void PrintMessagesInPrettyFormat(IOutputStream& output); - void PrintMessagesInJsonArrayFormat(IOutputStream& output); + void PrintMessagesInPrettyFormat(IOutputStream& output) const; + void PrintMessagesInJsonArrayFormat(IOutputStream& output) const; enum EReadingStatus { NoPartitionTaken = 0, @@ -83,6 +85,8 @@ namespace NYdb::NConsoleClient { PartitionWithData = 2, }; + bool HasSession(ui64 sessionId) const; + private: std::shared_ptr<NTopic::IReadSession> ReadSession_; const TTopicReaderSettings ReaderParams_; @@ -94,8 +98,10 @@ namespace NYdb::NConsoleClient { std::unique_ptr<TPrettyTable> OutputTable_; TVector<TReceivedMessage> ReceivedMessages_; - EReadingStatus ReadingStatus_ = EReadingStatus::NoPartitionTaken; + ui32 PartitionsBeingRead_ = 0; friend class TTopicReaderTests; + + THashMap<ui64, std::pair<NTopic::TPartitionSession::TPtr, EReadingStatus>> ActivePartitionSessions_; }; } // namespace NYdb::NConsoleClient
\ No newline at end of file |