aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-03-05 19:31:36 +0300
committerGitHub <noreply@github.com>2025-03-05 16:31:36 +0000
commit796e7a6782a5eafe54b530c5244aa07c7aff1870 (patch)
treedb80384f34c7c34b5f85ec1547e7c2c81bf253c9
parentafb7274af78e1095cf4db704449e84c95d4711ae (diff)
downloadydb-796e7a6782a5eafe54b530c5244aa07c7aff1870.tar.gz
Fix error status for Oltp Sink (#15364)
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp10
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h23
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp5
-rw-r--r--ydb/services/ydb/ydb_common_ut.h4
-rw-r--r--ydb/services/ydb/ydb_ut.cpp6
5 files changed, 34 insertions, 14 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index aa6a1d1755..b71045ccd3 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -301,6 +301,7 @@ public:
try {
switch(ev->GetTypeRewrite()) {
hFunc(TEvKqp::TEvAbortExecution, HandleFinalize);
+ hFunc(TEvKqpBuffer::TEvError, Handle);
hFunc(TEvKqpBuffer::TEvResult, HandleFinalize);
hFunc(TEvents::TEvUndelivered, HandleFinalize);
@@ -405,6 +406,7 @@ public:
hFunc(TEvSaveScriptExternalEffectResponse, HandleResolve);
hFunc(TEvDescribeSecretsResponse, HandleResolve);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
+ hFunc(TEvKqpBuffer::TEvError, Handle);
default:
UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite());
}
@@ -456,6 +458,7 @@ private:
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
hFunc(TEvKqp::TEvAbortExecution, HandlePrepare);
+ hFunc(TEvKqpBuffer::TEvError, Handle);
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
@@ -1136,6 +1139,7 @@ private:
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
+ hFunc(TEvKqpBuffer::TEvError, Handle);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
default:
UnexpectedEvent("ExecuteState", ev->GetTypeRewrite());
@@ -1192,6 +1196,11 @@ private:
}
}
+ void Handle(TEvKqpBuffer::TEvError::TPtr& ev) {
+ auto& msg = *ev->Get();
+ TBase::HandleAbortExecution(msg.StatusCode, msg.Issues, false);
+ }
+
void HandleExecute(TEvColumnShard::TEvProposeTransactionResult::TPtr& ev) {
TEvColumnShard::TEvProposeTransactionResult* res = ev->Get();
const ui64 shardId = res->Record.GetOrigin();
@@ -2295,6 +2304,7 @@ private:
switch (ev->GetTypeRewrite()) {
hFunc(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
+ hFunc(TEvKqpBuffer::TEvError, Handle);
default:
UnexpectedEvent("WaitSnapshotState", ev->GetTypeRewrite());
}
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index b768bc2695..0e77538238 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -760,15 +760,22 @@ protected:
void HandleAbortExecution(TEvKqp::TEvAbortExecution::TPtr& ev) {
auto& msg = ev->Get()->Record;
NYql::TIssues issues = ev->Get()->GetIssues();
- LOG_D("Got EvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode())
+ HandleAbortExecution(msg.GetStatusCode(), ev->Get()->GetIssues(), ev->Sender != Target);
+ }
+
+ void HandleAbortExecution(
+ NYql::NDqProto::StatusIds::StatusCode statusCode,
+ const NYql::TIssues& issues,
+ const bool sessionSender) {
+ LOG_D("Got EvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode)
<< ", message: " << issues.ToOneLineString());
- auto statusCode = NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode());
- if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
+ auto ydbStatusCode = NYql::NDq::DqStatusToYdbStatus(statusCode);
+ if (ydbStatusCode == Ydb::StatusIds::INTERNAL_ERROR) {
InternalError(issues);
- } else if (statusCode == Ydb::StatusIds::TIMEOUT) {
- TimeoutError(ev->Sender, issues);
+ } else if (ydbStatusCode == Ydb::StatusIds::TIMEOUT) {
+ TimeoutError(sessionSender, issues);
} else {
- RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
+ RuntimeError(NYql::NDq::DqStatusToYdbStatus(statusCode), issues);
}
}
@@ -1891,7 +1898,7 @@ protected:
ReplyErrorAndDie(status, &issues);
}
- void TimeoutError(TActorId abortSender, NYql::TIssues issues) {
+ void TimeoutError(bool sessionSender, NYql::TIssues issues) {
if (AlreadyReplied) {
LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
return;
@@ -1915,7 +1922,7 @@ protected:
NYql::IssuesToMessage(issues, ResponseEv->Record.MutableResponse()->MutableIssues());
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
- if (abortSender != Target) {
+ if (!sessionSender) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, issues);
this->Send(Target, abortEv.Release());
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index ae19bbed36..aaa03f43a4 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1745,7 +1745,7 @@ public:
}
void Handle(TEvKqpBuffer::TEvError::TPtr& ev) {
- const auto& msg = *ev->Get();
+ auto& msg = *ev->Get();
TString logMsg = TStringBuilder() << "got TEvKqpBuffer::TEvError in " << CurrentStateFuncName()
<< ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode) << " send to: " << ExecuterId << " from: " << ev->Sender;
@@ -1758,8 +1758,7 @@ public:
}
if (ExecuterId) {
- auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.StatusCode, msg.Issues);
- Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery);
+ Send(ExecuterId, new TEvKqpBuffer::TEvError{msg.StatusCode, std::move(msg.Issues)}, IEventHandle::FlagTrackDelivery);
} else {
ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.StatusCode), logMsg, MessageFromIssues(msg.Issues));
}
diff --git a/ydb/services/ydb/ydb_common_ut.h b/ydb/services/ydb/ydb_common_ut.h
index 5a00834855..aad0d679a1 100644
--- a/ydb/services/ydb/ydb_common_ut.h
+++ b/ydb/services/ydb/ydb_common_ut.h
@@ -139,10 +139,12 @@ public:
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG);
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG);
- Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_INFO);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_INFO);
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG);
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_OLAPSHARD, NActors::NLog::PRI_DEBUG);
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG);
if (enableYq) {
Server_->GetRuntime()->SetLogPriority(NKikimrServices::YQL_PROXY, NActors::NLog::PRI_DEBUG);
Server_->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
diff --git a/ydb/services/ydb/ydb_ut.cpp b/ydb/services/ydb/ydb_ut.cpp
index 3b17744f0a..b6bfd312b4 100644
--- a/ydb/services/ydb/ydb_ut.cpp
+++ b/ydb/services/ydb/ydb_ut.cpp
@@ -5703,8 +5703,10 @@ Y_UNIT_TEST_SUITE(TYqlDateTimeTests) {
#endif
Y_UNIT_TEST_SUITE(LocalityOperation) {
-Y_UNIT_TEST(LocksFromAnotherTenants) {
- TKikimrWithGrpcAndRootSchema server;
+Y_UNIT_TEST_TWIN(LocksFromAnotherTenants, UseSink) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
+ TKikimrWithGrpcAndRootSchema server(appConfig);
//server.Server_->SetupLogging(
auto connection = NYdb::TDriver(