diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-03-31 10:21:45 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-03-31 10:21:45 +0300 |
commit | 377d6b596d86d611b4ebb517d7639895ca8361f9 (patch) | |
tree | 71d2ae50c99ff276ddadc12f4a2a16f4201a77da | |
parent | e163b69a87c70baac07bd99142916735e0c283fa (diff) | |
download | ydb-377d6b596d86d611b4ebb517d7639895ca8361f9.tar.gz |
fix
log lazily via macros
6 files changed, 123 insertions, 103 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h new file mode 100644 index 00000000000..0635ef2cc89 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h @@ -0,0 +1,10 @@ +#pragma once + +#ifdef LOG_LAZY +#error log macro redefinition +#endif + +#define LOG_LAZY(log, priority, message) \ + if (log.IsOpen() && log.FiltrationLevel() >= priority) { \ + log.Write(priority, message); \ + } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index ca1b9cde81c..5b3abb3b48c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -2,6 +2,8 @@ #include "read_session.h" #include "common.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h> + #define INCLUDE_YDB_INTERNAL_H #include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h> #undef INCLUDE_YDB_INTERNAL_H @@ -82,7 +84,7 @@ void TReadSession::Start() { return; } - Log.Write(TLOG_INFO, GetLogPrefix() << "Starting read session"); + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Starting read session"); if (Settings.DisableClusterDiscovery_) { ProceedWithoutClusterDiscovery(); } else { @@ -118,7 +120,7 @@ void TReadSession::StartClusterDiscovery() { return; } - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Starting cluster discovery"); + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Starting cluster discovery"); ClusterDiscoveryDelayContext = nullptr; } @@ -181,7 +183,7 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) { if (sessionSettings.MaxMemoryUsageBytes_ > clusterSessionsCount && sessionSettings.MaxMemoryUsageBytes_ != std::numeric_limits<size_t>::max()) { sessionSettings.MaxMemoryUsageBytes_ /= clusterSessionsCount; } - Log.Write( + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Starting session to cluster " << clusterName << " (" << clusterSessionInfo.ClusterEndpoint << ")" @@ -225,7 +227,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu } TMaybe<TDuration> retryDelay = ClusterDiscoveryRetryState->GetNextRetryDelay(status.GetStatus()); if (retryDelay) { - Log.Write( + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Cluster discovery request failed. Status: " << status.GetStatus() << ". Issues: \"" << IssuesSingleLineString(status.GetIssues()) << "\"" @@ -237,7 +239,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu return; } - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Cluster discovery request succeeded"); + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Cluster discovery request succeeded"); ClusterDiscoveryRetryState = nullptr; // Init ClusterSessions. @@ -317,7 +319,7 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions if (Aborting || Closing) { return; } - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Restart cluster discovery in " << delay); + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Restart cluster discovery in " << delay); auto startCallback = [self = weak_from_this()](bool ok) { if (ok) { if (auto sharedSelf = self.lock()) { @@ -337,7 +339,7 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions } bool TReadSession::Close(TDuration timeout) { - Log.Write(TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); // Log final counters. DumpCountersToLog(); @@ -420,7 +422,7 @@ void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions< if (!Aborting) { Aborting = true; - Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); + LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); if (ClusterDiscoveryDelayContext) { ClusterDiscoveryDelayContext->Cancel(); ClusterDiscoveryDelayContext.reset(); @@ -486,7 +488,7 @@ TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxB } void TReadSession::StopReadingData() { - Log.Write(TLOG_INFO, GetLogPrefix() << "Stop reading data"); + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Stop reading data"); with_lock (Lock) { if (!DataReadingSuspended) { DataReadingSuspended = true; @@ -501,7 +503,7 @@ void TReadSession::StopReadingData() { } void TReadSession::ResumeReadingData() { - Log.Write(TLOG_INFO, GetLogPrefix() << "Resume reading data"); + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Resume reading data"); with_lock (Lock) { if (DataReadingSuspended) { DataReadingSuspended = false; @@ -516,7 +518,7 @@ void TReadSession::ResumeReadingData() { } void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) { - Log.Write(TLOG_DEBUG, GetLogPrefix() + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "The application data is transferred to the client. Number of messages " << messagesCount << ", size " @@ -574,7 +576,7 @@ void TReadSession::DumpCountersToLog(size_t timeNumber) { /**/ if (logCounters) { - Log.Write(TLOG_INFO, + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Counters: {" C(Errors) C(CurrentSessionLifetimeMs) diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index e229b45626e..4d9d5192249 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -6,6 +6,8 @@ #include "persqueue_impl.h" #include "common.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h> + #define INCLUDE_YDB_INTERNAL_H #include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h> #undef INCLUDE_YDB_INTERNAL_H @@ -256,7 +258,7 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain std::function<void(bool)> connectTimeoutCallback; if (!status.Ok()) { - Log.Write(TLOG_INFO, GetLogPrefix() << "Got error. Status: " << status.Status + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Got error. Status: " << status.Status << ". Description: " << IssuesSingleLineString(status.Issues)); } @@ -290,7 +292,7 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain } } - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Reconnecting session to cluster " << ClusterName << " in " << delay); + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Reconnecting session to cluster " << ClusterName << " in " << delay); ++ConnectionAttemptsDone; @@ -345,7 +347,7 @@ template <bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::BreakConnectionAndReconnectImpl( TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred) { Y_VERIFY(Lock.IsLocked()); - Log.Write(TLOG_INFO, + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Break connection due to unexpected message from server. Status: " << status.Status << ", Issues: \"" << IssuesSingleLineString(status.Issues) << "\""); @@ -424,7 +426,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnConnect( template<> inline void TSingleClusterReadSessionImpl<true>::InitImpl(TDeferredActions<true>& deferred) { Y_VERIFY(Lock.IsLocked()); - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session"); + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session"); TClientMessage<true> req; auto& init = *req.mutable_init_request(); init.set_ranges_mode(GetRangesMode()); @@ -455,7 +457,7 @@ inline void TSingleClusterReadSessionImpl<true>::InitImpl(TDeferredActions<true> template<> inline void TSingleClusterReadSessionImpl<false>::InitImpl(TDeferredActions<false>& deferred) { Y_VERIFY(Lock.IsLocked()); - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session"); + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session"); TClientMessage<false> req; auto& init = *req.mutable_init_request(); @@ -550,7 +552,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream if (commitOffset) { commitOffsetLogStr << ". Commit offset: " << *commitOffset; } - Log.Write( + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Confirm partition stream create. Partition stream id: " << GetPartitionStreamId(partitionStream) << ". Cluster: \"" << GetCluster(partitionStream) << "\". Topic: \"" << partitionStream->GetTopicPath() @@ -560,7 +562,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream with_lock (Lock) { if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. - Log.Write( + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Skip partition stream create confirm. Partition stream id: " << GetPartitionStreamId(partitionStream) @@ -599,7 +601,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStreamDestroy(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) { - Log.Write( + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Confirm partition stream destroy. Partition stream id: " << GetPartitionStreamId(partitionStream) @@ -610,7 +612,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream TDeferredActions<UseMigrationProtocol> deferred; with_lock (Lock) { if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. - Log.Write( + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Skip partition stream destroy confirm. Partition stream id: " << GetPartitionStreamId(partitionStream) @@ -656,7 +658,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Commit(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset) { - Log.Write( + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Commit offsets [" << startOffset << ", " << endOffset << "). Partition stream id: " << GetPartitionStreamId(partitionStream) @@ -703,7 +705,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Commit(const TPartitio template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::RequestPartitionStreamStatus(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) { - Log.Write( + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Requesting status for partition stream id: " << GetPartitionStreamId(partitionStream) ); @@ -732,7 +734,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::RequestPartitionStream template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) { - Log.Write(TLOG_DEBUG, GetLogPrefix() + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "The application data is transferred to the client. Number of messages " << messagesCount << ", size " @@ -901,7 +903,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( Y_VERIFY(Lock.IsLocked()); Y_UNUSED(deferred); - Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id()); + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id()); RetryState = nullptr; @@ -1066,7 +1068,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( TDeferredActions<true>& deferred) { Y_VERIFY(Lock.IsLocked()); - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg); + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg); TMap<ui64, TIntrusivePtr<TPartitionStreamImpl<true>>> partitionStreams; for (const Ydb::PersQueue::V1::CommitCookie& cookieProto : msg.cookies()) { @@ -1128,7 +1130,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( RetryState = nullptr; - Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id()); + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id()); // Successful init. Do nothing. ContinueReadingDataImpl(); @@ -1279,7 +1281,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( TDeferredActions<false>& deferred) { Y_VERIFY(Lock.IsLocked()); - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg); + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg); for (const auto& rangeProto : msg.partitions_committed_offsets()) { auto partitionStreamIt = PartitionStreams.find(rangeProto.partition_session_id()); @@ -1439,7 +1441,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDataDecompressed(i64 template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Abort() { - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Abort session to cluster"); + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Abort session to cluster"); with_lock (Lock) { AbortImpl(); @@ -2348,10 +2350,10 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart // Clear data to free internal session's memory. messageData.clear_data(); - partitionStream->GetLog().Write(TLOG_DEBUG, TStringBuilder() - << "Take Data. Partition " << partitionStream->GetPartitionId() - << ". Read: {" << Batch << ", " << Message << "} (" - << minOffset << "-" << maxOffset << ")"); + LOG_LAZY(partitionStream->GetLog(), TLOG_DEBUG, TStringBuilder() + << "Take Data. Partition " << partitionStream->GetPartitionId() + << ". Read: {" << Batch << ", " << Message << "} (" + << minOffset << "-" << maxOffset << ")"); } template<bool UseMigrationProtocol> @@ -2467,9 +2469,9 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator( } } if (auto session = Parent->Session.lock()) { - session->GetLog().Write(TLOG_DEBUG, TStringBuilder() << "Decompression task done. Partition/PartitionSessionId: " - << partition_id << " (" << minOffset << "-" - << maxOffset << ")"); + LOG_LAZY(session->GetLog(), TLOG_DEBUG, TStringBuilder() << "Decompression task done. Partition/PartitionSessionId: " + << partition_id << " (" << minOffset << "-" + << maxOffset << ")"); } Y_ASSERT(dataProcessed == SourceDataSize); std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock(); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp index 6cf20645068..083ec713c60 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp @@ -1,4 +1,6 @@ #include "write_session.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h> + #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> #include <library/cpp/string_utils/url/url.h> @@ -93,11 +95,10 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat THandleResult result; if (AtomicGet(Aborting)) { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); return result; } - DbDriverState->Log.Write( - TLOG_INFO, + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Got error. Status: " << status.Status << ". Description: " << IssuesSingleLineString(status.Issues) ); @@ -111,14 +112,14 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat if (nextDelay) { result.StartDelay = *nextDelay; result.DoRestart = true; - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms" ); ResetForRetryImpl(); } else { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); result.DoStop = true; CheckHandleResultImpl(result); } @@ -136,7 +137,7 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) { if (AtomicGet(Aborting)) { return; } - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: Do CDS request"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: Do CDS request"); cdsRequestIsUnnecessary = (Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Off || (Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Auto && !IsFederation(DbDriverState->DiscoveryEndpoint))); @@ -161,9 +162,9 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) { if (Settings.PreferredCluster_.Defined()) params->set_preferred_cluster_name(*Settings.PreferredCluster_); - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n"); auto cdsRequestCall = [wire = Tracker->MakeTrackedWire(), req_=std::move(req), extr=std::move(extractor), connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState=DbDriverState, settings=Settings]() mutable { - dbState->Log.Write(TLOG_INFO, TStringBuilder() << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n"); + LOG_LAZY(dbState->Log, TLOG_INFO, TStringBuilder() << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n"); connections->RunDeferred<Ydb::PersQueue::V1::ClusterDiscoveryService, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse>( @@ -188,7 +189,7 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) { void TWriteSessionImpl::OnCdsResponse( TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result ) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Got CDS response: \n" << result.ShortDebugString()); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Got CDS response: \n" << result.ShortDebugString()); TString endpoint, name; THandleResult handleResult; if (!status.IsSuccess()) { @@ -284,7 +285,7 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race NThreading::TFuture<ui64> TWriteSessionImpl::GetInitSeqNo() { if (Settings.ValidateSeqNo_) { if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) { - DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode"); } else @@ -316,7 +317,7 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { } if (seqNo.Defined()) { if (*AutoSeqNoMode) { - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" ); @@ -331,7 +332,7 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { OnSeqNoShift = false; SeqNoShift = 0; } else if (!(*AutoSeqNoMode)) { - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" ); @@ -405,7 +406,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::OnErrorImpl(NYdb::TPlainStat // No lock void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoint) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint); NGrpc::IQueueClientContextPtr prevConnectContext; NGrpc::IQueueClientContextPtr prevConnectTimeoutContext; @@ -493,7 +494,7 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin // RPC callback. void TWriteSessionImpl::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) { - DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout"); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session: connect timeout"); THandleResult handleResult; with_lock (Lock) { if (ConnectTimeoutContext == connectTimeoutContext) { @@ -573,7 +574,7 @@ void TWriteSessionImpl::InitImpl() { for (const auto& attr : Settings.Meta_.Fields) { (*init->mutable_session_meta())[attr.first] = attr.second; } - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); WriteToProcessorImpl(std::move(req)); } @@ -724,7 +725,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess } case TServerMessage::kInitResponse: { const auto& initResponse = ServerMessage->init_response(); - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString()); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString()); SessionId = initResponse.session_id(); PartitionId = initResponse.partition_id(); ui64 newLastSeqNo = initResponse.last_sequence_number(); @@ -752,7 +753,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess case TServerMessage::kBatchWriteResponse: { TWriteSessionEvent::TAcksEvent acksEvent; const auto& batchWriteResponse = ServerMessage->batch_write_response(); - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString() ); @@ -788,7 +789,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess } case TServerMessage::kUpdateTokenResponse: { UpdateTokenInProgress = false; - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); UpdateTokenIfNeededImpl(); break; } @@ -798,7 +799,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { bool result = false; - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); UpdateTimedCountersImpl(); const auto& sentFront = SentOriginalMessages.front(); ui64 size = 0; @@ -850,14 +851,14 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) { bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_; if (wasOk != nowOk) { if (wasOk) { - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Estimated memory usage " << MemoryUsage << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])" ); } else { - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" ); @@ -985,7 +986,7 @@ void TWriteSessionImpl::FlushWriteIfRequiredImpl() { size_t TWriteSessionImpl::WriteBatchImpl() { Y_VERIFY(Lock.IsLocked()); - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from " << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo @@ -1065,7 +1066,7 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() const { void TWriteSessionImpl::UpdateTokenIfNeededImpl() { Y_VERIFY(Lock.IsLocked()); - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished) return; @@ -1078,7 +1079,7 @@ void TWriteSessionImpl::UpdateTokenIfNeededImpl() { updateRequest->set_token(token); PrevToken = token; - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: updating token"); Processor->Write(std::move(clientMessage)); } @@ -1129,7 +1130,7 @@ void TWriteSessionImpl::SendImpl() { PackedMessagesToSend.pop(); } UpdateTokenIfNeededImpl(); - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Send " << writeRequest->sequence_numbers_size() << " message(s) (" << OriginalMessagesToSend.size() << " left), first sequence number is " @@ -1143,7 +1144,7 @@ void TWriteSessionImpl::SendImpl() { bool TWriteSessionImpl::Close(TDuration closeTimeout) { if (AtomicGet(Aborting)) return false; - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms" ); @@ -1176,9 +1177,9 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) { InitSeqNoPromise.SetException("session closed"); } if (ready) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); } else { - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_WARNING, LogPrefix() << "Write session: could not confirm all writes in time" << " or session aborted, perform hard shutdown" @@ -1236,7 +1237,7 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() { << Counters->counter->Val() \ /**/ - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Counters: {" LOG_COUNTER(Errors) LOG_COUNTER(CurrentSessionLifetimeMs) @@ -1258,7 +1259,7 @@ void TWriteSessionImpl::AbortImpl() { Y_VERIFY(Lock.IsLocked()); if (!AtomicGet(Aborting)) { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: aborting"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting"); AtomicSet(Aborting, 1); Cancel(ConnectContext); Cancel(ConnectTimeoutContext); @@ -1274,7 +1275,7 @@ void TWriteSessionImpl::AbortImpl() { void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) { Y_VERIFY(Lock.IsLocked()); - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close"); EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues))); AbortImpl(); } @@ -1290,13 +1291,13 @@ void TWriteSessionImpl::CloseImpl(EStatus statusCode, const TString& message) { void TWriteSessionImpl::CloseImpl(TPlainStatus&& status) { Y_VERIFY(Lock.IsLocked()); - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close"); EventsQueue->Close(TSessionClosedEvent(std::move(status))); AbortImpl(); } TWriteSessionImpl::~TWriteSessionImpl() { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: destroy"); bool needClose = false; with_lock(Lock) { if (!AtomicGet(Aborting)) { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp index e5978a856cc..f2b7545b9cc 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp @@ -1,5 +1,7 @@ #include "read_session.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h> + #define INCLUDE_YDB_INTERNAL_H #include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h> #undef INCLUDE_YDB_INTERNAL_H @@ -49,7 +51,7 @@ void TReadSession::Start() { return; } - Log.Write(TLOG_INFO, GetLogPrefix() << "Starting read session"); + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Starting read session"); NPersQueue::TDeferredActions<false> deferred; with_lock (Lock) { @@ -66,7 +68,7 @@ void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false> Y_VERIFY(Lock.IsLocked()); // Create cluster sessions. - Log.Write( + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Starting single session" ); @@ -126,7 +128,7 @@ TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxB } bool TReadSession::Close(TDuration timeout) { - Log.Write(TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); // Log final counters. DumpCountersToLog(); with_lock (Lock) { @@ -206,7 +208,7 @@ TStringBuilder TReadSession::GetLogPrefix() const { } void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) { - Log.Write(TLOG_DEBUG, GetLogPrefix() + LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "The application data is transferred to the client. Number of messages " << messagesCount << ", size " @@ -257,7 +259,7 @@ void TReadSession::DumpCountersToLog(size_t timeNumber) { /**/ if (logCounters) { - Log.Write(TLOG_INFO, + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Counters: {" C(Errors) C(CurrentSessionLifetimeMs) @@ -303,7 +305,7 @@ void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDefe if (!Aborting) { Aborting = true; - Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); + LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); if (DumpCountersContext) { DumpCountersContext->Cancel(); DumpCountersContext.reset(); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index 21b97070eaa..992eabbc386 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -1,4 +1,7 @@ #include "write_session.h" + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h> + #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <library/cpp/string_utils/url/url.h> @@ -91,10 +94,10 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat THandleResult result; if (AtomicGet(Aborting)) { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); return result; } - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Got error. Status: " << status.Status << ". Description: " << NPersQueue::IssuesSingleLineString(status.Issues) @@ -109,14 +112,14 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat if (nextDelay) { result.StartDelay = *nextDelay; result.DoRestart = true; - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms" ); ResetForRetryImpl(); } else { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); result.DoStop = true; CheckHandleResultImpl(result); } @@ -138,7 +141,7 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race NThreading::TFuture<ui64> TWriteSessionImpl::GetInitSeqNo() { if (Settings.ValidateSeqNo_) { if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) { - DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode"); } else @@ -175,7 +178,7 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { } if (seqNo.Defined()) { if (*AutoSeqNoMode) { - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" ); @@ -186,7 +189,7 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { seqNoValue = *seqNo; } } else if (!(*AutoSeqNoMode)) { - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" ); @@ -260,7 +263,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::OnErrorImpl(NYdb::TPlainStat // No lock void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoint) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint); NGrpc::IQueueClientContextPtr prevConnectContext; NGrpc::IQueueClientContextPtr prevConnectTimeoutContext; @@ -348,7 +351,7 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin // RPC callback. void TWriteSessionImpl::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) { - DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout"); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session: connect timeout"); THandleResult handleResult; with_lock (Lock) { if (ConnectTimeoutContext == connectTimeoutContext) { @@ -424,7 +427,7 @@ void TWriteSessionImpl::InitImpl() { for (const auto& attr : Settings.Meta_.Fields) { (*init->mutable_write_session_meta())[attr.first] = attr.second; } - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); WriteToProcessorImpl(std::move(req)); } @@ -578,7 +581,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess } case TServerMessage::kInitResponse: { const auto& initResponse = ServerMessage->init_response(); - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString()); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString()); SessionId = initResponse.session_id(); PartitionId = initResponse.partition_id(); ui64 newLastSeqNo = initResponse.last_seq_no(); @@ -606,7 +609,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess case TServerMessage::kWriteResponse: { TWriteSessionEvent::TAcksEvent acksEvent; const auto& batchWriteResponse = ServerMessage->write_response(); - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString() ); @@ -657,7 +660,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess } case TServerMessage::kUpdateTokenResponse: { UpdateTokenInProgress = false; - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); UpdateTokenIfNeededImpl(); break; } @@ -667,7 +670,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { bool result = false; - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); UpdateTimedCountersImpl(); const auto& sentFront = SentOriginalMessages.front(); ui64 size = 0; @@ -719,14 +722,14 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) { bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_; if (wasOk != nowOk) { if (wasOk) { - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Estimated memory usage " << MemoryUsage << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])" ); } else { - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" ); @@ -854,7 +857,7 @@ void TWriteSessionImpl::FlushWriteIfRequiredImpl() { size_t TWriteSessionImpl::WriteBatchImpl() { Y_VERIFY(Lock.IsLocked()); - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from " << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo @@ -934,7 +937,7 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() const { void TWriteSessionImpl::UpdateTokenIfNeededImpl() { Y_VERIFY(Lock.IsLocked()); - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished) return; @@ -947,7 +950,7 @@ void TWriteSessionImpl::UpdateTokenIfNeededImpl() { updateRequest->set_token(token); PrevToken = token; - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: updating token"); Processor->Write(std::move(clientMessage)); } @@ -997,7 +1000,7 @@ void TWriteSessionImpl::SendImpl() { PackedMessagesToSend.pop(); } UpdateTokenIfNeededImpl(); - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Send " << writeRequest->messages_size() << " message(s) (" << OriginalMessagesToSend.size() << " left), first sequence number is " @@ -1011,7 +1014,7 @@ void TWriteSessionImpl::SendImpl() { bool TWriteSessionImpl::Close(TDuration closeTimeout) { if (AtomicGet(Aborting)) return false; - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms" ); @@ -1044,9 +1047,9 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) { InitSeqNoPromise.SetException("session closed"); } if (ready) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); } else { - DbDriverState->Log.Write( + LOG_LAZY(DbDriverState->Log, TLOG_WARNING, LogPrefix() << "Write session: could not confirm all writes in time" << " or session aborted, perform hard shutdown" @@ -1103,7 +1106,7 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() { << Counters->counter->Val() \ /**/ - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Counters: {" LOG_COUNTER(Errors) LOG_COUNTER(CurrentSessionLifetimeMs) @@ -1125,7 +1128,7 @@ void TWriteSessionImpl::AbortImpl() { Y_VERIFY(Lock.IsLocked()); if (!AtomicGet(Aborting)) { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: aborting"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting"); AtomicSet(Aborting, 1); NPersQueue::Cancel(ConnectContext); NPersQueue::Cancel(ConnectTimeoutContext); @@ -1141,7 +1144,7 @@ void TWriteSessionImpl::AbortImpl() { void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) { Y_VERIFY(Lock.IsLocked()); - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close"); EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues))); AbortImpl(); } @@ -1157,13 +1160,13 @@ void TWriteSessionImpl::CloseImpl(EStatus statusCode, const TString& message) { void TWriteSessionImpl::CloseImpl(TPlainStatus&& status) { Y_VERIFY(Lock.IsLocked()); - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close"); EventsQueue->Close(TSessionClosedEvent(std::move(status))); AbortImpl(); } TWriteSessionImpl::~TWriteSessionImpl() { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: destroy"); bool needClose = false; with_lock(Lock) { if (!AtomicGet(Aborting)) { |