aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-03-31 10:21:45 +0300
committerildar-khisam <ikhis@ydb.tech>2023-03-31 10:21:45 +0300
commit377d6b596d86d611b4ebb517d7639895ca8361f9 (patch)
tree71d2ae50c99ff276ddadc12f4a2a16f4201a77da
parente163b69a87c70baac07bd99142916735e0c283fa (diff)
downloadydb-377d6b596d86d611b4ebb517d7639895ca8361f9.tar.gz
fix
log lazily via macros
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp26
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp50
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp67
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp14
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp59
6 files changed, 123 insertions, 103 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h
new file mode 100644
index 00000000000..0635ef2cc89
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#ifdef LOG_LAZY
+#error log macro redefinition
+#endif
+
+#define LOG_LAZY(log, priority, message) \
+ if (log.IsOpen() && log.FiltrationLevel() >= priority) { \
+ log.Write(priority, message); \
+ }
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
index ca1b9cde81c..5b3abb3b48c 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
@@ -2,6 +2,8 @@
#include "read_session.h"
#include "common.h"
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h>
+
#define INCLUDE_YDB_INTERNAL_H
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h>
#undef INCLUDE_YDB_INTERNAL_H
@@ -82,7 +84,7 @@ void TReadSession::Start() {
return;
}
- Log.Write(TLOG_INFO, GetLogPrefix() << "Starting read session");
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Starting read session");
if (Settings.DisableClusterDiscovery_) {
ProceedWithoutClusterDiscovery();
} else {
@@ -118,7 +120,7 @@ void TReadSession::StartClusterDiscovery() {
return;
}
- Log.Write(TLOG_DEBUG, GetLogPrefix() << "Starting cluster discovery");
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Starting cluster discovery");
ClusterDiscoveryDelayContext = nullptr;
}
@@ -181,7 +183,7 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) {
if (sessionSettings.MaxMemoryUsageBytes_ > clusterSessionsCount && sessionSettings.MaxMemoryUsageBytes_ != std::numeric_limits<size_t>::max()) {
sessionSettings.MaxMemoryUsageBytes_ /= clusterSessionsCount;
}
- Log.Write(
+ LOG_LAZY(Log,
TLOG_DEBUG,
GetLogPrefix() << "Starting session to cluster " << clusterName
<< " (" << clusterSessionInfo.ClusterEndpoint << ")"
@@ -225,7 +227,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
}
TMaybe<TDuration> retryDelay = ClusterDiscoveryRetryState->GetNextRetryDelay(status.GetStatus());
if (retryDelay) {
- Log.Write(
+ LOG_LAZY(Log,
TLOG_INFO,
GetLogPrefix() << "Cluster discovery request failed. Status: " << status.GetStatus()
<< ". Issues: \"" << IssuesSingleLineString(status.GetIssues()) << "\""
@@ -237,7 +239,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
return;
}
- Log.Write(TLOG_DEBUG, GetLogPrefix() << "Cluster discovery request succeeded");
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Cluster discovery request succeeded");
ClusterDiscoveryRetryState = nullptr;
// Init ClusterSessions.
@@ -317,7 +319,7 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions
if (Aborting || Closing) {
return;
}
- Log.Write(TLOG_DEBUG, GetLogPrefix() << "Restart cluster discovery in " << delay);
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Restart cluster discovery in " << delay);
auto startCallback = [self = weak_from_this()](bool ok) {
if (ok) {
if (auto sharedSelf = self.lock()) {
@@ -337,7 +339,7 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions
}
bool TReadSession::Close(TDuration timeout) {
- Log.Write(TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
// Log final counters.
DumpCountersToLog();
@@ -420,7 +422,7 @@ void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions<
if (!Aborting) {
Aborting = true;
- Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString());
+ LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString());
if (ClusterDiscoveryDelayContext) {
ClusterDiscoveryDelayContext->Cancel();
ClusterDiscoveryDelayContext.reset();
@@ -486,7 +488,7 @@ TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxB
}
void TReadSession::StopReadingData() {
- Log.Write(TLOG_INFO, GetLogPrefix() << "Stop reading data");
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Stop reading data");
with_lock (Lock) {
if (!DataReadingSuspended) {
DataReadingSuspended = true;
@@ -501,7 +503,7 @@ void TReadSession::StopReadingData() {
}
void TReadSession::ResumeReadingData() {
- Log.Write(TLOG_INFO, GetLogPrefix() << "Resume reading data");
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Resume reading data");
with_lock (Lock) {
if (DataReadingSuspended) {
DataReadingSuspended = false;
@@ -516,7 +518,7 @@ void TReadSession::ResumeReadingData() {
}
void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) {
- Log.Write(TLOG_DEBUG, GetLogPrefix()
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix()
<< "The application data is transferred to the client. Number of messages "
<< messagesCount
<< ", size "
@@ -574,7 +576,7 @@ void TReadSession::DumpCountersToLog(size_t timeNumber) {
/**/
if (logCounters) {
- Log.Write(TLOG_INFO,
+ LOG_LAZY(Log, TLOG_INFO,
GetLogPrefix() << "Counters: {"
C(Errors)
C(CurrentSessionLifetimeMs)
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index e229b45626e..4d9d5192249 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -6,6 +6,8 @@
#include "persqueue_impl.h"
#include "common.h"
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h>
+
#define INCLUDE_YDB_INTERNAL_H
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h>
#undef INCLUDE_YDB_INTERNAL_H
@@ -256,7 +258,7 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain
std::function<void(bool)> connectTimeoutCallback;
if (!status.Ok()) {
- Log.Write(TLOG_INFO, GetLogPrefix() << "Got error. Status: " << status.Status
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Got error. Status: " << status.Status
<< ". Description: " << IssuesSingleLineString(status.Issues));
}
@@ -290,7 +292,7 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain
}
}
- Log.Write(TLOG_DEBUG, GetLogPrefix() << "Reconnecting session to cluster " << ClusterName << " in " << delay);
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Reconnecting session to cluster " << ClusterName << " in " << delay);
++ConnectionAttemptsDone;
@@ -345,7 +347,7 @@ template <bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::BreakConnectionAndReconnectImpl(
TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred) {
Y_VERIFY(Lock.IsLocked());
- Log.Write(TLOG_INFO,
+ LOG_LAZY(Log, TLOG_INFO,
GetLogPrefix() << "Break connection due to unexpected message from server. Status: " << status.Status
<< ", Issues: \"" << IssuesSingleLineString(status.Issues) << "\"");
@@ -424,7 +426,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnConnect(
template<>
inline void TSingleClusterReadSessionImpl<true>::InitImpl(TDeferredActions<true>& deferred) {
Y_VERIFY(Lock.IsLocked());
- Log.Write(TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session");
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session");
TClientMessage<true> req;
auto& init = *req.mutable_init_request();
init.set_ranges_mode(GetRangesMode());
@@ -455,7 +457,7 @@ inline void TSingleClusterReadSessionImpl<true>::InitImpl(TDeferredActions<true>
template<>
inline void TSingleClusterReadSessionImpl<false>::InitImpl(TDeferredActions<false>& deferred) {
Y_VERIFY(Lock.IsLocked());
- Log.Write(TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session");
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session");
TClientMessage<false> req;
auto& init = *req.mutable_init_request();
@@ -550,7 +552,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream
if (commitOffset) {
commitOffsetLogStr << ". Commit offset: " << *commitOffset;
}
- Log.Write(
+ LOG_LAZY(Log,
TLOG_INFO,
GetLogPrefix() << "Confirm partition stream create. Partition stream id: " << GetPartitionStreamId(partitionStream)
<< ". Cluster: \"" << GetCluster(partitionStream) << "\". Topic: \"" << partitionStream->GetTopicPath()
@@ -560,7 +562,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream
with_lock (Lock) {
if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation.
- Log.Write(
+ LOG_LAZY(Log,
TLOG_DEBUG,
GetLogPrefix() << "Skip partition stream create confirm. Partition stream id: "
<< GetPartitionStreamId(partitionStream)
@@ -599,7 +601,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream
template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStreamDestroy(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) {
- Log.Write(
+ LOG_LAZY(Log,
TLOG_INFO,
GetLogPrefix() << "Confirm partition stream destroy. Partition stream id: "
<< GetPartitionStreamId(partitionStream)
@@ -610,7 +612,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream
TDeferredActions<UseMigrationProtocol> deferred;
with_lock (Lock) {
if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation.
- Log.Write(
+ LOG_LAZY(Log,
TLOG_DEBUG,
GetLogPrefix() << "Skip partition stream destroy confirm. Partition stream id: "
<< GetPartitionStreamId(partitionStream)
@@ -656,7 +658,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream
template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Commit(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset) {
- Log.Write(
+ LOG_LAZY(Log,
TLOG_DEBUG,
GetLogPrefix() << "Commit offsets [" << startOffset << ", " << endOffset
<< "). Partition stream id: " << GetPartitionStreamId(partitionStream)
@@ -703,7 +705,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Commit(const TPartitio
template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::RequestPartitionStreamStatus(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) {
- Log.Write(
+ LOG_LAZY(Log,
TLOG_DEBUG,
GetLogPrefix() << "Requesting status for partition stream id: " << GetPartitionStreamId(partitionStream)
);
@@ -732,7 +734,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::RequestPartitionStream
template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount)
{
- Log.Write(TLOG_DEBUG, GetLogPrefix()
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix()
<< "The application data is transferred to the client. Number of messages "
<< messagesCount
<< ", size "
@@ -901,7 +903,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
Y_VERIFY(Lock.IsLocked());
Y_UNUSED(deferred);
- Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
RetryState = nullptr;
@@ -1066,7 +1068,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
TDeferredActions<true>& deferred) {
Y_VERIFY(Lock.IsLocked());
- Log.Write(TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg);
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg);
TMap<ui64, TIntrusivePtr<TPartitionStreamImpl<true>>> partitionStreams;
for (const Ydb::PersQueue::V1::CommitCookie& cookieProto : msg.cookies()) {
@@ -1128,7 +1130,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
RetryState = nullptr;
- Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
// Successful init. Do nothing.
ContinueReadingDataImpl();
@@ -1279,7 +1281,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
TDeferredActions<false>& deferred) {
Y_VERIFY(Lock.IsLocked());
- Log.Write(TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg);
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg);
for (const auto& rangeProto : msg.partitions_committed_offsets()) {
auto partitionStreamIt = PartitionStreams.find(rangeProto.partition_session_id());
@@ -1439,7 +1441,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDataDecompressed(i64
template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Abort() {
- Log.Write(TLOG_DEBUG, GetLogPrefix() << "Abort session to cluster");
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Abort session to cluster");
with_lock (Lock) {
AbortImpl();
@@ -2348,10 +2350,10 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart
// Clear data to free internal session's memory.
messageData.clear_data();
- partitionStream->GetLog().Write(TLOG_DEBUG, TStringBuilder()
- << "Take Data. Partition " << partitionStream->GetPartitionId()
- << ". Read: {" << Batch << ", " << Message << "} ("
- << minOffset << "-" << maxOffset << ")");
+ LOG_LAZY(partitionStream->GetLog(), TLOG_DEBUG, TStringBuilder()
+ << "Take Data. Partition " << partitionStream->GetPartitionId()
+ << ". Read: {" << Batch << ", " << Message << "} ("
+ << minOffset << "-" << maxOffset << ")");
}
template<bool UseMigrationProtocol>
@@ -2467,9 +2469,9 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
}
}
if (auto session = Parent->Session.lock()) {
- session->GetLog().Write(TLOG_DEBUG, TStringBuilder() << "Decompression task done. Partition/PartitionSessionId: "
- << partition_id << " (" << minOffset << "-"
- << maxOffset << ")");
+ LOG_LAZY(session->GetLog(), TLOG_DEBUG, TStringBuilder() << "Decompression task done. Partition/PartitionSessionId: "
+ << partition_id << " (" << minOffset << "-"
+ << maxOffset << ")");
}
Y_ASSERT(dataProcessed == SourceDataSize);
std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock();
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
index 6cf20645068..083ec713c60 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
@@ -1,4 +1,6 @@
#include "write_session.h"
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h>
+
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
#include <library/cpp/string_utils/url/url.h>
@@ -93,11 +95,10 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
THandleResult result;
if (AtomicGet(Aborting)) {
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
return result;
}
- DbDriverState->Log.Write(
- TLOG_INFO,
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO,
LogPrefix() << "Got error. Status: " << status.Status
<< ". Description: " << IssuesSingleLineString(status.Issues)
);
@@ -111,14 +112,14 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
if (nextDelay) {
result.StartDelay = *nextDelay;
result.DoRestart = true;
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms"
);
ResetForRetryImpl();
} else {
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error");
result.DoStop = true;
CheckHandleResultImpl(result);
}
@@ -136,7 +137,7 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) {
if (AtomicGet(Aborting)) {
return;
}
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: Do CDS request");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: Do CDS request");
cdsRequestIsUnnecessary = (Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Off ||
(Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Auto && !IsFederation(DbDriverState->DiscoveryEndpoint)));
@@ -161,9 +162,9 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) {
if (Settings.PreferredCluster_.Defined())
params->set_preferred_cluster_name(*Settings.PreferredCluster_);
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n");
auto cdsRequestCall = [wire = Tracker->MakeTrackedWire(), req_=std::move(req), extr=std::move(extractor), connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState=DbDriverState, settings=Settings]() mutable {
- dbState->Log.Write(TLOG_INFO, TStringBuilder() << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n");
+ LOG_LAZY(dbState->Log, TLOG_INFO, TStringBuilder() << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n");
connections->RunDeferred<Ydb::PersQueue::V1::ClusterDiscoveryService,
Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest,
Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse>(
@@ -188,7 +189,7 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) {
void TWriteSessionImpl::OnCdsResponse(
TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result
) {
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Got CDS response: \n" << result.ShortDebugString());
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Got CDS response: \n" << result.ShortDebugString());
TString endpoint, name;
THandleResult handleResult;
if (!status.IsSuccess()) {
@@ -284,7 +285,7 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race
NThreading::TFuture<ui64> TWriteSessionImpl::GetInitSeqNo() {
if (Settings.ValidateSeqNo_) {
if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) {
- DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode");
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode");
ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode");
}
else
@@ -316,7 +317,7 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
}
if (seqNo.Defined()) {
if (*AutoSeqNoMode) {
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_ERR,
LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"
);
@@ -331,7 +332,7 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
OnSeqNoShift = false;
SeqNoShift = 0;
} else if (!(*AutoSeqNoMode)) {
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_ERR,
LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
);
@@ -405,7 +406,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::OnErrorImpl(NYdb::TPlainStat
// No lock
void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoint) {
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint);
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint);
NGrpc::IQueueClientContextPtr prevConnectContext;
NGrpc::IQueueClientContextPtr prevConnectTimeoutContext;
@@ -493,7 +494,7 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin
// RPC callback.
void TWriteSessionImpl::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) {
- DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout");
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session: connect timeout");
THandleResult handleResult;
with_lock (Lock) {
if (ConnectTimeoutContext == connectTimeoutContext) {
@@ -573,7 +574,7 @@ void TWriteSessionImpl::InitImpl() {
for (const auto& attr : Settings.Meta_.Fields) {
(*init->mutable_session_meta())[attr.first] = attr.second;
}
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString());
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString());
WriteToProcessorImpl(std::move(req));
}
@@ -724,7 +725,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
}
case TServerMessage::kInitResponse: {
const auto& initResponse = ServerMessage->init_response();
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString());
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString());
SessionId = initResponse.session_id();
PartitionId = initResponse.partition_id();
ui64 newLastSeqNo = initResponse.last_sequence_number();
@@ -752,7 +753,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
case TServerMessage::kBatchWriteResponse: {
TWriteSessionEvent::TAcksEvent acksEvent;
const auto& batchWriteResponse = ServerMessage->batch_write_response();
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString()
);
@@ -788,7 +789,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
}
case TServerMessage::kUpdateTokenResponse: {
UpdateTokenInProgress = false;
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully");
UpdateTokenIfNeededImpl();
break;
}
@@ -798,7 +799,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
bool result = false;
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber);
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber);
UpdateTimedCountersImpl();
const auto& sentFront = SentOriginalMessages.front();
ui64 size = 0;
@@ -850,14 +851,14 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) {
bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_;
if (wasOk != nowOk) {
if (wasOk) {
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "Estimated memory usage " << MemoryUsage
<< "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])"
);
}
else {
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]"
);
@@ -985,7 +986,7 @@ void TWriteSessionImpl::FlushWriteIfRequiredImpl() {
size_t TWriteSessionImpl::WriteBatchImpl() {
Y_VERIFY(Lock.IsLocked());
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from "
<< CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo
@@ -1065,7 +1066,7 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() const {
void TWriteSessionImpl::UpdateTokenIfNeededImpl() {
Y_VERIFY(Lock.IsLocked());
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: try to update token");
if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished)
return;
@@ -1078,7 +1079,7 @@ void TWriteSessionImpl::UpdateTokenIfNeededImpl() {
updateRequest->set_token(token);
PrevToken = token;
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: updating token");
Processor->Write(std::move(clientMessage));
}
@@ -1129,7 +1130,7 @@ void TWriteSessionImpl::SendImpl() {
PackedMessagesToSend.pop();
}
UpdateTokenIfNeededImpl();
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "Send " << writeRequest->sequence_numbers_size() << " message(s) ("
<< OriginalMessagesToSend.size() << " left), first sequence number is "
@@ -1143,7 +1144,7 @@ void TWriteSessionImpl::SendImpl() {
bool TWriteSessionImpl::Close(TDuration closeTimeout) {
if (AtomicGet(Aborting))
return false;
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_INFO,
LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms"
);
@@ -1176,9 +1177,9 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
InitSeqNoPromise.SetException("session closed");
}
if (ready) {
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete");
} else {
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_WARNING,
LogPrefix() << "Write session: could not confirm all writes in time"
<< " or session aborted, perform hard shutdown"
@@ -1236,7 +1237,7 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() {
<< Counters->counter->Val() \
/**/
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix()
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix()
<< "Counters: {"
LOG_COUNTER(Errors)
LOG_COUNTER(CurrentSessionLifetimeMs)
@@ -1258,7 +1259,7 @@ void TWriteSessionImpl::AbortImpl() {
Y_VERIFY(Lock.IsLocked());
if (!AtomicGet(Aborting)) {
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: aborting");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting");
AtomicSet(Aborting, 1);
Cancel(ConnectContext);
Cancel(ConnectTimeoutContext);
@@ -1274,7 +1275,7 @@ void TWriteSessionImpl::AbortImpl() {
void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) {
Y_VERIFY(Lock.IsLocked());
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close");
EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues)));
AbortImpl();
}
@@ -1290,13 +1291,13 @@ void TWriteSessionImpl::CloseImpl(EStatus statusCode, const TString& message) {
void TWriteSessionImpl::CloseImpl(TPlainStatus&& status) {
Y_VERIFY(Lock.IsLocked());
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close");
EventsQueue->Close(TSessionClosedEvent(std::move(status)));
AbortImpl();
}
TWriteSessionImpl::~TWriteSessionImpl() {
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: destroy");
bool needClose = false;
with_lock(Lock) {
if (!AtomicGet(Aborting)) {
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
index e5978a856cc..f2b7545b9cc 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
@@ -1,5 +1,7 @@
#include "read_session.h"
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h>
+
#define INCLUDE_YDB_INTERNAL_H
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h>
#undef INCLUDE_YDB_INTERNAL_H
@@ -49,7 +51,7 @@ void TReadSession::Start() {
return;
}
- Log.Write(TLOG_INFO, GetLogPrefix() << "Starting read session");
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Starting read session");
NPersQueue::TDeferredActions<false> deferred;
with_lock (Lock) {
@@ -66,7 +68,7 @@ void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>
Y_VERIFY(Lock.IsLocked());
// Create cluster sessions.
- Log.Write(
+ LOG_LAZY(Log,
TLOG_DEBUG,
GetLogPrefix() << "Starting single session"
);
@@ -126,7 +128,7 @@ TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxB
}
bool TReadSession::Close(TDuration timeout) {
- Log.Write(TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
// Log final counters.
DumpCountersToLog();
with_lock (Lock) {
@@ -206,7 +208,7 @@ TStringBuilder TReadSession::GetLogPrefix() const {
}
void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) {
- Log.Write(TLOG_DEBUG, GetLogPrefix()
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix()
<< "The application data is transferred to the client. Number of messages "
<< messagesCount
<< ", size "
@@ -257,7 +259,7 @@ void TReadSession::DumpCountersToLog(size_t timeNumber) {
/**/
if (logCounters) {
- Log.Write(TLOG_INFO,
+ LOG_LAZY(Log, TLOG_INFO,
GetLogPrefix() << "Counters: {"
C(Errors)
C(CurrentSessionLifetimeMs)
@@ -303,7 +305,7 @@ void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDefe
if (!Aborting) {
Aborting = true;
- Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString());
+ LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString());
if (DumpCountersContext) {
DumpCountersContext->Cancel();
DumpCountersContext.reset();
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index 21b97070eaa..992eabbc386 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -1,4 +1,7 @@
#include "write_session.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h>
+
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <library/cpp/string_utils/url/url.h>
@@ -91,10 +94,10 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
THandleResult result;
if (AtomicGet(Aborting)) {
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
return result;
}
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_INFO,
LogPrefix() << "Got error. Status: " << status.Status
<< ". Description: " << NPersQueue::IssuesSingleLineString(status.Issues)
@@ -109,14 +112,14 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
if (nextDelay) {
result.StartDelay = *nextDelay;
result.DoRestart = true;
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms"
);
ResetForRetryImpl();
} else {
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error");
result.DoStop = true;
CheckHandleResultImpl(result);
}
@@ -138,7 +141,7 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race
NThreading::TFuture<ui64> TWriteSessionImpl::GetInitSeqNo() {
if (Settings.ValidateSeqNo_) {
if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) {
- DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode");
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode");
ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode");
}
else
@@ -175,7 +178,7 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
}
if (seqNo.Defined()) {
if (*AutoSeqNoMode) {
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_ERR,
LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"
);
@@ -186,7 +189,7 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
seqNoValue = *seqNo;
}
} else if (!(*AutoSeqNoMode)) {
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_ERR,
LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
);
@@ -260,7 +263,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::OnErrorImpl(NYdb::TPlainStat
// No lock
void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoint) {
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint);
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint);
NGrpc::IQueueClientContextPtr prevConnectContext;
NGrpc::IQueueClientContextPtr prevConnectTimeoutContext;
@@ -348,7 +351,7 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin
// RPC callback.
void TWriteSessionImpl::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) {
- DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout");
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session: connect timeout");
THandleResult handleResult;
with_lock (Lock) {
if (ConnectTimeoutContext == connectTimeoutContext) {
@@ -424,7 +427,7 @@ void TWriteSessionImpl::InitImpl() {
for (const auto& attr : Settings.Meta_.Fields) {
(*init->mutable_write_session_meta())[attr.first] = attr.second;
}
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString());
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString());
WriteToProcessorImpl(std::move(req));
}
@@ -578,7 +581,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
}
case TServerMessage::kInitResponse: {
const auto& initResponse = ServerMessage->init_response();
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString());
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString());
SessionId = initResponse.session_id();
PartitionId = initResponse.partition_id();
ui64 newLastSeqNo = initResponse.last_seq_no();
@@ -606,7 +609,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
case TServerMessage::kWriteResponse: {
TWriteSessionEvent::TAcksEvent acksEvent;
const auto& batchWriteResponse = ServerMessage->write_response();
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString()
);
@@ -657,7 +660,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
}
case TServerMessage::kUpdateTokenResponse: {
UpdateTokenInProgress = false;
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully");
UpdateTokenIfNeededImpl();
break;
}
@@ -667,7 +670,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
bool result = false;
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber);
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber);
UpdateTimedCountersImpl();
const auto& sentFront = SentOriginalMessages.front();
ui64 size = 0;
@@ -719,14 +722,14 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) {
bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_;
if (wasOk != nowOk) {
if (wasOk) {
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "Estimated memory usage " << MemoryUsage
<< "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])"
);
}
else {
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]"
);
@@ -854,7 +857,7 @@ void TWriteSessionImpl::FlushWriteIfRequiredImpl() {
size_t TWriteSessionImpl::WriteBatchImpl() {
Y_VERIFY(Lock.IsLocked());
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from "
<< CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo
@@ -934,7 +937,7 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() const {
void TWriteSessionImpl::UpdateTokenIfNeededImpl() {
Y_VERIFY(Lock.IsLocked());
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: try to update token");
if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished)
return;
@@ -947,7 +950,7 @@ void TWriteSessionImpl::UpdateTokenIfNeededImpl() {
updateRequest->set_token(token);
PrevToken = token;
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: updating token");
Processor->Write(std::move(clientMessage));
}
@@ -997,7 +1000,7 @@ void TWriteSessionImpl::SendImpl() {
PackedMessagesToSend.pop();
}
UpdateTokenIfNeededImpl();
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
LogPrefix() << "Send " << writeRequest->messages_size() << " message(s) ("
<< OriginalMessagesToSend.size() << " left), first sequence number is "
@@ -1011,7 +1014,7 @@ void TWriteSessionImpl::SendImpl() {
bool TWriteSessionImpl::Close(TDuration closeTimeout) {
if (AtomicGet(Aborting))
return false;
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_INFO,
LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms"
);
@@ -1044,9 +1047,9 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
InitSeqNoPromise.SetException("session closed");
}
if (ready) {
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete");
} else {
- DbDriverState->Log.Write(
+ LOG_LAZY(DbDriverState->Log,
TLOG_WARNING,
LogPrefix() << "Write session: could not confirm all writes in time"
<< " or session aborted, perform hard shutdown"
@@ -1103,7 +1106,7 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() {
<< Counters->counter->Val() \
/**/
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix()
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix()
<< "Counters: {"
LOG_COUNTER(Errors)
LOG_COUNTER(CurrentSessionLifetimeMs)
@@ -1125,7 +1128,7 @@ void TWriteSessionImpl::AbortImpl() {
Y_VERIFY(Lock.IsLocked());
if (!AtomicGet(Aborting)) {
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: aborting");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting");
AtomicSet(Aborting, 1);
NPersQueue::Cancel(ConnectContext);
NPersQueue::Cancel(ConnectTimeoutContext);
@@ -1141,7 +1144,7 @@ void TWriteSessionImpl::AbortImpl() {
void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) {
Y_VERIFY(Lock.IsLocked());
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close");
EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues)));
AbortImpl();
}
@@ -1157,13 +1160,13 @@ void TWriteSessionImpl::CloseImpl(EStatus statusCode, const TString& message) {
void TWriteSessionImpl::CloseImpl(TPlainStatus&& status) {
Y_VERIFY(Lock.IsLocked());
- DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close");
EventsQueue->Close(TSessionClosedEvent(std::move(status)));
AbortImpl();
}
TWriteSessionImpl::~TWriteSessionImpl() {
- DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: destroy");
bool needClose = false;
with_lock(Lock) {
if (!AtomicGet(Aborting)) {