diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-06-01 16:12:18 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-06-01 16:12:18 +0300 |
commit | 084d18fede279d50932f83a81a13c7703a478f0d (patch) | |
tree | bc163bd7f1f0a137c84a872a8eade3dc42075cc7 | |
parent | 2fa2c0d9306bf87170f8b1f7f0790fab282716ca (diff) | |
download | ydb-084d18fede279d50932f83a81a13c7703a478f0d.tar.gz |
Fix cancel after for data query request.
We must check transaction was RO before CANCELLED reply
so we unable to shortcut reply path in the session actor.
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 31 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 37 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 142 | ||||
-rw-r--r-- | ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp | 12 |
10 files changed, 171 insertions, 81 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index a3b82b31596..ead0bfda375 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -127,11 +127,14 @@ public: const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, + const TActorId& creator) : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, TWilsonKqp::DataExecuter, "DataExecuter") , AsyncIoFactory(std::move(asyncIoFactory)) , StreamResult(streamResult) { + Target = creator; + YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED); if (Request.AcquireLocksTxId || Request.ValidateLocks || Request.EraseLocks) { @@ -273,6 +276,10 @@ public: } private: + bool IsCancelAfterAllowed(const TEvKqp::TEvAbortExecution::TPtr& ev) const { + return ReadOnlyTx || ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::CANCELLED; + } + TString CurrentStateFuncName() const override { const auto& func = CurrentStateFunc(); if (func == &TThis::PrepareState) { @@ -514,8 +521,12 @@ private: } void HandlePrepare(TEvKqp::TEvAbortExecution::TPtr& ev) { - CancelProposal(0); - TBase::HandleAbortExecution(ev); + if (IsCancelAfterAllowed(ev)) { + CancelProposal(0); + TBase::HandleAbortExecution(ev); + } else { + LOG_D("Got TEvAbortExecution from : " << ev->Sender << " but cancelation is not alowed"); + } } void CancelProposal(ui64 exceptShardId) { @@ -917,10 +928,14 @@ private: } void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) { - if (ImmediateTx) { - CancelProposal(0); + if (IsCancelAfterAllowed(ev)) { + if (ImmediateTx) { + CancelProposal(0); + } + TBase::HandleAbortExecution(ev); + } else { + LOG_D("Got TEvAbortExecution from : " << ev->Sender << " but cancelation is not alowed"); } - TBase::HandleAbortExecution(ev); } void HandleExecute(TEvColumnShard::TEvProposeTransactionResult::TPtr& ev) { @@ -2351,9 +2366,9 @@ private: } // namespace IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, - TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) + TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator) { - return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(asyncIoFactory)); + return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(asyncIoFactory), creator); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 3a091b266e9..3bcfa4ad8de 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -85,7 +85,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery); + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, + const TActorId& creator); std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral( IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 3e8b9de1574..af5373e7b00 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -91,12 +91,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery) + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, + const TActorId& creator) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction YQL_ENSURE(request.EraseLocks); - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory)); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), creator); } TMaybe<NKqpProto::TKqpPhyTx::EType> txsType; @@ -112,13 +113,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt switch (*txsType) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory)); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), creator); case NKqpProto::TKqpPhyTx::TYPE_SCAN: return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory)); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), creator); default: YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 70fff3396d8..60258929aca 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1109,7 +1109,7 @@ private: IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory); + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 477628da90d..089211a488e 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -725,7 +725,7 @@ public: TKqpRequestInfo requestInfo(proxyRequest->TraceId); KQP_PROXY_LOG_D(requestInfo << "Forwarded response to sender actor, requestId: " << requestId - << ", sender: " << proxyRequest->Sender << ", selfId: " << SelfId()); + << ", sender: " << proxyRequest->Sender << ", selfId: " << SelfId() << ", source: " << ev->Sender); PendingRequests.Erase(requestId); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c16b7b7f1a7..803a580c3f4 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1005,7 +1005,7 @@ public: auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(), RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig(), - AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr); + AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId()); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); @@ -1144,17 +1144,36 @@ public: TlsActivationContext->Send(ev->Forward(ExecuterId)); } - void Handle(TEvKqp::TEvAbortExecution::TPtr& ev) { + void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) { auto& msg = ev->Get()->Record; + TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName(); + LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId); + if (ExecuterId) { auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded"); - Send(std::exchange(ExecuterId, {}), abortEv.Release()); + Send(ExecuterId, abortEv.Release()); } - const auto& issues = ev->Get()->GetIssues(); + // Do not shortcut in case of CancelAfter event. We can send this status only in case of RO TX. + if (msg.GetStatusCode() != NYql::NDqProto::StatusIds::CANCELLED) { + const auto& issues = ev->Get()->GetIssues(); + ExecuterId = TActorId{}; + ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues)); + return; + } + } + + void HandleCompile(TEvKqp::TEvAbortExecution::TPtr& ev) { + auto& msg = ev->Get()->Record; + TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName(); LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode())); + + const auto& issues = ev->Get()->GetIssues(); + + YQL_ENSURE(!ExecuterId); + ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues)); } @@ -1394,7 +1413,7 @@ public: void ReplyProcessError(const TActorId& sender, ui64 proxyRequestId, Ydb::StatusIds::StatusCode ydbStatus, const TString& message) { - LOG_W("Reply process error, msg: " << message); + LOG_W("Reply process error, msg: " << message << " proxyRequestId: " << proxyRequestId); auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message); @@ -1741,6 +1760,8 @@ public: hFunc(TEvKqp::TEvCompileResponse, HandleNoop); hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop); + // message from KQP proxy in case of our reply just after kqp proxy timer tick + hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop); default: UnexpectedEvent("ReadyState", ev); } @@ -1757,7 +1778,7 @@ public: hFunc(TEvKqp::TEvQueryRequest, HandleCompile); hFunc(TEvKqp::TEvCompileResponse, Handle); - hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle); + hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleCompile); hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); @@ -1785,7 +1806,7 @@ public: hFunc(TEvKqpExecuter::TEvStreamData, HandleExecute); hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute); - hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle); + hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute); hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute); hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute); @@ -1879,7 +1900,7 @@ private: void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) { InternalError(TStringBuilder() << "TKqpSessionActor in state " << state << " received unexpected event " << - ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite())); + ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()) << " sender: " << ev->Sender); } void InternalError(const TString& message) { diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 31d29b25e18..cbfaf808998 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -18,6 +18,18 @@ namespace NKqp { using namespace NYdb::NTable; +const TString EXPECTED_EIGHTSHARD_VALUE1 = R"( +[ + [[1];[101u];["Value1"]]; + [[2];[201u];["Value1"]]; + [[3];[301u];["Value1"]]; + [[1];[401u];["Value1"]]; + [[2];[501u];["Value1"]]; + [[3];[601u];["Value1"]]; + [[1];[701u];["Value1"]]; + [[2];[801u];["Value1"]] +])"; + SIMPLE_UDF(TTestFilter, bool(i64)) { Y_UNUSED(valueBuilder); const i64 arg = args[0].Get<i64>(); diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index d92e1c8462f..c9bce432095 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -68,6 +68,8 @@ namespace NKqp { class TKqpCounters; const TString KikimrDefaultUtDomainRoot = "Root"; +extern const TString EXPECTED_EIGHTSHARD_VALUE1; + TVector<NKikimrKqp::TKqpSetting> SyntaxV1Settings(); struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> { diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index 3a82b2c5ee1..f5f08befb1b 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -1,5 +1,7 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/core/kqp/counters/kqp_counters.h> + namespace NKikimr { namespace NKqp { @@ -415,59 +417,107 @@ Y_UNIT_TEST_SUITE(KqpLimits) { } } - Y_UNIT_TEST(CancelAfterWithWrite) { - return; + Y_UNIT_TEST(CancelAfterRwTx) { TKikimrRunner kikimr; + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); + { + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + int maxTimeoutMs = 500; + + auto createKey = [](int id) -> ui64 { + return (1u << 29) + id; + }; + + auto createExpectedRow = [](ui64 key) -> TString { + return Sprintf(R"([[100500];[%luu];["newrecords"]])", key); + }; + + TString expected; + + for (int i = 1; i <= maxTimeoutMs; i++) { + auto params = db.GetParamsBuilder() + .AddParam("$id") + .Uint64(createKey(i)) + .Build() + .Build(); + auto result = session.ExecuteDataQuery(R"( + DECLARE $id AS Uint64; + SELECT * FROM `/Root/EightShard` WHERE Text = "newrecords" ORDER BY Key; + UPSERT INTO `/Root/EightShard` (Key, Data, Text) VALUES ($id, 100500, "newrecords"); + )", + TTxControl::BeginTx( + TTxSettings::SerializableRW()).CommitTx(), + params, + TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i)) + ).GetValueSync(); + + if (result.IsSuccess()) { + auto yson = FormatResultSetYson(result.GetResultSet(0)); + CompareYson(TString("[") + expected + "]", yson); + expected += createExpectedRow(createKey(i)); + if (i != maxTimeoutMs) + expected += ";"; + } else { + switch (result.GetStatus()) { + case EStatus::CANCELLED: + break; + default: { + auto msg = TStringBuilder() + << "unexpected status: " << result.GetStatus() + << " issues: " << result.GetIssues().ToString(); + UNIT_ASSERT_C(false, msg.data()); + } + } + } + } + } - int maxTimeoutMs = 500; - - auto createKey = [](int id) -> ui64 { - return (1u << 29) + id; - }; - - auto createExpectedRow = [](ui64 key) -> TString { - return Sprintf(R"([[100500];[%luu];["newrecords"]])", key); - }; - - TString expected; - - for (int i = 1; i <= maxTimeoutMs; i++) { - auto params = db.GetParamsBuilder() - .AddParam("$id") - .Uint64(createKey(i)) - .Build() - .Build(); - auto result = session.ExecuteDataQuery(R"( - DECLARE $id AS Uint64; - SELECT * FROM `/Root/EightShard` WHERE Text = "newrecords" ORDER BY Key; - UPSERT INTO `/Root/EightShard` (Key, Data, Text) VALUES ($id, 100500, "newrecords"); - )", - TTxControl::BeginTx( - TTxSettings::SerializableRW()).CommitTx(), - params, - TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i)) - ).GetValueSync(); - - if (result.IsSuccess()) { - auto yson = FormatResultSetYson(result.GetResultSet(0)); - CompareYson(TString("[") + expected + "]", yson); - expected += createExpectedRow(createKey(i)); - if (i != maxTimeoutMs) - expected += ";"; - } else { - switch (result.GetStatus()) { - case EStatus::CANCELLED: - break; - default: { - auto msg = TStringBuilder() << "unexpected status: " << result.GetStatus(); - UNIT_ASSERT_C(false, msg.data()); + WaitForZeroSessions(counters); + } + + Y_UNIT_TEST(CancelAfterRoTx) { + TKikimrRunner kikimr; + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + { + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + int maxTimeoutMs = 500; + bool wasCanceled = false; + + for (int i = 1; i <= maxTimeoutMs; i++) { + auto result = session.ExecuteDataQuery(R"( + DECLARE $id AS Uint64; + SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key; + )", + TTxControl::BeginTx( + TTxSettings::SerializableRW()).CommitTx(), + TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i)) + ).GetValueSync(); + + if (result.IsSuccess()) { + CompareYson(EXPECTED_EIGHTSHARD_VALUE1, FormatResultSetYson(result.GetResultSet(0))); + } else { + switch (result.GetStatus()) { + case EStatus::CANCELLED: + wasCanceled = true; + break; + default: { + auto msg = TStringBuilder() + << "unexpected status: " << result.GetStatus() + << " issues: " << result.GetIssues().ToString(); + UNIT_ASSERT_C(false, msg.data()); + } } } } + UNIT_ASSERT(wasCanceled); } + WaitForZeroSessions(counters); } Y_UNIT_TEST(QueryExecTimeout) { diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp index df19a796c62..01387cd26fe 100644 --- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp +++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp @@ -15,18 +15,6 @@ namespace NKqp { using namespace NYdb; using namespace NYdb::NScripting; -static const TString EXPECTED_EIGHTSHARD_VALUE1 = R"( -[ - [[1];[101u];["Value1"]]; - [[2];[201u];["Value1"]]; - [[3];[301u];["Value1"]]; - [[1];[401u];["Value1"]]; - [[2];[501u];["Value1"]]; - [[3];[601u];["Value1"]]; - [[1];[701u];["Value1"]]; - [[2];[801u];["Value1"]] -])"; - Y_UNIT_TEST_SUITE(KqpScripting) { Y_UNIT_TEST(EndOfQueryCommit) { TKikimrRunner kikimr; |