summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <[email protected]>2023-03-21 14:28:52 +0300
committersnaury <[email protected]>2023-03-21 14:28:52 +0300
commit53871f3cefea89d32812e8a68bcd0b803d658fbd (patch)
treeff6b70ffdeafa2d5bc9d7834c504592591eebc1c
parent71ddd44b7b0d38f2f983644e6b91d5a15c71a215 (diff)
Handle cdc dependencies in bulk operations
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.cpp51
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.h4
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.cpp37
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.h8
-rw-r--r--ydb/core/tx/datashard/datashard_direct_transaction.cpp11
-rw-r--r--ydb/core/tx/datashard/datashard_direct_transaction.h2
-rw-r--r--ydb/core/tx/datashard/datashard_direct_upload.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_direct_upload.h2
-rw-r--r--ydb/core/tx/datashard/datashard_unsafe_upload.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_user_db.cpp79
-rw-r--r--ydb/core/tx/datashard/datashard_user_db.h16
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common_pq.cpp68
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common_pq.h9
-rw-r--r--ydb/core/tx/datashard/datashard_ut_volatile.cpp219
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp20
-rw-r--r--ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/ut_volatile/ya.make2
20 files changed, 480 insertions, 61 deletions
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index 61d132b9e94..50aef32273e 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -15,7 +15,8 @@ TCommonUploadOps<TEvRequest, TEvResponse>::TCommonUploadOps(typename TEvRequest:
template <typename TEvRequest, typename TEvResponse>
bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTransactionContext& txc,
- const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId)
+ const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId,
+ absl::flat_hash_set<ui64>* volatileReadDependencies)
{
const auto& record = Ev->Get()->Record;
Result = MakeHolder<TEvResponse>(self->TabletID());
@@ -91,6 +92,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
TSerializedCellVec valueCells;
bool pageFault = false;
+ bool commitAdded = false;
NTable::TRowState rowState;
absl::flat_hash_set<ui64> volatileDependencies;
@@ -178,30 +180,38 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
}
if (!writeToTableShadow) {
- if (ChangeCollector) {
- Y_VERIFY(CollectChanges);
-
- if (!ChangeCollector->OnUpdate(fullTableId, writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion)) {
- pageFault = true;
+ if (BreakLocks) {
+ if (breakWriteConflicts) {
+ if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
+ pageFault = true;
+ }
}
- if (pageFault) {
- continue;
+ if (!pageFault) {
+ self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
}
}
- if (BreakLocks) {
- if (breakWriteConflicts) {
- if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
- pageFault = true;
+ if (ChangeCollector) {
+ Y_VERIFY(CollectChanges);
+
+ if (!volatileDependencies.empty()) {
+ if (!globalTxId) {
+ throw TNeedGlobalTxId();
}
- if (pageFault) {
- continue;
+ if (!ChangeCollector->OnUpdateTx(fullTableId, writeTableId, NTable::ERowOp::Upsert, key, value, globalTxId)) {
+ pageFault = true;
+ }
+ } else {
+ if (!ChangeCollector->OnUpdate(fullTableId, writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion)) {
+ pageFault = true;
}
}
+ }
- self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
+ if (pageFault) {
+ continue;
}
}
@@ -210,11 +220,22 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
throw TNeedGlobalTxId();
}
txc.DB.UpdateTx(writeTableId, NTable::ERowOp::Upsert, key, value, globalTxId);
+ if (!commitAdded) {
+ // Make sure we see our own changes on further iterations
+ userDb.AddCommitTxId(globalTxId, writeVersion);
+ commitAdded = true;
+ }
} else {
txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion);
}
}
+ if (volatileReadDependencies && !userDb.GetVolatileReadDependencies().empty()) {
+ *volatileReadDependencies = std::move(userDb.GetVolatileReadDependencies());
+ txc.Reschedule();
+ pageFault = true;
+ }
+
if (pageFault) {
if (ChangeCollector) {
ChangeCollector->OnRestart();
diff --git a/ydb/core/tx/datashard/datashard_common_upload.h b/ydb/core/tx/datashard/datashard_common_upload.h
index 50284f42979..0359770362a 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.h
+++ b/ydb/core/tx/datashard/datashard_common_upload.h
@@ -21,7 +21,9 @@ public:
explicit TCommonUploadOps(typename TEvRequest::TPtr& ev, bool breakLocks, bool collectChanges);
protected:
- bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId);
+ bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion,
+ const TRowVersion& writeVersion, ui64 globalTxId,
+ absl::flat_hash_set<ui64>* volatileReadDependencies);
void GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie);
const TEvRequest* GetRequest() const;
TEvResponse* GetResult();
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 2d7c033bb4a..cf6505c307a 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -81,6 +81,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
absl::flat_hash_set<ui64> volatileDependencies;
bool pageFault = false;
+ bool commitAdded = false;
for (const auto& serializedKey : request.GetKeyColumns()) {
TSerializedCellVec keyCells;
if (!TSerializedCellVec::TryParse(serializedKey, keyCells)) {
@@ -141,15 +142,25 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
}
- if (auto collector = params.GetChangeCollector()) {
- if (!collector->OnUpdate(fullTableId, localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion)) {
+ if (breakWriteConflicts) {
+ if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
pageFault = true;
}
}
- if (breakWriteConflicts) {
- if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
- pageFault = true;
+ if (auto collector = params.GetChangeCollector()) {
+ if (!volatileDependencies.empty()) {
+ if (!params.GlobalTxId) {
+ throw TNeedGlobalTxId();
+ }
+
+ if (!collector->OnUpdateTx(fullTableId, localTableId, NTable::ERowOp::Erase, key, {}, params.GlobalTxId)) {
+ pageFault = true;
+ }
+ } else {
+ if (!collector->OnUpdate(fullTableId, localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion)) {
+ pageFault = true;
+ }
}
}
@@ -164,11 +175,22 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
throw TNeedGlobalTxId();
}
params.Txc->DB.UpdateTx(localTableId, NTable::ERowOp::Erase, key, {}, params.GlobalTxId);
+ if (!commitAdded && userDb) {
+ // Make sure we see our own changes on further iterations
+ userDb->AddCommitTxId(params.GlobalTxId, params.WriteVersion);
+ commitAdded = true;
+ }
} else {
params.Txc->DB.Update(localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion);
}
}
+ if (params.VolatileReadDependencies && userDb && !userDb->GetVolatileReadDependencies().empty()) {
+ *params.VolatileReadDependencies = std::move(userDb->GetVolatileReadDependencies());
+ params.Txc->Reschedule();
+ pageFault = true;
+ }
+
if (pageFault) {
if (auto collector = params.GetChangeCollector()) {
collector->OnRestart();
@@ -210,14 +232,15 @@ bool TDirectTxErase::CheckRequest(TDataShard* self, const NKikimrTxDataShard::TE
bool TDirectTxErase::Execute(TDataShard* self, TTransactionContext& txc,
const TRowVersion& readVersion, const TRowVersion& writeVersion,
- ui64 globalTxId)
+ ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies)
{
const auto& record = Ev->Get()->Record;
Result = MakeHolder<TEvDataShard::TEvEraseRowsResponse>();
Result->Record.SetTabletID(self->TabletID());
- const auto params = TExecuteParams::ForExecute(this, &txc, readVersion, writeVersion, globalTxId);
+ const auto params = TExecuteParams::ForExecute(this, &txc, readVersion, writeVersion,
+ globalTxId, &volatileReadDependencies);
NKikimrTxDataShard::TEvEraseRowsResponse::EStatus status;
TString error;
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.h b/ydb/core/tx/datashard/datashard_direct_erase.h
index 9904fd404e9..df5b88b71ad 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.h
+++ b/ydb/core/tx/datashard/datashard_direct_erase.h
@@ -23,22 +23,24 @@ class TDirectTxErase : public IDirectTx {
const TRowVersion ReadVersion;
const TRowVersion WriteVersion;
const ui64 GlobalTxId;
+ absl::flat_hash_set<ui64>* const VolatileReadDependencies;
private:
explicit TExecuteParams(TDirectTxErase* tx, TTransactionContext* txc,
const TRowVersion& readVersion, const TRowVersion& writeVersion,
- const ui64 globalTxId)
+ ui64 globalTxId, absl::flat_hash_set<ui64>* volatileReadDependencies)
: Tx(tx)
, Txc(txc)
, ReadVersion(readVersion)
, WriteVersion(writeVersion)
, GlobalTxId(globalTxId)
+ , VolatileReadDependencies(volatileReadDependencies)
{
}
public:
static TExecuteParams ForCheck() {
- return TExecuteParams(nullptr, nullptr, TRowVersion(), TRowVersion(), 0);
+ return TExecuteParams(nullptr, nullptr, TRowVersion(), TRowVersion(), 0, nullptr);
}
template <typename... Args>
@@ -73,7 +75,7 @@ public:
bool Execute(TDataShard* self, TTransactionContext& txc,
const TRowVersion& readVersion, const TRowVersion& writeVersion,
- ui64 globalTxId) override;
+ ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies) override;
TDirectTxResult GetResult(TDataShard* self) override;
TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const override;
};
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.cpp b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
index 3b228035e64..c4075840ee7 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
@@ -34,8 +34,17 @@ bool TDirectTransaction::Execute(TDataShard* self, TTransactionContext& txc) {
auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);
// NOTE: may throw TNeedGlobalTxId exception, which is handled in direct tx unit
- if (!Impl->Execute(self, txc, readVersion, writeVersion, GetGlobalTxId()))
+ absl::flat_hash_set<ui64> volatileReadDependencies;
+ if (!Impl->Execute(self, txc, readVersion, writeVersion, GetGlobalTxId(), volatileReadDependencies)) {
+ if (!volatileReadDependencies.empty()) {
+ for (ui64 txId : volatileReadDependencies) {
+ AddVolatileDependency(txId);
+ bool ok = self->GetVolatileTxManager().AttachBlockedOperation(txId, GetTxId());
+ Y_VERIFY_S(ok, "Unexpected failure to attach " << *static_cast<TOperation*>(this) << " to volatile tx " << txId);
+ }
+ }
return false;
+ }
if (self->IsMvccEnabled()) {
// Note: we always wait for completion, so we can ignore the result
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.h b/ydb/core/tx/datashard/datashard_direct_transaction.h
index f2bfbc94f28..85cacda5f84 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.h
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.h
@@ -22,7 +22,7 @@ public:
virtual ~IDirectTx() = default;
virtual bool Execute(TDataShard* self, TTransactionContext& txc,
const TRowVersion& readVersion, const TRowVersion& writeVersion,
- ui64 globalTxId) = 0;
+ ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies) = 0;
virtual TDirectTxResult GetResult(TDataShard* self) = 0;
virtual TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const = 0;
};
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.cpp b/ydb/core/tx/datashard/datashard_direct_upload.cpp
index 21dd40d7b6d..d0da2be607c 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_upload.cpp
@@ -10,9 +10,10 @@ TDirectTxUpload::TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev)
bool TDirectTxUpload::Execute(TDataShard* self, TTransactionContext& txc,
const TRowVersion& readVersion, const TRowVersion& writeVersion,
- ui64 globalTxId)
+ ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies)
{
- return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion, globalTxId);
+ return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion,
+ globalTxId, &volatileReadDependencies);
}
TDirectTxResult TDirectTxUpload::GetResult(TDataShard* self) {
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.h b/ydb/core/tx/datashard/datashard_direct_upload.h
index 4a59bdeb806..5452728cd42 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.h
+++ b/ydb/core/tx/datashard/datashard_direct_upload.h
@@ -16,7 +16,7 @@ public:
bool Execute(TDataShard* self, TTransactionContext& txc,
const TRowVersion& readVersion, const TRowVersion& writeVersion,
- ui64 globalTxId) override;
+ ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies) override;
TDirectTxResult GetResult(TDataShard* self) override;
TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const override;
};
diff --git a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
index 16d4766971b..e1352945f1b 100644
--- a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
@@ -13,7 +13,9 @@ bool TDataShard::TTxUnsafeUploadRows::Execute(TTransactionContext& txc, const TA
auto [readVersion, writeVersion] = Self->GetReadWriteVersions();
// NOTE: will not throw TNeedGlobalTxId since we set breakLocks to false
- if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion, /* globalTxId */ 0)) {
+ if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion,
+ /* globalTxId */ 0, /* volatile read dependencies */ nullptr))
+ {
return false;
}
diff --git a/ydb/core/tx/datashard/datashard_user_db.cpp b/ydb/core/tx/datashard/datashard_user_db.cpp
index 9787d473a42..0d46b9b5d5d 100644
--- a/ydb/core/tx/datashard/datashard_user_db.cpp
+++ b/ydb/core/tx/datashard/datashard_user_db.cpp
@@ -13,7 +13,10 @@ NTable::EReady TDataShardUserDb::SelectRow(
auto tid = Self.GetLocalTableId(tableId);
Y_VERIFY(tid != 0, "Unexpected SelectRow for an unknown table");
- return Db.Select(tid, key, tags, row, stats, /* readFlags */ 0, readVersion.GetOrElse(ReadVersion));
+ return Db.Select(tid, key, tags, row, stats, /* readFlags */ 0,
+ readVersion.GetOrElse(ReadVersion),
+ GetReadTxMap(),
+ GetReadTxObserver());
}
NTable::EReady TDataShardUserDb::SelectRow(
@@ -27,4 +30,78 @@ NTable::EReady TDataShardUserDb::SelectRow(
return SelectRow(tableId, key, tags, row, stats, readVersion);
}
+void TDataShardUserDb::AddCommitTxId(ui64 txId, const TRowVersion& commitVersion) {
+ if (!DynamicTxMap) {
+ DynamicTxMap = new NTable::TDynamicTransactionMap(Self.GetVolatileTxManager().GetTxMap());
+ TxMap = DynamicTxMap;
+ }
+ DynamicTxMap->Add(txId, commitVersion);
+}
+
+NTable::ITransactionMapPtr& TDataShardUserDb::GetReadTxMap() {
+ if (!TxMap) {
+ auto baseTxMap = Self.GetVolatileTxManager().GetTxMap();
+ if (baseTxMap) {
+ DynamicTxMap = new NTable::TDynamicTransactionMap(baseTxMap);
+ TxMap = DynamicTxMap;
+ }
+ }
+ return TxMap;
+}
+
+class TDataShardUserDb::TReadTxObserver : public NTable::ITransactionObserver {
+public:
+ TReadTxObserver(TDataShardUserDb& userDb)
+ : UserDb(userDb)
+ { }
+
+ void OnSkipUncommitted(ui64) override {
+ // nothing
+ }
+
+ void OnSkipCommitted(const TRowVersion&) override {
+ // nothing
+ }
+
+ void OnSkipCommitted(const TRowVersion&, ui64) override {
+ // nothing
+ }
+
+ void OnApplyCommitted(const TRowVersion&) override {
+ // nothing
+ }
+
+ void OnApplyCommitted(const TRowVersion&, ui64 txId) override {
+ UserDb.CheckReadDependency(txId);
+ }
+
+private:
+ TDataShardUserDb& UserDb;
+};
+
+NTable::ITransactionObserverPtr& TDataShardUserDb::GetReadTxObserver() {
+ if (!TxObserver) {
+ auto baseTxMap = Self.GetVolatileTxManager().GetTxMap();
+ if (baseTxMap) {
+ TxObserver = new TReadTxObserver(*this);
+ }
+ }
+ return TxObserver;
+}
+
+void TDataShardUserDb::CheckReadDependency(ui64 txId) {
+ if (auto* info = Self.GetVolatileTxManager().FindByCommitTxId(txId)) {
+ switch (info->State) {
+ case EVolatileTxState::Waiting:
+ VolatileReadDependencies.insert(info->TxId);
+ break;
+ case EVolatileTxState::Committed:
+ break;
+ case EVolatileTxState::Aborting:
+ VolatileReadDependencies.insert(info->TxId);
+ break;
+ }
+ }
+}
+
} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_user_db.h b/ydb/core/tx/datashard/datashard_user_db.h
index 5731066d5c2..5cd95171dfb 100644
--- a/ydb/core/tx/datashard/datashard_user_db.h
+++ b/ydb/core/tx/datashard/datashard_user_db.h
@@ -49,10 +49,26 @@ public:
NTable::TRowState& row,
const TMaybe<TRowVersion>& readVersion = {}) override;
+ void AddCommitTxId(ui64 txId, const TRowVersion& commitVersion);
+
+ absl::flat_hash_set<ui64>& GetVolatileReadDependencies() {
+ return VolatileReadDependencies;
+ }
+
+private:
+ class TReadTxObserver;
+ NTable::ITransactionMapPtr& GetReadTxMap();
+ NTable::ITransactionObserverPtr& GetReadTxObserver();
+ void CheckReadDependency(ui64 txId);
+
private:
TDataShard& Self;
NTable::TDatabase& Db;
TRowVersion ReadVersion;
+ NTable::ITransactionMapPtr TxMap;
+ TIntrusivePtr<NTable::TDynamicTransactionMap> DynamicTxMap;
+ NTable::ITransactionObserverPtr TxObserver;
+ absl::flat_hash_set<ui64> VolatileReadDependencies;
};
} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_ut_common_pq.cpp b/ydb/core/tx/datashard/datashard_ut_common_pq.cpp
new file mode 100644
index 00000000000..e9f5bcdf998
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_ut_common_pq.cpp
@@ -0,0 +1,68 @@
+#include "datashard_ut_common_pq.h"
+
+#include <ydb/core/persqueue/user_info.h>
+#include <ydb/core/persqueue/write_meta.h>
+#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+namespace NKikimr {
+
+ using namespace NYdb::NPersQueue;
+ using namespace NYdb::NDataStreams::V1;
+
+ ui64 ResolvePqTablet(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) {
+ auto streamDesc = Ls(runtime, sender, path);
+
+ const auto& streamEntry = streamDesc->ResultSet.at(0);
+ UNIT_ASSERT(streamEntry.ListNodeEntry);
+
+ const auto& children = streamEntry.ListNodeEntry->Children;
+ UNIT_ASSERT_VALUES_EQUAL(children.size(), 1);
+
+ auto topicDesc = Navigate(runtime, sender, JoinPath(ChildPath(SplitPath(path), children.at(0).Name)),
+ NSchemeCache::TSchemeCacheNavigate::EOp::OpTopic);
+
+ const auto& topicEntry = topicDesc->ResultSet.at(0);
+ UNIT_ASSERT(topicEntry.PQGroupInfo);
+
+ const auto& pqDesc = topicEntry.PQGroupInfo->Description;
+ for (const auto& partition : pqDesc.GetPartitions()) {
+ if (partitionId == partition.GetPartitionId()) {
+ return partition.GetTabletId();
+ }
+ }
+
+ UNIT_ASSERT_C(false, "Cannot find partition: " << partitionId);
+ return 0;
+ }
+
+ TVector<std::pair<TString, TString>> GetPqRecords(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) {
+ NKikimrClient::TPersQueueRequest request;
+ request.MutablePartitionRequest()->SetTopic(path);
+ request.MutablePartitionRequest()->SetPartition(partitionId);
+
+ auto& cmd = *request.MutablePartitionRequest()->MutableCmdRead();
+ cmd.SetClientId(NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER);
+ cmd.SetCount(10000);
+ cmd.SetOffset(0);
+ cmd.SetReadTimestampMs(0);
+ cmd.SetExternalOperation(true);
+
+ auto req = MakeHolder<TEvPersQueue::TEvRequest>();
+ req->Record = std::move(request);
+ ForwardToTablet(runtime, ResolvePqTablet(runtime, sender, path, partitionId), sender, req.Release());
+
+ auto resp = runtime.GrabEdgeEventRethrow<TEvPersQueue::TEvResponse>(sender);
+ UNIT_ASSERT(resp);
+
+ TVector<std::pair<TString, TString>> result;
+ for (const auto& r : resp->Get()->Record.GetPartitionResponse().GetCmdReadResult().GetResult()) {
+ const auto data = NKikimr::GetDeserializedData(r.GetData());
+ result.emplace_back(r.GetPartitionKey(), data.GetData());
+ }
+
+ return result;
+ }
+
+} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_ut_common_pq.h b/ydb/core/tx/datashard/datashard_ut_common_pq.h
new file mode 100644
index 00000000000..bc449e1df5b
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_ut_common_pq.h
@@ -0,0 +1,9 @@
+#include "datashard_ut_common.h"
+
+namespace NKikimr {
+
+ ui64 ResolvePqTablet(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId);
+
+ TVector<std::pair<TString, TString>> GetPqRecords(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId);
+
+} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index be43f27767c..e7fe922b1ba 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -1,10 +1,8 @@
#include "datashard_ut_common.h"
#include "datashard_ut_common_kqp.h"
+#include "datashard_ut_common_pq.h"
#include "datashard_active_transaction.h"
-#include <ydb/core/tx/tx_proxy/proxy.h>
-#include <ydb/core/tx/tx_proxy/upload_rows.h>
-
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
namespace NKikimr {
@@ -1478,31 +1476,32 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
// Write to key 2 using bulk upsert
+ NThreading::TFuture<Ydb::Table::BulkUpsertResponse> bulkUpsertFuture;
{
- using TRows = TVector<std::pair<TSerializedCellVec, TString>>;
- using TRowTypes = TVector<std::pair<TString, Ydb::Type>>;
-
- auto types = std::make_shared<TRowTypes>();
-
- Ydb::Type type;
- type.set_type_id(Ydb::Type::UINT32);
- types->emplace_back("key", type);
- types->emplace_back("value", type);
-
- auto rows = std::make_shared<TRows>();
-
- TVector<TCell> key{ TCell::Make(ui32(2)) };
- TVector<TCell> values{ TCell::Make(ui32(22)) };
- TSerializedCellVec serializedKey(TSerializedCellVec::Serialize(key));
- TString serializedValues(TSerializedCellVec::Serialize(values));
- rows->emplace_back(serializedKey, serializedValues);
-
- auto upsertSender = runtime.AllocateEdgeActor();
- auto actor = NTxProxy::CreateUploadRowsInternal(upsertSender, "/Root/table-1", types, rows);
- runtime.Register(actor);
-
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvUploadRowsResponse>(upsertSender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, Ydb::StatusIds::SUCCESS);
+ Ydb::Table::BulkUpsertRequest request;
+ request.set_table("/Root/table-1");
+ auto* r = request.mutable_rows();
+
+ auto* reqRowType = r->mutable_type()->mutable_list_type()->mutable_item()->mutable_struct_type();
+ auto* reqKeyType = reqRowType->add_members();
+ reqKeyType->set_name("key");
+ reqKeyType->mutable_type()->set_type_id(Ydb::Type::UINT32);
+ auto* reqValueType = reqRowType->add_members();
+ reqValueType->set_name("value");
+ reqValueType->mutable_type()->set_type_id(Ydb::Type::UINT32);
+
+ auto* reqRows = r->mutable_value();
+ auto* row1 = reqRows->add_items();
+ row1->add_items()->set_uint32_value(2);
+ row1->add_items()->set_uint32_value(22);
+
+ using TEvBulkUpsertRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<
+ Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>;
+ bulkUpsertFuture = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(
+ std::move(request), "/Root", "", runtime.GetActorSystem(0));
+
+ auto response = AwaitResponse(runtime, std::move(bulkUpsertFuture));
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
// This compaction verifies there's no commit race with the waiting
@@ -1535,6 +1534,172 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"{ items { uint32_value: 20 } items { uint32_value: 20 } items { null_flag_value: NULL_VALUE } }");
}
+ namespace {
+ using TCdcStream = TShardedTableOptions::TCdcStream;
+
+ TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") {
+ return TCdcStream{
+ .Name = name,
+ .Mode = NKikimrSchemeOp::ECdcStreamModeNewAndOldImages,
+ .Format = format,
+ };
+ }
+
+ TCdcStream WithInitialScan(TCdcStream streamDesc) {
+ streamDesc.InitialState = NKikimrSchemeOp::ECdcStreamStateScan;
+ return streamDesc;
+ }
+
+ void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
+ while (true) {
+ const auto records = GetPqRecords(*server->GetRuntime(), sender, path, 0);
+ if (records.size() >= expected.size()) {
+ for (ui32 i = 0; i < expected.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(records.at(i).second, expected.at(i));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(records.size(), expected.size());
+ break;
+ }
+
+ SimulateSleep(server, TDuration::Seconds(1));
+ }
+ }
+ } // namespace
+
+ Y_UNIT_TEST(DistributedWriteThenBulkUpsertWithCdc) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000)
+ .SetEnableChangefeedInitialScan(true);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false},
+ {"value2", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ WaitTxNotification(server, sender, AsyncAlterAddStream(server, "/Root", "table-1",
+ WithInitialScan(NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatJson))));
+
+ WaitForContent(server, sender, "/Root/table-1/Stream", {
+ R"({"update":{},"newImage":{"value2":null,"value":1},"key":[1]})",
+ });
+
+ ui64 maxReadSetStep = 0;
+ bool captureReadSets = true;
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep());
+ if (captureReadSets) {
+ Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 2, 42);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets");
+ UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ // Write to key 2 using bulk upsert
+ NThreading::TFuture<Ydb::Table::BulkUpsertResponse> bulkUpsertFuture;
+ {
+ Ydb::Table::BulkUpsertRequest request;
+ request.set_table("/Root/table-1");
+ auto* r = request.mutable_rows();
+
+ auto* reqRowType = r->mutable_type()->mutable_list_type()->mutable_item()->mutable_struct_type();
+ auto* reqKeyType = reqRowType->add_members();
+ reqKeyType->set_name("key");
+ reqKeyType->mutable_type()->set_type_id(Ydb::Type::UINT32);
+ auto* reqValueType = reqRowType->add_members();
+ reqValueType->set_name("value");
+ reqValueType->mutable_type()->set_type_id(Ydb::Type::UINT32);
+
+ auto* reqRows = r->mutable_value();
+ auto* row1 = reqRows->add_items();
+ row1->add_items()->set_uint32_value(2);
+ row1->add_items()->set_uint32_value(22);
+
+ using TEvBulkUpsertRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<
+ Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>;
+ bulkUpsertFuture = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(
+ std::move(request), "/Root", "", runtime.GetActorSystem(0));
+
+ // Note: we expect bulk upsert to block until key 2 outcome is decided
+ SimulateSleep(runtime, TDuration::Seconds(1));
+ UNIT_ASSERT(!bulkUpsertFuture.HasValue());
+ }
+
+ runtime.SetObserverFunc(prevObserverFunc);
+ for (auto& ev : capturedReadSets) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(AwaitResponse(runtime, std::move(future))),
+ "<empty>");
+
+ SimulateSleep(runtime, TDuration::Seconds(1));
+
+ // Verify the result
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value, value2 FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value, value2 FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } items { null_flag_value: NULL_VALUE } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 22 } items { uint32_value: 42 } }, "
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } items { null_flag_value: NULL_VALUE } }, "
+ "{ items { uint32_value: 20 } items { uint32_value: 20 } items { null_flag_value: NULL_VALUE } }");
+
+ WaitForContent(server, sender, "/Root/table-1/Stream", {
+ R"({"update":{},"newImage":{"value2":null,"value":1},"key":[1]})",
+ R"({"update":{},"newImage":{"value2":42,"value":2},"key":[2]})",
+ R"({"update":{},"newImage":{"value2":42,"value":22},"key":[2],"oldImage":{"value2":42,"value":2}})",
+ });
+
+ UNIT_ASSERT(bulkUpsertFuture.HasValue());
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index 5c73693fce7..6a27d5a7db3 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -51,11 +51,22 @@ public:
auto presentRows = TDynBitMap().Set(0, request.KeyColumnsSize());
if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, op->GetGlobalTxId(),
- &groupProvider, changeCollector.Get()))
+ &userDb, &groupProvider, changeCollector.Get()))
{
return EExecutionStatus::Restart;
}
+ if (!userDb.GetVolatileReadDependencies().empty()) {
+ for (ui64 txId : userDb.GetVolatileReadDependencies()) {
+ op->AddVolatileDependency(txId);
+ bool ok = DataShard.GetVolatileTxManager().AttachBlockedOperation(txId, op->GetTxId());
+ Y_VERIFY_S(ok, "Unexpected failure to attach " << *op << " to volatile tx " << txId);
+ }
+
+ txc.Reschedule();
+ return EExecutionStatus::Restart;
+ }
+
if (Pipeline.AddLockDependencies(op, guardLocks)) {
txc.Reschedule();
return EExecutionStatus::Restart;
@@ -102,6 +113,7 @@ public:
bool Execute(TTransactionContext& txc, const NKikimrTxDataShard::TEvEraseRowsRequest& request,
const TDynBitMap& presentRows, const TDynBitMap& confirmedRows, const TRowVersion& writeVersion,
ui64 globalTxId,
+ TDataShardUserDb* userDb = nullptr,
TDataShardChangeGroupProvider* groupProvider = nullptr,
IDataShardChangeCollector* changeCollector = nullptr)
{
@@ -119,6 +131,7 @@ public:
size_t row = 0;
bool pageFault = false;
+ bool commitAdded = false;
Y_FOR_EACH_BIT(i, presentRows) {
if (!confirmedRows.Test(i)) {
++row;
@@ -169,6 +182,11 @@ public:
if (!volatileDependencies.empty() || volatileOrdered) {
txc.DB.UpdateTx(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, globalTxId);
+ if (!commitAdded && userDb) {
+ // Make sure we see our own changes on further iterations
+ userDb->AddCommitTxId(globalTxId, writeVersion);
+ commitAdded = true;
+ }
} else {
txc.DB.Update(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion);
}
diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin-x86_64.txt
index fe923f8e304..535f86c6ce7 100644
--- a/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin-x86_64.txt
@@ -41,6 +41,7 @@ target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE
)
target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common_pq.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp
)
set_property(
diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt
index e3578c045b3..d7b53aa6756 100644
--- a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt
@@ -43,6 +43,7 @@ target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE
)
target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common_pq.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp
)
set_property(
diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt
index ee4faf0ef51..b1b84cd6a5e 100644
--- a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt
@@ -45,6 +45,7 @@ target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE
)
target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common_pq.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp
)
set_property(
diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt
index d827442621f..8430c3be5eb 100644
--- a/ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt
@@ -33,6 +33,7 @@ target_link_libraries(ydb-core-tx-datashard-ut_volatile PUBLIC
)
target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common_pq.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp
)
set_property(
diff --git a/ydb/core/tx/datashard/ut_volatile/ya.make b/ydb/core/tx/datashard/ut_volatile/ya.make
index 3d33349e805..2e318232208 100644
--- a/ydb/core/tx/datashard/ut_volatile/ya.make
+++ b/ydb/core/tx/datashard/ut_volatile/ya.make
@@ -31,6 +31,8 @@ YQL_LAST_ABI_VERSION()
SRCS(
datashard_ut_common.cpp
datashard_ut_common.h
+ datashard_ut_common_pq.cpp
+ datashard_ut_common_pq.h
datashard_ut_volatile.cpp
)