aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-03-07 15:16:35 +0300
committerGitHub <noreply@github.com>2025-03-07 12:16:35 +0000
commitd8e6990143e6acdb860b90d58059f18a9d60ee01 (patch)
tree9ba158e7af45caa511dd5929ae6c644ad0ecae78
parentd4ebd8601390add69dca6ca3fef7a5515ba8df6f (diff)
downloadydb-d8e6990143e6acdb860b90d58059f18a9d60ee01.tar.gz
Fix index errors (sink) (#15409)
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp8
-rw-r--r--ydb/core/kqp/opt/kqp_opt_kql.cpp5
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp3
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp21
-rw-r--r--ydb/core/kqp/ut/olap/json_ut.cpp12
-rw-r--r--ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp18
-rw-r--r--ydb/core/kqp/ut/tx/kqp_tx_ut.cpp5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h7
-rw-r--r--ydb/library/yql/dq/actors/dq.cpp56
-rw-r--r--ydb/library/yql/dq/actors/dq.h1
10 files changed, 109 insertions, 27 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index afe849bce70..e82fcdbb5f8 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -88,7 +88,7 @@ void TKqpComputeActor::DoBootstrap() {
try {
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, ArrayBufferMinFillPercentage, std::move(wakeupCallback), std::move(errorCallback)));
} catch (const NMiniKQL::TKqpEnsureFail& e) {
- InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
+ ErrorFromIssue((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
}
@@ -124,7 +124,7 @@ void TKqpComputeActor::DoBootstrap() {
auto scanActor = NSysView::CreateSystemViewScan(SelfId(), 0, ScanData->TableId, ScanData->TablePath, ranges, columns, UserToken, Database, reverse);
if (!scanActor) {
- InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder()
+ ErrorFromIssue(TIssuesIds::DEFAULT_ERROR, TStringBuilder()
<< "Failed to create system view scan, table id: " << ScanData->TableId);
return;
}
@@ -152,9 +152,9 @@ STFUNC(TKqpComputeActor::StateFunc) {
} catch (const TMemoryLimitExceededException& e) {
TBase::OnMemoryLimitExceptionHandler();
} catch (const NMiniKQL::TKqpEnsureFail& e) {
- InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
+ ErrorFromIssue((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
} catch (const std::exception& e) {
- InternalError(TIssuesIds::DEFAULT_ERROR, e.what());
+ ErrorFromIssue(TIssuesIds::DEFAULT_ERROR, e.what());
}
ReportEventElapsedTime();
diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp
index 14043c411b2..a3fc5fb7bd1 100644
--- a/ydb/core/kqp/opt/kqp_opt_kql.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp
@@ -214,11 +214,10 @@ TCoAtomList BuildUpsertInputColumns(const TCoAtomList& inputColumns,
}
std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, const TKikimrTableDescription& table,
- const TCoAtomList& inputColumns, const TCoAtomList& autoIncrement, const bool isSink,
+ const TCoAtomList& inputColumns, const TCoAtomList& autoIncrement, const bool /*isSink*/,
TPositionHandle pos, TExprContext& ctx)
{
auto input = write.Input();
- const bool isWriteReplace = (GetTableOp(write) == TYdbOperation::Replace) && !isSink;
TCoAtomList inputCols = BuildUpsertInputColumns(inputColumns, autoIncrement, pos, ctx);
@@ -226,7 +225,9 @@ std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, co
input = BuildKqlSequencer(input, table, inputCols, autoIncrement, pos, ctx);
}
+ const bool isWriteReplace = (GetTableOp(write) == TYdbOperation::Replace);
if (isWriteReplace) {
+ // TODO: don't need it for sinks (can be disabled when secondary indexes are supported inside write actor)
std::tie(input, inputCols) = CreateRowsToReplace(input, inputCols, table, write.Pos(), ctx);
}
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
index 46b1e84f2ca..b1284f52571 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
@@ -1767,8 +1767,9 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {
}
}
- Y_UNIT_TEST(DataColumnWrite) {
+ Y_UNIT_TEST_TWIN(DataColumnWrite, UseSink) {
NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index afad4d196ab..5c7b2550948 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -2873,10 +2873,13 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
}
}
- Y_UNIT_TEST(SecondaryIndexReplace) {
+ Y_UNIT_TEST_TWIN(SecondaryIndexReplace, UseSink) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
- .SetKqpSettings({setting});
+ .SetKqpSettings({setting})
+ .SetAppConfig(app);
TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -3370,10 +3373,13 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
}
- Y_UNIT_TEST(MultipleSecondaryIndexWithSameComulns) {
+ Y_UNIT_TEST_TWIN(MultipleSecondaryIndexWithSameComulns, UseSink) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
- .SetKqpSettings({setting});
+ .SetKqpSettings({setting})
+ .SetAppConfig(app);
TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -3706,10 +3712,13 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
}
}
- Y_UNIT_TEST(SecondaryIndexWithPrimaryKeySameComulns) {
+ Y_UNIT_TEST_TWIN(SecondaryIndexWithPrimaryKeySameComulns, UseSink) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
- .SetKqpSettings({setting});
+ .SetKqpSettings({setting})
+ .SetAppConfig(app);
TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
diff --git a/ydb/core/kqp/ut/olap/json_ut.cpp b/ydb/core/kqp/ut/olap/json_ut.cpp
index 041728997e6..ec2581384f7 100644
--- a/ydb/core/kqp/ut/olap/json_ut.cpp
+++ b/ydb/core/kqp/ut/olap/json_ut.cpp
@@ -768,27 +768,27 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
------
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "a1" ORDER BY Col1;
EXPECTED: [[1u;["{\"a.b.c\":\"a1\"}"]]]
- IDX_ND_SKIP_APPROVE: 0, 3, 1
+ IDX_ND_SKIP_APPROVE: 0, 4, 1
------
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "1a1" ORDER BY Col1;
EXPECTED: [[11u;["{\"a.b.c\":\"1a1\"}"]]]
- IDX_ND_SKIP_APPROVE: 0, 3, 1
+ IDX_ND_SKIP_APPROVE: 0, 4, 1
------
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b4" ORDER BY Col1;
EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]]
- IDX_ND_SKIP_APPROVE: 0, 3, 1
+ IDX_ND_SKIP_APPROVE: 0, 4, 1
------
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b5" ORDER BY Col1;
EXPECTED: []
- IDX_ND_SKIP_APPROVE: 0, 4, 0
+ IDX_ND_SKIP_APPROVE: 0, 5, 0
------
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1b3" ORDER BY Col1;
EXPECTED: [[13u;["{\"b.c.d\":\"1b3\"}"]]]
- IDX_ND_SKIP_APPROVE: 0, 3, 1
+ IDX_ND_SKIP_APPROVE: 0, 4, 1
------
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1b5" ORDER BY Col1;
EXPECTED: []
- IDX_ND_SKIP_APPROVE: 0, 4, 0
+ IDX_ND_SKIP_APPROVE: 0, 5, 0
)";
TScriptVariator(script).Execute();
diff --git a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
index 94b0b2ec630..e361195e91f 100644
--- a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
@@ -162,7 +162,7 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
Y_UNIT_TEST_TWIN(InsertNotNullPkPg, useSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
- TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig));
auto client = kikimr.GetTableClient();
auto session = client.CreateSession().GetValueSync().GetSession();
{
@@ -195,8 +195,12 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString());
- UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
- " <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
+ if (useSink) {
+ UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
+ } else {
+ UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
+ " <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
+ }
}
{ /* set NULL to not null pk column */
@@ -208,8 +212,12 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString());
- UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
- " <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
+ if (useSink) {
+ UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
+ } else {
+ UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
+ " <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
+ }
}
{ /* set NULL to nullable column */
diff --git a/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp b/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp
index 0976bbadcda..401e5b17e0e 100644
--- a/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp
+++ b/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp
@@ -82,7 +82,10 @@ Y_UNIT_TEST_SUITE(KqpTx) {
}
Y_UNIT_TEST(LocksAbortOnCommit) {
- auto kikimr = DefaultKikimrRunner();
+ NKikimrConfig::TAppConfig app;
+ // See KqpSinkTx::LocksAbortOnCommit for sink version of this test
+ app.MutableTableServiceConfig()->SetEnableOltpSink(false);
+ auto kikimr = DefaultKikimrRunner({}, app);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
{
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 151f8092a10..cd692f925a6 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -653,8 +653,11 @@ protected:
Terminate(State == NDqProto::COMPUTE_STATE_FINISHED, NDqProto::EComputeState_Name(State));
}
- void InternalError(TIssuesIds::EIssueCode issueCode, const TString& message) {
- InternalError(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, issueCode, message);
+ void ErrorFromIssue(TIssuesIds::EIssueCode issueCode, const TString& message) {
+ TIssue issue(message);
+ SetIssueCode(issueCode, issue);
+ const auto statusCode = GetDqStatus(issue).GetOrElse(NYql::NDqProto::StatusIds::PRECONDITION_FAILED);
+ InternalError(statusCode, std::move(issue));
}
void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssuesIds::EIssueCode issueCode, const TString& message) {
diff --git a/ydb/library/yql/dq/actors/dq.cpp b/ydb/library/yql/dq/actors/dq.cpp
index 9b95629295b..b7bdff74db0 100644
--- a/ydb/library/yql/dq/actors/dq.cpp
+++ b/ydb/library/yql/dq/actors/dq.cpp
@@ -1,5 +1,7 @@
#include "dq.h"
+#include <yql/essentials/core/issue/yql_issue.h>
+
namespace NYql::NDq {
Ydb::StatusIds::StatusCode DqStatusToYdbStatus(NYql::NDqProto::StatusIds::StatusCode statusCode) {
@@ -84,4 +86,58 @@ NYql::NDqProto::StatusIds::StatusCode YdbStatusToDqStatus(Ydb::StatusIds::Status
}
}
+TMaybe<NYql::NDqProto::StatusIds::StatusCode> GetDqStatus(const TIssue& issue) {
+ if (issue.GetSeverity() == TSeverityIds::S_FATAL) {
+ return NYql::NDqProto::StatusIds::INTERNAL_ERROR;
+ }
+
+ switch (issue.GetCode()) {
+ case NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED:
+ case NYql::TIssuesIds::KIKIMR_LOCKS_ACQUIRE_FAILURE:
+ case NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED:
+ case NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH:
+ return NYql::NDqProto::StatusIds::ABORTED;
+
+ case NYql::TIssuesIds::KIKIMR_SCHEME_ERROR:
+ return NYql::NDqProto::StatusIds::SCHEME_ERROR;
+
+ case NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE:
+ return NYql::NDqProto::StatusIds::UNAVAILABLE;
+
+ case NYql::TIssuesIds::KIKIMR_OVERLOADED:
+ case NYql::TIssuesIds::KIKIMR_MULTIPLE_SCHEME_MODIFICATIONS:
+ return NYql::NDqProto::StatusIds::OVERLOADED;
+
+ case NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION:
+ case NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED:
+ return NYql::NDqProto::StatusIds::PRECONDITION_FAILED;
+
+ case NYql::TIssuesIds::KIKIMR_BAD_REQUEST:
+ case NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE:
+ case NYql::TIssuesIds::KIKIMR_NO_COLUMN_DEFAULT_VALUE:
+ return NYql::NDqProto::StatusIds::BAD_REQUEST;
+
+ case NYql::TIssuesIds::KIKIMR_ACCESS_DENIED:
+ return NYql::NDqProto::StatusIds::UNAUTHORIZED;
+
+ case NYql::TIssuesIds::KIKIMR_TIMEOUT:
+ return NYql::NDqProto::StatusIds::TIMEOUT;
+
+ case NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED:
+ return NYql::NDqProto::StatusIds::CANCELLED;
+
+ case NYql::TIssuesIds::KIKIMR_RESULT_UNAVAILABLE:
+ case NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN:
+ return NYql::NDqProto::StatusIds::UNDETERMINED;
+
+ case NYql::TIssuesIds::KIKIMR_UNSUPPORTED:
+ return NYql::NDqProto::StatusIds::UNSUPPORTED;
+
+ default:
+ break;
+ }
+
+ return Nothing();
+}
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/dq.h b/ydb/library/yql/dq/actors/dq.h
index 64a8095060d..769674d6447 100644
--- a/ydb/library/yql/dq/actors/dq.h
+++ b/ydb/library/yql/dq/actors/dq.h
@@ -18,6 +18,7 @@ enum class EStatusCompatibilityLevel {
Ydb::StatusIds::StatusCode DqStatusToYdbStatus(NYql::NDqProto::StatusIds::StatusCode statusCode);
NYql::NDqProto::StatusIds::StatusCode YdbStatusToDqStatus(Ydb::StatusIds::StatusCode statusCode, EStatusCompatibilityLevel compatibility = EStatusCompatibilityLevel::Basic);
+TMaybe<NYql::NDqProto::StatusIds::StatusCode> GetDqStatus(const TIssue& issue);
struct TEvDq {