aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-01-30 14:51:41 +0300
committersnaury <snaury@ydb.tech>2023-01-30 14:51:41 +0300
commitd1b44adf32afaa3332a4c1338d8c2112cc96bb5e (patch)
treef66ab7ee05d069095c68c2591a3c5482c28553ef
parent7c1b72786425e9a2f49dcb3fd78f2f17a8dad07a (diff)
downloadydb-d1b44adf32afaa3332a4c1338d8c2112cc96bb5e.tar.gz
Support waiting for volatile transactions in read iterators
-rw-r--r--ydb/core/tx/datashard/datashard.cpp59
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp217
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h7
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h20
-rw-r--r--ydb/core/tx/datashard/datashard_ut_volatile.cpp278
-rw-r--r--ydb/core/tx/datashard/volatile_tx.cpp35
-rw-r--r--ydb/core/tx/datashard/volatile_tx.h4
7 files changed, 559 insertions, 61 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 4424a1481ca..2786e05dee0 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -489,6 +489,59 @@ void TDataShard::SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEven
delayedAcks.clear();
}
+class TDataShard::TWaitVolatileDependencies final : public IVolatileTxCallback {
+public:
+ TWaitVolatileDependencies(
+ TDataShard* self, const absl::flat_hash_set<ui64>& dependencies,
+ const TActorId& target,
+ std::unique_ptr<IEventBase> event,
+ ui64 cookie)
+ : Self(self)
+ , Dependencies(dependencies)
+ , Target(target)
+ , Event(std::move(event))
+ , Cookie(cookie)
+ { }
+
+ void OnCommit(ui64 txId) override {
+ Dependencies.erase(txId);
+ if (Dependencies.empty()) {
+ Finish();
+ }
+ }
+
+ void OnAbort(ui64 txId) override {
+ Dependencies.erase(txId);
+ if (Dependencies.empty()) {
+ Finish();
+ }
+ }
+
+ void Finish() {
+ Self->Send(Target, Event.release(), 0, Cookie);
+ }
+
+private:
+ TDataShard* Self;
+ absl::flat_hash_set<ui64> Dependencies;
+ TActorId Target;
+ std::unique_ptr<IEventBase> Event;
+ ui64 Cookie;
+};
+
+void TDataShard::WaitVolatileDependenciesThenSend(
+ const absl::flat_hash_set<ui64>& dependencies,
+ const TActorId& target, std::unique_ptr<IEventBase> event,
+ ui64 cookie)
+{
+ Y_VERIFY(!dependencies.empty(), "Unexpected empty dependencies");
+ auto callback = MakeIntrusive<TWaitVolatileDependencies>(this, dependencies, target, std::move(event), cookie);
+ for (ui64 txId : dependencies) {
+ bool ok = VolatileTxManager.AttachVolatileTxCallback(txId, callback);
+ Y_VERIFY_S(ok, "Unexpected failure to attach callback to volatile tx " << txId);
+ }
+}
+
class TDataShard::TSendVolatileResult final : public IVolatileTxCallback {
public:
TSendVolatileResult(
@@ -502,7 +555,7 @@ public:
, TxId(txId)
{ }
- void OnCommit() override {
+ void OnCommit(ui64) override {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Complete [" << Step << " : " << TxId << "] from " << Self->TabletID()
<< " at tablet " << Self->TabletID() << " send result to client "
@@ -514,11 +567,11 @@ public:
Self->Send(Target, Result.Release(), flags);
}
- void OnAbort() override {
+ void OnAbort(ui64 txId) override {
Result->Record.ClearTxResult();
Result->Record.SetStatus(NKikimrTxDataShard::TEvProposeTransactionResult::ABORTED);
Result->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Distributed transaction aborted due to commit failure");
- OnCommit();
+ OnCommit(txId);
}
private:
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index e2fecfff623..5700ad9fdb9 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -280,6 +280,8 @@ class TReader {
NTable::ITransactionMapPtr TxMap;
NTable::ITransactionObserverPtr TxObserver;
+ absl::flat_hash_set<ui64> VolatileReadDependencies;
+ bool VolatileWaitForCommit = false;
enum class EReadStatus {
Done = 0,
@@ -597,6 +599,9 @@ public:
bool HadInvisibleRowSkips() const { return InvisibleRowSkips > 0; }
bool HadInconsistentResult() const { return HadInconsistentResult_; }
+ const absl::flat_hash_set<ui64>& GetVolatileReadDependencies() const { return VolatileReadDependencies; }
+ bool NeedVolatileWaitForCommit() const { return VolatileWaitForCommit; }
+
private:
bool OutOfQuota() const {
return RowsRead >= State.Quota.Rows ||
@@ -676,32 +681,52 @@ private:
}
const NTable::ITransactionMapPtr& GetReadTxMap() {
- if (!TxMap &&
- State.LockId &&
- !TSysTables::IsSystemTable(State.PathId) &&
- Self->SysLocksTable().HasCurrentWriteLock(State.PathId))
- {
- TxMap = new NTable::TSingleTransactionMap(State.LockId, TRowVersion::Min());
+ if (!TxMap && Self->IsUserTable(State.PathId)) {
+ auto baseTxMap = Self->GetVolatileTxManager().GetTxMap();
+
+ bool needTxMap = (
+ // We need tx map when there are waiting volatile transactions
+ baseTxMap ||
+ // We need tx map when current lock has uncommitted changes
+ State.LockId && Self->SysLocksTable().HasCurrentWriteLock(State.PathId));
+
+ if (needTxMap) {
+ auto ptr = MakeIntrusive<NTable::TDynamicTransactionMap>(baseTxMap);
+ if (State.LockId) {
+ ptr->Add(State.LockId, TRowVersion::Min());
+ }
+ TxMap = ptr;
+ }
}
return TxMap;
}
const NTable::ITransactionObserverPtr& GetReadTxObserver() {
- if (!TxObserver &&
- State.LockId &&
- !TSysTables::IsSystemTable(State.PathId) &&
- Self->SysLocksTable().HasWriteLocks(State.PathId))
- {
- TxObserver = new TReadTxObserver(this);
+ if (!TxObserver && Self->IsUserTable(State.PathId)) {
+ auto baseTxMap = Self->GetVolatileTxManager().GetTxMap();
+
+ bool needTxObserver = (
+ // We need tx observer when there are waiting volatile transactions
+ baseTxMap ||
+ // We need tx observer when current lock has uncommitted changes
+ State.LockId && Self->SysLocksTable().HasCurrentWriteLock(State.PathId));
+
+ if (needTxObserver) {
+ if (State.LockId) {
+ TxObserver = new TLockedReadTxObserver(this);
+ } else {
+ TxObserver = new TReadTxObserver(this);
+ }
+ }
}
return TxObserver;
}
- class TReadTxObserver : public NTable::ITransactionObserver {
+ class TLockedReadTxObserver : public NTable::ITransactionObserver {
public:
- TReadTxObserver(TReader* reader)
+ TLockedReadTxObserver(TReader* reader)
: Reader(reader)
{
}
@@ -722,8 +747,40 @@ private:
Reader->CheckReadConflict(rowVersion);
}
- void OnApplyCommitted(const TRowVersion& rowVersion, ui64) override {
+ void OnApplyCommitted(const TRowVersion& rowVersion, ui64 txId) override {
Reader->CheckReadConflict(rowVersion);
+ Reader->CheckReadDependency(txId);
+ }
+
+ private:
+ TReader* const Reader;
+ };
+
+ class TReadTxObserver : public NTable::ITransactionObserver {
+ public:
+ TReadTxObserver(TReader* reader)
+ : Reader(reader)
+ {
+ }
+
+ void OnSkipUncommitted(ui64) override {
+ // We don't care about uncommitted changes
+ }
+
+ void OnSkipCommitted(const TRowVersion&) override {
+ // We already use InvisibleRowSkips for these
+ }
+
+ void OnSkipCommitted(const TRowVersion&, ui64) override {
+ // We already use InvisibleRowSkips for these
+ }
+
+ void OnApplyCommitted(const TRowVersion&) override {
+ // Not needed
+ }
+
+ void OnApplyCommitted(const TRowVersion&, ui64 txId) override {
+ Reader->CheckReadDependency(txId);
}
private:
@@ -749,6 +806,28 @@ private:
HadInconsistentResult_ = true;
}
}
+
+ void CheckReadDependency(ui64 txId) {
+ if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) {
+ switch (info->State) {
+ case EVolatileTxState::Waiting:
+ // We are reading undecided changes and need to wait until they are resolved
+ VolatileReadDependencies.insert(info->TxId);
+ break;
+ case EVolatileTxState::Committed:
+ // Committed changes are immediately visible and don't need a dependency
+ if (!info->AddCommitted) {
+ // However we may need to wait until they are persistent
+ VolatileWaitForCommit = true;
+ }
+ break;
+ case EVolatileTxState::Aborting:
+ // We just read something that we know is aborting, we would have to retry later
+ VolatileReadDependencies.insert(info->TxId);
+ break;
+ }
+ }
+ }
};
const NHPTimer::STime TReader::MaxCyclesPerIteration =
@@ -957,6 +1036,18 @@ public:
if (!Read(txc, ctx, state))
return EExecutionStatus::Restart;
+ // Check if successful result depends on unresolved volatile transactions
+ if (Result && !Result->Record.HasStatus() && !Reader->GetVolatileReadDependencies().empty()) {
+ for (ui64 txId : Reader->GetVolatileReadDependencies()) {
+ AddVolatileDependency(txId);
+ bool ok = Self->GetVolatileTxManager().AttachBlockedOperation(txId, GetTxId());
+ Y_VERIFY(ok, "Unexpected failure to attach a blocked operation");
+ }
+ Reader.reset();
+ Result.reset(new TEvDataShard::TEvReadResult());
+ return EExecutionStatus::Continue;
+ }
+
TDataShard::EPromotePostExecuteEdges readType = TDataShard::EPromotePostExecuteEdges::RepeatableRead;
if (state.IsHeadRead) {
@@ -1011,7 +1102,7 @@ public:
if (hadWrites)
return EExecutionStatus::DelayCompleteNoMoreRestarts;
- if (Self->Pipeline.HasCommittingOpsBelow(state.ReadVersion))
+ if (Self->Pipeline.HasCommittingOpsBelow(state.ReadVersion) || Reader && Reader->NeedVolatileWaitForCommit())
return EExecutionStatus::DelayComplete;
Complete(ctx);
@@ -1662,6 +1753,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
std::unique_ptr<IBlockBuilder> BlockBuilder;
TShortTableInfo TableInfo;
std::unique_ptr<TReader> Reader;
+ bool DelayedResult = false;
public:
TTxReadContinue(TDataShard* ds, TEvDataShard::TEvReadContinue::TPtr ev)
@@ -1703,7 +1795,7 @@ public:
Result->Record,
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Unknown table id: " << state.PathId.LocalPathId);
- SendResult(txc, ctx);
+ SendResult(ctx);
return true;
}
auto userTableInfo = it->second;
@@ -1715,7 +1807,7 @@ public:
Ydb::StatusIds::SCHEME_ERROR,
TStringBuilder() << "Schema changed, current " << currentSchemaVersion
<< ", requested table schemaversion " << state.SchemaVersion);
- SendResult(txc, ctx);
+ SendResult(ctx);
return true;
}
@@ -1728,7 +1820,7 @@ public:
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Failed to get scheme for table local id: "
<< state.PathId.LocalPathId);
- SendResult(txc, ctx);
+ SendResult(ctx);
return true;
}
TableInfo = TShortTableInfo(state.PathId.LocalPathId, *schema);
@@ -1751,7 +1843,7 @@ public:
<< state.ReadVersion << " shard " << Self->TabletID()
<< " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark()
<< (Self->IsFollower() ? " RO replica" : ""));
- SendResult(txc, ctx);
+ SendResult(ctx);
return true;
}
@@ -1762,7 +1854,7 @@ public:
Result->Record,
Ydb::StatusIds::BAD_REQUEST,
p.second);
- SendResult(txc, ctx);
+ SendResult(ctx);
return true;
}
std::swap(BlockBuilder, p.first);
@@ -1785,19 +1877,34 @@ public:
Self));
if (Reader->Read(txc, ctx)) {
- SendResult(txc, ctx);
+ // Retry later when dependencies are resolved
+ if (!Reader->GetVolatileReadDependencies().empty()) {
+ Self->WaitVolatileDependenciesThenSend(
+ Reader->GetVolatileReadDependencies(),
+ Self->SelfId(),
+ std::make_unique<TEvDataShard::TEvReadContinue>(Ev->Get()->Reader, Ev->Get()->ReadId));
+ return true;
+ }
+
+ ApplyLocks(ctx);
+
+ if (!Reader->NeedVolatileWaitForCommit()) {
+ SendResult(ctx);
+ } else {
+ DelayedResult = true;
+ }
return true;
}
return false;
}
- void Complete(const TActorContext&) override {
- // nothing to do
+ void Complete(const TActorContext& ctx) override {
+ if (DelayedResult) {
+ SendResult(ctx);
+ }
}
- void SendResult(TTransactionContext& txc, const TActorContext& ctx) {
- Y_UNUSED(txc);
-
+ void ApplyLocks(const TActorContext& ctx) {
const auto* request = Ev->Get();
TReadIteratorId readId(request->Reader, request->ReadId);
auto it = Self->ReadIterators.find(readId);
@@ -1806,31 +1913,15 @@ public:
auto& state = *it->second;
if (!Result) {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
- << " TTxReadContinue::Execute() finished without Result, aborting");
-
- Result.reset(new TEvDataShard::TEvReadResult());
- SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
- Result->Record.SetReadId(readId.ReadId);
- Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
- Self->DeleteReadIterator(it);
return;
}
- // error happened and status set
auto& record = Result->Record;
if (record.HasStatus()) {
- record.SetSeqNo(state.SeqNo + 1);
- record.SetReadId(readId.ReadId);
- Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
- << " TTxReadContinue::Execute() finished with error, aborting: " << record.DebugString());
- Self->DeleteReadIterator(it);
return;
}
Y_ASSERT(Reader);
- Y_ASSERT(BlockBuilder);
if (state.Lock) {
auto& sysLocks = Self->SysLocksTable();
@@ -1858,10 +1949,6 @@ public:
// A broken write lock means we are reading inconsistent results and must abort
if (state.Lock->IsWriteLock()) {
SetStatusError(record, Ydb::StatusIds::ABORTED, "Read conflict with concurrent transaction");
- record.SetSeqNo(state.SeqNo + 1);
- record.SetReadId(readId.ReadId);
- Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
- Self->DeleteReadIterator(it);
return;
}
@@ -1872,6 +1959,42 @@ public:
Y_VERIFY(locks.empty(), "ApplyLocks acquired unexpected locks");
}
}
+ }
+
+ void SendResult(const TActorContext& ctx) {
+ const auto* request = Ev->Get();
+ TReadIteratorId readId(request->Reader, request->ReadId);
+ auto it = Self->ReadIterators.find(readId);
+ Y_VERIFY(it != Self->ReadIterators.end());
+ Y_VERIFY(it->second);
+ auto& state = *it->second;
+
+ if (!Result) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " TTxReadContinue::Execute() finished without Result, aborting");
+
+ Result.reset(new TEvDataShard::TEvReadResult());
+ SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
+ Result->Record.SetReadId(readId.ReadId);
+ Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
+ Self->DeleteReadIterator(it);
+ return;
+ }
+
+ // error happened and status set
+ auto& record = Result->Record;
+ if (record.HasStatus()) {
+ record.SetSeqNo(state.SeqNo + 1);
+ record.SetReadId(readId.ReadId);
+ Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " TTxReadContinue::Execute() finished with error, aborting: " << record.DebugString());
+ Self->DeleteReadIterator(it);
+ return;
+ }
+
+ Y_ASSERT(Reader);
+ Y_ASSERT(BlockBuilder);
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " readContinue iterator# " << readId
<< " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries()
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 072c621701f..ef53f5e38ee 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -301,6 +301,7 @@ class TDataShard
class TTxApplyReplicationChanges;
+ class TWaitVolatileDependencies;
class TSendVolatileResult;
struct TEvPrivate {
@@ -1342,6 +1343,12 @@ public:
TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets);
void ResendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno);
void SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEventHandle>>& delayedAcks) const;
+
+ void WaitVolatileDependenciesThenSend(
+ const absl::flat_hash_set<ui64>& dependencies,
+ const TActorId& target, std::unique_ptr<IEventBase> event,
+ ui64 cookie = 0);
+
void SendResult(const TActorContext &ctx,
TOutputOpData::TResultPtr &result,
const TActorId &target,
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index c81b07a1252..146032bdc8c 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -364,6 +364,26 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSQLRequest(const TString &sql,
void InitRoot(Tests::TServer::TPtr server,
TActorId sender);
+class TLambdaActor : public IActorCallback {
+public:
+ using TCallback = std::function<void(TAutoPtr<IEventHandle>&)>;
+
+public:
+ TLambdaActor(TCallback&& callback)
+ : IActorCallback(static_cast<TReceiveFunc>(&TLambdaActor::StateWork))
+ , Callback(std::move(callback))
+ { }
+
+private:
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ Callback(ev);
+ }
+
+private:
+ TCallback Callback;
+};
+
enum class EShadowDataMode {
Default,
Enabled,
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 40fd19086fa..2d12b9bb6b4 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -948,6 +948,284 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"{ items { uint32_value: 2 } items { uint32_value: 2 } }");
}
+ Y_UNIT_TEST(DistributedWriteThenReadIterator) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false},
+ {"value2", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ ui64 maxReadSetStep = 0;
+ bool captureReadSets = true;
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep());
+ if (captureReadSets) {
+ Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 2, 42);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets");
+ UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
+
+ // Note: observer works strangely with edge actor results, so we use a normal actor here
+ TVector<THolder<IEventHandle>> readResults;
+ auto readSender = runtime.Register(new TLambdaActor([&](TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::TEvReadResult::EventType: {
+ Cerr << "... observed TEvReadResult:" << Endl;
+ Cerr << ev->Get<TEvDataShard::TEvReadResult>()->Record.DebugString() << Endl;
+ readResults.emplace_back(ev.Release());
+ break;
+ }
+ default: {
+ Cerr << "... ignore event " << ev->GetTypeRewrite() << Endl;
+ }
+ }
+ }));
+
+ {
+ auto msg = std::make_unique<TEvDataShard::TEvRead>();
+ msg->Record.SetReadId(1);
+ msg->Record.MutableTableId()->SetOwnerId(tableId1.PathId.OwnerId);
+ msg->Record.MutableTableId()->SetTableId(tableId1.PathId.LocalPathId);
+ msg->Record.MutableTableId()->SetSchemaVersion(tableId1.SchemaVersion);
+ msg->Record.MutableSnapshot()->SetStep(maxReadSetStep);
+ msg->Record.MutableSnapshot()->SetTxId(Max<ui64>());
+ msg->Record.AddColumns(1);
+ msg->Record.AddColumns(2);
+ msg->Record.SetResultFormat(NKikimrTxDataShard::ARROW);
+
+ TVector<TCell> fromKeyCells = { TCell::Make(ui32(0)) };
+ TVector<TCell> toKeyCells = { TCell::Make(ui32(10)) };
+ auto fromBuf = TSerializedCellVec::Serialize(fromKeyCells);
+ auto toBuf = TSerializedCellVec::Serialize(toKeyCells);
+ msg->Ranges.emplace_back(fromBuf, toBuf, true, true);
+
+ ForwardToTablet(runtime, shard1, readSender, msg.release());
+ }
+
+ // Since key=2 is not committed we must not observe results yet
+ SimulateSleep(runtime, TDuration::Seconds(2));
+ UNIT_ASSERT_VALUES_EQUAL(readResults.size(), 0u);
+
+ captureReadSets = false;
+ for (auto& ev : capturedReadSets) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ WaitFor(runtime, [&]{ return readResults.size() > 0; }, "read result");
+ UNIT_ASSERT_VALUES_EQUAL(readResults.size(), 1u);
+
+ {
+ auto* msg = readResults[0]->Get<TEvDataShard::TEvReadResult>();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(msg->GetArrowBatch()->ToString(),
+ "key: [\n"
+ " 1,\n"
+ " 2\n"
+ " ]\n"
+ "value: [\n"
+ " 1,\n"
+ " 2\n"
+ " ]\n");
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetFinished(), true);
+ }
+ }
+
+ Y_UNIT_TEST(DistributedWriteThenReadIteratorStream) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false},
+ {"value2", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ ui64 maxReadSetStep = 0;
+ bool captureReadSets = true;
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep());
+ if (captureReadSets) {
+ Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 2, 42);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets");
+ UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
+
+ // Note: observer works strangely with edge actor results, so we use a normal actor here
+ TVector<THolder<IEventHandle>> readResults;
+ auto readSender = runtime.Register(new TLambdaActor([&](TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::TEvReadResult::EventType: {
+ Cerr << "... observed TEvReadResult:" << Endl;
+ Cerr << ev->Get<TEvDataShard::TEvReadResult>()->Record.DebugString() << Endl;
+ readResults.emplace_back(ev.Release());
+ break;
+ }
+ default: {
+ Cerr << "... ignore event " << ev->GetTypeRewrite() << Endl;
+ }
+ }
+ }));
+
+ {
+ auto msg = std::make_unique<TEvDataShard::TEvRead>();
+ msg->Record.SetReadId(1);
+ msg->Record.MutableTableId()->SetOwnerId(tableId1.PathId.OwnerId);
+ msg->Record.MutableTableId()->SetTableId(tableId1.PathId.LocalPathId);
+ msg->Record.MutableTableId()->SetSchemaVersion(tableId1.SchemaVersion);
+ msg->Record.MutableSnapshot()->SetStep(maxReadSetStep);
+ msg->Record.MutableSnapshot()->SetTxId(Max<ui64>());
+ msg->Record.AddColumns(1);
+ msg->Record.AddColumns(2);
+ msg->Record.SetResultFormat(NKikimrTxDataShard::ARROW);
+ msg->Record.SetMaxRowsInResult(1);
+
+ TVector<TCell> fromKeyCells = { TCell::Make(ui32(0)) };
+ TVector<TCell> toKeyCells = { TCell::Make(ui32(10)) };
+ auto fromBuf = TSerializedCellVec::Serialize(fromKeyCells);
+ auto toBuf = TSerializedCellVec::Serialize(toKeyCells);
+ msg->Ranges.emplace_back(fromBuf, toBuf, true, true);
+
+ ForwardToTablet(runtime, shard1, readSender, msg.release());
+ }
+
+ // We expect to receive key=1 as soon as possible since it's committed
+ // However further data should not be available so soon
+ SimulateSleep(runtime, TDuration::Seconds(2));
+ UNIT_ASSERT_VALUES_EQUAL(readResults.size(), 1u);
+
+ // Verify we actually receive key=1
+ {
+ auto* msg = readResults[0]->Get<TEvDataShard::TEvReadResult>();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(msg->GetArrowBatch()->ToString(),
+ "key: [\n"
+ " 1\n"
+ " ]\n"
+ "value: [\n"
+ " 1\n"
+ " ]\n");
+ readResults.clear();
+ }
+
+ // Unblock readsets and let key=2 to commit
+ captureReadSets = false;
+ for (auto& ev : capturedReadSets) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ WaitFor(runtime, [&]{ return readResults.size() > 0; }, "read result");
+ UNIT_ASSERT_GE(readResults.size(), 1u);
+
+ {
+ auto* msg = readResults[0]->Get<TEvDataShard::TEvReadResult>();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(msg->GetArrowBatch()->ToString(),
+ "key: [\n"
+ " 2\n"
+ " ]\n"
+ "value: [\n"
+ " 2\n"
+ " ]\n");
+
+ msg = readResults.back()->Get<TEvDataShard::TEvReadResult>();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetFinished(), true);
+ }
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index 9d5a41d2346..d24058be1ef 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -40,6 +40,26 @@ namespace NKikimr::NDataShard {
Self->VolatileTxManager.PersistRemoveVolatileTx(TxId, txc);
+ if (info->AddCommitted) {
+ OnCommitted(ctx);
+ } else {
+ Delayed = true;
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ if (Delayed) {
+ OnCommitted(ctx);
+ }
+ }
+
+ void OnCommitted(const TActorContext& ctx) {
+ auto* info = Self->VolatileTxManager.FindByTxId(TxId);
+ Y_VERIFY(info && info->State == EVolatileTxState::Committed);
+ Y_VERIFY(info->AddCommitted);
+
Self->VolatileTxManager.UnblockDependents(info);
Self->VolatileTxManager.RemoveFromTxMap(info);
@@ -47,15 +67,11 @@ namespace NKikimr::NDataShard {
Self->VolatileTxManager.RemoveVolatileTx(TxId);
Self->CheckSplitCanStart(ctx);
- return true;
- }
-
- void Complete(const TActorContext&) override {
- // nothing
}
private:
ui64 TxId;
+ bool Delayed = false;
};
class TDataShard::TTxVolatileTxAbort
@@ -100,6 +116,7 @@ namespace NKikimr::NDataShard {
void Complete(const TActorContext& ctx) override {
auto* info = Self->VolatileTxManager.FindByTxId(TxId);
Y_VERIFY(info && info->State == EVolatileTxState::Aborting);
+ Y_VERIFY(info->AddCommitted);
// Run callbacks only after we successfully persist aborted tx
Self->VolatileTxManager.RunAbortCallbacks(info);
@@ -407,7 +424,7 @@ namespace NKikimr::NDataShard {
db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Update();
}
- txc.OnCommit([this, txId]() {
+ txc.OnCommitted([this, txId]() {
auto* info = FindByTxId(txId);
Y_VERIFY_S(info, "Unexpected failure to find volatile txId# " << txId);
Y_VERIFY_S(!info->AddCommitted, "Unexpected commit of a committed volatile txId# " << txId);
@@ -472,7 +489,7 @@ namespace NKikimr::NDataShard {
case EVolatileTxState::Committed:
// We call commit callbacks only when effects are committed
if (it->second->AddCommitted) {
- callback->OnCommit();
+ callback->OnCommit(txId);
} else {
it->second->Callbacks.push_back(std::move(callback));
}
@@ -631,7 +648,7 @@ namespace NKikimr::NDataShard {
auto callbacks = std::move(info->Callbacks);
info->Callbacks.clear();
for (auto& callback : callbacks) {
- callback->OnCommit();
+ callback->OnCommit(info->TxId);
}
UnblockOperations(info, true);
}
@@ -640,7 +657,7 @@ namespace NKikimr::NDataShard {
auto callbacks = std::move(info->Callbacks);
info->Callbacks.clear();
for (auto& callback : callbacks) {
- callback->OnAbort();
+ callback->OnAbort(info->TxId);
}
UnblockOperations(info, false);
}
diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h
index 154ad150308..110701e39df 100644
--- a/ydb/core/tx/datashard/volatile_tx.h
+++ b/ydb/core/tx/datashard/volatile_tx.h
@@ -36,8 +36,8 @@ namespace NKikimr::NDataShard {
using TPtr = TIntrusivePtr<IVolatileTxCallback>;
public:
- virtual void OnCommit() = 0;
- virtual void OnAbort() = 0;
+ virtual void OnCommit(ui64 txId) = 0;
+ virtual void OnAbort(ui64 txId) = 0;
};
struct TVolatileTxInfo {