aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2023-05-15 14:08:51 +0300
committerulya-sidorina <yulia@ydb.tech>2023-05-15 14:08:51 +0300
commit45fbde85b1f7b5e5ffcf2de4bd730e22ce160fa3 (patch)
tree5be20b13f61e8417c6664733b47cf1e7b3ae282d
parentac809add0f42f933c8a966c3309997d8563aa168 (diff)
downloadydb-45fbde85b1f7b5e5ffcf2de4bd730e22ce160fa3.tar.gz
defer effects until read or commit
refactor session actor
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h2
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp13
-rw-r--r--ydb/core/kqp/host/kqp_explain_prepared.cpp2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h1
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.cpp12
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.h6
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h90
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp122
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.cpp3
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.h33
-rw-r--r--ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp644
11 files changed, 812 insertions, 116 deletions
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index 119c7193aa9..e3883fcee18 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -155,7 +155,7 @@ public:
public:
/* Compute */
virtual NThreading::TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request,
- TQueryData::TPtr params) = 0;
+ TQueryData::TPtr params, ui32 txIndex) = 0;
/* Scripting */
virtual NThreading::TFuture<TQueryResult> ExplainDataQueryAst(const TString& cluster, const TString& query) = 0;
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 6d4ad75241f..2116f56f1b8 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -474,8 +474,9 @@ public:
}
TKqpExecLiteralRequestHandler(IKqpGateway::TExecPhysicalRequest&& request,
- TKqpRequestCounters::TPtr counters, TPromise<TResult> promise, TQueryData::TPtr params)
+ TKqpRequestCounters::TPtr counters, TPromise<TResult> promise, TQueryData::TPtr params, ui32 txIndex)
: Request(std::move(request))
+ , TxIndex(txIndex)
, Parameters(params)
, Counters(counters)
, Promise(promise)
@@ -516,7 +517,10 @@ private:
result.Results.emplace_back(tx.GetMkql());
}
Parameters->AddTxHolders(std::move(ev->GetTxHolders()));
- Parameters->AddTxResults(std::move(txResults));
+
+ if (!txResults.empty()) {
+ Parameters->AddTxResults(TxIndex, std::move(txResults));
+ }
}
Promise.SetValue(std::move(result));
this->PassAway();
@@ -524,6 +528,7 @@ private:
private:
IKqpGateway::TExecPhysicalRequest Request;
+ const ui32 TxIndex;
TQueryData::TPtr Parameters;
TKqpRequestCounters::TPtr Counters;
TPromise<TResult> Promise;
@@ -1844,7 +1849,7 @@ public:
}
}
- TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params) override {
+ TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
YQL_ENSURE(!request.Transactions.empty());
YQL_ENSURE(request.DataShardLocks.empty());
YQL_ENSURE(!request.NeedTxId);
@@ -1867,7 +1872,7 @@ public:
YQL_ENSURE(containOnlyLiteralStages(request));
auto promise = NewPromise<TExecPhysicalResult>();
- IActor* requestHandler = new TKqpExecLiteralRequestHandler(std::move(request), Counters, promise, params);
+ IActor* requestHandler = new TKqpExecLiteralRequestHandler(std::move(request), Counters, promise, params, txIndex);
RegisterActor(requestHandler);
return promise.GetFuture();
}
diff --git a/ydb/core/kqp/host/kqp_explain_prepared.cpp b/ydb/core/kqp/host/kqp_explain_prepared.cpp
index 1b04a0e9af5..695104abcd1 100644
--- a/ydb/core/kqp/host/kqp_explain_prepared.cpp
+++ b/ydb/core/kqp/host/kqp_explain_prepared.cpp
@@ -42,7 +42,7 @@ public:
request.Transactions.emplace_back(tx, TransformCtx->QueryCtx->QueryData);
request.NeedTxId = false;
- ExecuteFuture = Gateway->ExecuteLiteral(std::move(request), TransformCtx->QueryCtx->QueryData);
+ ExecuteFuture = Gateway->ExecuteLiteral(std::move(request), TransformCtx->QueryCtx->QueryData, CurrentTxIndex);
Promise = NewPromise();
ExecuteFuture.Apply([promise = Promise](const TFuture<IKqpGateway::TExecPhysicalResult> future) mutable {
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index 080fb1492d1..02d9ae3a69d 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -277,6 +277,7 @@ public:
Invalidated = false;
Readonly = false;
Closed = false;
+ HasUncommittedChangesRead = false;
}
template<class IterableKqpTableOps, class IterableKqpTableInfos>
diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp
index 9af65d75a6d..69c2cd65c30 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.cpp
+++ b/ydb/core/kqp/query_data/kqp_query_data.cpp
@@ -132,8 +132,8 @@ TQueryData::TQueryData(TTxAllocatorState::TPtr allocatorState)
TQueryData::~TQueryData() {
{
auto g = TypeEnv().BindAllocator();
- TVector<TVector<TKqpExecuterTxResult>> emptyVector;
- TxResults.swap(emptyVector);
+ THashMap<ui32, TVector<TKqpExecuterTxResult>> emptyResultMap;
+ TxResults.swap(emptyResultMap);
TUnboxedParamsMap emptyMap;
UnboxedData.swap(emptyMap);
}
@@ -170,9 +170,9 @@ NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(const NKqpProto::TKqpPhyRes
return TxResults[txIndex][resultIndex].GetMkql(arena);
}
-void TQueryData::AddTxResults(TVector<TKqpExecuterTxResult>&& results) {
+void TQueryData::AddTxResults(ui32 txIndex, TVector<TKqpExecuterTxResult>&& results) {
auto g = TypeEnv().BindAllocator();
- TxResults.emplace_back(std::move(results));
+ TxResults.emplace(std::make_pair(txIndex, std::move(results)));
}
void TQueryData::AddTxHolders(TVector<TKqpPhyTxHolder::TConstPtr>&& holders) {
@@ -346,8 +346,8 @@ void TQueryData::Clear() {
Params.clear();
TUnboxedParamsMap emptyMap;
UnboxedData.swap(emptyMap);
- TVector<TVector<TKqpExecuterTxResult>> emptyVector;
- TxResults.swap(emptyVector);
+ THashMap<ui32, TVector<TKqpExecuterTxResult>> emptyResultMap;
+ TxResults.swap(emptyResultMap);
AllocState->Reset();
}
}
diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h
index 669d77eccbc..981f8996a2b 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.h
+++ b/ydb/core/kqp/query_data/kqp_query_data.h
@@ -165,7 +165,7 @@ private:
TParamMap Params;
TUnboxedParamsMap UnboxedData;
- TVector<TVector<TKqpExecuterTxResult>> TxResults;
+ THashMap<ui32, TVector<TKqpExecuterTxResult>> TxResults;
TVector<TVector<TKqpPhyTxHolder::TConstPtr>> TxHolders;
TTxAllocatorState::TPtr AllocState;
@@ -188,11 +188,11 @@ public:
bool AddTypedValueParam(const TString& name, const Ydb::TypedValue& p);
bool MaterializeParamValue(bool ensure, const NKqpProto::TKqpPhyParamBinding& paramBinding);
- void AddTxResults(TVector<TKqpExecuterTxResult>&& results);
+ void AddTxResults(ui32 txIndex, TVector<TKqpExecuterTxResult>&& results);
void AddTxHolders(TVector<TKqpPhyTxHolder::TConstPtr>&& holders);
bool HasResult(ui32 txIndex, ui32 resultIndex) {
- if (txIndex >= TxResults.size())
+ if (!TxResults.contains(txIndex))
return false;
return resultIndex < TxResults[txIndex].size();
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index abd838b2f65..139a536e266 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -224,6 +224,96 @@ public:
return ::NKikimr::NKqp::NeedSnapshot(*TxCtx, config, /*rollback*/ false, Commit, PreparedQuery->GetPhysicalQuery());
}
+ bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx) {
+ const auto& phyQuery = PreparedQuery->GetPhysicalQuery();
+
+ if (!Commit) {
+ return false;
+ }
+
+ if (CurrentTx + 1 < phyQuery.TransactionsSize()) {
+ // commit can only be applied to the last transaction or perform separately at the end
+ return false;
+ }
+
+ if (!tx) {
+ // no physical transactions left, perform commit
+ return true;
+ }
+
+ if (TxCtx->HasUncommittedChangesRead) {
+ YQL_ENSURE(AppData()->FeatureFlags.GetEnableKqpImmediateEffects());
+
+ if (tx && tx->GetHasEffects()) {
+ YQL_ENSURE(tx->ResultsSize() == 0);
+ // commit can be applied to the last transaction with effects
+ return CurrentTx + 1 == phyQuery.TransactionsSize();
+ }
+
+ return false;
+ }
+
+ // we can merge commit with last tx only for read-only transactions
+ return !TxCtx->TxHasEffects();
+ }
+
+ bool ShouldAcquireLocks() {
+ if (*TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
+ return false;
+ }
+
+ if (TxCtx->Locks.GetLockTxId() && !TxCtx->Locks.Broken()) {
+ return true; // Continue to acquire locks
+ }
+
+ if (TxCtx->Locks.Broken()) {
+ YQL_ENSURE(TxCtx->GetSnapshot().IsValid() && !TxCtx->TxHasEffects());
+ return false; // Do not acquire locks after first lock issue
+ }
+
+ if (TxCtx->TxHasEffects()) {
+ return true; // Acquire locks in read write tx
+ }
+
+ const auto& query = PreparedQuery->GetPhysicalQuery();
+ for (auto& tx : query.GetTransactions()) {
+ if (tx.GetHasEffects()) {
+ return true; // Acquire locks in read write tx
+ }
+ }
+
+ if (!Commit) {
+ return true; // Is not a commit tx
+ }
+
+ if (TxCtx->GetSnapshot().IsValid()) {
+ return false; // It is a read only tx with snapshot, no need to acquire locks
+ }
+
+ return true;
+ }
+
+ TKqpPhyTxHolder::TConstPtr GetCurrentPhyTx() {
+ const auto& phyQuery = PreparedQuery->GetPhysicalQuery();
+ auto tx = PreparedQuery->GetPhyTxOrEmpty(CurrentTx);
+
+ if (TxCtx->CanDeferEffects()) {
+ while (tx && tx->GetHasEffects()) {
+ QueryData->CreateKqpValueMap(tx);
+ bool success = TxCtx->AddDeferredEffect(tx, QueryData);
+ YQL_ENSURE(success);
+ if (CurrentTx + 1 < phyQuery.TransactionsSize()) {
+ ++CurrentTx;
+ tx = PreparedQuery->GetPhyTx(CurrentTx);
+ } else {
+ tx = nullptr;
+ }
+ }
+ }
+
+ return tx;
+ }
+
bool HasTxControl() const {
return RequestEv->HasTxControl();
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index f3aae6fb303..d7766ca8009 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -254,11 +254,11 @@ public:
QueryState->TxCtx = std::move(txCtx);
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxId = txId;
- if (!CheckTransacionLocks()) {
+ if (!CheckTransactionLocks(/*tx*/ nullptr)) {
return;
}
- bool replied = ExecutePhyTx(/*query*/ nullptr, /*tx*/ nullptr, /*commit*/ true);
+ bool replied = ExecutePhyTx(/*tx*/ nullptr, /*commit*/ true);
if (!replied) {
Become(&TKqpSessionActor::ExecuteState);
}
@@ -768,13 +768,20 @@ public:
}
}
- bool CheckTransacionLocks() {
+ bool CheckTransactionLocks(const TKqpPhyTxHolder::TConstPtr& tx) {
auto& txCtx = *QueryState->TxCtx;
if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) {
ReplyQueryError(Ydb::StatusIds::ABORTED, "tx has deferred effects, but locks are broken",
MessageFromIssues(std::vector<TIssue>{txCtx.Locks.GetIssue()}));
return false;
}
+
+ if (tx && tx->GetHasEffects() && txCtx.Locks.Broken()) {
+ ReplyQueryError(Ydb::StatusIds::ABORTED, "tx has effects, but locks are broken",
+ MessageFromIssues(std::vector<TIssue>{txCtx.Locks.GetIssue()}));
+ return false;
+ }
+
return true;
}
@@ -794,36 +801,6 @@ public:
return false;
}
- TKqpPhyTxHolder::TConstPtr GetCurrentPhyTx(const NKqpProto::TKqpPhyQuery& phyQuery) {
- auto& txCtx = *QueryState->TxCtx;
- auto tx = QueryState->PreparedQuery->GetPhyTxOrEmpty(QueryState->CurrentTx);
-
- if (Config->FeatureFlags.GetEnableKqpImmediateEffects()) {
- // Execute every physical tx immediately
- return tx;
- }
-
- // Deffer effects and execute them at the end before commit
- while (tx && tx->GetHasEffects()) {
- try {
- QueryState->QueryData->CreateKqpValueMap(tx);
- } catch (const yexception& ex) {
- ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
- }
- bool success = txCtx.AddDeferredEffect(tx, QueryState->QueryData);
- YQL_ENSURE(success);
- LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
- if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) {
- ++QueryState->CurrentTx;
- tx = QueryState->PreparedQuery->GetPhyTx(QueryState->CurrentTx);
- } else {
- tx = nullptr;
- }
- }
-
- return tx;
- }
-
void ExecuteOrDefer() {
bool haveWork = QueryState->PreparedQuery &&
QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize()
@@ -834,58 +811,51 @@ public:
return;
}
- const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
- auto tx = GetCurrentPhyTx(phyQuery);
-
- auto& txCtx = *QueryState->TxCtx;
- if (tx && tx->GetHasEffects() && txCtx.Locks.Broken()) {
- ReplyQueryError(Ydb::StatusIds::ABORTED, "tx has effects, but locks are broken",
- MessageFromIssues(std::vector<TIssue>{txCtx.Locks.GetIssue()}));
- return;
+ TKqpPhyTxHolder::TConstPtr tx;
+ try {
+ tx = QueryState->GetCurrentPhyTx();
+ } catch (const yexception& ex) {
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
}
- if (!CheckTransacionLocks() || !CheckTopicOperations()) {
+ if (!CheckTransactionLocks(tx) || !CheckTopicOperations()) {
return;
}
- bool commit = ShouldCommitWithCurrentTx(phyQuery, tx);
- if (tx || commit) {
- ExecutePhyTx(&phyQuery, tx, commit);
+ if (QueryState->TxCtx->ShouldExecuteDeferredEffects()) {
+ ExecuteDeferredEffectsImmediately();
+ } else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx); commit || tx) {
+ ExecutePhyTx(tx, commit);
} else {
ReplySuccess();
}
}
- bool ShouldCommitWithCurrentTx(const NKqpProto::TKqpPhyQuery& phyQuery, const TKqpPhyTxHolder::TConstPtr& tx) {
- if (!QueryState->Commit) {
- return false;
- }
+ void ExecuteDeferredEffectsImmediately() {
+ YQL_ENSURE(QueryState->TxCtx->ShouldExecuteDeferredEffects());
- if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) {
- // commit can only be applied to the last transaction or perform separately at the end
- return false;
- }
+ auto& txCtx = *QueryState->TxCtx;
+ auto request = PrepareRequest(/* tx */ nullptr, /* literal */ false, QueryState.get());
- if (!tx) {
- // no physical transactions left, perform commit
- return true;
- } else if (Config->FeatureFlags.GetEnableKqpImmediateEffects() && QueryState->TxCtx->HasUncommittedChangesRead) {
- YQL_ENSURE(tx);
- if (tx->GetHasEffects()) {
- YQL_ENSURE(tx->ResultsSize() == 0);
- // perform commit with last tx with effects
- return QueryState->CurrentTx + 1 == phyQuery.TransactionsSize();
- }
+ for (const auto& effect : txCtx.DeferredEffects) {
+ YQL_ENSURE(effect.PhysicalTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_DATA);
+ request.Transactions.emplace_back(effect.PhysicalTx, effect.Params);
- // last tx contains reads, so commit should be sent separately
- return false;
- } else {
- // we can merge commit with last tx only for read-only transactions
- return QueryState->TxCtx->DeferredEffects.Empty();
+ LOG_D("TExecPhysicalRequest, add DeferredEffect to Transaction,"
+ << " current Transactions.size(): " << request.Transactions.size());
}
+
+ request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
+ request.UseImmediateEffects = true;
+ request.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef();
+
+ txCtx.HasImmediateEffects = true;
+ txCtx.ClearDeferredEffects();
+
+ SendToExecuter(std::move(request));
}
- bool ExecutePhyTx(const NKqpProto::TKqpPhyQuery* query, const TKqpPhyTxHolder::TConstPtr& tx, bool commit) {
+ bool ExecutePhyTx(const TKqpPhyTxHolder::TConstPtr& tx, bool commit) {
auto& txCtx = *QueryState->TxCtx;
bool literal = tx && tx->IsLiteralTx();
@@ -972,10 +942,13 @@ public:
}
request.TopicOperations = std::move(txCtx.TopicOperations);
- } else if (txCtx.ShouldAcquireLocks(query, QueryState->Commit)) {
+ } else if (QueryState->ShouldAcquireLocks()) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
- // TODO: Use immediate effects only if tx contains uncommitted changes reading (KIKIMR-17576)
- request.UseImmediateEffects = Config->FeatureFlags.GetEnableKqpImmediateEffects();
+
+ if (txCtx.HasUncommittedChangesRead) {
+ YQL_ENSURE(Config->FeatureFlags.GetEnableKqpImmediateEffects());
+ request.UseImmediateEffects = true;
+ }
}
LWTRACK(KqpSessionPhyQueryProposeTx,
@@ -986,7 +959,6 @@ public:
request.AcquireLocksTxId.Defined());
SendToExecuter(std::move(request));
-
++QueryState->CurrentTx;
return false;
}
@@ -1109,7 +1081,9 @@ public:
YQL_ENSURE(QueryState);
LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, ev->ResultRowsCount);
- QueryState->QueryData->AddTxResults(std::move(ev->GetTxResults()));
+ if (!ev->GetTxResults().empty()) {
+ QueryState->QueryData->AddTxResults(QueryState->CurrentTx - 1, std::move(ev->GetTxResults()));
+ }
QueryState->QueryData->AddTxHolders(std::move(ev->GetTxHolders()));
if (ev->LockHandle) {
diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp
index 48f16d9d350..deb3ec70903 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.cpp
+++ b/ydb/core/kqp/session_actor/kqp_tx.cpp
@@ -156,7 +156,8 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
}
}
- if (config.FeatureFlags.GetEnableKqpImmediateEffects() && txCtx.HasUncommittedChangesRead) {
+ if (txCtx.HasUncommittedChangesRead) {
+ YQL_ENSURE(config.FeatureFlags.GetEnableKqpImmediateEffects());
return true;
}
diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h
index 7c7cd10fd0c..179b8ed6b62 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.h
+++ b/ydb/core/kqp/session_actor/kqp_tx.h
@@ -221,32 +221,19 @@ public:
};
}
- bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery* query, bool commit) {
- if (*EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
- return false;
- }
-
- if (Locks.Broken()) {
- return false; // Do not acquire locks after first lock issue
+ bool ShouldExecuteDeferredEffects() const {
+ if (HasUncommittedChangesRead) {
+ YQL_ENSURE(AppData()->FeatureFlags.GetEnableKqpImmediateEffects());
+ return !DeferredEffects.Empty();
}
- if (TxHasEffects()) {
- return true; // Acquire locks in read write tx
- }
-
- YQL_ENSURE(query);
- for (auto& tx : query->GetTransactions()) {
- if (tx.GetHasEffects()) {
- return true; // Acquire locks in read write tx
- }
- }
-
- if (!commit) {
- return true; // Is not a commit tx
- }
+ return false;
+ }
- if (GetSnapshot().IsValid()) {
- return false; // It is a read only tx with snapshot, no need to acquire locks
+ bool CanDeferEffects() const {
+ if (HasUncommittedChangesRead) {
+ YQL_ENSURE(AppData()->FeatureFlags.GetEnableKqpImmediateEffects());
+ return false;
}
return true;
diff --git a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp
index 4d0302ec1fc..c99e549d43e 100644
--- a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp
+++ b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp
@@ -935,7 +935,7 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
UNIT_ASSERT(tx);
}
- {
+ {
auto result = session2.ExecuteDataQuery(R"(
--!syntax_v1
@@ -962,7 +962,7 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
{
auto result = tx->Commit().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
{
@@ -975,8 +975,9 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
CompareYson(R"([
[[1u];["Value1"]];
[[2u];["Value2"]];
- [[3u];["NewValue3"]];
+ [[3u];["Value3"]];
[[100u];["Value100"]];
+ [[101u];["Value101"]];
[[200u];["Value200"]]
])", FormatResultSetYson(result.GetResultSet(0)));
}
@@ -1437,6 +1438,643 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
}
}
+ Y_UNIT_TEST(ConflictingKeyR1WR2) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx1;
+ TMaybe<TTransaction> tx2;
+
+ { // read1
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // write2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ tx2 = result.GetTransaction();
+ UNIT_ASSERT(tx2);
+ }
+
+ { // commit1
+ auto result = tx1->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // read2 + commit2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["NewValue1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(ConflictingKeyR1RWR2) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx1;
+ TMaybe<TTransaction> tx2;
+
+ { // read1
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // read2 + write2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx2 = result.GetTransaction();
+ UNIT_ASSERT(tx2);
+ }
+
+ { // commit1
+ auto result = tx1->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // read2 + commit2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["NewValue1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(ConflictingKeyR1WRR2) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx1;
+ TMaybe<TTransaction> tx2;
+
+ { // read1
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // write2 + read2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["NewValue1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx2 = result.GetTransaction();
+ UNIT_ASSERT(tx2);
+ }
+
+ { // commit1
+ auto result = tx1->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // read2 + commit2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["NewValue1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(ConflictingKeyW1RR2) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx1;
+ TMaybe<TTransaction> tx2;
+
+ { // write1
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // read2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx2 = result.GetTransaction();
+ UNIT_ASSERT(tx2);
+ }
+
+ { // commit1
+ auto result = tx1->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // read2 + commit2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(ConflictingKeyW1WR2) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx1;
+ TMaybe<TTransaction> tx2;
+
+ { // write1
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // write2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue11");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ tx2 = result.GetTransaction();
+ UNIT_ASSERT(tx2);
+ }
+
+ { // commit1
+ auto result = tx1->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // read2 + commit2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["NewValue11"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(ConflictingKeyW1RWR2) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx1;
+ TMaybe<TTransaction> tx2;
+
+ { // write1
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // read2 + write2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx2 = result.GetTransaction();
+ UNIT_ASSERT(tx2);
+ }
+
+ { // commit1
+ auto result = tx1->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // read2 + commit2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(ConflictingKeyW1WRR2) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx1;
+ TMaybe<TTransaction> tx2;
+
+ { // write1
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // write2 + read2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["NewValue1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx2 = result.GetTransaction();
+ UNIT_ASSERT(tx2);
+ }
+
+ { // commit1
+ auto result = tx1->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // read2 + commit2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(ConflictingKeyRW1RR2) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx1;
+ TMaybe<TTransaction> tx2;
+
+ { // read1 + write1
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // read2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx2 = result.GetTransaction();
+ UNIT_ASSERT(tx2);
+ }
+
+ { // commit1
+ auto result = tx1->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // read2 + commit2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(ConflictingKeyRW1WR2) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx1;
+ TMaybe<TTransaction> tx2;
+
+ { // read1 + write1
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // write2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue11");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ tx2 = result.GetTransaction();
+ UNIT_ASSERT(tx2);
+ }
+
+ { // commit1
+ auto result = tx1->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // read2 + commit2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(ConflictingKeyRW1RWR2) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx1;
+ TMaybe<TTransaction> tx2;
+
+ { // read1 + write1
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // read2 + write2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue11");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ tx2 = result.GetTransaction();
+ UNIT_ASSERT(tx2);
+ }
+
+ { // commit1
+ auto result = tx1->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // read2 + commit2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(ConflictingKeyRW1WRR2) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ CreateShardedTestTable(session1);
+
+ TMaybe<TTransaction> tx1;
+ TMaybe<TTransaction> tx2;
+
+ { // read1 + write1
+ auto result = session1.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Value1"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // write2 + read2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
+ (1u, "NewValue11");
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["NewValue11"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx2 = result.GetTransaction();
+ UNIT_ASSERT(tx2);
+ }
+
+ { // commit1
+ auto result = tx1->Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // read2 + commit2
+ auto result = session2.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM TestImmediateEffects WHERE Key = 1u;
+ )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+ }
+ }
+
}
} // namespace NKqp