aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-06-01 16:12:18 +0300
committerdcherednik <dcherednik@ydb.tech>2023-06-01 16:12:18 +0300
commit084d18fede279d50932f83a81a13c7703a478f0d (patch)
treebc163bd7f1f0a137c84a872a8eade3dc42075cc7
parent2fa2c0d9306bf87170f8b1f7f0790fab282716ca (diff)
downloadydb-084d18fede279d50932f83a81a13c7703a478f0d.tar.gz
Fix cancel after for data query request.
We must check transaction was RO before CANCELLED reply so we unable to shortcut reply path in the session actor.
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp31
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp9
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp37
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp12
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h2
-rw-r--r--ydb/core/kqp/ut/query/kqp_limits_ut.cpp142
-rw-r--r--ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp12
10 files changed, 171 insertions, 81 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index a3b82b31596..ead0bfda375 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -127,11 +127,14 @@ public:
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
+ const TActorId& creator)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, TWilsonKqp::DataExecuter, "DataExecuter")
, AsyncIoFactory(std::move(asyncIoFactory))
, StreamResult(streamResult)
{
+ Target = creator;
+
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
if (Request.AcquireLocksTxId || Request.ValidateLocks || Request.EraseLocks) {
@@ -273,6 +276,10 @@ public:
}
private:
+ bool IsCancelAfterAllowed(const TEvKqp::TEvAbortExecution::TPtr& ev) const {
+ return ReadOnlyTx || ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::CANCELLED;
+ }
+
TString CurrentStateFuncName() const override {
const auto& func = CurrentStateFunc();
if (func == &TThis::PrepareState) {
@@ -514,8 +521,12 @@ private:
}
void HandlePrepare(TEvKqp::TEvAbortExecution::TPtr& ev) {
- CancelProposal(0);
- TBase::HandleAbortExecution(ev);
+ if (IsCancelAfterAllowed(ev)) {
+ CancelProposal(0);
+ TBase::HandleAbortExecution(ev);
+ } else {
+ LOG_D("Got TEvAbortExecution from : " << ev->Sender << " but cancelation is not alowed");
+ }
}
void CancelProposal(ui64 exceptShardId) {
@@ -917,10 +928,14 @@ private:
}
void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) {
- if (ImmediateTx) {
- CancelProposal(0);
+ if (IsCancelAfterAllowed(ev)) {
+ if (ImmediateTx) {
+ CancelProposal(0);
+ }
+ TBase::HandleAbortExecution(ev);
+ } else {
+ LOG_D("Got TEvAbortExecution from : " << ev->Sender << " but cancelation is not alowed");
}
- TBase::HandleAbortExecution(ev);
}
void HandleExecute(TEvColumnShard::TEvProposeTransactionResult::TPtr& ev) {
@@ -2351,9 +2366,9 @@ private:
} // namespace
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
- TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
+ TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator)
{
- return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(asyncIoFactory));
+ return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(asyncIoFactory), creator);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 3a091b266e9..3bcfa4ad8de 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -85,7 +85,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery);
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
+ const TActorId& creator);
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral(
IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 3e8b9de1574..af5373e7b00 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -91,12 +91,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery)
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
+ const TActorId& creator)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.EraseLocks);
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory));
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), creator);
}
TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
@@ -112,13 +113,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
switch (*txsType) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory));
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), creator);
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery);
case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory));
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), creator);
default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 70fff3396d8..60258929aca 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -1109,7 +1109,7 @@ private:
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory);
+ const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 477628da90d..089211a488e 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -725,7 +725,7 @@ public:
TKqpRequestInfo requestInfo(proxyRequest->TraceId);
KQP_PROXY_LOG_D(requestInfo << "Forwarded response to sender actor, requestId: " << requestId
- << ", sender: " << proxyRequest->Sender << ", selfId: " << SelfId());
+ << ", sender: " << proxyRequest->Sender << ", selfId: " << SelfId() << ", source: " << ev->Sender);
PendingRequests.Erase(requestId);
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index c16b7b7f1a7..803a580c3f4 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1005,7 +1005,7 @@ public:
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig(),
- AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr);
+ AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId());
auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
@@ -1144,17 +1144,36 @@ public:
TlsActivationContext->Send(ev->Forward(ExecuterId));
}
- void Handle(TEvKqp::TEvAbortExecution::TPtr& ev) {
+ void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) {
auto& msg = ev->Get()->Record;
+ TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
+ LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId);
+
if (ExecuterId) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded");
- Send(std::exchange(ExecuterId, {}), abortEv.Release());
+ Send(ExecuterId, abortEv.Release());
}
- const auto& issues = ev->Get()->GetIssues();
+ // Do not shortcut in case of CancelAfter event. We can send this status only in case of RO TX.
+ if (msg.GetStatusCode() != NYql::NDqProto::StatusIds::CANCELLED) {
+ const auto& issues = ev->Get()->GetIssues();
+ ExecuterId = TActorId{};
+ ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues));
+ return;
+ }
+ }
+
+ void HandleCompile(TEvKqp::TEvAbortExecution::TPtr& ev) {
+ auto& msg = ev->Get()->Record;
+
TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()));
+
+ const auto& issues = ev->Get()->GetIssues();
+
+ YQL_ENSURE(!ExecuterId);
+
ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues));
}
@@ -1394,7 +1413,7 @@ public:
void ReplyProcessError(const TActorId& sender, ui64 proxyRequestId, Ydb::StatusIds::StatusCode ydbStatus,
const TString& message)
{
- LOG_W("Reply process error, msg: " << message);
+ LOG_W("Reply process error, msg: " << message << " proxyRequestId: " << proxyRequestId);
auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message);
@@ -1741,6 +1760,8 @@ public:
hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
+ // message from KQP proxy in case of our reply just after kqp proxy timer tick
+ hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop);
default:
UnexpectedEvent("ReadyState", ev);
}
@@ -1757,7 +1778,7 @@ public:
hFunc(TEvKqp::TEvQueryRequest, HandleCompile);
hFunc(TEvKqp::TEvCompileResponse, Handle);
- hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle);
+ hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleCompile);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
@@ -1785,7 +1806,7 @@ public:
hFunc(TEvKqpExecuter::TEvStreamData, HandleExecute);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute);
- hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle);
+ hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute);
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute);
@@ -1879,7 +1900,7 @@ private:
void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TKqpSessionActor in state " << state << " received unexpected event " <<
- ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
+ ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()) << " sender: " << ev->Sender);
}
void InternalError(const TString& message) {
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 31d29b25e18..cbfaf808998 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -18,6 +18,18 @@ namespace NKqp {
using namespace NYdb::NTable;
+const TString EXPECTED_EIGHTSHARD_VALUE1 = R"(
+[
+ [[1];[101u];["Value1"]];
+ [[2];[201u];["Value1"]];
+ [[3];[301u];["Value1"]];
+ [[1];[401u];["Value1"]];
+ [[2];[501u];["Value1"]];
+ [[3];[601u];["Value1"]];
+ [[1];[701u];["Value1"]];
+ [[2];[801u];["Value1"]]
+])";
+
SIMPLE_UDF(TTestFilter, bool(i64)) {
Y_UNUSED(valueBuilder);
const i64 arg = args[0].Get<i64>();
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index d92e1c8462f..c9bce432095 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -68,6 +68,8 @@ namespace NKqp {
class TKqpCounters;
const TString KikimrDefaultUtDomainRoot = "Root";
+extern const TString EXPECTED_EIGHTSHARD_VALUE1;
+
TVector<NKikimrKqp::TKqpSetting> SyntaxV1Settings();
struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
index 3a82b2c5ee1..f5f08befb1b 100644
--- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
@@ -1,5 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/kqp/counters/kqp_counters.h>
+
namespace NKikimr {
namespace NKqp {
@@ -415,59 +417,107 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
}
}
- Y_UNIT_TEST(CancelAfterWithWrite) {
- return;
+ Y_UNIT_TEST(CancelAfterRwTx) {
TKikimrRunner kikimr;
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
- auto db = kikimr.GetTableClient();
- auto session = db.CreateSession().GetValueSync().GetSession();
+ {
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ int maxTimeoutMs = 500;
+
+ auto createKey = [](int id) -> ui64 {
+ return (1u << 29) + id;
+ };
+
+ auto createExpectedRow = [](ui64 key) -> TString {
+ return Sprintf(R"([[100500];[%luu];["newrecords"]])", key);
+ };
+
+ TString expected;
+
+ for (int i = 1; i <= maxTimeoutMs; i++) {
+ auto params = db.GetParamsBuilder()
+ .AddParam("$id")
+ .Uint64(createKey(i))
+ .Build()
+ .Build();
+ auto result = session.ExecuteDataQuery(R"(
+ DECLARE $id AS Uint64;
+ SELECT * FROM `/Root/EightShard` WHERE Text = "newrecords" ORDER BY Key;
+ UPSERT INTO `/Root/EightShard` (Key, Data, Text) VALUES ($id, 100500, "newrecords");
+ )",
+ TTxControl::BeginTx(
+ TTxSettings::SerializableRW()).CommitTx(),
+ params,
+ TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i))
+ ).GetValueSync();
+
+ if (result.IsSuccess()) {
+ auto yson = FormatResultSetYson(result.GetResultSet(0));
+ CompareYson(TString("[") + expected + "]", yson);
+ expected += createExpectedRow(createKey(i));
+ if (i != maxTimeoutMs)
+ expected += ";";
+ } else {
+ switch (result.GetStatus()) {
+ case EStatus::CANCELLED:
+ break;
+ default: {
+ auto msg = TStringBuilder()
+ << "unexpected status: " << result.GetStatus()
+ << " issues: " << result.GetIssues().ToString();
+ UNIT_ASSERT_C(false, msg.data());
+ }
+ }
+ }
+ }
+ }
- int maxTimeoutMs = 500;
-
- auto createKey = [](int id) -> ui64 {
- return (1u << 29) + id;
- };
-
- auto createExpectedRow = [](ui64 key) -> TString {
- return Sprintf(R"([[100500];[%luu];["newrecords"]])", key);
- };
-
- TString expected;
-
- for (int i = 1; i <= maxTimeoutMs; i++) {
- auto params = db.GetParamsBuilder()
- .AddParam("$id")
- .Uint64(createKey(i))
- .Build()
- .Build();
- auto result = session.ExecuteDataQuery(R"(
- DECLARE $id AS Uint64;
- SELECT * FROM `/Root/EightShard` WHERE Text = "newrecords" ORDER BY Key;
- UPSERT INTO `/Root/EightShard` (Key, Data, Text) VALUES ($id, 100500, "newrecords");
- )",
- TTxControl::BeginTx(
- TTxSettings::SerializableRW()).CommitTx(),
- params,
- TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i))
- ).GetValueSync();
-
- if (result.IsSuccess()) {
- auto yson = FormatResultSetYson(result.GetResultSet(0));
- CompareYson(TString("[") + expected + "]", yson);
- expected += createExpectedRow(createKey(i));
- if (i != maxTimeoutMs)
- expected += ";";
- } else {
- switch (result.GetStatus()) {
- case EStatus::CANCELLED:
- break;
- default: {
- auto msg = TStringBuilder() << "unexpected status: " << result.GetStatus();
- UNIT_ASSERT_C(false, msg.data());
+ WaitForZeroSessions(counters);
+ }
+
+ Y_UNIT_TEST(CancelAfterRoTx) {
+ TKikimrRunner kikimr;
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ {
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ int maxTimeoutMs = 500;
+ bool wasCanceled = false;
+
+ for (int i = 1; i <= maxTimeoutMs; i++) {
+ auto result = session.ExecuteDataQuery(R"(
+ DECLARE $id AS Uint64;
+ SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key;
+ )",
+ TTxControl::BeginTx(
+ TTxSettings::SerializableRW()).CommitTx(),
+ TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i))
+ ).GetValueSync();
+
+ if (result.IsSuccess()) {
+ CompareYson(EXPECTED_EIGHTSHARD_VALUE1, FormatResultSetYson(result.GetResultSet(0)));
+ } else {
+ switch (result.GetStatus()) {
+ case EStatus::CANCELLED:
+ wasCanceled = true;
+ break;
+ default: {
+ auto msg = TStringBuilder()
+ << "unexpected status: " << result.GetStatus()
+ << " issues: " << result.GetIssues().ToString();
+ UNIT_ASSERT_C(false, msg.data());
+ }
}
}
}
+ UNIT_ASSERT(wasCanceled);
}
+ WaitForZeroSessions(counters);
}
Y_UNIT_TEST(QueryExecTimeout) {
diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
index df19a796c62..01387cd26fe 100644
--- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
+++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
@@ -15,18 +15,6 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NScripting;
-static const TString EXPECTED_EIGHTSHARD_VALUE1 = R"(
-[
- [[1];[101u];["Value1"]];
- [[2];[201u];["Value1"]];
- [[3];[301u];["Value1"]];
- [[1];[401u];["Value1"]];
- [[2];[501u];["Value1"]];
- [[3];[601u];["Value1"]];
- [[1];[701u];["Value1"]];
- [[2];[801u];["Value1"]]
-])";
-
Y_UNIT_TEST_SUITE(KqpScripting) {
Y_UNIT_TEST(EndOfQueryCommit) {
TKikimrRunner kikimr;