aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2022-11-06 23:57:21 +0300
committergvit <gvit@ydb.tech>2022-11-06 23:57:21 +0300
commit49e3aff3ee72c3d77aa4c79aeb1cb8eb6ac0e514 (patch)
tree3e299e5f127234278b88f4a502cc066b9742379a
parent14c3f596d6c1473e1fbfbc2c8beafcfe7827f345 (diff)
downloadydb-49e3aff3ee72c3d77aa4c79aeb1cb8eb6ac0e514.tar.gz
cleanup session actor logging process
add log prefix to each session actor logging message
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp215
1 files changed, 95 insertions, 120 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index e193c39052..926c6d9d49 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -46,24 +46,22 @@ using namespace NYql;
namespace {
-#define LOG_C(msg) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
-#define LOG_E(msg) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
-#define LOG_W(msg) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
-#define LOG_N(msg) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
-#define LOG_I(msg) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
-#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
-#define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
+#define LOG_C(msg) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg)
+#define LOG_E(msg) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg)
+#define LOG_W(msg) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg)
+#define LOG_N(msg) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg)
+#define LOG_I(msg) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg)
+#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg)
+#define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg)
class TRequestFail : public yexception {
public:
- TKqpRequestInfo RequestInfo;
Ydb::StatusIds::StatusCode Status;
std::optional<google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>> Issues;
- TRequestFail(TKqpRequestInfo info, Ydb::StatusIds::StatusCode status,
+ TRequestFail(Ydb::StatusIds::StatusCode status,
std::optional<google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>> issues = {})
- : RequestInfo(info)
- , Status(status)
+ : Status(status)
, Issues(std::move(issues))
{}
};
@@ -180,7 +178,7 @@ public:
}
void Bootstrap() {
- LOG_D("SessonActor bootstrapped, workerId: " << SelfId());
+ LOG_D("session actor bootstrapped");
Counters->ReportSessionActorCreated(Settings.DbCounters);
CreationTime = TInstant::Now();
@@ -190,6 +188,16 @@ public:
StartIdleTimer();
}
+ TString LogPrefix() const {
+ TStringBuilder result = TStringBuilder()
+ << "SessionId: " << SessionId << ", "
+ << "WorkerId: " << SelfId() << ", ";
+ if (Y_LIKELY(QueryState)) {
+ result << "TraceId: " << QueryState->TraceId << ", ";
+ }
+ return result;
+ }
+
NYql::TKikimrQueryDeadlines GetQueryDeadlines(const NKikimrKqp::TQueryRequest& queryRequest) {
NYql::TKikimrQueryDeadlines res;
@@ -244,7 +252,7 @@ public:
}
}
- void RollbackTx(const TKqpRequestInfo& requestInfo) {
+ void RollbackTx() {
auto& queryRequest = QueryState->Request;
YQL_ENSURE(queryRequest.HasTxControl(),
"Can't perform ROLLBACK_TX: TxControl isn't set in TQueryRequest");
@@ -256,7 +264,7 @@ public:
if (!txCtx) {
std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
TStringBuilder() << "Transaction not found: " << txControl.tx_id())};
- ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
+ ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
} else {
txCtx->Invalidate();
InvalidateExplicitTransaction(txCtx, txId);
@@ -283,8 +291,7 @@ public:
if (!txCtx) {
std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
TStringBuilder() << "Transaction not found: " << txControl.tx_id())};
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
+ ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
return;
}
QueryState->TxCtx = std::move(txCtx);
@@ -323,14 +330,12 @@ public:
void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) {
ui64 proxyRequestId = ev->Cookie;
auto& event = ev->Get()->Record;
- auto requestInfo = TKqpRequestInfo(event.GetTraceId(), event.GetRequest().GetSessionId());
- YQL_ENSURE(requestInfo.GetSessionId() == SessionId,
- "Invalid session, expected: " << SessionId << ", got: " << requestInfo.GetSessionId());
+ YQL_ENSURE(event.GetRequest().GetSessionId() == SessionId,
+ "Invalid session, expected: " << SessionId << ", got: " << event.GetRequest().GetSessionId());
if (ShutdownState && ShutdownState->SoftTimeoutReached()) {
// we reached the soft timeout, so at this point we don't allow to accept new queries for session.
- LOG_N(TKqpRequestInfo("", SessionId)
- << "System shutdown requested: soft timeout reached, no queries can be accepted. Closing session.");
+ LOG_N("system shutdown requested: soft timeout reached, no queries can be accepted");
ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::BAD_SESSION, "Session is under shutdown");
FinalCleanup();
return;
@@ -348,7 +353,7 @@ public:
auto action = queryRequest.GetAction();
auto id = NWilson::TTraceId::NewTraceId(TWilsonKqp::KqpSession, Max<ui32>());
- LOG_I("Wilson Tracing started, id: " + std::to_string(id.GetTraceId()));
+ LOG_I("wilson tracing started, id: " + std::to_string(id.GetTraceId()));
QueryState->KqpSessionSpan = NWilson::TSpan(TWilsonKqp::KqpSession, std::move(id), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END);
LWTRACK(KqpSessionQueryRequest,
@@ -357,29 +362,29 @@ public:
queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED,
action,
queryRequest.GetQuery());
- LOG_D(requestInfo << "Received request,"
- << " selfId : " << SelfId()
- << " proxyRequestId: " << proxyRequestId
- << " prepared: " << queryRequest.HasPreparedQuery()
- << " tx_control: " << queryRequest.HasTxControl()
- << " action: " << action
- << " type: " << (queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED)
- << " text: " << (queryRequest.HasQuery() ? queryRequest.GetQuery() : "")
- );
QueryState->Sender = ev->Sender;
QueryState->ProxyRequestId = proxyRequestId;
- QueryState->TraceId = requestInfo.GetTraceId();
+ QueryState->TraceId = event.GetTraceId();
QueryState->IsDocumentApiRestricted = IsDocumentApiRestricted(event.GetRequestType());
QueryState->StartTime = TInstant::Now();
QueryState->UserToken = event.GetUserToken();
QueryState->QueryDeadlines = GetQueryDeadlines(queryRequest);
+ LOG_D("received request,"
+ << " proxyRequestId: " << proxyRequestId
+ << " prepared: " << queryRequest.HasPreparedQuery()
+ << " tx_control: " << queryRequest.HasTxControl()
+ << " action: " << action
+ << " type: " << (queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED)
+ << " text: " << (queryRequest.HasQuery() ? queryRequest.GetQuery() : "")
+ );
+
if (!queryRequest.HasParameters() && queryRequest.YdbParametersSize()) {
try {
ConvertYdbParamsToMiniKQLParams(queryRequest.GetYdbParameters(), *queryRequest.MutableParameters());
} catch (const std::exception& ex) {
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST)
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST)
<< "Failed to parse query parameters. "<< ex.what();
}
}
@@ -398,7 +403,7 @@ public:
if (action == NKikimrKqp::QUERY_ACTION_PREPARE) {
if (QueryState->KeepSession && !Settings.LongSession) {
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST)
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST)
<< "Expected KeepSession=false for non-execute requests";
}
}
@@ -420,7 +425,7 @@ public:
return;
}
case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX: {
- RollbackTx(requestInfo);
+ RollbackTx();
return;
}
case NKikimrKqp::QUERY_ACTION_COMMIT_TX:
@@ -446,9 +451,7 @@ public:
void AddOffsetsToTransaction(const NActors::TActorContext& ctx) {
YQL_ENSURE(QueryState);
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
-
- if (!PrepareQueryTransaction(requestInfo)) {
+ if (!PrepareQueryTransaction()) {
return;
}
@@ -627,7 +630,7 @@ public:
}
void AcquireMvccSnapshot() {
- LOG_D("AcquireMvccSnapshot");
+ LOG_D("acquire mvcc snapshot");
auto timeout = QueryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now();
auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout);
@@ -642,10 +645,9 @@ public:
auto *response = ev->Get();
if (response->Status != NKikimrIssues::TStatusIds::SUCCESS) {
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
auto& issues = response->Issues;
- LOG_E(requestInfo << "Failed to acquire snapshot: " << issues.ToString());
- ReplyQueryError(requestInfo, GetYdbStatus(issues), "", MessageFromIssues(issues));
+ LOG_E("failed to acquire snapshot: " << issues.ToString());
+ ReplyQueryError(GetYdbStatus(issues), "", MessageFromIssues(issues));
return;
}
QueryState->TxCtx->SnapshotHandle.Snapshot = response->Snapshot;
@@ -664,11 +666,10 @@ public:
ExplicitTransactions.Erase(it);
++EvictedTx;
} else {
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
std::vector<TIssue> issues{
YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)
};
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_SESSION, MessageFromIssues(issues))
+ ythrow TRequestFail(Ydb::StatusIds::BAD_SESSION, MessageFromIssues(issues))
<< "Too many transactions, current active: " << ExplicitTransactions.Size()
<< " MaxTxPerSession: " << *Config->_KqpMaxActiveTxPerSession.Get();
}
@@ -702,7 +703,7 @@ public:
return {success, ctx.IssueManager.GetIssues()};
}
- bool PrepareQueryTransaction(const TKqpRequestInfo& requestInfo) {
+ bool PrepareQueryTransaction() {
auto& queryRequest = QueryState->Request;
if (queryRequest.HasTxControl()) {
@@ -717,7 +718,7 @@ public:
if (it == ExplicitTransactions.End()) {
std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
TStringBuilder() << "Transaction not found: " << txControl.tx_id())};
- ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
+ ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
return false;
}
QueryState->TxCtx = *it;
@@ -730,7 +731,7 @@ public:
break;
}
case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST)
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST)
<< "wrong TxControl: tx_selector must be set";
break;
}
@@ -744,9 +745,7 @@ public:
bool PrepareQueryContext() {
YQL_ENSURE(QueryState);
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
-
- if (!PrepareQueryTransaction(requestInfo)) {
+ if (!PrepareQueryTransaction()) {
return false;
}
@@ -758,7 +757,7 @@ public:
auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery);
if (!success) {
YQL_ENSURE(!issues.Empty());
- ReplyQueryError(requestInfo, GetYdbStatus(issues), "", MessageFromIssues(issues));
+ ReplyQueryError(GetYdbStatus(issues), "", MessageFromIssues(issues));
return false;
}
@@ -867,7 +866,6 @@ public:
NKikimrMiniKQL::TParams* ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type) {
auto& txCtx = QueryState->TxCtx;
YQL_ENSURE(txCtx);
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
auto parameter = QueryState->Parameters.FindPtr(name);
if (!parameter) {
if (type.GetKind() == NKikimrMiniKQL::ETypeKind::Optional) {
@@ -878,12 +876,12 @@ public:
return &newParameter;
}
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << "Missing value for parameter: " << name;
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "Missing value for parameter: " << name;
return nullptr;
}
if (!IsSameType(parameter->GetType(), type)) {
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << "Parameter " << name
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "Parameter " << name
<< " type mismatch, expected: " << type << ", actual: " << parameter->GetType();
}
@@ -904,8 +902,7 @@ public:
*GetParamValue(/*ensure*/ true, *txCtx, QueryState->Parameters, QueryState->TxResults, paramBinding));
YQL_ENSURE(it.second);
} catch (const yexception& ex) {
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << ex.what();
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
}
}
return paramsMap;
@@ -972,12 +969,11 @@ public:
bool CheckTransacionLocks() {
auto& txCtx = *QueryState->TxCtx;
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) {
std::vector<TIssue> issues{
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated.")
};
- ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "tx has deferred effects, but lock is broken",
+ ReplyQueryError(Ydb::StatusIds::ABORTED, "tx has deferred effects, but lock is broken",
MessageFromIssues(issues));
return false;
}
@@ -991,11 +987,10 @@ public:
return true;
}
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
std::vector<TIssue> issues {
YqlIssue({}, TIssuesIds::KIKIMR_BAD_REQUEST, "Incorrect offset ranges in the transaction.")
};
- ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "incorrect offset ranges in the tx",
+ ReplyQueryError(Ydb::StatusIds::ABORTED, "incorrect offset ranges in the tx",
MessageFromIssues(issues));
return false;
@@ -1013,7 +1008,6 @@ public:
return;
}
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
std::shared_ptr<const NKqpProto::TKqpPhyTx> tx;
@@ -1023,7 +1017,7 @@ public:
while (tx && tx->GetHasEffects()) {
if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) {
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST)
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST)
<< "Failed to mix queries with old- and new- engines";
}
LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
@@ -1196,8 +1190,7 @@ public:
auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), *txCtx);
if (!success) {
if (!txCtx->GetSnapshot().IsValid() || !txCtx->DeferredEffects.Empty()) {
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge",
+ ReplyQueryError(Ydb::StatusIds::ABORTED, "Error while locks merge",
MessageFromIssues(issues));
return false;
}
@@ -1225,16 +1218,14 @@ public:
QueryState->Orbit = std::move(ev->Get()->Orbit);
auto* response = ev->Get()->Record.MutableResponse();
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- LOG_D(SelfId() << " " << requestInfo << " TEvTxResponse, CurrentTx: " << QueryState->CurrentTx
+ LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx
<< "/" << (QueryState->PreparedQuery ? QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize() : 0)
<< " response.status: " << response->GetStatus() << " results.size: " << response->GetResult().ResultsSize());
ExecuterId = TActorId{};
if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- LOG_I(SelfId() << " " << requestInfo << " TEvTxResponse has non-success status, CurrentTx: "
- << QueryState->CurrentTx << " response->DebugString(): " << response->DebugString());
+ LOG_I("TEvTxResponse has non-success status, CurrentTx: "
+ << QueryState->CurrentTx << " response->ShortDebugString(): " << response->ShortDebugString());
QueryState->TxCtx->Invalidate();
@@ -1262,7 +1253,7 @@ public:
break;
}
- ReplyQueryError(requestInfo, status, "", MessageFromIssues(issues));
+ ReplyQueryError(status, "", MessageFromIssues(issues));
return;
}
@@ -1308,8 +1299,7 @@ public:
const auto& issues = ev->Get()->GetIssues();
LOG_I("Got TEvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()));
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- ReplyQueryError(requestInfo, NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), "Got AbortExecution", MessageFromIssues(issues));
+ ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), "Got AbortExecution", MessageFromIssues(issues));
}
TString ExtractQueryText() const {
@@ -1383,7 +1373,7 @@ public:
template<class TEvRecord>
void AddTrailingInfo(TEvRecord& record) {
if (ShutdownState) {
- LOG_D("Session [" << SessionId << "] is closing, set trailing metadata to request session shutdown");
+ LOG_D("session is closing, set trailing metadata to request session shutdown");
record.SetWorkerIsClosing(true);
}
}
@@ -1399,7 +1389,7 @@ public:
if (QueryState->TxCtx) {
auto txInfo = QueryState->TxCtx->GetInfo();
- LOG_I("txInfo"
+ LOG_I("txInfo "
<< " Status: " << txInfo.Status
<< " Kind: " << txInfo.Kind
<< " TotalDuration: " << txInfo.TotalDuration.SecondsFloat()*1e3
@@ -1543,8 +1533,7 @@ public:
}
resEv->Record.GetRef().SetYdbStatus(Ydb::StatusIds::SUCCESS);
- LOG_D(TKqpRequestInfo(QueryState ? QueryState->TraceId : "", SessionId)
- << " Create QueryResponse for action: " << queryRequest.GetAction() << " with SUCCESS status");
+ LOG_D("Create QueryResponse for action: " << queryRequest.GetAction() << " with SUCCESS status");
QueryResponse = std::move(resEv);
@@ -1567,7 +1556,7 @@ public:
}
}
- LOG_W("ReplyQueryCompileError, status" << compileResult->Status << " remove tx with tx_id: " << txId_Human);
+ LOG_W("ReplyQueryCompileError, status " << compileResult->Status << " remove tx with tx_id: " << txId_Human);
auto txCtx = FindTransaction(txId);
if (txCtx) {
txCtx->Invalidate();
@@ -1586,7 +1575,7 @@ public:
void ReplyProcessError(const TActorId& sender, ui64 proxyRequestId, Ydb::StatusIds::StatusCode ydbStatus,
const TString& message)
{
- LOG_W(TKqpRequestInfo("", SessionId) << "Reply process error, msg: " << message);
+ LOG_W("Reply process error, msg: " << message);
auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message);
@@ -1595,12 +1584,7 @@ public:
}
void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) {
-
- auto& event = ev->Get()->Record;
- auto requestInfo = TKqpRequestInfo(event.GetTraceId(), event.GetRequest().GetSessionId());
-
ui64 proxyRequestId = ev->Cookie;
-
auto busyStatus = Settings.Service.GetUseSessionBusyStatus()
? Ydb::StatusIds::SESSION_BUSY
: Ydb::StatusIds::PRECONDITION_FAILED;
@@ -1620,8 +1604,6 @@ public:
void Reply() {
YQL_ENSURE(QueryState);
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
-
YQL_ENSURE(Counters);
auto& record = QueryResponse->Record.GetRef();
@@ -1646,11 +1628,11 @@ public:
LWTRACK(KqpSessionReplyError, QueryState->Orbit, TStringBuilder() << status);
}
Send(QueryState->Sender, QueryResponse.release(), 0, QueryState->ProxyRequestId);
- LOG_D(requestInfo << "Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId
+ LOG_D("Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId
<< ", proxyId: " << QueryState->Sender.ToString());
if (IsFatalError(status)) {
- LOG_N(requestInfo << "SessionActor destroyed due to " << status);
+ LOG_N("SessionActor destroyed due to " << status);
Counters->ReportSessionActorClosedError(Settings.DbCounters);
}
}
@@ -1680,11 +1662,6 @@ public:
void Handle(TEvKqp::TEvPingSessionRequest::TPtr& ev) {
ui64 proxyRequestId = ev->Cookie;
- auto& evRecord = ev->Get()->Record;
- auto requestInfo = TKqpRequestInfo(evRecord.GetTraceId(), evRecord.GetRequest().GetSessionId());
- YQL_ENSURE(requestInfo.GetSessionId() == SessionId,
- "Invalid session, expected: " << SessionId << ", got: " << requestInfo.GetSessionId());
-
auto result = std::make_unique<TEvKqp::TEvPingSessionResponse>();
auto& record = result->Record;
record.SetStatus(Ydb::StatusIds::SUCCESS);
@@ -1692,20 +1669,22 @@ public:
? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY
: Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY;
record.MutableResponse()->SetSessionStatus(sessionStatus);
- StartIdleTimer();
+ if (CurrentStateFunc() == &TThis::ReadyState) {
+ StartIdleTimer();
+ }
Send(ev->Sender, result.release(), 0, proxyRequestId);
}
void HandleReady(TEvKqp::TEvCloseSessionRequest::TPtr&) {
- LOG_I(TKqpRequestInfo("", SessionId) << "Session closed due to explicit close event");
+ LOG_I("Session closed due to explicit close event");
Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
FinalCleanup();
}
void HandleCompile(TEvKqp::TEvCloseSessionRequest::TPtr&) {
YQL_ENSURE(QueryState);
- ReplyQueryError(TKqpRequestInfo(QueryState->TraceId, SessionId), Ydb::StatusIds::BAD_SESSION,
+ ReplyQueryError(Ydb::StatusIds::BAD_SESSION,
"Request cancelled due to explicit session close request");
Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
}
@@ -1725,8 +1704,7 @@ public:
void Handle(TEvKqp::TEvInitiateSessionShutdown::TPtr& ev) {
if (!ShutdownState) {
- LOG_N("Started session shutdown " << TKqpRequestInfo("", SessionId));
-
+ LOG_N("Started session shutdown");
ShutdownState = TSessionShutdownState(ev->Get()->SoftTimeoutMs, ev->Get()->HardTimeoutMs);
ScheduleNextShutdownTick();
}
@@ -1740,11 +1718,11 @@ public:
YQL_ENSURE(ShutdownState);
ShutdownState->MoveToNextState();
if (ShutdownState->HardTimeoutReached()) {
- LOG_N("Reached hard shutdown timeout " << TKqpRequestInfo("", SessionId));
+ LOG_N("Reached hard shutdown timeout");
Send(SelfId(), new TEvKqp::TEvCloseSessionRequest());
} else {
ScheduleNextShutdownTick();
- LOG_I("Schedule next shutdown tick " << TKqpRequestInfo("", SessionId));
+ LOG_I("Schedule next shutdown tick");
}
}
@@ -1771,7 +1749,7 @@ public:
auto timerId = ev->Get()->TimerId;
if (timerId == IdleTimerId) {
- LOG_N(TKqpRequestInfo("", SessionId) << "SessionActor idle timeout, worker destroyed");
+ LOG_N("SessionActor idle timeout, worker destroyed");
Counters->ReportSessionActorClosedIdle(Settings.DbCounters);
FinalCleanup();
}
@@ -1854,8 +1832,7 @@ public:
SendRollbackRequest(TransactionsToBeAborted.front().Get());
}
- LOG_I(TKqpRequestInfo(QueryState ? QueryState->TraceId : "", SessionId)
- << " Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx}
+ LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx}
<< " TransactionsToBeAborted.size(): " << TransactionsToBeAborted.size());
if (CleanupCtx) {
Become(&TKqpSessionActor::CleanupState);
@@ -1880,7 +1857,7 @@ public:
if (response.GetStatus() != Ydb::StatusIds::SUCCESS) {
TIssues issues;
IssuesFromMessage(response.GetIssues(), issues);
- LOG_E(TKqpRequestInfo("", SessionId) << "Failed to cleanup: " << issues.ToString());
+ LOG_E("Failed to cleanup: " << issues.ToString());
EndCleanup(CleanupCtx->Final);
return;
}
@@ -1897,7 +1874,7 @@ public:
}
void EndCleanup(bool isFinal) {
- LOG_D(TKqpRequestInfo(QueryState ? QueryState->TraceId : "", SessionId) << "EndCleanup, isFinal: " << isFinal);
+ LOG_D("EndCleanup, isFinal: " << isFinal);
if (QueryResponse)
Reply();
@@ -1916,7 +1893,7 @@ public:
closeEv->Record.MutableResponse()->SetClosed(true);
Send(Owner, closeEv.release());
- LOG_D(TKqpRequestInfo(QueryState ? QueryState->TraceId : "", SessionId) << " session actor destroyed");
+ LOG_D("Session actor destroyed");
PassAway();
} else {
TransactionsToBeAborted.clear();
@@ -1936,10 +1913,10 @@ public:
return issueMessage;
}
- void ReplyQueryError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus,
+ void ReplyQueryError(Ydb::StatusIds::StatusCode ydbStatus,
const TString& message, std::optional<google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>> issues = {})
{
- LOG_W("Create QueryResponse for error on request: " << requestInfo << " msg: " << message);
+ LOG_W("Create QueryResponse for error on request, msg: " << message);
QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>();
QueryResponse->Record.GetRef().SetYdbStatus(ydbStatus);
@@ -1985,7 +1962,7 @@ public:
UnexpectedEvent("ReadyState", ev);
}
} catch (const TRequestFail& ex) {
- ReplyQueryError(ex.RequestInfo, ex.Status, ex.what(), ex.Issues);
+ ReplyQueryError(ex.Status, ex.what(), ex.Issues);
} catch (const yexception& ex) {
InternalError(ex.what());
}
@@ -2006,7 +1983,7 @@ public:
UnexpectedEvent("CompileState", ev);
}
} catch (const TRequestFail& ex) {
- ReplyQueryError(ex.RequestInfo, ex.Status, ex.what(), ex.Issues);
+ ReplyQueryError(ex.Status, ex.what(), ex.Issues);
} catch (const yexception& ex) {
InternalError(ex.what());
}
@@ -2037,7 +2014,7 @@ public:
UnexpectedEvent("ExecuteState", ev);
}
} catch (const TRequestFail& ex) {
- ReplyQueryError(ex.RequestInfo, ex.Status, ex.what(), ex.Issues);
+ ReplyQueryError(ex.Status, ex.what(), ex.Issues);
} catch (const yexception& ex) {
InternalError(ex.what());
}
@@ -2084,7 +2061,7 @@ public:
UnexpectedEvent("TopicOpsState", ev);
}
} catch (const TRequestFail& ex) {
- ReplyQueryError(ex.RequestInfo, ex.Status, ex.what(), ex.Issues);
+ ReplyQueryError(ex.Status, ex.what(), ex.Issues);
} catch (const yexception& ex) {
InternalError(ex.what());
}
@@ -2097,10 +2074,9 @@ private:
}
void InternalError(const TString& message) {
- LOG_E("Internal error, SelfId: " << SelfId() << ", message: " << message);
+ LOG_E("Internal error, message: " << message);
if (QueryState) {
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- ReplyQueryError(requestInfo, Ydb::StatusIds::INTERNAL_ERROR, message);
+ ReplyQueryError(Ydb::StatusIds::INTERNAL_ERROR, message);
} else {
FinalCleanup();
}
@@ -2111,28 +2087,27 @@ private:
}
void HandleTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
- TKqpRequestInfo requestInfo(QueryState->TraceId, SessionId);
NSchemeCache::TSchemeCacheNavigate* response = ev->Get()->Request.Get();
Ydb::StatusIds_StatusCode status;
TString message;
if (IsAccessDenied(*response, message)) {
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::UNAUTHORIZED) << message;
+ ythrow TRequestFail(Ydb::StatusIds::UNAUTHORIZED) << message;
}
if (HasErrors(*response, message)) {
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::SCHEME_ERROR) << message;
+ ythrow TRequestFail(Ydb::StatusIds::SCHEME_ERROR) << message;
}
QueryState->TopicOperations.ProcessSchemeCacheNavigate(response->ResultSet,
status,
message);
if (status != Ydb::StatusIds::SUCCESS) {
- ythrow TRequestFail(requestInfo, status) << message;
+ ythrow TRequestFail(status) << message;
}
if (!TryMergeTopicOffsets(QueryState->TopicOperations, message)) {
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << message;
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << message;
}
ReplySuccess();
@@ -2201,7 +2176,7 @@ private:
void HandleTopicOps(TEvKqp::TEvCloseSessionRequest::TPtr&) {
YQL_ENSURE(QueryState);
- ReplyQueryError(TKqpRequestInfo(QueryState->TraceId, SessionId), Ydb::StatusIds::BAD_SESSION,
+ ReplyQueryError(Ydb::StatusIds::BAD_SESSION,
"Request cancelled due to explicit session close request");
Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
}