diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-03-05 19:31:36 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-05 16:31:36 +0000 |
commit | 796e7a6782a5eafe54b530c5244aa07c7aff1870 (patch) | |
tree | db80384f34c7c34b5f85ec1547e7c2c81bf253c9 | |
parent | afb7274af78e1095cf4db704449e84c95d4711ae (diff) | |
download | ydb-796e7a6782a5eafe54b530c5244aa07c7aff1870.tar.gz |
Fix error status for Oltp Sink (#15364)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 23 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 5 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_common_ut.h | 4 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_ut.cpp | 6 |
5 files changed, 34 insertions, 14 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index aa6a1d1755..b71045ccd3 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -301,6 +301,7 @@ public: try { switch(ev->GetTypeRewrite()) { hFunc(TEvKqp::TEvAbortExecution, HandleFinalize); + hFunc(TEvKqpBuffer::TEvError, Handle); hFunc(TEvKqpBuffer::TEvResult, HandleFinalize); hFunc(TEvents::TEvUndelivered, HandleFinalize); @@ -405,6 +406,7 @@ public: hFunc(TEvSaveScriptExternalEffectResponse, HandleResolve); hFunc(TEvDescribeSecretsResponse, HandleResolve); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); + hFunc(TEvKqpBuffer::TEvError, Handle); default: UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite()); } @@ -456,6 +458,7 @@ private: hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck); hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare); hFunc(TEvKqp::TEvAbortExecution, HandlePrepare); + hFunc(TEvKqpBuffer::TEvError, Handle); hFunc(TEvents::TEvUndelivered, HandleUndelivered); hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected); hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse); @@ -1136,6 +1139,7 @@ private: hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData); hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck); hFunc(TEvKqp::TEvAbortExecution, HandleExecute); + hFunc(TEvKqpBuffer::TEvError, Handle); IgnoreFunc(TEvInterconnect::TEvNodeConnected); default: UnexpectedEvent("ExecuteState", ev->GetTypeRewrite()); @@ -1192,6 +1196,11 @@ private: } } + void Handle(TEvKqpBuffer::TEvError::TPtr& ev) { + auto& msg = *ev->Get(); + TBase::HandleAbortExecution(msg.StatusCode, msg.Issues, false); + } + void HandleExecute(TEvColumnShard::TEvProposeTransactionResult::TPtr& ev) { TEvColumnShard::TEvProposeTransactionResult* res = ev->Get(); const ui64 shardId = res->Record.GetOrigin(); @@ -2295,6 +2304,7 @@ private: switch (ev->GetTypeRewrite()) { hFunc(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); + hFunc(TEvKqpBuffer::TEvError, Handle); default: UnexpectedEvent("WaitSnapshotState", ev->GetTypeRewrite()); } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index b768bc2695..0e77538238 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -760,15 +760,22 @@ protected: void HandleAbortExecution(TEvKqp::TEvAbortExecution::TPtr& ev) { auto& msg = ev->Get()->Record; NYql::TIssues issues = ev->Get()->GetIssues(); - LOG_D("Got EvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) + HandleAbortExecution(msg.GetStatusCode(), ev->Get()->GetIssues(), ev->Sender != Target); + } + + void HandleAbortExecution( + NYql::NDqProto::StatusIds::StatusCode statusCode, + const NYql::TIssues& issues, + const bool sessionSender) { + LOG_D("Got EvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << ", message: " << issues.ToOneLineString()); - auto statusCode = NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()); - if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) { + auto ydbStatusCode = NYql::NDq::DqStatusToYdbStatus(statusCode); + if (ydbStatusCode == Ydb::StatusIds::INTERNAL_ERROR) { InternalError(issues); - } else if (statusCode == Ydb::StatusIds::TIMEOUT) { - TimeoutError(ev->Sender, issues); + } else if (ydbStatusCode == Ydb::StatusIds::TIMEOUT) { + TimeoutError(sessionSender, issues); } else { - RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues); + RuntimeError(NYql::NDq::DqStatusToYdbStatus(statusCode), issues); } } @@ -1891,7 +1898,7 @@ protected: ReplyErrorAndDie(status, &issues); } - void TimeoutError(TActorId abortSender, NYql::TIssues issues) { + void TimeoutError(bool sessionSender, NYql::TIssues issues) { if (AlreadyReplied) { LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl); return; @@ -1915,7 +1922,7 @@ protected: NYql::IssuesToMessage(issues, ResponseEv->Record.MutableResponse()->MutableIssues()); // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target). - if (abortSender != Target) { + if (!sessionSender) { auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, issues); this->Send(Target, abortEv.Release()); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index ae19bbed36..aaa03f43a4 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1745,7 +1745,7 @@ public: } void Handle(TEvKqpBuffer::TEvError::TPtr& ev) { - const auto& msg = *ev->Get(); + auto& msg = *ev->Get(); TString logMsg = TStringBuilder() << "got TEvKqpBuffer::TEvError in " << CurrentStateFuncName() << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode) << " send to: " << ExecuterId << " from: " << ev->Sender; @@ -1758,8 +1758,7 @@ public: } if (ExecuterId) { - auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.StatusCode, msg.Issues); - Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery); + Send(ExecuterId, new TEvKqpBuffer::TEvError{msg.StatusCode, std::move(msg.Issues)}, IEventHandle::FlagTrackDelivery); } else { ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.StatusCode), logMsg, MessageFromIssues(msg.Issues)); } diff --git a/ydb/services/ydb/ydb_common_ut.h b/ydb/services/ydb/ydb_common_ut.h index 5a00834855..aad0d679a1 100644 --- a/ydb/services/ydb/ydb_common_ut.h +++ b/ydb/services/ydb/ydb_common_ut.h @@ -139,10 +139,12 @@ public: //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG); //Server_->GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG); - Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_INFO); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_INFO); //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG); //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_OLAPSHARD, NActors::NLog::PRI_DEBUG); //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG); if (enableYq) { Server_->GetRuntime()->SetLogPriority(NKikimrServices::YQL_PROXY, NActors::NLog::PRI_DEBUG); Server_->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); diff --git a/ydb/services/ydb/ydb_ut.cpp b/ydb/services/ydb/ydb_ut.cpp index 3b17744f0a..b6bfd312b4 100644 --- a/ydb/services/ydb/ydb_ut.cpp +++ b/ydb/services/ydb/ydb_ut.cpp @@ -5703,8 +5703,10 @@ Y_UNIT_TEST_SUITE(TYqlDateTimeTests) { #endif Y_UNIT_TEST_SUITE(LocalityOperation) { -Y_UNIT_TEST(LocksFromAnotherTenants) { - TKikimrWithGrpcAndRootSchema server; +Y_UNIT_TEST_TWIN(LocksFromAnotherTenants, UseSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); + TKikimrWithGrpcAndRootSchema server(appConfig); //server.Server_->SetupLogging( auto connection = NYdb::TDriver( |