aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2022-09-02 20:36:30 +0300
committershmel1k <shmel1k@ydb.tech>2022-09-02 20:36:30 +0300
commit19d2cde32aa15c99a0673eb3752a1834fb5605bf (patch)
tree9428e9c42d7cefc2845f4a7a54ab81d4192f5cc4
parent1be29b7cc0683062a9229d6e6c17ef77512736fd (diff)
downloadydb-19d2cde32aa15c99a0673eb3752a1834fb5605bf.tar.gz
[] fix data absence in topic
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read.cpp30
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read.h9
2 files changed, 34 insertions, 5 deletions
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.cpp b/ydb/public/lib/ydb_cli/topic/topic_read.cpp
index ecbb258efa7..1ee65694354 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_read.cpp
+++ b/ydb/public/lib/ydb_cli/topic/topic_read.cpp
@@ -167,6 +167,7 @@ namespace NYdb::NConsoleClient {
}
int TTopicReader::HandleDataReceivedEvent(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent* event, IOutputStream& output) {
+ event->GetPartitionSession()->RequestStatus();
HasFirstMessage_ = true;
NTopic::TDeferredCommit defCommit;
@@ -195,6 +196,11 @@ namespace NYdb::NConsoleClient {
int TTopicReader::HandleStartPartitionSessionEvent(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent* event) {
event->Confirm();
+ if (event->GetCommittedOffset() == event->GetEndOffset()) {
+ ReadingStatus_ = EReadingStatus::PartitionWithoutData;
+ } else {
+ ReadingStatus_ = EReadingStatus::PartitionWithData;
+ }
return EXIT_SUCCESS;
}
@@ -205,6 +211,16 @@ namespace NYdb::NConsoleClient {
return EXIT_SUCCESS;
}
+ int TTopicReader::HandlePartitionSessionStatusEvent(NTopic::TReadSessionEvent::TPartitionSessionStatusEvent* event) {
+ if (event->GetReadOffset() == event->GetCommittedOffset()) {
+ ReadingStatus_ = EReadingStatus::PartitionWithoutData;
+ } else {
+ ReadingStatus_ = EReadingStatus::PartitionWithData;
+ }
+
+ return EXIT_SUCCESS;
+ }
+
int TTopicReader::HandleEvent(TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent>& event, IOutputStream& output) {
if (!event) {
return 0;
@@ -216,6 +232,8 @@ namespace NYdb::NConsoleClient {
return HandleStartPartitionSessionEvent(createPartitionStreamEvent);
} else if (auto* commitEvent = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&*event)) {
return HandleCommitOffsetAcknowledgementEvent(commitEvent);
+ } else if (auto* partitionStatusEvent = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&*event)) {
+ return HandlePartitionSessionStatusEvent(partitionStatusEvent);
} else if (auto* sessionClosedEvent = std::get_if<NTopic::TSessionClosedEvent>(&*event)) {
ThrowOnError(*sessionClosedEvent);
return 1;
@@ -233,16 +251,18 @@ namespace NYdb::NConsoleClient {
NThreading::TFuture<void> future = ReadSession_->WaitEvent();
future.Wait(messageReceiveDeadline);
if (!future.HasValue()) {
- if (ReaderParams_.Wait() && !HasFirstMessage_) {
- LastMessageReceivedTs_ = Now();
+ if (waitForever) {
+ LastMessageReceivedTs_ = TInstant::Now();
continue;
}
- if (waitForever) {
- continue;
+ bool isReading = ReadingStatus_ == EReadingStatus::PartitionWithData;
+ if (!isReading || (isReading && HasFirstMessage_)) {
+ return EXIT_SUCCESS;
}
- return EXIT_SUCCESS;
+ LastMessageReceivedTs_ = TInstant::Now();
+ continue;
}
// TODO(shmel1k@): throttling?
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.h b/ydb/public/lib/ydb_cli/topic/topic_read.h
index cc6278c77b2..1f42e6f677a 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_read.h
+++ b/ydb/public/lib/ydb_cli/topic/topic_read.h
@@ -68,6 +68,7 @@ namespace NYdb::NConsoleClient {
void HandleReceivedMessage(const TReceivedMessage& message, IOutputStream& output);
int HandleStartPartitionSessionEvent(NTopic::TReadSessionEvent::TStartPartitionSessionEvent*);
+ int HandlePartitionSessionStatusEvent(NTopic::TReadSessionEvent::TPartitionSessionStatusEvent*);
int HandleDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent*, IOutputStream&);
int HandleCommitOffsetAcknowledgementEvent(NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent*);
int HandleEvent(TMaybe<NTopic::TReadSessionEvent::TEvent>&, IOutputStream&);
@@ -76,6 +77,12 @@ namespace NYdb::NConsoleClient {
void PrintMessagesInPrettyFormat(IOutputStream& output);
void PrintMessagesInJsonArrayFormat(IOutputStream& output);
+ enum EReadingStatus {
+ NoPartitionTaken = 0,
+ PartitionWithoutData = 1,
+ PartitionWithData = 2,
+ };
+
private:
std::shared_ptr<NTopic::IReadSession> ReadSession_;
const TTopicReaderSettings ReaderParams_;
@@ -87,6 +94,8 @@ namespace NYdb::NConsoleClient {
std::unique_ptr<TPrettyTable> OutputTable_;
TVector<TReceivedMessage> ReceivedMessages_;
+ EReadingStatus ReadingStatus_ = EReadingStatus::NoPartitionTaken;
+
friend class TTopicReaderTests;
};
} // namespace NYdb::NConsoleClient \ No newline at end of file