diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-03-07 15:16:35 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-07 12:16:35 +0000 |
commit | d8e6990143e6acdb860b90d58059f18a9d60ee01 (patch) | |
tree | 9ba158e7af45caa511dd5929ae6c644ad0ecae78 | |
parent | d4ebd8601390add69dca6ca3fef7a5515ba8df6f (diff) | |
download | ydb-d8e6990143e6acdb860b90d58059f18a9d60ee01.tar.gz |
Fix index errors (sink) (#15409)
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_kql.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 21 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/json_ut.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp | 18 | ||||
-rw-r--r-- | ydb/core/kqp/ut/tx/kqp_tx_ut.cpp | 5 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 7 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/dq.cpp | 56 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/dq.h | 1 |
10 files changed, 109 insertions, 27 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index afe849bce70..e82fcdbb5f8 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -88,7 +88,7 @@ void TKqpComputeActor::DoBootstrap() { try { PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, ArrayBufferMinFillPercentage, std::move(wakeupCallback), std::move(errorCallback))); } catch (const NMiniKQL::TKqpEnsureFail& e) { - InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage()); + ErrorFromIssue((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage()); return; } @@ -124,7 +124,7 @@ void TKqpComputeActor::DoBootstrap() { auto scanActor = NSysView::CreateSystemViewScan(SelfId(), 0, ScanData->TableId, ScanData->TablePath, ranges, columns, UserToken, Database, reverse); if (!scanActor) { - InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder() + ErrorFromIssue(TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Failed to create system view scan, table id: " << ScanData->TableId); return; } @@ -152,9 +152,9 @@ STFUNC(TKqpComputeActor::StateFunc) { } catch (const TMemoryLimitExceededException& e) { TBase::OnMemoryLimitExceptionHandler(); } catch (const NMiniKQL::TKqpEnsureFail& e) { - InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage()); + ErrorFromIssue((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage()); } catch (const std::exception& e) { - InternalError(TIssuesIds::DEFAULT_ERROR, e.what()); + ErrorFromIssue(TIssuesIds::DEFAULT_ERROR, e.what()); } ReportEventElapsedTime(); diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index 14043c411b2..a3fc5fb7bd1 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -214,11 +214,10 @@ TCoAtomList BuildUpsertInputColumns(const TCoAtomList& inputColumns, } std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, const TKikimrTableDescription& table, - const TCoAtomList& inputColumns, const TCoAtomList& autoIncrement, const bool isSink, + const TCoAtomList& inputColumns, const TCoAtomList& autoIncrement, const bool /*isSink*/, TPositionHandle pos, TExprContext& ctx) { auto input = write.Input(); - const bool isWriteReplace = (GetTableOp(write) == TYdbOperation::Replace) && !isSink; TCoAtomList inputCols = BuildUpsertInputColumns(inputColumns, autoIncrement, pos, ctx); @@ -226,7 +225,9 @@ std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, co input = BuildKqlSequencer(input, table, inputCols, autoIncrement, pos, ctx); } + const bool isWriteReplace = (GetTableOp(write) == TYdbOperation::Replace); if (isWriteReplace) { + // TODO: don't need it for sinks (can be disabled when secondary indexes are supported inside write actor) std::tie(input, inputCols) = CreateRowsToReplace(input, inputCols, table, write.Pos(), ctx); } diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp index 46b1e84f2ca..b1284f52571 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp @@ -1767,8 +1767,9 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) { } } - Y_UNIT_TEST(DataColumnWrite) { + Y_UNIT_TEST_TWIN(DataColumnWrite, UseSink) { NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index afad4d196ab..5c7b2550948 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -2873,10 +2873,13 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { } } - Y_UNIT_TEST(SecondaryIndexReplace) { + Y_UNIT_TEST_TWIN(SecondaryIndexReplace, UseSink) { + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() - .SetKqpSettings({setting}); + .SetKqpSettings({setting}) + .SetAppConfig(app); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -3370,10 +3373,13 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda } - Y_UNIT_TEST(MultipleSecondaryIndexWithSameComulns) { + Y_UNIT_TEST_TWIN(MultipleSecondaryIndexWithSameComulns, UseSink) { + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() - .SetKqpSettings({setting}); + .SetKqpSettings({setting}) + .SetAppConfig(app); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -3706,10 +3712,13 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda } } - Y_UNIT_TEST(SecondaryIndexWithPrimaryKeySameComulns) { + Y_UNIT_TEST_TWIN(SecondaryIndexWithPrimaryKeySameComulns, UseSink) { + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() - .SetKqpSettings({setting}); + .SetKqpSettings({setting}) + .SetAppConfig(app); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/kqp/ut/olap/json_ut.cpp b/ydb/core/kqp/ut/olap/json_ut.cpp index 041728997e6..ec2581384f7 100644 --- a/ydb/core/kqp/ut/olap/json_ut.cpp +++ b/ydb/core/kqp/ut/olap/json_ut.cpp @@ -768,27 +768,27 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { ------ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "a1" ORDER BY Col1; EXPECTED: [[1u;["{\"a.b.c\":\"a1\"}"]]] - IDX_ND_SKIP_APPROVE: 0, 3, 1 + IDX_ND_SKIP_APPROVE: 0, 4, 1 ------ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "1a1" ORDER BY Col1; EXPECTED: [[11u;["{\"a.b.c\":\"1a1\"}"]]] - IDX_ND_SKIP_APPROVE: 0, 3, 1 + IDX_ND_SKIP_APPROVE: 0, 4, 1 ------ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b4" ORDER BY Col1; EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] - IDX_ND_SKIP_APPROVE: 0, 3, 1 + IDX_ND_SKIP_APPROVE: 0, 4, 1 ------ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b5" ORDER BY Col1; EXPECTED: [] - IDX_ND_SKIP_APPROVE: 0, 4, 0 + IDX_ND_SKIP_APPROVE: 0, 5, 0 ------ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1b3" ORDER BY Col1; EXPECTED: [[13u;["{\"b.c.d\":\"1b3\"}"]]] - IDX_ND_SKIP_APPROVE: 0, 3, 1 + IDX_ND_SKIP_APPROVE: 0, 4, 1 ------ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1b5" ORDER BY Col1; EXPECTED: [] - IDX_ND_SKIP_APPROVE: 0, 4, 0 + IDX_ND_SKIP_APPROVE: 0, 5, 0 )"; TScriptVariator(script).Execute(); diff --git a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp index 94b0b2ec630..e361195e91f 100644 --- a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp @@ -162,7 +162,7 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) { Y_UNIT_TEST_TWIN(InsertNotNullPkPg, useSink) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink); - TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig)); auto client = kikimr.GetTableClient(); auto session = client.CreateSession().GetValueSync().GetSession(); { @@ -195,8 +195,12 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) { auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString()); - UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n" - " <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n"); + if (useSink) { + UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n"); + } else { + UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n" + " <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n"); + } } { /* set NULL to not null pk column */ @@ -208,8 +212,12 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) { auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString()); - UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n" - " <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n"); + if (useSink) { + UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n"); + } else { + UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n" + " <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n"); + } } { /* set NULL to nullable column */ diff --git a/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp b/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp index 0976bbadcda..401e5b17e0e 100644 --- a/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp @@ -82,7 +82,10 @@ Y_UNIT_TEST_SUITE(KqpTx) { } Y_UNIT_TEST(LocksAbortOnCommit) { - auto kikimr = DefaultKikimrRunner(); + NKikimrConfig::TAppConfig app; + // See KqpSinkTx::LocksAbortOnCommit for sink version of this test + app.MutableTableServiceConfig()->SetEnableOltpSink(false); + auto kikimr = DefaultKikimrRunner({}, app); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 151f8092a10..cd692f925a6 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -653,8 +653,11 @@ protected: Terminate(State == NDqProto::COMPUTE_STATE_FINISHED, NDqProto::EComputeState_Name(State)); } - void InternalError(TIssuesIds::EIssueCode issueCode, const TString& message) { - InternalError(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, issueCode, message); + void ErrorFromIssue(TIssuesIds::EIssueCode issueCode, const TString& message) { + TIssue issue(message); + SetIssueCode(issueCode, issue); + const auto statusCode = GetDqStatus(issue).GetOrElse(NYql::NDqProto::StatusIds::PRECONDITION_FAILED); + InternalError(statusCode, std::move(issue)); } void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssuesIds::EIssueCode issueCode, const TString& message) { diff --git a/ydb/library/yql/dq/actors/dq.cpp b/ydb/library/yql/dq/actors/dq.cpp index 9b95629295b..b7bdff74db0 100644 --- a/ydb/library/yql/dq/actors/dq.cpp +++ b/ydb/library/yql/dq/actors/dq.cpp @@ -1,5 +1,7 @@ #include "dq.h" +#include <yql/essentials/core/issue/yql_issue.h> + namespace NYql::NDq { Ydb::StatusIds::StatusCode DqStatusToYdbStatus(NYql::NDqProto::StatusIds::StatusCode statusCode) { @@ -84,4 +86,58 @@ NYql::NDqProto::StatusIds::StatusCode YdbStatusToDqStatus(Ydb::StatusIds::Status } } +TMaybe<NYql::NDqProto::StatusIds::StatusCode> GetDqStatus(const TIssue& issue) { + if (issue.GetSeverity() == TSeverityIds::S_FATAL) { + return NYql::NDqProto::StatusIds::INTERNAL_ERROR; + } + + switch (issue.GetCode()) { + case NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED: + case NYql::TIssuesIds::KIKIMR_LOCKS_ACQUIRE_FAILURE: + case NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED: + case NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH: + return NYql::NDqProto::StatusIds::ABORTED; + + case NYql::TIssuesIds::KIKIMR_SCHEME_ERROR: + return NYql::NDqProto::StatusIds::SCHEME_ERROR; + + case NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE: + return NYql::NDqProto::StatusIds::UNAVAILABLE; + + case NYql::TIssuesIds::KIKIMR_OVERLOADED: + case NYql::TIssuesIds::KIKIMR_MULTIPLE_SCHEME_MODIFICATIONS: + return NYql::NDqProto::StatusIds::OVERLOADED; + + case NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION: + case NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED: + return NYql::NDqProto::StatusIds::PRECONDITION_FAILED; + + case NYql::TIssuesIds::KIKIMR_BAD_REQUEST: + case NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE: + case NYql::TIssuesIds::KIKIMR_NO_COLUMN_DEFAULT_VALUE: + return NYql::NDqProto::StatusIds::BAD_REQUEST; + + case NYql::TIssuesIds::KIKIMR_ACCESS_DENIED: + return NYql::NDqProto::StatusIds::UNAUTHORIZED; + + case NYql::TIssuesIds::KIKIMR_TIMEOUT: + return NYql::NDqProto::StatusIds::TIMEOUT; + + case NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED: + return NYql::NDqProto::StatusIds::CANCELLED; + + case NYql::TIssuesIds::KIKIMR_RESULT_UNAVAILABLE: + case NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN: + return NYql::NDqProto::StatusIds::UNDETERMINED; + + case NYql::TIssuesIds::KIKIMR_UNSUPPORTED: + return NYql::NDqProto::StatusIds::UNSUPPORTED; + + default: + break; + } + + return Nothing(); +} + } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/dq.h b/ydb/library/yql/dq/actors/dq.h index 64a8095060d..769674d6447 100644 --- a/ydb/library/yql/dq/actors/dq.h +++ b/ydb/library/yql/dq/actors/dq.h @@ -18,6 +18,7 @@ enum class EStatusCompatibilityLevel { Ydb::StatusIds::StatusCode DqStatusToYdbStatus(NYql::NDqProto::StatusIds::StatusCode statusCode); NYql::NDqProto::StatusIds::StatusCode YdbStatusToDqStatus(Ydb::StatusIds::StatusCode statusCode, EStatusCompatibilityLevel compatibility = EStatusCompatibilityLevel::Basic); +TMaybe<NYql::NDqProto::StatusIds::StatusCode> GetDqStatus(const TIssue& issue); struct TEvDq { |