aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-03-05 11:35:37 +0300
committerGitHub <noreply@github.com>2025-03-05 08:35:37 +0000
commit9c6936d6ff5659918d27baeb0d24409f5fc9f72e (patch)
treeedd8fcf6163343c275c7f172ef1ac804777ecdd0
parentb0f93181472ba7233bb35dacad86733845ab5dbb (diff)
downloadydb-9c6936d6ff5659918d27baeb0d24409f5fc9f72e.tar.gz
Fix tests for default EvWrite (#15319)
-rw-r--r--.github/config/muted_ya.txt2
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.cpp12
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp13
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp18
-rw-r--r--ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp79
-rw-r--r--ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp20
-rw-r--r--ydb/core/kqp/ut/pg/pg_catalog_ut.cpp6
-rw-r--r--ydb/core/kqp/ut/query/kqp_limits_ut.cpp117
-rw-r--r--ydb/core/kqp/ut/query/kqp_params_ut.cpp6
-rw-r--r--ydb/library/yql/dq/actors/dq.cpp4
10 files changed, 173 insertions, 104 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt
index dc28e71cd07..05485ca2110 100644
--- a/.github/config/muted_ya.txt
+++ b/.github/config/muted_ya.txt
@@ -25,6 +25,8 @@ ydb/core/kqp/ut/olap [*/*] chunk chunk
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
+ydb/core/kqp/ut/query KqpLimits.OutOfSpaceYQLUpsertFail+useSink
+ydb/core/kqp/ut/query KqpLimits.QSReplySizeEnsureMemoryLimits+useSink
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
ydb/core/kqp/ut/scheme KqpScheme.AlterTransfer
diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp
index 08ed96d0d94..e2c65ac7d0c 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.cpp
+++ b/ydb/core/kqp/query_data/kqp_query_data.cpp
@@ -276,12 +276,16 @@ void TQueryData::ValidateParameter(const TString& name, const NKikimrMiniKQL::TT
void TQueryData::PrepareParameters(const TKqpPhyTxHolder::TConstPtr& tx, const TPreparedQueryHolder::TConstPtr& preparedQuery,
NMiniKQL::TTypeEnvironment& txTypeEnv)
{
- for (const auto& paramDesc : preparedQuery->GetParameters()) {
- ValidateParameter(paramDesc.GetName(), paramDesc.GetType(), txTypeEnv);
+ if (preparedQuery) {
+ for (const auto& paramDesc : preparedQuery->GetParameters()) {
+ ValidateParameter(paramDesc.GetName(), paramDesc.GetType(), txTypeEnv);
+ }
}
- for(const auto& paramBinding: tx->GetParamBindings()) {
- MaterializeParamValue(true, paramBinding);
+ if (tx) {
+ for(const auto& paramBinding: tx->GetParamBindings()) {
+ MaterializeParamValue(true, paramBinding);
+ }
}
}
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index b73a01b4bf2..df4cf50e576 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -589,7 +589,7 @@ public:
}
case NKikimrDataEvents::TEvWriteResult::STATUS_WRONG_SHARD_STATE:
CA_LOG_E("Got WRONG SHARD STATE for table `"
- << SchemeEntry->TableId.PathId.ToString() << "`."
+ << TableId.PathId.ToString() << "`."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
@@ -600,7 +600,7 @@ public:
RetryResolve();
} else {
RuntimeError(
- NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
+ NYql::NDqProto::StatusIds::UNAVAILABLE,
NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
TStringBuilder() << "Wrong shard state for table `"
<< TablePath << "`.",
@@ -997,13 +997,13 @@ public:
Schedule(reattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(ev->Get()->TabletId));
} else {
TxManager->SetError(ev->Get()->TabletId);
- if (Mode == EMode::IMMEDIATE_COMMIT || Mode == EMode::COMMIT) {
+ if (TxManager->GetState(ev->Get()->TabletId) == IKqpTransactionManager::EXECUTING) {
RuntimeError(
NYql::NDqProto::StatusIds::UNDETERMINED,
NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN,
TStringBuilder()
<< "Error writing to table `" << TableId.PathId.ToString() << "`"
- << ". Transaction state unknown for shard " << ev->Get()->TabletId << ".");
+ << ". Transaction state unknown for tablet " << ev->Get()->TabletId << ".");
} else {
RuntimeError(
NYql::NDqProto::StatusIds::UNAVAILABLE,
@@ -1787,12 +1787,13 @@ public:
|| State == EState::COMMITTING
|| State == EState::ROLLINGBACK;
- if (EnableStreamWrite && outOfMemory) {
+ if (!EnableStreamWrite && outOfMemory) {
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED,
TStringBuilder() << "Stream write queries aren't allowed.",
{});
+ return;
}
if (needToFlush) {
@@ -2363,7 +2364,7 @@ public:
<< getIssues().ToOneLineString());
TxManager->SetError(ev->Get()->Record.GetOrigin());
ReplyErrorAndDie(
- NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
+ NYql::NDqProto::StatusIds::UNAVAILABLE,
NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
TStringBuilder() << "Wrong shard state for tables " << getPathes() << ".",
getIssues());
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 07c5c58d1f1..e0f2bbbac77 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1259,6 +1259,7 @@ public:
return true;
}
+ YQL_ENSURE(tx || commit);
if (tx) {
switch (tx->GetType()) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
@@ -1285,15 +1286,16 @@ public:
QueryState->QueryData->AddUVParam(paramDesc.GetName(), paramType, value);
}
+ }
- try {
- QueryState->QueryData->PrepareParameters(tx, QueryState->PreparedQuery, txCtx.TxAlloc->TypeEnv);
- } catch (const yexception& ex) {
- ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
- }
+ try {
+ QueryState->QueryData->PrepareParameters(tx, QueryState->PreparedQuery, txCtx.TxAlloc->TypeEnv);
+ } catch (const yexception& ex) {
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
+ }
+
+ if (tx) {
request.Transactions.emplace_back(tx, QueryState->QueryData);
- } else {
- YQL_ENSURE(commit);
}
QueryState->TxCtx->OnNewExecutor(literal);
@@ -1468,7 +1470,7 @@ public:
AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(),
QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId),
QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup,
- (!Settings.TableService.GetEnableOltpSink() && QueryState && QueryState->RequestEv->GetSyntax() == Ydb::Query::Syntax::SYNTAX_PG)
+ (QueryState && QueryState->RequestEv->GetSyntax() == Ydb::Query::Syntax::SYNTAX_PG)
? GUCSettings : nullptr,
txCtx->ShardIdToTableInfo, txCtx->TxManager, txCtx->BufferActorId);
diff --git a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
index 1625298ec22..962bfce88ee 100644
--- a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
+++ b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
@@ -19,10 +19,13 @@ namespace {
}
Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
- Y_UNIT_TEST_TWIN(Upsert, LogEnabled) {
+ Y_UNIT_TEST_QUAD(Upsert, LogEnabled, UseSink) {
TStringStream ss;
{
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
TKikimrSettings serverSettings;
+ serverSettings.SetAppConfig(appConfig);
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
@@ -44,8 +47,13 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
- // check executer logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 2 : 0);
+ if (UseSink) {
+ // check write actor logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), LogEnabled ? 1 : 0);
+ } else {
+ // check executer logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 2 : 0);
+ }
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), LogEnabled ? 2 : 0);
// check grpc logs
@@ -54,49 +62,13 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), LogEnabled ? 2 : 0);
}
- Y_UNIT_TEST(UpsertEvWrite) {
+ Y_UNIT_TEST_QUAD(UpsertEvWriteQueryService, isOlap, useOltpSink) {
TStringStream ss;
{
NKikimrConfig::TAppConfig AppConfig;
- AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
- TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(AppConfig);
- serverSettings.LogStream = &ss;
- TKikimrRunner kikimr(serverSettings);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
-
- auto db = kikimr.GetTableClient();
- auto session = db.CreateSession().GetValueSync().GetSession();
-
- auto result = session.ExecuteDataQuery(R"(
- --!syntax_v1
-
- UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
- (3u, "Value3"),
- (101u, "Value101"),
- (201u, "Value201");
- )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
- }
+ AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(useOltpSink);
+ AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(isOlap);
- // check write actor logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 1);
- // check session actor logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
- // check grpc logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
- // check datashard logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
- }
-
- Y_UNIT_TEST_TWIN(UpsertEvWriteQueryService, isOlap) {
- TStringStream ss;
- {
- NKikimrConfig::TAppConfig AppConfig;
- if (!isOlap) {
- AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
- } else {
- AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
- }
TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(AppConfig);
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
@@ -132,8 +104,13 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
}
if (!isOlap) {
- // check write actor logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 1);
+ if (useOltpSink) {
+ // check write actor logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 1);
+ } else {
+ // check executer logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 2);
+ }
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
// check grpc logs
@@ -143,8 +120,13 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
} else {
// check write actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 3);
- // check executer logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 11);
+ if (useOltpSink) {
+ // check executer logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
+ } else {
+ // check executer logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 11);
+ }
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
// check grpc logs
@@ -215,10 +197,13 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0);
}
- Y_UNIT_TEST(BrokenReadLock) {
+ Y_UNIT_TEST_TWIN(BrokenReadLock, UseSink) {
TStringStream ss;
{
+ NKikimrConfig::TAppConfig AppConfig;
+ AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
TKikimrSettings serverSettings;
+ serverSettings.SetAppConfig(AppConfig);
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
diff --git a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
index 484bee1fb7a..94b0b2ec630 100644
--- a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
@@ -159,7 +159,9 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
}
}
- Y_UNIT_TEST(InsertNotNullPkPg) {
+ Y_UNIT_TEST_TWIN(InsertNotNullPkPg, useSink) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
auto client = kikimr.GetTableClient();
auto session = client.CreateSession().GetValueSync().GetSession();
@@ -609,10 +611,13 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
}
}
- Y_UNIT_TEST(InsertNotNullPg) {
+ Y_UNIT_TEST_TWIN(InsertNotNullPg, useSink) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableNotNullDataColumns(true);
+ .SetEnableNotNullDataColumns(true)
+ .SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
auto client = kikimr.GetTableClient();
@@ -648,8 +653,13 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(!result.IsSuccess());
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString());
- UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
- " <main>: Error: Tried to insert NULL value into NOT NULL column: Value, code: 2031\n");
+ if (useSink) {
+ UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(),
+ "<main>: Error: Tried to insert NULL value into NOT NULL column: Value, code: 2031\n");
+ } else {
+ UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
+ " <main>: Error: Tried to insert NULL value into NOT NULL column: Value, code: 2031\n");
+ }
}
}
diff --git a/ydb/core/kqp/ut/pg/pg_catalog_ut.cpp b/ydb/core/kqp/ut/pg/pg_catalog_ut.cpp
index abb93736098..ad9b88a0c48 100644
--- a/ydb/core/kqp/ut/pg/pg_catalog_ut.cpp
+++ b/ydb/core/kqp/ut/pg/pg_catalog_ut.cpp
@@ -352,8 +352,10 @@ Y_UNIT_TEST_SUITE(PgCatalog) {
}
}
- Y_UNIT_TEST(PgDatabase) {
- TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ Y_UNIT_TEST_TWIN(PgDatabase, useSink) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig));
auto db = kikimr.GetQueryClient();
auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg);
{
diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
index 84fd19a19af..f8d605c0118 100644
--- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
@@ -54,8 +54,15 @@ namespace {
}
Y_UNIT_TEST_SUITE(KqpLimits) {
- Y_UNIT_TEST(QSReplySizeEnsureMemoryLimits) {
- TKikimrRunner kikimr;
+ Y_UNIT_TEST_TWIN(QSReplySizeEnsureMemoryLimits, useSink) {
+ auto app = NKikimrConfig::TAppConfig();
+ app.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
+
+ auto settings = TKikimrSettings()
+ .SetAppConfig(app)
+ .SetWithSampleTables(true);
+
+ TKikimrRunner kikimr(settings);
CreateLargeTable(kikimr, 1'000, 100, 1'000, 1'000);
auto db = kikimr.GetQueryClient();
@@ -81,6 +88,9 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
result.GetIssues().PrintTo(Cerr);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
UNIT_ASSERT(!to_lower(TString{result.GetIssues().ToString()}).Contains("query result"));
+ if (useSink) {
+ UNIT_ASSERT(result.GetIssues().ToString().contains("Stream write queries aren't allowed"));
+ }
}
Y_UNIT_TEST(KqpMkqlMemoryLimitException) {
@@ -164,14 +174,19 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
}
- Y_UNIT_TEST(ComputeActorMemoryAllocationFailure) {
+ Y_UNIT_TEST_TWIN(ComputeActorMemoryAllocationFailure, useSink) {
auto app = NKikimrConfig::TAppConfig();
+ app.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(10);
app.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(2000);
app.MutableResourceBrokerConfig()->CopyFrom(MakeResourceBrokerTestConfig());
- TKikimrRunner kikimr(app);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(app)
+ .SetWithSampleTables(false);
+
+ TKikimrRunner kikimr(settings);
CreateLargeTable(kikimr, 0, 0, 0);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_ERROR);
@@ -188,8 +203,9 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
UNIT_ASSERT_C(result.GetIssues().ToString().contains("Mkql memory limit exceeded"), result.GetIssues().ToString());
}
- Y_UNIT_TEST(ComputeActorMemoryAllocationFailureQueryService) {
+ Y_UNIT_TEST_TWIN(ComputeActorMemoryAllocationFailureQueryService, useSink) {
auto app = NKikimrConfig::TAppConfig();
+ app.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(10);
app.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(2000);
@@ -197,7 +213,11 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
app.MutableFeatureFlags()->SetEnableResourcePools(true);
- TKikimrRunner kikimr(app);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(app)
+ .SetWithSampleTables(false);
+
+ TKikimrRunner kikimr(settings);
CreateLargeTable(kikimr, 0, 0, 0);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_ERROR);
@@ -220,11 +240,16 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
Cerr << stats->ToString(true) << Endl;
}
- Y_UNIT_TEST(DatashardProgramSize) {
+ Y_UNIT_TEST_TWIN(DatashardProgramSize, useSink) {
auto app = NKikimrConfig::TAppConfig();
+ app.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1'000'000'000);
- TKikimrRunner kikimr(app);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(app)
+ .SetWithSampleTables(false);
+
+ TKikimrRunner kikimr(settings);
CreateLargeTable(kikimr, 0, 0, 0);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_ERROR);
@@ -259,9 +284,12 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
SELECT * FROM AS_TABLE($rows);
)"), TTxControl::BeginTx().CommitTx(), paramsBuilder.Build()).ExtractValueSync();
result.GetIssues().PrintTo(Cerr);
- // UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED);
- UNIT_ASSERT(HasIssue(result.GetIssues(), NKikimrIssues::TIssuesIds::SHARD_PROGRAM_SIZE_EXCEEDED));
+ if (useSink) {
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED);
+ UNIT_ASSERT(HasIssue(result.GetIssues(), NKikimrIssues::TIssuesIds::SHARD_PROGRAM_SIZE_EXCEEDED));
+ }
}
Y_UNIT_TEST(DatashardReplySize) {
@@ -361,13 +389,23 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
}
}
- Y_UNIT_TEST(OutOfSpaceYQLUpsertFail) {
- TKikimrRunner kikimr(NFake::TStorage{
- .UseDisk = false,
- .SectorSize = 4096,
- .ChunkSize = 32_MB,
- .DiskSize = 8_GB
- });
+ Y_UNIT_TEST_TWIN(OutOfSpaceYQLUpsertFail, useSink) {
+ UNIT_ASSERT(!useSink);
+ auto app = NKikimrConfig::TAppConfig();
+ app.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
+ app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1'000'000'000);
+
+ auto settings = TKikimrSettings()
+ .SetAppConfig(app)
+ .SetWithSampleTables(false)
+ .SetStorage(NFake::TStorage{
+ .UseDisk = false,
+ .SectorSize = 4096,
+ .ChunkSize = 32_MB,
+ .DiskSize = 8_GB
+ });
+
+ TKikimrRunner kikimr(settings);
kikimr.GetTestClient().CreateTable("/Root", R"(
Name: "LargeTable"
@@ -417,10 +455,13 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
}
if (result.GetStatus() != EStatus::SUCCESS) {
result.GetIssues().PrintTo(Cerr);
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::UNAVAILABLE, result.GetIssues().ToString());
- if (result.GetIssues().ToString().contains("OUT_OF_SPACE")) {
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::UNAVAILABLE, result.GetIssues().ToString());
+ if (result.GetIssues().ToString().contains("OUT_OF_SPACE")
+ || result.GetIssues().ToString().contains("DISK_SPACE_EXHAUSTED")) {
getOutOfSpace = true;
- } else if (result.GetIssues().ToString().contains("WRONG_SHARD_STATE")) {
+ } else if (result.GetIssues().ToString().contains("WRONG_SHARD_STATE")
+ || result.GetIssues().ToString().contains("wrong shard state")
+ || result.GetIssues().ToString().contains("can't deliver message to tablet")) {
// shards are allowed to split
continue;
}
@@ -428,7 +469,7 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
}
++batchIdx;
}
- UNIT_ASSERT_C(getOutOfSpace, "Successfully inserted " << rowsPerBatch << " x " << batchCount << " lines, each of size " << dataTextSize << "bytes");
+ UNIT_ASSERT_C(getOutOfSpace, "Successfully inserted " << rowsPerBatch << " x " << batchIdx << " lines, each of size " << dataTextSize << "bytes");
}
Y_UNIT_TEST_TWIN(TooBigQuery, useSink) {
@@ -561,8 +602,10 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
}), result.GetIssues().ToString());
}
- Y_UNIT_TEST(TooBigColumn) {
- TKikimrRunner kikimr;
+ Y_UNIT_TEST_TWIN(TooBigColumn, useSink) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
+ TKikimrRunner kikimr(appConfig);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -579,11 +622,15 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
)"), TTxControl::BeginTx().CommitTx(), params).ExtractValueSync();
result.GetIssues().PrintTo(Cerr);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
+ if (!useSink) {
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+ }
UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
- [] (const auto& issue) {
- return issue.GetMessage().contains("larger than the allowed threshold");
- }));
+ [] (const auto& issue) {
+ return issue.GetMessage().contains("larger than the allowed threshold");
+ }));
}
Y_UNIT_TEST(AffectedShardsLimit) {
@@ -1364,8 +1411,15 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
}
}
- Y_UNIT_TEST(QSReplySize) {
- TKikimrRunner kikimr;
+ Y_UNIT_TEST_TWIN(QSReplySize, useSink) {
+ auto app = NKikimrConfig::TAppConfig();
+ app.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
+
+ auto settings = TKikimrSettings()
+ .SetAppConfig(app)
+ .SetWithSampleTables(true);
+
+ TKikimrRunner kikimr(settings);
CreateLargeTable(kikimr, 10'000, 100, 1'000, 1'000);
auto db = kikimr.GetQueryClient();
@@ -1380,6 +1434,9 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
result.GetIssues().PrintTo(Cerr);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
UNIT_ASSERT(!to_lower(TString{result.GetIssues().ToString()}).Contains("query result"));
+ if (useSink) {
+ UNIT_ASSERT(result.GetIssues().ToString().contains("Stream write queries aren't allowed"));
+ }
}
}
diff --git a/ydb/core/kqp/ut/query/kqp_params_ut.cpp b/ydb/core/kqp/ut/query/kqp_params_ut.cpp
index 98690721b1f..510245dfde3 100644
--- a/ydb/core/kqp/ut/query/kqp_params_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_params_ut.cpp
@@ -797,8 +797,10 @@ Y_UNIT_TEST_SUITE(KqpParams) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
}
- Y_UNIT_TEST_TWIN(Decimal, QueryService) {
- auto kikimr = DefaultKikimrRunner();
+ Y_UNIT_TEST_QUAD(Decimal, QueryService, UseSink) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
+ auto kikimr = DefaultKikimrRunner({}, appConfig);
auto tableClient = kikimr.GetTableClient();
auto queryClient = kikimr.GetQueryClient();
auto session = tableClient.CreateSession().GetValueSync().GetSession();
diff --git a/ydb/library/yql/dq/actors/dq.cpp b/ydb/library/yql/dq/actors/dq.cpp
index 793d1f0c7f9..9b95629295b 100644
--- a/ydb/library/yql/dq/actors/dq.cpp
+++ b/ydb/library/yql/dq/actors/dq.cpp
@@ -16,6 +16,8 @@ Ydb::StatusIds::StatusCode DqStatusToYdbStatus(NYql::NDqProto::StatusIds::Status
return Ydb::StatusIds::ABORTED;
case NYql::NDqProto::StatusIds::UNAVAILABLE:
return Ydb::StatusIds::UNAVAILABLE;
+ case NYql::NDqProto::StatusIds::UNDETERMINED:
+ return Ydb::StatusIds::UNDETERMINED;
case NYql::NDqProto::StatusIds::BAD_REQUEST:
return Ydb::StatusIds::BAD_REQUEST;
case NYql::NDqProto::StatusIds::PRECONDITION_FAILED:
@@ -58,6 +60,8 @@ NYql::NDqProto::StatusIds::StatusCode YdbStatusToDqStatus(Ydb::StatusIds::Status
return NYql::NDqProto::StatusIds::ABORTED;
case Ydb::StatusIds::UNAVAILABLE:
return NYql::NDqProto::StatusIds::UNAVAILABLE;
+ case Ydb::StatusIds::UNDETERMINED:
+ return NYql::NDqProto::StatusIds::UNDETERMINED;
case Ydb::StatusIds::OVERLOADED:
return NYql::NDqProto::StatusIds::OVERLOADED;
case Ydb::StatusIds::TIMEOUT: