summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <[email protected]>2023-03-31 16:22:54 +0300
committerulya-sidorina <[email protected]>2023-03-31 16:22:54 +0300
commit986e64263081c9086bbac0384d3d2ef455252760 (patch)
tree834830d41c44bec3c2efa9ee9060b6296e6ad863
parent6a960771f404ceb5b191e44a696406cb52859336 (diff)
,fix commit of broken tx with immediate effects
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp40
-rw-r--r--ydb/core/kqp/executer_actor/kqp_locks_helper.cpp6
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h1
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.cpp3
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.h1
-rw-r--r--ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp169
7 files changed, 215 insertions, 7 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index d4090dec0f5..64f6455a2b4 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -139,6 +139,8 @@ public:
if (Request.Snapshot.IsValid()) {
YQL_ENSURE(Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE);
}
+
+ ReadOnlyTx = IsReadOnlyTx();
}
void CheckExecutionComplete() {
@@ -1274,6 +1276,28 @@ private:
}
private:
+ bool IsReadOnlyTx() const {
+ if (Request.TopicOperations.HasOperations()) {
+ YQL_ENSURE(!Request.UseImmediateEffects);
+ return false;
+ }
+
+ if (Request.ValidateLocks && Request.EraseLocks) {
+ YQL_ENSURE(!Request.UseImmediateEffects);
+ return false;
+ }
+
+ for (const auto& tx : Request.Transactions) {
+ for (const auto& stage : tx.Body->GetStages()) {
+ if (stage.GetIsEffectsStage()) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
void FillGeneralReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse) {
if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) {
// Validate parameters
@@ -1548,7 +1572,7 @@ private:
const ui32 flags =
(ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0) |
(VolatileTx ? NTxDataShard::TTxFlags::VolatilePrepare : 0);
- if (Snapshot.IsValid() && (ReadOnlyTx || AppData()->FeatureFlags.GetEnableKqpImmediateEffects())) {
+ if (Snapshot.IsValid() && (ReadOnlyTx || Request.UseImmediateEffects)) {
ev.reset(new TEvDataShard::TEvProposeTransaction(
NKikimrTxDataShard::TX_KIND_DATA,
SelfId(),
@@ -1624,7 +1648,6 @@ private:
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
size_t readActors = 0;
- ReadOnlyTx = !Request.TopicOperations.HasOperations();
for (ui32 txIdx = 0; txIdx < Request.Transactions.size(); ++txIdx) {
auto& tx = Request.Transactions[txIdx];
@@ -1653,8 +1676,6 @@ private:
LOG_D("Stage " << stageInfo.Id << " AST: " << stage.GetProgramAst());
- ReadOnlyTx &= !stage.GetIsEffectsStage();
-
if (stage.SourcesSize() > 0) {
switch (stage.GetSources(0).GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource:
@@ -1859,8 +1880,9 @@ private:
break;
}
- if ((ReadOnlyTx || AppData()->FeatureFlags.GetEnableKqpImmediateEffects()) && Request.Snapshot.IsValid()) {
+ if ((ReadOnlyTx || Request.UseImmediateEffects) && Request.Snapshot.IsValid()) {
// Snapshot reads are always immediate
+ // Uncommitted writes are executed without coordinators, so they can be immediate
YQL_ENSURE(!VolatileTx);
Snapshot = Request.Snapshot;
ImmediateTx = true;
@@ -2092,13 +2114,21 @@ private:
if (!locksList.empty()) {
auto* protoLocks = tx.MutableLocks()->MutableLocks();
protoLocks->Reserve(locksList.size());
+ bool hasWrites = false;
for (auto& lock : locksList) {
+ hasWrites = hasWrites || lock.GetHasWrites();
protoLocks->Add()->Swap(&lock);
}
if (needCommit) {
// We also send the result on commit
sendingShardsSet.insert(shardId);
+
+ if (hasWrites) {
+ // Tx with uncommitted changes can be aborted due to conflicts,
+ // so shards with write locks should receive readsets
+ receivingShardsSet.insert(shardId);
+ }
}
}
}
diff --git a/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp b/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp
index 19fbb4cd602..6073aea73f1 100644
--- a/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp
@@ -24,6 +24,7 @@ void BuildLocks(NKikimrMiniKQL::TResult& result, const TVector<NKikimrTxDataShar
setMemberDataType(*structType.AddMember(), "LockId", NKikimr::NUdf::TDataType<ui64>::Id);
setMemberDataType(*structType.AddMember(), "PathId", NKikimr::NUdf::TDataType<ui64>::Id);
setMemberDataType(*structType.AddMember(), "SchemeShard", NKikimr::NUdf::TDataType<ui64>::Id);
+ setMemberDataType(*structType.AddMember(), "HasWrites", NKikimr::NUdf::TDataType<bool>::Id);
auto& value = *result.MutableValue();
for (auto& lock : locks) {
@@ -34,6 +35,7 @@ void BuildLocks(NKikimrMiniKQL::TResult& result, const TVector<NKikimrTxDataShar
item.AddStruct()->SetUint64(lock.GetLockId());
item.AddStruct()->SetUint64(lock.GetPathId());
item.AddStruct()->SetUint64(lock.GetSchemeShard());
+ item.AddStruct()->SetBool(lock.GetHasWrites());
}
}
@@ -50,13 +52,14 @@ NKikimrTxDataShard::TLock ExtractLock(const NYql::NDq::TMkqlValueRef& lock) {
YQL_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct);
auto& structType = type.GetStruct();
- YQL_ENSURE(structType.MemberSize() == 6);
+ YQL_ENSURE(structType.MemberSize() == 7);
ensureMemberDataType(structType.GetMember(0), "Counter", NKikimr::NUdf::TDataType<ui64>::Id);
ensureMemberDataType(structType.GetMember(1), "DataShard", NKikimr::NUdf::TDataType<ui64>::Id);
ensureMemberDataType(structType.GetMember(2), "Generation", NKikimr::NUdf::TDataType<ui32>::Id);
ensureMemberDataType(structType.GetMember(3), "LockId", NKikimr::NUdf::TDataType<ui64>::Id);
ensureMemberDataType(structType.GetMember(4), "PathId", NKikimr::NUdf::TDataType<ui64>::Id);
ensureMemberDataType(structType.GetMember(5), "SchemeShard", NKikimr::NUdf::TDataType<ui64>::Id);
+ ensureMemberDataType(structType.GetMember(6), "HasWrites", NKikimr::NUdf::TDataType<bool>::Id);
NKikimrTxDataShard::TLock dsLock;
dsLock.SetCounter(value.GetStruct(0).GetUint64());
@@ -65,6 +68,7 @@ NKikimrTxDataShard::TLock ExtractLock(const NYql::NDq::TMkqlValueRef& lock) {
dsLock.SetLockId(value.GetStruct(3).GetUint64());
dsLock.SetPathId(value.GetStruct(4).GetUint64());
dsLock.SetSchemeShard(value.GetStruct(5).GetUint64());
+ dsLock.SetHasWrites(value.GetStruct(6).GetBool());
return dsLock;
}
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index 9914d9a6930..e814922e0d9 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -129,6 +129,7 @@ public:
NKikimrKqp::EIsolationLevel IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
TMaybe<NKikimrKqp::TRlPath> RlPath;
bool NeedTxId = true;
+ bool UseImmediateEffects = false;
NLWTrace::TOrbit Orbit;
NWilson::TTraceId TraceId;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 5a41d20d34e..e0d916beaf0 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -957,6 +957,8 @@ public:
request.TopicOperations = std::move(txCtx.TopicOperations);
} else if (txCtx.ShouldAcquireLocks(query, QueryState->Commit)) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
+ // TODO: Use immediate effects only if tx contains uncommitted changes reading (KIKIMR-17576)
+ request.UseImmediateEffects = Config->FeatureFlags.GetEnableKqpImmediateEffects();
}
LWTRACK(KqpSessionPhyQueryProposeTx,
diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp
index 8ae8e014291..2e069170748 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.cpp
+++ b/ydb/core/kqp/session_actor/kqp_tx.cpp
@@ -41,13 +41,14 @@ std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& typ
YQL_ENSURE(locksListType.GetItem().GetKind() == NKikimrMiniKQL::ETypeKind::Struct);
auto lockType = locksListType.GetItem().GetStruct();
- YQL_ENSURE(lockType.MemberSize() == 6);
+ YQL_ENSURE(lockType.MemberSize() == 7);
YQL_ENSURE(lockType.GetMember(0).GetName() == "Counter");
YQL_ENSURE(lockType.GetMember(1).GetName() == "DataShard");
YQL_ENSURE(lockType.GetMember(2).GetName() == "Generation");
YQL_ENSURE(lockType.GetMember(3).GetName() == "LockId");
YQL_ENSURE(lockType.GetMember(4).GetName() == "PathId");
YQL_ENSURE(lockType.GetMember(5).GetName() == "SchemeShard");
+ YQL_ENSURE(lockType.GetMember(6).GetName() == "HasWrites");
res.first = true;
for (auto& lockValue : value.GetList()) {
diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h
index f56ac6cf349..5098bffb516 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.h
+++ b/ydb/core/kqp/session_actor/kqp_tx.h
@@ -25,6 +25,7 @@ public:
ui64 GetPathId() const { return LockValue.GetStruct(4).GetUint64(); }
ui32 GetGeneration() const { return LockValue.GetStruct(2).GetUint32(); }
ui64 GetCounter() const { return LockValue.GetStruct(0).GetUint64(); }
+ bool HasWrites() const { return LockValue.GetStruct(6).GetBool(); }
TKey GetKey() const { return std::make_tuple(GetLockId(), GetDataShard(), GetSchemeShard(), GetPathId()); }
NKikimrMiniKQL::TValue GetValue() const { return LockValue; }
diff --git a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp
index 86c40267f44..d7508c69616 100644
--- a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp
+++ b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp
@@ -27,6 +27,31 @@ namespace {
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
+
+ void CreateShardedTestTable(TSession& session) {
+ AssertSuccessResult(session.ExecuteSchemeQuery(R"(
+ --!syntax_v1
+
+ CREATE TABLE TestImmediateEffects (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ ) WITH (
+ PARTITION_AT_KEYS = (100, 200)
+ );
+ )").GetValueSync());
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "Value1"),
+ (2u, "Value2"),
+ (100u, "Value100"),
+ (200u, "Value200");
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
} // namespase
Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
@@ -844,6 +869,150 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
CompareYson(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(3)));
CompareYson(FormatResultSetYson(result.GetResultSet(1)), FormatResultSetYson(result.GetResultSet(2)));
}
+
+ Y_UNIT_TEST(InsertConflictTxAborted) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session);
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (3u, "Value3"),
+ (101u, "Value101");
+
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (3u, "NewValue3"),
+ (201u, "Value201");
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects ORDER BY Key;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]];
+ [[2u];["Value2"]];
+ [[100u];["Value100"]];
+ [[200u];["Value200"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(UpsertConflictInteractiveTxAborted) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx;
+ {
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (3u, "Value3"),
+ (101u, "Value101");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ tx = result.GetTransaction();
+ UNIT_ASSERT(tx);
+ }
+
+ {
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects ORDER BY Key;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]];
+ [[2u];["Value2"]];
+ [[100u];["Value100"]];
+ [[200u];["Value200"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ {
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (3u, "NewValue3");
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = tx->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects ORDER BY Key;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]];
+ [[2u];["Value2"]];
+ [[3u];["NewValue3"]];
+ [[100u];["Value100"]];
+ [[200u];["Value200"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(MultiShardUpsertAfterRead) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session);
+
+ TMaybe<TTransaction> tx;
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects ORDER BY Key;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ tx = result.GetTransaction();
+ UNIT_ASSERT(tx);
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (3u, "Value3"),
+ (101u, "Value101");
+ )", TTxControl::Tx(*tx).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
+
}
} // namespace NKqp