aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2022-09-09 12:55:57 +0300
committershmel1k <shmel1k@ydb.tech>2022-09-09 12:55:57 +0300
commit50ad0f80e9c732af38309a9c9e03353c888bb323 (patch)
treefc9c82c019a50f91d2acf4ad2fc886d877307724
parent422aefd03b121a1966b80756d8a06a86d9787e04 (diff)
downloadydb-50ad0f80e9c732af38309a9c9e03353c888bb323.tar.gz
[] fix partition status for multiple partitions
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read.cpp126
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read.h14
2 files changed, 102 insertions, 38 deletions
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.cpp b/ydb/public/lib/ydb_cli/topic/topic_read.cpp
index 1ee65694354..d898e3cd9fe 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_read.cpp
+++ b/ydb/public/lib/ydb_cli/topic/topic_read.cpp
@@ -114,7 +114,7 @@ namespace NYdb::NConsoleClient {
}
} // namespace
- void TTopicReader::PrintMessagesInPrettyFormat(IOutputStream& output) {
+ void TTopicReader::PrintMessagesInPrettyFormat(IOutputStream& output) const {
for (const auto& message : ReceivedMessages_) {
TPrettyTable::TRow& row = OutputTable_->AddRow();
for (size_t i = 0; i < ReaderParams_.MetadataFields().size(); ++i) {
@@ -126,7 +126,7 @@ namespace NYdb::NConsoleClient {
OutputTable_->Print(output);
}
- void TTopicReader::PrintMessagesInJsonArrayFormat(IOutputStream& output) {
+ void TTopicReader::PrintMessagesInJsonArrayFormat(IOutputStream& output) const {
// TODO(shmel1k@): not implemented yet.
Y_UNUSED(output);
}
@@ -167,7 +167,16 @@ namespace NYdb::NConsoleClient {
}
int TTopicReader::HandleDataReceivedEvent(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent* event, IOutputStream& output) {
- event->GetPartitionSession()->RequestStatus();
+ ui64 sessionId = event->GetPartitionSession()->GetPartitionSessionId();
+ if (!HasSession(sessionId)) {
+ return EXIT_SUCCESS;
+ }
+
+ if (ActivePartitionSessions_[sessionId].second == EReadingStatus::PartitionWithoutData) {
+ ActivePartitionSessions_[sessionId].second = EReadingStatus::PartitionWithData;
+ ++PartitionsBeingRead_;
+ }
+
HasFirstMessage_ = true;
NTopic::TDeferredCommit defCommit;
@@ -194,14 +203,23 @@ namespace NYdb::NConsoleClient {
return EXIT_SUCCESS;
}
+ bool TTopicReader::HasSession(ui64 sessionId) const {
+ auto f = ActivePartitionSessions_.find(sessionId);
+ return !(f == ActivePartitionSessions_.end());
+ }
+
int TTopicReader::HandleStartPartitionSessionEvent(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent* event) {
event->Confirm();
+
+ EReadingStatus readingStatus = EReadingStatus::PartitionWithData;
if (event->GetCommittedOffset() == event->GetEndOffset()) {
- ReadingStatus_ = EReadingStatus::PartitionWithoutData;
+ readingStatus = EReadingStatus::PartitionWithoutData;
} else {
- ReadingStatus_ = EReadingStatus::PartitionWithData;
+ ++PartitionsBeingRead_;
}
+ ActivePartitionSessions_.insert({event->GetPartitionSession()->GetPartitionSessionId(), {event->GetPartitionSession(), readingStatus}});
+
return EXIT_SUCCESS;
}
@@ -212,29 +230,71 @@ namespace NYdb::NConsoleClient {
}
int TTopicReader::HandlePartitionSessionStatusEvent(NTopic::TReadSessionEvent::TPartitionSessionStatusEvent* event) {
- if (event->GetReadOffset() == event->GetCommittedOffset()) {
- ReadingStatus_ = EReadingStatus::PartitionWithoutData;
+ ui64 sessionId = event->GetPartitionSession()->GetPartitionSessionId();
+ if (!HasSession(sessionId)) {
+ return EXIT_SUCCESS;
+ }
+
+ auto status = ActivePartitionSessions_.find(sessionId);
+ EReadingStatus currentPartitionStatus = status->second.second;
+ if (event->GetEndOffset() == event->GetCommittedOffset()) {
+ if (currentPartitionStatus == EReadingStatus::PartitionWithData) {
+ --PartitionsBeingRead_;
+ }
+ ActivePartitionSessions_[sessionId].second = EReadingStatus::PartitionWithoutData;
} else {
- ReadingStatus_ = EReadingStatus::PartitionWithData;
+ if (currentPartitionStatus == EReadingStatus::PartitionWithoutData || currentPartitionStatus == EReadingStatus::NoPartitionTaken) {
+ ++PartitionsBeingRead_;
+ }
+ ActivePartitionSessions_[sessionId].second = EReadingStatus::PartitionWithData;
}
return EXIT_SUCCESS;
}
- int TTopicReader::HandleEvent(TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent>& event, IOutputStream& output) {
- if (!event) {
- return 0;
+ int TTopicReader::HandleStopPartitionSessionEvent(NTopic::TReadSessionEvent::TStopPartitionSessionEvent* event) {
+ if (!HasSession(event->GetPartitionSession()->GetPartitionSessionId())) {
+ return EXIT_SUCCESS;
+ }
+
+ event->Confirm();
+
+ auto f = ActivePartitionSessions_.find(event->GetPartitionSession()->GetPartitionSessionId());
+ if (f->second.second == EReadingStatus::PartitionWithData) {
+ --PartitionsBeingRead_;
}
+ ActivePartitionSessions_.erase(event->GetPartitionSession()->GetPartitionSessionId());
- if (auto* dataEvent = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
+ return EXIT_SUCCESS;
+ }
+
+ int TTopicReader::HandlePartitionSessionClosedEvent(NTopic::TReadSessionEvent::TPartitionSessionClosedEvent *event) {
+ if (!HasSession(event->GetPartitionSession()->GetPartitionSessionId())) {
+ return EXIT_SUCCESS;
+ }
+
+ if (ActivePartitionSessions_[event->GetPartitionSession()->GetPartitionSessionId()].second == EReadingStatus::PartitionWithData) {
+ --PartitionsBeingRead_;
+ }
+ ActivePartitionSessions_.erase(event->GetPartitionSession()->GetPartitionSessionId());
+
+ return EXIT_SUCCESS;
+ }
+
+ int TTopicReader::HandleEvent(NYdb::NTopic::TReadSessionEvent::TEvent& event, IOutputStream& output) {
+ if (auto* dataEvent = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
return HandleDataReceivedEvent(dataEvent, output);
- } else if (auto* createPartitionStreamEvent = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
+ } else if (auto* createPartitionStreamEvent = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
return HandleStartPartitionSessionEvent(createPartitionStreamEvent);
- } else if (auto* commitEvent = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&*event)) {
+ } else if (auto* commitEvent = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
return HandleCommitOffsetAcknowledgementEvent(commitEvent);
- } else if (auto* partitionStatusEvent = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&*event)) {
+ } else if (auto* partitionStatusEvent = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
return HandlePartitionSessionStatusEvent(partitionStatusEvent);
- } else if (auto* sessionClosedEvent = std::get_if<NTopic::TSessionClosedEvent>(&*event)) {
+ } else if (auto* stopPartitionSessionEvent = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
+ return HandleStopPartitionSessionEvent(stopPartitionSessionEvent);
+ } else if (auto* partitionSessionClosedEvent = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
+ return HandlePartitionSessionClosedEvent(partitionSessionClosedEvent);
+ } else if (auto* sessionClosedEvent = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
ThrowOnError(*sessionClosedEvent);
return 1;
}
@@ -244,37 +304,35 @@ namespace NYdb::NConsoleClient {
int TTopicReader::Run(IOutputStream& output) {
LastMessageReceivedTs_ = TInstant::Now();
- bool waitForever = ReaderParams_.Wait() && (ReaderParams_.MessagingFormat() == EMessagingFormat::NewlineDelimited || ReaderParams_.MessagingFormat() == EMessagingFormat::Concatenated);
+ bool waitForever = (ReaderParams_.Wait() || (ReaderParams_.Limit().Defined() && *ReaderParams_.Limit() == 0)) &&
+ (ReaderParams_.MessagingFormat() == EMessagingFormat::NewlineDelimited ||
+ ReaderParams_.MessagingFormat() == EMessagingFormat::Concatenated);
while ((MessagesLeft_ > 0 || MessagesLeft_ == -1) && !IsInterrupted()) {
TInstant messageReceiveDeadline = LastMessageReceivedTs_ + ReaderParams_.IdleTimeout();
NThreading::TFuture<void> future = ReadSession_->WaitEvent();
future.Wait(messageReceiveDeadline);
- if (!future.HasValue()) {
- if (waitForever) {
- LastMessageReceivedTs_ = TInstant::Now();
- continue;
- }
-
- bool isReading = ReadingStatus_ == EReadingStatus::PartitionWithData;
- if (!isReading || (isReading && HasFirstMessage_)) {
- return EXIT_SUCCESS;
+ if (future.HasValue()) {
+ // TODO(shmel1k@): throttling?
+ // TODO(shmel1k@): think about limiting size of events
+ TVector<NTopic::TReadSessionEvent::TEvent> events = ReadSession_->GetEvents(true);
+ for (auto& event : events) {
+ if (int status = HandleEvent(event, output); status) {
+ return status;
+ }
}
- LastMessageReceivedTs_ = TInstant::Now();
continue;
}
- // TODO(shmel1k@): throttling?
- // TODO(shmel1k@): think about limiting size of events
- TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = ReadSession_->GetEvent(true);
- if (!event) {
- // TODO(shmel1k@): does it work properly?
+ if (waitForever) {
+ LastMessageReceivedTs_ = TInstant::Now();
continue;
}
- if (int status = HandleEvent(event, output); status) {
- return status;
+ bool isReading = PartitionsBeingRead_ > 0;
+ if (!isReading || (isReading && HasFirstMessage_)) {
+ return EXIT_SUCCESS;
}
}
return EXIT_SUCCESS;
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.h b/ydb/public/lib/ydb_cli/topic/topic_read.h
index 1f42e6f677a..b74e1d88852 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_read.h
+++ b/ydb/public/lib/ydb_cli/topic/topic_read.h
@@ -69,13 +69,15 @@ namespace NYdb::NConsoleClient {
int HandleStartPartitionSessionEvent(NTopic::TReadSessionEvent::TStartPartitionSessionEvent*);
int HandlePartitionSessionStatusEvent(NTopic::TReadSessionEvent::TPartitionSessionStatusEvent*);
+ int HandleStopPartitionSessionEvent(NTopic::TReadSessionEvent::TStopPartitionSessionEvent*);
+ int HandlePartitionSessionClosedEvent(NTopic::TReadSessionEvent::TPartitionSessionClosedEvent*);
int HandleDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent*, IOutputStream&);
int HandleCommitOffsetAcknowledgementEvent(NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent*);
- int HandleEvent(TMaybe<NTopic::TReadSessionEvent::TEvent>&, IOutputStream&);
+ int HandleEvent(NTopic::TReadSessionEvent::TEvent&, IOutputStream&);
private:
- void PrintMessagesInPrettyFormat(IOutputStream& output);
- void PrintMessagesInJsonArrayFormat(IOutputStream& output);
+ void PrintMessagesInPrettyFormat(IOutputStream& output) const;
+ void PrintMessagesInJsonArrayFormat(IOutputStream& output) const;
enum EReadingStatus {
NoPartitionTaken = 0,
@@ -83,6 +85,8 @@ namespace NYdb::NConsoleClient {
PartitionWithData = 2,
};
+ bool HasSession(ui64 sessionId) const;
+
private:
std::shared_ptr<NTopic::IReadSession> ReadSession_;
const TTopicReaderSettings ReaderParams_;
@@ -94,8 +98,10 @@ namespace NYdb::NConsoleClient {
std::unique_ptr<TPrettyTable> OutputTable_;
TVector<TReceivedMessage> ReceivedMessages_;
- EReadingStatus ReadingStatus_ = EReadingStatus::NoPartitionTaken;
+ ui32 PartitionsBeingRead_ = 0;
friend class TTopicReaderTests;
+
+ THashMap<ui64, std::pair<NTopic::TPartitionSession::TPtr, EReadingStatus>> ActivePartitionSessions_;
};
} // namespace NYdb::NConsoleClient \ No newline at end of file