summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkseleznyov <[email protected]>2026-06-25 12:05:17 +0300
committerGitHub <[email protected]>2026-06-25 12:05:17 +0300
commitbf608a2629f4e733fa349dcd1f733f9c5c19c041 (patch)
treef38be1c83fe348b2e5e20a75d899397b0277621d
parent6dc8624ca482fc44de6b7132006ead8ece9c017b (diff)
[YDB_LOG] Migrate ydb/core/grpc_streaming (#43273)
-rw-r--r--ydb/core/grpc_streaming/grpc_streaming.h124
-rw-r--r--ydb/core/grpc_streaming/grpc_streaming_ut.cpp26
2 files changed, 87 insertions, 63 deletions
diff --git a/ydb/core/grpc_streaming/grpc_streaming.h b/ydb/core/grpc_streaming/grpc_streaming.h
index 8e32f05d060..dfa1106288b 100644
--- a/ydb/core/grpc_streaming/grpc_streaming.h
+++ b/ydb/core/grpc_streaming/grpc_streaming.h
@@ -224,10 +224,11 @@ private:
return;
}
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream accepted Name# %s ok# %s peer# %s",
- this, Name,
- status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
- this->GetPeer().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Stream accepted",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"statusOk", status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false"},
+ {"peer", this->GetPeer()});
if (status == NYdbGrpc::EQueueEventStatus::ERROR) {
// Don't bother registering if accept failed
@@ -240,8 +241,9 @@ private:
if (!Server->RegisterRequestCtx(this)) {
// It's unsafe to send replies, so just cancel the request
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] dropping request Name# %s due to shutdown",
- this, Name);
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Dropping request due to shutdown",
+ {"this", static_cast<void*>(this)},
+ {"name", Name});
this->Context.TryCancel();
return;
}
@@ -265,10 +267,11 @@ private:
}
void OnDone(NYdbGrpc::EQueueEventStatus status) {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream done notification Name# %s ok# %s peer# %s",
- this, Name,
- status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
- this->GetPeer().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Stream done notification",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"statusOk", status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false"},
+ {"peer", this->GetPeer()});
bool success = status == NYdbGrpc::EQueueEventStatus::OK;
@@ -286,9 +289,10 @@ private:
}
void Cancel() {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade cancel Name# %s peer# %s",
- this, Name,
- this->GetPeer().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade cancel",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"peer", this->GetPeer()});
this->Context.TryCancel();
}
@@ -298,10 +302,11 @@ private:
}
void Attach(TActorId actor) {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade attach Name# %s actor# %s peer# %s",
- this, Name,
- actor.ToString().c_str(),
- this->GetPeer().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade attach",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"actor", actor},
+ {"peer", this->GetPeer()});
auto guard = SingleThreaded.Enforce();
@@ -323,9 +328,10 @@ private:
}
bool Read() {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade read Name# %s peer# %s",
- this, Name,
- this->GetPeer().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade read",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"peer", this->GetPeer()});
auto guard = SingleThreaded.Enforce();
@@ -349,11 +355,12 @@ private:
}
void OnReadDone(NYdbGrpc::EQueueEventStatus status) {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] read finished Name# %s ok# %s data# %s peer# %s",
- this, Name,
- status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
- NYdbGrpc::FormatMessage<TIn>(ReadInProgress->Record, status == NYdbGrpc::EQueueEventStatus::OK).c_str(),
- this->GetPeer().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Read finished",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"statusOk", status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false"},
+ {"readInProgress", NYdbGrpc::FormatMessage<TIn>(ReadInProgress->Record, status == NYdbGrpc::EQueueEventStatus::OK)},
+ {"peer", this->GetPeer()});
// Take current in-progress read first
auto read = std::move(ReadInProgress);
@@ -375,8 +382,10 @@ private:
while ((flags & FlagRegistered) && (flags & FlagFinishDone) && ReadQueue.load() == 0) {
Y_DEBUG_ABORT_UNLESS(flags & FlagFinishCalled);
if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (read done)",
- this, Name, this->GetPeer().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Deregistering request (read done)",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"peer", this->GetPeer()});
Server->DeregisterRequestCtx(this);
break;
}
@@ -391,17 +400,19 @@ private:
bool Write(TOut&& message, const grpc::WriteOptions& options = { }, const grpc::Status* status = nullptr) {
if (status) {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s grpc status# (%d) message# %s",
- this, Name,
- NYdbGrpc::FormatMessage<TOut>(message).c_str(),
- this->GetPeer().c_str(),
- static_cast<int>(status->error_code()),
- status->error_message().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade write grpc",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"message", NYdbGrpc::FormatMessage<TOut>(message)},
+ {"peer", this->GetPeer()},
+ {"errorCode", static_cast<int>(status->error_code())},
+ {"errorMessage", status->error_message()});
} else {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s",
- this, Name,
- NYdbGrpc::FormatMessage<TOut>(message).c_str(),
- this->GetPeer().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade write",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"message", NYdbGrpc::FormatMessage<TOut>(message)},
+ {"peer", this->GetPeer()});
}
Y_ABORT_UNLESS(!options.is_corked(),
@@ -453,10 +464,11 @@ private:
}
void OnWriteDone(NYdbGrpc::EQueueEventStatus status) {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] write finished Name# %s ok# %s peer# %s",
- this, Name,
- status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
- this->GetPeer().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Write finished",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"statusOk", status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false"},
+ {"peer", this->GetPeer()});
auto event = MakeHolder<typename IContext::TEvWriteFinished>();
event->Success = status == NYdbGrpc::EQueueEventStatus::OK;
@@ -507,11 +519,12 @@ private:
}
bool Finish(const grpc::Status& status) {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade finish Name# %s peer# %s grpc status# (%d) message# %s",
- this, Name,
- this->GetPeer().c_str(),
- static_cast<int>(status.error_code()),
- status.error_message().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Facade finish grpc",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"peer", this->GetPeer()},
+ {"errorCode", static_cast<int>(status.error_code())},
+ {"errorMessage", status.error_message()});
return FinishInternal(status);
}
@@ -542,12 +555,13 @@ private:
}
void OnFinishDone(NYdbGrpc::EQueueEventStatus status) {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream finished Name# %s ok# %s peer# %s grpc status# (%d) message# %s",
- this, Name,
- status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
- this->GetPeer().c_str(),
- static_cast<int>(Status->error_code()),
- Status->error_message().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Stream finished grpc",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"statusOk", status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false"},
+ {"peer", this->GetPeer()},
+ {"errorCode", static_cast<int>(Status->error_code())},
+ {"errorMessage", Status->error_message()});
auto flags = (Flags |= FlagFinishDone);
Y_ABORT_UNLESS(flags & FlagFinishCalled);
@@ -579,8 +593,10 @@ private:
while ((flags & FlagRegistered) && ReadQueue.load() == 0) {
if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) {
- LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (finish done)",
- this, Name, this->GetPeer().c_str());
+ YDB_LOG_DEBUG_CTX_COMP(ActorSystem, LoggerServiceId, "Deregistering request (finish done)",
+ {"this", static_cast<void*>(this)},
+ {"name", Name},
+ {"peer", this->GetPeer()});
Server->DeregisterRequestCtx(this);
break;
}
diff --git a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp
index e50dc1f5dfa..90bf48eeb2b 100644
--- a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp
+++ b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp
@@ -6,6 +6,8 @@
#include <library/cpp/testing/unittest/registar.h>
+#define YDB_LOG_THIS_FILE_COMPONENT NKikimrServices::GRPC_SERVER
+
namespace NKikimr {
namespace NGRpcServer {
namespace NTest {
@@ -139,7 +141,8 @@ public:
}
void Handle(IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvReadFinished, success = " << ev->Get()->Success);
+ YDB_LOG_DEBUG_CTX(ctx, "Received TEvReadFinished",
+ {"success", ev->Get()->Success});
Context->Write(MakeResponse(ev->Get()->Record.GetRequestCookie()));
Context->Finish(grpc::Status::OK);
PassAway();
@@ -178,20 +181,22 @@ public:
}
void Handle(IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvReadFinished, success = " << ev->Get()->Success);
+ YDB_LOG_DEBUG_CTX(ctx, "Received TEvReadFinished",
+ {"success", ev->Get()->Success});
Y_ABORT_UNLESS(!ev->Get()->Success, "Unexpected read success");
Step();
}
void Handle(IContext::TEvWriteFinished::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvWriteFinished, success = " << ev->Get()->Success);
+ YDB_LOG_DEBUG_CTX(ctx, "Received TEvWriteFinished",
+ {"success", ev->Get()->Success});
Y_ABORT_UNLESS(ev->Get()->Success, "Unexpected write failure");
Step();
}
void Handle(IContext::TEvNotifiedWhenDone::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ev);
- LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvNotifiedWhenDone");
+ YDB_LOG_DEBUG_CTX(ctx, "Received TEvNotifiedWhenDone");
Step();
}
@@ -227,7 +232,7 @@ public:
void Handle(IContext::TEvNotifiedWhenDone::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ev);
- LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvNotifiedWhenDone");
+ YDB_LOG_DEBUG_CTX(ctx, "Received TEvNotifiedWhenDone");
ActorFinished.Signal();
PassAway();
}
@@ -262,7 +267,8 @@ public:
}
void Handle(IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvReadFinished, success = " << ev->Get()->Success);
+ YDB_LOG_DEBUG_CTX(ctx, "Received TEvReadFinished",
+ {"success", ev->Get()->Success});
if (++Counter == 1) {
ActorFinished.Signal();
PassAway();
@@ -299,7 +305,8 @@ public:
}
void Handle(IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvReadFinished, success = " << ev->Get()->Success);
+ YDB_LOG_DEBUG_CTX(ctx, "Received TEvReadFinished",
+ {"success", ev->Get()->Success});
Y_ABORT_UNLESS(ev->Get()->Success == false, "Unexpected Read success");
ReadFinished.Signal();
@@ -309,7 +316,7 @@ public:
void Handle(IContext::TEvNotifiedWhenDone::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ev);
- LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvNotifiedWhenDone");
+ YDB_LOG_DEBUG_CTX(ctx, "Received TEvNotifiedWhenDone");
PassAway();
}
@@ -344,7 +351,8 @@ public:
}
void Handle(IContext::TEvWriteFinished::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::GRPC_SERVER, "Received TEvWriteFinished, success = " << ev->Get()->Success);
+ YDB_LOG_DEBUG_CTX(ctx, "Received TEvWriteFinished",
+ {"success", ev->Get()->Success});
if (++Counter == 2) {
PassAway();
}