diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-05-15 14:08:51 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-05-15 14:08:51 +0300 |
commit | 45fbde85b1f7b5e5ffcf2de4bd730e22ce160fa3 (patch) | |
tree | 5be20b13f61e8417c6664733b47cf1e7b3ae282d | |
parent | ac809add0f42f933c8a966c3309997d8563aa168 (diff) | |
download | ydb-45fbde85b1f7b5e5ffcf2de4bd730e22ce160fa3.tar.gz |
defer effects until read or commit
refactor session actor
-rw-r--r-- | ydb/core/kqp/gateway/kqp_gateway.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_explain_prepared.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_query_data.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_query_data.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 90 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 122 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.h | 33 | ||||
-rw-r--r-- | ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp | 644 |
11 files changed, 812 insertions, 116 deletions
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index 119c7193aa9..e3883fcee18 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -155,7 +155,7 @@ public: public: /* Compute */ virtual NThreading::TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, - TQueryData::TPtr params) = 0; + TQueryData::TPtr params, ui32 txIndex) = 0; /* Scripting */ virtual NThreading::TFuture<TQueryResult> ExplainDataQueryAst(const TString& cluster, const TString& query) = 0; diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 6d4ad75241f..2116f56f1b8 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -474,8 +474,9 @@ public: } TKqpExecLiteralRequestHandler(IKqpGateway::TExecPhysicalRequest&& request, - TKqpRequestCounters::TPtr counters, TPromise<TResult> promise, TQueryData::TPtr params) + TKqpRequestCounters::TPtr counters, TPromise<TResult> promise, TQueryData::TPtr params, ui32 txIndex) : Request(std::move(request)) + , TxIndex(txIndex) , Parameters(params) , Counters(counters) , Promise(promise) @@ -516,7 +517,10 @@ private: result.Results.emplace_back(tx.GetMkql()); } Parameters->AddTxHolders(std::move(ev->GetTxHolders())); - Parameters->AddTxResults(std::move(txResults)); + + if (!txResults.empty()) { + Parameters->AddTxResults(TxIndex, std::move(txResults)); + } } Promise.SetValue(std::move(result)); this->PassAway(); @@ -524,6 +528,7 @@ private: private: IKqpGateway::TExecPhysicalRequest Request; + const ui32 TxIndex; TQueryData::TPtr Parameters; TKqpRequestCounters::TPtr Counters; TPromise<TResult> Promise; @@ -1844,7 +1849,7 @@ public: } } - TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params) override { + TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override { YQL_ENSURE(!request.Transactions.empty()); YQL_ENSURE(request.DataShardLocks.empty()); YQL_ENSURE(!request.NeedTxId); @@ -1867,7 +1872,7 @@ public: YQL_ENSURE(containOnlyLiteralStages(request)); auto promise = NewPromise<TExecPhysicalResult>(); - IActor* requestHandler = new TKqpExecLiteralRequestHandler(std::move(request), Counters, promise, params); + IActor* requestHandler = new TKqpExecLiteralRequestHandler(std::move(request), Counters, promise, params, txIndex); RegisterActor(requestHandler); return promise.GetFuture(); } diff --git a/ydb/core/kqp/host/kqp_explain_prepared.cpp b/ydb/core/kqp/host/kqp_explain_prepared.cpp index 1b04a0e9af5..695104abcd1 100644 --- a/ydb/core/kqp/host/kqp_explain_prepared.cpp +++ b/ydb/core/kqp/host/kqp_explain_prepared.cpp @@ -42,7 +42,7 @@ public: request.Transactions.emplace_back(tx, TransformCtx->QueryCtx->QueryData); request.NeedTxId = false; - ExecuteFuture = Gateway->ExecuteLiteral(std::move(request), TransformCtx->QueryCtx->QueryData); + ExecuteFuture = Gateway->ExecuteLiteral(std::move(request), TransformCtx->QueryCtx->QueryData, CurrentTxIndex); Promise = NewPromise(); ExecuteFuture.Apply([promise = Promise](const TFuture<IKqpGateway::TExecPhysicalResult> future) mutable { diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 080fb1492d1..02d9ae3a69d 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -277,6 +277,7 @@ public: Invalidated = false; Readonly = false; Closed = false; + HasUncommittedChangesRead = false; } template<class IterableKqpTableOps, class IterableKqpTableInfos> diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp index 9af65d75a6d..69c2cd65c30 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.cpp +++ b/ydb/core/kqp/query_data/kqp_query_data.cpp @@ -132,8 +132,8 @@ TQueryData::TQueryData(TTxAllocatorState::TPtr allocatorState) TQueryData::~TQueryData() { { auto g = TypeEnv().BindAllocator(); - TVector<TVector<TKqpExecuterTxResult>> emptyVector; - TxResults.swap(emptyVector); + THashMap<ui32, TVector<TKqpExecuterTxResult>> emptyResultMap; + TxResults.swap(emptyResultMap); TUnboxedParamsMap emptyMap; UnboxedData.swap(emptyMap); } @@ -170,9 +170,9 @@ NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(const NKqpProto::TKqpPhyRes return TxResults[txIndex][resultIndex].GetMkql(arena); } -void TQueryData::AddTxResults(TVector<TKqpExecuterTxResult>&& results) { +void TQueryData::AddTxResults(ui32 txIndex, TVector<TKqpExecuterTxResult>&& results) { auto g = TypeEnv().BindAllocator(); - TxResults.emplace_back(std::move(results)); + TxResults.emplace(std::make_pair(txIndex, std::move(results))); } void TQueryData::AddTxHolders(TVector<TKqpPhyTxHolder::TConstPtr>&& holders) { @@ -346,8 +346,8 @@ void TQueryData::Clear() { Params.clear(); TUnboxedParamsMap emptyMap; UnboxedData.swap(emptyMap); - TVector<TVector<TKqpExecuterTxResult>> emptyVector; - TxResults.swap(emptyVector); + THashMap<ui32, TVector<TKqpExecuterTxResult>> emptyResultMap; + TxResults.swap(emptyResultMap); AllocState->Reset(); } } diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h index 669d77eccbc..981f8996a2b 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.h +++ b/ydb/core/kqp/query_data/kqp_query_data.h @@ -165,7 +165,7 @@ private: TParamMap Params; TUnboxedParamsMap UnboxedData; - TVector<TVector<TKqpExecuterTxResult>> TxResults; + THashMap<ui32, TVector<TKqpExecuterTxResult>> TxResults; TVector<TVector<TKqpPhyTxHolder::TConstPtr>> TxHolders; TTxAllocatorState::TPtr AllocState; @@ -188,11 +188,11 @@ public: bool AddTypedValueParam(const TString& name, const Ydb::TypedValue& p); bool MaterializeParamValue(bool ensure, const NKqpProto::TKqpPhyParamBinding& paramBinding); - void AddTxResults(TVector<TKqpExecuterTxResult>&& results); + void AddTxResults(ui32 txIndex, TVector<TKqpExecuterTxResult>&& results); void AddTxHolders(TVector<TKqpPhyTxHolder::TConstPtr>&& holders); bool HasResult(ui32 txIndex, ui32 resultIndex) { - if (txIndex >= TxResults.size()) + if (!TxResults.contains(txIndex)) return false; return resultIndex < TxResults[txIndex].size(); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index abd838b2f65..139a536e266 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -224,6 +224,96 @@ public: return ::NKikimr::NKqp::NeedSnapshot(*TxCtx, config, /*rollback*/ false, Commit, PreparedQuery->GetPhysicalQuery()); } + bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx) { + const auto& phyQuery = PreparedQuery->GetPhysicalQuery(); + + if (!Commit) { + return false; + } + + if (CurrentTx + 1 < phyQuery.TransactionsSize()) { + // commit can only be applied to the last transaction or perform separately at the end + return false; + } + + if (!tx) { + // no physical transactions left, perform commit + return true; + } + + if (TxCtx->HasUncommittedChangesRead) { + YQL_ENSURE(AppData()->FeatureFlags.GetEnableKqpImmediateEffects()); + + if (tx && tx->GetHasEffects()) { + YQL_ENSURE(tx->ResultsSize() == 0); + // commit can be applied to the last transaction with effects + return CurrentTx + 1 == phyQuery.TransactionsSize(); + } + + return false; + } + + // we can merge commit with last tx only for read-only transactions + return !TxCtx->TxHasEffects(); + } + + bool ShouldAcquireLocks() { + if (*TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) { + return false; + } + + if (TxCtx->Locks.GetLockTxId() && !TxCtx->Locks.Broken()) { + return true; // Continue to acquire locks + } + + if (TxCtx->Locks.Broken()) { + YQL_ENSURE(TxCtx->GetSnapshot().IsValid() && !TxCtx->TxHasEffects()); + return false; // Do not acquire locks after first lock issue + } + + if (TxCtx->TxHasEffects()) { + return true; // Acquire locks in read write tx + } + + const auto& query = PreparedQuery->GetPhysicalQuery(); + for (auto& tx : query.GetTransactions()) { + if (tx.GetHasEffects()) { + return true; // Acquire locks in read write tx + } + } + + if (!Commit) { + return true; // Is not a commit tx + } + + if (TxCtx->GetSnapshot().IsValid()) { + return false; // It is a read only tx with snapshot, no need to acquire locks + } + + return true; + } + + TKqpPhyTxHolder::TConstPtr GetCurrentPhyTx() { + const auto& phyQuery = PreparedQuery->GetPhysicalQuery(); + auto tx = PreparedQuery->GetPhyTxOrEmpty(CurrentTx); + + if (TxCtx->CanDeferEffects()) { + while (tx && tx->GetHasEffects()) { + QueryData->CreateKqpValueMap(tx); + bool success = TxCtx->AddDeferredEffect(tx, QueryData); + YQL_ENSURE(success); + if (CurrentTx + 1 < phyQuery.TransactionsSize()) { + ++CurrentTx; + tx = PreparedQuery->GetPhyTx(CurrentTx); + } else { + tx = nullptr; + } + } + } + + return tx; + } + bool HasTxControl() const { return RequestEv->HasTxControl(); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index f3aae6fb303..d7766ca8009 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -254,11 +254,11 @@ public: QueryState->TxCtx = std::move(txCtx); QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc); QueryState->TxId = txId; - if (!CheckTransacionLocks()) { + if (!CheckTransactionLocks(/*tx*/ nullptr)) { return; } - bool replied = ExecutePhyTx(/*query*/ nullptr, /*tx*/ nullptr, /*commit*/ true); + bool replied = ExecutePhyTx(/*tx*/ nullptr, /*commit*/ true); if (!replied) { Become(&TKqpSessionActor::ExecuteState); } @@ -768,13 +768,20 @@ public: } } - bool CheckTransacionLocks() { + bool CheckTransactionLocks(const TKqpPhyTxHolder::TConstPtr& tx) { auto& txCtx = *QueryState->TxCtx; if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) { ReplyQueryError(Ydb::StatusIds::ABORTED, "tx has deferred effects, but locks are broken", MessageFromIssues(std::vector<TIssue>{txCtx.Locks.GetIssue()})); return false; } + + if (tx && tx->GetHasEffects() && txCtx.Locks.Broken()) { + ReplyQueryError(Ydb::StatusIds::ABORTED, "tx has effects, but locks are broken", + MessageFromIssues(std::vector<TIssue>{txCtx.Locks.GetIssue()})); + return false; + } + return true; } @@ -794,36 +801,6 @@ public: return false; } - TKqpPhyTxHolder::TConstPtr GetCurrentPhyTx(const NKqpProto::TKqpPhyQuery& phyQuery) { - auto& txCtx = *QueryState->TxCtx; - auto tx = QueryState->PreparedQuery->GetPhyTxOrEmpty(QueryState->CurrentTx); - - if (Config->FeatureFlags.GetEnableKqpImmediateEffects()) { - // Execute every physical tx immediately - return tx; - } - - // Deffer effects and execute them at the end before commit - while (tx && tx->GetHasEffects()) { - try { - QueryState->QueryData->CreateKqpValueMap(tx); - } catch (const yexception& ex) { - ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what(); - } - bool success = txCtx.AddDeferredEffect(tx, QueryState->QueryData); - YQL_ENSURE(success); - LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); - if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) { - ++QueryState->CurrentTx; - tx = QueryState->PreparedQuery->GetPhyTx(QueryState->CurrentTx); - } else { - tx = nullptr; - } - } - - return tx; - } - void ExecuteOrDefer() { bool haveWork = QueryState->PreparedQuery && QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize() @@ -834,58 +811,51 @@ public: return; } - const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); - auto tx = GetCurrentPhyTx(phyQuery); - - auto& txCtx = *QueryState->TxCtx; - if (tx && tx->GetHasEffects() && txCtx.Locks.Broken()) { - ReplyQueryError(Ydb::StatusIds::ABORTED, "tx has effects, but locks are broken", - MessageFromIssues(std::vector<TIssue>{txCtx.Locks.GetIssue()})); - return; + TKqpPhyTxHolder::TConstPtr tx; + try { + tx = QueryState->GetCurrentPhyTx(); + } catch (const yexception& ex) { + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what(); } - if (!CheckTransacionLocks() || !CheckTopicOperations()) { + if (!CheckTransactionLocks(tx) || !CheckTopicOperations()) { return; } - bool commit = ShouldCommitWithCurrentTx(phyQuery, tx); - if (tx || commit) { - ExecutePhyTx(&phyQuery, tx, commit); + if (QueryState->TxCtx->ShouldExecuteDeferredEffects()) { + ExecuteDeferredEffectsImmediately(); + } else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx); commit || tx) { + ExecutePhyTx(tx, commit); } else { ReplySuccess(); } } - bool ShouldCommitWithCurrentTx(const NKqpProto::TKqpPhyQuery& phyQuery, const TKqpPhyTxHolder::TConstPtr& tx) { - if (!QueryState->Commit) { - return false; - } + void ExecuteDeferredEffectsImmediately() { + YQL_ENSURE(QueryState->TxCtx->ShouldExecuteDeferredEffects()); - if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) { - // commit can only be applied to the last transaction or perform separately at the end - return false; - } + auto& txCtx = *QueryState->TxCtx; + auto request = PrepareRequest(/* tx */ nullptr, /* literal */ false, QueryState.get()); - if (!tx) { - // no physical transactions left, perform commit - return true; - } else if (Config->FeatureFlags.GetEnableKqpImmediateEffects() && QueryState->TxCtx->HasUncommittedChangesRead) { - YQL_ENSURE(tx); - if (tx->GetHasEffects()) { - YQL_ENSURE(tx->ResultsSize() == 0); - // perform commit with last tx with effects - return QueryState->CurrentTx + 1 == phyQuery.TransactionsSize(); - } + for (const auto& effect : txCtx.DeferredEffects) { + YQL_ENSURE(effect.PhysicalTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_DATA); + request.Transactions.emplace_back(effect.PhysicalTx, effect.Params); - // last tx contains reads, so commit should be sent separately - return false; - } else { - // we can merge commit with last tx only for read-only transactions - return QueryState->TxCtx->DeferredEffects.Empty(); + LOG_D("TExecPhysicalRequest, add DeferredEffect to Transaction," + << " current Transactions.size(): " << request.Transactions.size()); } + + request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); + request.UseImmediateEffects = true; + request.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef(); + + txCtx.HasImmediateEffects = true; + txCtx.ClearDeferredEffects(); + + SendToExecuter(std::move(request)); } - bool ExecutePhyTx(const NKqpProto::TKqpPhyQuery* query, const TKqpPhyTxHolder::TConstPtr& tx, bool commit) { + bool ExecutePhyTx(const TKqpPhyTxHolder::TConstPtr& tx, bool commit) { auto& txCtx = *QueryState->TxCtx; bool literal = tx && tx->IsLiteralTx(); @@ -972,10 +942,13 @@ public: } request.TopicOperations = std::move(txCtx.TopicOperations); - } else if (txCtx.ShouldAcquireLocks(query, QueryState->Commit)) { + } else if (QueryState->ShouldAcquireLocks()) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); - // TODO: Use immediate effects only if tx contains uncommitted changes reading (KIKIMR-17576) - request.UseImmediateEffects = Config->FeatureFlags.GetEnableKqpImmediateEffects(); + + if (txCtx.HasUncommittedChangesRead) { + YQL_ENSURE(Config->FeatureFlags.GetEnableKqpImmediateEffects()); + request.UseImmediateEffects = true; + } } LWTRACK(KqpSessionPhyQueryProposeTx, @@ -986,7 +959,6 @@ public: request.AcquireLocksTxId.Defined()); SendToExecuter(std::move(request)); - ++QueryState->CurrentTx; return false; } @@ -1109,7 +1081,9 @@ public: YQL_ENSURE(QueryState); LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, ev->ResultRowsCount); - QueryState->QueryData->AddTxResults(std::move(ev->GetTxResults())); + if (!ev->GetTxResults().empty()) { + QueryState->QueryData->AddTxResults(QueryState->CurrentTx - 1, std::move(ev->GetTxResults())); + } QueryState->QueryData->AddTxHolders(std::move(ev->GetTxHolders())); if (ev->LockHandle) { diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp index 48f16d9d350..deb3ec70903 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.cpp +++ b/ydb/core/kqp/session_actor/kqp_tx.cpp @@ -156,7 +156,8 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig } } - if (config.FeatureFlags.GetEnableKqpImmediateEffects() && txCtx.HasUncommittedChangesRead) { + if (txCtx.HasUncommittedChangesRead) { + YQL_ENSURE(config.FeatureFlags.GetEnableKqpImmediateEffects()); return true; } diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h index 7c7cd10fd0c..179b8ed6b62 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.h +++ b/ydb/core/kqp/session_actor/kqp_tx.h @@ -221,32 +221,19 @@ public: }; } - bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery* query, bool commit) { - if (*EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) { - return false; - } - - if (Locks.Broken()) { - return false; // Do not acquire locks after first lock issue + bool ShouldExecuteDeferredEffects() const { + if (HasUncommittedChangesRead) { + YQL_ENSURE(AppData()->FeatureFlags.GetEnableKqpImmediateEffects()); + return !DeferredEffects.Empty(); } - if (TxHasEffects()) { - return true; // Acquire locks in read write tx - } - - YQL_ENSURE(query); - for (auto& tx : query->GetTransactions()) { - if (tx.GetHasEffects()) { - return true; // Acquire locks in read write tx - } - } - - if (!commit) { - return true; // Is not a commit tx - } + return false; + } - if (GetSnapshot().IsValid()) { - return false; // It is a read only tx with snapshot, no need to acquire locks + bool CanDeferEffects() const { + if (HasUncommittedChangesRead) { + YQL_ENSURE(AppData()->FeatureFlags.GetEnableKqpImmediateEffects()); + return false; } return true; diff --git a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp index 4d0302ec1fc..c99e549d43e 100644 --- a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp +++ b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp @@ -935,7 +935,7 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { UNIT_ASSERT(tx); } - { + { auto result = session2.ExecuteDataQuery(R"( --!syntax_v1 @@ -962,7 +962,7 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { { auto result = tx->Commit().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } { @@ -975,8 +975,9 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { CompareYson(R"([ [[1u];["Value1"]]; [[2u];["Value2"]]; - [[3u];["NewValue3"]]; + [[3u];["Value3"]]; [[100u];["Value100"]]; + [[101u];["Value101"]]; [[200u];["Value200"]] ])", FormatResultSetYson(result.GetResultSet(0))); } @@ -1437,6 +1438,643 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { } } + Y_UNIT_TEST(ConflictingKeyR1WR2) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx1; + TMaybe<TTransaction> tx2; + + { // read1 + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // write2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + tx2 = result.GetTransaction(); + UNIT_ASSERT(tx2); + } + + { // commit1 + auto result = tx1->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // read2 + commit2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["NewValue1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(ConflictingKeyR1RWR2) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx1; + TMaybe<TTransaction> tx2; + + { // read1 + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // read2 + write2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx2 = result.GetTransaction(); + UNIT_ASSERT(tx2); + } + + { // commit1 + auto result = tx1->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // read2 + commit2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["NewValue1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(ConflictingKeyR1WRR2) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx1; + TMaybe<TTransaction> tx2; + + { // read1 + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // write2 + read2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["NewValue1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx2 = result.GetTransaction(); + UNIT_ASSERT(tx2); + } + + { // commit1 + auto result = tx1->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // read2 + commit2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["NewValue1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(ConflictingKeyW1RR2) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx1; + TMaybe<TTransaction> tx2; + + { // write1 + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // read2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx2 = result.GetTransaction(); + UNIT_ASSERT(tx2); + } + + { // commit1 + auto result = tx1->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // read2 + commit2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(ConflictingKeyW1WR2) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx1; + TMaybe<TTransaction> tx2; + + { // write1 + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // write2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue11"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + tx2 = result.GetTransaction(); + UNIT_ASSERT(tx2); + } + + { // commit1 + auto result = tx1->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // read2 + commit2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["NewValue11"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(ConflictingKeyW1RWR2) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx1; + TMaybe<TTransaction> tx2; + + { // write1 + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // read2 + write2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx2 = result.GetTransaction(); + UNIT_ASSERT(tx2); + } + + { // commit1 + auto result = tx1->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // read2 + commit2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(ConflictingKeyW1WRR2) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx1; + TMaybe<TTransaction> tx2; + + { // write1 + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // write2 + read2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["NewValue1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx2 = result.GetTransaction(); + UNIT_ASSERT(tx2); + } + + { // commit1 + auto result = tx1->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // read2 + commit2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(ConflictingKeyRW1RR2) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx1; + TMaybe<TTransaction> tx2; + + { // read1 + write1 + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // read2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx2 = result.GetTransaction(); + UNIT_ASSERT(tx2); + } + + { // commit1 + auto result = tx1->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // read2 + commit2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(ConflictingKeyRW1WR2) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx1; + TMaybe<TTransaction> tx2; + + { // read1 + write1 + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // write2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue11"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + tx2 = result.GetTransaction(); + UNIT_ASSERT(tx2); + } + + { // commit1 + auto result = tx1->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // read2 + commit2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(ConflictingKeyRW1RWR2) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx1; + TMaybe<TTransaction> tx2; + + { // read1 + write1 + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // read2 + write2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue11"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + tx2 = result.GetTransaction(); + UNIT_ASSERT(tx2); + } + + { // commit1 + auto result = tx1->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // read2 + commit2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(ConflictingKeyRW1WRR2) { + auto serverSettings = TKikimrSettings() + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + CreateShardedTestTable(session1); + + TMaybe<TTransaction> tx1; + TMaybe<TTransaction> tx2; + + { // read1 + write1 + auto result = session1.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // write2 + read2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES + (1u, "NewValue11"); + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["NewValue11"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx2 = result.GetTransaction(); + UNIT_ASSERT(tx2); + } + + { // commit1 + auto result = tx1->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // read2 + commit2 + auto result = session2.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM TestImmediateEffects WHERE Key = 1u; + )", TTxControl::Tx(*tx2).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + } + } + } } // namespace NKqp |