diff options
author | snaury <snaury@ydb.tech> | 2023-01-30 14:51:41 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-01-30 14:51:41 +0300 |
commit | d1b44adf32afaa3332a4c1338d8c2112cc96bb5e (patch) | |
tree | f66ab7ee05d069095c68c2591a3c5482c28553ef | |
parent | 7c1b72786425e9a2f49dcb3fd78f2f17a8dad07a (diff) | |
download | ydb-d1b44adf32afaa3332a4c1338d8c2112cc96bb5e.tar.gz |
Support waiting for volatile transactions in read iterators
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 59 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 217 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.h | 20 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp | 278 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.cpp | 35 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.h | 4 |
7 files changed, 559 insertions, 61 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 4424a1481ca..2786e05dee0 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -489,6 +489,59 @@ void TDataShard::SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEven delayedAcks.clear(); } +class TDataShard::TWaitVolatileDependencies final : public IVolatileTxCallback { +public: + TWaitVolatileDependencies( + TDataShard* self, const absl::flat_hash_set<ui64>& dependencies, + const TActorId& target, + std::unique_ptr<IEventBase> event, + ui64 cookie) + : Self(self) + , Dependencies(dependencies) + , Target(target) + , Event(std::move(event)) + , Cookie(cookie) + { } + + void OnCommit(ui64 txId) override { + Dependencies.erase(txId); + if (Dependencies.empty()) { + Finish(); + } + } + + void OnAbort(ui64 txId) override { + Dependencies.erase(txId); + if (Dependencies.empty()) { + Finish(); + } + } + + void Finish() { + Self->Send(Target, Event.release(), 0, Cookie); + } + +private: + TDataShard* Self; + absl::flat_hash_set<ui64> Dependencies; + TActorId Target; + std::unique_ptr<IEventBase> Event; + ui64 Cookie; +}; + +void TDataShard::WaitVolatileDependenciesThenSend( + const absl::flat_hash_set<ui64>& dependencies, + const TActorId& target, std::unique_ptr<IEventBase> event, + ui64 cookie) +{ + Y_VERIFY(!dependencies.empty(), "Unexpected empty dependencies"); + auto callback = MakeIntrusive<TWaitVolatileDependencies>(this, dependencies, target, std::move(event), cookie); + for (ui64 txId : dependencies) { + bool ok = VolatileTxManager.AttachVolatileTxCallback(txId, callback); + Y_VERIFY_S(ok, "Unexpected failure to attach callback to volatile tx " << txId); + } +} + class TDataShard::TSendVolatileResult final : public IVolatileTxCallback { public: TSendVolatileResult( @@ -502,7 +555,7 @@ public: , TxId(txId) { } - void OnCommit() override { + void OnCommit(ui64) override { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Complete [" << Step << " : " << TxId << "] from " << Self->TabletID() << " at tablet " << Self->TabletID() << " send result to client " @@ -514,11 +567,11 @@ public: Self->Send(Target, Result.Release(), flags); } - void OnAbort() override { + void OnAbort(ui64 txId) override { Result->Record.ClearTxResult(); Result->Record.SetStatus(NKikimrTxDataShard::TEvProposeTransactionResult::ABORTED); Result->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Distributed transaction aborted due to commit failure"); - OnCommit(); + OnCommit(txId); } private: diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index e2fecfff623..5700ad9fdb9 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -280,6 +280,8 @@ class TReader { NTable::ITransactionMapPtr TxMap; NTable::ITransactionObserverPtr TxObserver; + absl::flat_hash_set<ui64> VolatileReadDependencies; + bool VolatileWaitForCommit = false; enum class EReadStatus { Done = 0, @@ -597,6 +599,9 @@ public: bool HadInvisibleRowSkips() const { return InvisibleRowSkips > 0; } bool HadInconsistentResult() const { return HadInconsistentResult_; } + const absl::flat_hash_set<ui64>& GetVolatileReadDependencies() const { return VolatileReadDependencies; } + bool NeedVolatileWaitForCommit() const { return VolatileWaitForCommit; } + private: bool OutOfQuota() const { return RowsRead >= State.Quota.Rows || @@ -676,32 +681,52 @@ private: } const NTable::ITransactionMapPtr& GetReadTxMap() { - if (!TxMap && - State.LockId && - !TSysTables::IsSystemTable(State.PathId) && - Self->SysLocksTable().HasCurrentWriteLock(State.PathId)) - { - TxMap = new NTable::TSingleTransactionMap(State.LockId, TRowVersion::Min()); + if (!TxMap && Self->IsUserTable(State.PathId)) { + auto baseTxMap = Self->GetVolatileTxManager().GetTxMap(); + + bool needTxMap = ( + // We need tx map when there are waiting volatile transactions + baseTxMap || + // We need tx map when current lock has uncommitted changes + State.LockId && Self->SysLocksTable().HasCurrentWriteLock(State.PathId)); + + if (needTxMap) { + auto ptr = MakeIntrusive<NTable::TDynamicTransactionMap>(baseTxMap); + if (State.LockId) { + ptr->Add(State.LockId, TRowVersion::Min()); + } + TxMap = ptr; + } } return TxMap; } const NTable::ITransactionObserverPtr& GetReadTxObserver() { - if (!TxObserver && - State.LockId && - !TSysTables::IsSystemTable(State.PathId) && - Self->SysLocksTable().HasWriteLocks(State.PathId)) - { - TxObserver = new TReadTxObserver(this); + if (!TxObserver && Self->IsUserTable(State.PathId)) { + auto baseTxMap = Self->GetVolatileTxManager().GetTxMap(); + + bool needTxObserver = ( + // We need tx observer when there are waiting volatile transactions + baseTxMap || + // We need tx observer when current lock has uncommitted changes + State.LockId && Self->SysLocksTable().HasCurrentWriteLock(State.PathId)); + + if (needTxObserver) { + if (State.LockId) { + TxObserver = new TLockedReadTxObserver(this); + } else { + TxObserver = new TReadTxObserver(this); + } + } } return TxObserver; } - class TReadTxObserver : public NTable::ITransactionObserver { + class TLockedReadTxObserver : public NTable::ITransactionObserver { public: - TReadTxObserver(TReader* reader) + TLockedReadTxObserver(TReader* reader) : Reader(reader) { } @@ -722,8 +747,40 @@ private: Reader->CheckReadConflict(rowVersion); } - void OnApplyCommitted(const TRowVersion& rowVersion, ui64) override { + void OnApplyCommitted(const TRowVersion& rowVersion, ui64 txId) override { Reader->CheckReadConflict(rowVersion); + Reader->CheckReadDependency(txId); + } + + private: + TReader* const Reader; + }; + + class TReadTxObserver : public NTable::ITransactionObserver { + public: + TReadTxObserver(TReader* reader) + : Reader(reader) + { + } + + void OnSkipUncommitted(ui64) override { + // We don't care about uncommitted changes + } + + void OnSkipCommitted(const TRowVersion&) override { + // We already use InvisibleRowSkips for these + } + + void OnSkipCommitted(const TRowVersion&, ui64) override { + // We already use InvisibleRowSkips for these + } + + void OnApplyCommitted(const TRowVersion&) override { + // Not needed + } + + void OnApplyCommitted(const TRowVersion&, ui64 txId) override { + Reader->CheckReadDependency(txId); } private: @@ -749,6 +806,28 @@ private: HadInconsistentResult_ = true; } } + + void CheckReadDependency(ui64 txId) { + if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) { + switch (info->State) { + case EVolatileTxState::Waiting: + // We are reading undecided changes and need to wait until they are resolved + VolatileReadDependencies.insert(info->TxId); + break; + case EVolatileTxState::Committed: + // Committed changes are immediately visible and don't need a dependency + if (!info->AddCommitted) { + // However we may need to wait until they are persistent + VolatileWaitForCommit = true; + } + break; + case EVolatileTxState::Aborting: + // We just read something that we know is aborting, we would have to retry later + VolatileReadDependencies.insert(info->TxId); + break; + } + } + } }; const NHPTimer::STime TReader::MaxCyclesPerIteration = @@ -957,6 +1036,18 @@ public: if (!Read(txc, ctx, state)) return EExecutionStatus::Restart; + // Check if successful result depends on unresolved volatile transactions + if (Result && !Result->Record.HasStatus() && !Reader->GetVolatileReadDependencies().empty()) { + for (ui64 txId : Reader->GetVolatileReadDependencies()) { + AddVolatileDependency(txId); + bool ok = Self->GetVolatileTxManager().AttachBlockedOperation(txId, GetTxId()); + Y_VERIFY(ok, "Unexpected failure to attach a blocked operation"); + } + Reader.reset(); + Result.reset(new TEvDataShard::TEvReadResult()); + return EExecutionStatus::Continue; + } + TDataShard::EPromotePostExecuteEdges readType = TDataShard::EPromotePostExecuteEdges::RepeatableRead; if (state.IsHeadRead) { @@ -1011,7 +1102,7 @@ public: if (hadWrites) return EExecutionStatus::DelayCompleteNoMoreRestarts; - if (Self->Pipeline.HasCommittingOpsBelow(state.ReadVersion)) + if (Self->Pipeline.HasCommittingOpsBelow(state.ReadVersion) || Reader && Reader->NeedVolatileWaitForCommit()) return EExecutionStatus::DelayComplete; Complete(ctx); @@ -1662,6 +1753,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase std::unique_ptr<IBlockBuilder> BlockBuilder; TShortTableInfo TableInfo; std::unique_ptr<TReader> Reader; + bool DelayedResult = false; public: TTxReadContinue(TDataShard* ds, TEvDataShard::TEvReadContinue::TPtr ev) @@ -1703,7 +1795,7 @@ public: Result->Record, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Unknown table id: " << state.PathId.LocalPathId); - SendResult(txc, ctx); + SendResult(ctx); return true; } auto userTableInfo = it->second; @@ -1715,7 +1807,7 @@ public: Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Schema changed, current " << currentSchemaVersion << ", requested table schemaversion " << state.SchemaVersion); - SendResult(txc, ctx); + SendResult(ctx); return true; } @@ -1728,7 +1820,7 @@ public: Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Failed to get scheme for table local id: " << state.PathId.LocalPathId); - SendResult(txc, ctx); + SendResult(ctx); return true; } TableInfo = TShortTableInfo(state.PathId.LocalPathId, *schema); @@ -1751,7 +1843,7 @@ public: << state.ReadVersion << " shard " << Self->TabletID() << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() << (Self->IsFollower() ? " RO replica" : "")); - SendResult(txc, ctx); + SendResult(ctx); return true; } @@ -1762,7 +1854,7 @@ public: Result->Record, Ydb::StatusIds::BAD_REQUEST, p.second); - SendResult(txc, ctx); + SendResult(ctx); return true; } std::swap(BlockBuilder, p.first); @@ -1785,19 +1877,34 @@ public: Self)); if (Reader->Read(txc, ctx)) { - SendResult(txc, ctx); + // Retry later when dependencies are resolved + if (!Reader->GetVolatileReadDependencies().empty()) { + Self->WaitVolatileDependenciesThenSend( + Reader->GetVolatileReadDependencies(), + Self->SelfId(), + std::make_unique<TEvDataShard::TEvReadContinue>(Ev->Get()->Reader, Ev->Get()->ReadId)); + return true; + } + + ApplyLocks(ctx); + + if (!Reader->NeedVolatileWaitForCommit()) { + SendResult(ctx); + } else { + DelayedResult = true; + } return true; } return false; } - void Complete(const TActorContext&) override { - // nothing to do + void Complete(const TActorContext& ctx) override { + if (DelayedResult) { + SendResult(ctx); + } } - void SendResult(TTransactionContext& txc, const TActorContext& ctx) { - Y_UNUSED(txc); - + void ApplyLocks(const TActorContext& ctx) { const auto* request = Ev->Get(); TReadIteratorId readId(request->Reader, request->ReadId); auto it = Self->ReadIterators.find(readId); @@ -1806,31 +1913,15 @@ public: auto& state = *it->second; if (!Result) { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId - << " TTxReadContinue::Execute() finished without Result, aborting"); - - Result.reset(new TEvDataShard::TEvReadResult()); - SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); - Result->Record.SetReadId(readId.ReadId); - Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); - Self->DeleteReadIterator(it); return; } - // error happened and status set auto& record = Result->Record; if (record.HasStatus()) { - record.SetSeqNo(state.SeqNo + 1); - record.SetReadId(readId.ReadId); - Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId - << " TTxReadContinue::Execute() finished with error, aborting: " << record.DebugString()); - Self->DeleteReadIterator(it); return; } Y_ASSERT(Reader); - Y_ASSERT(BlockBuilder); if (state.Lock) { auto& sysLocks = Self->SysLocksTable(); @@ -1858,10 +1949,6 @@ public: // A broken write lock means we are reading inconsistent results and must abort if (state.Lock->IsWriteLock()) { SetStatusError(record, Ydb::StatusIds::ABORTED, "Read conflict with concurrent transaction"); - record.SetSeqNo(state.SeqNo + 1); - record.SetReadId(readId.ReadId); - Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); - Self->DeleteReadIterator(it); return; } @@ -1872,6 +1959,42 @@ public: Y_VERIFY(locks.empty(), "ApplyLocks acquired unexpected locks"); } } + } + + void SendResult(const TActorContext& ctx) { + const auto* request = Ev->Get(); + TReadIteratorId readId(request->Reader, request->ReadId); + auto it = Self->ReadIterators.find(readId); + Y_VERIFY(it != Self->ReadIterators.end()); + Y_VERIFY(it->second); + auto& state = *it->second; + + if (!Result) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " TTxReadContinue::Execute() finished without Result, aborting"); + + Result.reset(new TEvDataShard::TEvReadResult()); + SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); + Result->Record.SetReadId(readId.ReadId); + Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); + Self->DeleteReadIterator(it); + return; + } + + // error happened and status set + auto& record = Result->Record; + if (record.HasStatus()) { + record.SetSeqNo(state.SeqNo + 1); + record.SetReadId(readId.ReadId); + Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " TTxReadContinue::Execute() finished with error, aborting: " << record.DebugString()); + Self->DeleteReadIterator(it); + return; + } + + Y_ASSERT(Reader); + Y_ASSERT(BlockBuilder); LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " readContinue iterator# " << readId << " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries() diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 072c621701f..ef53f5e38ee 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -301,6 +301,7 @@ class TDataShard class TTxApplyReplicationChanges; + class TWaitVolatileDependencies; class TSendVolatileResult; struct TEvPrivate { @@ -1342,6 +1343,12 @@ public: TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets); void ResendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno); void SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEventHandle>>& delayedAcks) const; + + void WaitVolatileDependenciesThenSend( + const absl::flat_hash_set<ui64>& dependencies, + const TActorId& target, std::unique_ptr<IEventBase> event, + ui64 cookie = 0); + void SendResult(const TActorContext &ctx, TOutputOpData::TResultPtr &result, const TActorId &target, diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index c81b07a1252..146032bdc8c 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -364,6 +364,26 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSQLRequest(const TString &sql, void InitRoot(Tests::TServer::TPtr server, TActorId sender); +class TLambdaActor : public IActorCallback { +public: + using TCallback = std::function<void(TAutoPtr<IEventHandle>&)>; + +public: + TLambdaActor(TCallback&& callback) + : IActorCallback(static_cast<TReceiveFunc>(&TLambdaActor::StateWork)) + , Callback(std::move(callback)) + { } + +private: + STFUNC(StateWork) { + Y_UNUSED(ctx); + Callback(ev); + } + +private: + TCallback Callback; +}; + enum class EShadowDataMode { Default, Enabled, diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 40fd19086fa..2d12b9bb6b4 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -948,6 +948,284 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); } + Y_UNIT_TEST(DistributedWriteThenReadIterator) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Shards(1) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"value2", "Uint32", false, false}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + CreateShardedTable(server, sender, "/Root", "table-2", opts); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + ui64 maxReadSetStep = 0; + bool captureReadSets = true; + TVector<THolder<IEventHandle>> capturedReadSets; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvReadSet::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep()); + if (captureReadSets) { + Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 2, 42); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets"); + UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); + + // Note: observer works strangely with edge actor results, so we use a normal actor here + TVector<THolder<IEventHandle>> readResults; + auto readSender = runtime.Register(new TLambdaActor([&](TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::TEvReadResult::EventType: { + Cerr << "... observed TEvReadResult:" << Endl; + Cerr << ev->Get<TEvDataShard::TEvReadResult>()->Record.DebugString() << Endl; + readResults.emplace_back(ev.Release()); + break; + } + default: { + Cerr << "... ignore event " << ev->GetTypeRewrite() << Endl; + } + } + })); + + { + auto msg = std::make_unique<TEvDataShard::TEvRead>(); + msg->Record.SetReadId(1); + msg->Record.MutableTableId()->SetOwnerId(tableId1.PathId.OwnerId); + msg->Record.MutableTableId()->SetTableId(tableId1.PathId.LocalPathId); + msg->Record.MutableTableId()->SetSchemaVersion(tableId1.SchemaVersion); + msg->Record.MutableSnapshot()->SetStep(maxReadSetStep); + msg->Record.MutableSnapshot()->SetTxId(Max<ui64>()); + msg->Record.AddColumns(1); + msg->Record.AddColumns(2); + msg->Record.SetResultFormat(NKikimrTxDataShard::ARROW); + + TVector<TCell> fromKeyCells = { TCell::Make(ui32(0)) }; + TVector<TCell> toKeyCells = { TCell::Make(ui32(10)) }; + auto fromBuf = TSerializedCellVec::Serialize(fromKeyCells); + auto toBuf = TSerializedCellVec::Serialize(toKeyCells); + msg->Ranges.emplace_back(fromBuf, toBuf, true, true); + + ForwardToTablet(runtime, shard1, readSender, msg.release()); + } + + // Since key=2 is not committed we must not observe results yet + SimulateSleep(runtime, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(readResults.size(), 0u); + + captureReadSets = false; + for (auto& ev : capturedReadSets) { + runtime.Send(ev.Release(), 0, true); + } + + WaitFor(runtime, [&]{ return readResults.size() > 0; }, "read result"); + UNIT_ASSERT_VALUES_EQUAL(readResults.size(), 1u); + + { + auto* msg = readResults[0]->Get<TEvDataShard::TEvReadResult>(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(msg->GetArrowBatch()->ToString(), + "key: [\n" + " 1,\n" + " 2\n" + " ]\n" + "value: [\n" + " 1,\n" + " 2\n" + " ]\n"); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetFinished(), true); + } + } + + Y_UNIT_TEST(DistributedWriteThenReadIteratorStream) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Shards(1) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"value2", "Uint32", false, false}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + CreateShardedTable(server, sender, "/Root", "table-2", opts); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + ui64 maxReadSetStep = 0; + bool captureReadSets = true; + TVector<THolder<IEventHandle>> capturedReadSets; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvReadSet::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep()); + if (captureReadSets) { + Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 2, 42); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets"); + UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); + + // Note: observer works strangely with edge actor results, so we use a normal actor here + TVector<THolder<IEventHandle>> readResults; + auto readSender = runtime.Register(new TLambdaActor([&](TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::TEvReadResult::EventType: { + Cerr << "... observed TEvReadResult:" << Endl; + Cerr << ev->Get<TEvDataShard::TEvReadResult>()->Record.DebugString() << Endl; + readResults.emplace_back(ev.Release()); + break; + } + default: { + Cerr << "... ignore event " << ev->GetTypeRewrite() << Endl; + } + } + })); + + { + auto msg = std::make_unique<TEvDataShard::TEvRead>(); + msg->Record.SetReadId(1); + msg->Record.MutableTableId()->SetOwnerId(tableId1.PathId.OwnerId); + msg->Record.MutableTableId()->SetTableId(tableId1.PathId.LocalPathId); + msg->Record.MutableTableId()->SetSchemaVersion(tableId1.SchemaVersion); + msg->Record.MutableSnapshot()->SetStep(maxReadSetStep); + msg->Record.MutableSnapshot()->SetTxId(Max<ui64>()); + msg->Record.AddColumns(1); + msg->Record.AddColumns(2); + msg->Record.SetResultFormat(NKikimrTxDataShard::ARROW); + msg->Record.SetMaxRowsInResult(1); + + TVector<TCell> fromKeyCells = { TCell::Make(ui32(0)) }; + TVector<TCell> toKeyCells = { TCell::Make(ui32(10)) }; + auto fromBuf = TSerializedCellVec::Serialize(fromKeyCells); + auto toBuf = TSerializedCellVec::Serialize(toKeyCells); + msg->Ranges.emplace_back(fromBuf, toBuf, true, true); + + ForwardToTablet(runtime, shard1, readSender, msg.release()); + } + + // We expect to receive key=1 as soon as possible since it's committed + // However further data should not be available so soon + SimulateSleep(runtime, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(readResults.size(), 1u); + + // Verify we actually receive key=1 + { + auto* msg = readResults[0]->Get<TEvDataShard::TEvReadResult>(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(msg->GetArrowBatch()->ToString(), + "key: [\n" + " 1\n" + " ]\n" + "value: [\n" + " 1\n" + " ]\n"); + readResults.clear(); + } + + // Unblock readsets and let key=2 to commit + captureReadSets = false; + for (auto& ev : capturedReadSets) { + runtime.Send(ev.Release(), 0, true); + } + + WaitFor(runtime, [&]{ return readResults.size() > 0; }, "read result"); + UNIT_ASSERT_GE(readResults.size(), 1u); + + { + auto* msg = readResults[0]->Get<TEvDataShard::TEvReadResult>(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(msg->GetArrowBatch()->ToString(), + "key: [\n" + " 2\n" + " ]\n" + "value: [\n" + " 2\n" + " ]\n"); + + msg = readResults.back()->Get<TEvDataShard::TEvReadResult>(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetFinished(), true); + } + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 9d5a41d2346..d24058be1ef 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -40,6 +40,26 @@ namespace NKikimr::NDataShard { Self->VolatileTxManager.PersistRemoveVolatileTx(TxId, txc); + if (info->AddCommitted) { + OnCommitted(ctx); + } else { + Delayed = true; + } + + return true; + } + + void Complete(const TActorContext& ctx) override { + if (Delayed) { + OnCommitted(ctx); + } + } + + void OnCommitted(const TActorContext& ctx) { + auto* info = Self->VolatileTxManager.FindByTxId(TxId); + Y_VERIFY(info && info->State == EVolatileTxState::Committed); + Y_VERIFY(info->AddCommitted); + Self->VolatileTxManager.UnblockDependents(info); Self->VolatileTxManager.RemoveFromTxMap(info); @@ -47,15 +67,11 @@ namespace NKikimr::NDataShard { Self->VolatileTxManager.RemoveVolatileTx(TxId); Self->CheckSplitCanStart(ctx); - return true; - } - - void Complete(const TActorContext&) override { - // nothing } private: ui64 TxId; + bool Delayed = false; }; class TDataShard::TTxVolatileTxAbort @@ -100,6 +116,7 @@ namespace NKikimr::NDataShard { void Complete(const TActorContext& ctx) override { auto* info = Self->VolatileTxManager.FindByTxId(TxId); Y_VERIFY(info && info->State == EVolatileTxState::Aborting); + Y_VERIFY(info->AddCommitted); // Run callbacks only after we successfully persist aborted tx Self->VolatileTxManager.RunAbortCallbacks(info); @@ -407,7 +424,7 @@ namespace NKikimr::NDataShard { db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Update(); } - txc.OnCommit([this, txId]() { + txc.OnCommitted([this, txId]() { auto* info = FindByTxId(txId); Y_VERIFY_S(info, "Unexpected failure to find volatile txId# " << txId); Y_VERIFY_S(!info->AddCommitted, "Unexpected commit of a committed volatile txId# " << txId); @@ -472,7 +489,7 @@ namespace NKikimr::NDataShard { case EVolatileTxState::Committed: // We call commit callbacks only when effects are committed if (it->second->AddCommitted) { - callback->OnCommit(); + callback->OnCommit(txId); } else { it->second->Callbacks.push_back(std::move(callback)); } @@ -631,7 +648,7 @@ namespace NKikimr::NDataShard { auto callbacks = std::move(info->Callbacks); info->Callbacks.clear(); for (auto& callback : callbacks) { - callback->OnCommit(); + callback->OnCommit(info->TxId); } UnblockOperations(info, true); } @@ -640,7 +657,7 @@ namespace NKikimr::NDataShard { auto callbacks = std::move(info->Callbacks); info->Callbacks.clear(); for (auto& callback : callbacks) { - callback->OnAbort(); + callback->OnAbort(info->TxId); } UnblockOperations(info, false); } diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h index 154ad150308..110701e39df 100644 --- a/ydb/core/tx/datashard/volatile_tx.h +++ b/ydb/core/tx/datashard/volatile_tx.h @@ -36,8 +36,8 @@ namespace NKikimr::NDataShard { using TPtr = TIntrusivePtr<IVolatileTxCallback>; public: - virtual void OnCommit() = 0; - virtual void OnAbort() = 0; + virtual void OnCommit(ui64 txId) = 0; + virtual void OnAbort(ui64 txId) = 0; }; struct TVolatileTxInfo { |