aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2023-08-24 16:08:11 +0300
committeralexnick <alexnick@ydb.tech>2023-08-24 16:23:05 +0300
commitde06ce7ca02187625397ef74e16bbfbf743ac686 (patch)
tree361875adf5f13e87d811a8fc867931b0e93406e5
parent83f0168df7c60e323f0b6ebea8ac12012fca5f03 (diff)
downloadydb-de06ce7ca02187625397ef74e16bbfbf743ac686.tar.gz
more logs
more logs
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp81
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h5
2 files changed, 83 insertions, 3 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
index db45a8ef1f..908ee39033 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
@@ -801,6 +801,14 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
bool result = false;
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber);
UpdateTimedCountersImpl();
+ if (SentOriginalMessages.empty() || SentOriginalMessages.front().SeqNo != sequenceNumber){
+ Cerr << "State before restart was:\n" << StateStr << "\n\n";
+ DumpState();
+ Cerr << "State on ack with seqNo " << sequenceNumber << " is:\n";
+ Cerr << StateStr << "\n\n";
+ Y_FAIL("got unknown ack");
+ }
+
const auto& sentFront = SentOriginalMessages.front();
ui64 size = 0;
ui64 compressedSize = 0;
@@ -940,6 +948,8 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
void TWriteSessionImpl::ResetForRetryImpl() {
Y_VERIFY(Lock.IsLocked());
+ DumpState();
+
SessionEstablished = false;
const size_t totalPackedMessages = PackedMessagesToSend.size() + SentPackedMessage.size();
const size_t totalOriginalMessages = OriginalMessagesToSend.size() + SentOriginalMessages.size();
@@ -1045,7 +1055,7 @@ size_t GetMaxGrpcMessageSize() {
return 120_MB;
}
-bool TWriteSessionImpl::IsReadyToSendNextImpl() const {
+bool TWriteSessionImpl::IsReadyToSendNextImpl() {
Y_VERIFY(Lock.IsLocked());
if (!SessionEstablished) {
@@ -1057,11 +1067,78 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() const {
return false;
}
Y_VERIFY(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages");
- Y_VERIFY(OriginalMessagesToSend.front().SeqNo <= PackedMessagesToSend.top().Offset, "Lost original message(s)");
+ if (OriginalMessagesToSend.front().SeqNo > PackedMessagesToSend.top().Offset) {
+
+ Cerr << " State before restart was:\n" << StateStr << "\n\n";
+ DumpState();
+ Cerr << " State after restart is:\n" << StateStr << "\n\n";
+ Y_FAIL("Lost original message(s)");
+ }
return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo;
}
+void TWriteSessionImpl::DumpState() {
+ TStringBuilder s;
+ s << "STATE:\nSeqNoShift = " << SeqNoShift << "\n";
+
+ auto omts = OriginalMessagesToSend;
+ s << "OriginalMessagesToSend(" << omts.size() << "):";
+ ui32 i = 20;
+ while(!omts.empty() && i-- > 0) {
+ s << " " << omts.front().SeqNo;
+ omts.pop();
+ }
+ if (!omts.empty()) s << " ...";
+ s << "\n";
+
+ s << "SentOriginalMessages(" << SentOriginalMessages.size() << "):";
+ omts = SentOriginalMessages;
+ i = 20;
+ while(!omts.empty() && i-- > 0) {
+ s << " " << omts.front().SeqNo;
+ omts.pop();
+ }
+ if (omts.size() > 20) {
+ s << " ...";
+ for (ui32 skip = omts.size() <= 20 ? 0 : (omts.size() - 20); skip > 0; --skip) {
+ omts.pop();
+ }
+ }
+ while(!omts.empty()) {
+ s << " " << omts.front().SeqNo;
+ omts.pop();
+ }
+ s << "\n";
+ s << "PackedMessagesToSend(" << PackedMessagesToSend.size() << "):";
+ i = 20;
+ std::vector<TBlock> tmpPackedMessagesToSend;
+ while (!PackedMessagesToSend.empty() && i-- > 0) {
+ s << " (" << PackedMessagesToSend.top().Offset << ", " << PackedMessagesToSend.top().MessageCount << ")";
+ TBlock block;
+ block.Move(PackedMessagesToSend.top());
+ PackedMessagesToSend.pop();
+ tmpPackedMessagesToSend.emplace_back(std::move(block));
+ }
+ if (PackedMessagesToSend.size() > 0) s << " ...";
+ s << "\n";
+ for (auto it = tmpPackedMessagesToSend.begin(); it != tmpPackedMessagesToSend.end(); ++it) {
+ PackedMessagesToSend.push(std::move(*it));
+ }
+ tmpPackedMessagesToSend.clear();
+
+ auto spm = std::move(SentPackedMessage);
+ s << "SentPackedMessages(" << spm.size() << "):";
+ while(!spm.empty()) {
+ s << " (" << spm.front().Offset << ", " << spm.front().MessageCount << ")";
+ SentPackedMessage.push(std::move(spm.front()));
+ spm.pop();
+ }
+ s << "\n";
+
+ StateStr = s;
+}
+
void TWriteSessionImpl::UpdateTokenIfNeededImpl() {
Y_VERIFY(Lock.IsLocked());
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
index 3b25b28d9c..8124fc6032 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
@@ -365,7 +365,8 @@ private:
//TString GetDebugIdentity() const;
Ydb::PersQueue::V1::StreamingWriteClientMessage GetInitClientMessage();
bool CleanupOnAcknowledged(ui64 sequenceNumber);
- bool IsReadyToSendNextImpl() const;
+ bool IsReadyToSendNextImpl();
+ void DumpState();
ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo);
void SendImpl();
void AbortImpl();
@@ -443,6 +444,8 @@ private:
TWriterCounters::TPtr Counters;
TDuration WakeupInterval;
+ TString StateStr;
+
protected:
ui64 MessagesAcquired = 0;
};