diff options
author | snaury <[email protected]> | 2023-03-21 14:28:52 +0300 |
---|---|---|
committer | snaury <[email protected]> | 2023-03-21 14:28:52 +0300 |
commit | 53871f3cefea89d32812e8a68bcd0b803d658fbd (patch) | |
tree | ff6b70ffdeafa2d5bc9d7834c504592591eebc1c | |
parent | 71ddd44b7b0d38f2f983644e6b91d5a15c71a215 (diff) |
Handle cdc dependencies in bulk operations
20 files changed, 480 insertions, 61 deletions
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index 61d132b9e94..50aef32273e 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -15,7 +15,8 @@ TCommonUploadOps<TEvRequest, TEvResponse>::TCommonUploadOps(typename TEvRequest: template <typename TEvRequest, typename TEvResponse> bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTransactionContext& txc, - const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId) + const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId, + absl::flat_hash_set<ui64>* volatileReadDependencies) { const auto& record = Ev->Get()->Record; Result = MakeHolder<TEvResponse>(self->TabletID()); @@ -91,6 +92,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans TSerializedCellVec valueCells; bool pageFault = false; + bool commitAdded = false; NTable::TRowState rowState; absl::flat_hash_set<ui64> volatileDependencies; @@ -178,30 +180,38 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans } if (!writeToTableShadow) { - if (ChangeCollector) { - Y_VERIFY(CollectChanges); - - if (!ChangeCollector->OnUpdate(fullTableId, writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion)) { - pageFault = true; + if (BreakLocks) { + if (breakWriteConflicts) { + if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) { + pageFault = true; + } } - if (pageFault) { - continue; + if (!pageFault) { + self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells()); } } - if (BreakLocks) { - if (breakWriteConflicts) { - if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) { - pageFault = true; + if (ChangeCollector) { + Y_VERIFY(CollectChanges); + + if (!volatileDependencies.empty()) { + if (!globalTxId) { + throw TNeedGlobalTxId(); } - if (pageFault) { - continue; + if (!ChangeCollector->OnUpdateTx(fullTableId, writeTableId, NTable::ERowOp::Upsert, key, value, globalTxId)) { + pageFault = true; + } + } else { + if (!ChangeCollector->OnUpdate(fullTableId, writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion)) { + pageFault = true; } } + } - self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells()); + if (pageFault) { + continue; } } @@ -210,11 +220,22 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans throw TNeedGlobalTxId(); } txc.DB.UpdateTx(writeTableId, NTable::ERowOp::Upsert, key, value, globalTxId); + if (!commitAdded) { + // Make sure we see our own changes on further iterations + userDb.AddCommitTxId(globalTxId, writeVersion); + commitAdded = true; + } } else { txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion); } } + if (volatileReadDependencies && !userDb.GetVolatileReadDependencies().empty()) { + *volatileReadDependencies = std::move(userDb.GetVolatileReadDependencies()); + txc.Reschedule(); + pageFault = true; + } + if (pageFault) { if (ChangeCollector) { ChangeCollector->OnRestart(); diff --git a/ydb/core/tx/datashard/datashard_common_upload.h b/ydb/core/tx/datashard/datashard_common_upload.h index 50284f42979..0359770362a 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.h +++ b/ydb/core/tx/datashard/datashard_common_upload.h @@ -21,7 +21,9 @@ public: explicit TCommonUploadOps(typename TEvRequest::TPtr& ev, bool breakLocks, bool collectChanges); protected: - bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId); + bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, + const TRowVersion& writeVersion, ui64 globalTxId, + absl::flat_hash_set<ui64>* volatileReadDependencies); void GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie); const TEvRequest* GetRequest() const; TEvResponse* GetResult(); diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp index 2d7c033bb4a..cf6505c307a 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.cpp +++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp @@ -81,6 +81,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( absl::flat_hash_set<ui64> volatileDependencies; bool pageFault = false; + bool commitAdded = false; for (const auto& serializedKey : request.GetKeyColumns()) { TSerializedCellVec keyCells; if (!TSerializedCellVec::TryParse(serializedKey, keyCells)) { @@ -141,15 +142,25 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( } } - if (auto collector = params.GetChangeCollector()) { - if (!collector->OnUpdate(fullTableId, localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion)) { + if (breakWriteConflicts) { + if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, keyCells.GetCells(), volatileDependencies)) { pageFault = true; } } - if (breakWriteConflicts) { - if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, keyCells.GetCells(), volatileDependencies)) { - pageFault = true; + if (auto collector = params.GetChangeCollector()) { + if (!volatileDependencies.empty()) { + if (!params.GlobalTxId) { + throw TNeedGlobalTxId(); + } + + if (!collector->OnUpdateTx(fullTableId, localTableId, NTable::ERowOp::Erase, key, {}, params.GlobalTxId)) { + pageFault = true; + } + } else { + if (!collector->OnUpdate(fullTableId, localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion)) { + pageFault = true; + } } } @@ -164,11 +175,22 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( throw TNeedGlobalTxId(); } params.Txc->DB.UpdateTx(localTableId, NTable::ERowOp::Erase, key, {}, params.GlobalTxId); + if (!commitAdded && userDb) { + // Make sure we see our own changes on further iterations + userDb->AddCommitTxId(params.GlobalTxId, params.WriteVersion); + commitAdded = true; + } } else { params.Txc->DB.Update(localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion); } } + if (params.VolatileReadDependencies && userDb && !userDb->GetVolatileReadDependencies().empty()) { + *params.VolatileReadDependencies = std::move(userDb->GetVolatileReadDependencies()); + params.Txc->Reschedule(); + pageFault = true; + } + if (pageFault) { if (auto collector = params.GetChangeCollector()) { collector->OnRestart(); @@ -210,14 +232,15 @@ bool TDirectTxErase::CheckRequest(TDataShard* self, const NKikimrTxDataShard::TE bool TDirectTxErase::Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion, - ui64 globalTxId) + ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies) { const auto& record = Ev->Get()->Record; Result = MakeHolder<TEvDataShard::TEvEraseRowsResponse>(); Result->Record.SetTabletID(self->TabletID()); - const auto params = TExecuteParams::ForExecute(this, &txc, readVersion, writeVersion, globalTxId); + const auto params = TExecuteParams::ForExecute(this, &txc, readVersion, writeVersion, + globalTxId, &volatileReadDependencies); NKikimrTxDataShard::TEvEraseRowsResponse::EStatus status; TString error; diff --git a/ydb/core/tx/datashard/datashard_direct_erase.h b/ydb/core/tx/datashard/datashard_direct_erase.h index 9904fd404e9..df5b88b71ad 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.h +++ b/ydb/core/tx/datashard/datashard_direct_erase.h @@ -23,22 +23,24 @@ class TDirectTxErase : public IDirectTx { const TRowVersion ReadVersion; const TRowVersion WriteVersion; const ui64 GlobalTxId; + absl::flat_hash_set<ui64>* const VolatileReadDependencies; private: explicit TExecuteParams(TDirectTxErase* tx, TTransactionContext* txc, const TRowVersion& readVersion, const TRowVersion& writeVersion, - const ui64 globalTxId) + ui64 globalTxId, absl::flat_hash_set<ui64>* volatileReadDependencies) : Tx(tx) , Txc(txc) , ReadVersion(readVersion) , WriteVersion(writeVersion) , GlobalTxId(globalTxId) + , VolatileReadDependencies(volatileReadDependencies) { } public: static TExecuteParams ForCheck() { - return TExecuteParams(nullptr, nullptr, TRowVersion(), TRowVersion(), 0); + return TExecuteParams(nullptr, nullptr, TRowVersion(), TRowVersion(), 0, nullptr); } template <typename... Args> @@ -73,7 +75,7 @@ public: bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion, - ui64 globalTxId) override; + ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies) override; TDirectTxResult GetResult(TDataShard* self) override; TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const override; }; diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.cpp b/ydb/core/tx/datashard/datashard_direct_transaction.cpp index 3b228035e64..c4075840ee7 100644 --- a/ydb/core/tx/datashard/datashard_direct_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_direct_transaction.cpp @@ -34,8 +34,17 @@ bool TDirectTransaction::Execute(TDataShard* self, TTransactionContext& txc) { auto [readVersion, writeVersion] = self->GetReadWriteVersions(this); // NOTE: may throw TNeedGlobalTxId exception, which is handled in direct tx unit - if (!Impl->Execute(self, txc, readVersion, writeVersion, GetGlobalTxId())) + absl::flat_hash_set<ui64> volatileReadDependencies; + if (!Impl->Execute(self, txc, readVersion, writeVersion, GetGlobalTxId(), volatileReadDependencies)) { + if (!volatileReadDependencies.empty()) { + for (ui64 txId : volatileReadDependencies) { + AddVolatileDependency(txId); + bool ok = self->GetVolatileTxManager().AttachBlockedOperation(txId, GetTxId()); + Y_VERIFY_S(ok, "Unexpected failure to attach " << *static_cast<TOperation*>(this) << " to volatile tx " << txId); + } + } return false; + } if (self->IsMvccEnabled()) { // Note: we always wait for completion, so we can ignore the result diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.h b/ydb/core/tx/datashard/datashard_direct_transaction.h index f2bfbc94f28..85cacda5f84 100644 --- a/ydb/core/tx/datashard/datashard_direct_transaction.h +++ b/ydb/core/tx/datashard/datashard_direct_transaction.h @@ -22,7 +22,7 @@ public: virtual ~IDirectTx() = default; virtual bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion, - ui64 globalTxId) = 0; + ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies) = 0; virtual TDirectTxResult GetResult(TDataShard* self) = 0; virtual TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const = 0; }; diff --git a/ydb/core/tx/datashard/datashard_direct_upload.cpp b/ydb/core/tx/datashard/datashard_direct_upload.cpp index 21dd40d7b6d..d0da2be607c 100644 --- a/ydb/core/tx/datashard/datashard_direct_upload.cpp +++ b/ydb/core/tx/datashard/datashard_direct_upload.cpp @@ -10,9 +10,10 @@ TDirectTxUpload::TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev) bool TDirectTxUpload::Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion, - ui64 globalTxId) + ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies) { - return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion, globalTxId); + return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion, + globalTxId, &volatileReadDependencies); } TDirectTxResult TDirectTxUpload::GetResult(TDataShard* self) { diff --git a/ydb/core/tx/datashard/datashard_direct_upload.h b/ydb/core/tx/datashard/datashard_direct_upload.h index 4a59bdeb806..5452728cd42 100644 --- a/ydb/core/tx/datashard/datashard_direct_upload.h +++ b/ydb/core/tx/datashard/datashard_direct_upload.h @@ -16,7 +16,7 @@ public: bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion, - ui64 globalTxId) override; + ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies) override; TDirectTxResult GetResult(TDataShard* self) override; TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const override; }; diff --git a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp index 16d4766971b..e1352945f1b 100644 --- a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp +++ b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp @@ -13,7 +13,9 @@ bool TDataShard::TTxUnsafeUploadRows::Execute(TTransactionContext& txc, const TA auto [readVersion, writeVersion] = Self->GetReadWriteVersions(); // NOTE: will not throw TNeedGlobalTxId since we set breakLocks to false - if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion, /* globalTxId */ 0)) { + if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion, + /* globalTxId */ 0, /* volatile read dependencies */ nullptr)) + { return false; } diff --git a/ydb/core/tx/datashard/datashard_user_db.cpp b/ydb/core/tx/datashard/datashard_user_db.cpp index 9787d473a42..0d46b9b5d5d 100644 --- a/ydb/core/tx/datashard/datashard_user_db.cpp +++ b/ydb/core/tx/datashard/datashard_user_db.cpp @@ -13,7 +13,10 @@ NTable::EReady TDataShardUserDb::SelectRow( auto tid = Self.GetLocalTableId(tableId); Y_VERIFY(tid != 0, "Unexpected SelectRow for an unknown table"); - return Db.Select(tid, key, tags, row, stats, /* readFlags */ 0, readVersion.GetOrElse(ReadVersion)); + return Db.Select(tid, key, tags, row, stats, /* readFlags */ 0, + readVersion.GetOrElse(ReadVersion), + GetReadTxMap(), + GetReadTxObserver()); } NTable::EReady TDataShardUserDb::SelectRow( @@ -27,4 +30,78 @@ NTable::EReady TDataShardUserDb::SelectRow( return SelectRow(tableId, key, tags, row, stats, readVersion); } +void TDataShardUserDb::AddCommitTxId(ui64 txId, const TRowVersion& commitVersion) { + if (!DynamicTxMap) { + DynamicTxMap = new NTable::TDynamicTransactionMap(Self.GetVolatileTxManager().GetTxMap()); + TxMap = DynamicTxMap; + } + DynamicTxMap->Add(txId, commitVersion); +} + +NTable::ITransactionMapPtr& TDataShardUserDb::GetReadTxMap() { + if (!TxMap) { + auto baseTxMap = Self.GetVolatileTxManager().GetTxMap(); + if (baseTxMap) { + DynamicTxMap = new NTable::TDynamicTransactionMap(baseTxMap); + TxMap = DynamicTxMap; + } + } + return TxMap; +} + +class TDataShardUserDb::TReadTxObserver : public NTable::ITransactionObserver { +public: + TReadTxObserver(TDataShardUserDb& userDb) + : UserDb(userDb) + { } + + void OnSkipUncommitted(ui64) override { + // nothing + } + + void OnSkipCommitted(const TRowVersion&) override { + // nothing + } + + void OnSkipCommitted(const TRowVersion&, ui64) override { + // nothing + } + + void OnApplyCommitted(const TRowVersion&) override { + // nothing + } + + void OnApplyCommitted(const TRowVersion&, ui64 txId) override { + UserDb.CheckReadDependency(txId); + } + +private: + TDataShardUserDb& UserDb; +}; + +NTable::ITransactionObserverPtr& TDataShardUserDb::GetReadTxObserver() { + if (!TxObserver) { + auto baseTxMap = Self.GetVolatileTxManager().GetTxMap(); + if (baseTxMap) { + TxObserver = new TReadTxObserver(*this); + } + } + return TxObserver; +} + +void TDataShardUserDb::CheckReadDependency(ui64 txId) { + if (auto* info = Self.GetVolatileTxManager().FindByCommitTxId(txId)) { + switch (info->State) { + case EVolatileTxState::Waiting: + VolatileReadDependencies.insert(info->TxId); + break; + case EVolatileTxState::Committed: + break; + case EVolatileTxState::Aborting: + VolatileReadDependencies.insert(info->TxId); + break; + } + } +} + } // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_user_db.h b/ydb/core/tx/datashard/datashard_user_db.h index 5731066d5c2..5cd95171dfb 100644 --- a/ydb/core/tx/datashard/datashard_user_db.h +++ b/ydb/core/tx/datashard/datashard_user_db.h @@ -49,10 +49,26 @@ public: NTable::TRowState& row, const TMaybe<TRowVersion>& readVersion = {}) override; + void AddCommitTxId(ui64 txId, const TRowVersion& commitVersion); + + absl::flat_hash_set<ui64>& GetVolatileReadDependencies() { + return VolatileReadDependencies; + } + +private: + class TReadTxObserver; + NTable::ITransactionMapPtr& GetReadTxMap(); + NTable::ITransactionObserverPtr& GetReadTxObserver(); + void CheckReadDependency(ui64 txId); + private: TDataShard& Self; NTable::TDatabase& Db; TRowVersion ReadVersion; + NTable::ITransactionMapPtr TxMap; + TIntrusivePtr<NTable::TDynamicTransactionMap> DynamicTxMap; + NTable::ITransactionObserverPtr TxObserver; + absl::flat_hash_set<ui64> VolatileReadDependencies; }; } // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_ut_common_pq.cpp b/ydb/core/tx/datashard/datashard_ut_common_pq.cpp new file mode 100644 index 00000000000..e9f5bcdf998 --- /dev/null +++ b/ydb/core/tx/datashard/datashard_ut_common_pq.cpp @@ -0,0 +1,68 @@ +#include "datashard_ut_common_pq.h" + +#include <ydb/core/persqueue/user_info.h> +#include <ydb/core/persqueue/write_meta.h> +#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +namespace NKikimr { + + using namespace NYdb::NPersQueue; + using namespace NYdb::NDataStreams::V1; + + ui64 ResolvePqTablet(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) { + auto streamDesc = Ls(runtime, sender, path); + + const auto& streamEntry = streamDesc->ResultSet.at(0); + UNIT_ASSERT(streamEntry.ListNodeEntry); + + const auto& children = streamEntry.ListNodeEntry->Children; + UNIT_ASSERT_VALUES_EQUAL(children.size(), 1); + + auto topicDesc = Navigate(runtime, sender, JoinPath(ChildPath(SplitPath(path), children.at(0).Name)), + NSchemeCache::TSchemeCacheNavigate::EOp::OpTopic); + + const auto& topicEntry = topicDesc->ResultSet.at(0); + UNIT_ASSERT(topicEntry.PQGroupInfo); + + const auto& pqDesc = topicEntry.PQGroupInfo->Description; + for (const auto& partition : pqDesc.GetPartitions()) { + if (partitionId == partition.GetPartitionId()) { + return partition.GetTabletId(); + } + } + + UNIT_ASSERT_C(false, "Cannot find partition: " << partitionId); + return 0; + } + + TVector<std::pair<TString, TString>> GetPqRecords(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) { + NKikimrClient::TPersQueueRequest request; + request.MutablePartitionRequest()->SetTopic(path); + request.MutablePartitionRequest()->SetPartition(partitionId); + + auto& cmd = *request.MutablePartitionRequest()->MutableCmdRead(); + cmd.SetClientId(NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER); + cmd.SetCount(10000); + cmd.SetOffset(0); + cmd.SetReadTimestampMs(0); + cmd.SetExternalOperation(true); + + auto req = MakeHolder<TEvPersQueue::TEvRequest>(); + req->Record = std::move(request); + ForwardToTablet(runtime, ResolvePqTablet(runtime, sender, path, partitionId), sender, req.Release()); + + auto resp = runtime.GrabEdgeEventRethrow<TEvPersQueue::TEvResponse>(sender); + UNIT_ASSERT(resp); + + TVector<std::pair<TString, TString>> result; + for (const auto& r : resp->Get()->Record.GetPartitionResponse().GetCmdReadResult().GetResult()) { + const auto data = NKikimr::GetDeserializedData(r.GetData()); + result.emplace_back(r.GetPartitionKey(), data.GetData()); + } + + return result; + } + +} // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard_ut_common_pq.h b/ydb/core/tx/datashard/datashard_ut_common_pq.h new file mode 100644 index 00000000000..bc449e1df5b --- /dev/null +++ b/ydb/core/tx/datashard/datashard_ut_common_pq.h @@ -0,0 +1,9 @@ +#include "datashard_ut_common.h" + +namespace NKikimr { + + ui64 ResolvePqTablet(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId); + + TVector<std::pair<TString, TString>> GetPqRecords(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId); + +} // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index be43f27767c..e7fe922b1ba 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -1,10 +1,8 @@ #include "datashard_ut_common.h" #include "datashard_ut_common_kqp.h" +#include "datashard_ut_common_pq.h" #include "datashard_active_transaction.h" -#include <ydb/core/tx/tx_proxy/proxy.h> -#include <ydb/core/tx/tx_proxy/upload_rows.h> - #include <ydb/core/kqp/executer_actor/kqp_executer.h> namespace NKikimr { @@ -1478,31 +1476,32 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); // Write to key 2 using bulk upsert + NThreading::TFuture<Ydb::Table::BulkUpsertResponse> bulkUpsertFuture; { - using TRows = TVector<std::pair<TSerializedCellVec, TString>>; - using TRowTypes = TVector<std::pair<TString, Ydb::Type>>; - - auto types = std::make_shared<TRowTypes>(); - - Ydb::Type type; - type.set_type_id(Ydb::Type::UINT32); - types->emplace_back("key", type); - types->emplace_back("value", type); - - auto rows = std::make_shared<TRows>(); - - TVector<TCell> key{ TCell::Make(ui32(2)) }; - TVector<TCell> values{ TCell::Make(ui32(22)) }; - TSerializedCellVec serializedKey(TSerializedCellVec::Serialize(key)); - TString serializedValues(TSerializedCellVec::Serialize(values)); - rows->emplace_back(serializedKey, serializedValues); - - auto upsertSender = runtime.AllocateEdgeActor(); - auto actor = NTxProxy::CreateUploadRowsInternal(upsertSender, "/Root/table-1", types, rows); - runtime.Register(actor); - - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvUploadRowsResponse>(upsertSender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, Ydb::StatusIds::SUCCESS); + Ydb::Table::BulkUpsertRequest request; + request.set_table("/Root/table-1"); + auto* r = request.mutable_rows(); + + auto* reqRowType = r->mutable_type()->mutable_list_type()->mutable_item()->mutable_struct_type(); + auto* reqKeyType = reqRowType->add_members(); + reqKeyType->set_name("key"); + reqKeyType->mutable_type()->set_type_id(Ydb::Type::UINT32); + auto* reqValueType = reqRowType->add_members(); + reqValueType->set_name("value"); + reqValueType->mutable_type()->set_type_id(Ydb::Type::UINT32); + + auto* reqRows = r->mutable_value(); + auto* row1 = reqRows->add_items(); + row1->add_items()->set_uint32_value(2); + row1->add_items()->set_uint32_value(22); + + using TEvBulkUpsertRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall< + Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>; + bulkUpsertFuture = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>( + std::move(request), "/Root", "", runtime.GetActorSystem(0)); + + auto response = AwaitResponse(runtime, std::move(bulkUpsertFuture)); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } // This compaction verifies there's no commit race with the waiting @@ -1535,6 +1534,172 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { "{ items { uint32_value: 20 } items { uint32_value: 20 } items { null_flag_value: NULL_VALUE } }"); } + namespace { + using TCdcStream = TShardedTableOptions::TCdcStream; + + TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") { + return TCdcStream{ + .Name = name, + .Mode = NKikimrSchemeOp::ECdcStreamModeNewAndOldImages, + .Format = format, + }; + } + + TCdcStream WithInitialScan(TCdcStream streamDesc) { + streamDesc.InitialState = NKikimrSchemeOp::ECdcStreamStateScan; + return streamDesc; + } + + void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) { + while (true) { + const auto records = GetPqRecords(*server->GetRuntime(), sender, path, 0); + if (records.size() >= expected.size()) { + for (ui32 i = 0; i < expected.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(records.at(i).second, expected.at(i)); + } + + UNIT_ASSERT_VALUES_EQUAL(records.size(), expected.size()); + break; + } + + SimulateSleep(server, TDuration::Seconds(1)); + } + } + } // namespace + + Y_UNIT_TEST(DistributedWriteThenBulkUpsertWithCdc) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000) + .SetEnableChangefeedInitialScan(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Shards(1) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"value2", "Uint32", false, false}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + CreateShardedTable(server, sender, "/Root", "table-2", opts); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + WaitTxNotification(server, sender, AsyncAlterAddStream(server, "/Root", "table-1", + WithInitialScan(NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + WaitForContent(server, sender, "/Root/table-1/Stream", { + R"({"update":{},"newImage":{"value2":null,"value":1},"key":[1]})", + }); + + ui64 maxReadSetStep = 0; + bool captureReadSets = true; + TVector<THolder<IEventHandle>> capturedReadSets; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvReadSet::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep()); + if (captureReadSets) { + Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 2, 42); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets"); + UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + // Write to key 2 using bulk upsert + NThreading::TFuture<Ydb::Table::BulkUpsertResponse> bulkUpsertFuture; + { + Ydb::Table::BulkUpsertRequest request; + request.set_table("/Root/table-1"); + auto* r = request.mutable_rows(); + + auto* reqRowType = r->mutable_type()->mutable_list_type()->mutable_item()->mutable_struct_type(); + auto* reqKeyType = reqRowType->add_members(); + reqKeyType->set_name("key"); + reqKeyType->mutable_type()->set_type_id(Ydb::Type::UINT32); + auto* reqValueType = reqRowType->add_members(); + reqValueType->set_name("value"); + reqValueType->mutable_type()->set_type_id(Ydb::Type::UINT32); + + auto* reqRows = r->mutable_value(); + auto* row1 = reqRows->add_items(); + row1->add_items()->set_uint32_value(2); + row1->add_items()->set_uint32_value(22); + + using TEvBulkUpsertRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall< + Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>; + bulkUpsertFuture = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>( + std::move(request), "/Root", "", runtime.GetActorSystem(0)); + + // Note: we expect bulk upsert to block until key 2 outcome is decided + SimulateSleep(runtime, TDuration::Seconds(1)); + UNIT_ASSERT(!bulkUpsertFuture.HasValue()); + } + + runtime.SetObserverFunc(prevObserverFunc); + for (auto& ev : capturedReadSets) { + runtime.Send(ev.Release(), 0, true); + } + + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(future))), + "<empty>"); + + SimulateSleep(runtime, TDuration::Seconds(1)); + + // Verify the result + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value, value2 FROM `/Root/table-1` + UNION ALL + SELECT key, value, value2 FROM `/Root/table-2` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } items { null_flag_value: NULL_VALUE } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } items { uint32_value: 42 } }, " + "{ items { uint32_value: 10 } items { uint32_value: 10 } items { null_flag_value: NULL_VALUE } }, " + "{ items { uint32_value: 20 } items { uint32_value: 20 } items { null_flag_value: NULL_VALUE } }"); + + WaitForContent(server, sender, "/Root/table-1/Stream", { + R"({"update":{},"newImage":{"value2":null,"value":1},"key":[1]})", + R"({"update":{},"newImage":{"value2":42,"value":2},"key":[2]})", + R"({"update":{},"newImage":{"value2":42,"value":22},"key":[2],"oldImage":{"value2":42,"value":2}})", + }); + + UNIT_ASSERT(bulkUpsertFuture.HasValue()); + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index 5c73693fce7..6a27d5a7db3 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -51,11 +51,22 @@ public: auto presentRows = TDynBitMap().Set(0, request.KeyColumnsSize()); if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, op->GetGlobalTxId(), - &groupProvider, changeCollector.Get())) + &userDb, &groupProvider, changeCollector.Get())) { return EExecutionStatus::Restart; } + if (!userDb.GetVolatileReadDependencies().empty()) { + for (ui64 txId : userDb.GetVolatileReadDependencies()) { + op->AddVolatileDependency(txId); + bool ok = DataShard.GetVolatileTxManager().AttachBlockedOperation(txId, op->GetTxId()); + Y_VERIFY_S(ok, "Unexpected failure to attach " << *op << " to volatile tx " << txId); + } + + txc.Reschedule(); + return EExecutionStatus::Restart; + } + if (Pipeline.AddLockDependencies(op, guardLocks)) { txc.Reschedule(); return EExecutionStatus::Restart; @@ -102,6 +113,7 @@ public: bool Execute(TTransactionContext& txc, const NKikimrTxDataShard::TEvEraseRowsRequest& request, const TDynBitMap& presentRows, const TDynBitMap& confirmedRows, const TRowVersion& writeVersion, ui64 globalTxId, + TDataShardUserDb* userDb = nullptr, TDataShardChangeGroupProvider* groupProvider = nullptr, IDataShardChangeCollector* changeCollector = nullptr) { @@ -119,6 +131,7 @@ public: size_t row = 0; bool pageFault = false; + bool commitAdded = false; Y_FOR_EACH_BIT(i, presentRows) { if (!confirmedRows.Test(i)) { ++row; @@ -169,6 +182,11 @@ public: if (!volatileDependencies.empty() || volatileOrdered) { txc.DB.UpdateTx(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, globalTxId); + if (!commitAdded && userDb) { + // Make sure we see our own changes on further iterations + userDb->AddCommitTxId(globalTxId, writeVersion); + commitAdded = true; + } } else { txc.DB.Update(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion); } diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin-x86_64.txt index fe923f8e304..535f86c6ce7 100644 --- a/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin-x86_64.txt @@ -41,6 +41,7 @@ target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE ) target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common_pq.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp ) set_property( diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt index e3578c045b3..d7b53aa6756 100644 --- a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt @@ -43,6 +43,7 @@ target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE ) target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common_pq.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp ) set_property( diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt index ee4faf0ef51..b1b84cd6a5e 100644 --- a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt @@ -45,6 +45,7 @@ target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE ) target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common_pq.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp ) set_property( diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt index d827442621f..8430c3be5eb 100644 --- a/ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt @@ -33,6 +33,7 @@ target_link_libraries(ydb-core-tx-datashard-ut_volatile PUBLIC ) target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common_pq.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp ) set_property( diff --git a/ydb/core/tx/datashard/ut_volatile/ya.make b/ydb/core/tx/datashard/ut_volatile/ya.make index 3d33349e805..2e318232208 100644 --- a/ydb/core/tx/datashard/ut_volatile/ya.make +++ b/ydb/core/tx/datashard/ut_volatile/ya.make @@ -31,6 +31,8 @@ YQL_LAST_ABI_VERSION() SRCS( datashard_ut_common.cpp datashard_ut_common.h + datashard_ut_common_pq.cpp + datashard_ut_common_pq.h datashard_ut_volatile.cpp ) |