| author | snaury <snaury@ydb.tech> | 2023-09-26 13:41:52 +0300 |
| --- | --- | --- |
| committer | snaury <snaury@ydb.tech> | 2023-09-26 14:46:26 +0300 |
| commit | 0bcf9c9bae01164715bc1a41c28e0c18a2ba32ea (patch) | |
| tree | febb3460a7e39c5e42e16c97c77e7a5de9940911 | |
| parent | 5ad1dcc7936525dc6742bcbf855e291cc71dbda4 (diff) | |
| download | ydb-0bcf9c9bae01164715bc1a41c28e0c18a2ba32ea.tar.gz | |
Don't block writes longer than necessary after local snapshot reads KIKIMR-19381
| -rw-r--r-- | ydb/core/protos/tx.proto | 7 |
| -rw-r--r-- | ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp | 13 |
| -rw-r--r-- | ydb/core/tx/coordinator/coordinator_impl.cpp | 6 |
| -rw-r--r-- | ydb/core/tx/coordinator/coordinator_impl.h | 3 |
| -rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 8 |
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 91 |
| -rw-r--r-- | ydb/core/tx/time_cast/time_cast.cpp | 85 |
| -rw-r--r-- | ydb/core/tx/tx.h | 20 |

8 files changed, 231 insertions, 2 deletions
diff --git a/ydb/core/protos/tx.proto b/ydb/core/protos/tx.proto
index 821b8007f4f..dbc323ceaf2 100644
--- a/ydb/core/protos/tx.proto
+++ b/ydb/core/protos/tx.proto
@@ -149,6 +149,13 @@ message TEvUpdatedLastStep {
     optional fixed64 LastStep = 3;
 }
 
+// Time cast to coordinators
+// Notifies coordinator that clients are waiting for some specific steps
+message TEvRequirePlanSteps {
+    optional fixed64 CoordinatorID = 1;
+    repeated uint64 PlanSteps = 2 [packed = true];
+}
+
 // coordinator to mediator
 message TCoordinatorTransaction {
     repeated uint64 AffectedSet = 1; // read and write set - joined and then filtered for concrete mediator
diff --git a/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp b/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp
index 73a62325232..52909ae7f9a 100644
--- a/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp
+++ b/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp
@@ -184,6 +184,19 @@ namespace NKikimr::NFlatTxCoordinator {
         }
     }
 
+    void TTxCoordinator::Handle(TEvTxProxy::TEvRequirePlanSteps::TPtr& ev) {
+        auto* msg = ev->Get();
+        for (ui64 step : msg->Record.GetPlanSteps()) {
+            // Note: we could schedule an exact volatile step here in the future
+            step = AlignPlanStep(step);
+            // Note: this is not a sibling step, but it behaves similar enough
+            // so we reuse the same queue here.
+            if (step > VolatileState.LastPlanned && PendingSiblingSteps.insert(step).second) {
+                SchedulePlanTickExact(step);
+            }
+        }
+    }
+
     void TTxCoordinator::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
         auto* msg = ev->Get();
         if (auto* state = Siblings.FindPtr(msg->TabletId); state && state->Subscribed) {
diff --git a/ydb/core/tx/coordinator/coordinator_impl.cpp b/ydb/core/tx/coordinator/coordinator_impl.cpp
index 904358d7f54..4c2cda2edff 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.cpp
+++ b/ydb/core/tx/coordinator/coordinator_impl.cpp
@@ -314,8 +314,12 @@ void TTxCoordinator::SchedulePlanTickAligned(ui64 next) {
         return;
     }
 
+    SchedulePlanTickExact(AlignPlanStep(next));
+}
+
+ui64 TTxCoordinator::AlignPlanStep(ui64 step) {
     const ui64 resolution = Config.Resolution;
-    SchedulePlanTickExact((next + resolution - 1) / resolution * resolution);
+    return ((step + resolution - 1) / resolution * resolution);
 }
 
 void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx) {
diff --git a/ydb/core/tx/coordinator/coordinator_impl.h b/ydb/core/tx/coordinator/coordinator_impl.h
index 875049e77ba..a02626cfbb3 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.h
+++ b/ydb/core/tx/coordinator/coordinator_impl.h
@@ -674,6 +674,7 @@ private:
     void SubscribeToSibling(TSiblingState& state);
     void UnsubscribeFromSiblings();
     void Handle(TEvTxProxy::TEvUpdatedLastStep::TPtr &ev);
+    void Handle(TEvTxProxy::TEvRequirePlanSteps::TPtr &ev);
     void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev);
 
     void Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx);
@@ -697,6 +698,7 @@ private:
     void SchedulePlanTick();
     void SchedulePlanTickExact(ui64 next);
    void SchedulePlanTickAligned(ui64 next);
+    ui64 AlignPlanStep(ui64 step);
 
     void TryInitMonCounters(const TActorContext &ctx);
     bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) override;
@@ -780,6 +782,7 @@ public:
             HFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
             HFunc(TEvPrivate::TEvRestoredProcessingParams, Handle);
             hFunc(TEvTxProxy::TEvUpdatedLastStep, Handle);
+            hFunc(TEvTxProxy::TEvRequirePlanSteps, Handle);
             hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
         )
 
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 213bfc00616..b2d20bc1e6d 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2344,9 +2344,17 @@ void TDataShard::CheckMediatorStateRestored() {
     // writes before the restart, and conversely don't accidentally read any
     // data that is definitely not replied yet.
     if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
+        const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
         const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
         SnapshotManager.PromoteImmediateWriteEdgeReplied(
             Min(edge, SnapshotManager.GetImmediateWriteEdge()));
+        // Try to ensure writes become visible sooner rather than later
+        if (edge.Step < writeStep) {
+            if (MediatorTimeCastWaitingSteps.insert(writeStep).second) {
+                Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), writeStep));
+                LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << writeStep << " from mediator time cast");
+            }
+        }
     }
 
     MediatorStateWaiting = false;
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index cce17c176a3..8bad05bcb76 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -3773,6 +3773,97 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
             "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
     }
 
+    Y_UNIT_TEST(ReadIteratorLocalSnapshotThenWrite) {
+        NKikimrConfig::TAppConfig app;
+        app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetDomainPlanResolution(100)
+            .SetAppConfig(app);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+        auto opts = TShardedTableOptions()
+            .Shards(1)
+            .Columns({
+                {"key", "Uint32", true, false},
+                {"value", "Uint32", false, false}});
+        CreateShardedTable(server, sender, "/Root", "table-1", opts);
+        CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+        // Perform a snapshot read, this will persist "reads from snapshots" flag
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, Q_(R"(
+                SELECT key, value
+                FROM `/Root/table-1`
+                UNION ALL
+                SELECT key, value
+                FROM `/Root/table-2`
+                )")),
+            "");
+
+        // Insert rows using a single-shard write
+        ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)"));
+
+        // Wait until mediator goes idle
+        size_t timecastUpdates = 0;
+        auto observer = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+            switch (ev->GetTypeRewrite()) {
+                case TEvMediatorTimecast::TEvUpdate::EventType: {
+                    ++timecastUpdates;
+                    break;
+                }
+                case TEvDataShard::TEvRead::EventType: {
+                    auto* msg = ev->Get<TEvDataShard::TEvRead>();
+                    msg->Record.SetMaxRowsInResult(1);
+                    break;
+                }
+            }
+            return TTestActorRuntime::EEventAction::PROCESS;
+        };
+        auto prevObserverFunc = runtime.SetObserverFunc(observer);
+
+        auto waitFor = [&](const auto& condition, const TString& description) {
+            if (!condition()) {
+                Cerr << "... waiting for " << description << Endl;
+                TDispatchOptions options;
+                options.CustomFinalCondition = [&]() {
+                    return condition();
+                };
+                runtime.DispatchEvents(options);
+                UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+            }
+        };
+
+        waitFor([&]{ return timecastUpdates >= 3; }, "at least 3 timecast updates");
+
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, Q_(R"(
+                SELECT key, value
+                FROM `/Root/table-1`
+                ORDER BY key
+                )")),
+            "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+            "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+            "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
+
+        auto start = runtime.GetCurrentTime();
+        ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33)"));
+        auto duration = runtime.GetCurrentTime() - start;
+        UNIT_ASSERT_C(duration <= TDuration::MilliSeconds(200), "UPSERT takes too much time: " << duration);
+    }
+
 }
 
 } // namespace NKikimr
diff --git a/ydb/core/tx/time_cast/time_cast.cpp b/ydb/core/tx/time_cast/time_cast.cpp
index f6e33985462..455777f2030 100644
--- a/ydb/core/tx/time_cast/time_cast.cpp
+++ b/ydb/core/tx/time_cast/time_cast.cpp
@@ -2,6 +2,7 @@
 
 #include <library/cpp/actors/core/hfunc.h>
 #include <library/cpp/actors/core/log.h>
+#include <ydb/core/base/tablet_pipecache.h>
 #include <ydb/core/base/tx_processing.h>
 #include <ydb/core/protos/subdomains.pb.h>
 #include <ydb/library/services/services.pb.h>
@@ -32,6 +33,21 @@ void TMediatorTimecastEntry::Update(ui64 step, ui64 *exemption, ui64 exsz) {
 }
 
 class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
+    struct TEvPrivate {
+        enum EEv {
+            EvRetryCoordinator = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+            EvEnd,
+        };
+
+        struct TEvRetryCoordinator : public TEventLocal<TEvRetryCoordinator, EvRetryCoordinator> {
+            ui64 Coordinator;
+
+            TEvRetryCoordinator(ui64 coordinator)
+                : Coordinator(coordinator)
+            {}
+        };
+    };
+
     struct TWaiter {
         TActorId Sender;
         ui64 TabletId;
@@ -59,6 +75,7 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
     struct TMediator {
         const ui32 BucketsSz;
         TArrayHolder<TMediatorBucket> Buckets;
+        std::vector<ui64> Coordinators;
 
         TActorId PipeClient;
 
@@ -68,6 +85,12 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
         {}
     };
 
+    struct TMediatorCoordinator {
+        std::set<ui64> WaitingSteps;
+        bool RetryPending = false;
+        bool Subscribed = false;
+    };
+
     struct TTabletInfo {
         ui64 MediatorTabletId;
         TMediator* Mediator;
@@ -92,6 +115,7 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
     };
 
     THashMap<ui64, TMediator> Mediators; // mediator tablet -> info
+    THashMap<ui64, TMediatorCoordinator> MediatorCoordinators; // coordinator tablet -> info
     THashMap<ui64, TTabletInfo> Tablets;
 
     ui64 LastSeqNo = 0;
@@ -103,6 +127,11 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
         if (!pr.second) {
            Y_VERIFY(pr.first->second.BucketsSz == processing.GetTimeCastBucketsPerMediator());
         }
+        if (pr.first->second.Coordinators.empty()) {
+            pr.first->second.Coordinators.assign(
+                processing.GetCoordinators().begin(),
+                processing.GetCoordinators().end());
+        }
         return pr.first->second;
     }
 
@@ -165,6 +194,8 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
     void Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, const TActorContext &ctx);
     void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx);
     void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx);
+    void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx);
+    void Handle(TEvPrivate::TEvRetryCoordinator::TPtr &ev, const TActorContext &ctx);
 
     // Client requests for readstep subscriptions
     void Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx);
@@ -200,6 +231,8 @@ public:
             HFunc(TEvTabletPipe::TEvClientConnected, Handle);
             HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+            HFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
+            HFunc(TEvPrivate::TEvRetryCoordinator, Handle);
         }
     }
 };
 
@@ -278,6 +311,17 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvWaitPlanStep::TPtr &
     const ui64 currentStep = bucket.Entry->Get(tabletId);
     if (currentStep < planStep) {
         bucket.Waiters.emplace(ev->Sender, tabletId, planStep);
+        for (ui64 coordinatorId : mediator.Coordinators) {
+            TMediatorCoordinator &coordinator = MediatorCoordinators[coordinatorId];
+            if (coordinator.WaitingSteps.insert(planStep).second && !coordinator.RetryPending) {
+                Send(MakePipePeNodeCacheID(false),
+                    new TEvPipeCache::TEvForward(
+                        new TEvTxProxy::TEvRequirePlanSteps(coordinatorId, planStep),
+                        coordinatorId,
+                        !coordinator.Subscribed));
+                coordinator.Subscribed = true;
+            }
+        }
         return;
     }
 
@@ -301,6 +345,37 @@ void TMediatorTimecastProxy::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev,
     TryResync(msg->ClientId, msg->TabletId, ctx);
 }
 
+void TMediatorTimecastProxy::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx) {
+    auto *msg = ev->Get();
+    LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString()
+        << " HANDLE EvDeliveryProblem " << msg->TabletId);
+    auto it = MediatorCoordinators.find(msg->TabletId);
+    if (it != MediatorCoordinators.end()) {
+        Y_VERIFY_DEBUG(!it->second.RetryPending);
+        Schedule(TDuration::MilliSeconds(5), new TEvPrivate::TEvRetryCoordinator(msg->TabletId));
+        it->second.RetryPending = true;
+        it->second.Subscribed = false;
+    }
+}
+
+void TMediatorTimecastProxy::Handle(TEvPrivate::TEvRetryCoordinator::TPtr &ev, const TActorContext &ctx) {
+    auto *msg = ev->Get();
+    LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString()
+        << " HANDLE EvRetryCoordinator " << msg->Coordinator);
+    auto it = MediatorCoordinators.find(msg->Coordinator);
+    if (it != MediatorCoordinators.end() && it->second.RetryPending) {
+        it->second.RetryPending = false;
+        if (!it->second.WaitingSteps.empty()) {
+            Send(MakePipePeNodeCacheID(false),
+                new TEvPipeCache::TEvForward(
+                    new TEvTxProxy::TEvRequirePlanSteps(msg->Coordinator, it->second.WaitingSteps),
+                    msg->Coordinator,
+                    true));
+            it->second.Subscribed = true;
+        }
+    }
+}
+
 void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, const TActorContext &ctx) {
     LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString()
         << " HANDLE "<< ev->Get()->ToString());
@@ -314,6 +389,7 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
     auto &mediator = it->second;
     Y_VERIFY(record.GetBucket() < mediator.BucketsSz);
     auto &bucket = mediator.Buckets[record.GetBucket()];
+    const ui64 step = record.GetTimeBarrier();
     switch (bucket.Entry.RefCount()) {
     case 0:
         break;
@@ -322,7 +398,6 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
         bucket.Waiters = { };
         break;
     default: {
-        const ui64 step = record.GetTimeBarrier();
         bucket.Entry->Update(step, nullptr, 0);
         THashSet<std::pair<TActorId, ui64>> processed; // a set of processed tablets
         while (!bucket.Waiters.empty()) {
@@ -338,6 +413,14 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
             break;
         }
     }
+    for (ui64 coordinatorId : mediator.Coordinators) {
+        auto it = MediatorCoordinators.find(coordinatorId);
+        if (it != MediatorCoordinators.end()) {
+            while (!it->second.WaitingSteps.empty() && *it->second.WaitingSteps.begin() <= step) {
+                it->second.WaitingSteps.erase(it->second.WaitingSteps.begin());
+            }
+        }
+    }
 }
 
 }
diff --git a/ydb/core/tx/tx.h b/ydb/core/tx/tx.h
index 8a5a32f4b09..60cbba15983 100644
--- a/ydb/core/tx/tx.h
+++ b/ydb/core/tx/tx.h
@@ -18,6 +18,7 @@ struct TEvTxProxy {
         EvUnsubscribeReadStep,
         EvSubscribeLastStep,
         EvUnsubscribeLastStep,
+        EvRequirePlanSteps,
 
         EvProposeTransactionStatus = EvProposeTransaction + 1 * 512,
         EvAcquireReadStepResult,
@@ -167,6 +168,25 @@ struct TEvTxProxy {
             Record.SetLastStep(lastStep);
         }
     };
+
+    struct TEvRequirePlanSteps
+        : public TEventPB<TEvRequirePlanSteps, NKikimrTx::TEvRequirePlanSteps, EvRequirePlanSteps>
+    {
+        TEvRequirePlanSteps() = default;
+
+        TEvRequirePlanSteps(ui64 coordinator, ui64 planStep) {
+            Record.SetCoordinatorID(coordinator);
+            Record.AddPlanSteps(planStep);
+        }
+
+        TEvRequirePlanSteps(ui64 coordinator, const std::set<ui64>& planSteps) {
+            Record.SetCoordinatorID(coordinator);
+            Record.MutablePlanSteps()->Reserve(planSteps.size());
+            for (ui64 planStep : planSteps) {
+                Record.AddPlanSteps(planStep);
+            }
+        }
+    };
 };
 
 // basic
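The coordinator-side change above hinges on a small piece of arithmetic: a requested plan step is rounded up to the plan resolution (AlignPlanStep) and queued only if it is still ahead of the last planned step, with duplicate requests collapsing into one pending tick. The sketch below reproduces that logic as a standalone program; TCoordinatorSketch, RequirePlanStep and the constants are illustrative stand-ins, not YDB code.

```cpp
#include <cstdint>
#include <iostream>
#include <set>

// Illustrative stand-in for the coordinator state touched by this commit; not a YDB type.
struct TCoordinatorSketch {
    uint64_t Resolution = 100;        // plan tick granularity, like Config.Resolution
    uint64_t LastPlanned = 1000;      // last step that was already planned
    std::set<uint64_t> PendingSteps;  // steps queued for an exact plan tick

    // Mirrors AlignPlanStep: round the step up to the next multiple of Resolution.
    uint64_t AlignPlanStep(uint64_t step) const {
        return (step + Resolution - 1) / Resolution * Resolution;
    }

    // Mirrors the TEvRequirePlanSteps handler: skip steps that are already
    // planned, deduplicate the rest, and schedule an exact tick for each new one.
    void RequirePlanStep(uint64_t step) {
        step = AlignPlanStep(step);
        if (step > LastPlanned && PendingSteps.insert(step).second) {
            std::cout << "schedule exact plan tick at " << step << "\n";
        }
    }
};

int main() {
    TCoordinatorSketch coordinator;
    coordinator.RequirePlanStep(1234); // rounds up to 1300 and is scheduled
    coordinator.RequirePlanStep(1251); // also rounds to 1300, deduplicated
    coordinator.RequirePlanStep(950);  // rounds to 1000, already planned, ignored
}
```

The intent of the change, as the commit message puts it, is to avoid blocking longer than necessary: the time cast proxy now tells the coordinator which plan steps clients are waiting for, so the coordinator plans them at the next aligned boundary instead of leaving waiters until it would have ticked on its own.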