diff options
author | gvit <gvit@ydb.tech> | 2022-11-06 23:57:21 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-11-06 23:57:21 +0300 |
commit | 49e3aff3ee72c3d77aa4c79aeb1cb8eb6ac0e514 (patch) | |
tree | 3e299e5f127234278b88f4a502cc066b9742379a | |
parent | 14c3f596d6c1473e1fbfbc2c8beafcfe7827f345 (diff) | |
download | ydb-49e3aff3ee72c3d77aa4c79aeb1cb8eb6ac0e514.tar.gz |
cleanup session actor logging process
add log prefix to each session actor logging message
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 215 |
1 files changed, 95 insertions, 120 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index e193c39052..926c6d9d49 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -46,24 +46,22 @@ using namespace NYql; namespace { -#define LOG_C(msg) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) -#define LOG_E(msg) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) -#define LOG_W(msg) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) -#define LOG_N(msg) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) -#define LOG_I(msg) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) -#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) -#define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) +#define LOG_C(msg) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg) +#define LOG_E(msg) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg) +#define LOG_W(msg) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg) +#define LOG_N(msg) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg) +#define LOG_I(msg) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg) +#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg) +#define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, LogPrefix() << msg) class TRequestFail : public yexception { public: - TKqpRequestInfo RequestInfo; Ydb::StatusIds::StatusCode Status; std::optional<google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>> Issues; - TRequestFail(TKqpRequestInfo info, Ydb::StatusIds::StatusCode status, + TRequestFail(Ydb::StatusIds::StatusCode status, std::optional<google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>> issues = {}) - : RequestInfo(info) - , Status(status) + : Status(status) , Issues(std::move(issues)) {} }; @@ -180,7 +178,7 @@ public: } void Bootstrap() { - LOG_D("SessonActor bootstrapped, workerId: " << SelfId()); + LOG_D("session actor bootstrapped"); Counters->ReportSessionActorCreated(Settings.DbCounters); CreationTime = TInstant::Now(); @@ -190,6 +188,16 @@ public: StartIdleTimer(); } + TString LogPrefix() const { + TStringBuilder result = TStringBuilder() + << "SessionId: " << SessionId << ", " + << "WorkerId: " << SelfId() << ", "; + if (Y_LIKELY(QueryState)) { + result << "TraceId: " << QueryState->TraceId << ", "; + } + return result; + } + NYql::TKikimrQueryDeadlines GetQueryDeadlines(const NKikimrKqp::TQueryRequest& queryRequest) { NYql::TKikimrQueryDeadlines res; @@ -244,7 +252,7 @@ public: } } - void RollbackTx(const TKqpRequestInfo& requestInfo) { + void RollbackTx() { auto& queryRequest = QueryState->Request; YQL_ENSURE(queryRequest.HasTxControl(), "Can't perform ROLLBACK_TX: TxControl isn't set in TQueryRequest"); @@ -256,7 +264,7 @@ public: if (!txCtx) { std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder() << "Transaction not found: " << txControl.tx_id())}; - ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); + ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); } else { txCtx->Invalidate(); InvalidateExplicitTransaction(txCtx, txId); @@ -283,8 +291,7 @@ public: if (!txCtx) { std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder() << "Transaction not found: " << txControl.tx_id())}; - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); + ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); return; } QueryState->TxCtx = std::move(txCtx); @@ -323,14 +330,12 @@ public: void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) { ui64 proxyRequestId = ev->Cookie; auto& event = ev->Get()->Record; - auto requestInfo = TKqpRequestInfo(event.GetTraceId(), event.GetRequest().GetSessionId()); - YQL_ENSURE(requestInfo.GetSessionId() == SessionId, - "Invalid session, expected: " << SessionId << ", got: " << requestInfo.GetSessionId()); + YQL_ENSURE(event.GetRequest().GetSessionId() == SessionId, + "Invalid session, expected: " << SessionId << ", got: " << event.GetRequest().GetSessionId()); if (ShutdownState && ShutdownState->SoftTimeoutReached()) { // we reached the soft timeout, so at this point we don't allow to accept new queries for session. - LOG_N(TKqpRequestInfo("", SessionId) - << "System shutdown requested: soft timeout reached, no queries can be accepted. Closing session."); + LOG_N("system shutdown requested: soft timeout reached, no queries can be accepted"); ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::BAD_SESSION, "Session is under shutdown"); FinalCleanup(); return; @@ -348,7 +353,7 @@ public: auto action = queryRequest.GetAction(); auto id = NWilson::TTraceId::NewTraceId(TWilsonKqp::KqpSession, Max<ui32>()); - LOG_I("Wilson Tracing started, id: " + std::to_string(id.GetTraceId())); + LOG_I("wilson tracing started, id: " + std::to_string(id.GetTraceId())); QueryState->KqpSessionSpan = NWilson::TSpan(TWilsonKqp::KqpSession, std::move(id), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END); LWTRACK(KqpSessionQueryRequest, @@ -357,29 +362,29 @@ public: queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED, action, queryRequest.GetQuery()); - LOG_D(requestInfo << "Received request," - << " selfId : " << SelfId() - << " proxyRequestId: " << proxyRequestId - << " prepared: " << queryRequest.HasPreparedQuery() - << " tx_control: " << queryRequest.HasTxControl() - << " action: " << action - << " type: " << (queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED) - << " text: " << (queryRequest.HasQuery() ? queryRequest.GetQuery() : "") - ); QueryState->Sender = ev->Sender; QueryState->ProxyRequestId = proxyRequestId; - QueryState->TraceId = requestInfo.GetTraceId(); + QueryState->TraceId = event.GetTraceId(); QueryState->IsDocumentApiRestricted = IsDocumentApiRestricted(event.GetRequestType()); QueryState->StartTime = TInstant::Now(); QueryState->UserToken = event.GetUserToken(); QueryState->QueryDeadlines = GetQueryDeadlines(queryRequest); + LOG_D("received request," + << " proxyRequestId: " << proxyRequestId + << " prepared: " << queryRequest.HasPreparedQuery() + << " tx_control: " << queryRequest.HasTxControl() + << " action: " << action + << " type: " << (queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED) + << " text: " << (queryRequest.HasQuery() ? queryRequest.GetQuery() : "") + ); + if (!queryRequest.HasParameters() && queryRequest.YdbParametersSize()) { try { ConvertYdbParamsToMiniKQLParams(queryRequest.GetYdbParameters(), *queryRequest.MutableParameters()); } catch (const std::exception& ex) { - ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "Failed to parse query parameters. "<< ex.what(); } } @@ -398,7 +403,7 @@ public: if (action == NKikimrKqp::QUERY_ACTION_PREPARE) { if (QueryState->KeepSession && !Settings.LongSession) { - ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "Expected KeepSession=false for non-execute requests"; } } @@ -420,7 +425,7 @@ public: return; } case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX: { - RollbackTx(requestInfo); + RollbackTx(); return; } case NKikimrKqp::QUERY_ACTION_COMMIT_TX: @@ -446,9 +451,7 @@ public: void AddOffsetsToTransaction(const NActors::TActorContext& ctx) { YQL_ENSURE(QueryState); - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - - if (!PrepareQueryTransaction(requestInfo)) { + if (!PrepareQueryTransaction()) { return; } @@ -627,7 +630,7 @@ public: } void AcquireMvccSnapshot() { - LOG_D("AcquireMvccSnapshot"); + LOG_D("acquire mvcc snapshot"); auto timeout = QueryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now(); auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout); @@ -642,10 +645,9 @@ public: auto *response = ev->Get(); if (response->Status != NKikimrIssues::TStatusIds::SUCCESS) { - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); auto& issues = response->Issues; - LOG_E(requestInfo << "Failed to acquire snapshot: " << issues.ToString()); - ReplyQueryError(requestInfo, GetYdbStatus(issues), "", MessageFromIssues(issues)); + LOG_E("failed to acquire snapshot: " << issues.ToString()); + ReplyQueryError(GetYdbStatus(issues), "", MessageFromIssues(issues)); return; } QueryState->TxCtx->SnapshotHandle.Snapshot = response->Snapshot; @@ -664,11 +666,10 @@ public: ExplicitTransactions.Erase(it); ++EvictedTx; } else { - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); std::vector<TIssue> issues{ YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS) }; - ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_SESSION, MessageFromIssues(issues)) + ythrow TRequestFail(Ydb::StatusIds::BAD_SESSION, MessageFromIssues(issues)) << "Too many transactions, current active: " << ExplicitTransactions.Size() << " MaxTxPerSession: " << *Config->_KqpMaxActiveTxPerSession.Get(); } @@ -702,7 +703,7 @@ public: return {success, ctx.IssueManager.GetIssues()}; } - bool PrepareQueryTransaction(const TKqpRequestInfo& requestInfo) { + bool PrepareQueryTransaction() { auto& queryRequest = QueryState->Request; if (queryRequest.HasTxControl()) { @@ -717,7 +718,7 @@ public: if (it == ExplicitTransactions.End()) { std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder() << "Transaction not found: " << txControl.tx_id())}; - ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); + ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); return false; } QueryState->TxCtx = *it; @@ -730,7 +731,7 @@ public: break; } case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: - ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "wrong TxControl: tx_selector must be set"; break; } @@ -744,9 +745,7 @@ public: bool PrepareQueryContext() { YQL_ENSURE(QueryState); - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - - if (!PrepareQueryTransaction(requestInfo)) { + if (!PrepareQueryTransaction()) { return false; } @@ -758,7 +757,7 @@ public: auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery); if (!success) { YQL_ENSURE(!issues.Empty()); - ReplyQueryError(requestInfo, GetYdbStatus(issues), "", MessageFromIssues(issues)); + ReplyQueryError(GetYdbStatus(issues), "", MessageFromIssues(issues)); return false; } @@ -867,7 +866,6 @@ public: NKikimrMiniKQL::TParams* ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type) { auto& txCtx = QueryState->TxCtx; YQL_ENSURE(txCtx); - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); auto parameter = QueryState->Parameters.FindPtr(name); if (!parameter) { if (type.GetKind() == NKikimrMiniKQL::ETypeKind::Optional) { @@ -878,12 +876,12 @@ public: return &newParameter; } - ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << "Missing value for parameter: " << name; + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "Missing value for parameter: " << name; return nullptr; } if (!IsSameType(parameter->GetType(), type)) { - ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << "Parameter " << name + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "Parameter " << name << " type mismatch, expected: " << type << ", actual: " << parameter->GetType(); } @@ -904,8 +902,7 @@ public: *GetParamValue(/*ensure*/ true, *txCtx, QueryState->Parameters, QueryState->TxResults, paramBinding)); YQL_ENSURE(it.second); } catch (const yexception& ex) { - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << ex.what(); + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what(); } } return paramsMap; @@ -972,12 +969,11 @@ public: bool CheckTransacionLocks() { auto& txCtx = *QueryState->TxCtx; - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) { std::vector<TIssue> issues{ YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated.") }; - ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "tx has deferred effects, but lock is broken", + ReplyQueryError(Ydb::StatusIds::ABORTED, "tx has deferred effects, but lock is broken", MessageFromIssues(issues)); return false; } @@ -991,11 +987,10 @@ public: return true; } - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); std::vector<TIssue> issues { YqlIssue({}, TIssuesIds::KIKIMR_BAD_REQUEST, "Incorrect offset ranges in the transaction.") }; - ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "incorrect offset ranges in the tx", + ReplyQueryError(Ydb::StatusIds::ABORTED, "incorrect offset ranges in the tx", MessageFromIssues(issues)); return false; @@ -1013,7 +1008,6 @@ public: return; } - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); std::shared_ptr<const NKqpProto::TKqpPhyTx> tx; @@ -1023,7 +1017,7 @@ public: while (tx && tx->GetHasEffects()) { if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) { - ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "Failed to mix queries with old- and new- engines"; } LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); @@ -1196,8 +1190,7 @@ public: auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), *txCtx); if (!success) { if (!txCtx->GetSnapshot().IsValid() || !txCtx->DeferredEffects.Empty()) { - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge", + ReplyQueryError(Ydb::StatusIds::ABORTED, "Error while locks merge", MessageFromIssues(issues)); return false; } @@ -1225,16 +1218,14 @@ public: QueryState->Orbit = std::move(ev->Get()->Orbit); auto* response = ev->Get()->Record.MutableResponse(); - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - LOG_D(SelfId() << " " << requestInfo << " TEvTxResponse, CurrentTx: " << QueryState->CurrentTx + LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << "/" << (QueryState->PreparedQuery ? QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize() : 0) << " response.status: " << response->GetStatus() << " results.size: " << response->GetResult().ResultsSize()); ExecuterId = TActorId{}; if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - LOG_I(SelfId() << " " << requestInfo << " TEvTxResponse has non-success status, CurrentTx: " - << QueryState->CurrentTx << " response->DebugString(): " << response->DebugString()); + LOG_I("TEvTxResponse has non-success status, CurrentTx: " + << QueryState->CurrentTx << " response->ShortDebugString(): " << response->ShortDebugString()); QueryState->TxCtx->Invalidate(); @@ -1262,7 +1253,7 @@ public: break; } - ReplyQueryError(requestInfo, status, "", MessageFromIssues(issues)); + ReplyQueryError(status, "", MessageFromIssues(issues)); return; } @@ -1308,8 +1299,7 @@ public: const auto& issues = ev->Get()->GetIssues(); LOG_I("Got TEvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode())); - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - ReplyQueryError(requestInfo, NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), "Got AbortExecution", MessageFromIssues(issues)); + ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), "Got AbortExecution", MessageFromIssues(issues)); } TString ExtractQueryText() const { @@ -1383,7 +1373,7 @@ public: template<class TEvRecord> void AddTrailingInfo(TEvRecord& record) { if (ShutdownState) { - LOG_D("Session [" << SessionId << "] is closing, set trailing metadata to request session shutdown"); + LOG_D("session is closing, set trailing metadata to request session shutdown"); record.SetWorkerIsClosing(true); } } @@ -1399,7 +1389,7 @@ public: if (QueryState->TxCtx) { auto txInfo = QueryState->TxCtx->GetInfo(); - LOG_I("txInfo" + LOG_I("txInfo " << " Status: " << txInfo.Status << " Kind: " << txInfo.Kind << " TotalDuration: " << txInfo.TotalDuration.SecondsFloat()*1e3 @@ -1543,8 +1533,7 @@ public: } resEv->Record.GetRef().SetYdbStatus(Ydb::StatusIds::SUCCESS); - LOG_D(TKqpRequestInfo(QueryState ? QueryState->TraceId : "", SessionId) - << " Create QueryResponse for action: " << queryRequest.GetAction() << " with SUCCESS status"); + LOG_D("Create QueryResponse for action: " << queryRequest.GetAction() << " with SUCCESS status"); QueryResponse = std::move(resEv); @@ -1567,7 +1556,7 @@ public: } } - LOG_W("ReplyQueryCompileError, status" << compileResult->Status << " remove tx with tx_id: " << txId_Human); + LOG_W("ReplyQueryCompileError, status " << compileResult->Status << " remove tx with tx_id: " << txId_Human); auto txCtx = FindTransaction(txId); if (txCtx) { txCtx->Invalidate(); @@ -1586,7 +1575,7 @@ public: void ReplyProcessError(const TActorId& sender, ui64 proxyRequestId, Ydb::StatusIds::StatusCode ydbStatus, const TString& message) { - LOG_W(TKqpRequestInfo("", SessionId) << "Reply process error, msg: " << message); + LOG_W("Reply process error, msg: " << message); auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message); @@ -1595,12 +1584,7 @@ public: } void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) { - - auto& event = ev->Get()->Record; - auto requestInfo = TKqpRequestInfo(event.GetTraceId(), event.GetRequest().GetSessionId()); - ui64 proxyRequestId = ev->Cookie; - auto busyStatus = Settings.Service.GetUseSessionBusyStatus() ? Ydb::StatusIds::SESSION_BUSY : Ydb::StatusIds::PRECONDITION_FAILED; @@ -1620,8 +1604,6 @@ public: void Reply() { YQL_ENSURE(QueryState); - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - YQL_ENSURE(Counters); auto& record = QueryResponse->Record.GetRef(); @@ -1646,11 +1628,11 @@ public: LWTRACK(KqpSessionReplyError, QueryState->Orbit, TStringBuilder() << status); } Send(QueryState->Sender, QueryResponse.release(), 0, QueryState->ProxyRequestId); - LOG_D(requestInfo << "Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId + LOG_D("Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId << ", proxyId: " << QueryState->Sender.ToString()); if (IsFatalError(status)) { - LOG_N(requestInfo << "SessionActor destroyed due to " << status); + LOG_N("SessionActor destroyed due to " << status); Counters->ReportSessionActorClosedError(Settings.DbCounters); } } @@ -1680,11 +1662,6 @@ public: void Handle(TEvKqp::TEvPingSessionRequest::TPtr& ev) { ui64 proxyRequestId = ev->Cookie; - auto& evRecord = ev->Get()->Record; - auto requestInfo = TKqpRequestInfo(evRecord.GetTraceId(), evRecord.GetRequest().GetSessionId()); - YQL_ENSURE(requestInfo.GetSessionId() == SessionId, - "Invalid session, expected: " << SessionId << ", got: " << requestInfo.GetSessionId()); - auto result = std::make_unique<TEvKqp::TEvPingSessionResponse>(); auto& record = result->Record; record.SetStatus(Ydb::StatusIds::SUCCESS); @@ -1692,20 +1669,22 @@ public: ? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY : Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY; record.MutableResponse()->SetSessionStatus(sessionStatus); - StartIdleTimer(); + if (CurrentStateFunc() == &TThis::ReadyState) { + StartIdleTimer(); + } Send(ev->Sender, result.release(), 0, proxyRequestId); } void HandleReady(TEvKqp::TEvCloseSessionRequest::TPtr&) { - LOG_I(TKqpRequestInfo("", SessionId) << "Session closed due to explicit close event"); + LOG_I("Session closed due to explicit close event"); Counters->ReportSessionActorClosedRequest(Settings.DbCounters); FinalCleanup(); } void HandleCompile(TEvKqp::TEvCloseSessionRequest::TPtr&) { YQL_ENSURE(QueryState); - ReplyQueryError(TKqpRequestInfo(QueryState->TraceId, SessionId), Ydb::StatusIds::BAD_SESSION, + ReplyQueryError(Ydb::StatusIds::BAD_SESSION, "Request cancelled due to explicit session close request"); Counters->ReportSessionActorClosedRequest(Settings.DbCounters); } @@ -1725,8 +1704,7 @@ public: void Handle(TEvKqp::TEvInitiateSessionShutdown::TPtr& ev) { if (!ShutdownState) { - LOG_N("Started session shutdown " << TKqpRequestInfo("", SessionId)); - + LOG_N("Started session shutdown"); ShutdownState = TSessionShutdownState(ev->Get()->SoftTimeoutMs, ev->Get()->HardTimeoutMs); ScheduleNextShutdownTick(); } @@ -1740,11 +1718,11 @@ public: YQL_ENSURE(ShutdownState); ShutdownState->MoveToNextState(); if (ShutdownState->HardTimeoutReached()) { - LOG_N("Reached hard shutdown timeout " << TKqpRequestInfo("", SessionId)); + LOG_N("Reached hard shutdown timeout"); Send(SelfId(), new TEvKqp::TEvCloseSessionRequest()); } else { ScheduleNextShutdownTick(); - LOG_I("Schedule next shutdown tick " << TKqpRequestInfo("", SessionId)); + LOG_I("Schedule next shutdown tick"); } } @@ -1771,7 +1749,7 @@ public: auto timerId = ev->Get()->TimerId; if (timerId == IdleTimerId) { - LOG_N(TKqpRequestInfo("", SessionId) << "SessionActor idle timeout, worker destroyed"); + LOG_N("SessionActor idle timeout, worker destroyed"); Counters->ReportSessionActorClosedIdle(Settings.DbCounters); FinalCleanup(); } @@ -1854,8 +1832,7 @@ public: SendRollbackRequest(TransactionsToBeAborted.front().Get()); } - LOG_I(TKqpRequestInfo(QueryState ? QueryState->TraceId : "", SessionId) - << " Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx} + LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx} << " TransactionsToBeAborted.size(): " << TransactionsToBeAborted.size()); if (CleanupCtx) { Become(&TKqpSessionActor::CleanupState); @@ -1880,7 +1857,7 @@ public: if (response.GetStatus() != Ydb::StatusIds::SUCCESS) { TIssues issues; IssuesFromMessage(response.GetIssues(), issues); - LOG_E(TKqpRequestInfo("", SessionId) << "Failed to cleanup: " << issues.ToString()); + LOG_E("Failed to cleanup: " << issues.ToString()); EndCleanup(CleanupCtx->Final); return; } @@ -1897,7 +1874,7 @@ public: } void EndCleanup(bool isFinal) { - LOG_D(TKqpRequestInfo(QueryState ? QueryState->TraceId : "", SessionId) << "EndCleanup, isFinal: " << isFinal); + LOG_D("EndCleanup, isFinal: " << isFinal); if (QueryResponse) Reply(); @@ -1916,7 +1893,7 @@ public: closeEv->Record.MutableResponse()->SetClosed(true); Send(Owner, closeEv.release()); - LOG_D(TKqpRequestInfo(QueryState ? QueryState->TraceId : "", SessionId) << " session actor destroyed"); + LOG_D("Session actor destroyed"); PassAway(); } else { TransactionsToBeAborted.clear(); @@ -1936,10 +1913,10 @@ public: return issueMessage; } - void ReplyQueryError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus, + void ReplyQueryError(Ydb::StatusIds::StatusCode ydbStatus, const TString& message, std::optional<google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>> issues = {}) { - LOG_W("Create QueryResponse for error on request: " << requestInfo << " msg: " << message); + LOG_W("Create QueryResponse for error on request, msg: " << message); QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>(); QueryResponse->Record.GetRef().SetYdbStatus(ydbStatus); @@ -1985,7 +1962,7 @@ public: UnexpectedEvent("ReadyState", ev); } } catch (const TRequestFail& ex) { - ReplyQueryError(ex.RequestInfo, ex.Status, ex.what(), ex.Issues); + ReplyQueryError(ex.Status, ex.what(), ex.Issues); } catch (const yexception& ex) { InternalError(ex.what()); } @@ -2006,7 +1983,7 @@ public: UnexpectedEvent("CompileState", ev); } } catch (const TRequestFail& ex) { - ReplyQueryError(ex.RequestInfo, ex.Status, ex.what(), ex.Issues); + ReplyQueryError(ex.Status, ex.what(), ex.Issues); } catch (const yexception& ex) { InternalError(ex.what()); } @@ -2037,7 +2014,7 @@ public: UnexpectedEvent("ExecuteState", ev); } } catch (const TRequestFail& ex) { - ReplyQueryError(ex.RequestInfo, ex.Status, ex.what(), ex.Issues); + ReplyQueryError(ex.Status, ex.what(), ex.Issues); } catch (const yexception& ex) { InternalError(ex.what()); } @@ -2084,7 +2061,7 @@ public: UnexpectedEvent("TopicOpsState", ev); } } catch (const TRequestFail& ex) { - ReplyQueryError(ex.RequestInfo, ex.Status, ex.what(), ex.Issues); + ReplyQueryError(ex.Status, ex.what(), ex.Issues); } catch (const yexception& ex) { InternalError(ex.what()); } @@ -2097,10 +2074,9 @@ private: } void InternalError(const TString& message) { - LOG_E("Internal error, SelfId: " << SelfId() << ", message: " << message); + LOG_E("Internal error, message: " << message); if (QueryState) { - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - ReplyQueryError(requestInfo, Ydb::StatusIds::INTERNAL_ERROR, message); + ReplyQueryError(Ydb::StatusIds::INTERNAL_ERROR, message); } else { FinalCleanup(); } @@ -2111,28 +2087,27 @@ private: } void HandleTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - TKqpRequestInfo requestInfo(QueryState->TraceId, SessionId); NSchemeCache::TSchemeCacheNavigate* response = ev->Get()->Request.Get(); Ydb::StatusIds_StatusCode status; TString message; if (IsAccessDenied(*response, message)) { - ythrow TRequestFail(requestInfo, Ydb::StatusIds::UNAUTHORIZED) << message; + ythrow TRequestFail(Ydb::StatusIds::UNAUTHORIZED) << message; } if (HasErrors(*response, message)) { - ythrow TRequestFail(requestInfo, Ydb::StatusIds::SCHEME_ERROR) << message; + ythrow TRequestFail(Ydb::StatusIds::SCHEME_ERROR) << message; } QueryState->TopicOperations.ProcessSchemeCacheNavigate(response->ResultSet, status, message); if (status != Ydb::StatusIds::SUCCESS) { - ythrow TRequestFail(requestInfo, status) << message; + ythrow TRequestFail(status) << message; } if (!TryMergeTopicOffsets(QueryState->TopicOperations, message)) { - ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << message; + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << message; } ReplySuccess(); @@ -2201,7 +2176,7 @@ private: void HandleTopicOps(TEvKqp::TEvCloseSessionRequest::TPtr&) { YQL_ENSURE(QueryState); - ReplyQueryError(TKqpRequestInfo(QueryState->TraceId, SessionId), Ydb::StatusIds::BAD_SESSION, + ReplyQueryError(Ydb::StatusIds::BAD_SESSION, "Request cancelled due to explicit session close request"); Counters->ReportSessionActorClosedRequest(Settings.DbCounters); } |