author     snaury <snaury@ydb.tech>  2023-09-26 13:41:52 +0300
committer  snaury <snaury@ydb.tech>  2023-09-26 14:46:26 +0300
commit     0bcf9c9bae01164715bc1a41c28e0c18a2ba32ea (patch)
tree       febb3460a7e39c5e42e16c97c77e7a5de9940911
parent     5ad1dcc7936525dc6742bcbf855e291cc71dbda4 (diff)
download   ydb-0bcf9c9bae01164715bc1a41c28e0c18a2ba32ea.tar.gz
Don't block writes longer than necessary after local snapshot reads KIKIMR-19381
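
The gist of the change, as far as the diff below goes: when a tablet asks the mediator time cast to wait for a specific plan step (TEvMediatorTimecast::TEvWaitPlanStep), the time cast proxy now also tells the coordinators that this exact step is required, using a new TEvTxProxy::TEvRequirePlanSteps message; the coordinator aligns the requested step to its plan resolution and schedules an exact plan tick for it instead of leaving the waiter to the regular tick schedule. The datashard additionally requests the step of its unreplied immediate write edge after a restart, so such writes become visible sooner. A rough, non-compilable sketch of the resulting flow, using only names that appear in this patch:

    // datashard (CheckMediatorStateRestored): ask the time cast for the write step
    Send(MakeMediatorTimecastProxyID(),
         new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), writeStep));

    // time cast proxy (TEvWaitPlanStep handler): require the step from each coordinator
    Send(MakePipePeNodeCacheID(false),
         new TEvPipeCache::TEvForward(
             new TEvTxProxy::TEvRequirePlanSteps(coordinatorId, planStep),
             coordinatorId, !coordinator.Subscribed));

    // coordinator (TEvRequirePlanSteps handler): align and schedule an exact tick
    step = AlignPlanStep(step);
    if (step > VolatileState.LastPlanned && PendingSiblingSteps.insert(step).second) {
        SchedulePlanTickExact(step);
    }
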
-rw-r--r--  ydb/core/protos/tx.proto                                            7
-rw-r--r--  ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp   13
-rw-r--r--  ydb/core/tx/coordinator/coordinator_impl.cpp                        6
-rw-r--r--  ydb/core/tx/coordinator/coordinator_impl.h                          3
-rw-r--r--  ydb/core/tx/datashard/datashard.cpp                                 8
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_snapshot.cpp                    91
-rw-r--r--  ydb/core/tx/time_cast/time_cast.cpp                                85
-rw-r--r--  ydb/core/tx/tx.h                                                   20
8 files changed, 231 insertions(+), 2 deletions(-)
diff --git a/ydb/core/protos/tx.proto b/ydb/core/protos/tx.proto
index 821b8007f4f..dbc323ceaf2 100644
--- a/ydb/core/protos/tx.proto
+++ b/ydb/core/protos/tx.proto
@@ -149,6 +149,13 @@ message TEvUpdatedLastStep {
optional fixed64 LastStep = 3;
}
+// Time cast to coordinators
+// Notifies coordinator that clients are waiting for some specific steps
+message TEvRequirePlanSteps {
+ optional fixed64 CoordinatorID = 1;
+ repeated uint64 PlanSteps = 2 [packed = true];
+}
+
// coordinator to mediator
message TCoordinatorTransaction {
repeated uint64 AffectedSet = 1; // read and write set - joined and then filtered for concrete mediator
diff --git a/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp b/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp
index 73a62325232..52909ae7f9a 100644
--- a/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp
+++ b/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp
@@ -184,6 +184,19 @@ namespace NKikimr::NFlatTxCoordinator {
}
}
+ void TTxCoordinator::Handle(TEvTxProxy::TEvRequirePlanSteps::TPtr& ev) {
+ auto* msg = ev->Get();
+ for (ui64 step : msg->Record.GetPlanSteps()) {
+ // Note: we could schedule an exact volatile step here in the future
+ step = AlignPlanStep(step);
+ // Note: this is not a sibling step, but it behaves similar enough
+ // so we reuse the same queue here.
+ if (step > VolatileState.LastPlanned && PendingSiblingSteps.insert(step).second) {
+ SchedulePlanTickExact(step);
+ }
+ }
+ }
+
void TTxCoordinator::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
auto* msg = ev->Get();
if (auto* state = Siblings.FindPtr(msg->TabletId); state && state->Subscribed) {
diff --git a/ydb/core/tx/coordinator/coordinator_impl.cpp b/ydb/core/tx/coordinator/coordinator_impl.cpp
index 904358d7f54..4c2cda2edff 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.cpp
+++ b/ydb/core/tx/coordinator/coordinator_impl.cpp
@@ -314,8 +314,12 @@ void TTxCoordinator::SchedulePlanTickAligned(ui64 next) {
return;
}
+ SchedulePlanTickExact(AlignPlanStep(next));
+}
+
+ui64 TTxCoordinator::AlignPlanStep(ui64 step) {
const ui64 resolution = Config.Resolution;
- SchedulePlanTickExact((next + resolution - 1) / resolution * resolution);
+ return ((step + resolution - 1) / resolution * resolution);
}
void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx) {
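
As a quick sanity check of AlignPlanStep above (plain integer arithmetic, nothing patch-specific): with Config.Resolution = 100,

    AlignPlanStep(1234) == (1234 + 100 - 1) / 100 * 100 == 1300
    AlignPlanStep(1300) == (1300 + 100 - 1) / 100 * 100 == 1300

i.e. steps are rounded up to the next multiple of the resolution and already-aligned steps are left unchanged, so SchedulePlanTickExact is never asked for a step in the middle of a resolution interval.
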
diff --git a/ydb/core/tx/coordinator/coordinator_impl.h b/ydb/core/tx/coordinator/coordinator_impl.h
index 875049e77ba..a02626cfbb3 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.h
+++ b/ydb/core/tx/coordinator/coordinator_impl.h
@@ -674,6 +674,7 @@ private:
void SubscribeToSibling(TSiblingState& state);
void UnsubscribeFromSiblings();
void Handle(TEvTxProxy::TEvUpdatedLastStep::TPtr &ev);
+ void Handle(TEvTxProxy::TEvRequirePlanSteps::TPtr &ev);
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev);
void Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx);
@@ -697,6 +698,7 @@ private:
void SchedulePlanTick();
void SchedulePlanTickExact(ui64 next);
void SchedulePlanTickAligned(ui64 next);
+ ui64 AlignPlanStep(ui64 step);
void TryInitMonCounters(const TActorContext &ctx);
bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) override;
@@ -780,6 +782,7 @@ public:
HFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
HFunc(TEvPrivate::TEvRestoredProcessingParams, Handle);
hFunc(TEvTxProxy::TEvUpdatedLastStep, Handle);
+ hFunc(TEvTxProxy::TEvRequirePlanSteps, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
)
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 213bfc00616..b2d20bc1e6d 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2344,9 +2344,17 @@ void TDataShard::CheckMediatorStateRestored() {
// writes before the restart, and conversely don't accidentally read any
// data that is definitely not replied yet.
if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
+ const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
SnapshotManager.PromoteImmediateWriteEdgeReplied(
Min(edge, SnapshotManager.GetImmediateWriteEdge()));
+ // Try to ensure writes become visible sooner rather than later
+ if (edge.Step < writeStep) {
+ if (MediatorTimeCastWaitingSteps.insert(writeStep).second) {
+ Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), writeStep));
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << writeStep << " from mediator time cast");
+ }
+ }
}
MediatorStateWaiting = false;
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index cce17c176a3..8bad05bcb76 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -3773,6 +3773,97 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"{ items { uint32_value: 3 } items { uint32_value: 3 } }");
}
+ Y_UNIT_TEST(ReadIteratorLocalSnapshotThenWrite) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(100)
+ .SetAppConfig(app);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ // Perform a snapshot read, this will persist "reads from snapshots" flag
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value
+ FROM `/Root/table-2`
+ )")),
+ "");
+
+ // Insert rows using a single-shard write
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)"));
+
+ // Wait until mediator goes idle
+ size_t timecastUpdates = 0;
+ auto observer = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvMediatorTimecast::TEvUpdate::EventType: {
+ ++timecastUpdates;
+ break;
+ }
+ case TEvDataShard::TEvRead::EventType: {
+ auto* msg = ev->Get<TEvDataShard::TEvRead>();
+ msg->Record.SetMaxRowsInResult(1);
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(observer);
+
+ auto waitFor = [&](const auto& condition, const TString& description) {
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ runtime.DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
+ waitFor([&]{ return timecastUpdates >= 3; }, "at least 3 timecast updates");
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
+
+ auto start = runtime.GetCurrentTime();
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33)"));
+ auto duration = runtime.GetCurrentTime() - start;
+ UNIT_ASSERT_C(duration <= TDuration::MilliSeconds(200), "UPSERT takes too much time: " << duration);
+ }
+
}
} // namespace NKikimr
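
A note on the assertion above: the test runs with SetDomainPlanResolution(100), so once the required step reaches the coordinator it should be planned within roughly one 100 ms resolution interval plus mediator propagation, which is what the 200 ms bound on the upsert latency is checking; without the TEvRequirePlanSteps path the write would have to wait for the coordinators to advance on their own schedule, which on an otherwise idle system can take far longer.
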
diff --git a/ydb/core/tx/time_cast/time_cast.cpp b/ydb/core/tx/time_cast/time_cast.cpp
index f6e33985462..455777f2030 100644
--- a/ydb/core/tx/time_cast/time_cast.cpp
+++ b/ydb/core/tx/time_cast/time_cast.cpp
@@ -2,6 +2,7 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/base/tx_processing.h>
#include <ydb/core/protos/subdomains.pb.h>
#include <ydb/library/services/services.pb.h>
@@ -32,6 +33,21 @@ void TMediatorTimecastEntry::Update(ui64 step, ui64 *exemption, ui64 exsz) {
}
class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
+ struct TEvPrivate {
+ enum EEv {
+ EvRetryCoordinator = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+ EvEnd,
+ };
+
+ struct TEvRetryCoordinator : public TEventLocal<TEvRetryCoordinator, EvRetryCoordinator> {
+ ui64 Coordinator;
+
+ TEvRetryCoordinator(ui64 coordinator)
+ : Coordinator(coordinator)
+ {}
+ };
+ };
+
struct TWaiter {
TActorId Sender;
ui64 TabletId;
@@ -59,6 +75,7 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
struct TMediator {
const ui32 BucketsSz;
TArrayHolder<TMediatorBucket> Buckets;
+ std::vector<ui64> Coordinators;
TActorId PipeClient;
@@ -68,6 +85,12 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
{}
};
+ struct TMediatorCoordinator {
+ std::set<ui64> WaitingSteps;
+ bool RetryPending = false;
+ bool Subscribed = false;
+ };
+
struct TTabletInfo {
ui64 MediatorTabletId;
TMediator* Mediator;
@@ -92,6 +115,7 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
};
THashMap<ui64, TMediator> Mediators; // mediator tablet -> info
+ THashMap<ui64, TMediatorCoordinator> MediatorCoordinators; // coordinator tablet -> info
THashMap<ui64, TTabletInfo> Tablets;
ui64 LastSeqNo = 0;
@@ -103,6 +127,11 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
if (!pr.second) {
Y_VERIFY(pr.first->second.BucketsSz == processing.GetTimeCastBucketsPerMediator());
}
+ if (pr.first->second.Coordinators.empty()) {
+ pr.first->second.Coordinators.assign(
+ processing.GetCoordinators().begin(),
+ processing.GetCoordinators().end());
+ }
return pr.first->second;
}
@@ -165,6 +194,8 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
void Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvPrivate::TEvRetryCoordinator::TPtr &ev, const TActorContext &ctx);
// Client requests for readstep subscriptions
void Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx);
@@ -200,6 +231,8 @@ public:
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ HFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
+ HFunc(TEvPrivate::TEvRetryCoordinator, Handle);
}
}
};
@@ -278,6 +311,17 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvWaitPlanStep::TPtr &
const ui64 currentStep = bucket.Entry->Get(tabletId);
if (currentStep < planStep) {
bucket.Waiters.emplace(ev->Sender, tabletId, planStep);
+ for (ui64 coordinatorId : mediator.Coordinators) {
+ TMediatorCoordinator &coordinator = MediatorCoordinators[coordinatorId];
+ if (coordinator.WaitingSteps.insert(planStep).second && !coordinator.RetryPending) {
+ Send(MakePipePeNodeCacheID(false),
+ new TEvPipeCache::TEvForward(
+ new TEvTxProxy::TEvRequirePlanSteps(coordinatorId, planStep),
+ coordinatorId,
+ !coordinator.Subscribed));
+ coordinator.Subscribed = true;
+ }
+ }
return;
}
@@ -301,6 +345,37 @@ void TMediatorTimecastProxy::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev,
TryResync(msg->ClientId, msg->TabletId, ctx);
}
+void TMediatorTimecastProxy::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx) {
+ auto *msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString()
+ << " HANDLE EvDeliveryProblem " << msg->TabletId);
+ auto it = MediatorCoordinators.find(msg->TabletId);
+ if (it != MediatorCoordinators.end()) {
+ Y_VERIFY_DEBUG(!it->second.RetryPending);
+ Schedule(TDuration::MilliSeconds(5), new TEvPrivate::TEvRetryCoordinator(msg->TabletId));
+ it->second.RetryPending = true;
+ it->second.Subscribed = false;
+ }
+}
+
+void TMediatorTimecastProxy::Handle(TEvPrivate::TEvRetryCoordinator::TPtr &ev, const TActorContext &ctx) {
+ auto *msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString()
+ << " HANDLE EvRetryCoordinator " << msg->Coordinator);
+ auto it = MediatorCoordinators.find(msg->Coordinator);
+ if (it != MediatorCoordinators.end() && it->second.RetryPending) {
+ it->second.RetryPending = false;
+ if (!it->second.WaitingSteps.empty()) {
+ Send(MakePipePeNodeCacheID(false),
+ new TEvPipeCache::TEvForward(
+ new TEvTxProxy::TEvRequirePlanSteps(msg->Coordinator, it->second.WaitingSteps),
+ msg->Coordinator,
+ true));
+ it->second.Subscribed = true;
+ }
+ }
+}
+
void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString()
<< " HANDLE "<< ev->Get()->ToString());
@@ -314,6 +389,7 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
auto &mediator = it->second;
Y_VERIFY(record.GetBucket() < mediator.BucketsSz);
auto &bucket = mediator.Buckets[record.GetBucket()];
+ const ui64 step = record.GetTimeBarrier();
switch (bucket.Entry.RefCount()) {
case 0:
break;
@@ -322,7 +398,6 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
bucket.Waiters = { };
break;
default: {
- const ui64 step = record.GetTimeBarrier();
bucket.Entry->Update(step, nullptr, 0);
THashSet<std::pair<TActorId, ui64>> processed; // a set of processed tablets
while (!bucket.Waiters.empty()) {
@@ -338,6 +413,14 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
break;
}
}
+ for (ui64 coordinatorId : mediator.Coordinators) {
+ auto it = MediatorCoordinators.find(coordinatorId);
+ if (it != MediatorCoordinators.end()) {
+ while (!it->second.WaitingSteps.empty() && *it->second.WaitingSteps.begin() <= step) {
+ it->second.WaitingSteps.erase(it->second.WaitingSteps.begin());
+ }
+ }
+ }
}
}
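
Also worth noting from the time cast changes above: delivery of TEvRequirePlanSteps to a coordinator is not assumed to be reliable. On TEvPipeCache::TEvDeliveryProblem the proxy drops the subscription, sets RetryPending and schedules a private TEvRetryCoordinator after 5 ms; the retry handler then re-sends every step still present in WaitingSteps in a single TEvRequirePlanSteps (the std::set overload added in tx.h below), and steps are pruned from WaitingSteps as TEvUpdate moves the time barrier past them.
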
diff --git a/ydb/core/tx/tx.h b/ydb/core/tx/tx.h
index 8a5a32f4b09..60cbba15983 100644
--- a/ydb/core/tx/tx.h
+++ b/ydb/core/tx/tx.h
@@ -18,6 +18,7 @@ struct TEvTxProxy {
EvUnsubscribeReadStep,
EvSubscribeLastStep,
EvUnsubscribeLastStep,
+ EvRequirePlanSteps,
EvProposeTransactionStatus = EvProposeTransaction + 1 * 512,
EvAcquireReadStepResult,
@@ -167,6 +168,25 @@ struct TEvTxProxy {
Record.SetLastStep(lastStep);
}
};
+
+ struct TEvRequirePlanSteps
+ : public TEventPB<TEvRequirePlanSteps, NKikimrTx::TEvRequirePlanSteps, EvRequirePlanSteps>
+ {
+ TEvRequirePlanSteps() = default;
+
+ TEvRequirePlanSteps(ui64 coordinator, ui64 planStep) {
+ Record.SetCoordinatorID(coordinator);
+ Record.AddPlanSteps(planStep);
+ }
+
+ TEvRequirePlanSteps(ui64 coordinator, const std::set<ui64>& planSteps) {
+ Record.SetCoordinatorID(coordinator);
+ Record.MutablePlanSteps()->Reserve(planSteps.size());
+ for (ui64 planStep : planSteps) {
+ Record.AddPlanSteps(planStep);
+ }
+ }
+ };
};
// basic