diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-03-03 12:22:44 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-03 12:22:44 +0300 |
commit | e3fc2797ef3787a2b5f969a21af45334686fef75 (patch) | |
tree | 6149f1f5983445403d458d6237ab57de88c77ad0 | |
parent | 33c7eadcf55cd21d72340758329cf50eec68a7c8 (diff) | |
download | ydb-e3fc2797ef3787a2b5f969a21af45334686fef75.tar.gz |
Fix limit tests for EvWrite (#15214)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 36 |
2 files changed, 35 insertions, 13 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 54cbd44dc1..de55152154 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -298,7 +298,7 @@ public: STATEFN(FinalizeState) { try { switch(ev->GetTypeRewrite()) { - hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); + hFunc(TEvKqp::TEvAbortExecution, HandleFinalize); hFunc(TEvKqpBuffer::TEvResult, HandleFinalize); hFunc(TEvents::TEvUndelivered, HandleFinalize); @@ -325,6 +325,14 @@ public: ReportEventElapsedTime(); } + void HandleFinalize(TEvKqp::TEvAbortExecution::TPtr& ev) { + if (IsCancelAfterAllowed(ev)) { + TBase::HandleAbortExecution(ev); + } else { + LOG_D("Got TEvAbortExecution from : " << ev->Sender << " but cancelation is not alowed"); + } + } + void HandleFinalize(TEvKqpBuffer::TEvResult::TPtr& ev) { if (ev->Get()->Stats) { if (Stats) { @@ -1712,7 +1720,7 @@ private: void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap) { - YQL_ENSURE(!TxManager); + YQL_ENSURE(ReadOnlyTx || !TxManager); TShardState shardState; shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing; shardState.DatashardState.ConstructInPlace(); diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index bca95aa56d..84fd19a19a 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -431,8 +431,9 @@ Y_UNIT_TEST_SUITE(KqpLimits) { UNIT_ASSERT_C(getOutOfSpace, "Successfully inserted " << rowsPerBatch << " x " << batchCount << " lines, each of size " << dataTextSize << "bytes"); } - Y_UNIT_TEST(TooBigQuery) { + Y_UNIT_TEST_TWIN(TooBigQuery, useSink) { auto app = NKikimrConfig::TAppConfig(); + app.MutableTableServiceConfig()->SetEnableOltpSink(useSink); app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1'000'000'000); app.MutableTableServiceConfig()->SetCompileTimeoutMs(TDuration::Minutes(5).MilliSeconds()); @@ -468,8 +469,12 @@ Y_UNIT_TEST_SUITE(KqpLimits) { auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); result.GetIssues().PrintTo(Cerr); //UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); - UNIT_ASSERT(HasIssue(result.GetIssues(), NKikimrIssues::TIssuesIds::SHARD_PROGRAM_SIZE_EXCEEDED)); + if (useSink) { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + UNIT_ASSERT(HasIssue(result.GetIssues(), NKikimrIssues::TIssuesIds::SHARD_PROGRAM_SIZE_EXCEEDED)); + } } Y_UNIT_TEST(BigParameter) { @@ -522,8 +527,10 @@ Y_UNIT_TEST_SUITE(KqpLimits) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } - Y_UNIT_TEST(TooBigKey) { - TKikimrRunner kikimr; + Y_UNIT_TEST_TWIN(TooBigKey, useSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink); + TKikimrRunner kikimr(appConfig); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -543,10 +550,15 @@ Y_UNIT_TEST_SUITE(KqpLimits) { result.GetIssues().PrintTo(Cerr); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); - UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, - [] (const auto& issue) { - return issue.GetMessage().contains("exceeds limit"); - })); + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, + [&](const auto& issue) { + if (useSink) { + return issue.GetMessage().contains("Row key size of") + && issue.GetMessage().contains("bytes is larger than the allowed threshold"); + } else { + return issue.GetMessage().contains("exceeds limit"); + } + }), result.GetIssues().ToString()); } Y_UNIT_TEST(TooBigColumn) { @@ -700,8 +712,10 @@ Y_UNIT_TEST_SUITE(KqpLimits) { } } - Y_UNIT_TEST(CancelAfterRwTx) { - TKikimrRunner kikimr; + Y_UNIT_TEST_TWIN(CancelAfterRwTx, useSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink); + TKikimrRunner kikimr(appConfig); NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); { |