diff options
| author | kseleznyov <[email protected]> | 2026-06-25 12:05:17 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-06-25 12:05:17 +0300 |
| commit | bf608a2629f4e733fa349dcd1f733f9c5c19c041 (patch) | |
| tree | f38be1c83fe348b2e5e20a75d899397b0277621d | |
| parent | 6dc8624ca482fc44de6b7132006ead8ece9c017b (diff) | |
[YDB_LOG] Migrate ydb/core/grpc_streaming (#43273)
| -rw-r--r-- | ydb/core/grpc_streaming/grpc_streaming.h | 124 | ||||
| -rw-r--r-- | ydb/core/grpc_streaming/grpc_streaming_ut.cpp | 26 |
2 files changed, 87 insertions, 63 deletions
diff --git a/ydb/core/grpc_streaming/grpc_streaming.h b/ydb/core/grpc_streaming/grpc_streaming.h index 8e32f05d060..dfa1106288b 100644 --- a/ydb/core/grpc_streaming/grpc_streaming.h +++ b/ydb/core/grpc_streaming/grpc_streaming.h @@ -224,10 +224,11 @@ private: return; } - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream accepted Name# %s ok# %s peer# %s", - this, Name, - status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeer().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Stream accepted", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"statusOk", status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false"}, + {"peer", this->GetPeer()}); if (status == NYdbGrpc::EQueueEventStatus::ERROR) { // Don't bother registering if accept failed @@ -240,8 +241,9 @@ private: if (!Server->RegisterRequestCtx(this)) { // It's unsafe to send replies, so just cancel the request - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] dropping request Name# %s due to shutdown", - this, Name); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Dropping request due to shutdown", + {"this", static_cast<void*>(this)}, + {"name", Name}); this->Context.TryCancel(); return; } @@ -265,10 +267,11 @@ private: } void OnDone(NYdbGrpc::EQueueEventStatus status) { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream done notification Name# %s ok# %s peer# %s", - this, Name, - status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeer().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Stream done notification", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"statusOk", status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false"}, + {"peer", this->GetPeer()}); bool success = status == NYdbGrpc::EQueueEventStatus::OK; @@ -286,9 +289,10 @@ private: } void Cancel() { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade cancel Name# %s peer# %s", - this, Name, - this->GetPeer().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade cancel", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"peer", this->GetPeer()}); this->Context.TryCancel(); } @@ -298,10 +302,11 @@ private: } void Attach(TActorId actor) { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade attach Name# %s actor# %s peer# %s", - this, Name, - actor.ToString().c_str(), - this->GetPeer().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade attach", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"actor", actor}, + {"peer", this->GetPeer()}); auto guard = SingleThreaded.Enforce(); @@ -323,9 +328,10 @@ private: } bool Read() { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade read Name# %s peer# %s", - this, Name, - this->GetPeer().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade read", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"peer", this->GetPeer()}); auto guard = SingleThreaded.Enforce(); @@ -349,11 +355,12 @@ private: } void OnReadDone(NYdbGrpc::EQueueEventStatus status) { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] read finished Name# %s ok# %s data# %s peer# %s", - this, Name, - status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - NYdbGrpc::FormatMessage<TIn>(ReadInProgress->Record, status == NYdbGrpc::EQueueEventStatus::OK).c_str(), - this->GetPeer().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Read finished", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"statusOk", status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false"}, + {"readInProgress", NYdbGrpc::FormatMessage<TIn>(ReadInProgress->Record, status == NYdbGrpc::EQueueEventStatus::OK)}, + {"peer", this->GetPeer()}); // Take current in-progress read first auto read = std::move(ReadInProgress); @@ -375,8 +382,10 @@ private: while ((flags & FlagRegistered) && (flags & FlagFinishDone) && ReadQueue.load() == 0) { Y_DEBUG_ABORT_UNLESS(flags & FlagFinishCalled); if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (read done)", - this, Name, this->GetPeer().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Deregistering request (read done)", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"peer", this->GetPeer()}); Server->DeregisterRequestCtx(this); break; } @@ -391,17 +400,19 @@ private: bool Write(TOut&& message, const grpc::WriteOptions& options = { }, const grpc::Status* status = nullptr) { if (status) { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s grpc status# (%d) message# %s", - this, Name, - NYdbGrpc::FormatMessage<TOut>(message).c_str(), - this->GetPeer().c_str(), - static_cast<int>(status->error_code()), - status->error_message().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade write grpc", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"message", NYdbGrpc::FormatMessage<TOut>(message)}, + {"peer", this->GetPeer()}, + {"errorCode", static_cast<int>(status->error_code())}, + {"errorMessage", status->error_message()}); } else { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s", - this, Name, - NYdbGrpc::FormatMessage<TOut>(message).c_str(), - this->GetPeer().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade write", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"message", NYdbGrpc::FormatMessage<TOut>(message)}, + {"peer", this->GetPeer()}); } Y_ABORT_UNLESS(!options.is_corked(), @@ -453,10 +464,11 @@ private: } void OnWriteDone(NYdbGrpc::EQueueEventStatus status) { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] write finished Name# %s ok# %s peer# %s", - this, Name, - status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeer().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Write finished", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"statusOk", status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false"}, + {"peer", this->GetPeer()}); auto event = MakeHolder<typename IContext::TEvWriteFinished>(); event->Success = status == NYdbGrpc::EQueueEventStatus::OK; @@ -507,11 +519,12 @@ private: } bool Finish(const grpc::Status& status) { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade finish Name# %s peer# %s grpc status# (%d) message# %s", - this, Name, - this->GetPeer().c_str(), - static_cast<int>(status.error_code()), - status.error_message().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade finish grpc", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"peer", this->GetPeer()}, + {"errorCode", static_cast<int>(status.error_code())}, + {"errorMessage", status.error_message()}); return FinishInternal(status); } @@ -542,12 +555,13 @@ private: } void OnFinishDone(NYdbGrpc::EQueueEventStatus status) { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream finished Name# %s ok# %s peer# %s grpc status# (%d) message# %s", - this, Name, - status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeer().c_str(), - static_cast<int>(Status->error_code()), - Status->error_message().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Stream finished grpc", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"statusOk", status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false"}, + {"peer", this->GetPeer()}, + {"errorCode", static_cast<int>(Status->error_code())}, + {"errorMessage", Status->error_message()}); auto flags = (Flags |= FlagFinishDone); Y_ABORT_UNLESS(flags & FlagFinishCalled); @@ -579,8 +593,10 @@ private: while ((flags & FlagRegistered) && ReadQueue.load() == 0) { if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) { - LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (finish done)", - this, Name, this->GetPeer().c_str()); + YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Deregistering request (finish done)", + {"this", static_cast<void*>(this)}, + {"name", Name}, + {"peer", this->GetPeer()}); Server->DeregisterRequestCtx(this); break; } diff --git a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp index e50dc1f5dfa..90bf48eeb2b 100644 --- a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp +++ b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp @@ -6,6 +6,8 @@ #include <library/cpp/testing/unittest/registar.h> +#define YDB_LOG_THIS_FILE_COMPONENT NKikimrServices::GRPC_SERVER + namespace NKikimr { namespace NGRpcServer { namespace NTest { @@ -139,7 +141,8 @@ public: } void Handle(IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvReadFinished, success = " << ev->Get()->Success); + YDB_LOG_DEBUG_CTX(ctx, "Received TEvReadFinished", + {"success", ev->Get()->Success}); Context->Write(MakeResponse(ev->Get()->Record.GetRequestCookie())); Context->Finish(grpc::Status::OK); PassAway(); @@ -178,20 +181,22 @@ public: } void Handle(IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvReadFinished, success = " << ev->Get()->Success); + YDB_LOG_DEBUG_CTX(ctx, "Received TEvReadFinished", + {"success", ev->Get()->Success}); Y_ABORT_UNLESS(!ev->Get()->Success, "Unexpected read success"); Step(); } void Handle(IContext::TEvWriteFinished::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvWriteFinished, success = " << ev->Get()->Success); + YDB_LOG_DEBUG_CTX(ctx, "Received TEvWriteFinished", + {"success", ev->Get()->Success}); Y_ABORT_UNLESS(ev->Get()->Success, "Unexpected write failure"); Step(); } void Handle(IContext::TEvNotifiedWhenDone::TPtr& ev, const TActorContext& ctx) { Y_UNUSED(ev); - LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvNotifiedWhenDone"); + YDB_LOG_DEBUG_CTX(ctx, "Received TEvNotifiedWhenDone"); Step(); } @@ -227,7 +232,7 @@ public: void Handle(IContext::TEvNotifiedWhenDone::TPtr& ev, const TActorContext& ctx) { Y_UNUSED(ev); - LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvNotifiedWhenDone"); + YDB_LOG_DEBUG_CTX(ctx, "Received TEvNotifiedWhenDone"); ActorFinished.Signal(); PassAway(); } @@ -262,7 +267,8 @@ public: } void Handle(IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvReadFinished, success = " << ev->Get()->Success); + YDB_LOG_DEBUG_CTX(ctx, "Received TEvReadFinished", + {"success", ev->Get()->Success}); if (++Counter == 1) { ActorFinished.Signal(); PassAway(); @@ -299,7 +305,8 @@ public: } void Handle(IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvReadFinished, success = " << ev->Get()->Success); + YDB_LOG_DEBUG_CTX(ctx, "Received TEvReadFinished", + {"success", ev->Get()->Success}); Y_ABORT_UNLESS(ev->Get()->Success == false, "Unexpected Read success"); ReadFinished.Signal(); @@ -309,7 +316,7 @@ public: void Handle(IContext::TEvNotifiedWhenDone::TPtr& ev, const TActorContext& ctx) { Y_UNUSED(ev); - LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvNotifiedWhenDone"); + YDB_LOG_DEBUG_CTX(ctx, "Received TEvNotifiedWhenDone"); PassAway(); } @@ -344,7 +351,8 @@ public: } void Handle(IContext::TEvWriteFinished::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvWriteFinished, success = " << ev->Get()->Success); + YDB_LOG_DEBUG_CTX(ctx, "Received TEvWriteFinished", + {"success", ev->Get()->Success}); if (++Counter == 2) { PassAway(); } |
