summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <[email protected]>2023-06-01 16:12:18 +0300
committerdcherednik <[email protected]>2023-06-01 16:12:18 +0300
commit084d18fede279d50932f83a81a13c7703a478f0d (patch)
treebc163bd7f1f0a137c84a872a8eade3dc42075cc7
parent2fa2c0d9306bf87170f8b1f7f0790fab282716ca (diff)
Fix cancel after for data query request.
We must check transaction was RO before CANCELLED reply so we unable to shortcut reply path in the session actor.
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp31
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp9
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp37
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp12
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h2
-rw-r--r--ydb/core/kqp/ut/query/kqp_limits_ut.cpp142
-rw-r--r--ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp12
10 files changed, 171 insertions, 81 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index a3b82b31596..ead0bfda375 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -127,11 +127,14 @@ public:
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
+ const TActorId& creator)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, TWilsonKqp::DataExecuter, "DataExecuter")
, AsyncIoFactory(std::move(asyncIoFactory))
, StreamResult(streamResult)
{
+ Target = creator;
+
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
if (Request.AcquireLocksTxId || Request.ValidateLocks || Request.EraseLocks) {
@@ -273,6 +276,10 @@ public:
}
private:
+ bool IsCancelAfterAllowed(const TEvKqp::TEvAbortExecution::TPtr& ev) const {
+ return ReadOnlyTx || ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::CANCELLED;
+ }
+
TString CurrentStateFuncName() const override {
const auto& func = CurrentStateFunc();
if (func == &TThis::PrepareState) {
@@ -514,8 +521,12 @@ private:
}
void HandlePrepare(TEvKqp::TEvAbortExecution::TPtr& ev) {
- CancelProposal(0);
- TBase::HandleAbortExecution(ev);
+ if (IsCancelAfterAllowed(ev)) {
+ CancelProposal(0);
+ TBase::HandleAbortExecution(ev);
+ } else {
+ LOG_D("Got TEvAbortExecution from : " << ev->Sender << " but cancelation is not alowed");
+ }
}
void CancelProposal(ui64 exceptShardId) {
@@ -917,10 +928,14 @@ private:
}
void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) {
- if (ImmediateTx) {
- CancelProposal(0);
+ if (IsCancelAfterAllowed(ev)) {
+ if (ImmediateTx) {
+ CancelProposal(0);
+ }
+ TBase::HandleAbortExecution(ev);
+ } else {
+ LOG_D("Got TEvAbortExecution from : " << ev->Sender << " but cancelation is not alowed");
}
- TBase::HandleAbortExecution(ev);
}
void HandleExecute(TEvColumnShard::TEvProposeTransactionResult::TPtr& ev) {
@@ -2351,9 +2366,9 @@ private:
} // namespace
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
- TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
+ TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator)
{
- return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(asyncIoFactory));
+ return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(asyncIoFactory), creator);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 3a091b266e9..3bcfa4ad8de 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -85,7 +85,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery);
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
+ const TActorId& creator);
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral(
IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 3e8b9de1574..af5373e7b00 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -91,12 +91,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery)
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
+ const TActorId& creator)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.EraseLocks);
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory));
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), creator);
}
TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
@@ -112,13 +113,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
switch (*txsType) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory));
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), creator);
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery);
case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory));
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), creator);
default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 70fff3396d8..60258929aca 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -1109,7 +1109,7 @@ private:
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory);
+ const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 477628da90d..089211a488e 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -725,7 +725,7 @@ public:
TKqpRequestInfo requestInfo(proxyRequest->TraceId);
KQP_PROXY_LOG_D(requestInfo << "Forwarded response to sender actor, requestId: " << requestId
- << ", sender: " << proxyRequest->Sender << ", selfId: " << SelfId());
+ << ", sender: " << proxyRequest->Sender << ", selfId: " << SelfId() << ", source: " << ev->Sender);
PendingRequests.Erase(requestId);
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index c16b7b7f1a7..803a580c3f4 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1005,7 +1005,7 @@ public:
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig(),
- AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr);
+ AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId());
auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
@@ -1144,17 +1144,36 @@ public:
TlsActivationContext->Send(ev->Forward(ExecuterId));
}
- void Handle(TEvKqp::TEvAbortExecution::TPtr& ev) {
+ void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) {
auto& msg = ev->Get()->Record;
+ TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
+ LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId);
+
if (ExecuterId) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded");
- Send(std::exchange(ExecuterId, {}), abortEv.Release());
+ Send(ExecuterId, abortEv.Release());
}
- const auto& issues = ev->Get()->GetIssues();
+ // Do not shortcut in case of CancelAfter event. We can send this status only in case of RO TX.
+ if (msg.GetStatusCode() != NYql::NDqProto::StatusIds::CANCELLED) {
+ const auto& issues = ev->Get()->GetIssues();
+ ExecuterId = TActorId{};
+ ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues));
+ return;
+ }
+ }
+
+ void HandleCompile(TEvKqp::TEvAbortExecution::TPtr& ev) {
+ auto& msg = ev->Get()->Record;
+
TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()));
+
+ const auto& issues = ev->Get()->GetIssues();
+
+ YQL_ENSURE(!ExecuterId);
+
ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues));
}
@@ -1394,7 +1413,7 @@ public:
void ReplyProcessError(const TActorId& sender, ui64 proxyRequestId, Ydb::StatusIds::StatusCode ydbStatus,
const TString& message)
{
- LOG_W("Reply process error, msg: " << message);
+ LOG_W("Reply process error, msg: " << message << " proxyRequestId: " << proxyRequestId);
auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message);
@@ -1741,6 +1760,8 @@ public:
hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
+ // message from KQP proxy in case of our reply just after kqp proxy timer tick
+ hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop);
default:
UnexpectedEvent("ReadyState", ev);
}
@@ -1757,7 +1778,7 @@ public:
hFunc(TEvKqp::TEvQueryRequest, HandleCompile);
hFunc(TEvKqp::TEvCompileResponse, Handle);
- hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle);
+ hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleCompile);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
@@ -1785,7 +1806,7 @@ public:
hFunc(TEvKqpExecuter::TEvStreamData, HandleExecute);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute);
- hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle);
+ hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute);
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute);
@@ -1879,7 +1900,7 @@ private:
void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TKqpSessionActor in state " << state << " received unexpected event " <<
- ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
+ ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()) << " sender: " << ev->Sender);
}
void InternalError(const TString& message) {
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 31d29b25e18..cbfaf808998 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -18,6 +18,18 @@ namespace NKqp {
using namespace NYdb::NTable;
+const TString EXPECTED_EIGHTSHARD_VALUE1 = R"(
+[
+ [[1];[101u];["Value1"]];
+ [[2];[201u];["Value1"]];
+ [[3];[301u];["Value1"]];
+ [[1];[401u];["Value1"]];
+ [[2];[501u];["Value1"]];
+ [[3];[601u];["Value1"]];
+ [[1];[701u];["Value1"]];
+ [[2];[801u];["Value1"]]
+])";
+
SIMPLE_UDF(TTestFilter, bool(i64)) {
Y_UNUSED(valueBuilder);
const i64 arg = args[0].Get<i64>();
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index d92e1c8462f..c9bce432095 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -68,6 +68,8 @@ namespace NKqp {
class TKqpCounters;
const TString KikimrDefaultUtDomainRoot = "Root";
+extern const TString EXPECTED_EIGHTSHARD_VALUE1;
+
TVector<NKikimrKqp::TKqpSetting> SyntaxV1Settings();
struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
index 3a82b2c5ee1..f5f08befb1b 100644
--- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
@@ -1,5 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/kqp/counters/kqp_counters.h>
+
namespace NKikimr {
namespace NKqp {
@@ -415,59 +417,107 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
}
}
- Y_UNIT_TEST(CancelAfterWithWrite) {
- return;
+ Y_UNIT_TEST(CancelAfterRwTx) {
TKikimrRunner kikimr;
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
- auto db = kikimr.GetTableClient();
- auto session = db.CreateSession().GetValueSync().GetSession();
+ {
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ int maxTimeoutMs = 500;
+
+ auto createKey = [](int id) -> ui64 {
+ return (1u << 29) + id;
+ };
+
+ auto createExpectedRow = [](ui64 key) -> TString {
+ return Sprintf(R"([[100500];[%luu];["newrecords"]])", key);
+ };
+
+ TString expected;
+
+ for (int i = 1; i <= maxTimeoutMs; i++) {
+ auto params = db.GetParamsBuilder()
+ .AddParam("$id")
+ .Uint64(createKey(i))
+ .Build()
+ .Build();
+ auto result = session.ExecuteDataQuery(R"(
+ DECLARE $id AS Uint64;
+ SELECT * FROM `/Root/EightShard` WHERE Text = "newrecords" ORDER BY Key;
+ UPSERT INTO `/Root/EightShard` (Key, Data, Text) VALUES ($id, 100500, "newrecords");
+ )",
+ TTxControl::BeginTx(
+ TTxSettings::SerializableRW()).CommitTx(),
+ params,
+ TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i))
+ ).GetValueSync();
+
+ if (result.IsSuccess()) {
+ auto yson = FormatResultSetYson(result.GetResultSet(0));
+ CompareYson(TString("[") + expected + "]", yson);
+ expected += createExpectedRow(createKey(i));
+ if (i != maxTimeoutMs)
+ expected += ";";
+ } else {
+ switch (result.GetStatus()) {
+ case EStatus::CANCELLED:
+ break;
+ default: {
+ auto msg = TStringBuilder()
+ << "unexpected status: " << result.GetStatus()
+ << " issues: " << result.GetIssues().ToString();
+ UNIT_ASSERT_C(false, msg.data());
+ }
+ }
+ }
+ }
+ }
- int maxTimeoutMs = 500;
-
- auto createKey = [](int id) -> ui64 {
- return (1u << 29) + id;
- };
-
- auto createExpectedRow = [](ui64 key) -> TString {
- return Sprintf(R"([[100500];[%luu];["newrecords"]])", key);
- };
-
- TString expected;
-
- for (int i = 1; i <= maxTimeoutMs; i++) {
- auto params = db.GetParamsBuilder()
- .AddParam("$id")
- .Uint64(createKey(i))
- .Build()
- .Build();
- auto result = session.ExecuteDataQuery(R"(
- DECLARE $id AS Uint64;
- SELECT * FROM `/Root/EightShard` WHERE Text = "newrecords" ORDER BY Key;
- UPSERT INTO `/Root/EightShard` (Key, Data, Text) VALUES ($id, 100500, "newrecords");
- )",
- TTxControl::BeginTx(
- TTxSettings::SerializableRW()).CommitTx(),
- params,
- TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i))
- ).GetValueSync();
-
- if (result.IsSuccess()) {
- auto yson = FormatResultSetYson(result.GetResultSet(0));
- CompareYson(TString("[") + expected + "]", yson);
- expected += createExpectedRow(createKey(i));
- if (i != maxTimeoutMs)
- expected += ";";
- } else {
- switch (result.GetStatus()) {
- case EStatus::CANCELLED:
- break;
- default: {
- auto msg = TStringBuilder() << "unexpected status: " << result.GetStatus();
- UNIT_ASSERT_C(false, msg.data());
+ WaitForZeroSessions(counters);
+ }
+
+ Y_UNIT_TEST(CancelAfterRoTx) {
+ TKikimrRunner kikimr;
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ {
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ int maxTimeoutMs = 500;
+ bool wasCanceled = false;
+
+ for (int i = 1; i <= maxTimeoutMs; i++) {
+ auto result = session.ExecuteDataQuery(R"(
+ DECLARE $id AS Uint64;
+ SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key;
+ )",
+ TTxControl::BeginTx(
+ TTxSettings::SerializableRW()).CommitTx(),
+ TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i))
+ ).GetValueSync();
+
+ if (result.IsSuccess()) {
+ CompareYson(EXPECTED_EIGHTSHARD_VALUE1, FormatResultSetYson(result.GetResultSet(0)));
+ } else {
+ switch (result.GetStatus()) {
+ case EStatus::CANCELLED:
+ wasCanceled = true;
+ break;
+ default: {
+ auto msg = TStringBuilder()
+ << "unexpected status: " << result.GetStatus()
+ << " issues: " << result.GetIssues().ToString();
+ UNIT_ASSERT_C(false, msg.data());
+ }
}
}
}
+ UNIT_ASSERT(wasCanceled);
}
+ WaitForZeroSessions(counters);
}
Y_UNIT_TEST(QueryExecTimeout) {
diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
index df19a796c62..01387cd26fe 100644
--- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
+++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
@@ -15,18 +15,6 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NScripting;
-static const TString EXPECTED_EIGHTSHARD_VALUE1 = R"(
-[
- [[1];[101u];["Value1"]];
- [[2];[201u];["Value1"]];
- [[3];[301u];["Value1"]];
- [[1];[401u];["Value1"]];
- [[2];[501u];["Value1"]];
- [[3];[601u];["Value1"]];
- [[1];[701u];["Value1"]];
- [[2];[801u];["Value1"]]
-])";
-
Y_UNIT_TEST_SUITE(KqpScripting) {
Y_UNIT_TEST(EndOfQueryCommit) {
TKikimrRunner kikimr;