diff options
author | alexnick <alexnick@ydb.tech> | 2023-08-24 16:08:11 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2023-08-24 16:23:05 +0300 |
commit | de06ce7ca02187625397ef74e16bbfbf743ac686 (patch) | |
tree | 361875adf5f13e87d811a8fc867931b0e93406e5 | |
parent | 83f0168df7c60e323f0b6ebea8ac12012fca5f03 (diff) | |
download | ydb-de06ce7ca02187625397ef74e16bbfbf743ac686.tar.gz |
more logs
more logs
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp | 81 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h | 5 |
2 files changed, 83 insertions, 3 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp index db45a8ef1f..908ee39033 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp @@ -801,6 +801,14 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { bool result = false; LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); UpdateTimedCountersImpl(); + if (SentOriginalMessages.empty() || SentOriginalMessages.front().SeqNo != sequenceNumber){ + Cerr << "State before restart was:\n" << StateStr << "\n\n"; + DumpState(); + Cerr << "State on ack with seqNo " << sequenceNumber << " is:\n"; + Cerr << StateStr << "\n\n"; + Y_FAIL("got unknown ack"); + } + const auto& sentFront = SentOriginalMessages.front(); ui64 size = 0; ui64 compressedSize = 0; @@ -940,6 +948,8 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) { void TWriteSessionImpl::ResetForRetryImpl() { Y_VERIFY(Lock.IsLocked()); + DumpState(); + SessionEstablished = false; const size_t totalPackedMessages = PackedMessagesToSend.size() + SentPackedMessage.size(); const size_t totalOriginalMessages = OriginalMessagesToSend.size() + SentOriginalMessages.size(); @@ -1045,7 +1055,7 @@ size_t GetMaxGrpcMessageSize() { return 120_MB; } -bool TWriteSessionImpl::IsReadyToSendNextImpl() const { +bool TWriteSessionImpl::IsReadyToSendNextImpl() { Y_VERIFY(Lock.IsLocked()); if (!SessionEstablished) { @@ -1057,11 +1067,78 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() const { return false; } Y_VERIFY(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages"); - Y_VERIFY(OriginalMessagesToSend.front().SeqNo <= PackedMessagesToSend.top().Offset, "Lost original message(s)"); + if (OriginalMessagesToSend.front().SeqNo > PackedMessagesToSend.top().Offset) { + + Cerr << " State before restart was:\n" << StateStr << "\n\n"; + DumpState(); + Cerr << " State after restart is:\n" << StateStr << "\n\n"; + Y_FAIL("Lost original message(s)"); + } return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo; } +void TWriteSessionImpl::DumpState() { + TStringBuilder s; + s << "STATE:\nSeqNoShift = " << SeqNoShift << "\n"; + + auto omts = OriginalMessagesToSend; + s << "OriginalMessagesToSend(" << omts.size() << "):"; + ui32 i = 20; + while(!omts.empty() && i-- > 0) { + s << " " << omts.front().SeqNo; + omts.pop(); + } + if (!omts.empty()) s << " ..."; + s << "\n"; + + s << "SentOriginalMessages(" << SentOriginalMessages.size() << "):"; + omts = SentOriginalMessages; + i = 20; + while(!omts.empty() && i-- > 0) { + s << " " << omts.front().SeqNo; + omts.pop(); + } + if (omts.size() > 20) { + s << " ..."; + for (ui32 skip = omts.size() <= 20 ? 0 : (omts.size() - 20); skip > 0; --skip) { + omts.pop(); + } + } + while(!omts.empty()) { + s << " " << omts.front().SeqNo; + omts.pop(); + } + s << "\n"; + s << "PackedMessagesToSend(" << PackedMessagesToSend.size() << "):"; + i = 20; + std::vector<TBlock> tmpPackedMessagesToSend; + while (!PackedMessagesToSend.empty() && i-- > 0) { + s << " (" << PackedMessagesToSend.top().Offset << ", " << PackedMessagesToSend.top().MessageCount << ")"; + TBlock block; + block.Move(PackedMessagesToSend.top()); + PackedMessagesToSend.pop(); + tmpPackedMessagesToSend.emplace_back(std::move(block)); + } + if (PackedMessagesToSend.size() > 0) s << " ..."; + s << "\n"; + for (auto it = tmpPackedMessagesToSend.begin(); it != tmpPackedMessagesToSend.end(); ++it) { + PackedMessagesToSend.push(std::move(*it)); + } + tmpPackedMessagesToSend.clear(); + + auto spm = std::move(SentPackedMessage); + s << "SentPackedMessages(" << spm.size() << "):"; + while(!spm.empty()) { + s << " (" << spm.front().Offset << ", " << spm.front().MessageCount << ")"; + SentPackedMessage.push(std::move(spm.front())); + spm.pop(); + } + s << "\n"; + + StateStr = s; +} + void TWriteSessionImpl::UpdateTokenIfNeededImpl() { Y_VERIFY(Lock.IsLocked()); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h index 3b25b28d9c..8124fc6032 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h @@ -365,7 +365,8 @@ private: //TString GetDebugIdentity() const; Ydb::PersQueue::V1::StreamingWriteClientMessage GetInitClientMessage(); bool CleanupOnAcknowledged(ui64 sequenceNumber); - bool IsReadyToSendNextImpl() const; + bool IsReadyToSendNextImpl(); + void DumpState(); ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo); void SendImpl(); void AbortImpl(); @@ -443,6 +444,8 @@ private: TWriterCounters::TPtr Counters; TDuration WakeupInterval; + TString StateStr; + protected: ui64 MessagesAcquired = 0; }; |