diff options
| author | qyryq <[email protected]> | 2025-02-04 14:57:58 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-02-04 14:57:58 +0300 |
| commit | abeb38c387bd87fb055f2a7ea84dcd1f2bf60fad (patch) | |
| tree | c810e087e0a9ba3fc25be3388025d5a5255b238f | |
| parent | 2229ce9f4a1575f1f7e4fb7688b980e20eb7386d (diff) | |
Always call LogPrefix under Lock (#14115)
| -rw-r--r-- | ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp | 151 | ||||
| -rw-r--r-- | ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h | 4 |
2 files changed, 80 insertions, 75 deletions
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp index 0401e17f43d..369e7d3b936 100644 --- a/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp @@ -167,7 +167,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat THandleResult result; if (Aborting.load()) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session is aborting and will not restart"); return result; } SessionEstablished = false; @@ -190,12 +190,12 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat if (nextDelay) { result.StartDelay = *nextDelay; result.DoRestart = true; - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Got error. " << status.ToDebugString()); - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will restart in " << result.StartDelay); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Got error. " << status.ToDebugString()); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session will restart in " << result.StartDelay); ResetForRetryImpl(); } else { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Got error. " << status.ToDebugString()); - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session will not restart after a fatal error"); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Got error. " << status.ToDebugString()); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Write session will not restart after a fatal error"); result.DoStop = true; CheckHandleResultImpl(result); } @@ -230,7 +230,7 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del auto partition_id = Settings.PartitionId_.has_value() ? *Settings.PartitionId_ : *DirectWriteToPartitionId; - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Get partition location async, partition " << partition_id << ", delay " << delay ); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Get partition location async, partition " << partition_id << ", delay " << delay ); NYdbGrpc::IQueueClientContextPtr prevDescribePartitionContext; NYdbGrpc::IQueueClientContextPtr describePartitionContext = Client->CreateContext(); @@ -269,7 +269,7 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del auto callback = [req = std::move(request), extr = std::move(extractor), connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState = DbDriverState, - context = describePartitionContext, prefix = std::string(LogPrefix()), + context = describePartitionContext, prefix = std::string(LogPrefixImpl()), partId = partition_id]() mutable { LOG_LAZY(dbState->Log, TLOG_DEBUG, prefix + " Getting partition location, partition " + ToString(partId)); connections->Run<Ydb::Topic::V1::TopicService, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse>( @@ -286,16 +286,24 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del void TWriteSessionImpl::OnDescribePartition(const TStatus& status, const Ydb::Topic::DescribePartitionResult& proto, const NYdbGrpc::IQueueClientContextPtr& describePartitionContext) { - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Got PartitionLocation response. Status " << status.GetStatus() << ", proto:\n" << proto.DebugString()); - std::string endpoint, name; THandleResult handleResult; + const Ydb::Topic::DescribeTopicResult_PartitionInfo& partition = proto.partition(); + with_lock(Lock) { + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Got PartitionLocation response. Status " << status.GetStatus() << ", proto:\n" << proto.DebugString()); + if (DescribePartitionContext == describePartitionContext) { DescribePartitionContext = nullptr; } else { return; } + + TRACE_LAZY(DbDriverState->Log, "DescribePartitionResponse", + TRACE_KV("partition_id", partition.partition_id()), + TRACE_KV("active", partition.active()), + TRACE_KV("pl_node_id", partition.partition_location().node_id()), + TRACE_KV("pl_generation", partition.partition_location().generation())); } if (!status.IsSuccess()) { @@ -314,14 +322,6 @@ void TWriteSessionImpl::OnDescribePartition(const TStatus& status, const Ydb::To return; } - const Ydb::Topic::DescribeTopicResult_PartitionInfo& partition = proto.partition(); - - TRACE_LAZY(DbDriverState->Log, "DescribePartitionResponse", - TRACE_KV("partition_id", partition.partition_id()), - TRACE_KV("active", partition.active()), - TRACE_KV("pl_node_id", partition.partition_location().node_id()), - TRACE_KV("pl_generation", partition.partition_location().generation())); - if (partition.partition_id() != Settings.PartitionId_ && Settings.PartitionId_.has_value() || !partition.has_partition_location() || partition.partition_location().node_id() == 0 || partition.partition_location().generation() == 0) { { @@ -368,12 +368,12 @@ std::optional<TEndpointKey> TWriteSessionImpl::GetPreferredEndpointImpl(ui32 par bool nodeIsKnown = (bool)DbDriverState->EndpointPool.GetEndpoint(preferredEndpoint, true); if (nodeIsKnown) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "GetPreferredEndpoint: partitionId " << partitionId << ", partitionNodeId " << partitionNodeId << " exists in the endpoint pool."); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "GetPreferredEndpoint: partitionId " << partitionId << ", partitionNodeId " << partitionNodeId << " exists in the endpoint pool."); return preferredEndpoint; } else { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "GetPreferredEndpoint: partitionId " << partitionId << ", nodeId " << partitionNodeId << " does not exist in the endpoint pool."); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "GetPreferredEndpoint: partitionId " << partitionId << ", nodeId " << partitionNodeId << " does not exist in the endpoint pool."); DbDriverState->EndpointPool.UpdateAsync(); return {}; } @@ -405,13 +405,15 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race } else { // Deduplication explicitly disabled, ProducerId & MessageGroupId must be empty. if (!Settings.ProducerId_.empty() || !Settings.MessageGroupId_.empty()) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() + std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "ProducerId or MessageGroupId is not empty when deduplication is switched off"); ThrowFatalError("Explicitly disabled deduplication conflicts with non-empty ProducerId or MessageGroupId"); } } if (!Settings.ProducerId_.empty() && !Settings.MessageGroupId_.empty() && Settings.ProducerId_ != Settings.MessageGroupId_) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() + std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "ProducerId and MessageGroupId mismatch"); ThrowFatalError("ProducerId != MessageGroupId scenario is currently not supported"); } @@ -419,21 +421,23 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race Settings.CompressionExecutor_->Start(); Settings.EventHandlers_.HandlersExecutor_->Start(); - } + // Client method NThreading::TFuture<uint64_t> TWriteSessionImpl::GetInitSeqNo() { if (!Settings.DeduplicationEnabled_.value_or(true)) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "GetInitSeqNo called with deduplication disabled"); + std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "GetInitSeqNo called with deduplication disabled"); ThrowFatalError("Cannot call GetInitSeqNo when deduplication is disabled"); } if (Settings.ValidateSeqNo_) { if (AutoSeqNoMode.has_value() && *AutoSeqNoMode) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); + std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode"); - } - else + } else { AutoSeqNoMode = false; + } } return InitSeqNoPromise.GetFuture(); } @@ -466,7 +470,6 @@ uint64_t TWriteSessionImpl::GetSeqNoImpl(uint64_t id) { } uint64_t TWriteSessionImpl::GetNextIdImpl(const std::optional<uint64_t>& seqNo) { - Y_ABORT_UNLESS(Lock.IsLocked()); uint64_t id = ++NextId; @@ -475,13 +478,13 @@ uint64_t TWriteSessionImpl::GetNextIdImpl(const std::optional<uint64_t>& seqNo) } if (seqNo.has_value()) { if (!Settings.DeduplicationEnabled_.value_or(true)) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "SeqNo is provided on write when deduplication is disabled"); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "SeqNo is provided on write when deduplication is disabled"); ThrowFatalError("Cannot provide SeqNo on Write() when deduplication is disabled"); } if (*AutoSeqNoMode) { LOG_LAZY(DbDriverState->Log, TLOG_ERR, - LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" + LogPrefixImpl() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" ); ThrowFatalError( "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" @@ -492,7 +495,7 @@ uint64_t TWriteSessionImpl::GetNextIdImpl(const std::optional<uint64_t>& seqNo) } else if (!(*AutoSeqNoMode)) { LOG_LAZY(DbDriverState->Log, TLOG_ERR, - LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" + LogPrefixImpl() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" ); ThrowFatalError( "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" @@ -570,7 +573,7 @@ void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo) auto p = WrittenInTx.find(seqNo); if (p == WrittenInTx.end()) { LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=?"); + LogPrefixImpl() << "OnAck: seqNo=" << seqNo << ", txId=?"); return; } @@ -581,7 +584,7 @@ void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo) ++txInfo->AckCount; LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); + LogPrefixImpl() << "OnAck: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); if (txInfo->CommitCalled && (txInfo->WriteCount == txInfo->AckCount)) { txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess()); @@ -628,7 +631,7 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess ++txInfo->WriteCount; LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "OnWrite: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); + LogPrefixImpl() << "OnWrite: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); } WrittenInTx[seqNo] = txId; } @@ -689,7 +692,7 @@ void TWriteSessionImpl::Connect(const TDuration& delay) { return; } - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Start write session. Will connect to nodeId: " << PreferredPartitionLocation.Endpoint.NodeId); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Start write session. Will connect to nodeId: " << PreferredPartitionLocation.Endpoint.NodeId); ++ConnectionGeneration; @@ -764,10 +767,10 @@ void TWriteSessionImpl::Connect(const TDuration& delay) { // RPC callback. void TWriteSessionImpl::OnConnectTimeout(const NYdbGrpc::IQueueClientContextPtr& connectTimeoutContext) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session: connect timeout"); THandleResult handleResult; { std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Write session: connect timeout"); if (ConnectTimeoutContext == connectTimeoutContext) { Cancel(ConnectContext); ConnectContext = nullptr; @@ -843,20 +846,20 @@ void TWriteSessionImpl::InitImpl() { auto* p = init->mutable_partition_with_generation(); p->set_partition_id(partition_id); p->set_generation(PreferredPartitionLocation.Generation); - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: direct write to partition: " << partition_id << ", generation " << PreferredPartitionLocation.Generation); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: direct write to partition: " << partition_id << ", generation " << PreferredPartitionLocation.Generation); } else if (Settings.PartitionId_.has_value()) { init->set_partition_id(*Settings.PartitionId_); - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: write to partition: " << *Settings.PartitionId_); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: write to partition: " << *Settings.PartitionId_); } else { init->set_message_group_id(TStringType{Settings.MessageGroupId_}); - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: write to message_group: " << Settings.MessageGroupId_); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: write to message_group: " << Settings.MessageGroupId_); } for (const auto& attr : Settings.Meta_.Fields) { (*init->mutable_write_session_meta())[attr.first] = attr.second; } - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); - + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: send init request: "<< req.ShortDebugString()); + TRACE_LAZY(DbDriverState->Log, "InitRequest", TRACE_KV_IF(init->partitioning_case() == Ydb::Topic::StreamWriteMessage_InitRequest::kPartitionId, "partition_id", init->partition_id()), TRACE_IF(init->partitioning_case() == Ydb::Topic::StreamWriteMessage_InitRequest::kPartitionWithGeneration, @@ -911,11 +914,10 @@ void TWriteSessionImpl::ReadFromProcessor() { } void TWriteSessionImpl::OnWriteDone(NYdbGrpc::TGrpcStatus&& status, size_t connectionGeneration) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: OnWriteDone " << status.ToDebugString()); - THandleResult handleResult; { std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: OnWriteDone " << status.ToDebugString()); if (connectionGeneration != ConnectionGeneration) { return; // Message from previous connection. Ignore. } @@ -930,8 +932,6 @@ void TWriteSessionImpl::OnWriteDone(NYdbGrpc::TGrpcStatus&& status, size_t conne } void TWriteSessionImpl::OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: OnReadDone " << grpcStatus.ToDebugString()); - TPlainStatus errorStatus; TProcessSrvMessageResult processResult; bool needSetValue = false; @@ -941,6 +941,7 @@ void TWriteSessionImpl::OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t co bool doRead = false; { std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: OnReadDone " << grpcStatus.ToDebugString()); UpdateTimedCountersImpl(); if (connectionGeneration != ConnectionGeneration) { return; // Message from previous connection. Ignore. @@ -983,7 +984,9 @@ void TWriteSessionImpl::OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t co ProcessHandleResult(processResult.HandleResult); } -TStringBuilder TWriteSessionImpl::LogPrefix() const { +TStringBuilder TWriteSessionImpl::LogPrefixImpl() const { + Y_ABORT_UNLESS(Lock.IsLocked()); + TStringBuilder ret; ret << " SessionId [" << SessionId << "] "; @@ -1042,7 +1045,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess } case TServerMessage::kInitResponse: { const auto& initResponse = ServerMessage->init_response(); - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString()); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session established. Init response: " << initResponse.ShortDebugString()); TRACE_LAZY(DbDriverState->Log, "InitResponse", TRACE_KV("partition_id", initResponse.partition_id()), TRACE_KV("session_id", initResponse.session_id())); @@ -1050,7 +1053,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess auto prevDirectWriteToPartitionId = DirectWriteToPartitionId; if (Settings.DirectWriteToPartition_ && !Settings.PartitionId_.has_value()) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: set DirectWriteToPartitionId " << initResponse.partition_id()); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: set DirectWriteToPartitionId " << initResponse.partition_id()); DirectWriteToPartitionId = initResponse.partition_id(); } PartitionId = initResponse.partition_id(); @@ -1089,7 +1092,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess const auto& batchWriteResponse = ServerMessage->write_response(); LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString() + LogPrefixImpl() << "Write session got write response: " << batchWriteResponse.ShortDebugString() ); TWriteStat::TPtr writeStat = new TWriteStat{}; const auto& stat = batchWriteResponse.write_statistics(); @@ -1136,7 +1139,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess writeStat, }); - if (CleanupOnAcknowledged(msgId)) { + if (CleanupOnAcknowledgedImpl(msgId)) { result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); } } @@ -1146,7 +1149,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess } case TServerMessage::kUpdateTokenResponse: { UpdateTokenInProgress = false; - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: token updated successfully"); UpdateTokenIfNeededImpl(); break; } @@ -1154,9 +1157,11 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess return result; } -bool TWriteSessionImpl::CleanupOnAcknowledged(uint64_t id) { +bool TWriteSessionImpl::CleanupOnAcknowledgedImpl(uint64_t id) { + Y_ABORT_UNLESS(Lock.IsLocked()); + bool result = false; - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << id); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: acknoledged message " << id); UpdateTimedCountersImpl(); const auto& sentFront = SentOriginalMessages.front(); uint64_t size = 0; @@ -1213,14 +1218,14 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) { if (wasOk) { LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Estimated memory usage " << MemoryUsage + LogPrefixImpl() << "Estimated memory usage " << MemoryUsage << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])" ); } else { LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" + LogPrefixImpl() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" ); } } @@ -1360,7 +1365,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() { LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Write " << CurrentBatch.Messages.size() << " messages with Id from " + LogPrefixImpl() << "Write " << CurrentBatch.Messages.size() << " messages with Id from " << CurrentBatch.Messages.begin()->Id << " to " << CurrentBatch.Messages.back().Id ); @@ -1451,7 +1456,7 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() const { void TWriteSessionImpl::UpdateTokenIfNeededImpl() { Y_ABORT_UNLESS(Lock.IsLocked()); - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: try to update token"); if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished) { return; @@ -1462,7 +1467,7 @@ void TWriteSessionImpl::UpdateTokenIfNeededImpl() { return; } - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: updating token"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: updating token"); UpdateTokenInProgress = true; PrevToken = token; @@ -1552,7 +1557,7 @@ void TWriteSessionImpl::SendImpl() { UpdateTokenIfNeededImpl(); LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Send " << writeRequest->messages_size() << " message(s) (" + LogPrefixImpl() << "Send " << writeRequest->messages_size() << " message(s) (" << OriginalMessagesToSend.size() << " left), first sequence number is " << writeRequest->messages(0).seq_no() ); @@ -1565,7 +1570,10 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) { if (Aborting.load()) { return false; } - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: close. Timeout " << closeTimeout); + { + std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session: close. Timeout " << closeTimeout); + } auto startTime = TInstant::Now(); auto remaining = closeTimeout; bool ready = false; @@ -1588,20 +1596,17 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) { { std::lock_guard guard(Lock); ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !Aborting.load(); - } - { - std::lock_guard guard(Lock); CloseImpl(EStatus::SUCCESS, NYdb::NIssue::TIssues{}); needSetSeqNoValue = !InitSeqNoSetDone && (InitSeqNoSetDone = true); + if (ready) { + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session: gracefully shut down, all writes complete"); + } else { + LOG_LAZY(DbDriverState->Log, TLOG_WARNING, LogPrefixImpl() << "Write session: could not confirm all writes in time or session aborted, perform hard shutdown"); + } } if (needSetSeqNoValue) { InitSeqNoPromise.SetException("session closed"); } - if (ready) { - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); - } else { - LOG_LAZY(DbDriverState->Log, TLOG_WARNING, LogPrefix() << "Write session: could not confirm all writes in time or session aborted, perform hard shutdown"); - } return ready; } @@ -1654,7 +1659,7 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() { << Counters->counter->Val() \ /**/ - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Counters: {" LOG_COUNTER(Errors) LOG_COUNTER(CurrentSessionLifetimeMs) @@ -1676,7 +1681,7 @@ void TWriteSessionImpl::AbortImpl() { Y_ABORT_UNLESS(Lock.IsLocked()); if (!Aborting.load()) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: aborting"); Aborting.store(1); Cancel(DescribePartitionContext); Cancel(ConnectContext); @@ -1686,7 +1691,7 @@ void TWriteSessionImpl::AbortImpl() { Processor->Cancel(); Cancel(ClientContext); ClientContext.reset(); // removes context from contexts set from underlying gRPC-client. - + CancelTransactions(); } } @@ -1708,7 +1713,7 @@ void TWriteSessionImpl::CancelTransactions() void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYdb::NIssue::TIssues&& issues) { Y_ABORT_UNLESS(Lock.IsLocked()); - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session will now close"); EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues))); AbortImpl(); } @@ -1724,16 +1729,16 @@ void TWriteSessionImpl::CloseImpl(EStatus statusCode, const std::string& message void TWriteSessionImpl::CloseImpl(TPlainStatus&& status) { Y_ABORT_UNLESS(Lock.IsLocked()); - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session will now close"); EventsQueue->Close(TSessionClosedEvent(std::move(status))); AbortImpl(); } TWriteSessionImpl::~TWriteSessionImpl() { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: destroy"); bool needClose = false; { std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: destroy"); if (!Aborting.load()) { CloseImpl(EStatus::SUCCESS, NYdb::NIssue::TIssues{}); diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h index 448134b60f4..5a3a536d71e 100644 --- a/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h @@ -368,7 +368,7 @@ public: private: - TStringBuilder LogPrefix() const; + TStringBuilder LogPrefixImpl() const; void UpdateTokenIfNeededImpl(); @@ -399,7 +399,7 @@ private: //std::string GetDebugIdentity() const; TClientMessage GetInitClientMessage(); - bool CleanupOnAcknowledged(uint64_t id); + bool CleanupOnAcknowledgedImpl(uint64_t id); bool IsReadyToSendNextImpl() const; uint64_t GetNextIdImpl(const std::optional<uint64_t>& seqNo); uint64_t GetSeqNoImpl(uint64_t id); |
