diff options
author | ulya-sidorina <[email protected]> | 2023-03-31 16:22:54 +0300 |
---|---|---|
committer | ulya-sidorina <[email protected]> | 2023-03-31 16:22:54 +0300 |
commit | 986e64263081c9086bbac0384d3d2ef455252760 (patch) | |
tree | 834830d41c44bec3c2efa9ee9060b6296e6ad863 | |
parent | 6a960771f404ceb5b191e44a696406cb52859336 (diff) |
,fix commit of broken tx with immediate effects
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 40 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_locks_helper.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_gateway.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp | 169 |
7 files changed, 215 insertions, 7 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index d4090dec0f5..64f6455a2b4 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -139,6 +139,8 @@ public: if (Request.Snapshot.IsValid()) { YQL_ENSURE(Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE); } + + ReadOnlyTx = IsReadOnlyTx(); } void CheckExecutionComplete() { @@ -1274,6 +1276,28 @@ private: } private: + bool IsReadOnlyTx() const { + if (Request.TopicOperations.HasOperations()) { + YQL_ENSURE(!Request.UseImmediateEffects); + return false; + } + + if (Request.ValidateLocks && Request.EraseLocks) { + YQL_ENSURE(!Request.UseImmediateEffects); + return false; + } + + for (const auto& tx : Request.Transactions) { + for (const auto& stage : tx.Body->GetStages()) { + if (stage.GetIsEffectsStage()) { + return false; + } + } + } + + return true; + } + void FillGeneralReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse) { if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) { // Validate parameters @@ -1548,7 +1572,7 @@ private: const ui32 flags = (ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0) | (VolatileTx ? NTxDataShard::TTxFlags::VolatilePrepare : 0); - if (Snapshot.IsValid() && (ReadOnlyTx || AppData()->FeatureFlags.GetEnableKqpImmediateEffects())) { + if (Snapshot.IsValid() && (ReadOnlyTx || Request.UseImmediateEffects)) { ev.reset(new TEvDataShard::TEvProposeTransaction( NKikimrTxDataShard::TX_KIND_DATA, SelfId(), @@ -1624,7 +1648,6 @@ private: LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); size_t readActors = 0; - ReadOnlyTx = !Request.TopicOperations.HasOperations(); for (ui32 txIdx = 0; txIdx < Request.Transactions.size(); ++txIdx) { auto& tx = Request.Transactions[txIdx]; @@ -1653,8 +1676,6 @@ private: LOG_D("Stage " << stageInfo.Id << " AST: " << stage.GetProgramAst()); - ReadOnlyTx &= !stage.GetIsEffectsStage(); - if (stage.SourcesSize() > 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: @@ -1859,8 +1880,9 @@ private: break; } - if ((ReadOnlyTx || AppData()->FeatureFlags.GetEnableKqpImmediateEffects()) && Request.Snapshot.IsValid()) { + if ((ReadOnlyTx || Request.UseImmediateEffects) && Request.Snapshot.IsValid()) { // Snapshot reads are always immediate + // Uncommitted writes are executed without coordinators, so they can be immediate YQL_ENSURE(!VolatileTx); Snapshot = Request.Snapshot; ImmediateTx = true; @@ -2092,13 +2114,21 @@ private: if (!locksList.empty()) { auto* protoLocks = tx.MutableLocks()->MutableLocks(); protoLocks->Reserve(locksList.size()); + bool hasWrites = false; for (auto& lock : locksList) { + hasWrites = hasWrites || lock.GetHasWrites(); protoLocks->Add()->Swap(&lock); } if (needCommit) { // We also send the result on commit sendingShardsSet.insert(shardId); + + if (hasWrites) { + // Tx with uncommitted changes can be aborted due to conflicts, + // so shards with write locks should receive readsets + receivingShardsSet.insert(shardId); + } } } } diff --git a/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp b/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp index 19fbb4cd602..6073aea73f1 100644 --- a/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp @@ -24,6 +24,7 @@ void BuildLocks(NKikimrMiniKQL::TResult& result, const TVector<NKikimrTxDataShar setMemberDataType(*structType.AddMember(), "LockId", NKikimr::NUdf::TDataType<ui64>::Id); setMemberDataType(*structType.AddMember(), "PathId", NKikimr::NUdf::TDataType<ui64>::Id); setMemberDataType(*structType.AddMember(), "SchemeShard", NKikimr::NUdf::TDataType<ui64>::Id); + setMemberDataType(*structType.AddMember(), "HasWrites", NKikimr::NUdf::TDataType<bool>::Id); auto& value = *result.MutableValue(); for (auto& lock : locks) { @@ -34,6 +35,7 @@ void BuildLocks(NKikimrMiniKQL::TResult& result, const TVector<NKikimrTxDataShar item.AddStruct()->SetUint64(lock.GetLockId()); item.AddStruct()->SetUint64(lock.GetPathId()); item.AddStruct()->SetUint64(lock.GetSchemeShard()); + item.AddStruct()->SetBool(lock.GetHasWrites()); } } @@ -50,13 +52,14 @@ NKikimrTxDataShard::TLock ExtractLock(const NYql::NDq::TMkqlValueRef& lock) { YQL_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct); auto& structType = type.GetStruct(); - YQL_ENSURE(structType.MemberSize() == 6); + YQL_ENSURE(structType.MemberSize() == 7); ensureMemberDataType(structType.GetMember(0), "Counter", NKikimr::NUdf::TDataType<ui64>::Id); ensureMemberDataType(structType.GetMember(1), "DataShard", NKikimr::NUdf::TDataType<ui64>::Id); ensureMemberDataType(structType.GetMember(2), "Generation", NKikimr::NUdf::TDataType<ui32>::Id); ensureMemberDataType(structType.GetMember(3), "LockId", NKikimr::NUdf::TDataType<ui64>::Id); ensureMemberDataType(structType.GetMember(4), "PathId", NKikimr::NUdf::TDataType<ui64>::Id); ensureMemberDataType(structType.GetMember(5), "SchemeShard", NKikimr::NUdf::TDataType<ui64>::Id); + ensureMemberDataType(structType.GetMember(6), "HasWrites", NKikimr::NUdf::TDataType<bool>::Id); NKikimrTxDataShard::TLock dsLock; dsLock.SetCounter(value.GetStruct(0).GetUint64()); @@ -65,6 +68,7 @@ NKikimrTxDataShard::TLock ExtractLock(const NYql::NDq::TMkqlValueRef& lock) { dsLock.SetLockId(value.GetStruct(3).GetUint64()); dsLock.SetPathId(value.GetStruct(4).GetUint64()); dsLock.SetSchemeShard(value.GetStruct(5).GetUint64()); + dsLock.SetHasWrites(value.GetStruct(6).GetBool()); return dsLock; } diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index 9914d9a6930..e814922e0d9 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -129,6 +129,7 @@ public: NKikimrKqp::EIsolationLevel IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; TMaybe<NKikimrKqp::TRlPath> RlPath; bool NeedTxId = true; + bool UseImmediateEffects = false; NLWTrace::TOrbit Orbit; NWilson::TTraceId TraceId; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 5a41d20d34e..e0d916beaf0 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -957,6 +957,8 @@ public: request.TopicOperations = std::move(txCtx.TopicOperations); } else if (txCtx.ShouldAcquireLocks(query, QueryState->Commit)) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); + // TODO: Use immediate effects only if tx contains uncommitted changes reading (KIKIMR-17576) + request.UseImmediateEffects = Config->FeatureFlags.GetEnableKqpImmediateEffects(); } LWTRACK(KqpSessionPhyQueryProposeTx, diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp index 8ae8e014291..2e069170748 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.cpp +++ b/ydb/core/kqp/session_actor/kqp_tx.cpp @@ -41,13 +41,14 @@ std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& typ YQL_ENSURE(locksListType.GetItem().GetKind() == NKikimrMiniKQL::ETypeKind::Struct); auto lockType = locksListType.GetItem().GetStruct(); - YQL_ENSURE(lockType.MemberSize() == 6); + YQL_ENSURE(lockType.MemberSize() == 7); YQL_ENSURE(lockType.GetMember(0).GetName() == "Counter"); YQL_ENSURE(lockType.GetMember(1).GetName() == "DataShard"); YQL_ENSURE(lockType.GetMember(2).GetName() == "Generation"); YQL_ENSURE(lockType.GetMember(3).GetName() == "LockId"); YQL_ENSURE(lockType.GetMember(4).GetName() == "PathId"); YQL_ENSURE(lockType.GetMember(5).GetName() == "SchemeShard"); + YQL_ENSURE(lockType.GetMember(6).GetName() == "HasWrites"); res.first = true; for (auto& lockValue : value.GetList()) { diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h index f56ac6cf349..5098bffb516 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.h +++ b/ydb/core/kqp/session_actor/kqp_tx.h @@ -25,6 +25,7 @@ public: ui64 GetPathId() const { return LockValue.GetStruct(4).GetUint64(); } ui32 GetGeneration() const { return LockValue.GetStruct(2).GetUint32(); } ui64 GetCounter() const { return LockValue.GetStruct(0).GetUint64(); } + bool HasWrites() const { return LockValue.GetStruct(6).GetBool(); } TKey GetKey() const { return std::make_tuple(GetLockId(), GetDataShard(), GetSchemeShard(), GetPathId()); } NKikimrMiniKQL::TValue GetValue() const { return LockValue; } diff --git a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp index 86c40267f44..d7508c69616 100644 --- a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp +++ b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp @@ -27,6 +27,31 @@ namespace { )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } + + void CreateShardedTestTable(TSession& session) { + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + + CREATE TABLE TestImmediateEffects ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ) WITH ( + PARTITION_AT_KEYS = (100, 200) + ); + )").GetValueSync()); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + INSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "Value1"), + (2u, "Value2"), + (100u, "Value100"), + (200u, "Value200"); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } } // namespase Y_UNIT_TEST_SUITE(KqpImmediateEffects) { @@ -844,6 +869,150 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { CompareYson(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(3))); CompareYson(FormatResultSetYson(result.GetResultSet(1)), FormatResultSetYson(result.GetResultSet(2))); } + + Y_UNIT_TEST(InsertConflictTxAborted) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session); + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (3u, "Value3"), + (101u, "Value101"); + + INSERT INTO TestImmediateEffects (Key, Value) VALUES + (3u, "NewValue3"), + (201u, "Value201"); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects ORDER BY Key; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]]; + [[2u];["Value2"]]; + [[100u];["Value100"]]; + [[200u];["Value200"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(UpsertConflictInteractiveTxAborted) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx; + { + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (3u, "Value3"), + (101u, "Value101"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + tx = result.GetTransaction(); + UNIT_ASSERT(tx); + } + + { + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects ORDER BY Key; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]]; + [[2u];["Value2"]]; + [[100u];["Value100"]]; + [[200u];["Value200"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + { + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (3u, "NewValue3"); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = tx->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + } + + { + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects ORDER BY Key; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]]; + [[2u];["Value2"]]; + [[3u];["NewValue3"]]; + [[100u];["Value100"]]; + [[200u];["Value200"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(MultiShardUpsertAfterRead) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session); + + TMaybe<TTransaction> tx; + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects ORDER BY Key; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + tx = result.GetTransaction(); + UNIT_ASSERT(tx); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (3u, "Value3"), + (101u, "Value101"); + )", TTxControl::Tx(*tx).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + } } // namespace NKqp |