aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-02-06 14:23:40 +0300
committersnaury <snaury@ydb.tech>2023-02-06 14:23:40 +0300
commite21adc22a0c6891199aa38afb236ca7c82167a39 (patch)
tree6b7f4dd272856d3483a41b20d3fd4c6e41575904
parentc1a46c578c1553a1541aa1435c3356740f2a58ac (diff)
downloadydb-e21adc22a0c6891199aa38afb236ca7c82167a39.tar.gz
Wait for volatile transactions in scans
-rw-r--r--ydb/core/tx/datashard/build_index.cpp8
-rw-r--r--ydb/core/tx/datashard/cdc_stream_scan.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard__kqp_scan.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard__read_columns.cpp9
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h12
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common_kqp.h8
-rw-r--r--ydb/core/tx/datashard/datashard_ut_volatile.cpp120
-rw-r--r--ydb/core/tx/datashard/read_table_scan_unit.cpp10
-rw-r--r--ydb/core/tx/datashard/volatile_tx.cpp21
-rw-r--r--ydb/core/tx/datashard/volatile_tx.h23
10 files changed, 212 insertions, 9 deletions
diff --git a/ydb/core/tx/datashard/build_index.cpp b/ydb/core/tx/datashard/build_index.cpp
index 7be78b61763..1eb08345a05 100644
--- a/ydb/core/tx/datashard/build_index.cpp
+++ b/ydb/core/tx/datashard/build_index.cpp
@@ -555,6 +555,14 @@ TAutoPtr<NTable::IScan> CreateBuildIndexScan(
void TDataShard::Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record;
+ // Note: it's very unlikely that we have volatile txs before this snapshot
+ if (VolatileTxManager.HasVolatileTxsAtSnapshot(TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId()))) {
+ VolatileTxManager.AttachWaitingSnapshotEvent(
+ TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId()),
+ std::unique_ptr<IEventHandle>(ev.Release()));
+ return;
+ }
+
auto response = MakeHolder<TEvDataShard::TEvBuildIndexProgressResponse>();
response->Record.SetBuildIndexId(record.GetBuildIndexId());
response->Record.SetTabletId(TabletID());
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp
index 28f708445d2..6d5641802a2 100644
--- a/ydb/core/tx/datashard/cdc_stream_scan.cpp
+++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp
@@ -640,6 +640,9 @@ public:
const auto snapshotVersion = TRowVersion(snapshotKey.Step, snapshotKey.TxId);
Y_VERIFY(info->SnapshotVersion == snapshotVersion);
+ // Note: cdc stream is added with a schema transaction and those wait for volatile txs
+ Y_VERIFY(!Self->GetVolatileTxManager().HasVolatileTxsAtSnapshot(snapshotVersion));
+
const ui64 localTxId = ++Self->NextTieBreakerIndex;
auto scan = MakeHolder<TCdcStreamScan>(Self, Request->Sender, localTxId,
tablePathId, streamPathId, snapshotVersion, valueTags, info->LastKey, info->Stats, record.GetLimits());
diff --git a/ydb/core/tx/datashard/datashard__kqp_scan.cpp b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
index a691ba1182f..f04adb2952b 100644
--- a/ydb/core/tx/datashard/datashard__kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
@@ -543,6 +543,13 @@ void TDataShard::Handle(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext&
auto scanComputeActor = ev->Sender;
auto generation = request.GetGeneration();
+ if (VolatileTxManager.HasVolatileTxsAtSnapshot(TRowVersion(request.GetSnapshot().GetStep(), request.GetSnapshot().GetTxId()))) {
+ VolatileTxManager.AttachWaitingSnapshotEvent(
+ TRowVersion(request.GetSnapshot().GetStep(), request.GetSnapshot().GetTxId()),
+ std::unique_ptr<IEventHandle>(ev.Release()));
+ return;
+ }
+
auto infoIt = TableInfos.find(request.GetLocalPathId());
auto reportError = [this, scanComputeActor, generation] (const TString& table, const TString& detailedReason) {
diff --git a/ydb/core/tx/datashard/datashard__read_columns.cpp b/ydb/core/tx/datashard/datashard__read_columns.cpp
index 983abe32f79..66cbf2c77fc 100644
--- a/ydb/core/tx/datashard/datashard__read_columns.cpp
+++ b/ydb/core/tx/datashard/datashard__read_columns.cpp
@@ -212,6 +212,15 @@ public:
}
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ // FIXME: we need to transform HEAD into some non-repeatable snapshot here
+ if (!ReadVersion.IsMax() && Self->GetVolatileTxManager().HasVolatileTxsAtSnapshot(ReadVersion)) {
+ Self->GetVolatileTxManager().AttachWaitingSnapshotEvent(
+ ReadVersion,
+ std::unique_ptr<IEventHandle>(Ev.Release()));
+ Result.Destroy();
+ return true;
+ }
+
Result = new TEvDataShard::TEvReadColumnsResponse(Self->TabletID());
bool useScan = Self->ReadColumnsScanEnabled;
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 146032bdc8c..0055bc07dd2 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -366,7 +366,8 @@ void InitRoot(Tests::TServer::TPtr server,
class TLambdaActor : public IActorCallback {
public:
- using TCallback = std::function<void(TAutoPtr<IEventHandle>&)>;
+ using TCallback = std::function<void(TAutoPtr<IEventHandle>&, const TActorContext&)>;
+ using TNoCtxCallback = std::function<void(TAutoPtr<IEventHandle>&)>;
public:
TLambdaActor(TCallback&& callback)
@@ -374,10 +375,15 @@ public:
, Callback(std::move(callback))
{ }
+ TLambdaActor(TNoCtxCallback&& callback)
+ : TLambdaActor([callback = std::move(callback)](auto& ev, auto&) {
+ callback(ev);
+ })
+ { }
+
private:
STFUNC(StateWork) {
- Y_UNUSED(ctx);
- Callback(ev);
+ Callback(ev, ctx);
}
private:
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index 01827e50c52..9994ee28b37 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -120,12 +120,16 @@ namespace NKqpHelpers {
return request;
}
+ inline TString FormatResult(const Ydb::ResultSet& rs) {
+ Cerr << JoinSeq(", ", rs.rows());
+ return JoinSeq(", ", rs.rows());
+ }
+
inline TString FormatResult(const Ydb::Table::ExecuteQueryResult& result) {
if (result.result_sets_size() == 0) {
return "<empty>";
}
- Cerr << JoinSeq(", ", result.result_sets(0).rows());
- return JoinSeq(", ", result.result_sets(0).rows());
+ return FormatResult(result.result_sets(0));
}
inline TString FormatResult(const Ydb::Table::ExecuteDataQueryResponse& response) {
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 3057ac7b5d2..3152351d6dd 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -2,6 +2,8 @@
#include "datashard_ut_common_kqp.h"
#include "datashard_active_transaction.h"
+#include <ydb/core/kqp/executer_actor/kqp_executer.h>
+
namespace NKikimr {
using namespace NKikimr::NDataShard;
@@ -1232,6 +1234,124 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
}
}
+ Y_UNIT_TEST(DistributedWriteThenScanQuery) {
+ TPortManager pm;
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(false);
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000)
+ .SetAppConfig(app);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ ui64 maxReadSetStep = 0;
+ bool captureReadSets = true;
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep());
+ if (captureReadSets) {
+ Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets");
+ UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ TVector<TString> observedResults;
+ TMaybe<Ydb::StatusIds::StatusCode> observedStatus;
+ auto scanSender = runtime.Register(new TLambdaActor([&](TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
+ switch (ev->GetTypeRewrite()) {
+ case NKqp::TEvKqpExecuter::TEvStreamData::EventType: {
+ auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>();
+ Cerr << "... observed stream data" << Endl;
+ observedResults.push_back(FormatResult(msg->Record.GetResultSet()));
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetSeqNo(msg->Record.GetSeqNo());
+ resp->Record.SetFreeSpace(1);
+ ctx.Send(ev->Sender, resp.Release());
+ break;
+ }
+ case NKqp::TEvKqp::TEvQueryResponse::EventType: {
+ auto* msg = ev->Get<NKqp::TEvKqp::TEvQueryResponse>();
+ Cerr << "... observed query result" << Endl;
+ observedStatus = msg->Record.GetRef().GetYdbStatus();
+ break;
+ }
+ default: {
+ Cerr << "... ignored event " << ev->GetTypeRewrite();
+ if (ev->GetBase()) {
+ Cerr << " " << ev->GetBase()->ToString();
+ }
+ Cerr << Endl;
+ }
+ }
+ }));
+
+ SendRequest(runtime, scanSender, MakeStreamRequest(scanSender, R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key;
+ )"));
+
+ SimulateSleep(runtime, TDuration::Seconds(2));
+
+ UNIT_ASSERT_VALUES_EQUAL(observedResults.size(), 0u);
+
+ captureReadSets = false;
+ for (auto& ev : capturedReadSets) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ SimulateSleep(runtime, TDuration::Seconds(2));
+
+ UNIT_ASSERT_VALUES_EQUAL(observedResults.size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(
+ observedResults[0],
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }");
+ UNIT_ASSERT_VALUES_EQUAL(observedStatus, Ydb::StatusIds::SUCCESS);
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/read_table_scan_unit.cpp b/ydb/core/tx/datashard/read_table_scan_unit.cpp
index 7ad857035ba..9fc30ad9d29 100644
--- a/ydb/core/tx/datashard/read_table_scan_unit.cpp
+++ b/ydb/core/tx/datashard/read_table_scan_unit.cpp
@@ -99,10 +99,11 @@ EExecutionStatus TReadTableScanUnit::Execute(TOperation::TPtr op,
Y_VERIFY(op->HasAcquiredSnapshotKey(), "Missing snapshot reference in ReadTable tx");
bool wait = false;
- const auto& byVersion = DataShard.GetVolatileTxManager().GetVolatileTxByVersion();
- auto end = byVersion.upper_bound(TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId()));
- for (auto it = byVersion.begin(); it != end; ++it) {
- auto* info = *it;
+ TRowVersion snapshot(record.GetSnapshotStep(), record.GetSnapshotTxId());
+ for (auto* info : DataShard.GetVolatileTxManager().GetVolatileTxByVersion()) {
+ if (!(info->Version <= snapshot)) {
+ break;
+ }
op->AddVolatileDependency(info->TxId);
bool ok = DataShard.GetVolatileTxManager().AttachWaitingRemovalOperation(info->TxId, op->GetTxId());
Y_VERIFY_S(ok, "Unexpected failure to attach TxId# " << op->GetTxId() << " to volatile tx " << info->TxId);
@@ -132,6 +133,7 @@ EExecutionStatus TReadTableScanUnit::Execute(TOperation::TPtr op,
auto readVersion = TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId());
options.SetSnapshotRowVersion(readVersion);
} else if (DataShard.IsMvccEnabled()) {
+ // Note: this mode is only used in legacy tests and may not work with volatile transactions
// With mvcc we have to mark all preceding transactions as logically complete
auto readVersion = DataShard.GetReadWriteVersions(tx).ReadVersion;
hadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(readVersion, txc);
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index d24058be1ef..0f079b37df5 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -471,6 +471,20 @@ namespace NKikimr::NDataShard {
}
VolatileTxByVersion.erase(info);
VolatileTxs.erase(txId);
+
+ if (!WaitingSnapshotEvents.empty()) {
+ TVolatileTxInfo* next = !VolatileTxByVersion.empty() ? *VolatileTxByVersion.begin() : nullptr;
+ while (!WaitingSnapshotEvents.empty()) {
+ auto& top = WaitingSnapshotEvents.front();
+ if (next && next->Version <= top.Snapshot) {
+ // Still waiting
+ break;
+ }
+ TActivationContext::Send(std::move(top.Event));
+ std::pop_heap(WaitingSnapshotEvents.begin(), WaitingSnapshotEvents.end());
+ WaitingSnapshotEvents.pop_back();
+ }
+ }
}
bool TVolatileTxManager::AttachVolatileTxCallback(ui64 txId, IVolatileTxCallback::TPtr callback) {
@@ -533,6 +547,13 @@ namespace NKikimr::NDataShard {
return true;
}
+ void TVolatileTxManager::AttachWaitingSnapshotEvent(const TRowVersion& snapshot, std::unique_ptr<IEventHandle>&& event) {
+ Y_VERIFY(!VolatileTxByVersion.empty() && (*VolatileTxByVersion.begin())->Version <= snapshot);
+
+ WaitingSnapshotEvents.emplace_back(snapshot, std::move(event));
+ std::push_heap(WaitingSnapshotEvents.begin(), WaitingSnapshotEvents.end());
+ }
+
void TVolatileTxManager::AbortWaitingTransaction(TVolatileTxInfo* info) {
Y_VERIFY(info && info->State == EVolatileTxState::Waiting);
diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h
index 110701e39df..be6202a8d1f 100644
--- a/ydb/core/tx/datashard/volatile_tx.h
+++ b/ydb/core/tx/datashard/volatile_tx.h
@@ -132,6 +132,21 @@ namespace NKikimr::NDataShard {
}
};
+ struct TWaitingSnapshotEvent {
+ TRowVersion Snapshot;
+ std::unique_ptr<IEventHandle> Event;
+
+ TWaitingSnapshotEvent(const TRowVersion& snapshot, std::unique_ptr<IEventHandle>&& event)
+ : Snapshot(snapshot)
+ , Event(std::move(event))
+ { }
+
+ bool operator<(const TWaitingSnapshotEvent& rhs) const {
+ // Note: inverted for max-heap
+ return rhs.Snapshot < Snapshot;
+ }
+ };
+
public:
using TVolatileTxByVersion = std::set<TVolatileTxInfo*, TCompareInfoByVersion>;
@@ -151,6 +166,10 @@ namespace NKikimr::NDataShard {
const TVolatileTxByVersion& GetVolatileTxByVersion() const { return VolatileTxByVersion; }
+ bool HasVolatileTxsAtSnapshot(const TRowVersion& snapshot) const {
+ return !VolatileTxByVersion.empty() && (*VolatileTxByVersion.begin())->Version <= snapshot;
+ }
+
void PersistAddVolatileTx(
ui64 txId, const TRowVersion& version,
TConstArrayRef<ui64> commitTxIds,
@@ -167,6 +186,9 @@ namespace NKikimr::NDataShard {
bool AttachWaitingRemovalOperation(
ui64 txId, ui64 dependentTxId);
+ void AttachWaitingSnapshotEvent(
+ const TRowVersion& snapshot, std::unique_ptr<IEventHandle>&& event);
+
void AbortWaitingTransaction(TVolatileTxInfo* info);
void ProcessReadSet(
@@ -203,6 +225,7 @@ namespace NKikimr::NDataShard {
absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs; // TxId -> Info
absl::flat_hash_map<ui64, TVolatileTxInfo*> VolatileTxByCommitTxId; // CommitTxId -> Info
TVolatileTxByVersion VolatileTxByVersion;
+ std::vector<TWaitingSnapshotEvent> WaitingSnapshotEvents;
TIntrusivePtr<TTxMap> TxMap;
std::deque<ui64> PendingCommits;
std::deque<ui64> PendingAborts;