summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqyryq <[email protected]>2025-02-04 14:57:58 +0300
committerGitHub <[email protected]>2025-02-04 14:57:58 +0300
commitabeb38c387bd87fb055f2a7ea84dcd1f2bf60fad (patch)
treec810e087e0a9ba3fc25be3388025d5a5255b238f
parent2229ce9f4a1575f1f7e4fb7688b980e20eb7386d (diff)
Always call LogPrefix under Lock (#14115)
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp151
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h4
2 files changed, 80 insertions, 75 deletions
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp
index 0401e17f43d..369e7d3b936 100644
--- a/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp
@@ -167,7 +167,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
THandleResult result;
if (Aborting.load()) {
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session is aborting and will not restart");
return result;
}
SessionEstablished = false;
@@ -190,12 +190,12 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
if (nextDelay) {
result.StartDelay = *nextDelay;
result.DoRestart = true;
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Got error. " << status.ToDebugString());
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will restart in " << result.StartDelay);
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Got error. " << status.ToDebugString());
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session will restart in " << result.StartDelay);
ResetForRetryImpl();
} else {
- LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Got error. " << status.ToDebugString());
- LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session will not restart after a fatal error");
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Got error. " << status.ToDebugString());
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Write session will not restart after a fatal error");
result.DoStop = true;
CheckHandleResultImpl(result);
}
@@ -230,7 +230,7 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del
auto partition_id = Settings.PartitionId_.has_value() ? *Settings.PartitionId_ : *DirectWriteToPartitionId;
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Get partition location async, partition " << partition_id << ", delay " << delay );
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Get partition location async, partition " << partition_id << ", delay " << delay );
NYdbGrpc::IQueueClientContextPtr prevDescribePartitionContext;
NYdbGrpc::IQueueClientContextPtr describePartitionContext = Client->CreateContext();
@@ -269,7 +269,7 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del
auto callback = [req = std::move(request), extr = std::move(extractor),
connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState = DbDriverState,
- context = describePartitionContext, prefix = std::string(LogPrefix()),
+ context = describePartitionContext, prefix = std::string(LogPrefixImpl()),
partId = partition_id]() mutable {
LOG_LAZY(dbState->Log, TLOG_DEBUG, prefix + " Getting partition location, partition " + ToString(partId));
connections->Run<Ydb::Topic::V1::TopicService, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse>(
@@ -286,16 +286,24 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del
void TWriteSessionImpl::OnDescribePartition(const TStatus& status, const Ydb::Topic::DescribePartitionResult& proto, const NYdbGrpc::IQueueClientContextPtr& describePartitionContext)
{
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Got PartitionLocation response. Status " << status.GetStatus() << ", proto:\n" << proto.DebugString());
- std::string endpoint, name;
THandleResult handleResult;
+ const Ydb::Topic::DescribeTopicResult_PartitionInfo& partition = proto.partition();
+
with_lock(Lock) {
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Got PartitionLocation response. Status " << status.GetStatus() << ", proto:\n" << proto.DebugString());
+
if (DescribePartitionContext == describePartitionContext) {
DescribePartitionContext = nullptr;
} else {
return;
}
+
+ TRACE_LAZY(DbDriverState->Log, "DescribePartitionResponse",
+ TRACE_KV("partition_id", partition.partition_id()),
+ TRACE_KV("active", partition.active()),
+ TRACE_KV("pl_node_id", partition.partition_location().node_id()),
+ TRACE_KV("pl_generation", partition.partition_location().generation()));
}
if (!status.IsSuccess()) {
@@ -314,14 +322,6 @@ void TWriteSessionImpl::OnDescribePartition(const TStatus& status, const Ydb::To
return;
}
- const Ydb::Topic::DescribeTopicResult_PartitionInfo& partition = proto.partition();
-
- TRACE_LAZY(DbDriverState->Log, "DescribePartitionResponse",
- TRACE_KV("partition_id", partition.partition_id()),
- TRACE_KV("active", partition.active()),
- TRACE_KV("pl_node_id", partition.partition_location().node_id()),
- TRACE_KV("pl_generation", partition.partition_location().generation()));
-
if (partition.partition_id() != Settings.PartitionId_ && Settings.PartitionId_.has_value() ||
!partition.has_partition_location() || partition.partition_location().node_id() == 0 || partition.partition_location().generation() == 0) {
{
@@ -368,12 +368,12 @@ std::optional<TEndpointKey> TWriteSessionImpl::GetPreferredEndpointImpl(ui32 par
bool nodeIsKnown = (bool)DbDriverState->EndpointPool.GetEndpoint(preferredEndpoint, true);
if (nodeIsKnown)
{
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "GetPreferredEndpoint: partitionId " << partitionId << ", partitionNodeId " << partitionNodeId << " exists in the endpoint pool.");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "GetPreferredEndpoint: partitionId " << partitionId << ", partitionNodeId " << partitionNodeId << " exists in the endpoint pool.");
return preferredEndpoint;
}
else
{
- LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "GetPreferredEndpoint: partitionId " << partitionId << ", nodeId " << partitionNodeId << " does not exist in the endpoint pool.");
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "GetPreferredEndpoint: partitionId " << partitionId << ", nodeId " << partitionNodeId << " does not exist in the endpoint pool.");
DbDriverState->EndpointPool.UpdateAsync();
return {};
}
@@ -405,13 +405,15 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race
} else {
// Deduplication explicitly disabled, ProducerId & MessageGroupId must be empty.
if (!Settings.ProducerId_.empty() || !Settings.MessageGroupId_.empty()) {
- LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix()
+ std::lock_guard guard(Lock);
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl()
<< "ProducerId or MessageGroupId is not empty when deduplication is switched off");
ThrowFatalError("Explicitly disabled deduplication conflicts with non-empty ProducerId or MessageGroupId");
}
}
if (!Settings.ProducerId_.empty() && !Settings.MessageGroupId_.empty() && Settings.ProducerId_ != Settings.MessageGroupId_) {
- LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix()
+ std::lock_guard guard(Lock);
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl()
<< "ProducerId and MessageGroupId mismatch");
ThrowFatalError("ProducerId != MessageGroupId scenario is currently not supported");
}
@@ -419,21 +421,23 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race
Settings.CompressionExecutor_->Start();
Settings.EventHandlers_.HandlersExecutor_->Start();
-
}
+
// Client method
NThreading::TFuture<uint64_t> TWriteSessionImpl::GetInitSeqNo() {
if (!Settings.DeduplicationEnabled_.value_or(true)) {
- LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "GetInitSeqNo called with deduplication disabled");
+ std::lock_guard guard(Lock);
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "GetInitSeqNo called with deduplication disabled");
ThrowFatalError("Cannot call GetInitSeqNo when deduplication is disabled");
}
if (Settings.ValidateSeqNo_) {
if (AutoSeqNoMode.has_value() && *AutoSeqNoMode) {
- LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode");
+ std::lock_guard guard(Lock);
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Cannot call GetInitSeqNo in Auto SeqNo mode");
ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode");
- }
- else
+ } else {
AutoSeqNoMode = false;
+ }
}
return InitSeqNoPromise.GetFuture();
}
@@ -466,7 +470,6 @@ uint64_t TWriteSessionImpl::GetSeqNoImpl(uint64_t id) {
}
uint64_t TWriteSessionImpl::GetNextIdImpl(const std::optional<uint64_t>& seqNo) {
-
Y_ABORT_UNLESS(Lock.IsLocked());
uint64_t id = ++NextId;
@@ -475,13 +478,13 @@ uint64_t TWriteSessionImpl::GetNextIdImpl(const std::optional<uint64_t>& seqNo)
}
if (seqNo.has_value()) {
if (!Settings.DeduplicationEnabled_.value_or(true)) {
- LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "SeqNo is provided on write when deduplication is disabled");
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "SeqNo is provided on write when deduplication is disabled");
ThrowFatalError("Cannot provide SeqNo on Write() when deduplication is disabled");
}
if (*AutoSeqNoMode) {
LOG_LAZY(DbDriverState->Log,
TLOG_ERR,
- LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"
+ LogPrefixImpl() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"
);
ThrowFatalError(
"Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"
@@ -492,7 +495,7 @@ uint64_t TWriteSessionImpl::GetNextIdImpl(const std::optional<uint64_t>& seqNo)
} else if (!(*AutoSeqNoMode)) {
LOG_LAZY(DbDriverState->Log,
TLOG_ERR,
- LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
+ LogPrefixImpl() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
);
ThrowFatalError(
"Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
@@ -570,7 +573,7 @@ void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo)
auto p = WrittenInTx.find(seqNo);
if (p == WrittenInTx.end()) {
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG,
- LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=?");
+ LogPrefixImpl() << "OnAck: seqNo=" << seqNo << ", txId=?");
return;
}
@@ -581,7 +584,7 @@ void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo)
++txInfo->AckCount;
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG,
- LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount);
+ LogPrefixImpl() << "OnAck: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount);
if (txInfo->CommitCalled && (txInfo->WriteCount == txInfo->AckCount)) {
txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess());
@@ -628,7 +631,7 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess
++txInfo->WriteCount;
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG,
- LogPrefix() << "OnWrite: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount);
+ LogPrefixImpl() << "OnWrite: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount);
}
WrittenInTx[seqNo] = txId;
}
@@ -689,7 +692,7 @@ void TWriteSessionImpl::Connect(const TDuration& delay) {
return;
}
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Start write session. Will connect to nodeId: " << PreferredPartitionLocation.Endpoint.NodeId);
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Start write session. Will connect to nodeId: " << PreferredPartitionLocation.Endpoint.NodeId);
++ConnectionGeneration;
@@ -764,10 +767,10 @@ void TWriteSessionImpl::Connect(const TDuration& delay) {
// RPC callback.
void TWriteSessionImpl::OnConnectTimeout(const NYdbGrpc::IQueueClientContextPtr& connectTimeoutContext) {
- LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session: connect timeout");
THandleResult handleResult;
{
std::lock_guard guard(Lock);
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Write session: connect timeout");
if (ConnectTimeoutContext == connectTimeoutContext) {
Cancel(ConnectContext);
ConnectContext = nullptr;
@@ -843,20 +846,20 @@ void TWriteSessionImpl::InitImpl() {
auto* p = init->mutable_partition_with_generation();
p->set_partition_id(partition_id);
p->set_generation(PreferredPartitionLocation.Generation);
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: direct write to partition: " << partition_id << ", generation " << PreferredPartitionLocation.Generation);
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: direct write to partition: " << partition_id << ", generation " << PreferredPartitionLocation.Generation);
} else if (Settings.PartitionId_.has_value()) {
init->set_partition_id(*Settings.PartitionId_);
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: write to partition: " << *Settings.PartitionId_);
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: write to partition: " << *Settings.PartitionId_);
} else {
init->set_message_group_id(TStringType{Settings.MessageGroupId_});
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: write to message_group: " << Settings.MessageGroupId_);
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: write to message_group: " << Settings.MessageGroupId_);
}
for (const auto& attr : Settings.Meta_.Fields) {
(*init->mutable_write_session_meta())[attr.first] = attr.second;
}
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString());
-
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: send init request: "<< req.ShortDebugString());
+
TRACE_LAZY(DbDriverState->Log, "InitRequest",
TRACE_KV_IF(init->partitioning_case() == Ydb::Topic::StreamWriteMessage_InitRequest::kPartitionId, "partition_id", init->partition_id()),
TRACE_IF(init->partitioning_case() == Ydb::Topic::StreamWriteMessage_InitRequest::kPartitionWithGeneration,
@@ -911,11 +914,10 @@ void TWriteSessionImpl::ReadFromProcessor() {
}
void TWriteSessionImpl::OnWriteDone(NYdbGrpc::TGrpcStatus&& status, size_t connectionGeneration) {
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: OnWriteDone " << status.ToDebugString());
-
THandleResult handleResult;
{
std::lock_guard guard(Lock);
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: OnWriteDone " << status.ToDebugString());
if (connectionGeneration != ConnectionGeneration) {
return; // Message from previous connection. Ignore.
}
@@ -930,8 +932,6 @@ void TWriteSessionImpl::OnWriteDone(NYdbGrpc::TGrpcStatus&& status, size_t conne
}
void TWriteSessionImpl::OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) {
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: OnReadDone " << grpcStatus.ToDebugString());
-
TPlainStatus errorStatus;
TProcessSrvMessageResult processResult;
bool needSetValue = false;
@@ -941,6 +941,7 @@ void TWriteSessionImpl::OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t co
bool doRead = false;
{
std::lock_guard guard(Lock);
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: OnReadDone " << grpcStatus.ToDebugString());
UpdateTimedCountersImpl();
if (connectionGeneration != ConnectionGeneration) {
return; // Message from previous connection. Ignore.
@@ -983,7 +984,9 @@ void TWriteSessionImpl::OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t co
ProcessHandleResult(processResult.HandleResult);
}
-TStringBuilder TWriteSessionImpl::LogPrefix() const {
+TStringBuilder TWriteSessionImpl::LogPrefixImpl() const {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
TStringBuilder ret;
ret << " SessionId [" << SessionId << "] ";
@@ -1042,7 +1045,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
}
case TServerMessage::kInitResponse: {
const auto& initResponse = ServerMessage->init_response();
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString());
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session established. Init response: " << initResponse.ShortDebugString());
TRACE_LAZY(DbDriverState->Log, "InitResponse",
TRACE_KV("partition_id", initResponse.partition_id()),
TRACE_KV("session_id", initResponse.session_id()));
@@ -1050,7 +1053,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
auto prevDirectWriteToPartitionId = DirectWriteToPartitionId;
if (Settings.DirectWriteToPartition_ && !Settings.PartitionId_.has_value()) {
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: set DirectWriteToPartitionId " << initResponse.partition_id());
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: set DirectWriteToPartitionId " << initResponse.partition_id());
DirectWriteToPartitionId = initResponse.partition_id();
}
PartitionId = initResponse.partition_id();
@@ -1089,7 +1092,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
const auto& batchWriteResponse = ServerMessage->write_response();
LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
- LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString()
+ LogPrefixImpl() << "Write session got write response: " << batchWriteResponse.ShortDebugString()
);
TWriteStat::TPtr writeStat = new TWriteStat{};
const auto& stat = batchWriteResponse.write_statistics();
@@ -1136,7 +1139,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
writeStat,
});
- if (CleanupOnAcknowledged(msgId)) {
+ if (CleanupOnAcknowledgedImpl(msgId)) {
result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
}
}
@@ -1146,7 +1149,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
}
case TServerMessage::kUpdateTokenResponse: {
UpdateTokenInProgress = false;
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: token updated successfully");
UpdateTokenIfNeededImpl();
break;
}
@@ -1154,9 +1157,11 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
return result;
}
-bool TWriteSessionImpl::CleanupOnAcknowledged(uint64_t id) {
+bool TWriteSessionImpl::CleanupOnAcknowledgedImpl(uint64_t id) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
bool result = false;
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << id);
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: acknoledged message " << id);
UpdateTimedCountersImpl();
const auto& sentFront = SentOriginalMessages.front();
uint64_t size = 0;
@@ -1213,14 +1218,14 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) {
if (wasOk) {
LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
- LogPrefix() << "Estimated memory usage " << MemoryUsage
+ LogPrefixImpl() << "Estimated memory usage " << MemoryUsage
<< "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])"
);
}
else {
LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
- LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]"
+ LogPrefixImpl() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]"
);
}
}
@@ -1360,7 +1365,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
- LogPrefix() << "Write " << CurrentBatch.Messages.size() << " messages with Id from "
+ LogPrefixImpl() << "Write " << CurrentBatch.Messages.size() << " messages with Id from "
<< CurrentBatch.Messages.begin()->Id << " to " << CurrentBatch.Messages.back().Id
);
@@ -1451,7 +1456,7 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() const {
void TWriteSessionImpl::UpdateTokenIfNeededImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: try to update token");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: try to update token");
if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished) {
return;
@@ -1462,7 +1467,7 @@ void TWriteSessionImpl::UpdateTokenIfNeededImpl() {
return;
}
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: updating token");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: updating token");
UpdateTokenInProgress = true;
PrevToken = token;
@@ -1552,7 +1557,7 @@ void TWriteSessionImpl::SendImpl() {
UpdateTokenIfNeededImpl();
LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
- LogPrefix() << "Send " << writeRequest->messages_size() << " message(s) ("
+ LogPrefixImpl() << "Send " << writeRequest->messages_size() << " message(s) ("
<< OriginalMessagesToSend.size() << " left), first sequence number is "
<< writeRequest->messages(0).seq_no()
);
@@ -1565,7 +1570,10 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
if (Aborting.load()) {
return false;
}
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: close. Timeout " << closeTimeout);
+ {
+ std::lock_guard guard(Lock);
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session: close. Timeout " << closeTimeout);
+ }
auto startTime = TInstant::Now();
auto remaining = closeTimeout;
bool ready = false;
@@ -1588,20 +1596,17 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
{
std::lock_guard guard(Lock);
ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !Aborting.load();
- }
- {
- std::lock_guard guard(Lock);
CloseImpl(EStatus::SUCCESS, NYdb::NIssue::TIssues{});
needSetSeqNoValue = !InitSeqNoSetDone && (InitSeqNoSetDone = true);
+ if (ready) {
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session: gracefully shut down, all writes complete");
+ } else {
+ LOG_LAZY(DbDriverState->Log, TLOG_WARNING, LogPrefixImpl() << "Write session: could not confirm all writes in time or session aborted, perform hard shutdown");
+ }
}
if (needSetSeqNoValue) {
InitSeqNoPromise.SetException("session closed");
}
- if (ready) {
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete");
- } else {
- LOG_LAZY(DbDriverState->Log, TLOG_WARNING, LogPrefix() << "Write session: could not confirm all writes in time or session aborted, perform hard shutdown");
- }
return ready;
}
@@ -1654,7 +1659,7 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() {
<< Counters->counter->Val() \
/**/
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix()
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl()
<< "Counters: {"
LOG_COUNTER(Errors)
LOG_COUNTER(CurrentSessionLifetimeMs)
@@ -1676,7 +1681,7 @@ void TWriteSessionImpl::AbortImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());
if (!Aborting.load()) {
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting");
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: aborting");
Aborting.store(1);
Cancel(DescribePartitionContext);
Cancel(ConnectContext);
@@ -1686,7 +1691,7 @@ void TWriteSessionImpl::AbortImpl() {
Processor->Cancel();
Cancel(ClientContext);
ClientContext.reset(); // removes context from contexts set from underlying gRPC-client.
-
+
CancelTransactions();
}
}
@@ -1708,7 +1713,7 @@ void TWriteSessionImpl::CancelTransactions()
void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYdb::NIssue::TIssues&& issues) {
Y_ABORT_UNLESS(Lock.IsLocked());
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session will now close");
EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues)));
AbortImpl();
}
@@ -1724,16 +1729,16 @@ void TWriteSessionImpl::CloseImpl(EStatus statusCode, const std::string& message
void TWriteSessionImpl::CloseImpl(TPlainStatus&& status) {
Y_ABORT_UNLESS(Lock.IsLocked());
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close");
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session will now close");
EventsQueue->Close(TSessionClosedEvent(std::move(status)));
AbortImpl();
}
TWriteSessionImpl::~TWriteSessionImpl() {
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: destroy");
bool needClose = false;
{
std::lock_guard guard(Lock);
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: destroy");
if (!Aborting.load()) {
CloseImpl(EStatus::SUCCESS, NYdb::NIssue::TIssues{});
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h
index 448134b60f4..5a3a536d71e 100644
--- a/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h
@@ -368,7 +368,7 @@ public:
private:
- TStringBuilder LogPrefix() const;
+ TStringBuilder LogPrefixImpl() const;
void UpdateTokenIfNeededImpl();
@@ -399,7 +399,7 @@ private:
//std::string GetDebugIdentity() const;
TClientMessage GetInitClientMessage();
- bool CleanupOnAcknowledged(uint64_t id);
+ bool CleanupOnAcknowledgedImpl(uint64_t id);
bool IsReadyToSendNextImpl() const;
uint64_t GetNextIdImpl(const std::optional<uint64_t>& seqNo);
uint64_t GetSeqNoImpl(uint64_t id);